0%

Java 并发 - AQS:框架分析

1.介绍AQS

AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,AQS 这个东西在Java的并发中是很重要的一部分,因为他是很多同步器的基础框架,比如 ReentrantLock、CountDownLatch 和 Semaphore 等等都是基于 AQS 实现的。基于AQS来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。

在基于AQS构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计AQS时充分考虑了可伸缩行,因此J.U.C中所有基于AQS构建的同步器均可以获得这个优势。

AQS的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

AQS使用一个int类型的成员变量state来表示同步状态,当state>0时表示已经获取了锁,当state = 0时表示释放了锁。它提供了三个方法(getState()setState(int newState)compareAndSetState(int expect,int update))来对同步状态state进行操作,当然AQS可以确保对state的操作是安全的。

AQS通过内置的FIFO同步队列(这个会重点分析一下)来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

一句话:AQS使用一个Volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作,通过CAS完成对State值的修改。

这里列出了AQS主要提供的一些方法,方便快速定位:

  • getState():返回同步状态的当前值;
  • setState(int newState):设置当前同步状态;
  • compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;
  • tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
  • tryRelease(int arg):独占式释放同步状态;
  • tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
  • tryReleaseShared(int arg):共享式释放同步状态;
  • isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;
  • acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
  • acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;
  • tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
  • acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
  • acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
  • tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;
  • release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
  • releaseShared(int arg):共享式释放同步状态;

以上这些方法在下面的源码分析中都会有所涉及,这个AQS框架还是有些代码量挺复杂的,好好理解的话对于后面的其他的一些锁相关机制、通信工具类都是很有帮助的,内功心法都学会了,其他的就简单很多了。

大体整个框架图(图出自美团):

AQS框架图


2.AQS理论的数据结构

AQS类中维护了一个双向链表(FIFO队列), 这个队列也称CLH同步队列,有什么用呢?AQS就是靠这个队列来完成同步状态的管理的!

怎么进行管理的?这个问题问得好,大概流程就是这样:当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

Node源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
static final class Node {
//标记一个结点(对应的线程)在共享模式下等待
static final Node SHARED = new Node();
// 标记一个结点(对应的线程)在独占模式下等待
static final Node EXCLUSIVE = null;

// waitStatus的值,表示该结点(对应的线程)已被取消
static final int CANCELLED = 1;
//waitStatus的值,表示后继结点(对应的线程)需要被唤醒
static final int SIGNAL = -1;
//waitStatus的值,表示该结点(对应的线程)在等待某一条件
static final int CONDITION = -2;
// waitStatus的值,表示有资源可用,新head结点需要继续唤醒后继结点
// (共享模式下,多线程并发释放资源,而head唤醒其后继结点后,需要把多出来的资源留给后面的结点;
// 设置新的head结点时,会继续唤醒其后继结点)*/
static final int PROPAGATE = -3;

// 等待状态,取值范围,-3,-2,-1,0,1(如上)
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点对应的线程
volatile Thread thread;
// 等待队列里下一个等待条件的结点
Node nextWaiter;

}

线程两种锁的模式:

模式 含义
SHARED 表示线程以共享的模式等待锁
EXCLUSIVE 表示线程正在以独占的方式等待锁

waitStatus有下面几个枚举值:

枚举 含义
0 当一个Node被初始化的时候的默认值
CANCELLED 为1,表示线程获取锁的请求已经取消了
CONDITION 为-2,表示节点在等待队列中,节点线程等待唤醒
PROPAGATE 为-3,当前线程处在SHARED情况下,该字段才会使用
SIGNAL 为-1,表示线程已经准备好了,就等资源释放了

Node的内部其实是这样的:

Node结点内部

如上图所示,Node 的数据结构其实也挺简单的,就是 thread + waitStatus + pre + next 四个属性而已。

而AQS还有哪些属性呢?如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 头结点,可以理解为:当前持有锁的线程 
private transient volatile Node head;

// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 注意区分state和waiteState!!!
// 这个是最重要的,代表当前锁的状态,0代表没有被占用,大于 0 代表有线程持有当前锁
// 这个值可以大于 1,是因为锁可以重入,每次重入都加上 1
private volatile int waitStatus;

// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

再抽象一点就是这样了,使用图示的话就如下可以很清晰的表达了(图源文末参考文章),不过需要注意的是:阻塞队列不包含 head 节点。(如上图所示)

解释一下head:head是队列中标志,用于指示下一个被unpack的node,head来源于初始化的或曾取得过锁的node。

AbstractQueuedSynchronizer 的等待队列示意如下所示:

CLH队列(FIFO)

关于双向队列的入队操作和出队操作这些应该比较容易理解,这里就不再讲了。


3.源码分析

AQS同时提供了互斥模式(exclusive)和共享模式(shared)两种不同的同步逻辑。一般情况下,子类只需要根据需求实现其中一种模式,当然也有同时实现两种模式的同步类,如ReadWriteLock。接下来将详细介绍这两种模式。

3.1 独占模式

3.1.1 独占式同步状态获取:acquire

独占式获取同步状态时通过 acquire 进行的,他是AQS提供的模板方法,该方法为独占式获取同步状态。但是该方法对中断不敏感,也就是说由于线程获取同步状态失败加入到CLH同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移除。

acquire的主要完成的事情是这样的:

  1. 获取独占锁,对中断不敏感。
  2. 首先尝试获取一次锁,如果成功,则返回,就结束了!!!
  3. 否则会把当前线程包装成Node插入到队列中,在队列中会检测是否为head的直接后继,并尝试获取锁
  4. 如果获取失败,则会通过LockSupport阻塞当前线程,直至被释放锁的线程唤醒或者被中断,随后再次尝试获取锁,如此反复。

其源码如下:

1
2
3
4
5
6
7
8
9
10
public final void acquire(int arg) {
// tryAcquire:去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。
// addWaiter:如果tryAcquire返回FALSE(获取同步状态失败),则调用该方法将当前线程加入到CLH同步队列尾部。
// acquireQueued:当前线程会根据公平性原则来进行阻塞等待(自旋),直到获取锁为止;并且返回当前线程在等待过程中有没有中断过。
// selfInterrupt:产生一个中断。
if (!tryAcquire(arg) &&
// tryAcquire(arg)没有成功,这个时候需要把当前线程挂起,放到阻塞队列中。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

tryAcquire(int)

上面提到的tryAcquire方法, tryAcquire尝试以独占的方式获取资源,如果获取成功,则直接返回true,否则直接返回false。该方法可以用于实现Lock中的tryLock()方法。而AQS中并没有实现上面的tryAcquire(arg)方法,当你跟进去的时候会发现,只是抛出一个异常而已,该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。AQS在这里只负责定义了一个公共的方法框架。

这里之所以没有定义成abstract,是因为独占模式下只用实现tryAcquire-tryRelease,而共享模式下只用实现tryAcquireShared-tryReleaseShared。如果都定义成abstract,那么每个模式也要去实现另一模式下的接口。

其代码如下所示:

1
2
3
4
5
6
7
/*
* 去尝试获取锁,获取成功则设置锁状态并返回true,否则返回false。
* 该方法自定义同步组件自己实现,该方法必须要保证线程安全的获取同步状态。
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

而具体获取锁的操作需要由其子类进行实现,比如ReentrantLock中的Sync实现,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
// 尝试直接获取锁,返回值是boolean,代表是否获取到锁
// 返回true:1.没有线程在等待锁;2.重入锁,线程本来就持有锁,也就可以理所当然可以直接获取
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// state == 0 此时此刻没有线程持有锁
if (c == 0) {
// 虽然此时此刻锁是可以用的,但是这是公平锁,既然是公平,就得讲究先来后到,
// 看看有没有别人在队列中等了半天了
if (!hasQueuedPredecessors() &&
// 如果没有线程在等待,那就用CAS尝试一下,成功了就获取到锁了,
// 不成功的话,只能说明一个问题,就在刚刚几乎同一时刻有个线程抢先了 =_=
// 因为刚刚还没人的,我判断过了
compareAndSetState(0, acquires)) {

// 到这里就是获取到锁了,标记一下,告诉大家,现在是我占用了锁
setExclusiveOwnerThread(current);
return true;
}
}
// 会进入这个else if分支,说明是重入了,需要操作:state=state+1
// 这里不存在并发问题
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}

// 如果到这里,说明前面的if和else if都没有返回true,说明没有获取到锁
// 回到上面一个外层调用方法继续看:
// if (!tryAcquire(arg)
// && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// selfInterrupt();
return false;
}

这里你会发现上面有重入锁的概念,意思就是已经获取到锁的线程还可以再次获取到同一个锁,这里多嘴一下,有哪些锁是重入锁呢?比如:syschronized、ReentrantLock都属于重入锁,而自旋锁不属于重入锁。


addWaiter(Node)

假设tryAcquire(arg) 返回false,那么代码将执行:acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这个方法,首先需要执行:addWaiter(Node.EXCLUSIVE)

该方法用于将当前线程根据不同的模式(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)加入到等待队列的队尾,并返回当前线程所在的结点。其添加过程是一个自旋过程,会去尝试能否添加到尾结点,如果队列为空会进行同步队列的初始化。需要注意的是,这里取消了快速尝试这个方法,addWaiter直接就自旋了,所以在这里是没有enq(node);这个方法的。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Node addWaiter(Node mode) {
// 新建Node
Node node = new Node(mode);
// 通过 CAS + 自旋的方式插入节点到队尾
for (;;) {
// 如果队列不为空,则先记录当前尾结点为旧的尾结点
Node oldTail = tail;
// 判断队列是否为空,队列为空的话就先初始化队列再重新进入for循环当中,将结点node入队!
if (oldTail != null) {
/*
* 这个操作其实就是:node.prev = pred;
* AQS的精妙就是体现在很多细节的代码,比如需要用CAS往队尾里增加一个元素
* 此处的else分支是先在CAS的if前设置node.prev = oldTail,而不是在CAS成功之后再设置。
* 一方面是基于CAS的双向链表插入目前没有完美的解决方案,另一方面这样子做的好处是:
* 保证每时每刻tail.prev都不会是一个null值,否则如果node.prev = t
* 放在下面if的里面,会导致一个瞬间tail.prev = null,这样会使得队列不完整。
*/
node.setPrevRelaxed(oldTail);
// CAS设置tail为node,成功后把老的tail也就是连接到node。
if (compareAndSetTail(oldTail, node)) {
// 实现了和之前的尾节点双向连接了
oldTail.next = node;
// 线程入队了,可以返回了
return node;
}
} else {
// 如果队列为空,则初始化该同步队列
// 初始化之后,没有return,会继续for循环!!!
initializeSyncQueue();
}
}
}

