前提

之前的一篇文章JUC线程池ThreadPoolExecutor源码分析深入分析了JUC线程池的源码实现,特别对Executor#execute()接口的实现做了行级别的源码分析。这篇文章主要分析一下线程池扩展服务ExecutorService接口的实现源码,同时会重点分析Future的底层实现。ThreadPoolExecutor和其抽象父类AbstractExecutorService的源码从JDK8到JDK11基本没有变化,本文编写的时候使用的是JDK11,由于ExecutorService接口的定义在JDK[8,11]都没有变化,本文的分析适用于这个JDK版本范围的任意版本。最近尝试找Hexo可以渲染Asciidoc的插件,但是没有找到,于是就先移植了Asciidoc中的五种Tip

ExecutorService接口简介

ExecutorService接口是线程池扩展功能服务接口,它的定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public interface ExecutorService extends Executor {

// 停止线程池
void shutdown();

// 立即停止线程池,返回尚未执行的任务列表
List<Runnable> shutdownNow();

// 线程池是否停止
boolean isShutdown();

// 线程池是否终结
boolean isTerminated();

// 等待线程池终结
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;

// 提交Callable类型任务
<T> Future<T> submit(Callable<T> task);

// 提交Runnable类型任务,预先知道返回值
<T> Future<T> submit(Runnable task, T result);

// 提交Runnable类型任务,对返回值无感知
Future<?> submit(Runnable task);

// 永久阻塞 - 提交和执行一个任务列表的所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 带超时阻塞 - 提交和执行一个任务列表的所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 永久阻塞 - 提交和执行一个任务列表的某一个任务
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 带超时阻塞 - 提交和执行一个任务列表的某一个任务
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService继承自Executor,主要提供了线程池的关闭、状态查询查询、可获取返回值的任务提交、整个任务列表或者执行任务列表中任意一个任务(返回执行最快的任务的结果)等功能。

Future实现的通俗原理

ExecutorService接口的扩展方法都是返回Future相关的实例。java.util.concurrent.Future(中文翻译就是未来,还是挺有意思的),代表着一次异步计算的结果,它提供了检查计算是否已经完成、等待计算完成、获取计算结果等一系列方法。笔者之前强调过:线程池ThreadPoolExecutor的顶级接口Executor只提供了一个无状态的返回值类型为voidexecute(Runnable command)方法,无法感知异步任务执行的完成时间和获取任务计算结果。如果我们需要感知异步任务执行的返回值或者计算结果,就必须提供带返回值的接口方法去承载计算结果的操作。这些方法上一节已经介绍过,而Future就是一个担任了承载计算结果(包括结果值、状态、阻塞等待获取结果操作等)的工具。这里举一个模拟Future实现过程的例子,例子是伪代码和真实代码的混合实现,不需要太较真。

首先,假设我们定义了一个动作函数式接口Action

1
2
3
4
5
// 带泛型的动作接口,可以返回一个泛型结果
public interface Action<V>{

V doAction();
}

我们可以尝试实现一下Action接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 假设1个动作做的是一个十分复杂的运算,返回一个BigDecimal类型的结果
Action<BigDecimal> action1 = () -> {
// 模拟随机耗时
sleep(x秒);
return BigDecimal.valueOf(result);
};

// 假设1个动作做的是制作一个面包的过程,返回一个Bread面包实例
Action<Bread> action2 = () -> {
// 模拟随机耗时
sleep(x秒);
return new Bread();
};

由于Action没有实现Runnable接口,上面的两个动作无法通过Executor#execute()方法提交异步任务,所以我们需要添加一个适配器ActionAdapter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ActionAdapter<V> implements Runnable {

private Action<V> action;

private ActionAdapter(Action<V> action) {
this.action = action;
}

public static <V> ActionAdapter<V> newActionAdapter(Action<V> action) {
return new ActionAdapter<>(action);
}

@Override
public void run() {
action.doAction();
}
}

这里只做了简单粗暴的适配,虽然可以提交到线程池中执行,但是功能太过简陋。很多时候,我们还需要添加任务执行状态判断和获取结果的功能,于是新增一个接口ActionFuture

