/**
 * Schedules the specified TimerTask for execution after the specified delay.
 *
 * @return a handle associated with the specified task
 * @throws IllegalStateException      if this timer has been {@linkplain #stop() stopped} already
 * @throws RejectedExecutionException if the pending timeouts are too many and creating new timeout
 *                                    can cause instability in the system.
 */
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
public HashedWheelTimer(long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(Executors.defaultThreadFactory(), tickDuration, unit, ticksPerWheel);
}

public HashedWheelTimer(
        ThreadFactory threadFactory, long tickDuration, TimeUnit unit) {
    this(threadFactory, tickDuration, unit, 512);
}

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, leakDetection, -1);
}
/**
 * Initializes the time wheel's bucket array, which stores the tasks.
 *
 * @param ticksPerWheel the number of buckets in one revolution of the wheel; the default is 512
 * @return the bucket array
 */
private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException(
                "ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // The wheel must not have too many ticks
    if (ticksPerWheel > 1073741824) {
        throw new IllegalArgumentException(
                "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
    }
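The 2^30 cap is easier to understand if the wheel length is rounded up to a power of two, so that a slot index can be computed with a bit mask: 2^30 is the largest power of two that fits in a positive `int`. The excerpt above does not show that rounding step, so the following is only a minimal sketch under that assumption; `WheelSizing`, `normalize`, and `slotFor` are hypothetical names, not part of the timer's API.

```java
final class WheelSizing {
    // Largest power of two representable as a positive int (1073741824).
    static final int MAX_TICKS = 1 << 30;

    // Assumption: the wheel length is rounded up to the next power of two.
    static int normalize(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > MAX_TICKS) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
        int normalized = 1;
        while (normalized < ticksPerWheel) {
            normalized <<= 1; // cannot overflow because ticksPerWheel <= 2^30
        }
        return normalized;
    }

    // With a power-of-two length, "tick % length" reduces to a bit mask.
    static int slotFor(long tick, int wheelLength) {
        return (int) (tick & (wheelLength - 1));
    }
}
```

With this scheme a requested size of 500 would yield a 512-slot wheel, and tick 513 would land in slot 1.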
/**
 * Removes the timeout from the linked list.
 *
 * @param timeout the timeout to unlink
 * @return the node that followed the removed timeout
 */
public HashedWheelTimeout remove(HashedWheelTimeout timeout) {
    HashedWheelTimeout next = timeout.next;
    // remove timeout that was either processed or cancelled by updating the linked-list
    if (timeout.prev != null) {
        timeout.prev.next = next;
    }
    if (timeout.next != null) {
        timeout.next.prev = timeout.prev;
    }

    if (timeout == head) {
        // if timeout is also the tail we need to adjust the entry too
        if (timeout == tail) {
            tail = null;
            head = null;
        } else {
            head = next;
        }
    } else if (timeout == tail) {
        // if the timeout is the tail modify the tail to be the prev node.
        tail = timeout.prev;
    }
    // null out prev, next and bucket to allow for GC.
    timeout.prev = null;
    timeout.next = null;
    timeout.bucket = null;
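The unlinking logic above can be tried in isolation. The following is a minimal, self-contained sketch of the same doubly-linked-list removal; `DemoNode` and `DemoBucket` are hypothetical stand-ins for `HashedWheelTimeout` and `HashedWheelBucket`, and the `add` method is an assumption included only so the list can be populated.

```java
final class DemoNode {
    final String name;
    DemoNode prev;
    DemoNode next;

    DemoNode(String name) {
        this.name = name;
    }
}

final class DemoBucket {
    DemoNode head;
    DemoNode tail;

    // Appends a node at the tail (assumed helper, not shown in the excerpt).
    void add(DemoNode node) {
        if (head == null) {
            head = tail = node;
        } else {
            tail.next = node;
            node.prev = tail;
            tail = node;
        }
    }

    // Unlinks the node and returns its former successor so iteration can continue.
    DemoNode remove(DemoNode node) {
        DemoNode next = node.next;
        if (node.prev != null) {
            node.prev.next = next;
        }
        if (node.next != null) {
            node.next.prev = node.prev;
        }
        if (node == head) {
            if (node == tail) {
                // the only element: the list becomes empty
                head = null;
                tail = null;
            } else {
                head = next;
            }
        } else if (node == tail) {
            tail = node.prev;
        }
        // clear the links so the removed node does not keep neighbours reachable
        node.prev = null;
        node.next = null;
        return next;
    }
}
```

Returning the successor lets a caller remove nodes while walking the list without losing its position.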
+--- HashedWheelBucket
/**
 * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
 * <p>
 * Runs the expired tasks in this bucket. Note that only tasks whose
 * remaining rounds are 0 and whose deadline has passed are run.
 */
public void expireTimeouts(long deadline) {

    // Get the head of the task list
    HashedWheelTimeout timeout = head;

    // Process every timeout instance in the list
    while (timeout != null) {

        HashedWheelTimeout next = timeout.next;

        // Try to run the task
        if (timeout.remainingRounds <= 0) {

            // Unlink the timeout from this bucket's task list
            next = remove(timeout);

            // The trigger time has been reached
            if (timeout.deadline <= deadline) {

                // Run the task, i.e. invoke TimerTask.run of the timeout
                timeout.expire();
            } else {
                // This branch should be unreachable
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format(
                        "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
            }

            // The task was cancelled
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
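To make the "remaining rounds" rule concrete, here is a minimal sketch of a bucket scan over a plain `ArrayList`. `RoundEntry` and `RoundBucket` are hypothetical names. The final branch, which decrements the round counter of entries that are neither due nor cancelled, is an assumption, since the excerpt above stops before that point.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

final class RoundEntry {
    final String name;
    final long deadline;
    long remainingRounds;
    boolean cancelled;

    RoundEntry(String name, long remainingRounds, long deadline) {
        this.name = name;
        this.remainingRounds = remainingRounds;
        this.deadline = deadline;
    }
}

final class RoundBucket {
    final List<RoundEntry> entries = new ArrayList<>();

    // Scans the bucket once and returns the names of the entries that fired.
    List<String> expire(long deadline) {
        List<String> fired = new ArrayList<>();
        Iterator<RoundEntry> it = entries.iterator();
        while (it.hasNext()) {
            RoundEntry entry = it.next();
            if (entry.remainingRounds <= 0) {
                it.remove();
                if (entry.deadline <= deadline) {
                    fired.add(entry.name); // stands in for timeout.expire()
                } else {
                    throw new IllegalStateException(String.format(
                            "entry.deadline (%d) > deadline (%d)", entry.deadline, deadline));
                }
            } else if (entry.cancelled) {
                it.remove();
            } else {
                // Assumption: one fewer revolution to wait before the entry is due.
                entry.remainingRounds--;
            }
        }
        return fired;
    }
}
```

An entry scheduled more than one revolution ahead therefore sits in its slot untouched, apart from the counter, until the wheel has come around often enough.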
+--- HashedWheelTimer
/**
 * Starts the worker thread.
 *
 * @throws IllegalStateException if this timer has been
 *                               {@linkplain #stop() stopped} already
 */
public void start() {
    switch (WORKER_STATE_UPDATER.get(this)) {
        // The timer is still in its initial state
        case WORKER_STATE_INIT:
            if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                // Start the worker thread
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // Block until startTime has been initialized by the worker thread
    while (startTime == 0) {
        try {
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}
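As shown above, the method is declared public, so callers can invoke it explicitly instead of waiting for the first task submission to start the worker thread. An explicit call is rarely needed, though: with no tasks submitted there is no reason to start the timer.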
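The lazy-start pattern described here can be sketched with plain JDK classes. In the sketch below, `LazyStartTimer` and its `newTimeout()` method are hypothetical simplifications: the worker only records its start time, and `newTimeout()` merely counts submissions after making sure the worker is running. It is meant to illustrate the CAS-guarded start and the latch, not the real timer.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

final class LazyStartTimer {
    static final int INIT = 0;
    static final int STARTED = 1;
    static final int SHUTDOWN = 2;

    private final AtomicInteger state = new AtomicInteger(INIT);
    private final CountDownLatch startTimeInitialized = new CountDownLatch(1);
    private final AtomicInteger workerRuns = new AtomicInteger();
    private final AtomicInteger pending = new AtomicInteger();
    private final Thread workerThread;
    private volatile long startTime;

    LazyStartTimer() {
        workerThread = new Thread(() -> {
            workerRuns.incrementAndGet();
            long now = System.nanoTime();
            // 0 is reserved to mean "not initialized yet"
            startTime = (now == 0) ? 1 : now;
            startTimeInitialized.countDown();
        });
    }

    // Safe to call any number of times; only the first call starts the worker.
    void start() {
        switch (state.get()) {
            case INIT:
                if (state.compareAndSet(INIT, STARTED)) {
                    workerThread.start();
                }
                break;
            case STARTED:
                break;
            case SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid state");
        }
        // Block until the worker has published startTime.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // keep waiting; the worker publishes startTime almost immediately
            }
        }
    }

    // Hypothetical submission path: starts the worker on first use.
    int newTimeout() {
        start();
        return pending.incrementAndGet();
    }

    boolean isStarted() {
        return state.get() == STARTED;
    }

    int workerRuns() {
        return workerRuns.get();
    }

    long startTime() {
        return startTime;
    }
}
```

Because the state transition is a compare-and-set, an explicit `start()` followed by a submission, or several concurrent submissions, still starts the worker thread only once.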
public boolean cancel() {
    // only update the state it will be removed from HashedWheelBucket on next tick.
    if (!compareAndSetState(ST_INIT, ST_CANCELLED)) {
        return false;
    }
+--- HashedWheelTimer
@Override
public Set<Timeout> stop() {
    // The worker thread itself must not stop the time wheel
    if (Thread.currentThread() == workerThread) {
        throw new IllegalStateException(
                HashedWheelTimer.class.getSimpleName() +
                        ".stop() cannot be called from " +
                        TimerTask.class.getSimpleName());
    }

    // Try to CAS the current state to "shutdown"
    if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) {
        // workerState can be 0 or 2 at this moment - let it always be 2.
        if (WORKER_STATE_UPDATER.getAndSet(this, WORKER_STATE_SHUTDOWN) != WORKER_STATE_SHUTDOWN) {
            INSTANCE_COUNTER.decrementAndGet();
            if (leak != null) {
                boolean closed = leak.close(this);
                assert closed;
            }
        }