initializeSyncQueue就是一个初始化同步队列的方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
// 这个时候有了head,但是tail还是null,设置一下, 把tail指向head
//,但是很快有线程进来,tail就会重新指向
// 注意:这里只是设置了tail=head,此处没有return;
// 所以,设置完了以后,继续for循环
tail = h;
}

acquireQueued(Node, int)

返回acquire方法中,参数node,经过addWaiter(Node.EXCLUSIVE),此时已经进入阻塞队列,注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的话,意味着上面这段代码将进入selfInterrupt(),所以正常情况下,下面应该返回false。

这个方法非常重要,应该说真正的线程挂起,然后被唤醒后去获取锁,都在这个方法里了!!!

acquireQueued方法为一个自旋的过程,也就是说当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final boolean acquireQueued(final Node node, int arg) {
// 中断标志
boolean interrupted = false;
try {
// 自旋的过程
for (;;) {
// 当前线程的前驱节点
final Node p = node.predecessor();
// 检测当前节点前驱是否head,这是试获取锁的资格
// 如果是的话,则调用tryAcquire尝试获取锁
// 成功,则将head置为当前节点。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}

// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,要么就是tryAcquire(arg)没有抢赢别人
// 如果未成功获取锁则根据前驱节点判断是否要阻塞
// 注意:在获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态
// 如果shouldParkAfterFailedAcquire返回false,会继续上面的for循环!!!
if (shouldParkAfterFailedAcquire(p, node))
// 上面的判断如果返回true, 说明前驱节点的waitStatus==-1,是正常情况
// 那么当前线程需要被挂起,等待以后被唤醒
// 以后是被前驱节点唤醒,就等着前驱节点拿到锁,然后释放锁的时候叫你好了
// 如果返回false, 说明当前不需要被挂起
// 仔细看shouldParkAfterFailedAcquire(p, node),我们可以发现,其实第一次进来的时候,一般都不会返回true的
// 原因很简单,前驱节点的waitStatus=-1是依赖于后继节点设置的。
// 也就是说,我都还没给前驱设置-1呢,怎么可能是true呢,但是要看到,这个方法是套在循环里的,所以第二次进来的时候状态就是-1了。
// 所以:是为了应对在经过这个方法后,node已经是head的直接后继节点了
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 如果tryAcquire 抛出异常的话,则进行状态获取的取消
cancelAcquire(node);
// 如果是中断状态的话,进行一个自我中断
if (interrupted)
selfInterrupt();
throw t;
}
}

从上面代码中可以看到,当前线程会一直尝试获取同步状态,当然前提是只有其前驱节点为头结点才能够尝试获取同步状态,理由:

  1. 保持FIFO同步队列原则。
  2. 头节点释放同步状态后,将会唤醒其后继节点,后继节点被唤醒后需要检查自己是否为头节点。

上面还有一点,就是当获取同步状态失败后,线程并不是立马进行阻塞,需要检查该线程的状态,检查状态的方法为 shouldParkAfterFailedAcquire(Node pred, Node node) 方法。该方法主要用途是:当线程在获取同步状态失败时,根据前驱节点的等待状态,决定后续的动作。比如前驱节点等待状态为 SIGNAL,表明当前节点线程应该被阻塞住了。不能老是尝试,避免 CPU 忙等。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// 会到这里就是没有抢到锁,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前驱节点
int ws = pred.waitStatus;
// 前驱节点状态为SIGNAL(-1),说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
if (ws == Node.SIGNAL)
return true;

// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
// 找前驱节点的前驱节点做爹,往前遍历总能找到一个好爹的!!!
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 仔细想想,如果进入到这个分支意味着什么
// 前驱节点的waitStatus不等于-1和1,那也就是只可能是0,-2,-3
// 在我们前面的源码中,都没有看到有设置waitStatus的,所以每个新的node入队时,waitStatu都是0
// 正常情况下,前驱节点是之前的 tail,那么它的 waitStatus 应该是 0
// 用CAS将前驱节点的waitStatus设置为Node.SIGNAL(也就是-1)
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
// 这个方法返回 false,那么会再走一次 for 循序,
// 然后再次进来此方法,此时会从第一个分支返回 true
return false;
}