1
2
3
4
5
6
public interface ActionFuture<V> extends Runnable{

V get() throws Exception;

boolean isDone();
}

然后ActionAdapter实现ActionFuture接口,内部添加简单的状态控制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ActionAdapter<V> implements Runnable, ActionFuture<V> {

private static final int NEW = 0;
private static final int DONE = 1;
private int state;
private final Action<V> action;
private Object result;

private ActionAdapter(Action<V> action) {
this.action = action;
this.state = NEW;
}

public static <V> ActionAdapter<V> newActionAdapter(Action<V> action) {
return new ActionAdapter<>(action);
}

@Override
public void run() {
try {
result = action.doAction();
} catch (Throwable e) {
result = e;
} finally {
state = DONE;
}
}

@Override
public V get() throws Exception{
while (state < DONE){
// 这个等待方法没有实现,只是表明逻辑
currentThreadWaitForResult();
}
if (result instanceof Throwable){
throw new ExecutionException((Throwable) result);
}else {
return (V) result;
}
}

@Override
public boolean isDone() {
return state == DONE;
}
}

这里有个技巧是用Object类型的对象存放Action执行的结果或者抛出的异常实例,这样可以在ActionFuture#get()方法中进行判断和处理。最后一步,依赖Executor#execute()新增一个提交异步任务的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ActionPool {

private final Executor executor;

public ActionPool(Executor executor) {
this.executor = executor;
}

public <V> ActionFuture<V> submit(Action<V> action) {
ActionFuture<V> actionFuture = ActionAdapter.newActionAdapter(action);
executor.execute(actionFuture);
return actionFuture;
}

public static void main(String[] args) throws Exception{
ActionPool pool = new ActionPool(Executors.newSingleThreadExecutor());
Action<BigDecimal> action1 = () -> {
// 模拟随机耗时
sleep(x秒);
return BigDecimal.valueOf(result);
};
pool.submit(action1);
Action<Bread> action2 = () -> {
// 模拟随机耗时
sleep(x秒);
return new Bread();
};
pool.submit(action2);
}
}

上面例子提到的虚拟核心组件,在JUC包中有对应的实现(当时,JUC包对逻辑和状态控制会比虚拟例子更加严谨),对应关系如下:

虚拟组件 JUC中的组件
Action Callable
ActionFuture RunnableFuture
ActionAdapter FutureTask
ActionPool ExecutorService(ThreadPoolExecutor)

其中大部分实现逻辑都由FutureTaskThreadPoolExecutor的抽象父类AbstractExecutorService承担,下面会重点分析这两个类核心功能的源码实现。

Tip

实际上,Future的实现使用的是Promise模式,具体可以查阅相关的资料。

FutureTask源码实现

提供回调的Runnable类型任务实际最终都会包装为FutureTask再提交到线程池中执行,而FutureTaskRunnableFutureCallable三者的桥梁。先看FutureTask的类继承关系:

j-u-c-e-s-1

利用接口可以多继承的特性,RunnableFuture接口继承自RunnableFuture接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface RunnableFuture<V> extends Runnable, Future<V> {

void run();
}

@FunctionalInterface
public interface Runnable {

public abstract void run();
}

public interface Future<V> {

// 取消,mayInterruptIfRunning用于控制是否中断,实际上这个方法并不能终止已经提交的任务,后面会详细说明
boolean cancel(boolean mayInterruptIfRunning);

// 是否取消
boolean isCancelled();

// 是否完成,包括正常和异常的情况
boolean isDone();

// 永久阻塞获取结果,响应中断
V get() throws InterruptedException, ExecutionException;

// 带超时的阻塞获取结果,响应中断
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

FutureTask实现了RunnableFuture接口,本质就是实现RunnableFuture接口的方法。先看FutureTask的重要属性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
// 状态
private volatile int state;
// 初始化状态
private static final int NEW = 0;
// 完成中状态
private static final int COMPLETING = 1;
// 正常情况下的完成状态
private static final int NORMAL = 2;
// 异常情况下的完成状态
private static final int EXCEPTIONAL = 3;
// 取消状态
private static final int CANCELLED = 4;
// 中断中状态
private static final int INTERRUPTING = 5;
// 已中断状态
private static final int INTERRUPTED = 6;

// 底层的Callable实现,执行完毕后需要置为null
private Callable<V> callable;

// 输出结果,如果是正常执行完成,get()方法会返回此结果,如果是异常执行完成,get()方法会抛出outcome包装为ExecutionException的异常
private Object outcome;

// 真正的执行Callable对象的线程实例,运行期间通过CAS操作此线程实例
private volatile Thread runner;

// 等待线程集合,Treiber Stack实现
private volatile WaitNode waiters;

// 下面是变量句柄,底层是基于Unsafe实现,通过相对顶层的操作原语,如CAS等
private static final VarHandle STATE;
private static final VarHandle RUNNER;
private static final VarHandle WAITERS;
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
STATE = l.findVarHandle(FutureTask.class, "state", int.class);
RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class);
WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}

// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
// ... 省略其他代码

上面的主要属性中,有两点比较复杂,但却是最重要的:

  1. FutureTask生命周期的状态管理或者跃迁。
  2. 等待(获取结果)线程集合WaitNode基于Treiber Stack实现,需要彻底弄清楚Treiber Stack的工作原理。

FutureTask的状态管理

FutureTask的内建状态包括了七种,也就是属性state有七种可选状态值,总结成表格如下:

状态 状态值 描述
NEW 0 初始化状态,FutureTask实例创建时候在构造函数中标记为此状态
COMPLETING 1 完成中状态,这个是中间状态,执行完成后设置outcome之前标记为此状态
NORMAL 2 正常执行完成,通过调用get()方法能够获取正确的计算结果
EXCEPTIONAL 3 异常执行完成,通过调用get()方法会抛出包装后的ExecutionException异常
CANCELLED 4 取消状态
INTERRUPTING 5 中断中状态,执行线程实例Thread#interrupt()之前会标记为此状态
INTERRUPTED 6 中断完成状态

这些状态之间的跃迁流程图如下:

j-u-c-e-s-2

每一种状态跃迁都是由于调用或者触发了某个方法,下文的一个小节会分析这些方法的实现。

等待线程集合数据结构Treiber Stack的原理

Treiber Stack,中文翻译是驱动栈,听起来比较怪。实际上,Treiber Stack算法是R. Kent Treiber在其1986年的论文Systems Programming: Coping with Parallelism中首次提出,这种算法提供了一种可扩展的无锁栈,基于细粒度的并发原语CAS(Compare And Swap)实现。笔者并没有花时间去研读Treiber的论文,因为在Doug Lea大神参与编写的《Java Concurrency in Practice(Java并发编程实战)》中的第15.4.1小节中有简单分析非阻塞算法中的非阻塞栈。

在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更加复杂。创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。下面的ConcurrentStack是基于Java语言实现的Treiber算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class ConcurrentStack<E> {

private AtomicReference<Node<E>> top = new AtomicReference<>();

public void push(E item) {
Node<E> newHead = new Node<>(item);
Node<E> oldHead;
do {
oldHead = top.get();
newHead.next = oldHead;
} while (!top.compareAndSet(oldHead, newHead));
}

public E pop() {
Node<E> oldHead;
Node<E> newHead;
do {
oldHead = top.get();
if (null == oldHead) {
return null;
}
newHead = oldHead.next;
} while (!top.compareAndSet(oldHead, newHead));
return oldHead.item;
}

private static class Node<E> {

final E item;
Node<E> next;

Node(E item) {
this.item = item;
}
}
}

ConcurrentStack是一个栈,它是由Node元素构成的一个链表,其中栈顶作为根节点,并且每个元素都包含了一个值以及指向下一个元素的链接。push()方法创建一个新的节点,该节点的next域指向了当前的栈顶,然后通过CAS把这个新节点放入栈顶。如果在开始插入节点时,位于栈顶的节点没有发生变化,那么CAS就会成功,如果栈顶节点发生变化(例如由于其他线程在当前线程开始之前插入或者移除了元素),那么CAS就会失败,而push()方法会根据栈的当前状态来更新节点(其实就是while循环会进入下一轮),并且再次尝试。无论哪种情况,在CAS执行完成之后,栈仍然回处于一致的状态。这里通过一个图来模拟一下push()方法的流程:

