时间轮算法 - HashedWheelTimer

概述

时间轮是一种高效利用线程资源来进行批量化调度的一种调度模型。将大批量的调度任务全部都绑定到同一个调度器上,使用这一调度器来进行所有任务的管理、触发以及执行。本篇文章将对 HashedWheelTimer 进行分析。

示例

编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;

import java.util.concurrent.TimeUnit;

public class HashedWheelTimerClient {
public static void main(String[] args) {

// 1 创建一个 HashedWheelTimer,内部参数全部使用默认值
Timer timer = new HashedWheelTimer();

// 2 编写 TimeTask 任务
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println(Thread.currentThread().getName() + "is working !");
}
};

// 3 提交任务
// 创建 HashedWheelTimeout 对象,并将该对象放入任务队列中,等待被加入到 Hash 轮中被调用。
Timeout timeout = timer.newTimeout(timerTask, 1, TimeUnit.SECONDS);

}
}

调度结果

1
pool-1-thread-1is working !

算法简介

时间轮调度模型如下图所示:

HashedWheelTimer 时间轮算法可以通过上图来描述。假设时间轮大小为 8 即 8 个格子,1s 转一格,每格都对应一个链表,链表每个节点都保存着待执行的任务。某一时刻,时间轮走到编号为 2 的格子,此时添加了一个 3s 后执行的任务,对应 3 个格子,则 2 + 3 = 5,在编号为 5 的格子对应链表中添加一个任务节点即可,轮次 round 为 0 ;如果添加一个 10s 后执行的任务,同理得 (2 + 10) % 8 = 4,在编号为 4 的格子对应的链表中添加一个任务节点,并标识轮次 round 为 1,当时间轮第二次经过编号为 4 的格子时就会执行该任务。注意,时间轮只会执行 round = 0 的任务,并会把该格子上的其他任务的 round 减 1 。

时间轮算法的原理还是非常容易理解的,下面我们从源码层面进行分析。

源码分析

HashedWheelTimer 相关的核心类图如下:

Timer

HashedWheelTimer 是接口 io.netty.util.Timer 的实现,Timer 是任务调度器,负责对延时任务进行管理、触发和调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public interface Timer {

/**
* 调度指定的 TimerTask ,在指定的延迟后执行
*
* @return 与指定任务相关联的句柄
* @throws IllegalStateException if this timer has been {@linkplain #stop() stopped} already
* @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
* can cause instability in the system.
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);

/**
* 停止所有的还没被执行的定时任务
*
* @return 与被此方法取消的任务相关联的句柄
*/
Set<Timeout> stop();
}

TimerTask

延时任务,由业务方自行实现。Timer 会在触发时间对延时任务进行调度。

1
2
3
4
5
6
7
8
9
10
11
12
/**
* Timer 调度的任务,由业务方实现
*/
public interface TimerTask {
/**
* Executed after the delay specified with
* {@link Timer#newTimeout(TimerTask, long, TimeUnit)}.
*
* @param timeout 任务执行的时候会将该任务对应的 Timeout 传进来
*/
void run(Timeout timeout) throws Exception;
}

Timeout

Timeout 是一个非常重要的接口,它的唯一实现类是 HashedWheelTimer 内部类 HashedWheelTimeout ,该内部类聚合了时间轮主要的核心对象,关于该内部类下文会详细分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 与{@link TimerTask}关联的句柄,由{@link Timer}返回。
*/
public interface Timeout {

/**
* 返回创建此句柄的{@link Timer}。
*/
Timer timer();

/**
* 返回与此句柄关联的{@link TimerTask}。
*/
TimerTask task();

/**
* 当且仅当与此句柄关联的{@link TimerTask}已过期时返回 true
*/
boolean isExpired();

/**
* 当且仅当与此句柄关联的{@link TimerTask}已被取消时返回 true
*/
boolean isCancelled();

/**
* 尝试取消与此句柄关联的{@link TimerTask}。如果任务已经被执行或取消,它将返回而没有副作用。
*
* @return 如果取消成功,则为 true,否则为 false
*/
boolean cancel();
}

