概述 FutureTask 表示异步计算的任务,通过间接实现了 Future 和 Runnable 接口,使其具备异步计算结果 和任务 的特性。异步计算结果支持检查任务是否完成、等待任务完成、获取任务结果、以及取消任务等方法;任务特性是指 FutueTask 是一个任务体,并且用于包装 Callable 或 Runnable 对象,即统一任务为 Callable 类型。
FutureTask 相关的 UML 类图如下:
使用示例 我们执行 5 个异步任务,在未来的某一个时刻收集任务执行的结果并打印。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public static void futureTaskTest () { ExecutorService threadPool = Executors.newFixedThreadPool(8 ); Random random = new Random(10000 ); HashSet<Future<String>> futureList = new HashSet<>(); for (int i = 1 ; i <= 100 ; i++) { int finalI = i; Future<String> future = threadPool.submit(() -> { try { Thread.sleep(random.nextInt(10000 )); } catch (InterruptedException interruptedException) { Thread.currentThread().interrupt(); } return Thread.currentThread().getName() + " is run future-task..." + finalI; } ); futureList.add(future); } futureList.forEach(future -> { try { if (future.isCancelled()) { return ; } if (future.isDone()) { System.out.println(future.get(3 , TimeUnit.SECONDS)); } else { System.out.println(future.get()); } } catch (InterruptedException | ExecutionException | TimeoutException e) { e.printStackTrace(); } }); }
需要说明的是,上述使用示例是结合线程池 ThreadPoolExecutor 进行演示的,并没有直接使用 FutureTask API ,毕竟单独使用 FutureTask 并不多。
源码分析 FutureTask 相关的源码结构如下图:
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 public class FutureTask <V > implements RunnableFuture <V > { private volatile int state; private Callable<V> callable; private Object outcome; private volatile Thread runner; private volatile WaitNode waiters; }
state: 表示异步计算任务的状态,通过该值可以追踪任务进行到哪个阶段 了。该属性使用 volatile
进行了修饰;callable: 异步计算任务封装的任务体,类型统一为 Callable,原因就是异步计算任务需要知道任务执行的结果,而 Runnable 类型不关心任务的结果。这里使用了适配器模型,将任务类型进行了统一。outcome: 任务最终的结果,要么是正常的结果,要么是异常,get 方法获取的就是这个属性值。通过 state 属性值可以明确 outcome 属性值的情况;runner: 表示执行任务的线程,谁执行 callable 这个属性就记录谁;waiters: 等待任务结果的线程节点组成的等待链表,链表中的每个节点都封装了等待结果的线程;
任务状态 异步计算任务 FutureTask 的状态定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private volatile int state;private static final int NEW = 0 ; private static final int COMPLETING = 1 ; private static final int NORMAL = 2 ; private static final int EXCEPTIONAL = 3 ; private static final int CANCELLED = 4 ; private static final int INTERRUPTING = 5 ; private static final int INTERRUPTED = 6 ;
异步计算任务 FutureTask 的状态流转如下图:
可以看到异步计算任务 FutureTask 具有生命周期的特性,每个阶段都对应着任务的执行进度。其中,需要强调两个状态:
COMPLETING
: 表示任务计算过程的中间状态,任务快要完成,达到这个状态说明任务已经执行完成或者出现异常,紧接着保存结果或异常信息即可。
INTERRUPTING
: 表示对新建状态的任务进行中断处理中,如果任务正被线程在执行,那么就对该线程进行中断,否则不处理线程中断逻辑。达到这个状态说明已经成功对未完成的任务进行了中断处理,紧接着会将异步计算任务状态更新为 INTERRUPTED
,此时中断才算完成。
既然 COMPLETING
和 INTERRUPTING
都只是一个中间状态,那么它们的必要性体现在哪里呢?
COMPLETING
: 表示任务快要完成,有了这个标志,获取任务结果的调用线程更加明确任务执行到哪个阶段了,可以选择等待结果还是直接获取结果;
INTERRUPTING
: 表示中断者准备对异步计算任务进行中断,只是还没有完成这个中断的整个过程,有了这个标志就可以明确异步任务被中断的阶段了;
等待链表 异步计算任务 FutureTask 实现了一个单向链表,用来排队等待结果的业务线程。链表结构如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
等待链表本身比较简单,需要特别说明的是,进入到链表的线程都是处于等待(任务结果)状态,具体是通过 LockSupport.park
方法来实现阻塞的。当任务执行结束或被取消,那么等待链表中的线程会通过 LockSupport.unpark
方法依次被唤醒。
CAS属性值 异步计算任务 FutureTask 针对 state
、runner
、waiters
这 3 个属性,通过 Unsafe 实现了 CAS 机制对其进行写操作,保证线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public class FutureTask <V > implements RunnableFuture <V > { private volatile int state; private volatile Thread runner; private volatile WaitNode waiters; private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> k = FutureTask.class ; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state" )); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner" )); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters" )); } catch (Exception e) { throw new Error(e); } } }
构造方法 FutureTask 的构造方法用于封装任务体以屏蔽不同任务的差异。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; } public FutureTask (Runnable runnable, V result) { this .callable = Executors.callable(runnable, result); this .state = NEW; }
构造方法主要做了以下两个工作:
将不同的任务统一处理成 Callable 类型,这是异步计算结果必须的;
异步计算任务在创建时的状态为新建状态 NEW ;
任务执行 - run() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public void run () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return ; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true ; } catch (Throwable ex) { result = null ; ran = false ; setException(ex); } if (ran) set(result); } } finally { runner = null ; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
任务可以执行的条件是:异步任务状态为 NEW && 设置执行任务的线程为当前线程成功;
调用任务的 call() 方法,即方法调用,执行任务;
处理结果和异常,并更新异步任务状态以及唤醒等待异步计算结果的(调用)线程;
线程执行完任务后,置空 runner 为下次任务执行做准备(试想下,如果不置空会怎样?后续的线程可能不能执行任务,因为 CAS 不会成功),并且处理异步任务被中断的情况;
处理异常 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void setException (Throwable t) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this , stateOffset, EXCEPTIONAL); finishCompletion(); } }
当执行任务的过程发生异常,就会执行该方法,该方法有四步操作:
将异步计算任务状态 NEW 转换为 COMPLETING 状态,标志异步计算任务快完成了;
将异常信息保存到待返回的属性中,该属性是 get 方法获取的值;
将异步计算任务状态由 COMPLETING 转换为 EXCEPTIONAL 状态,标志异步计算任务执行异常,是个完结状态;
任务完结后的收尾工作,包括唤醒等待链表中的线程和置空任务体等;
至此,FutureTask 的状态流转为:NEW -> COMPLETING -> EXCEPTIONAL
处理结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 protected void set (V v) { if (UNSAFE.compareAndSwapInt(this , stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this , stateOffset, NORMAL); finishCompletion(); } }
执行任务无异常,就会执行该方法,该方法同样有四步操作:
将异步计算任务状态 NEW 转换为 COMPLETING 状态,标志异步计算任务快完成了;
将任务结果保存到待返回的属性中,该属性是 get 方法获取的值;
将异步计算任务状态由 COMPLETING 转换为 NORMAL 状态,标志异步计算任务完成,是个完结状态;
任务完结后的收尾工作,包括唤醒等待链表中的线程和置空任务体等;
至此,FutureTask 的状态流转为:NEW -> COMPLETING -> NORMAL
处理异步任务被中断的情况 1 2 3 4 5 6 7 8 9 private void handlePossibleCancellationInterrupt (int s) { if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); }
执行该方法的情况是,在线程执行任务的 call()
方法期间,任务被中断者中断了,一般是中断者调用 cancel(true)
方法,但是还没有完成整个中断流程,因此任务执行完毕后等待中断者完成整个中断逻辑;
任务取消 - cancel 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public boolean cancel (boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this , stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false ; try { if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null ) t.interrupt(); } finally { UNSAFE.putOrderedInt(this , stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true ; }
异步计算任务 FutureTask 支持取消任务,这个取消任务可能是中断任务,也可能是取消任务,由调用方决定。最终的结果是任务不能执行了。
取消任务的前提是,异步计算任务的状态为 NEW,否则不能取消;
如果是中断式取消任务,那么即使任务正在线程执行,也会对该线程进行中断,没有被执行则忽略中断;
取消任务后,执行后续的收尾工作,包括唤醒等待结果的线程、置空任务体;
取消任务的情况下,异步计算任务的状态变更:NEW->CANCELLED 。 中断式取消任务的情况下,异步计算任务的状态变更:NEW->INTERRUPTING->INTERRUPTED 。
其中,中断式取消任务的情况下,涉及到两个状态的变更。INTERRUPTING
只是标记中断开始,此时中断的整个过程还没有完成。还记得任务执行过程中对中断情况的特别处理吗?就是这里的情况。
完结任务 - finishCompletion 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private void finishCompletion () { for (WaitNode q; (q = waiters) != null ; ) { if (UNSAFE.compareAndSwapObject(this , waitersOffset, q, null )) { for (; ; ) { Thread t = q.thread; if (t != null ) { q.thread = null ; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null ) break ; q.next = null ; q = next; } break ; } } done(); callable = null ; }
完结任务方法被多个地方使用,该方法主要做以下 3 件事:
依次唤醒等待链表中的调用者线程,让它们去拿结果或异常;
执行钩子方法;
置空任务体 callable;
获取任务结果 - get 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public V get () throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false , 0L ); return report(s); } public V get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null ) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true , unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
获取任务结果有两种方式,一种是阻塞式,另一种是超时获取,它们分别对应上述的方法。
等待任务完成 - awaitDone 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 private int awaitDone (boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L ; WaitNode q = null ; boolean queued = false ; for (; ; ) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null ) q.thread = null ; return s; } else if (s == COMPLETING) Thread.yield(); else if (q == null ) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this , waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L ) { removeWaiter(q); return state; } LockSupport.parkNanos(this , nanos); } else LockSupport.park(this ); } }
等待任务完成的方法还是比较复杂的,它的特点是:在循环中控制整个流程,以状态 state 驱动流程,也就是随着 state 的不同,执行的分支流也不同 。我们假设调用者线程获取结果时任务还没有完成,异步计算任务的状态为 NEW,那么一个完整的流程大致如下:
第一次循环,state=NEW,此时会将调用者线程封装在 WaitNode 节点中;
第二次循环,state=NEW,此时调用者线程节点 WaitNode 会加入到等待链表的头部「这里是入等待链表的入口 」,注意此时调用者线程并没有阻塞等待;
第三次循环,state=NEW,执行调用者线程会进入阻塞,假设调用者获取结果使用的是 get() 方法,那么就是 LockSupport.park(this)
;
假设在调用者线程阻塞等待期间,任务完成了,在完结任务的处理方法中,会通过 LockSupport.unpark(t)
依次唤醒等待链表中的调用者线程;
第四次循环,此时 state > COMPLETING ,退出方法;
在获取任务结果时,异步计算任务的状态是 COMPLETING 的情况,那么就会让出 CPU 等待任务完成。
任务最终执行结果 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @SuppressWarnings ("unchecked" )private V report (int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); }
如果任务正常执行结束,则返回任务的执行结果;
如果任务被取消,则抛出取消异常;
如果任务异常结束,则包装成 ExecutionException 异常抛出;
任务执行并重置 - runAndReset() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 protected boolean runAndReset () { if (state != NEW || !UNSAFE.compareAndSwapObject(this , runnerOffset, null , Thread.currentThread())) return false ; boolean ran = false ; int s = state; try { Callable<V> c = callable; if (c != null && s == NEW) { try { c.call(); ran = true ; } catch (Throwable ex) { setException(ex); } } } finally { runner = null ; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
上述方法和 run()
方法基本一致,唯一区别点是,任务执行成功后该方法不会设置执行结果,也不会更新更新状态,这一设计是为周期性任务服务的 。
注意: 一旦任务执行失败或被取消,那么这个任务就会失效,不能再次执行「异常时状态被更新为终止状态」。
FutureTask & ThreadPollExecutor 在介绍并发 - 线程池 一文中,我们简单地对 FutureTask 进行了介绍,下面我们收个尾,看看两者是如何配合使用的。
使用线程池提交任务时,一般有两种方法,其中的 submit
方法返回的就是 FutureTask
。
1 2 3 4 5 6 7 8 9 10 11 12 13 public <T> Future<T> submit (Callable<T> task) { if (task == null ) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor (Callable<T> callable) { return new FutureTask<T>(callable); }
这样一来,任务提交到线程池后就会返回一个和任务相关的异步计算任务对象 FutureTask ,通过这个对象我们就可以对提交到线程池中的任务的状态进行追踪。
应用场景 RPC 框架中的同步调用和异步调用本质上都是异步调用,其中异步调用就可以使用 Future 衍生实现。比如 Dubbo 中的异步调用:
1 2 3 4 5 6 7 8 9 ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult();
小结 FutureTask 表示异步计算的任务,它支持检查任务是否完成、等待任务完成、获取任务结果、以及取消任务等方法。通过这个异步计算任务,我们可以追踪任务的状态,从而可以灵活操作任务 。