j-u-c-e-s-3

pop()方法可以简单理解为push()方法的逆向操作,具体流程是:

  1. 创建一个引用newHead指向当前top的下一个节点,也就是top.nexttop所在引用称为oldHead
  2. 通过CAS更新top的值,伪代码是CAS(expect=oldHead,update=newHead),如果更新成功,那么top就指向top.next,也就是newHead

j-u-c-e-s-4

Warning

这里可以看出Treiber Stack算法有个比较大的问题是有可能产生无效的节点,所以FutureTask也存在可能产生无效的等待节点的问题。

FutureTask方法源码分析

先看FutureTask提供的非阻塞栈节点的实现:

1
2
3
4
5
6
7
8
// 等待获取结果的线程节点(集合),实际上是一个单链表,实现了一个非阻塞栈
static final class WaitNode {
// 记录等待线程实例
volatile Thread thread;
// 指向下一个节点的引用
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

和我们上面分析Treiber Stack时候使用的单链表如出一辙。接着看FutureTask的构造函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 适配使用Callable类型任务的场景
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}

// 适配使用Runnable类型任务和已经提供了最终计算结果的场景
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}

// Executors中
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}

// Runnable和Callable的适配器,设计十分巧妙,实际上run()方法委托给传入的Runnable实例执行,实现了Callable的call()方法,使用的是外部传入的值作为返回结果
private static final class RunnableAdapter<T> implements Callable<T> {
private final Runnable task;
private final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
public String toString() {
return super.toString() + "[Wrapped task = " + task + "]";
}
}

主要是针对两种不同场景的任务类型进行适配,构造函数中直接设置状态state = NEW(0)。因为FutureTask是最终的任务包装类,它的核心功能都在其实现的Runnable#run()方法中,这里重点分析一下run()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// FutureTask实现的Runnable#run()方法
public void run() {
// 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回,不执行任务
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
// 获取Callable任务实例赋值到临时变量c
Callable<V> c = callable;
// 判断任务不能为空,二次校验状态必须为NEW(0)
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用任务实例Callable#call()方法,正常情况下的执行完毕,没有抛出异常,则记录执行结果
result = c.call();
// 记录正常执行完毕
ran = true;
} catch (Throwable ex) {
// 异常情况下的执行完毕,执行结果记录为null
result = null;
// 记录异常执行完毕
ran = false;
// 设置异常实例
setException(ex);
}
// 正常执行完毕设置结果
if (ran)
set(result);
}
} finally {
// runner更新为null,防止并发执行run()方法
runner = null;
// 记录新的状态值,因为run()方法执行的时候,状态值有可能被其他方法更新了
int s = state;
if (s >= INTERRUPTING)
// 处理run()方法执行期间调用了cancel(true)方法的情况
handlePossibleCancellationInterrupt(s);
}
}

// 异常执行挖鼻的情况下,设置异常实例
protected void setException(Throwable t) {
// CAS更新状态state,由NEW(0)更新为COMPLETING(1)
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 设置异常实例到outcome属性中
outcome = t;
// 设置最终状态state = EXCEPTIONAL(3),意味着任务最终异常执行完毕
STATE.setRelease(this, EXCEPTIONAL); // final state
// 完成后的通知方法
finishCompletion();
}
}

// 完成任务后的通知方法,最要作用是移除和唤醒所有的等待结果线程,调用钩子方法done()和设置任务实例callable为null
private void finishCompletion() {
// 遍历栈,终止条件是下一个元素为null
for (WaitNode q; (q = waiters) != null;) {
// CAS设置栈顶为null
if (WAITERS.weakCompareAndSet(this, q, null)) {
// 遍历栈中的所有节点,唤醒节点中的线程,这是一个十分常规的遍历单链表的方法,注意几点:
// 1. 使用LockSupport.unpark()唤醒线程,因为后面会分析,线程阻塞等待的时候使用的是LockSupport.park()方法
// 2. 断开链表节点的时候后继节点需要置为null,这样游离节点才能更容易被JVM回收
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null;
q = next;
}
break;
}
}
// 回调钩子方法done(),这个可以通过子类进行扩展
done();
// 置任务实例callable为null,从而减少JVM memory footprint(这个东西有兴趣可以自行扩展阅读)
callable = null; // to reduce footprint
}