HashedWheelTimer

HashedWheelTimer 是对 Timer 的实现,也就是我们说的时间轮。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class HashedWheelTimer implements Timer {

// HashedWheelTimer 实例统计原子变量
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger();

// 过多 HashedWheelTimer 阈值开关
private static final AtomicBoolean WARNED_TOO_MANY_INSTANCES = new AtomicBoolean();

// HashedWheelTimer 数量的阈值
private static final int INSTANCE_COUNT_LIMIT = 64;

// 最小延时时间,默认是 1 毫秒
private static final long MILLISECOND_NANOS = TimeUnit.MILLISECONDS.toNanos(1);

private static final ResourceLeakDetector<HashedWheelTimer> leakDetector = ResourceLeakDetectorFactory.instance()
.newResourceLeakDetector(HashedWheelTimer.class, 1);

// 时间轮状态,可以控制工作线程执行任务的状态。
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState");
private final ResourceLeakTracker<HashedWheelTimer> leak;

// 工作任务
private final Worker worker = new Worker();

// 工作线程
private final Thread workerThread;

// 状态 0 - init, 1 - started, 2 - shut down
public static final int WORKER_STATE_INIT = 0;
public static final int WORKER_STATE_STARTED = 1;
public static final int WORKER_STATE_SHUTDOWN = 2;
@SuppressWarnings({"unused", "FieldMayBeFinal"})
private volatile int workerState; // 0 - init, 1 - started, 2 - shut down

// 走一个 bucket 需要花费的纳秒时长
private final long tickDuration;

// bucket 数组,用于存储任务,即 HashedWheelTimeout 实例们
private final HashedWheelBucket[] wheel;

// 掩码,用于 与运算 ,计算属于 wheel 哪个下标
private final int mask;

// 调用 newTimeout 方法线程等待工作线程 workerThread 开启执行任务
private final CountDownLatch startTimeInitialized = new CountDownLatch(1);

// HashedWheelTimeout 任务队列。适用于这里的多生产线程,单消费线程的场景
// 提交的任务会先进入到该队列中,每次 tick 才会将队列中的任务(一次最多 10 万个)加入到 bucket 中的链表里
private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();

// HashedWheelTimeout 任务取消队列
// 取消的任务会加入到该队列中,此次 tick 会将该队列中的任务从 bucket 中移除
private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();

// 时间轮中处于等待执行的任务数
private final AtomicLong pendingTimeouts = new AtomicLong(0);

// 允许最大的等待任务数
private final long maxPendingTimeouts;

// 工作线程启动时间,作为时间轮的基准时间
private volatile long startTime;
}

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
+--- HashedWheelTimer

/*----------------- 系列构造方法 -------------------*/

public HashedWheelTimer() {
this(Executors.defaultThreadFactory());
}

public HashedWheelTimer(long tickDuration, TimeUnit unit) {
this(Executors.defaultThreadFactory(), tickDuration, unit);
}

public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

public HashedWheelTimer(ThreadFactory threadFactory) {
this(threadFactory, 100, TimeUnit.MILLISECONDS);
}


public HashedWheelTimer(
ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
this(threadFactory, tickDuration, unit, 512);
}


public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}


public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}

