概述 ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 扩展的 执行定时任务线程池,线程池相关的核心逻辑都是在后者中实现的 ,前者主要用于实现定时任务或周期性任务逻辑 ,后续的功能交给父类 ThreadPoolExecutor 实现。
为了保证任务是在未来某个时间点执行,ScheduledThreadPoolExecutor 使用的阻塞队列是具有延时特性的阻塞队列,但它并没有直接使用 DelayQueue ,而是自己实现了一个延时阻塞队列,不过跟 DelayQueue 实现原理是一样的。
ScheduledThreadPoolExecutor 相关的 UML 类图如下所示:
使用示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public class ScheduledThreadPoolExecutorDemo { public static void main (String[] args) throws IOException { ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5 ); scheduledThreadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true ); System.out.println("existingPeriodicTasksAfterShutdownPolicy: " + scheduledThreadPoolExecutor.getContinueExistingPeriodicTasksAfterShutdownPolicy()); System.out.println("existingDelayedTasksAfterShutdownPolicy: " + scheduledThreadPoolExecutor.getExecuteExistingDelayedTasksAfterShutdownPolicy()); ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> { System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss" )) + " 固定频率执行...." ); }, 1 , 4 , TimeUnit.SECONDS); ScheduledFuture<?> scheduledFuture1 = scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> { System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss" )) + " 固定延迟时间执行...." ); }, 1 , 2 , TimeUnit.SECONDS); ScheduledFuture<?> scheduledFuture2 = scheduledThreadPoolExecutor.schedule(() -> { System.out.println("schedule...runnable..." ); }, 1 , TimeUnit.SECONDS); ScheduledFuture<Object> scheduledFuture3 = scheduledThreadPoolExecutor.schedule(() -> { System.out.println("schedule...callable..." ); return "future" ; }, 1 , TimeUnit.SECONDS); try { Object o = scheduledFuture3.get(); System.out.println(o); } catch (Exception ex) { ex.printStackTrace(); } scheduledThreadPoolExecutor.shutdown(); try { System.out.println("10s 后开始停止线程池,届时所有还未执行的任务会被终止!" ); Thread.sleep(10000 ); } catch (Exception ex) { ex.printStackTrace(); } List<Runnable> runnables = scheduledThreadPoolExecutor.shutdownNow(); System.out.println("还未执行的任务数:" + runnables.size()); } }
ScheduledThreadPoolExecutor 执行的定时任务类型可分为四种 ,上述示例中已全部列出:
定时任务,有返回值;
定时任务,无返回值;
周期性任务,按固定频率重复执行任务,无返回值;
周期性任务,按固定延时重复执行任务,无返回值;
源码分析 ScheduledThreadPoolExecutor 相关的源码结构如下图所示:
ScheduledThreadPoolExecutor 主要通过以下方式对 线程池 进行扩展:
自定义任务类型 ScheduledFutureTask
,统一任务为延迟任务;
自定义延时队列 DelayedWorkQueue
,同 DelayQueue 是无界的延时阻塞队列;
支持可配置的 run-after-shutdown
参数,在线程池关闭后(shutdown)对任务执行进行干预;
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true ; private volatile boolean removeOnCancel = false ; private static final AtomicLong sequencer = new AtomicLong(); }
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public ScheduledThreadPoolExecutor (int corePoolSize) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor (int corePoolSize, ThreadFactory threadFactory) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor (int corePoolSize, RejectedExecutionHandler handler) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor (int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super (corePoolSize, Integer.MAX_VALUE, 0 , NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
ScheduledThreadPoolExecutor 的构造函数只需注意两点即可:
最终调用父类 ThreadPoolExecutor 的构造方法创建线程池;
使用的阻塞队列是内置的 DelayedWorkQueue ,并且最大线程数固定为 Integer.MAX_VALUE
,且空闲线程最大存活时间为 0;
注意: 从 ScheduledThreadPoolExecutor 的构造方法中,我们可以知道执行定时任务的线程池底层使用的也是 ThreadPoolExecutor 的主流程,只是通过自定义阻塞队列 DelayedWorkQueue 来干预线程池的主流程。
提交延时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null ) throw new NullPointerException(); RunnableScheduledFuture<?> t = decorateTask(command, new ScheduledFutureTask<Void>(command, null , triggerTime(delay, unit))); delayedExecute(t); return t; } public <V> ScheduledFuture<V> schedule (Callable<V> callable, long delay, TimeUnit unit) { if (callable == null || unit == null ) throw new NullPointerException(); RunnableScheduledFuture<V> t = decorateTask(callable, new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
提交周期性延时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null ) throw new NullPointerException(); if (period <= 0 ) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>( command, null , triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; } public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { if (command == null || unit == null ) throw new NullPointerException(); if (delay <= 0 ) throw new IllegalArgumentException(); ScheduledFutureTask<Void> sft = new ScheduledFutureTask<Void>(command, null , triggerTime(initialDelay, unit), unit.toNanos(-delay)); RunnableScheduledFuture<Void> t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
提交零延时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 public void execute (Runnable command) { schedule(command, 0 , NANOSECONDS); } public Future<?> submit(Runnable task) { return schedule(task, 0 , NANOSECONDS); } public <T> Future<T> submit (Runnable task, T result) { return schedule(Executors.callable(task, result), 0 , NANOSECONDS); } public <T> Future<T> submit (Callable<T> task) { return schedule(task, 0 , NANOSECONDS); }
ScheduledThreadPoolExecutor 重写了 execute 和 submit 方法,通过这两个方法提交的任务会被当成 0 延时的任务进行提交 。
延时调度 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void delayedExecute (RunnableScheduledFuture<?> task) { if (isShutdown()) reject(task); else { super .getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false ); else ensurePrestart(); } }
延时调度方法 delayedExecute 是 ScheduledThreadPoolExecutor 提交任务最底层的方法,在此之前的流程都是上层延时计算任务的处理。该方法就相当于线程池中的 execute
方法,只是两者的流程不一样,根本原因是 ScheduledThreadPoolExecutor 执行的是延时任务 。
下面对该流程进行说明:
判断线程池是否关闭,关闭则执行拒绝策略以拒绝任务的调度;
将延时计算任务放入队列中。注意,ThreadPollExecutor 是 >=
核心线程数时才添加任务到阻塞队列中。因为这里是定时线程池,任务时间到了才会执行,因此是要从任务队列中获取的。
再次判断线程池状态,如果是 shutdown 状态并且当前任务的特性是遇到线程池关闭则无需执行,那么就取消该任务,必要情况还会从队列中删除。
当前任务可以被执行,那么就需要确保有线程执行任务;
延时计算任务是否可执行 1 2 3 4 5 6 7 8 9 10 11 12 13 boolean canRunInCurrentRunState (boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
判断延时计算任务是否可被执行的情况有两种:
如果线程池处于运行状态,那么任务可被执行;
如果线程池处于关闭状态(shutdown),那么只有该延时计算任务设置了线程池关闭后仍可被执行的情况下才能被执行;
对于第二种情况就是 ScheduledThreadPoolExecutor 的 run-after-shutdown
的参数控制情况,也就是定时线程池提供了两个可配置参数用来控制即使线程池 shutdown 的情况下,是否继续执行延时计算任务。由于延时计算任务有两种,因此提供了两个可配置参数:
continueExistingPeriodicTasksAfterShutdown:
线程池关闭后是否继续执行周期性任务(包括固定频率和固定延时);
executeExistingDelayedTasksAfterShutdown:
线程池关闭后是否继续执行延迟任务;
确保有线程执行任务 1 2 3 4 5 6 7 8 9 10 11 12 13 +--- ThreadPoolExecutor void ensurePrestart () { int wc = workerCountOf(ctl.get()); if (wc < corePoolSize) addWorker(null , true ); else if (wc == 0 ) addWorker(null , false ); }
由于 ScheduledThreadPoolExecutor 是定时线程池,它是先将延时任务放到队列中,然后再创建线程去执行,这个和 ThreadPoolExecutor 的调度流程不同。通过上述方法可以知道,如果设置了核心线程数(>0),那么每次都会创建新的线程直到达到核心线程数;如果没有设置核心线程数(=0),那么线程池中最多只会有一个线程,每次判断线程池空了创建一个就行了。
这也指导我们在使用 ScheduledThreadPoolExecutor 时最好设置合适的核心线程数,虽然构造方法中最大线程数是 Integer.MAX_VALUE
,但事实上最大线程没啥特别的用处,只有当核心线程数为 0 时,才会作为是否能创建线程的条件,而且线程池同时最多有一个非核心线程。如果设置了核心线程数,那么就不会创建非核心线程了。
重新排队周期任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void reExecutePeriodic (RunnableScheduledFuture<?> task) { if (canRunInCurrentRunState(true )) { super .getQueue().add(task); if (!canRunInCurrentRunState(true ) && remove(task)) task.cancel(false ); else ensurePrestart(); } }
上述方法用于 ScheduledThreadPoolExecutor 调度的是一个周期性任务,当任务执行完毕后,任务会重新被加入队列中等待调度。
取消并清除不适应shutdown的任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void onShutdown () { BlockingQueue<Runnable> q = super .getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture<?>) ((RunnableScheduledFuture<?>) e).cancel(false ); q.clear(); } else { for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture<?> t = (RunnableScheduledFuture<?>) e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { if (q.remove(t)) t.cancel(false ); } } } } tryTerminate(); }
在线程池 shutdown 时会取消并清除不适应线程池关闭的所有任务。也就是说,如果 continueExistingPeriodicTasksAfterShutdown 或者 continueExistingPeriodicTasksAfterShutdown 为 true,那么表示线程池关闭也会执行任务;为 false ,那么在关闭的时候就会移除并取消任务。
内置任务 - ScheduledFutureTask ScheduledThreadPoolExecutor 自定义了 ScheduledFutureTask 类型的任务,用于描述延时计算任务。该类的 UML 类图如下:
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 +--- ScheduledFutureTask private final long sequenceNumber;private long time;private final long period;RunnableScheduledFuture<V> outerTask = this ; int heapIndex;
sequenceNumber: 优先级排序使用,当执行时间 time 相同的情况下,越早加入到 ScheduledThreadPoolExecutor 中的任务优先级越高;
time: 延时任务执行的时间;
period: 任务的周期性特征,具体见注释;
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 ScheduledFutureTask(Runnable r, V result, long ns) { super (r, result); this .time = ns; this .period = 0 ; this .sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super (r, result); this .time = ns; this .period = period; this .sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable<V> callable, long ns) { super (callable); this .time = ns; this .period = 0 ; this .sequenceNumber = sequencer.getAndIncrement(); }
从构造方法可以看出,ScheduledFutureTask 主要是基于 FutureTask 异步计算任务进行扩展的;
元素优先级 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 +--- ScheduledFutureTask public int compareTo (Delayed other) { if (other == this ) return 0 ; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other; long diff = time - x.time; if (diff < 0 ) return -1 ; else if (diff > 0 ) return 1 ; else if (sequenceNumber < x.sequenceNumber) return -1 ; else return 1 ; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0 ) ? -1 : (diff > 0 ) ? 1 : 0 ; }
上述 ScheduledFutureTask 的比较方法将用于加入内置队列时判断元素的优先级,也就是最先执行的任务排在前面。
任务体 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 +--- ScheduledFutureTask public void run () { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false ); else if (!periodic) ScheduledFutureTask.super .run(); else if (ScheduledFutureTask.super .runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } }
ScheduledFutureTask 可能是延时任务,也可能是周期性延时任务,因此需要根据情况执行不同的分支逻辑。
判断即使线程池关闭(shutdown)后是否可以继续执行 ScheduledFutureTask ;
如果是延时任务,直接调用父类的 run 方法执行任务即可;
如果是周期性延时任务,直接调用父类的 runAndReset 方法执行并重置任务,然后重新计算任务执行时间并加入到阻塞队列中。
周期性时间 1 2 3 4 5 6 7 8 9 10 11 12 13 14 +--- ScheduledFutureTask private void setNextRunTime () { long p = period; if (p > 0 ) time += p; else time = triggerTime(-p); }
上述方法只针对周期性任务,一次性延时任务不会用到该方法。
注意: 固定频率的周期任务 period
的值在提交任务时被设置为正数,固定延时时间的周期任务 period
的值在提交任务时被设置为负数;
取消任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public boolean cancel (boolean mayInterruptIfRunning) { boolean cancelled = super .cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0 ) remove(this ); return cancelled; }
ScheduledFutureTask 重写了父类 FutureTask 中的取消任务方法,在此基础上加入了从内置队列中移除元素的逻辑,因为任务取消了,那么在队列中的该任务也就无需执行。
内置延时队列 - DelayedWorkQueue 我们知道,阻塞队列对于线程池来说是非常重要的,不仅可以缓冲任务,还可以根据阻塞队列的特性调整线程池的调度流程 。ScheduledThreadPoolExecutor 是一个延时线程池,这个延时的特性其实就体现在它所使用的队列上 。这就要求这个队列不能像普通的阻塞队列那样,队列中即使有任务,如果任务没有到期是不能拿出来的。从这个特点也解释了前面介绍的延时线程池的调度流程,先把任务放到队列中然后再确保线程执行该任务,目的就是为了延时执行 。
具体对应到线程池中的执行逻辑如下,从队列中获取任务时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 +--- ThreadPoolExecutor#getTask private Runnable getTask () { boolean timedOut = false ; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null ; } int wc = workerCountOf(c); boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null ; continue ; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null ) return r; timedOut = true ; } catch (InterruptedException retry) { timedOut = false ; } } }
ScheduledThreadPoolExecutor 为了控制任务在某个时刻执行 ,自己实现了一个和 DelayQueue 功能一致的内部延时队列 DelayedWorkQueue 。区别点是 DelayQueue 内部使用的是优先级队列 PriorityQueue ,延时逻辑由自身实现。而 DelayedWorkQueue 将优先级和延时特性重新实现了一遍,即将 PriorityQueue 和 DelayQueue 逻辑在一个类中实现,本质上同 DelayQueue 的特性和逻辑,源码部分就不再详细说明,参考 DelayQueue 即可。
小结 ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 扩展的、可执行定时任务 的线程池。前者重写了 ThreadPoolExecutor.execute 的调度流程,遵循任务先加入队列原则,根本原因是 ScheduledThreadPoolExecutor 是执行定时任务的线程池。
ScheduledThreadPoolExecutor 支持定时任务和周期性任务。对于定时任务,是通过内部的延时队列 DelayedWorkQueue 来实现的;对于周期性任务,是通过在任务执行后再次将任务加入到延时队列中来实现的。可见,延时队列在定时任务执行过程中的重要性。
ScheduledThreadPoolExecutor 通过内部的 ScheduledFutureTask 对任务进行了统一封装,使任务具备异步计算、延时 的特性,这不仅是 ScheduledThreadPoolExecutor 的要求,也是内部延时队列 DelayedWorkQueue 的要求。