// 正常执行完毕的情况下设置执行结果
protected void set(V v) {
// CAS更新状态state,由NEW(0)更新为COMPLETING(1)
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 最终执行结果值更新到outcome中
outcome = v;
// 设置最终状态state = NORMAL(2),意味着任务最终正常执行完毕
STATE.setRelease(this, NORMAL);
// 完成后的通知方法
finishCompletion();
}
}

// 处理run()方法执行期间调用了cancel(true)方法的情况
// 这里还没分析cancel()方法,但是可以提前告知:它会先把状态更新为INTERRUPTING,再进行线程中断,最后更新状态为INTERRUPTED
// 所以如果发现当前状态为INTERRUPTING,当前线程需要让出CPU控制权等待到状态更变为INTERRUPTED即可,这个时间应该十分短暂
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield();

}

// 钩子方法,可以通过子类扩展此方法,方法回调的时机是任务已经执行完毕,阻塞获取结果的线程被唤醒之后
protected void done() {
}

run()方法的执行流程比较直观,这里提供一个简单的流程图:

j-u-c-e-s-5

FutureTask还提供了一个能够重置状态(准确来说是保持状态)的runAndReset()方法,这个方法专门提供给ScheduledThreadPoolExecutor使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 执行任务并且重置状态
// 由于没有执行set()方法设置执行结果,这个方法除了执行过程中抛出异常或者主动取消会到导致state由NEW更变为其他值,正常执行完毕一个任务之后,state是保持为NEW不变
protected boolean runAndReset() {
// 如果状态不为NEW(0)或者CAS(null,当前线程实例)更新runner-真正的执行Callable对象的线程实例失败,那么直接返回false,不执行任务
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 这里会忽略执行结果,只记录是否正常执行
c.call();
ran = true;
} catch (Throwable ex) {
// 记录执行异常结果
setException(ex);
}
}
} finally {
runner = null;
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 正常情况下的执行完毕,ran会更新为true,state此时也保持为NEW,这个时候方法返回true
return ran && s == NEW;
}

runAndReset()方法保证了在任务正常执行完成之后返回true,此时FutureTask的状态state保持为NEW,由于没有调用set()方法,也就是没有调用finishCompletion()方法,它内部持有的Callable任务引用不会置为null,等待获取结果的线程集合也不会解除阻塞。这种设计方案专门针对可以周期性重复执行的任务。异常执行情况和取消的情况导致的最终结果和run()方法是一致的。接下来分析一下获取执行结果的get()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
// 获取执行结果 - 永久阻塞
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成
if (s <= COMPLETING)
// 注意这里调用awaitDone方法的参数为永久阻塞参数,也就是没有超时期限,返回最新的状态值
s = awaitDone(false, 0L);
// 根据状态值报告结果
return report(s);
}

// 获取执行结果 - 带超时的阻塞
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
// 如果状态小于等于COMPLETING(1),也就是COMPLETING(1)和NEW(0),那么就需要等待任务完成
// 注意这里调用awaitDone方法的参数为带超时上限的阻塞参数
// 如果超过了指定的等待期限(注意会把时间转化为纳秒),返回的最新状态依然为COMPLETING(1)或者NEW(0),那么抛出TimeoutException异常
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 根据状态值报告结果
return report(s);
}