/**
* 创建 HashedWheelTimer 对象
*
* @param threadFactory 线程工厂,用于创建执行 TimerTask 任务的工作线程
* @param tickDuration tick 之间的持续时间,即一次 tick 的时间长度。默认是 100
* @param unit tickDuration 的时间单位。默认是 毫秒
* @param ticksPerWheel 定义一圈有多少个 bucket 。 默认是 512
* @param leakDetection 用于追踪内存泄漏
* @param maxPendingTimeouts 最大允许等待的任务数,也就是 Timeout 实例数(调用 newTimeout 方法产生),可以根据该参数控制不允许太多的任务等待。
* 如果未执行任务数达到阈值,那么再次提交任务会抛出 RejectedExecutionException 异常。如果该值为 0 或 负数,则不限制。
* 默认不限制。
*/
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {

// 1 参数校验
ObjectUtil.checkNotNull(threadFactory, "threadFactory");
ObjectUtil.checkNotNull(unit, "unit");
ObjectUtil.checkPositive(tickDuration, "tickDuration");
ObjectUtil.checkPositive(ticksPerWheel, "ticksPerWheel");

// 2 创建时间轮 bucket 结构,这里做向上取整,保证 bucket 数组长度是 2 的 n 次方
// wheel 就是一个个 bucket。时间轮会以循环的方式走这个 wheel 数组
wheel = createWheel(ticksPerWheel);

// 3 掩码,bucket - 1,用来做取模,计算任务应该放到哪个 bucket 中
// HashMap 在进行 hash 之后,进行index的hash寻址寻址的算法也是和这个一样的
mask = wheel.length - 1;

// 4 将延迟时间统一转为纳秒
long duration = unit.toNanos(tickDuration);

// 5 防止延迟时间溢出
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}

// 6 延迟时间不能小于 1 毫秒
if (duration < MILLISECOND_NANOS) {
logger.warn("Configured tickDuration {} smaller then {}, using 1ms.",
tickDuration, MILLISECOND_NANOS);
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}

// 7 根据线程工厂创建线程。注意,这里并没有立即启动线程,启动线程是在第一次提交延迟任务的时候。
workerThread = threadFactory.newThread(worker);

// 追踪内存泄露的,略
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

// 8 最大允许等待的 Timeout 实例数
this.maxPendingTimeouts = maxPendingTimeouts;

// 9 如果超过 64 个 HashedWheelTimer 实例,它会打印错误日志提醒你
// 因为时间轮是一个非常耗费资源的结构,所以一个 jvm 中的实例数目不能太高
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
// 打印错误日志
reportTooManyInstances();
}
}

通过以上构造方法可以初始化一个时间轮对象,默认情况下,时间轮大小是 512,也就是一圈有 512 个 bucket,走一个 bucket 需要时间为 100ms 。

初始化 bucket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
+--- HashedWheelTimer

/**
* 初始化时间轮 bucket 数组,用来存储任务
*
* @param ticksPerWheel 一圈有多少个 bucket ,默认是 512
* @return
*/
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException(
"ticksPerWheel must be greater than 0: " + ticksPerWheel);
}

// 时间轮 tick 不能多大
if (ticksPerWheel > 1073741824) {
throw new IllegalArgumentException(
"ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
}

// 标准化时间轮大小
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);

// 创建 HashedWheelBucket 数组
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for (int i = 0; i < wheel.length; i++) {
wheel[i] = new HashedWheelBucket();
}
return wheel;
}

/**
* 标准化时间轮大小,原则:向上取整,达到 2 的 n 次方
*
* @param ticksPerWheel
* @return
*/
private static int normalizeTicksPerWheel(int ticksPerWheel) {
int normalizedTicksPerWheel = 1;

// 取第一个大于 ticksPerWheel 的 2 的 n 次方的值
while (normalizedTicksPerWheel < ticksPerWheel) {
// 左移一位,即 扩大 2 倍
normalizedTicksPerWheel <<= 1;
}
return normalizedTicksPerWheel;
}

初始化 bucket 的过程就是初始化时间轮的格子,每个格子用于管理落在当前位置的延时任务们,这些任务由链表组织起来,且任务具有轮次的语义,只有任务的轮次为 0 时才能被时间轮执行。注意,时间轮中的每个 bucket 和延时时间是通过 tick 来间接关联的,通过延时任务的时间可以计算出它对应 N 个 tick ,而 tick 数对 bucket 数组长度取模运算就能确定具体的 bucket。延时任务完成 Bucket 分配后,时间轮不断进行 tick 的过程就可以通过计算找到 tick 对应的 Bucket ,进而处理延时任务。

HashedWheelTimeout