这段代码主要检查当前线程是否需要被阻塞,具体规则如下:

  1. 如果当前线程的前驱节点状态为SIGNAL,则表明当前线程需要被阻塞,调用unpark()方法唤醒,直接返回true,当前线程阻塞
  2. 如果当前线程的前驱节点状态为CANCELLED(ws > 0),则表明该线程的前驱节点已经等待超时或者被中断了,则需要从CLH队列中将该前驱节点删除掉,直到回溯到前驱节点状态 <= 0 ,返回false
  3. 如果前驱节点非SINGAL,非CANCELLED,则通过CAS的方式将其前驱节点设置为SIGNAL,返回false

上面一堆看得眼花,简略来说如下:

  1. 前驱节点为SIGNAL:阻塞。
  2. 前驱节点为CANCELLED :向前遍历, 移除前面所有为该状态的节点。
  3. 前驱节点为waitStatus < 0:将前驱节点状态设为 SIGNAL, 并再次尝试获取同步状态。

如果shouldParkAfterFailedAcquire返回true,则acquireQueued则会接着调用parkAndCheckInterrupt来阻塞当前线程,该方法主要是把当前线程挂起,从而阻塞住线程的调用栈,同时返回当前线程的中断状态,其源码如下:

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 调用LockSupport工具类的park()方法来阻塞该方法。
LockSupport.park(this);
// 返回当前线程的中断状态
return Thread.interrupted();
}

而关于LockSupport相关的东西,可以查看我对其分析的笔记:Java 并发 - AQS:LockSupport阻塞唤醒线程

再回到acquireQueued中,如果在获取同步状态中出现异常,failed = truecancelAcquire 方法会被执行。因为tryAcquire 需同步组件开发者覆写,难免不了会出现异常。该方法主要的作用就是:取消获取同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
   private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;

// 遍历并更新节点前驱,把node的prev指向前部第一个非取消节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// 记录pred节点的后继为predNext,后续CAS会用到
Node predNext = pred.next;

// 直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node节点。
node.waitStatus = Node.CANCELLED;

/*
* 如果CAS将tail从node置为pred节点了
* 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。
* 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。
* 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。
*
* 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。
*/
if (node == tail && compareAndSetTail(node, pred)) {
/*
* 执行到这里,表明 pred 节点被成功设为了尾节点,这里通过 CAS 将 pred 节点的后继节点
* 设为 null。注意这里的 CAS 即使失败了,也没关系。失败了,表明 pred 的后继节点更新
* 了。pred 此时已经是尾节点了,若后继节点被更新,则是有新节点入队了。这种情况下,CAS
* 会失败,但失败不会影响同步队列的结构。
*/
pred.compareAndSetNext(predNext, null);
} else {
// 根据条件判断是唤醒后继节点,还是将前驱节点和后继节点连接到一起
int ws;
// 如果当前节点不是head的后继节点,1:判断当前节点前驱节点的是否为SIGNAL,2:如果不是,则把前驱节点设置为SINGAL看是否成功
// 如果1和2中有一个为true,再判断当前节点的线程是否为null
// 如果上述条件都满足,把当前节点的前驱节点的后继指针指向当前节点的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
/*
* 这里使用 CAS 设置 pred 的 next,表明多个线程同时在取消,这里存在竞争。
* 不过此处没针对 compareAndSetNext 方法失败后做一些处理,表明即使失败了也
* 没关系。实际上,多个线程同时设置 pred 的 next 引用时,只要有一个能设置成
* 功即可。
*/
pred.compareAndSetNext(predNext, next);
} else {
/*
* 如果当前节点是head的后继节点,或者上述条件不满足,那就唤醒当前节点的后继节点
* 这里简单讲一下为什么要唤醒后继线程,考虑下面一种情况:
* head node1 node2 tail
* ws=0 ws=1 ws=-1 ws=0
* +------+ prev +-----+ prev +-----+ prev +-----+
* | | <---- | | <---- | | <---- | |
* | | ----> | | ----> | | ----> | |
* +------+ next +-----+ next +-----+ next +-----+
*
* 头结点初始状态为 0,node1、node2 和 tail 节点依次入队。node1 自旋过程中调用
* tryAcquire 出现异常,进入 cancelAcquire。head 节点此时等待状态仍然是 0,它
* 会认为后继节点还在运行中,所它在释放同步状态后,不会去唤醒后继等待状态为非取消的
* 节点 node2。如果 node1 再不唤醒 node2 的线程,该线程面临无法被唤醒的情况。此
* 时,整个同步队列就回全部阻塞住。
*/
unparkSuccessor(node);
}
/*
* 取消节点的next之所以设置为自己本身而不是null,
* 是为了方便AQS中Condition部分的isOnSyncQueue方法,
* 判断一个原先属于条件队列的节点是否转移到了同步队列。
*
* 因为同步队列中会用到节点的next域,取消节点的next也有值的话,
* 可以断言next域有值的节点一定在同步队列上。
*
* 在GC层面,和设置为null具有相同的效果。
*/
node.next = node; // help GC
}
}

// unparkSuccessor的作用是唤醒后继节点,其源码如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 当前节点状态
int ws = node.waitStatus;
// 通过 CAS 将等待状态设为 0,让后继节点线程多一次尝试获取同步状态的机会
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);

