1. 什么是FutureTask?
前面的Blog介绍了Future
接口。这个接口有一个实现类叫FutureTask
。FutureTask类有什么用?为什么要有一个FutureTask类?前面说到了Future只是一个接口,而它里面的cancel
,get
,isDone
等方法要自己实现起来都是非常复杂的。所以JDK提供了一个FutureTask
类来供我们使用。
FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的行为,是一个可以被用来执行的Future。FutureTask是JUC提供的线程池实现用到的任务基本单元,线程池主要接收两种对象:一个是Runnable任务,一种是Callable任务。按照ExecutorService接口定义的行为,可以将Runnable或Callable任务提交到线程池执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,由线程池的工作线程去执行。
还有的就是前面的文章中所讲的FutureTask 为什么可以使用Executor 也可以使用线程直接执行?因为FutureTask是实现的RunnableFuture
接口的,而RunnableFuture
接口同时继承了Runnable
接口和Future
接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run()
)。
1 | // 片段一 |
FutureTask
还是可以研究一下的,我将里面实现的一个方法一点一点进行分析。
2.FutureTask 核心内容
FutureTask的实现基于AbstractQueuedSynchronizer
(以下简称为AQS)。java.util.concurrent
中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。
每一个基于AQS实现的同步器都会包含两种类型的操作,如下:
- 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为
get()
/get(long timeout,TimeUnit unit)
方法调用。 - 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括
run()
方法和cancel(…)
方法。
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。
AQS被作为“模板方法模式”的基础类提供给FutureTask 的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)
方法和tryReleaseShared(int)
方法,Sync通过这两个方法来检查和更新同步状态。
如图所示,Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。
3.FutureTask源码分析
3.1 FutureTask的几个状态
根据FutureTask.run()
方法被执行的时机,FutureTask可以处于下面3种状态(图源:Java并发编程的艺术)。
1 | /** |
FutureTask的状态流转可能流程:
- NEW—>COMPLETING—>NORMAL(任务执行正常)
- NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
- NEW—>CANCELLED(不允许执行中的取消)
- NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)
需要注意的是:FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。
3.2 Treiber 堆
FutureTask中使用简单的Treiber堆栈来保存等待线程,Treiber堆是非阻塞的,使用CAS操作来实现节点的出栈和入栈操作。FutureTask中使用WaitNode来表示等待节点,源码如下:
1 | /** Treiber stack of waiting threads */ |
3.3 构造方法
可以发现FutureTask
的参数可以有Callable
或者Runnable
和result
,FutureTask
的构造方法将提交的Runnable
或Callable
任务都会被包装成FutureTask
。源码如下:
1 |
|
查看Executors.callable(runnable, result);
源码如下,会发现其实FutureTask
最终将Runnabl
转化为Callable
,而且这里还使用了一种模式:适配器模式。
1 | public static <T> Callable<T> callable(Runnable task, T result) { |
其中RunnableAdapter
的源码如下所示,你会发现兜兜转转,还是实现了Callable
,这就很有意思了。
1 | /** |
3.4 运行任务 run
FutureTask
中使用run方法来执行任务,源码如下:
1 | public void run() { |
以上的过程大概如此:
- 运行任务之前必须要保证其状态是NEW,并且没有其他线程在执行当前任务。然后调用
c.call();
执行任务并接收返回值,然后修改状态。 - 任务运行中如果发生了异常则调用
setException(ex);
进行异常处理; - 任务正常完成时,记录call执行返回的数据,同时将调用set方法修改state为normal,最后将runner置空。同时判断是否有中断事件发生,有的话进行线程暂停,释放资源。
其实就是上面蕴含了两条状态变化的线路:
- NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
- NEW—>COMPLETING—>NORMAL(任务执行正常)
可以看看setException
和 set
方法做了一些什么:
1 | protected void setException(Throwable t) { |
finishCompletion
做了一些什么?finishCompletion
用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。FutureTask
中done()什么也不做,该方法主要用于子类个性化定制,如ExecutorCompletionService
中QueueingFuture
实现FutureTask
,实现done()以达到任务完成自动将Future加入结果队列。
可以查看一下他的源码,如下:
1 | /** |
i
3.6 取消任务 cancel
当FutureTask处于未启动状态时,执行FutureTask.cancel()
方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)
方法将以中断执行此任务线程的方式来试图停止任务,即是说这个cancel 是允许中断的;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)
方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(...)
方法将返回false。
我们根据状态的变化来捋一下取消任务的主要流程,其主要有两条线路:
- NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)
- NEW—>CANCELLED(不允许执行中的取消)
就是当外部想要取消任务的时候,看看当前任务是否能够允许被取消。
1 | public boolean cancel(boolean mayInterruptIfRunning) { |
3.7 获取任务结果 get
当FutureTask处于未启动或已启动状态时,执行FutureTask.get()
方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()
方法将导致调用线程立即返回结果或抛出异常。
get操作主要用于计算完成后获取结果,还可以使用带等待时间的get方法,其源码如下:
1 |
|
其中report方法的实现如下,主要完成的是:通过进行状态的判断,返回一个结果,或者抛出异常。
1 | /** |
参考
- Java 并发编程的艺术