延时任务的包装类,该类聚合了时间轮所有的核心对象及属性,也就是说通过该对象可以拿到所有核心的对象和属性,并且该类包含了延时任务执行的方法expire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private static final class HashedWheelTimeout implements Timeout {

// 初始化
private static final int ST_INIT = 0;
// 取消
private static final int ST_CANCELLED = 1;
// 到期
private static final int ST_EXPIRED = 2;

// 用CAS方式更新任务状态
private static final AtomicIntegerFieldUpdater<HashedWheelTimeout> STATE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimeout.class, "state");

/**
* Timer
*/
private final HashedWheelTimer timer;
/**
* TimerTask
*/
private final TimerTask task;
/**
* 任务触发时间
*/
private final long deadline;

@SuppressWarnings({"unused", "FieldMayBeFinal", "RedundantFieldInitialization"})
private volatile int state = ST_INIT;

// 离任务执行的轮数,0 表示当前轮次执行。当任务从队列加入 bucket 时会计算这个值。
// 对与轮次非 0 的任务,那么时间轮执行到对应的 bucket 时会将该任务的该属性值 -1
long remainingRounds;

// 这将用于通过双向链表在 hashhedwheeltimerbucket 中链接超时 的前后指针
HashedWheelTimeout next;
HashedWheelTimeout prev;

// 当前 HashedWheelTimeout 所在的 bucket
HashedWheelBucket bucket;

/**
* HashedWheelTimeout 用于封装 HashedWheelTimer、TimerTask 以及 deadLine 触发时间
*
* @param timer
* @param task
* @param deadline
*/
HashedWheelTimeout(HashedWheelTimer timer, TimerTask task, long deadline) {
this.timer = timer;
this.task = task;
this.deadline = deadline;
}
}

执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
+---HashedWheelTimeout
/**
* 到期并执行任务
*/
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}

try {
// 执行 TimerTask.run 方法
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
}

执行任务本质上是调用 HashedWheelTimeout 对象中封装的 TimerTask 对象的 run 方法。需要注意的是,即使任务执行过程抛出异常工作线程也不会退出;为了提高时间轮的精准度,任务执行最好使用异步方式。

取消任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+---HashedWheelTimeout
/**
* 取消任务
*
* @return
*/
@Override
public boolean cancel() {
// 这里只是修改状态为取消,实际会在下次tick的时候移除
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}

// 加入到时间轮的全局待取消队列,并在每次tick的时候,从相应 bucket 中移除。
timer.cancelledTimeouts.add(this);
return true;
}

取消任务只是将待执行的 HashedWheelTimeout 对象加入到全局取消队列中,在后续的 tick 过程才会从对应的 bucket 中删除。

移除任务

1
2
3
4
5
6
7
8
9
10
11
12
+--- HashedWheelTimeout
/**
* 将当前 Timeout 从对应的 bucket 链表中移除
*/
void remove() {
HashedWheelBucket bucket = this.bucket;
if (bucket != null) {
bucket.remove(this);
} else {
timer.pendingTimeouts.decrementAndGet();
}
}

将 HashedWheelTimeout 对象从对应的 bucket 中删除。

HashedWheelBucket

用来存放包装任务的 HashedWheelTimeout ,以链表结构的形式进行管理,链表中的每一个节点都是 HashedWheelTimeout。

链表结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
private static final class HashedWheelBucket {

// 头指针
private HashedWheelTimeout head;
// 尾指针
private HashedWheelTimeout tail;

/**
* Add {@link HashedWheelTimeout} to this bucket.
*
* 添加 HashedWheelTimeout 到 当前 bucket 中,即加入到链中
*/
public void addTimeout(HashedWheelTimeout timeout) {
assert timeout.bucket == null;

// 设置 timeout 的桶,即聚合桶对象
timeout.bucket = this;

// 维护桶中的 HashedWheelTimeout
if (head == null) {
head = tail = timeout;
} else {
tail.next = timeout;
timeout.prev = tail;
tail = timeout;
}
}


/**
* 将 timeout 从链表中移除
*
* @param timeout
* @return
*/
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
HashedWheelTimeout next = timeout.next;
// remove timeout that was either processed or cancelled by updating the linked-list
if (timeout.prev != null) {
timeout.prev.next = next;
}
if (timeout.next != null) {
timeout.next.prev = timeout.prev;
}

if (timeout == head) {
// if timeout is also the tail we need to adjust the entry too
if (timeout == tail) {
tail = null;
head = null;
} else {
head = next;
}
} else if (timeout == tail) {
// if the timeout is the tail modify the tail to be the prev node.
tail = timeout.prev;
}
// null out prev, next and bucket to allow for GC.
timeout.prev = null;
timeout.next = null;
timeout.bucket = null;

// timeout 对应的 timer 的等待任务数减 1
timeout.timer.pendingTimeouts.decrementAndGet();
return next;
}

// 省略其他代码
}

执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
+--- HashedWheelBucket
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
* <p>
* 执行 bucket 中的到期任务。注意,只执行 bucket 中轮次为 0 且到期的任务
*/
public void expireTimeouts(long deadline) {

// 获取时间任务链表的头
HashedWheelTimeout timeout = head;

// 处理链表上的所有 timeout 实例
while (timeout != null) {

HashedWheelTimeout next = timeout.next;

// 尝试执行任务
if (timeout.remainingRounds <= 0) {

// 调整当前 bucket 的任务链表
next = remove(timeout);

// 到达触发时间
if (timeout.deadline <= deadline) {

// 执行具体的任务,即执行 timeout 中的 TimerTask.run 方法
timeout.expire();
} else {
// 不可能进入到这个分支
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}


// 任务被取消了
} else if (timeout.isCancelled()) {
next = remove(timeout);

// 轮次减 1
} else {
timeout.remainingRounds--;
}

// 处理下个任务
timeout = next;
}
}

工作线程 tick 时会找到当前 tick 对应的 bucket ,然后执行上述方法进而调度延时任务。

Worker

Worker 是工作线程的任务体,里面封装了时间轮任务触发和执行的逻辑。一旦工作线程启动后,就会不停地 “滴答” bucket ,直到时间轮关闭。

任务体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
    private final class Worker implements Runnable {
// 记录没有处理的时间任务
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

// 记录走了几个 bucket ,不拥堵的情况下每隔 tickDuration 时间走一个 bucket
// 默认值为 0
private long tick;

@Override
public void run() {

// 初始化启动时间。
// 注意,在 HashedWheelTimer 中用的都是相对时间,因此需要以启动时间为基准。这里使用 volatile 修饰
startTime = System.nanoTime();
if (startTime == 0) {
// 因为 startTime = 0 作为工作线程未开始执行任务的标志。这里开始执行了,需要设置非 0
startTime = 1;
}

// 第一个提交任务的线程在 start() 处等待,需要唤醒它
startTimeInitialized.countDown();

/**
* do-while 执行任务逻辑:
*
* 工作线程是逐个 bucket 顺序处理的,所以即使有些任务执行时间超过了一次 tick 时间,也没关系,这些任务并不会被漏掉。
* 但是可能被延迟执行,毕竟工作线程是单线程。
*/
do {

// 等待下次 tick 到来,理论上每次等待 tickDuration 就会返回,然后继续往下走
final long deadline = waitForNextTick();

if (deadline > 0) {

// 当前 tick 下 bucket 数组对应 index,即哪个 bucket
int idx = (int) (tick & mask);

// 处理已经取消的任务
processCancelledTasks();

// 获取当前 tick 对应的桶
HashedWheelBucket bucket = wheel[idx];

// 将 timeouts 队列中的 HashedWheelTimeout 转移到相应的桶中
transferTimeoutsToBuckets();

// 执行进入到 bucket 中的任务
bucket.expireTimeouts(deadline);

// 记录走了多少个 tick
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);


/* 执行到这里,说明当前 Timer 要关闭了,做一些清理工作 */

// 将所有 bucket 中没有执行的任务,添加到 unprocessedTimeouts 这个 HashSet 中,用于 stop() 方法返回。
for (HashedWheelBucket bucket : wheel) {
// 将当前 bucket 上链表节点任务都加入到 unprocessedTimeouts
bucket.clearTimeouts(unprocessedTimeouts);
}

// 将任务队列中的任务也添加到 unprocessedTimeouts 中
for (; ; ) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}

// 处理已经取消的任务
processCancelledTasks();
}

// 省略其他方法
}

工作线程启动的第一步是初始化全局的 startTime,它将作为时间轮的基准时间,用来计算延时任务的触发时间。并调用 countDown 方法来通知阻塞在 start 方法上的线程。接着进入主循环中,循环中的行为是每隔一段时间(tickDuration)走一个 bucket ,下面我们拆解执行部分。

waitForNextTick

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
+--- Worker
private long waitForNextTick() {

// 1 计算当前 tick 下的 deadline,这值是确定的。即一次 tick 期限是一个固定值
// 注意,这里就体现了时间轮的核心,理论上每隔 tickDuration 就会 "滴答" 一次
long deadline = tickDuration * (tick + 1);

// 2 等待当前 tick 时间到达
for (; ; ) {

// 2.1 基于 startTime 计算距离当前时间的时间戳,该值的理论值认为等于 dealine ,但由于任务执行时间没法控制,实际值一般大于 deadline
// 注意,startTime 值是固定的,在工作线程启动就定了
final long currentTime = System.nanoTime() - startTime;

// 2.2 判断是否可以进行 tick
// 标准是:tick 触发的时间值 - currentTime <= 0,没有到触发时间则休眠 sleepTimeMs 毫秒
// 这里加 999999 是补偿精度,不足 1ms 的补足 1ms
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

// 2.3 因为每次执行任务消耗的时间是不受控制的,因此计算出来的 sleepTimeMs 可能为负数
// 当为负数时,说明前面的任务执行时间过长,导致本该 tick 的时候错过了。这个时候不需要休眠等待,需要立刻处理
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {

// 返回值是基于 startTime 计算的距离当前时间的时间戳
return currentTime;
}
}

// windows 平台特别处理。先除以10再乘以10,是因为windows平台下最小调度单位是10ms,如果不处理成10ms的倍数,可能导致sleep更不准了
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
if (sleepTimeMs == 0) {
sleepTimeMs = 1;
}
}

try {

// 2.4 没有到 tick 时间,则休眠
Thread.sleep(sleepTimeMs);

} catch (InterruptedException ignored) {
// 如果工作线程已经关闭,那么返回 Long.MIN_VALUE
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}

waitForNextTick 方法的逻辑已经详细注释,该方法就是用来控制每隔一定的时间 “滴答” 一次即跳一个 bucket,此外还处理了因上一个 tick 处理任务时间过长问题,采用的是立即触发执行的方式。不难看出,当遇到较长时间执行的任务时,会打乱原本正常 tick 的节奏,导致其他任务延期执行。 tickDuration 控制着时间的精准度,值越小精准度越高,工作线程则越繁忙。

processCancelledTasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
+--- Worker
/**
* 处理已经取消的任务。将已经取消的任务从对应的 bucket 中移除
*/
private void processCancelledTasks() {
// 遍历任务取消队列
for (; ; ) {
HashedWheelTimeout timeout = cancelledTimeouts.poll();
if (timeout == null) {
// all processed
break;
}
try {
// 将 timeout 从对应的 bucket 中移除
// 通过 timeout 持有的 bukcet 进行的操作,即从bucket 链表中删除该 timeout
timeout.remove();

} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown while process a cancellation task", t);
}
}
}
}

该方法是为了处理那些被取消的任务,将被取消的任务从队列和 bucket 中分别移除。

transferTimeoutsToBuckets

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
+--- Worker
/**
* 将 HashedWheelTimeout 队列中的任务加入到相应的 bucket 中
*/
private void transferTimeoutsToBuckets() {

// 限制每 tick 最大转移 10 万个 HashedWheelTimeout 到 bucket,以免阻塞工作线程
// todo 如果有 100万 个,并且 tickDuration 时间为几分钟级别,那这种情况下就会有一批任务延迟。从侧面说明一个时间轮不能一下子添加特别多的任务
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
// 队列为空
if (timeout == null) {
// all processed
break;
}

// 被取消了
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}


/*--- 将任务放到相应的 bucket 中 ----*/

// 计算任务触发时间需要经过多少个 tick
long calculated = timeout.deadline / tickDuration;

// 计算任务所属的轮次
timeout.remainingRounds = (calculated - tick) / wheel.length;

// 如果任务在 timeouts 队列里面放久了, 以至于已经过了执行时间(calculated < tick), 这个时候就使用当前 tick 对应的 bucket,从而让那些本应该在过去执行的任务在当前 tick 快速执行掉。
// 此方法调用完后就会立即执行当前 tick 对应的 bucket 中的任务
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.

// 计算 ticks 对应 bucket
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];

// 单个 bucket 是由 HashedWheelTimeout 实例组成的一个链表,单个线程不存在并发
// 这里将 timeout 加入到 bucket 的链表中
bucket.addTimeout(timeout);
}
}

在每次执行 tick 对应的 bucket 中的延时任务时,会先将全局任务队列中待执行的任务加入到对应的 bucket 中。

expireTimeouts

一次 tick 到来后找到对应的 Bucket,然后就可以处理当前 Bucket 中的延时任务了,具体实现见前文。expireTimeouts 方法中会间接执行 TimeTask.run 方法,如果延时任务执行时间过久则会阻塞工作线程,进一步拖慢超时检测流程。

以上对 HashedWheelTimer 主要源码进行了分析,但没有串起来。下面我们以执行过程的形式进一步说明。

提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
+--- HashedWheelTimer
/**
* 提交任务
*
* @param task 任务
* @param delay 延时时间
* @param unit 延迟时间单位
* @return
*/
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
// 1 校验参数
ObjectUtil.checkNotNull(task, "task");
ObjectUtil.checkNotNull(unit, "unit");

// 2 校验等待任务数是否达到阈值
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}

// 3 如果工作线程没有启动,则启动工作线程。一般由第一个提交任务的线程负责工作线程的启动
start();


/* 将任务添加到队列中,该队列将在下一个 tick 时进行处理,在处理过程中,所有排队的 HashedWheelTimeout 将被添加到正确的 HashedWheelBucket 中 */

// 4 deadline 是一个相对时间,相对于工作线程启动时间。
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

// Guard against overflow.
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}

// 5 创建 HashedWheelTimeout 对象,进一步封装任务对象
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);

// 6 加入到 timeouts 队列中,等待被加入到 Bucket 中
// 注意,还没有加入到时间轮中
timeouts.add(timeout);

return timeout;
}

时间轮在初始化后就可以接收业务方提交的延时任务请求了,任务的处理都是交给工作线程这个后台线程。提交任务的流程主要包含 3 个关键步骤:

1 尝试启动工作线程 workerThread
2 计算延时任务的触发时间,创建 HashedWheelTimeout 对象进一步封装任务对象
3 将创建的 HashedWheelTimeout 对象加入到任务队列

值得一提的是启动工作线程的逻辑,源码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
+--- HashedWheelTimer
/**
* 启动工作线程
*
* @throws IllegalStateException if this timer has been
* {@linkplain #stop() stopped} already
*/
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
// 如果是初始化状态
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
// 启动工作线程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}

// 阻塞等待,直到 startTime 被工作线程初始化
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}

可以看到上述方法是 public 修饰的,也就是说用户可以显示的调用,而无需等待第一次提交任务时再启动。但一般没必要显示调用,没有任务提交没必要启动。

执行任务

前文也说了,时间轮中的任务都是由工作线程触发执行的。具体是一次 tick 到来后找到对应的 Bucket,然后就可以处理当前 Bucket 中的延时任务了。源码见前文。

取消任务

1
2
3
4
5
6
7
8
9
10
11
12
+--- HashedWheelTimeout

public boolean cancel() {
// only update the state it will be removed from HashedWheelBucket on next tick.
if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
return false;
}

// 加入到取消任务队列中
timer.cancelledTimeouts.add(this);
return true;
}

未到期但被取消的任务会放到 cancelledTimeouts 队列中,工作线程周期性调用 processCancelledTasks() 会从 bucket 中删除对应的 HashedWheelTimeout。

终止时间轮

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
+--- HashedWheelTimer
@Override
public Set<Timeout> stop() {
// 工作线程不能停止时间轮
if (Thread.currentThread() == workerThread) {
throw new IllegalStateException(
HashedWheelTimer.class.getSimpleName() +
".stop() cannot be called from " +
TimerTask.class.getSimpleName());
}

// 尝试 CAS 替换当前状态为 “停止:
if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
// workerState can be 0 or 2 at this moment - let it always be 2.
if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

return Collections.emptySet();
}

try {
// 中断 worker线程,尝试把正在进行任务的线程中断掉,如果某些任务正在执行则会抛出interrupt异常,并且任务会尝试立即中断
boolean interrupted = false;
while (workerThread.isAlive()) {
workerThread.interrupt();
try {
workerThread.join(100);
} catch (InterruptedException ignored) {
interrupted = true;
}
}

// 当前前程会等待stop的结果
if (interrupted) {
Thread.currentThread().interrupt();
}
} finally {
INSTANCE_COUNTER.decrementAndGet();
if (leak != null) {
boolean closed = leak.close(this);
assert closed;
}
}

//返回未处理的任务
return worker.unprocessedTimeouts();
}

流程图

HashedWheelTimer 是基于时间轮算法,提交的任务会被封装成 HashedWheelTimeout 对象并存放到全局任务队列中。时间轮的格子是用 bucket 数组表示,bucket 内部维护一个 HashedWheelTimeout 类型的双向链表,每一个节点都是一个 HashedWheelTimeout 对象。其内部使用一个工作线程自旋地进行 tick ,tick 到来后会先将全局任务队列中的任务添加到对应的 bucket 中,接着轮训当前 tick 对应 bucket 中的任务链表,执行轮次为 0 的任务,轮次非 0 的任务将其轮次减 1 。

特点

优点

1 本地机器直接执行,效率非常高。
2 无需扫描所有任务。通过将环切成 N 份,将查询到期延时任务的耗时降到 1/N,N 视任务量的大小可以灵活设置(1024,2048 等)

缺点

1 可靠性:
- 机器重启,数据即丢失,可以使用 MySQL 等持久化存储,机器重启时从数据库 load 进内存。
- 机器宕机,数据丢失,需要使用方自行处理,如由其它机器接管宕机机器的任务
2 时间轮调度器的时间精度可能不是很高,对于精度要求特别高的调度任务可能不太合适。因为时间轮算法的精度取决于一次 tick 的时间。
3 时间轮是通过单线程实现的,如果在执行任务的过程中出现阻塞,会影响后面任务执行。这个缺点也就是缺点 2 的直接体现。

小结

时间轮算法不难理解,但 HashedWheelTimer 源码中有很多细节需要注意。任务的管理,体现在任务队列和 bucket 数组的使用;任务的触发,体现在工作线程自旋进行 tick ;任务的执行,体现在工作线程轮询 bucket 的任务链表,对 TimerTask.run 的执行;需要注意的是,整个时间轮的调度都是在一个线程中完成的,因此对于那些耗时较大的定时任务会影响其他任务的正常触发和执行,但任务执行异常并不会导致工作线程退出,这是不同于 JDK 中的 Timer 。

参考:
https://www.javadoop.com/post/HashedWheelTimer