// 当前节点的后继节点
Node s = node.next;

/*
* 这里的逻辑就是如果node.next存在并且状态不为取消,则直接唤醒s即可
* 否则需要从tail开始向前找到node之后最近的非取消节点。
*
* 这里为什么要从tail开始向前查找也是值得琢磨的:
* 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。
* 不妨考虑到如下场景:
* 1. node某时刻为tail
* 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
* 3. compareAndSetTail成功
* 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!
*/
if (s == null || s.waitStatus > 0) {
s = null;
// 从tail节点开始往前遍历来找可用节点
// 为何是从tail尾节点开始,而不是从node.next开始呢?
// 原因在于node.next仍然可能会存在null或者取消了,所以采用tail回溯办法找第一个可用的线程。
// 最后调用LockSupport的unpark(Thread thread)方法唤醒该线程。
/*
* 这里如果 s == null 处理,是不是表明 node 是尾节点?答案是不一定。
* 新节点入队时,队列瞬时结构可能如下:
* node1 node2
* +------+ prev +-----+ prev +-----+
* head | | <---- | | <---- | | tail
* | | ----> | | | |
* +------+ next +-----+ +-----+
*
* node2 节点为新入队节点,此时 tail 已经指向了它,但 node1 后继引用还未设置。
* 这里 node1 就是 node 参数,s = node1.next = null,但此时 node1 并不是尾
* 节点。所以这里不能从前向后遍历同步队列,应该从后向前。
*/
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
// 唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}

上面大概就是:获取当前节点的前驱节点,如果前驱节点的状态是CANCELLED,那就一直往前遍历,找到第一个waitStatus <= 0的节点,将找到的Pred节点和当前Node关联,将当前Node设置为CANCELLED。

进行了上述的一些操作之后根据当前节点的位置,其实需要考虑以下三种情况:

  1. 当前节点是尾节点。
  2. 当前节点是Head的后继节点。
  3. 当前节点不是Head的后继节点,也不是尾节点。

具体分析一下一上三种情况:

当前节点是尾节点

当前节点是尾节点

当前节点是Head的后继节点:

取消节点的next可以设置为自己本身,不设置为null,上面的注释中有进行解释了,这里就不再解释了,如果有疑惑就往上面翻一下下。

当前节点是Head的后继节点

当前节点不是Head的后继节点,也不是尾节点

当前节点不是Head的后继节点 也不是尾节点


selfInterrupt

在上面如果acquireQueued为True,就会执行selfInterrupt方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

public void interrupt() {
if (this != Thread.currentThread()) {
checkAccess();
// 线程可能因为IO操作被阻塞
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // 设置中断状态
b.interrupt(this);
return;
}
}
}
// 设置中断状态
interrupt0();
}

该方法其实是为了中断线程。但为什么获取了锁以后还要中断线程呢?这部分属于Java提供的协作式中断知识内容,这里简单介绍一下:

  1. 当中断线程被唤醒时,并不知道被唤醒的原因,可能是当前线程在等待中被中断,也可能是释放了锁以后被唤醒。因此我们通过Thread.interrupted()方法检查中断标记(该方法返回了当前线程的中断状态,并将当前线程的中断标识设置为False),并记录下来,如果发现该线程被中断过,就再中断一次。
  2. 线程在等待资源的过程中被唤醒,唤醒后还是会不断地去尝试获取锁,直到抢到锁为止。也就是说,在整个流程中,并不响应中断,只是记录中断记录。最后抢到锁返回了,那么如果被中断过的话,就需要补充一次中断。

这里的处理方式主要是运用线程池中基本运作单元Worder中的runWorker,通过Thread.interrupted()进行额外的判断处理。

整个流程大概就是这么一个回事了,大概的流程还是清楚的,但是个中细节还有待深挖。


小结

总结一下,acquire的大概流程如下:

  1. 调用 tryAcquire 方法尝试获取同步状态
  2. 获取成功,直接返回
  3. 获取失败,将线程封装到节点中,并将节点入队
  4. 入队节点在 acquireQueued 方法中自旋获取同步状态
  5. 若节点的前驱节点是头节点,则再次调用 tryAcquire 尝试获取同步状态
  6. 获取成功,当前节点将自己设为头节点并返回
  7. 获取失败,可能再次尝试,也可能会被阻塞。这里简单认为会被阻塞。

acquire的流程图如下(图源见文末文章出处):

acquire执行过程

示例分析:

以下摘自:一行一行源码分析清楚AbstractQueuedSynchronizer,他以reentrantLock进行一个简单的分析:

首先,第一个线程调用 reentrantLock.lock(),tryAcquire(1) 直接就返回 true 了,结束。只是设置了 state=1,连 head 都没有初始化,更谈不上什么阻塞队列了。要是线程 1 调用 unlock() 了,才有线程 2 来,完全没有交集嘛,AQS就派不上用场了。

于是便引出一个问题:如果线程 1 没有调用 unlock() 之前,线程 2 调用了 lock(), 想想会发生什么?

