并发 - FutureTask

概述

FutureTask 表示异步计算的任务,通过间接实现了 Future 和 Runnable 接口,使其具备异步计算结果任务的特性。异步计算结果支持检查任务是否完成、等待任务完成、获取任务结果、以及取消任务等方法;任务特性是指 FutueTask 是一个任务体,并且用于包装 Callable 或 Runnable 对象,即统一任务为 Callable 类型。

FutureTask 相关的 UML 类图如下:

使用示例

我们执行 5 个异步任务,在未来的某一个时刻收集任务执行的结果并打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void futureTaskTest() {
// 1 创建固定大小线程池
ExecutorService threadPool = Executors.newFixedThreadPool(8);
Random random = new Random(10000);

// 2 定义异步计算任务集合
HashSet<Future<String>> futureList = new HashSet<>();
// List<Future<String>> futureList = new ArrayList<>();
for (int i = 1; i <= 100; i++) {
// submit 方法提交的任务会被 FutureTask 包装,返回的也是这个 FutureTask 对象
int finalI = i;
Future<String> future = threadPool.submit(() -> {
try {
Thread.sleep(random.nextInt(10000));
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
}
return Thread.currentThread().getName() + " is run future-task..." + finalI;
}
);
futureList.add(future);
}

// 3 任务提交完成后,等待处理完成并打印
futureList.forEach(future -> {
try {
// 任务取消,不获取结果{@link get()},否则会抛出异常
if (future.isCancelled()) {
return;
}

// 任务已经在执行了,则阻塞等待最终的结果
if (future.isDone()) {
System.out.println(future.get(3, TimeUnit.SECONDS));

} else {
// 执行到这里,说明异步计算任务还是 NEW 状态,阻塞等待被调度、执行
System.out.println(future.get());
}

} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
});
}

需要说明的是,上述使用示例是结合线程池 ThreadPoolExecutor 进行演示的,并没有直接使用 FutureTask API ,毕竟单独使用 FutureTask 并不多。

源码分析

FutureTask 相关的源码结构如下图:

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 任务状态
*/
private volatile int state;

/**
* FutureTask 封装的任务体。如果是 Runnable 类型,则会对Runnable进行适配转为 Callable 类型
*/
private Callable<V> callable;

/**
* 任务最终的结果,要么是正常的结果,要么是异常。get 方法获取的就是这个属性值。
*/
private Object outcome;

/**
* 执行任务的线程,如线程池中的某个线程
*/
private volatile Thread runner;

/**
* 等待任务结果的线程节点,即调用者线程组成的链表结构
*/
private volatile WaitNode waiters;
}

state: 表示异步计算任务的状态,通过该值可以追踪任务进行到哪个阶段了。该属性使用 volatile 进行了修饰;
callable: 异步计算任务封装的任务体,类型统一为 Callable,原因就是异步计算任务需要知道任务执行的结果,而 Runnable 类型不关心任务的结果。这里使用了适配器模型,将任务类型进行了统一。
outcome: 任务最终的结果,要么是正常的结果,要么是异常,get 方法获取的就是这个属性值。通过 state 属性值可以明确 outcome 属性值的情况;
runner: 表示执行任务的线程,谁执行 callable 这个属性就记录谁;
waiters: 等待任务结果的线程节点组成的等待链表,链表中的每个节点都封装了等待结果的线程;

任务状态

异步计算任务 FutureTask 的状态定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 可能的状态转换:
*
* 任务正常运行结束:NEW -> COMPLETING -> NORMAL
* 任务异常运行结束:NEW -> COMPLETING -> EXCEPTIONAL
* 任务取消:NEW -> CANCELLED
* 任务被中断:NEW -> INTERRUPTING -> INTERRUPTED
*
* 异步任务状态仅在 {@link set(V v)}、{@link setException(Throwable t)}、{@link cancel()} 方法中转换为最终状态,
* 在这期间,状态可能会采用 COMPLETING 或 INTERRUPTING 的中间状态。
*/
private volatile int state;

private static final int NEW = 0; // 新建状态
private static final int COMPLETING = 1; // 执行中间态
private static final int NORMAL = 2; // 完成
private static final int EXCEPTIONAL = 3; // 异常
private static final int CANCELLED = 4; // 取消
private static final int INTERRUPTING = 5; // 中断中
private static final int INTERRUPTED = 6; // 中断