// 等待任务完成,区分永久阻塞等待和带超时上限的阻塞等待两种场景
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
long startTime = 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
// 如果状态值已经大于COMPLETING(1),说明任务已经执行完毕,可以直接返回,如果等待节点已经初始化,则置空其线程实例引用,便于GC回收
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// 状态值等于COMPLETING(1),说明任务执行到达尾声,在执行set()或者setException(),只需让出CPU控制权等待完成即可等待下一轮循环重试即可
Thread.yield();
else if (Thread.interrupted()) {
// 如果线程被中断,则清除其中断状态,并且断开超时或中断的等待节点的链接
removeWaiter(q);
// 抛出InterruptedException异常
throw new InterruptedException();
}
else if (q == null) {
// 等待节点尚未初始化,如果设置了超时期限并且超时时间小于等于0,则直接返回状态并且终止等待,说明已经超时了
// 这里的逻辑属于先行校验,如果命中了就不用进行超时阻塞
if (timed && nanos <= 0L)
return s;
// 初始化等待节点
q = new WaitNode();
}
else if (!queued)
//如果等待节点尚未加入到栈中,则把当前线程所在的节点压入栈中,top引用指向当前等待节点
// 这里就是Treiber Stack算法的入栈操作
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
// 计算开始时间等待时间于当前时间的相差值作为阻塞的时间parkNanos,因为这里涉及到循环,startTime就是第一轮循环时候的当前系统纳秒
final long parkNanos;
if (startTime == 0L) {
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// 如果状态为NEW(0),则进行超时阻塞,阻塞的是当前的线程
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
// 这种就是最后一个if分支,就是不命中任何条件的永久阻塞,阻塞的是当前的线程
LockSupport.park(this);
}
}

// 移除等待节点,这个方法有两次使用的地方:
// 1. 获取结果的线程进行阻塞等待的时候被中断的场景(处理中断)
// 2. 获取结果的线程采用带超时的阻塞等待并且在进行阻塞之前已经判断到超时时间已经到期(处理不小心进栈的无效节点)
// 实际上,这个方法就是Treiber Stack算法的出栈操作
private void removeWaiter(WaitNode node) {
// 只有目标等待节点不空时候才处理
if (node != null) {
// 目标等待节点的线程引用置为空
node.thread = null;
// 这里循环标记用于因为此方法执行的竞态条件需要重试的起点
retry:
for (;;) {
// 遍历的终止条件:q != null,由于变化条件是q = s,并且每轮循环s = q.next,因此终止条件是栈节点的后继节点next为null
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
// 第一轮循环,q其实就是栈顶节点,栈顶节点的后继节点为s,获取说q是当前需要处理的节点,s是其后继节点
s = q.next;
// 如果当前节点时有效(持有的线程引用非空)的节点,则前驱节点pred更新为当前节点,进行下一轮遍历
if (q.thread != null)
pred = q;
// 如果当前节点已经无效,并且它存在前驱节点,那么前驱节点pred的后继节点引用连接到当前节点的后继节点s,实现当前节点的删除
// 这个是单链表删除中间某一个节点的常规操作
else if (pred != null) {
pred.next = s;
// 如果在当前节点已经无效,并且它存在前驱节点,但是前驱节点二次判断为无效,说明出现了竞态,需要重新进行栈waiters的遍历
if (pred.thread == null) // check for race
continue retry;
}
// 当前节点已经无效,它不存在前驱节点,则直接把当前节点的后继节点s通过CAS更新栈顶节点
// 类比前面分析过的ConcurrentStack的pop()方法,这里的q就是oldHead,s就是newHead。
// CAS更新失败说明存在竞态,则需要重新进行栈waiters的遍历
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}

// 报告结果的方法,入参是awaitDone()方法返回的状态值
private V report(int s) throws ExecutionException {
Object x = outcome;
// 如果状态值为NORMAL(2)正常执行完毕,则直接基于outcome强转为目标类型实例
if (s == NORMAL)
return (V)x;
// 如果状态值大于等于CANCELLED(4),则抛出CancellationException异常
if (s >= CANCELLED)
throw new CancellationException();
// 其他情况,实际上只剩下状态值为EXCEPTIONAL(3),则基于outcome强转为Throwable类型,则包装成ExecutionException抛出
throw new ExecutionException((Throwable)x);
}

上面的方法中,removeWaiter()方法相对复杂,它涉及到单链表移除中间节点、考虑多种竞态情况进行重试等设计,需要花大量心思去理解。接着看cancel()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean cancel(boolean mayInterruptIfRunning) {
// 状态必须为NEW(0)
// 如果mayInterruptIfRunning为true,则把状态通过CAS更新为INTERRUPTING(5)
// 如果mayInterruptIfRunning为false,则把状态通过CAS更新为CANCELLED(4)
// 如果状态不为NEW(0)或者CAS更新失败,直接返回false,说明任务已经执行到set()或setException(),无法取消
if (!(state == NEW && STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// mayInterruptIfRunning为true,调用执行任务的线程实例的Thread#interrupt()进行中断,更新最终状态为INTERRUPTED(6)
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
// 完成后的通知方法
finishCompletion();
}
return true;
}

cancel()方法只能够中断状态为NEW(0)的线程,并且由于线程只在某些特殊情况下(例如阻塞在同步代码块或者同步方法中阻塞在Object#wait()方法、主动判断线程的中断状态等等)才能响应中断,所以需要思考这个方法是否可以达到预想的目的。最后看剩下的状态判断方法:

1
2
3
4
5
6
7
8
9
// 判断是否取消状态,包括CANCELLED(4)、INTERRUPTING(5)、INTERRUPTED(6)三种状态
public boolean isCancelled() {
return state >= CANCELLED;
}

// 判断是否已经完成,这里只是简单判断状态值不为NEW(0),原因是所有的中间状态都是十分短暂的
public boolean isDone() {
return state != NEW;
}

AbstractExecutorService源码实现

AbstractExecutorService虽然只是ThreadPoolExecutor的抽象父类,但是它已经实现了ExecutorService接口中除了shutdown()shutdownNow()isShutdown()isTerminated()awaitTermination()五个方法之外的其他所有方法(这五个方法在ThreadPoolExecutor实现,因为它们是和线程池的状态相关的)。它的源码体积比较小,下面全量贴出分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
public abstract class AbstractExecutorService implements ExecutorService {

// 静态工厂方法,通过Runnable和具体的返回结果创建FutureTask实例
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

// 静态工厂方法,通过Callable实例创建FutureTask实例
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

// 提交Runnable类型任务
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
// 适配任务为FutureTask实例,注意最终计算结果已经提前设置为null
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 提交到线程池
execute(ftask);
return ftask;
}

// 提交Runnable类型任务,同时传入最终计算结果
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
// 适配任务为FutureTask实例
RunnableFuture<T> ftask = newTaskFor(task, result);
// 提交到线程池
execute(ftask);
return ftask;
}

// 提交Callable类型任务
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// 适配任务为FutureTask实例
RunnableFuture<T> ftask = newTaskFor(task);
// 提交到线程池
execute(ftask);
return ftask;
}

// 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
// 通过当前的线程池实例构建ExecutorCompletionService实例
ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);
try {
ExecutionException ee = null;
// 计算deadline
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Iterator<? extends Callable<T>> it = tasks.iterator();
// 提交任务列表的第一个任务实例
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
// 这里获取上一轮(或者第一个任务)任务执行的Future实例
Future<T> f = ecs.poll();
// 如果拿到Future实例为null说明上一轮的任务尚未执行完毕
if (f == null) {
// 如果任务队列中还有任务任务,则添加到线程池中执行
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
// 活跃计算任务为0,说明至少有一个任务成功返回了Future实例
else if (active == 0)
break;
else if (timed) {
// 允许超时的模式下用超时阻塞获取Future实例
f = ecs.poll(nanos, NANOSECONDS);
if (f == null)
throw new TimeoutException();
nanos = deadline - System.nanoTime();
}
// 非超时的模式下永久阻塞获取Future实例
else
f = ecs.take();
}
// 获取到的Future实例不为null,说明已经有至少一个任务执行完毕
if (f != null) {
--active;
try {
return f.get();
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;

} finally {
// 取消所有任务,确保至少有一个任务完成,即使取消所有任务,由于状态管理,成功的任务不受干扰
cancelAll(futures);
}
}

// 永久阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}

// 带超时阻塞 - 执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回)
public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}

// 永久阻塞 - 执行任务列表中的所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
try {
// 遍历任务列表进行FutureTask并且提交到线程池,FutureTask实例添加到futures列表中
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
// 遍历futures列表调用get()方法获取结果,注意会忽略所有的CancellationException、ExecutionException
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try { f.get(); }
catch (CancellationException | ExecutionException ignore) {}
}
}
return futures;
} catch (Throwable t) {
// 只要出现非CancellationException或者ExecutionException异常,则取消所有任务,尚未执行或者尚未执行完毕的任务有可能受到影响
cancelAll(futures);
throw t;
}
}