线程 2 会初始化 head【new Node()】,同时线程 2 也会插入到阻塞队列并挂起 (注意看这里是一个 for 循环,而且设置 head 和 tail 的部分是不 return 的,只有入队成功才会跳出循环)

首先,是线程 2 初始化 head 节点,此时 head== tail, waitStatus==0

初始化head节点

然后线程 2 入队:

线程2入队

同时我们也要看此时节点的 waitStatus,我们知道 head 节点是线程 2 初始化的,此时的 waitStatus 没有设置, java 默认会设置为 0,但是到 shouldParkAfterFailedAcquire 这个方法的时候,线程 2 会把前驱节点,也就是 head 的waitStatus设置为 -1。

那线程 2 节点此时的 waitStatus 是多少呢,由于没有设置,所以是 0;

如果线程 3 此时再进来,直接插到线程 2 的后面就可以了,此时线程 3 的 waitStatus 是 0,到 shouldParkAfterFailedAcquire 方法的时候把前驱节点线程 2 的 waitStatus 设置为 -1。

线程3入队

这里可以简单说下 waitStatusSIGNAL(-1) 状态的意思,Doug Lea 注释的是:代表后继节点需要被唤醒。也就是说这个 waitStatus 其实代表的不是自己的状态,而是后继节点的状态,我们知道,每个 node 在入队的时候,都会把前驱节点的状态改为 SIGNAL,然后阻塞,等待被前驱唤醒。这里涉及的是两个问题:有线程取消了排队、唤醒操作。其实本质是一样的,可以顺着 “waitStatus代表后继节点的状态” 这种思路去看一遍源码。


3.1.2 独占式获取响应中断