异步计算任务 FutureTask 的状态流转如下图:

可以看到异步计算任务 FutureTask 具有生命周期的特性,每个阶段都对应着任务的执行进度。其中,需要强调两个状态:

  • COMPLETING: 表示任务计算过程的中间状态,任务快要完成,达到这个状态说明任务已经执行完成或者出现异常,紧接着保存结果或异常信息即可。
  • INTERRUPTING: 表示对新建状态的任务进行中断处理中,如果任务正被线程在执行,那么就对该线程进行中断,否则不处理线程中断逻辑。达到这个状态说明已经成功对未完成的任务进行了中断处理,紧接着会将异步计算任务状态更新为 INTERRUPTED,此时中断才算完成。

既然 COMPLETINGINTERRUPTING 都只是一个中间状态,那么它们的必要性体现在哪里呢?

  • COMPLETING: 表示任务快要完成,有了这个标志,获取任务结果的调用线程更加明确任务执行到哪个阶段了,可以选择等待结果还是直接获取结果;
  • INTERRUPTING: 表示中断者准备对异步计算任务进行中断,只是还没有完成这个中断的整个过程,有了这个标志就可以明确异步任务被中断的阶段了;

等待链表

异步计算任务 FutureTask 实现了一个单向链表,用来排队等待结果的业务线程。链表结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static final class WaitNode {
/**
* 等待任务结果的线程
*/
volatile Thread thread;
/**
* 后置指针
*/
volatile WaitNode next;

/**
* 创建线程节点
*/
WaitNode() {
thread = Thread.currentThread();
}
}

等待链表本身比较简单,需要特别说明的是,进入到链表的线程都是处于等待(任务结果)状态,具体是通过 LockSupport.park 方法来实现阻塞的。当任务执行结束或被取消,那么等待链表中的线程会通过 LockSupport.unpark 方法依次被唤醒。

CAS属性值

异步计算任务 FutureTask 针对 staterunnerwaiters 这 3 个属性,通过 Unsafe 实现了 CAS 机制对其进行写操作,保证线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class FutureTask<V> implements RunnableFuture<V> {

private volatile int state;
private volatile Thread runner;
private volatile WaitNode waiters;

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
// 内存地址偏移量
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;

static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
}

构造方法

FutureTask 的构造方法用于封装任务体以屏蔽不同任务的差异。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* 创建任务
*
* @param callable the callable task 可调用任务
* @throws NullPointerException if the callable is null
*/
public FutureTask(Callable<V> callable) {
// 任务不允许为空
if (callable == null)
throw new NullPointerException();

// 保存到属性中
this.callable = callable;

// 异步计算任务状态初始化为新建状态
this.state = NEW; // ensure visibility of callable
}

