1.简介
Condition就是一接口,而在AQS 中的ConditionObject
内部类实现了这个接口。Condition接口中只是进行了一些等待和通知方法的声明,并没有进行实现,Condition 经常可以用在生产者-消费者的场景中,关于Condition相关的东西,我们需要先了解AQS相关的知识,可以看看之前的那篇文章:Java 并发 - AQS:框架分析,然后再进行Condition的了解
这里先讲一句:Condition 中的方法则要配合锁对象使用,并通过newCondition
方法获取实现类对象。这有点像Object 中的方法需要配合 synchronized 关键字使用。关于Condition与Object类实现的这些方法可以看这篇文章中最下面的那个对比,Ojbect类的wait()
, notify()
或 notifyAll()
方法是基于对象的监视器锁的,我们现在所讲的Condition是基于 ReentrantLock
实现的,而ReentrantLock 是依赖于 AbstractQueuedSynchronizer 实现的。
Demo开场
我们可以先从源码中给出的一个Demo来进行了解:
1 | import java.util.concurrent.locks.Condition; |
这里创建两个线程让上面JDK给的Demo跑起来:
1 | public class BoundedBuffer { |
因为设置了容量为5,所以会不断的生产和消费:
1 | 放入了:0 |
通过上面我们可以大概了解到:
- 在使用Condition的时候,必须要先持有相对应的锁,而Object类也是差不多这样的一种机制,我们在Object类中,如果要使用
wait()
,notify()
或notifyAll()
方法,则我们必须要先持有某个对象的监视器! - 还有一点:ArrayBlockingQueue 采用这种方式实现了生产者-消费者,所以请只把这个例子当做学习例子,实际生产中可以直接使用 ArrayBlockingQueue!!!
Condition与ReentrantLock的联系
上面的例子是要让我们清楚的知道,就是condition是与ReentrantLock是息息相关的,可以说他们一般都是结合在一起使用的,那么我们在使用condition的一些方法,比如await()
或者 signale()
方法的时候,我们就应该获取到这个Lock才能进行使用!
而每个 ReentrantLock
实例可以通过调用多次 newCondition
产生多个 ConditionObject
的实例:
就如同我们上面的Demo中所示那样:
1 | final Lock lock = new ReentrantLock(); |
在ReentrantLock
源码中可以看到:
1 | final ConditionObject newCondition() { |
我们再继续看看这个ConditionObject
是个什么东西,进去源码发现,原来是个 Condition 的实现类 ,它存在于AbstractQueuedSynchronizer
中:
1 | public class ConditionObject implements Condition, java.io.Serializable { |
需要注意的是Condition就是一个接口,里面就是实现一些规范而已了,并没有具体实现代码细节,我们要分析的也是这下面的一些内容了:
1 | public interface Condition { |
我们上面也看到了每个 ReentrantLock
实例可以通过调用多次 newCondition
产生多个 ConditionObject
的实例,那么该如何去管理这些实例呢?我们可以引入跟AQS中的同步队列相似的另一种概念,称之为条件队列或者同步队列,图示如下:
其实这里差不多就代表了整个Condition的核心思想所在了:
- 条件队列和同步队列的节点,都是 Node 的实例,因为条件队列的节点是需要转移到同步队列中去的;
- 我们知道一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。注意,ConditionObject 只有两个属性 firstWaiter 和 lastWaiter;
- 每个 condition 有一个关联的条件队列,如线程 1 调用
condition1.await()
方法即可将当前线程 1 包装成 Node 后加入到条件队列中,然后阻塞在这里,不继续往下执行,条件队列是一个单向链表; - 调用
condition1.signal()
触发一次唤醒,此时唤醒的是队头,会将condition1 对应的条件队列的 firstWaiter(队头) 移到同步队列的队尾,等待获取锁,获取锁后 await 方法才能返回,继续往下执行。
可以先看一下这张图,了解一下简单的流程所在,具体的源码展开在下面。
Condition主要的方法
这里可以先明白Condition有一些什么方法:
方法名称 | 描述 |
---|---|
await() | 当前线程进入等待状态直到被通知(signal)或者中断;当前线程进入运行状态并从await()方法返回的场景包括:(1)其他线程调用相同Condition对象的signal/signalAll方法,并且当前线程被唤醒;(2)其他线程调用interrupt方法中断当前线程; |
awaitUninterruptibly() | 当前线程进入等待状态直到被通知,在此过程中对中断信号不敏感,不支持中断当前线程 |
awaitNanos(long) | 当前线程进入等待状态,直到被通知、中断或者超时。如果返回值小于等于0,可以认定就是超时了 |
awaitUntil(Date) | 当前线程进入等待状态,直到被通知、中断或者超时。如果没到指定时间被通知,则返回true,否则返回false |
signal() | 唤醒一个等待在Condition上的线程,被唤醒的线程在方法返回前必须获得与Condition对象关联的锁 |
signalAll() | 唤醒所有等待在Condition上的线程,能够从await()等方法返回的线程必须先获得与Condition对象关联的锁 |
2.源码分析
2.1 等待 await
主要分析await()
方法,因为其他的几个等待方法也差不多,大同小异,只要将这个最基本的方法了解清楚了,相比其他方法也不难!
await方法的主要流程如下:
- 将当先线程封装成节点,并将节点添加到Condition条件队列尾部。
- 节点入队了之后,完全释放独占锁。
- 判断节点是否在同步队列上,如果不在则阻塞线程(等待其他线程调用signal/signalAll或是被中断)。
- 重新获取互斥锁。
await方法源码如下:
1 | // await是可响应中断的等待方法,而不可响应中断的是`awaitUninterruptibly()` |
2.1.1 将结点加入条件队列
该addConditionWaiter()
方法主要是将当前线程加入到Condition条件队列中。当然在加入到尾节点之前会清楚所有状态不为Condition的节点,即将已取消的所有节点清除出队列。
1 | // 将当先线程封装成节点,并将节点添加到Condition条件队列尾部 |
2.1.2 完全释放独占锁
在节点入队了之后,则调用下面的方法进行完全释放锁,为什么是完全释放呢?因为 ReentrantLock 是可以重入的。
saveState
代表着什么意思呢?如果在 condition1.await() 之前,假设线程先执行了 2 次 lock() 操作,那么 state 为 2,我们理解为该线程持有 2 把锁,这里 await() 方法必须将 state 设置为 0,然后再进入挂起状态,这样其他线程才能持有锁。当它被唤醒的时候,它需要重新持有 2 把锁,才能继续下去。
举个简单的操作,我们在使用condition的过程中,先 lock.lock()
,然后 condition1.await()
,那么state的值就会发生变化,从1变成0,此时锁进行释放,并且fullyRelease(Node node)
这个方法返回1,如果lock重入了n次,则savedState = n
,但是如果这个方法失败,则会将节点设置为”取消”状态,并抛出异常 IllegalMonitorStateException
1 | // 节点入队了之后,完全释放独占锁 |
其实上面这个方法也可以很好的解释一下,假如我的线程没有持有lock,直接调用condition的await()
方法,那会怎样?
如果一个线程在不持有 lock 的基础上,就去调用 condition.await()
方法,它能进入条件队列,但是在上面的这个方法中,由于它不持有锁,release(savedState)
这个方法肯定要返回 false,进入到异常分支,然后进入 finally 块设置 node.waitStatus = Node.CANCELLED
,这个已经入队的节点之后会被后继的节点”请出去“。
2.1.3 判断节点node是否处于同步队列上
经过上面的完全释放锁之后,会走到下面的这一条代码块,会通过isOnSyncQueue(Node node)
方法,会进行一个自旋判断自己是否在同步队列中,如果不在同步队列上的话,将当前线程挂起,等待被转移到同步队列中。
里面其实分为三种方法进行判断节点node是否处于同步队列上:
- 节点状态为CONDITION一定是不在同步队列,或者如果prev为null也一定是不在同步队列。
- 如果节点的next不为null,则其一定是在同步队列的。
- 上面两种方法都不奏效,进行同步队列的遍历查找。
1 | // 判断节点是否在同步队列上 |
2.1.4 检测线程等待期间是否中断
就是因为await()
方法支持中断,所以我们需要对其中断进行细致考虑,我认为主要了解的有以下三点:
- 线程是否发生了中断?
- 线程发生中断是否能够成功进入同步队列?
- 线程中断发生的时机是在节点转移到同步队列之前发生?还是发生在节点转移到同步队列期间或之后发生?
如果理清上面这三个问题,其实中断唤醒这一块就能够很好理解了。
1 | // 检测线程在等待期间是否发生了中断 |
2.1.5 重新获取同步状态
while 循环出来以后,下面是这段代码:
1 | if (acquireQueued(node, savedState) && interruptMode != THROW_IE) |
由于 while 出来后,我们确定节点已经进入了同步队列,准备获取锁。
此处的acquireQueued(node, savedState)
方法中的第一个参数node,之前已经通过方法enq(node)
进入了队列,而这个参数 savedState 是之前释放锁前的 state,这个方法返回的时候,代表当前线程获取了锁,而且 state == savedState
了。
关于中断,需要明白的一点是:不管有没有发生中断,都会进入到同步队列,而 acquireQueued(node, savedState)
的返回值就是代表线程是否被中断。
- 如果返回 true,说明被中断了,而且
interruptMode != THROW_IE
,说明在 signal 之前就发生中断了,这里将 interruptMode 设置为 REINTERRUPT,用于待会重新中断。 - 如果上面的 while 循环没有产生中断,则 interruptMode = 0。
关于acquireQueued()
方法,在AQS中已经有提及到了,这里附上相对于的知识所在:AQS分析:acquireQueued。该方法是一个自旋的过程,当前线程(Node)进入同步队列后,就会进入一个自旋的过程,每个节点都会自省地观察,当条件满足,获取到同步状态后,就可以从这个自旋过程中退出,否则会一直执行下去。
2.1.6 根据中断类型进行不同处理
以下方法主要是根据不同的中断模式进行不同的处理 ,较为简单,需要清楚的是如果中断模式为:REINTERRUPT,则重新中断当前线程
1 | // 根据不同的中断模式进行不同的处理 |
2.2 通知
2.2.1 signal
signal()
主要的作用就是将条件队列中的头结点转移到同步队列中!他会唤醒等待最久的线程,将这个线程对应的node从条件队列转移到同步队列中去。
1 | public final void signal() { |
以上的流程大概就是:
- 首先先去判断当前线程是否已经持有独占锁了,未获取则直接抛出异常。
- 如果线程已经获取了锁,则将唤醒条件队列的首节点(首节点不合适的话就从头向后查找合适的节点)。
- 唤醒首节点是先将条件队列中的头结点移除,然后调用AQS的
enq(Node node)
方法将其安全地移到同步队列中。 - 最后判断该节点的前驱节点等待状态是否为CANCELLED,或者修改前驱节点状态为Signal失败时候,则直接唤醒。
2.2.2 signalAll
了解完了sginal,再来了解一下signalAll方法,其源码如下:
1 | public final void signalAll() { |
3.一些对比
3.1 Condition与Object监视器的对比
每个对象都可以用继承自Object
的wait/notify方法来实现等待/通知机制。而Condition接口也提供了类似Object监视器的方法,通过与Lock配合来实现等待/通知模式。
那为什么既然有Object的监视器方法了,还要用Condition呢?这里有一个二者简单的对比:
对比项 | Object监视器 | Condition |
---|---|---|
前置条件 | 获取对象的锁 | 调用Lock.lock获取锁,调用Lock.newCondition获取Condition对象 |
调用方式 | 直接调用,比如object.notify() | 直接调用,比如condition.await() |
等待队列的个数 | 一个 | 多个 |
当前线程释放锁进入等待状态 | 支持 | 支持 |
当前线程释放锁进入等待状态,在等待状态中不中断 | 不支持 | 支持 |
当前线程释放锁并进入超时等待状态 | 支持 | 支持 |
当前线程释放锁并进入等待状态直到将来的某个时间 | 不支持 | 支持 |
唤醒等待队列中的一个线程 | 支持 | 支持 |
唤醒等待队列中的全部线程 | 支持 | 支持 |
Condition和Object的wait/notify基本相似。其中,Condition的await方法对应的是Object的wait方法,而Condition的signal/signalAll方法则对应Object的notify/notifyAll()。但Condition类似于Object的等待/通知机制的加强版。
4.小结
其实Condition 的大概流程就是:一个线程获取锁后,通过调用Condition的await()
方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过isOnSyncQueue(Node node)
方法不断自检看节点是否已经在同步队列了,如果是则尝试获取锁,也即重新获取同步状态,否则将一直挂起。当线程调用signal()
方法后,程序首先检查当前线程是否获取了锁,然后通过doSignal(Node first)
方法唤醒CLH同步队列的首节点。被唤醒的线程,将从await()
方法中的while循环中退出来,然后调用acquireQueued()
方法竞争同步状态。
用了一两天的时间分析了Condition的原理,其实他与AbstractQueuedSynchronizer 的关系很密切的,要先了解到了AbstractQueuedSynchronizer 相关的内容再来了解Condition 则会发现会有一些融会贯通了,了解其原理是为了能够更好的使用,如果知道整个底层的流程是如何,我想在开发的过程中可以减少一些为什么的这样的问题。总的来说还是很有收获的,自从一系列的分析过来,总感觉之前的学习就像是走马观花,只有深入的去了解了一下发现又一些新的收获,Condition 也还好,不算是很难,主要就是等待和通知两个方面的内容,知道条件队列和同步队列这玩意是什么,问题解决一大半了,对Condition的了解到此结束,以后要做的是记录发现的问题,然后再进行Blog的记录,相信这样的学习效果可以更好的。
以上参考: