并发 - ScheduledThreadPoolExecutor

概述

ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 扩展的执行定时任务线程池,线程池相关的核心逻辑都是在后者中实现的,前者主要用于实现定时任务或周期性任务逻辑,后续的功能交给父类 ThreadPoolExecutor 实现。

为了保证任务是在未来某个时间点执行,ScheduledThreadPoolExecutor 使用的阻塞队列是具有延时特性的阻塞队列,但它并没有直接使用 DelayQueue ,而是自己实现了一个延时阻塞队列,不过跟 DelayQueue 实现原理是一样的。

ScheduledThreadPoolExecutor 相关的 UML 类图如下所示:

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class ScheduledThreadPoolExecutorDemo {

public static void main(String[] args) throws IOException {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(5);
// 设置 线程池关闭后(shutdown)继续执行存在的周期性任务
scheduledThreadPoolExecutor.setContinueExistingPeriodicTasksAfterShutdownPolicy(true);
// scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(true);

// 线程池关闭后(shutdown)是否继续执行存在的周期性任务(包括 fixedRate/fixedDelay)
System.out.println("existingPeriodicTasksAfterShutdownPolicy: " + scheduledThreadPoolExecutor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
// 线程关闭后(shutdown)是否继续执行存在的延迟任务
System.out.println("existingDelayedTasksAfterShutdownPolicy: " + scheduledThreadPoolExecutor.getExecuteExistingDelayedTasksAfterShutdownPolicy());


// 1 固定频率 - 每 2s 执行一次
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.scheduleAtFixedRate(() -> {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")) + " 固定频率执行....");
}, 1, 4, TimeUnit.SECONDS);


// 2 固定延迟时间 - 每 3s 执行一次
ScheduledFuture<?> scheduledFuture1 = scheduledThreadPoolExecutor.scheduleWithFixedDelay(() -> {
System.out.println(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd hh:mm:ss")) + " 固定延迟时间执行....");
}, 1, 2, TimeUnit.SECONDS);


// 3 延迟调度 - 执行无返回值任务
ScheduledFuture<?> scheduledFuture2 = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("schedule...runnable...");
}, 1, TimeUnit.SECONDS);


// 4 延迟调度 - 执行有返回值任务
ScheduledFuture<Object> scheduledFuture3 = scheduledThreadPoolExecutor.schedule(() -> {
System.out.println("schedule...callable...");
return "future";
}, 1, TimeUnit.SECONDS);
try {
Object o = scheduledFuture3.get();
System.out.println(o);
} catch (Exception ex) {
ex.printStackTrace();
}

// 5 关闭线程池
scheduledThreadPoolExecutor.shutdown();

try {
System.out.println("10s 后开始停止线程池,届时所有还未执行的任务会被终止!");
Thread.sleep(10000);
} catch (Exception ex) {
ex.printStackTrace();
}

// 6 停止线程池
List<Runnable> runnables = scheduledThreadPoolExecutor.shutdownNow();
System.out.println("还未执行的任务数:" + runnables.size());
}
}

ScheduledThreadPoolExecutor 执行的定时任务类型可分为四种,上述示例中已全部列出:

  • 定时任务,有返回值;
  • 定时任务,无返回值;
  • 周期性任务,按固定频率重复执行任务,无返回值;
  • 周期性任务,按固定延时重复执行任务,无返回值;

源码分析

ScheduledThreadPoolExecutor 相关的源码结构如下图所示:

ScheduledThreadPoolExecutor 主要通过以下方式对 线程池 进行扩展:

  • 自定义任务类型 ScheduledFutureTask,统一任务为延迟任务;
  • 自定义延时队列 DelayedWorkQueue,同 DelayQueue 是无界的延时阻塞队列;
  • 支持可配置的 run-after-shutdown 参数,在线程池关闭后(shutdown)对任务执行进行干预;

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

/**
* 线程池关闭后(shutdown)是否继续执行存在的周期性任务(包括 fixedRate/fixedDelay),默认为 flase
*/
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
* 线程池关闭后(shutdown)是否继续执行存在的延迟任务,默认为 true
*/
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

/**
* 取消任务时,是否立即从队列中删除
*/
private volatile boolean removeOnCancel = false;

/**
* 用于生成任务添加到 ScheduledThreadPoolExecutor 中的序号
*/
private static final AtomicLong sequencer = new AtomicLong();
}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ScheduledThreadPoolExecutor(int corePoolSize) {
// 最大线程数为 Integer.MAX_VALUE;空闲线程存活时间为 0 ;阻塞队列为 ScheduledThreadPoolExecutor 自定义的延时队列 DelayedWorkQueue ;
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}

public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor 的构造函数只需注意两点即可:

  • 最终调用父类 ThreadPoolExecutor 的构造方法创建线程池;
  • 使用的阻塞队列是内置的 DelayedWorkQueue ,并且最大线程数固定为 Integer.MAX_VALUE,且空闲线程最大存活时间为 0;

注意:从 ScheduledThreadPoolExecutor 的构造方法中,我们可以知道执行定时任务的线程池底层使用的也是 ThreadPoolExecutor 的主流程,只是通过自定义阻塞队列 DelayedWorkQueue 来干预线程池的主流程。

提交延时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
* 执行延迟任务
*
* @param command 没有返回值的任务
* @param delay 延时时间粒度
* @param unit 延时时间粒度的单位
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
// 参数校验
if (command == null || unit == null)
throw new NullPointerException();

// 执行钩子方法
RunnableScheduledFuture<?> t = decorateTask(command,
// 将普通的任务装饰成 ScheduledFutureTask
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));

// 延时执行
delayedExecute(t);

// 返回延时计算任务
return t;
}

/**
* 执行延迟任务
*
* @param callable 有返回值的任务
* @param delay 延时时间粒度
* @param unit 延时时间粒度的单位
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
// 参数校验
if (callable == null || unit == null)
throw new NullPointerException();

// 执行钩子方法
RunnableScheduledFuture<V> t = decorateTask(callable,
// 将普通的任务装饰成 ScheduledFutureTask
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));

// 延时执行
delayedExecute(t);

// 返回延时计算任务
return t;
}

提交周期性延时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87

/**
* 提交一个按固定频率执行的任务
*
* @param command 没有返回值的任务
* @param initialDelay 初始延时时间
* @param period 频率
* @param unit 延时时间粒度的单位
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {

// 参数判断
if (command == null || unit == null)
throw new NullPointerException();
// 必须是固定频率(该方法就是按固定频率执行任务)
if (period <= 0)
throw new IllegalArgumentException();

// 将普通的任务装饰成 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(
command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));

// 钩子方法,可在任务执行之前进行干预
RunnableScheduledFuture<Void> t = decorateTask(command, sft);

// 保存任务 t,用于周期性执行
sft.outerTask = t;

// 延时执行
delayedExecute(t);

// 返回延时计算任务
return t;
}

/**
* 提交一个固定延迟的任务
*
* @param command 没有返回值的任务
* @param initialDelay 初始延时时间
* @param delay 固定延时时间
* @param unit 延时时间粒度的单位
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
// 参数判断
if (command == null || unit == null)
throw new NullPointerException();

// 固定延迟时间校验
if (delay <= 0)
throw new IllegalArgumentException();

// 将普通的任务装饰成 ScheduledFutureTask
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay)); // 注意这里是个负数,用于判断周期性任务属于哪一种

// 钩子方法,可在任务执行之前进行干预
RunnableScheduledFuture<Void> t = decorateTask(command, sft);

// 保存任务 t,用于周期性执行
sft.outerTask = t;

// 延时执行
delayedExecute(t);

// 返回延时计算任务
return t;
}

提交零延时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void execute(Runnable command) {
schedule(command, 0, NANOSECONDS);
}

// Override AbstractExecutorService methods

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}

/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}

ScheduledThreadPoolExecutor 重写了 execute 和 submit 方法,通过这两个方法提交的任务会被当成 0 延时的任务进行提交

延时调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* @param task the task
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池关闭了,执行拒绝策略
if (isShutdown())
reject(task);

// 线程池未关闭
else {
// 先把任务放到队列中(这个队列本质上是一个优先级队列)
super.getQueue().add(task);

// 再次检查线程池状态,必要时取消并删除任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);

// 确保有线程执行任务
else
ensurePrestart();
}
}

延时调度方法 delayedExecute 是 ScheduledThreadPoolExecutor 提交任务最底层的方法,在此之前的流程都是上层延时计算任务的处理。该方法就相当于线程池中的 execute 方法,只是两者的流程不一样,根本原因是 ScheduledThreadPoolExecutor 执行的是延时任务

下面对该流程进行说明:

  1. 判断线程池是否关闭,关闭则执行拒绝策略以拒绝任务的调度;
  2. 将延时计算任务放入队列中。注意,ThreadPollExecutor 是 >= 核心线程数时才添加任务到阻塞队列中。因为这里是定时线程池,任务时间到了才会执行,因此是要从任务队列中获取的。
  3. 再次判断线程池状态,如果是 shutdown 状态并且当前任务的特性是遇到线程池关闭则无需执行,那么就取消该任务,必要情况还会从队列中删除。
  4. 当前任务可以被执行,那么就需要确保有线程执行任务;

延时计算任务是否可执行

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean canRunInCurrentRunState(boolean periodic) {
/*
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
*/
return isRunningOrShutdown(periodic ?
// 线程池关闭后是否继续执行周期性任务
continueExistingPeriodicTasksAfterShutdown :
// 线程池关闭后是否继续执行延迟任务
executeExistingDelayedTasksAfterShutdown);
}

判断延时计算任务是否可被执行的情况有两种:

  • 如果线程池处于运行状态,那么任务可被执行;
  • 如果线程池处于关闭状态(shutdown),那么只有该延时计算任务设置了线程池关闭后仍可被执行的情况下才能被执行;

对于第二种情况就是 ScheduledThreadPoolExecutor 的 run-after-shutdown 的参数控制情况,也就是定时线程池提供了两个可配置参数用来控制即使线程池 shutdown 的情况下,是否继续执行延时计算任务。由于延时计算任务有两种,因此提供了两个可配置参数:

  • continueExistingPeriodicTasksAfterShutdown:线程池关闭后是否继续执行周期性任务(包括固定频率和固定延时);
  • executeExistingDelayedTasksAfterShutdown: 线程池关闭后是否继续执行延迟任务;

确保有线程执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
+--- ThreadPoolExecutor
void ensurePrestart() {
// 线程池线程数
int wc = workerCountOf(ctl.get());

// 当前线程池中线程数未达到核心线程数,则创建核心线程
if (wc < corePoolSize)
addWorker(null, true);

// 这里处理核心线程数为 0 的情况,保证线程池至少有一个线程
else if (wc == 0)
addWorker(null, false);
}

由于 ScheduledThreadPoolExecutor 是定时线程池,它是先将延时任务放到队列中,然后再创建线程去执行,这个和 ThreadPoolExecutor 的调度流程不同。通过上述方法可以知道,如果设置了核心线程数(>0),那么每次都会创建新的线程直到达到核心线程数;如果没有设置核心线程数(=0),那么线程池中最多只会有一个线程,每次判断线程池空了创建一个就行了。

这也指导我们在使用 ScheduledThreadPoolExecutor 时最好设置合适的核心线程数,虽然构造方法中最大线程数是 Integer.MAX_VALUE,但事实上最大线程没啥特别的用处,只有当核心线程数为 0 时,才会作为是否能创建线程的条件,而且线程池同时最多有一个非核心线程。如果设置了核心线程数,那么就不会创建非核心线程了。

重新排队周期任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 重新排队周期任务,除非当前运行状态排除它。
*
* 除了丢弃任务而不是拒绝,其他的与 delayedExecute 方法基本相同
*
* @param task the task
*/
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 判断是否可以该运行任务
if (canRunInCurrentRunState(true)) {

// 将任务再次添加到任务队列中
super.getQueue().add(task);

// 再次检查是否可以运行任务,如果不能运行则移除并取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);

else
// 确保有线程执行任务
ensurePrestart();
}
}

上述方法用于 ScheduledThreadPoolExecutor 调度的是一个周期性任务,当任务执行完毕后,任务会重新被加入队列中等待调度。

取消并清除不适应shutdown的任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// 延迟任务关闭策略
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();

// 周期任务关闭策略
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();

// 不支持线程池关闭执行,则取消并移除相关任务
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
} else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>) e;
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}

// 尝试终止线程池
tryTerminate();
}

在线程池 shutdown 时会取消并清除不适应线程池关闭的所有任务。也就是说,如果 continueExistingPeriodicTasksAfterShutdown 或者 continueExistingPeriodicTasksAfterShutdown 为 true,那么表示线程池关闭也会执行任务;为 false ,那么在关闭的时候就会移除并取消任务。

内置任务 - ScheduledFutureTask

ScheduledThreadPoolExecutor 自定义了 ScheduledFutureTask 类型的任务,用于描述延时计算任务。该类的 UML 类图如下:

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
+--- ScheduledFutureTask
/**
* 当前任务被添加到 ScheduledThreadPoolExecutor 中的序号
*/
private final long sequenceNumber;

/**
* 以 nanoTime 为单位,任务被执行的时间
*/
private long time;

/**
* 重复任务的周期(以纳秒为单位):
*
* 1. 正值表示固定频率执行。
* 2. 负值表示固定延迟执行。
* 3. 值 0 表示非重复任务。
*/
private final long period;

/**
* 由 reExecutePeriodic 重新入队的实际任务,用于周期性任务
*/
RunnableScheduledFuture<V> outerTask = this;

/**
* 索引到延迟队列,以支持更快的取消。
*/
int heapIndex;
  • sequenceNumber: 优先级排序使用,当执行时间 time 相同的情况下,越早加入到 ScheduledThreadPoolExecutor 中的任务优先级越高;
  • time: 延时任务执行的时间;
  • period: 任务的周期性特征,具体见注释;

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ScheduledFutureTask(Runnable r, V result, long ns) {
// FutureTask 的构造方法
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
// FutureTask 的构造方法
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
// FutureTask 的构造方法
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}

从构造方法可以看出,ScheduledFutureTask 主要是基于 FutureTask 异步计算任务进行扩展的;

元素优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
+--- ScheduledFutureTask 
/**
* 元素优先级比较
*
* 1. time 小的排在前面(时间早的任务将被先执行)
* 2. 如果两个任务的 time 相同,那么比较 sequenceNumber ,sequenceNumber 小的排在前面(如果两个任务的执行时间相同,那么先提交的任务将被先执行)
*
* @param other
* @return
*/
public int compareTo(Delayed other) {
// 元素相同
if (other == this) // compare zero if same object
return 0;

// ScheduledFutureTask 类型的,则先比较 time 大小,无法区分大小再比较 sequenceNumber
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>) other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}

// 比较剩余延时时间
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

上述 ScheduledFutureTask 的比较方法将用于加入内置队列时判断元素的优先级,也就是最先执行的任务排在前面。

任务体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
+--- ScheduledFutureTask
/**
* 覆盖 FutureTask 的方法,以便定期重置 requeue。
*/
public void run() {
// 判断是否周期性任务
boolean periodic = isPeriodic();

// 判断是否可以运行任务,不可以运行就取消并移除任务
if (!canRunInCurrentRunState(periodic))
cancel(false);

// 如果是一次性任务,直接调用父类的 run 方法,这个方法实际是 FutureTask 的方法
else if (!periodic)
ScheduledFutureTask.super.run();

// 如果是周期性任务,调用父类的 runAndReset() 方法,执行并重复任务,这个父类是 FutureTask 的方法
// runAndReset 和 run 方法类似,只是其任务运行完毕后不会修改 NEW 状态
// todo 如果任务执行异常,那么就不会继续周期性执行。注意,线程并没有退出。
else if (ScheduledFutureTask.super.runAndReset()) {
// 设置下次任务执行的时间
setNextRunTime();

// 重新排队周期任务
reExecutePeriodic(outerTask);
}
}

ScheduledFutureTask 可能是延时任务,也可能是周期性延时任务,因此需要根据情况执行不同的分支逻辑。

  • 判断即使线程池关闭(shutdown)后是否可以继续执行 ScheduledFutureTask ;
  • 如果是延时任务,直接调用父类的 run 方法执行任务即可;
  • 如果是周期性延时任务,直接调用父类的 runAndReset 方法执行并重置任务,然后重新计算任务执行时间并加入到阻塞队列中。

周期性时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+--- ScheduledFutureTask
/**
* 设置下一次运行周期性任务的时间。
*/
private void setNextRunTime() {
long p = period;
// scheduleAtFixedRate 方式,以上一个任务开始时间为基础,计算下次触发时间
if (p > 0)
time += p;

// scheduleWithFixedDelay 方式,以上个任务结束时间即当前时间为基础,计算下次触发时间
else
time = triggerTime(-p);
}

上述方法只针对周期性任务,一次性延时任务不会用到该方法。

注意:固定频率的周期任务 period 的值在提交任务时被设置为正数,固定延时时间的周期任务 period 的值在提交任务时被设置为负数;

取消任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 取消任务
*
* @param mayInterruptIfRunning
* @return

*/
public boolean cancel(boolean mayInterruptIfRunning) {
// 调用父类 FutureTask 的 cancel 方法
boolean cancelled = super.cancel(mayInterruptIfRunning);

// 取消成功后,判断是否立即移除队列中的任务
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);

return cancelled;
}

ScheduledFutureTask 重写了父类 FutureTask 中的取消任务方法,在此基础上加入了从内置队列中移除元素的逻辑,因为任务取消了,那么在队列中的该任务也就无需执行。

内置延时队列 - DelayedWorkQueue

我们知道,阻塞队列对于线程池来说是非常重要的,不仅可以缓冲任务,还可以根据阻塞队列的特性调整线程池的调度流程。ScheduledThreadPoolExecutor 是一个延时线程池,这个延时的特性其实就体现在它所使用的队列上。这就要求这个队列不能像普通的阻塞队列那样,队列中即使有任务,如果任务没有到期是不能拿出来的。从这个特点也解释了前面介绍的延时线程池的调度流程,先把任务放到队列中然后再确保线程执行该任务,目的就是为了延时执行

具体对应到线程池中的执行逻辑如下,从队列中获取任务时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
+--- ThreadPoolExecutor#getTask
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (; ; ) {

// 获取线程池状态码
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);

// 线程池关闭且队列为空,应该回收线程。这个条件不仅可以回收非核心线程,也可以回收核心线程。todo 核心线程唯一回收条件
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少线程池中的线程数
decrementWorkerCount();
return null;
}

// 线程池中的线程数
int wc = workerCountOf(c);

// 允许核心线程数内的线程回收,或线程池中的线程数超过了核心线程数,那么有可能发生超时关闭
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 如果线程池中的线程数大于最大线程数或获取任务超时(不设置 allowCoreThreadTimeOut,核心线程没有超时概念),并且任务队列为空,则应该回收当前线程。
// wc > maximumPoolSize ,可能是执行 setMaximumPoolSize 方法修改了最大值。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
// 减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 从队列中取出任务
// 注意,真正响应中断是在 poll() 方法或者 take() 方法中
Runnable r = timed ?
// 超时获取任务,因为线程超时要被回收。如果线程在等待的过程发生了中断,会抛出中断异常
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 不需要超时
workQueue.take();
if (r != null)
return r;

// 获取任务超时,进行重试
timedOut = true;

} catch (InterruptedException retry) {
// 发生中断重置超时标记
timedOut = false;
}
}
}

ScheduledThreadPoolExecutor 为了控制任务在某个时刻执行,自己实现了一个和 DelayQueue 功能一致的内部延时队列 DelayedWorkQueue 。区别点是 DelayQueue 内部使用的是优先级队列 PriorityQueue ,延时逻辑由自身实现。而 DelayedWorkQueue 将优先级和延时特性重新实现了一遍,即将 PriorityQueue 和 DelayQueue 逻辑在一个类中实现,本质上同 DelayQueue 的特性和逻辑,源码部分就不再详细说明,参考 DelayQueue 即可。

小结

ScheduledThreadPoolExecutor 是基于 ThreadPoolExecutor 扩展的、可执行定时任务的线程池。前者重写了 ThreadPoolExecutor.execute 的调度流程,遵循任务先加入队列原则,根本原因是 ScheduledThreadPoolExecutor 是执行定时任务的线程池。

ScheduledThreadPoolExecutor 支持定时任务和周期性任务。对于定时任务,是通过内部的延时队列 DelayedWorkQueue 来实现的;对于周期性任务,是通过在任务执行后再次将任务加入到延时队列中来实现的。可见,延时队列在定时任务执行过程中的重要性。

ScheduledThreadPoolExecutor 通过内部的 ScheduledFutureTask 对任务进行了统一封装,使任务具备异步计算、延时的特性,这不仅是 ScheduledThreadPoolExecutor 的要求,也是内部延时队列 DelayedWorkQueue 的要求。