0%

Java 并发 - 多线程:FutureTask源码分析

1. 什么是FutureTask?

前面的Blog介绍了Future接口。这个接口有一个实现类叫FutureTask。FutureTask类有什么用?为什么要有一个FutureTask类?前面说到了Future只是一个接口,而它里面的cancelgetisDone等方法要自己实现起来都是非常复杂的。所以JDK提供了一个FutureTask类来供我们使用。

FutureTask是Future的具体实现,且实现了Runnable接口,即FutureTask满足了Task的行为,是一个可以被用来执行的Future。FutureTask是JUC提供的线程池实现用到的任务基本单元,线程池主要接收两种对象:一个是Runnable任务,一种是Callable任务。按照ExecutorService接口定义的行为,可以将Runnable或Callable任务提交到线程池执行,而被提交的Runnable或Callable任务都会被包装成FutureTask,由线程池的工作线程去执行。

还有的就是前面的文章中所讲的FutureTask 为什么可以使用Executor 也可以使用线程直接执行?因为FutureTask是实现的RunnableFuture接口的,而RunnableFuture接口同时继承了Runnable接口和Future接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。

1
2
3
4
5
6
7
8
9
10
11
12
13
// 片段一
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}

// 片段二
public class FutureTask<V> implements RunnableFuture<V> {
// ...
}

FutureTask还是可以研究一下的,我将里面实现的一个方法一点一点进行分析。


2.FutureTask 核心内容

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS)。java.util.concurrent中的很多可阻塞类(比如ReentrantLock)都是基于AQS来实现的。AQS是一个同步框架,它提供通用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。JDK 6中AQS被广泛使用,基于AQS实现的同步器包括:ReentrantLock、Semaphore、ReentrantReadWriteLock、CountDownLatch和FutureTask。

每一个基于AQS实现的同步器都会包含两种类型的操作,如下:

  • 至少一个acquire操作。这个操作阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。FutureTask的acquire操作为get()/get(long timeout,TimeUnit unit)方法调用。
  • 至少一个release操作。这个操作改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask的release操作包括run()方法和cancel(…)方法。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。

AQS被作为“模板方法模式”的基础类提供给FutureTask 的内部子类Sync,这个内部子类只需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这两个方法来检查和更新同步状态。

FutureTask的设计示意图

如图所示,Sync是FutureTask的内部私有类,它继承自AQS。创建FutureTask时会创建内部私有的成员对象Sync,FutureTask所有的的公有方法都直接委托给了内部私有的Sync。


3.FutureTask源码分析

3.1 FutureTask的几个状态

根据FutureTask.run()方法被执行的时机,FutureTask可以处于下面3种状态(图源:Java并发编程的艺术)。

几种状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
// 初始创建时的状态
private static final int NEW = 0;
// 当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之前会将FutureTask的状态修改为COMPLETING。
private static final int COMPLETING = 1;
// 当任务执行完毕,FutureTask会将执行结果设置给outcome属性,在设置之后会将FutureTask的状态修改为NORMAL。
private static final int NORMAL = 2;
// 当任务在执行的过程中抛了异常,FutureTask会将异常信息设置给outcome属性,
// 在设置之前会将FutureTask的状态修改为COMPLETING,在设置之后将状态修改为EXCEPTIONAL。
private static final int EXCEPTIONAL = 3;
// 当外部想要取消任务,而又不允许当任务正在执行的时候被取消时会将FutureTask的状态修改为CANCELLED。
private static final int CANCELLED = 4;
// 当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
// 然后设置执行任务的线程的中断标记位。
private static final int INTERRUPTING = 5;
// 当外部想要取消任务,同时允许当任务正在执行的时候被取消时,会先将FutureTask的状态设置为INTERRUPTING,
// 然后设置执行任务的线程的中断标记位,最后将Future的状态设置为INTERRUPTED。
private static final int INTERRUPTED = 6;

FutureTask的状态流转可能流程:

  • NEW—>COMPLETING—>NORMAL(任务执行正常)
  • NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
  • NEW—>CANCELLED(不允许执行中的取消)
  • NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)

需要注意的是:FutureTask中使用CAS操作更新state来表示任务完成,极大地降低了使用加锁进行同步控制的性能开销。

3.2 Treiber 堆

FutureTask中使用简单的Treiber堆栈来保存等待线程,Treiber堆是非阻塞的,使用CAS操作来实现节点的出栈和入栈操作。FutureTask中使用WaitNode来表示等待节点,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;

/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

3.3 构造方法

可以发现FutureTask的参数可以有Callable或者RunnableresultFutureTask的构造方法将提交的RunnableCallable任务都会被包装成FutureTask。源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

// 参数为callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

// 参数为runnable 和 result
public FutureTask(Runnable runnable, V result) {
// 返回的还是一个callable
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

查看Executors.callable(runnable, result); 源码如下,会发现其实FutureTask最终将Runnabl转化为Callable,而且这里还使用了一种模式:适配器模式。

1
2
3
4
5
6
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
// 返回这么RunnableAdapter一个对象,其实现了Callable接口
return new RunnableAdapter<T>(task, result);
}

其中RunnableAdapter的源码如下所示,你会发现兜兜转转,还是实现了Callable,这就很有意思了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* A callable that runs given task and returns given result.
*/
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
// 构造函数,上面就是根据传递进来的task和result构建了这么一个对象然后返回
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
// 其call 方法也就是run方法
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}

3.4 运行任务 run

