全称是 AbstractQueuedSynchronizer,是同步器的相关框架,juc中很多锁的实现类依赖同步器(AQS的子类)完成核心操作
子类主要实现的方法
acquire(AQS已实现)
// 如果获取锁失败
if (!tryAcquire(arg)) {// 入队, 可以选择阻塞当前线程 park unpark
}
**release(AQS已实现)
// 如果释放锁成功
if (tryRelease(arg)) {// 让阻塞线程恢复运行
}
自定义同步器
class MySync extends AbstractQueuedSynchronizer {@Overrideprotected boolean tryAcquire(int arg) {if(compareAndSetState(0, 1)) {// 加上了锁,并设置 owner 为当前线程setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}@Overrideprotected boolean tryRelease(int arg) {setExclusiveOwnerThread(null);setState(0);return true;}@Override // 是否持有独占锁protected boolean isHeldExclusively() {return getState() == 1;}public Condition newCondition() {return new ConditionObject();}}

static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock. Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {//cas设置state成功,则设置当前线程为OwnerThreadif (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());else //否则执行acquire进入阻塞队列acquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}}
非公平性:虽然等待队列是先来先服务的,但是当锁被释放时,等待队列中的线程仍可能与队列外的线程去竞争锁,可能会竞争失败
注意:等待队列的head指向的要么是正在运行的线程所在节点,要么是dummy结点(不含线程)
非公平性具体体现在acquireQueued方法中
加锁
public final void acquire(int arg) {//1.尝试获取锁,获取成功则结束//2.获取失败则构建Node,关联当前线程,将其添加到等待队列中if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//获取当前节点的前驱节点final Node p = node.predecessor();//前驱节点若为头节点则重试获取锁,获取成功则将此节点设为头节点,并将前驱节点移除if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//首次修改前驱节点的waitStatus=-1,返回false//再次执行时,waitStatus已经为-1,返回true,此时执行parkAndCheckInterrupt()中断当前线程//当线程恢复时从这开始执行,继续循环if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}} private Node addWaiter(Node mode) {Node node = new Node(Thread.currentThread(), mode);// Try the fast path of enq; backup to full enq on failureNode pred = tail;if (pred != null) {node.prev = pred;if (compareAndSetTail(pred, node)) {pred.next = node;return node;}}//将node添加至队尾enq(node);return node;}
解锁
public final boolean release(int arg) {//1.尝试释放锁,释放成功则将state设置为0并返回trueif (tryRelease(arg)) {//2.找到头结点Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}protected final boolean tryRelease(int releases) {int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)compareAndSetWaitStatus(node, ws, 0);//获取当前节点的后继节点Node s = node.next;//后继节点若为空或者后继节点的waitStatus>0则遍历队列直到遇到第一个waitStatus<=0的节点或者遍历到队尾if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//恢复s节点关联线程的运行if (s != null)LockSupport.unpark(s.thread);}
进行加锁时,首先判断state是否为0,若不为0则说明该锁已经被占用,判断占用锁的线程是否为当前线程,若是则将state+1,返回true,不是则返回false。同理释放锁的时候会对判断占用锁的线程是否为当前线程,若是则将state-1,state=0表示释放成功返回true,否则返回false
static final class NonfairSync extends Sync {// ...// Sync 继承过来的方法, 方便阅读, 放在此处final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入else if (current == getExclusiveOwnerThread()) {// state++int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}// Sync 继承过来的方法, 方便阅读, 放在此处protected final boolean tryRelease(int releases) {// state-- int c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// 支持锁重入, 只有 state 减为 0, 才释放成功if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;}
}
当我们的线程被打断后会重新进入循环,如果当前线程位于等待队列的第一个则会尝试获取锁,获取失败则再次进入阻塞,若不是队列第一个则直接进入阻塞状态,代码见acquireQueued()
简单说就是不可打断模式下,若阻塞的线程被打断会再次进入阻塞队列
NoneFairSyn
private final boolean parkAndCheckInterrupt() {//1.将当前线程阻塞在此,若被打断才会往下执行LockSupport.park(this);//2.若被打断则返回true,同时清楚interrupted标记//若是被正常恢复则返回falsereturn Thread.interrupted();}final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;for (;;) {//获取当前节点的前驱节点final Node p = node.predecessor();//前驱节点若为头节点则重试获取锁,获取成功则将此节点设为头节点,并将前驱节点移除if (p == head && tryAcquire(arg)) {setHead(node);p.next = null; // help GCfailed = false;return interrupted;}//首次修改前驱节点的waitStatus=-1,返回false//再次执行时,waitStatus已经为-1,返回true,此时执行parkAndCheckInterrupt()中断当前线程//当线程恢复时从这开始执行,继续循环if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())//若被打断则将interrupted置为trueinterrupted = true;}} finally {if (failed)cancelAcquire(node);}} public final void acquire(int arg) {//1.尝试获取锁,获取失败则加入阻塞队列//2.从阻塞队列出来后,返回interupted(该interrupted是方法内部的变量,不是线程的标记)if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//若interrupted=true,则执行selfInterrupt()重新将线程的interrupted设为trueselfInterrupt();}
若阻塞的线程被打断则直接抛出异常,不继续在等待队列进行等待
public final void acquireInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//1.尝试获取锁if (!tryAcquire(arg))//2.获取失败则进入阻塞队列,但是是不可打断模式//该方法与acquireQueued()基本一致,只是对于打断处理不同,如果被打断则抛出异常退出等待队列doAcquireInterruptibly(arg); }
公平锁采用的是FairSync,与NoneFairSync的区别主要在于tryAcquire的不同,在竞争锁时先要判断队列中是否存在线程等待,若不存在或者等待队列中第一个线程就是当前线程时获取成功,否则进入等待队列
FairSync
protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {//1. 判断队列是否为空 || 队列中第一个待恢复的线程是否为当前线程if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;}
acquire与NoneFairSync一致,都使用的是AQS的方法
每个条件变量都维护了一条等待队列,如图所示

它将state置为0后park该线程并设置ws=-2,将该线程添加到此条件变量的等待队列中
public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//创建node关联当前线程,并将node添加到等待队列队尾Node node = addConditionWaiter();//释放锁int savedState = fullyRelease(node);int interruptMode = 0;while (!isOnSyncQueue(node)) {//park当前线程LockSupport.park(this);if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
从头遍历等待队列,直到遇见第一个ws=-2的节点,将该节点移出等待队列并添加到阻塞队列尾部
public final void signal() {//1.校验当前线程是否为持有锁的线程if (!isHeldExclusively())throw new IllegalMonitorStateException();//2.得到头结点Node first = firstWaiter;if (first != null)//3.signal头节点doSignal(first);}//将头节点移出等待队列
//如果头结点的waitStatus>0则一直移出头结点
//直到遇到ws<0的节点,将其放入阻塞队列
private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}