/**
* 创建任务,需要将 Runnable 类型包装成 Callable 类型
*
* @param runnable the runnable task 可运行任务
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* 成功完成后返回的结果。如果不需要特定结果,传入 null
*
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
// 1 将 Runnable 类型任务适配成 Callable 类型,并保存到属性中
this.callable = Executors.callable(runnable, result);

// 2 异步计算任务状态初始化为新建状态
this.state = NEW; // ensure visibility of callable
}

构造方法主要做了以下两个工作:

  • 将不同的任务统一处理成 Callable 类型,这是异步计算结果必须的;
  • 异步计算任务在创建时的状态为新建状态 NEW

任务执行 - run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public void run() {
// 判断任务是否可以执行
// 任务状态不为 NEW 或 设置执行该任务的线程(也就是 runner 属性)失败 ,那么不能执行该任务
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;

try {
// 真正的任务,这里统一是 Callable 类型,Runnable 被适配成了 Callable 类型
Callable<V> c = callable;

// 任务状态必须是 NEW 才能运行
if (c != null && state == NEW) {
// 任务执行结果
V result;
// 任务是否执行成功
boolean ran;
try {
// 执行任务
result = c.call();
ran = true;

// 出现异常
} catch (Throwable ex) {
result = null;
ran = false;
// 处理异常结果并更新任务状态
setException(ex);
}

// 任务执行成功,设置结果并更新任务状态
if (ran)
set(result);
}
} finally {
// 任务处理完成后,将 runner 置 null ,防止并发调用 run()
runner = null;

// 重新读取状态以防止漏掉中断的处理「想一下这种场景,线程在执行 c.call() 方法时被中断了」
int s = state;
if (s >= INTERRUPTING)
// 处理可能因取消时进行的中断,如果处于中断中,等待中断者完成即可
handlePossibleCancellationInterrupt(s);
}
}
  1. 任务可以执行的条件是:异步任务状态为 NEW && 设置执行任务的线程为当前线程成功;
  2. 调用任务的 call() 方法,即方法调用,执行任务;
  3. 处理结果和异常,并更新异步任务状态以及唤醒等待异步计算结果的(调用)线程;
  4. 线程执行完任务后,置空 runner 为下次任务执行做准备(试想下,如果不置空会怎样?后续的线程可能不能执行任务,因为 CAS 不会成功),并且处理异步任务被中断的情况;

处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void setException(Throwable t) {
// 将任务状态 NEW 更新为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

// 结果设置为异常信息
// 该属性是 get 方法返回的
outcome = t;

// 设置任务最终的状态为 EXCEPTIONAL
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state

// 处理任务完成的后续工作
finishCompletion();
}
}

当执行任务的过程发生异常,就会执行该方法,该方法有四步操作:

  1. 将异步计算任务状态 NEW 转换为 COMPLETING 状态,标志异步计算任务快完成了;
  2. 将异常信息保存到待返回的属性中,该属性是 get 方法获取的值;
  3. 将异步计算任务状态由 COMPLETING 转换为 EXCEPTIONAL 状态,标志异步计算任务执行异常,是个完结状态;
  4. 任务完结后的收尾工作,包括唤醒等待链表中的线程和置空任务体等;

至此,FutureTask 的状态流转为:NEW -> COMPLETING -> EXCEPTIONAL

处理结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected void set(V v) {
// 将任务状态 NEW 更新为 COMPLETING
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {

// 结果设置为传入的值
// 该属性是 get 方法返回的
outcome = v;

// 设置任务最终的状态为 NORMAL
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state

// 处理任务完成的后续工作
finishCompletion();
}
}

执行任务无异常,就会执行该方法,该方法同样有四步操作:

  1. 将异步计算任务状态 NEW 转换为 COMPLETING 状态,标志异步计算任务快完成了;
  2. 将任务结果保存到待返回的属性中,该属性是 get 方法获取的值;
  3. 将异步计算任务状态由 COMPLETING 转换为 NORMAL 状态,标志异步计算任务完成,是个完结状态;
  4. 任务完结后的收尾工作,包括唤醒等待链表中的线程和置空任务体等;

至此,FutureTask 的状态流转为:NEW -> COMPLETING -> NORMAL

处理异步任务被中断的情况

1
2
3
4
5
6
7
8
9
/**
* 确保在 run 或 runAndReset 执行时,来自可能的 cancel(true) 方法的中断
*/
private void handlePossibleCancellationInterrupt(int s) {
// 等待中断者完成中断,这里让出 CPU 等待
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt 等待挂起的中断
}

执行该方法的情况是,在线程执行任务的 call() 方法期间,任务被中断者中断了,一般是中断者调用 cancel(true) 方法,但是还没有完成整个中断流程,因此任务执行完毕后等待中断者完成整个中断逻辑;

任务取消 - cancel

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 取消任务,可能是中断,也可能是取消
*
* @param mayInterruptIfRunning 是否中断
* @return
*/
public boolean cancel(boolean mayInterruptIfRunning) {
// 取消任务的条件:任务状态为 NEW && 更新任务状态 NEW->INTERRUPTING 或者 NEW->CANCELLED 成功
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;


try { // in case call to interrupt throws exception
// 如果中断线程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
// 中断线程
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}

} finally {
// 完结任务
finishCompletion();
}
return true;
}

异步计算任务 FutureTask 支持取消任务,这个取消任务可能是中断任务,也可能是取消任务,由调用方决定。最终的结果是任务不能执行了。

  1. 取消任务的前提是,异步计算任务的状态为 NEW,否则不能取消;
  2. 如果是中断式取消任务,那么即使任务正在线程执行,也会对该线程进行中断,没有被执行则忽略中断;
  3. 取消任务后,执行后续的收尾工作,包括唤醒等待结果的线程、置空任务体;

取消任务的情况下,异步计算任务的状态变更:NEW->CANCELLED
中断式取消任务的情况下,异步计算任务的状态变更:NEW->INTERRUPTING->INTERRUPTED

其中,中断式取消任务的情况下,涉及到两个状态的变更。INTERRUPTING 只是标记中断开始,此时中断的整个过程还没有完成。还记得任务执行过程中对中断情况的特别处理吗?就是这里的情况。

完结任务 - finishCompletion

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private void finishCompletion() {
// assert state > COMPLETING;
// 如果线程等待队列不为空(这个线程队列实际上是调用者线程)
for (WaitNode q; (q = waiters) != null; ) {
// 置空队列
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 唤醒并从等待列表中移除等待结果的节点
for (; ; ) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
// 唤醒线程 t
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

// 执行钩子方法
done();

// 任务体完成后置空
callable = null; // to reduce footprint
}

完结任务方法被多个地方使用,该方法主要做以下 3 件事:

  1. 依次唤醒等待链表中的调用者线程,让它们去拿结果或异常;
  2. 执行钩子方法;
  3. 置空任务体 callable;

获取任务结果 - get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* 获得任务的执行结果,如果任务未执行完毕,会阻塞直到任务结束;
*
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
// 任务执行状态
int s = state;

// 如果任务还没可能完成,则进入等待链表中等待
if (s <= COMPLETING)
s = awaitDone(false, 0L);


// 返回任务的结果或异常
return report(s);
}

/**
* 超时获取执行结果
*
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();

// 获取任务的状态
int s = state;

// 如果任务还没有完成,则超时等待(进入等待链表中),超时或有结果了会醒来
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
// 超时无结果,抛出异常
throw new TimeoutException();

// 返回任务的结果或异常
return report(s);
}

获取任务结果有两种方式,一种是阻塞式,另一种是超时获取,它们分别对应上述的方法。

等待任务完成 - awaitDone

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 在中断或超时时等待完成或中止
*
* @param timed true if use timed waits 是否超时等待
* @param nanos time to wait, if timed 等待时间长,纳秒
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 等待的截止时间,非超时就是 0
final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;
boolean queued = false;
for (; ; ) {
// 如果获取结果的线程被中断,则尝试移除节点 q,然后抛出中断异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

// 任务状态
int s = state;

// 如果任务状态 > COMPLETING,那么可以返回了,此时能拿到最终的结果或异常「参考 report 中的逻辑」。
// todo 这里是自旋出口
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;

// 如果任务状态处于 COMPLETING 状态,说明任务快完成了,就差设置状态到NORMAL或EXCEPTIONAL和设置结果了
} else if (s == COMPLETING) // cannot time out yet 还不能超时
// 让出 CPU,优先完成任务「调用线程不占用CPU了,让任务尽快完成」
Thread.yield();

// 创建线程节点,节点中记录调用者线程
else if (q == null)
q = new WaitNode();

// 将线程节点 q 加入到等待链表中
else if (!queued)
// CAS 的链表头插法
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);

// 超时获取结果
else if (timed) {
// 计算是否超时
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 超时了,则从等待链表中移除节点 q,并返回 state
removeWaiter(q);
return state;
}

// 超时阻塞线程
LockSupport.parkNanos(this, nanos);

// 阻塞式获取结果,阻塞当前线程(调用者线程)
} else
// 阻塞线程
LockSupport.park(this);
}
}

等待任务完成的方法还是比较复杂的,它的特点是:在循环中控制整个流程,以状态 state 驱动流程,也就是随着 state 的不同,执行的分支流也不同。我们假设调用者线程获取结果时任务还没有完成,异步计算任务的状态为 NEW,那么一个完整的流程大致如下:

  1. 第一次循环,state=NEW,此时会将调用者线程封装在 WaitNode 节点中;
  2. 第二次循环,state=NEW,此时调用者线程节点 WaitNode 会加入到等待链表的头部「这里是入等待链表的入口」,注意此时调用者线程并没有阻塞等待;
  3. 第三次循环,state=NEW,执行调用者线程会进入阻塞,假设调用者获取结果使用的是 get() 方法,那么就是 LockSupport.park(this)
  4. 假设在调用者线程阻塞等待期间,任务完成了,在完结任务的处理方法中,会通过 LockSupport.unpark(t) 依次唤醒等待链表中的调用者线程;
  5. 第四次循环,此时 state > COMPLETING ,退出方法;

在获取任务结果时,异步计算任务的状态是 COMPLETING 的情况,那么就会让出 CPU 等待任务完成。

任务最终执行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 为已完成的任务返回结果或抛出异常。
*
* @param s completed state value
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
// 任务执行的结果或异常
Object x = outcome;

// 如果任务正常运行结束,返回结果
if (s == NORMAL)
return (V) x;

// 如果任务被取消或被中断,则抛出任务取消异常
if (s >= CANCELLED)
throw new CancellationException();

// 任务异常运行
throw new ExecutionException((Throwable) x);
}
  • 如果任务正常执行结束,则返回任务的执行结果;
  • 如果任务被取消,则抛出取消异常;
  • 如果任务异常结束,则包装成 ExecutionException 异常抛出;

任务执行并重置 - runAndReset()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
* 执行计算而不设置其结果,然后将此未来计算任务重置为初始状态,如果计算遇到异常或被取消则任务失效,不能重复执行。
* 该设计用于周期性任务。
*
* @return {@code true} if successfully run and reset
*/
protected boolean runAndReset() {
// 判断任务是否已经开始执行,没有开始执行,那么设置执行该任务的线程,也就是 runner 属性
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;

// 任务执行成功
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
// 执行任务
if (c != null && s == NEW) {
try {
// 执行任务,但不设置结果,也不更新完成状态「任务可以重复执行」
c.call(); // don't set result
ran = true;

// 异常情况,设置异常并更新完成状态「任务不能再执行」
} catch (Throwable ex) {
setException(ex);
}
}

} finally {
// 任务处理完成后,将 runner 置 null ,防止并发调用 run()
// runner must be non-null until state is settled to prevent concurrent calls to run()
runner = null;

// 重新读取状态以防止漏掉中断的处理「想一下这种场景,线程在执行 c.call() 方法时被中断了」
// state must be re-read after nulling runner to prevent leaked interrupts
s = state;

// 处理可能因取消时进行的中断,如果处于中断中,等待中断者完成即可
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}