AQS提供了acquire(int arg)方法以供独占式获取同步状态,但是该方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常`InterruptedException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg)
throws InterruptedException { // 这里直接抛出InterruptedException
final Node node = addWaiter(Node.EXCLUSIVE);
try {
// 还是一样自旋
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
// 如果有异常,一样要取消申请
// 然后再抛出异常,而不是使用interrupted标志
cancelAcquire(node);
throw t;
}
}

首先校验该线程是否已经中断了,如果是则抛出InterruptedException,否则执行tryAcquire(int arg)方法获取同步状态,如果获取成功,则直接返回,否则执行doAcquireInterruptibly(int arg)doAcquireInterruptibly(int arg)定义如下:

doAcquireInterruptibly(int arg)方法与acquire(int arg)方法仅有两个差别:

  1. 方法声明抛出InterruptedException异常
  2. 在中断方法处不再是使用interrupted标志,而是直接抛出InterruptedException异常。

3.1.3 独占式超时获取

AQS除了提供上面两个方法外,还提供了一个增强版的方法tryAcquireNanos(int arg,long nanos)。该方法为acquireInterruptibly方法的进一步增强,它除了响应中断外,还有超时控制。即如果当前线程没有在指定时间内获取同步状态,则会返回false,否则返回true。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// nanosTimeout <= 0
if (nanosTimeout <= 0L)
return false;
// 超时时间
final long deadline = System.nanoTime() + nanosTimeout;
// 新增Node节点
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 自旋
for (;;) {
final Node p = node.predecessor();
// 获取同步状态成功
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
/*
* 获取失败,做超时、中断判断
*/
// 重新计算需要休眠的时间
nanosTimeout = deadline - System.nanoTime();
// 已经超时,返回false
if (nanosTimeout <= 0L)
return false;
// 如果没有超时,则等待nanosTimeout纳秒
// 注:该线程会直接从LockSupport.parkNanos中返回,
// LockSupport为JUC提供的一个阻塞和唤醒的工具类,后面做详细介绍
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
// 线程是否已经中断了
if (Thread.interrupted())
// 抛出一个异常并且结束
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

其大概流程如下:

独占式超时获取


3.1.4 独占式同步状态释放:release

释放的过程会比较简单点:

  1. 调用 tryRelease(arg) 尝试释放同步状态
  2. 如果 tryRelease 返回true也就是独占锁被完全释放,唤醒后继线程。

这里的唤醒是根据head几点来判断的,下面代码的注释中也分析了head节点的情况,只有在head存在并且等待状态小于零的情况下唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
// 当线程获取同步状态后,执行完相应逻辑后就需要释放同步状态
public final boolean release(int arg) {
/*
* 此时的head节点可能有3种情况:
* 1. null (AQS的head延迟初始化+无竞争的情况)
* 2. 当前线程在获取锁时new出来的节点通过setHead设置的
* 3. 由于通过tryRelease已经完全释放掉了独占锁,有新的节点在acquireQueued中获取到了独占锁,并设置了head

* 第三种情况可以再分为两种情况:
* (一)时刻1:线程A通过acquireQueued,持锁成功,set了head
* 时刻2:线程B通过tryAcquire试图获取独占锁失败失败,进入acquiredQueued
* 时刻3:线程A通过tryRelease释放了独占锁
* 时刻4:线程B通过acquireQueued中的tryAcquire获取到了独占锁并调用setHead
* 时刻5:线程A读到了此时的head实际上是线程B对应的node
* (二)时刻1:线程A通过tryAcquire直接持锁成功,head为null
* 时刻2:线程B通过tryAcquire试图获取独占锁失败失败,入队过程中初始化了head,进入acquiredQueued
* 时刻3:线程A通过tryRelease释放了独占锁,此时线程B还未开始tryAcquire
* 时刻4:线程A读到了此时的head实际上是线程B初始化出来的傀儡head
*/
if (tryRelease(arg)) {
Node h = head;
// head节点状态不会是CANCELLED,所以这里h.waitStatus != 0相当于h.waitStatus < 0
if (h != null && h.waitStatus != 0)
// 释放成功后,会调用unparkSuccessor(Node node)方法唤醒后继节点
// 上面已经分析过了,不再叙述
unparkSuccessor(h);
return true;
}
return false;
}

跟tryAcquire一样,tryRelease也是由用户自己去实现了,其源码如下:

1
2
3
4
5
6
/*
* 独占式释放同步状态;
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

3.2 共享模式

其实如果理解了上面的独享模式之后再来理解共享模式,难度不大,主要是与共享模式下,同一时刻会有多个线程获取共享同步状态。共享模式是实现读写锁中的读锁、CountDownLatch 和 Semaphore 等同步组件的基础,这样再去理解一些共享同步组件就不难了。

3.2.1 同步状态获取:acquireShared

共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;

1
2
3
4
5
6
public final void acquireShared(int arg) {
// 尝试获取共享同步状态,tryAcquireShared 返回的是整型
if (tryAcquireShared(arg) < 0)
// 获取失败,自旋获取同步状态
doAcquireShared(arg);
}

其中doAcquireShared以自旋方式获取同步状态,共享式获取同步状态的标志是返回 >= 0 的值表示获取成功,该方法不响应中断,与独占式相似;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private void doAcquireShared(int arg) {
// 共享式节点
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
// 这里和前面一样,也是通过有限次自旋的方式获取同步状态
for (;;) {
// 前驱节点
final Node p = node.predecessor();
/*
* 前驱是头结点,其类型可能是 EXCLUSIVE,也可能是 SHARED.
* 如果是 EXCLUSIVE,线程无法获取共享同步状态。
* 如果是 SHARED,线程则可获取共享同步状态。
* 能不能获取共享同步状态要看 tryAcquireShared 具体的实现。比如多个线程竞争读写
* 锁的中的读锁时,均能成功获取读锁。但多个线程同时竞争信号量时,可能就会有一部分线
* 程因无法竞争到信号量资源而阻塞。
*/
if (p == head) {
// 尝试获取同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置头结点,如果后继节点是共享类型,唤醒后继节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}

setHeadAndPropagate这个函数主要做了两件事:

  1. 在获取共享锁成功后,设置head节点
  2. 根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void setHeadAndPropagate(Node node, int propagate) {
// 把当前的head封闭在方法栈上,用以下面的条件检查。
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
// propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一。
// h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒
if (propagate > 0 || h == null || h.waitStatus < 0 ||
// h.waitStatus < 0 时,waitStatus = SIGNAL 或 PROPAGATE。
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 节点 s 如果是共享类型节点,则应该唤醒该节点
// 至于 s == null 的情况前面分析过,这里不在赘述。
if (s == null || s.isShared())
doReleaseShared();
}
}

那继续到doReleaseShared里面看看做了些什么:

doReleaseShared该方法用于在 acquires/releases 存在竞争的情况下,确保唤醒动作向后传播。这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。

总的来说:这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程;
* 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下,
* 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。
* 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。
*/
for (;;) {
Node h = head;
// 如果队列中存在后继线程。
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。
// setHeadAndPropagate 在读到 h.waitStatus < 0 时,可以继续唤醒后面的节点。
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 检查h是否仍然是head,如果不是的话需要再进行循环。
if (h == head) // loop if head changed
break;
}
}

最后说一下共享模式下获取同步状态的大致流程,如下:

  1. 获取共享同步状态
  2. 若获取失败,则生成节点,并入队
  3. 如果前驱为头结点,再次尝试获取共享同步状态
  4. 获取成功则将自己设为头结点,如果后继节点是共享类型的,则唤醒
  5. 若失败,将节点状态设为 SIGNAL,再次尝试。若再次失败,线程进入等待状态

3.2.2 共享状态释放:releaseShared

释放共享状态主要逻辑在 doReleaseShared ,而我们前面已经分析过他了,所以就不继续了。共享节点线程在获取同步状态和释放同步状态时都会调用 doReleaseShared,所以 doReleaseShared 是多线程竞争集中的地方。

1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
// 因为可能会存在多个线程同时进行释放同步状态资源
// 所以在doReleaseShared需要确保同步状态安全地成功释放,一般都是通过CAS和循环来完成的。
doReleaseShared();
return true;
}
return false;
}

4.一些疑问

4.1 插入节点时的代码顺序

addWaiter 方法中新增一个节点时为什么要先将新节点的prev置为tail再尝试CAS,而不是CAS成功后来构造节点之间的双向链接?

这是因为,双向链表目前没有基于CAS原子插入的手段,如果我们将node.prev = tt.next = node(t为方法执行时读到的tail,引用封闭在栈上)放到compareAndSetTail(t, node)成功后执行,如下所示:

1
2
3
4
5
if (compareAndSetTail(t, node)) {
node.prev = t;
t.next = node;
return t;
}

会导致这一瞬间的tail也就是t的prev为null,这就使得这一瞬间队列处于一种不一致的中间状态。

4.2 唤醒节点时为什么从tail向前遍历

unparkSuccessor方法中为什么唤醒后继节点时要从tail向前查找最接近node的非取消节点,而不是直接从node向后找到第一个后break掉?

其实上面的注释中也解释得很清楚了,如果读到s == null,不代表node就为tail。

考虑如下场景:

  1. node某时刻为tail
  2. 有新线程通过addWaiter中的if分支或者enq方法添加自己
  3. compareAndSetTail成功
  4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!

4.3 AQS如何保证队列活跃

AQS如何保证在节点释放的同时又有新节点入队的情况下,不出现原持锁线程释放锁,后继线程被自己阻塞死的情况,保持同步队列的活跃?

回答这个问题,需要理解shouldParkAfterFailedAcquireunparkSuccessor这两个方法。

  • 以独占锁为例,后继争用线程阻塞自己的情况是读到前驱节点的等待状态为SIGNAL,只要不是这种情况都会再试着去争取锁。假设后继线程读到了前驱状态为SIGNAL,说明之前在tryAcquire的时候,前驱持锁线程还没有tryRelease完全释放掉独占锁。
  • 此时如果前驱线程完全释放掉了独占锁,则在unparkSuccessor中还没执行完置waitStatus为0的操作,也就是还没执行到下面唤醒后继线程的代码,否则后继线程会再去争取锁。那么就算后继争用线程此时把自己阻塞了,也一定会马上被前驱线程唤醒。
  • 那么是否可能持锁线程执行唤醒后继线程的逻辑时,后继线程读到前驱等待状态为SIGNAL把自己给阻塞,再也无法苏醒呢?
  • 确实可能在扫描后继需要唤醒线程时读不到新来的线程,但只要tryRelease语义实现正确,在true时表示完全释放独占锁,则后继线程理应能够tryAcquire成功,shouldParkAfterFailedAcquire在读到前驱状态不为SIGNAL会给当前线程再一次获取锁的机会的。

4.4 AQS如何防止内存泄露

AQS维护了一个FIFO队列,它是如何保证在运行期间不发生内存泄露的?

AQS在无竞争条件下,甚至都不会new出head和tail节点。线程成功获取锁时设置head节点的方法为setHead,由于头节点的thread并不重要,此时会置node的thread和prev为null,完了之后还会置原先head也就是线程对应node的前驱的next为null,从而实现队首元素的安全移出。而在取消节点时,也会令node.thread = null,在node不为tail的情况下,会使node.next = node(之所以这样也是为了isOnSyncQueue实现更加简洁)


5小结

在并发环境下,加锁和解锁需要以下三个部件的协调:

  1. 锁状态。我们要知道锁是不是被别的线程占有了,这个就是 state 的作用,它为 0 的时候代表没有线程占有锁,可以去争抢这个锁,用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第一个线程,让其来占有锁。
  2. 线程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 来挂起线程,用 unpark 来唤醒线程。
  3. 阻塞队列。因为争抢锁的线程可能很多,但是只能有一个线程拿到锁,其他的线程都必须等待,这个时候就需要一个 queue 来管理这些线程,AQS 用的是一个 FIFO 的队列,就是一个链表,每个 node 都持有后继节点的引用。AQS 采用了 CLH 锁的变体来实现。

用了好几天,看了很多博客还有翻了一些书,对着JDK源码一点一点的抠了出来上面的这些阅读理解,感觉这个源码还是有些难度,还是需要时不时的回头看看,其实主要就分为独占式和共享式,然后各有没有完成的方法需要继承AQS的子类去完成,要对大致的状态获取、状态释放有所了解,这些会比较重要点,对那几个状态需要多了解了解是什么个意思,一般会出现在什么情况,感觉看了一些源码之后,发现这些源码中的状态位其实很重要,每个方法都伴随着状态位的改变,通过状态位可以了解到很多内部细节,最后还是说分析得太烂,以后又有认识之后一定要把上面这个重新整理一遍,还是不太深刻,盲人摸象,只了解到了一小部分罢了。


以上参考:

  1. 一行一行源码分析清楚AbstractQueuedSynchronizer
  2. 【死磕Java并发】—–J.U.C之AQS:AQS简介
  3. AbstractQueuedSynchronizer 原理分析 - 独占/共享模式
  4. JDK源码之AQS源码剖析
  5. AbstractQueuedSynchronizer源码解读
  6. 从ReentrantLock的实现看AQS的原理及应用
  7. 书籍:Java并发编程的艺术