// 带超时阻塞 - 执行任务列表中的所有任务
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
// 转换超时时间单位为纳秒
final long nanos = unit.toNanos(timeout);
// 计算deadline
final long deadline = System.nanoTime() + nanos;
ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
int j = 0;
timedOut: try {
// 遍历任务列表进行FutureTask,FutureTask实例添加到futures列表中
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
final int size = futures.size();
// 遍历futures列表,进行一次超时先验,如果已经超时,则直接跳出,无须执行任务
for (int i = 0; i < size; i++) {
// 这里有个特殊处理,第一个任务只要timeout不为0,必定会进行提交第二个任务起才判断deadline
if (((i == 0) ? nanos : deadline - System.nanoTime()) <= 0L)
break timedOut;
// 提交FutureTask到线程池
execute((Runnable)futures.get(i));
}
// j记录了超时的那个任务的Future的索引值,遍历futures列表进行超时阻塞的get()方法调用
for (; j < size; j++) {
Future<T> f = futures.get(j);
if (!f.isDone()) {
try { f.get(deadline - System.nanoTime(), NANOSECONDS); }
catch (CancellationException | ExecutionException ignore) {}
catch (TimeoutException timedOut) {
break timedOut;
}
}
}
return futures;
} catch (Throwable t) {
cancelAll(futures);
throw t;
}
// 所有任务完成之前发现已经超时,则取消超时任务索引之后的所有任务,已经完成的不受影响
cancelAll(futures, j);
return futures;
}

// 取消所有的Future实例
private static <T> void cancelAll(ArrayList<Future<T>> futures) {
cancelAll(futures, 0);
}

// 遍历所有的Future实例调用其cancel方法,因为参数为true,所以会响应中断
// j参数是决定遍历的起点,0表示整个列表遍历
private static <T> void cancelAll(ArrayList<Future<T>> futures, int j) {
for (int size = futures.size(); j < size; j++)
futures.get(j).cancel(true);
}
}

整个类的源码并不复杂,注意到CallableRunnable的任务最重都会包装为适配器FutureTask的实例,然后通过execute()方法提交包装好的FutureTask任务实例,返回值是Future或者Future的集合时候,实际上是RunnableFuture或者RunnableFuture的集合,只因为RunnableFutureFuture的子接口,这种设计遵循了设计模式原则里面的依赖倒置原则。这里小结一下分析过的几个方法的特征:

方法 特征
submit(Runnable task) 异步执行,执行结果无感知,通过get()方法虽然返回null但是可以确定执行完毕的时刻
submit(Runnable task, T result) 异步执行,预先传入执行结果,最终通过get()方法返回的就是初始传入的结果
submit(Callable<T> task) 异步执行,最终通过get()方法返回的是Callable#call()的结果
invokeAny(Collection<? extends Callable<T>> tasks) 异步执行任务列表中的任意一个任务(实际上有可能会执行多个任务,确保最先完成的任务对应的结果返回),永久阻塞同步返回结果
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 功能同上,获取结果的时候是超时阻塞获取
invokeAll(Collection<? extends Callable<T>> tasks) 异步执行任务列表中的所有任务,必须等待所有Future#get()永久阻塞方法都返回了结果才返回Future列表
invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 异步执行任务列表中的所有任务,只要其中一个任务Future#get()超时阻塞方法超时就会取消该任务索引之后的所有任务并且返回Future列表

小结

ExecutorService提供了一系列便捷的异步任务提交方法,它使用到多种技术:

  • 相对底层的CAS原语。
  • 基于CAS实现的无锁并发栈。
  • 依赖于线程池实现的execute()方法进行异步任务提交。
  • 使用适配器模式设计FutureTask适配FutrueRunnableCallable,提供了状态的生命周期管理。

下一篇文章将会分析一下调度线程池ScheduledThreadPoolExecutor的底层实现和源码。

(本文完 c-7-d e-a-20190727)



 评论