// 成功运行并重置
return ran && s == NEW;
}

上述方法和 run() 方法基本一致,唯一区别点是,任务执行成功后该方法不会设置执行结果,也不会更新更新状态,这一设计是为周期性任务服务的

注意:一旦任务执行失败或被取消,那么这个任务就会失效,不能再次执行「异常时状态被更新为终止状态」。

FutureTask & ThreadPollExecutor

在介绍并发 - 线程池一文中,我们简单地对 FutureTask 进行了介绍,下面我们收个尾,看看两者是如何配合使用的。

使用线程池提交任务时,一般有两种方法,其中的 submit 方法返回的就是 FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> Future<T> submit(Callable<T> task) {
// 非空检查
if (task == null) throw new NullPointerException();
// 使用 FutureTask 封装任务
RunnableFuture<T> ftask = newTaskFor(task);
// 使用 execute 方法执行任务
execute(ftask);
// 返回 FutureTask
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

这样一来,任务提交到线程池后就会返回一个和任务相关的异步计算任务对象 FutureTask ,通过这个对象我们就可以对提交到线程池中的任务的状态进行追踪。

应用场景

RPC 框架中的同步调用和异步调用本质上都是异步调用,其中异步调用就可以使用 Future 衍生实现。比如 Dubbo 中的异步调用:

1
2
3
4
5
6
7
8
9
ResponseFuture future = currentClient.request(inv, timeout);
/**
* 1 调用 RpcContext#setFuture(future) 方法,在 FutureFitler 中,异步回调。
* 2 将DefaultFuture 对象封装到 FutureAdapter实例中,并将 FutureAdapter实例设置到RpcContext 中,我们可以在需要的地方取出使用 【在合适的地方调用 get方法】
* 3 FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配,这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配,最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法
*/
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
// 返回 空结果
return new RpcResult();

小结

FutureTask 表示异步计算的任务,它支持检查任务是否完成、等待任务完成、获取任务结果、以及取消任务等方法。通过这个异步计算任务,我们可以追踪任务的状态,从而可以灵活操作任务