0%

Java 并发 - 锁相关:Condition接口分析

1.简介

Condition就是一接口,而在AQS 中的ConditionObject内部类实现了这个接口。Condition接口中只是进行了一些等待和通知方法的声明,并没有进行实现,Condition 经常可以用在生产者-消费者的场景中,关于Condition相关的东西,我们需要先了解AQS相关的知识,可以看看之前的那篇文章:Java 并发 - AQS:框架分析,然后再进行Condition的了解

这里先讲一句:Condition 中的方法则要配合锁对象使用,并通过newCondition方法获取实现类对象。这有点像Object 中的方法需要配合 synchronized 关键字使用。关于Condition与Object类实现的这些方法可以看这篇文章中最下面的那个对比,Ojbect类的wait(), notify()notifyAll() 方法是基于对象的监视器锁的,我们现在所讲的Condition是基于 ReentrantLock 实现的,而ReentrantLock 是依赖于 AbstractQueuedSynchronizer 实现的。

Demo开场

我们可以先从源码中给出的一个Demo来进行了解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {

final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[100];
int putptr, takeptr, count;

// 生产
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
} finally {
lock.unlock();
}
}

// 消费
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
return x;
} finally {
lock.unlock();
}
}
}

这里创建两个线程让上面JDK给的Demo跑起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class BoundedBuffer {

final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

final Object[] items = new Object[5];
private int putptr, takeptr, count;

// 省略put和take方法

public static void main(String[] args) {

BoundedBuffer boundedBuffer = new BoundedBuffer();

ExecutorService executor1 = Executors.newCachedThreadPool();
executor1.execute(
new Runnable() {
@Override
public void run() {
for (int i = 0; i < 200; i++) {
try {
boundedBuffer.put(i);
System.out.println("放入了:" + i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
);

ExecutorService executor2 = Executors.newCachedThreadPool();
executor2.execute(
new Runnable() {
@Override
public void run() {
for (int i = 0; i < 200; i++) {
try {
// Thread.sleep(200);
System.out.println("取出了:" + boundedBuffer.take());
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
}
);
//终止线程池
executor1.shutdown();
while (!executor1.isTerminated()) {
}
System.out.println("Finished executor1");

//终止线程池
executor2.shutdown();
while (!executor2.isTerminated()) {
}
System.out.println("Finished executor2");
}
}

因为设置了容量为5,所以会不断的生产和消费:

1
2
3
4
5
6
7
8
9
10
放入了:0
放入了:1
放入了:2
放入了:3
放入了:4
取出了:0
取出了:1
取出了:2
取出了:3
取出了:4

通过上面我们可以大概了解到:

  1. 在使用Condition的时候,必须要先持有相对应的锁,而Object类也是差不多这样的一种机制,我们在Object类中,如果要使用wait(), notify()notifyAll() 方法,则我们必须要先持有某个对象的监视器!
  2. 还有一点:ArrayBlockingQueue 采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue!!!

Condition与ReentrantLock的联系

上面的例子是要让我们清楚的知道,就是condition是与ReentrantLock是息息相关的,可以说他们一般都是结合在一起使用的,那么我们在使用condition的一些方法,比如await() 或者 signale()方法的时候,我们就应该获取到这个Lock才能进行使用!

而每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:

就如同我们上面的Demo中所示那样:

1
2
3
4
final Lock lock = new ReentrantLock();

final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();

ReentrantLock 源码中可以看到:

1
2
3
final ConditionObject newCondition() {
return new ConditionObject();
}

我们再继续看看这个ConditionObject是个什么东西,进去源码发现,原来是个 Condition 的实现类 ,它存在于AbstractQueuedSynchronizer中:

1
2
3
4
5
6
7
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列的第一个节点
private transient Node firstWaiter;
// 条件队列的最后一个节点
private transient Node lastWaiter;
......

需要注意的是Condition就是一个接口,里面就是实现一些规范而已了,并没有具体实现代码细节,我们要分析的也是这下面的一些内容了:

1
2
3
4
5
6
7
8
9
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}

我们上面也看到了每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例,那么该如何去管理这些实例呢?我们可以引入跟AQS中的同步队列相似的另一种概念,称之为条件队列或者同步队列,图示如下:

条件队列

其实这里差不多就代表了整个Condition的核心思想所在了:

  1. 条件队列和同步队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到同步队列中去的;
  2. 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;
  3. 每个 condition 有一个关联的条件队列,如线程 1 调用 condition1.await() 方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表;
  4. 调用condition1.signal() 触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到同步队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。

可以先看一下这张图,了解一下简单的流程所在,具体的源码展开在下面。

Condition主要的方法

这里可以先明白Condition有一些什么方法:

方法名称 描述
await() 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程;
awaitUninterruptibly() 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程
awaitNanos(long) 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了
awaitUntil(Date) 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false
signal() 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁
signalAll() 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁

2.源码分析

2.1 等待 await

主要分析await()方法,因为其他的几个等待方法也差不多,大同小异,只要将这个最基本的方法了解清楚了,相比其他方法也不难!

await方法的主要流程如下:

  1. 将当先线程封装成节点,并将节点添加到Condition条件队列尾部。
  2. 节点入队了之后,完全释放独占锁。
  3. 判断节点是否在同步队列上,如果不在则阻塞线程(等待其他线程调用signal/signalAll或是被中断)。
  4. 重新获取互斥锁。

await方法源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// await是可响应中断的等待方法,而不可响应中断的是`awaitUninterruptibly()`
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 1.将当先线程封装成节点,并将节点添加到Condition条件队列尾部
Node node = addConditionWaiter();
/* 2.节点入队了之后,完全释放独占锁(无论锁是否可以重入)
* 返回值是释放锁之前的state状态值
* await之前,当前线程是持有锁的,所以要进行锁的释放
*/
int savedState = fullyRelease(node);
int interruptMode = 0;
/* 3.判断节点是否在同步队列上,如果不在则阻塞线程
* 循环结束有两个条件:
* a.其他线程调用 singal/singalAll,node 将会被转移到同步队列上,然后被唤醒。
* b.其它线程中断了当前线程,当前线程会自行尝试进入同步队列。
*/
while (!isOnSyncQueue(node)) {
// 调用 LockSupport.park 阻塞当前线程
LockSupport.park(this);
/* 4.获取中断模式,线程从park中被唤醒的时候,需要判断是否此时被中断,若中断则尝试转移到同步队列。
* 这里有两种中断模式,如下:
* THROW_IE(值-1):
* 中断在 node 转移到同步队列“前”发生,需要当前线程自行将 node 转移到同步队列中
* 并在随后抛出 InterruptedException 异常。
* REINTERRUPT:
* 中断在 node 转移到同步队列“期间”或“之后”发生
* 此时表明有线程正在调用singal/singalAll 转移节点。
* 在该种中断模式下,再次设置线程的中断状态。向后传递中断标志,由后续代码去处理中断。
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
/* 5.被转移到同步队列的节点 node 在 acquireQueued 方法中重新获取同步状态
* 不懂?因为while 出来后,我们确定节点已经进入了同步队列,准备获取锁!
* 此处的saveState 即为上面调用fullyRelease 所返回的值.
* 重新获取互斥锁过程中,如果中断并且interruptMode不为"抛出异常",则设置为REINTERRUPT。
*/
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
/*
* 正常通过 singal/singalAll 转移节点到同步队列时,nextWaiter 引用会被置空。
* 若发生线程产生中断(THROW_IE)或 fullyRelease 方法出现错误等异常情况,该引用则不会被置空
* 于是需要进行一些后续步骤处理
*/
if (node.nextWaiter != null)
// 清理等待状态非 CONDITION 的节点
unlinkCancelledWaiters();
/* 6.根据不同的中断模式进行不同的处理
* 如果线程发生过中断则根据THROW_IE或是REINTERRUPT分别抛出异常或者重新中断。
* THROW_IE:抛出 InterruptedException 异常
* REINTERRUPT:重新设置线程中断标志
*/
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

2.1.1 将结点加入条件队列

addConditionWaiter()方法主要是将当前线程加入到Condition条件队列中。当然在加入到尾节点之前会清楚所有状态不为Condition的节点,即将已取消的所有节点清除出队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 将当先线程封装成节点,并将节点添加到Condition条件队列尾部
private Node addConditionWaiter() {
// 这里之前的JDK版本没有添加,其实算是一个Bug,经过博客园博主:活在梦里,提交了之后JDK添加了
// BUG 链接为 JDK-8187408,感兴趣的可以看看
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
/* 如果条件队列中最后一个waiter节点状态为取消,即不为CONDITION
* 则调用unlinkCancelledWaiters清理队列。
*/
if (t != null && t.waitStatus != Node.CONDITION) {
// 这个方法会遍历整个条件队列,然后会将已取消的所有节点清除出队列
unlinkCancelledWaiters();
// 重读lastWaiter。
t = lastWaiter;
}

// 创建节点,并将节点置于队列尾部
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

// 清理等待状态为 CANCELLED 的节点
// 等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
// 记住其作用:记录上一个非取消节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
// 如果节点的状态不是 Node.CONDITION 的话,这个节点就是被取消的
if (t.waitStatus != Node.CONDITION) {
// 进行断开
t.nextWaiter = null;
// 如果trail 为 null,则表明之前所有节点等待状态都为CANCELLED,需要取消!
// 直接将头结点指向该next,跳过t 节点了!
if (trail == null)
firstWaiter = next;
// 如果trail 不为 null,则证明之前有的节点等待状态为 CONDITION,进行链接!
else
trail.nextWaiter = next;
// next 为 null,表明遍历到条件队列尾部了,此时将 lastWaiter 指向 trail
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}

2.1.2 完全释放独占锁

在节点入队了之后,则调用下面的方法进行完全释放锁,为什么是完全释放呢?因为 ReentrantLock 是可以重入的。

saveState 代表着什么意思呢?如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。

举个简单的操作,我们在使用condition的过程中,先 lock.lock(),然后 condition1.await(),那么state的值就会发生变化,从1变成0,此时锁进行释放,并且fullyRelease(Node node)这个方法返回1,如果lock重入了n次,则savedState = n,但是如果这个方法失败,则会将节点设置为”取消”状态,并抛出异常 IllegalMonitorStateException

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 节点入队了之后,完全释放独占锁
final int fullyRelease(Node node) {
try {
// 获取同步状态值
int savedState = getState();
// 这里使用了当前的 state 作为 release 的参数,也就是完全释放掉锁,将 state 置为 0
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
// 设置状态为“取消”
node.waitStatus = Node.CANCELLED;
throw t;
}
}

其实上面这个方法也可以很好的解释一下,假如我的线程没有持有lock,直接调用condition的await()方法,那会怎样?

如果一个线程在不持有 lock 的基础上,就去调用 condition.await() 方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState) 这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED,这个已经入队的节点之后会被后继的节点”请出去“。

2.1.3 判断节点node是否处于同步队列上

经过上面的完全释放锁之后,会走到下面的这一条代码块,会通过isOnSyncQueue(Node node)方法,会进行一个自旋判断自己是否在同步队列中,如果不在同步队列上的话,将当前线程挂起,等待被转移到同步队列中。

里面其实分为三种方法进行判断节点node是否处于同步队列上:

  1. 节点状态为CONDITION一定是不在同步队列,或者如果prev为null也一定是不在同步队列。
  2. 如果节点的next不为null,则其一定是在同步队列的。
  3. 上面两种方法都不奏效,进行同步队列的遍历查找。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 判断节点是否在同步队列上
// 在节点入条件队列的时候,初始化时设置了 waitStatus = Node.CONDITION
// 而signal 的时候需要将节点从条件队列移到同步队列,这个方法就是判断 node 是否已经移动到同步队列了
final boolean isOnSyncQueue(Node node) {
/* 节点在同步队列上时,其状态可能为 0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一,但不会为 CONDITION
* 如果在条件队列中,waitStatus 为Node.CONDITION(-2)
* 如果在同步队列中,waitStatus 会置为0
* 如果节点在同步队列上,node.prev 一定不会为 null
* 因为同步队列里的节点prev为null只可能是获取到锁后调用setHead清为null
* 新入队的节点prev值是不会为null的
* 另外,条件队列里节点是用nextWaiter来维护的,不用next和prev。
*/
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
/*
* 如果节点的next不为null,则其一定是在同步队列的。
* 因为条件队列使用的是 nextWaiter 指向后继节点的,条件队列上节点的 next 指针均为 null。
*
* 为什么上面先判断node.next既可以了,还需要上面判断node.prev 呢?
* 因为节点在入队过程中,是先设置 node.prev,后设置 node.next。
* 如果设置完 node.prev 后,线程被切换了,此时 node.next 仍然为 null
* 但此时 node 确实已经在同步队列上了,所以这里还需要进行判断。
*
* 这里值得一提的是在AQS的cancelAcquire方法中,会使用:node.next = node
* 一个节点将自己移除出队列的时候会把自己的next域指向自己。
* 从GC效果上来看node.next = node和node.next = null无异,
* 但是这对此处next不为null一定在同步队列上来说,
* 这样可以将节点在同步队列上被取消的情况与普通情况归一化判断。
*/
if (node.next != null) // If has successor, it must be on queue
return true;

// 上面的两种判断都不奏效,于是只有进行遍历了
// 在同步队列上,从后向前查找 node 节点
return findNodeFromTail(node);
}

// 由于同步队列上的的节点 prev 引用不会为空,所以这里从后向前查找 node 节点
private boolean findNodeFromTail(Node node) {
// We check for node first, since it's likely to be at or near tail.
// tail is known to be non-null, so we could re-order to "save"
// one null check, but we leave it this way to help the VM.
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}

2.1.4 检测线程等待期间是否中断

就是因为await()方法支持中断,所以我们需要对其中断进行细致考虑,我认为主要了解的有以下三点:

  1. 线程是否发生了中断?
  2. 线程发生中断是否能够成功进入同步队列?
  3. 线程中断发生的时机是在节点转移到同步队列之前发生?还是发生在节点转移到同步队列期间或之后发生?

如果理清上面这三个问题,其实中断唤醒这一块就能够很好理解了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 检测线程在等待期间是否发生了中断 
private int checkInterruptWhileWaiting(Node node) {
/*
* 1. 线程未中断返回0
* 2. 线程中断且入同步队列成功,返回THROW_IE表示后续要抛出InterruptedException。
* 3. 线程中断且未能入同步队列(由于被signal方法唤醒),则返回REINTERRUPT表示后续重新中断。
*/
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}

// 下面表示判断中断发生的时机,分为两种:
// 1. 中断在节点被转移到同步队列前发生,此时返回 true
// 2. 中断在节点被转移到同步队列期间或之后发生,此时返回 false
final boolean transferAfterCancelledWait(Node node) {
// 中断在节点被转移到同步队列前发生,此时自行将节点转移到同步队列上,并返回 true
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
// 调用 enq 将节点转移到同步队列中
// enq(node): 自旋进入同步队列的队尾
enq(node);
return true;
}

/* 如果上面的CAS失败,则说明已经有线程调用signal/signalAll 方法,状态已经被抢先更新了。
* signal/signalAll 方法会先将节点等待状态由 CONDITION 设置为 0 后,再调用 enq 方法转移节点。
* 而下面判断节点是否已经在同步队列上的原因是,signal/signalAll 方法可能仅设置了等待状态,
* 还没来得及转移节点就被切换走了。所以这里用自旋的方式判断 signal/signalAll 是否已经完成了转移操作。
* 以上这种中断情况发生在节点被转移到同步队列期间。
*/
while (!isOnSyncQueue(node))
Thread.yield();

// 中断在节点被转移到同步队列期间或之后发生,返回 false
return false;
}

2.1.5 重新获取同步状态

while 循环出来以后,下面是这段代码:

1
2
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

由于 while 出来后,我们确定节点已经进入了同步队列,准备获取锁。

此处的acquireQueued(node, savedState)方法中的第一个参数node,之前已经通过方法enq(node) 进入了队列,而这个参数 savedState 是之前释放锁前的 state,这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState了。

关于中断,需要明白的一点是:不管有没有发生中断,都会进入到同步队列,而 acquireQueued(node, savedState) 的返回值就是代表线程是否被中断

  1. 如果返回 true,说明被中断了,而且 interruptMode != THROW_IE,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。
  2. 如果上面的 while 循环没有产生中断,则 interruptMode = 0。

关于acquireQueued()方法,在AQS中已经有提及到了,这里附上相对于的知识所在:AQS分析:acquireQueued。该方法是一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。

2.1.6 根据中断类型进行不同处理

以下方法主要是根据不同的中断模式进行不同的处理 ,较为简单,需要清楚的是如果中断模式为:REINTERRUPT,则重新中断当前线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 根据不同的中断模式进行不同的处理 
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 中断模式为:THROW_IE,则抛出 InterruptedException 异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 中断模式为:REINTERRUPT,则重新中断当前线程。
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}

// 中断线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

2.2 通知

2.2.1 signal

signal() 主要的作用就是将条件队列中的头结点转移到同步队列中!他会唤醒等待最久的线程,将这个线程对应的node从条件队列转移到同步队列中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public final void signal() {
// 检查线程是否获取了独占锁,未获取独占锁调用 signal 方法是不允许的
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 取头部的节点
Node first = firstWaiter;
if (first != null)
// 将头结点转移到同步队列中
doSignal(first);
}
// 从条件队列中从头向后进行遍历,找到第一个需要转移的node
// 因为有些线程会取消排队,但是可能还在队列中!
private void doSignal(Node first) {
do {
// 将 firstWaiter 指向 first 节点后面的第一个,因为 first 节点马上要离开了
// 如果将 first 移除后,后面没有节点在等待了,那么需要将 lastWaiter 置为 null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 断开连接,即将头结点从条件队列中进行移除
first.nextWaiter = null;
// 调用transferForSignal 将节点转移到同步队列中
// 如果first 节点转移不成功,则选择first 后面的第一个节点进行转移,以此类推
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

// 这个方法用于将条件队列中的节点转移到同步队列中
final boolean transferForSignal(Node node) {

// 如果将节点的等待状态由CONDITION转为0失败,则表明节点被取消了。
// 肯定节点被取消是因为transferForSignal 中不存在线程竞争的问题
// 所以下面CAS失败的唯一原因是节点的等待状态为 CANCELLED
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 调用 enq 方法将 node 转移到同步队列中,并返回 node 的前驱节点 p
Node p = enq(node);
// 获取前驱节点p的等待状态
int ws = p.waitStatus;
/* 如果前驱节点p的等待状态大于0,说明node 在同步队列中的前驱节点取消了等待锁,直接唤醒node 对应的线程。
* 如果ws <= 0,则CASwaitStatus 将会被调用!如果 CAS 设置失败,则应直接唤醒 node 节点对应的线程。
* 以免因 node 没有被唤醒导致同步队列挂掉。
* 在AQS框架分析中,已经了解到了结点入队后,需要将前驱节点的状态设为:Node.SIGNAL
* 如果将前驱状态切到SIGNAL了,则由相应线程在释放锁之后唤醒node节点对应线程。
*/
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

以上的流程大概就是:

  1. 首先先去判断当前线程是否已经持有独占锁了,未获取则直接抛出异常。
  2. 如果线程已经获取了锁,则将唤醒条件队列的首节点(首节点不合适的话就从头向后查找合适的节点)。
  3. 唤醒首节点是先将条件队列中的头结点移除,然后调用AQS的enq(Node node)方法将其安全地移到同步队列中。
  4. 最后判断该节点的前驱节点等待状态是否为CANCELLED,或者修改前驱节点状态为Signal失败时候,则直接唤醒。

2.2.2 signalAll

了解完了sginal,再来了解一下signalAll方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public final void signalAll() {
// 和上面同样道理,需要持有锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
// 调用的是这个方法
doSignalAll(first);
}
//
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
// 将first节点移除
first.nextWaiter = null;
// 转移first节点到同步队列中
transferForSignal(first);
// 指向下一个节点
first = next;
} while (first != null);
}

3.一些对比

3.1 Condition与Object监视器的对比

每个对象都可以用继承自Objectwait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。

那为什么既然有Object的监视器方法了,还要用Condition呢?这里有一个二者简单的对比:

对比项 Object监视器 Condition
前置条件 获取对象的锁 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象
调用方式 直接调用,比如object.notify() 直接调用,比如condition.await()
等待队列的个数 一个 多个
当前线程释放锁进入等待状态 支持 支持
当前线程释放锁进入等待状态,在等待状态中不中断 不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态直到将来的某个时间 不支持 支持
唤醒等待队列中的一个线程 支持 支持
唤醒等待队列中的全部线程 支持 支持

Condition和Object的wait/notify基本相似。其中,Condition的await方法对应的是Object的wait方法,而Condition的signal/signalAll方法则对应Object的notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。


4.小结

其实Condition 的大概流程就是:一个线程获取锁后,通过调用Condition的await()方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)方法不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,也即重新获取同步状态,否则将一直挂起。当线程调用signal()方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()方法中的while循环中退出来,然后调用acquireQueued()方法竞争同步状态。

用了一两天的时间分析了Condition的原理,其实他与AbstractQueuedSynchronizer 的关系很密切的,要先了解到了AbstractQueuedSynchronizer 相关的内容再来了解Condition 则会发现会有一些融会贯通了,了解其原理是为了能够更好的使用,如果知道整个底层的流程是如何,我想在开发的过程中可以减少一些为什么的这样的问题。总的来说还是很有收获的,自从一系列的分析过来,总感觉之前的学习就像是走马观花,只有深入的去了解了一下发现又一些新的收获,Condition 也还好,不算是很难,主要就是等待和通知两个方面的内容,知道条件队列和同步队列这玩意是什么,问题解决一大半了,对Condition的了解到此结束,以后要做的是记录发现的问题,然后再进行Blog的记录,相信这样的学习效果可以更好的。


以上参考:

  1. 一行一行源码分析清楚 AbstractQueuedSynchronizer (二)
  2. AbstractQueuedSynchronizer 原理分析 - Condition 实现原理
  3. 【死磕Java并发】—–J.U.C之Condition
  4. AbstractQueuedSynchronizer源码解读–续篇之Condition
  5. Java并发编程的艺术