FutureTask中使用run方法来执行任务,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public void run() {
// 如果当前状态不为NEW,而且没有其他线程运行当前任务
// 否则直接return;
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// callable不为null,且在此判断状态
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 进行执行任务
// 发现FutureTask的run的方法调用的就是Callable的call方法
result = c.call();
ran = true;
} catch (Throwable ex) { // 如果发生异常,则记录异常result
result = null;
ran = false;
// 修改状态为EXCEPTIONAL,并且记录异常
setException(ex);
}
if (ran)
// 没有发生异常,则记录call 返回的数据到outcome中
// 并同时修改状态,set方法会将状态state设置为NORMAL
set(result);
}
} finally {
// 在设置状态state前runner必须非空,防止并发调用run()方法
// 而且将runner置空,主要是使后续等待线程可继续执行
runner = null;

// runner置为null后,必须重新读取state以防止有中断发生
int s = state;

// 如果state被其他线程调用cancel(true)修改为INTERRUPTING
// 这表示有中断事件发生,那就要调用下面的方法进行暂停了
if (s >= INTERRUPTING)
// 这里主要就是调用Thread.yield()让出CPU,保证线程能够成功暂停。
// 注意:执行yield()的线程有可能在进入到暂停状态后马上又被执行。
handlePossibleCancellationInterrupt(s);
}
}

以上的过程大概如此:

  1. 运行任务之前必须要保证其状态是NEW,并且没有其他线程在执行当前任务。然后调用c.call();执行任务并接收返回值,然后修改状态。
  2. 任务运行中如果发生了异常则调用setException(ex);进行异常处理;
  3. 任务正常完成时,记录call执行返回的数据,同时将调用set方法修改state为normal,最后将runner置空。同时判断是否有中断事件发生,有的话进行线程暂停,释放资源。

其实就是上面蕴含了两条状态变化的线路:

  1. NEW—>COMPLETING—>EXCEPTIONAL(任务执行异常)
  2. NEW—>COMPLETING—>NORMAL(任务执行正常)

可以看看setExceptionset方法做了一些什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected void setException(Throwable t) {
// 将状态由NEW修改为COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 并且记录异常
outcome = t;
// 设置状态为EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
// 唤醒等待队列中的所有后续线程(若有)
finishCompletion();
}
}

protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 保存call 返回的数据
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}

finishCompletion做了一些什么?finishCompletion用于唤醒等待队列中的所有后续线程(若有)。当任务未完成时,调用get()方法会被加入等待队列并阻塞。FutureTask中done()什么也不做,该方法主要用于子类个性化定制,如ExecutorCompletionServiceQueueingFuture实现FutureTask,实现done()以达到任务完成自动将Future加入结果队列。

可以查看一下他的源码,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
*/
private void finishCompletion() {
// assert state > COMPLETING;
// 遍历所有的等待线程
for (WaitNode q; (q = waiters) != null;) {
// 将waiters设置为null
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 如果上面的设置成功,则进入一个死循环等待
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 进行线程的唤醒
LockSupport.unpark(t);
}
// 下一个等待线程
WaitNode next = q.next;
// 如果没有下一个等待线程,则结束
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}

i


3.6 取消任务 cancel

当FutureTask处于未启动状态时,执行FutureTask.cancel()方法将导致此任务永远不会被执行;当FutureTask处于已启动状态时,执行FutureTask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务,即是说这个cancel 是允许中断的;当FutureTask处于已启动状态时,执行FutureTask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当FutureTask处于已完成状态时,执行FutureTask.cancel(...)方法将返回false。

我们根据状态的变化来捋一下取消任务的主要流程,其主要有两条线路:

  1. NEW—>INTERRUPTING—>INTERRUPTED(允许执行中的取消)
  2. NEW—>CANCELLED(不允许执行中的取消)

就是当外部想要取消任务的时候,看看当前任务是否能够允许被取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean cancel(boolean mayInterruptIfRunning) {
// 当前状态为NEW,并且且判断当前线程运行时候时候能够中断
// 可以的话将状态设置为INTERRUPTING,否则设置为CANCELLED并返回false
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) { // 判断是否允许在运行时候进行中断,即判断传进来为true还是false
try {
// 保存当前运行的线程
Thread t = runner;
if (t != null)
// 进行中断
t.interrupt();
} finally { // final state
// 将当前执行线程状态state设置成为INTERRUPTED
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

3.7 获取任务结果 get

当FutureTask处于未启动或已启动状态时,执行FutureTask.get()方法将导致调用线程阻塞;当FutureTask处于已完成状态时,执行FutureTask.get()方法将导致调用线程立即返回结果或抛出异常。

get操作主要用于计算完成后获取结果,还可以使用带等待时间的get方法,其源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

// 取得返回值
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 判断FutureTask 状态,如果为未启动或者已启动状态,则进行阻塞
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 如果FutureTask 为完成状态
// 通过调用report 返回一个结果或者抛出一个异常
return report(s);
}

// 带超时的get方法
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

其中report方法的实现如下,主要完成的是:通过进行状态的判断,返回一个结果,或者抛出异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* Returns result or throws exception for completed task.
* 对于已经完成的任务,返回一个结果,或者抛出一个异常
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
// 取出记录的outcome
Object x = outcome;
// 如果线程状态state为NORMAL时,则返回一个结果
if (s == NORMAL)
return (V)x;
// 如果线程状态state为CANCELLED、INTERRUPTING、INTERRUPTED,则抛出一个异常
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

参考

  1. Java 并发编程的艺术