概述
DelayQueue 是一个线程安全的、无界的、延时阻塞的队列。线程安全体现在使用 ReentrantLock
保证并发安全;无界队列体现在内部使用 PriorityQueue
优先级队列作为容器维护元素;延时阻塞体现在队列元素必须实现 Delayed 接口,使元素对象成为延时对象。此外,Delayed 接口继承了 Comparable 接口,这样元素的优先级也就能保证了。
DelayQueue 保证只有在延迟期满时才能从中取出元素,否则就要阻塞等待剩余的到期时间。DelayQueue 相关的 UML 类图如下:
源码分析
DelayQueue 相关源码结构如下:
属性
1 | public class DelayQueue<E extends Delayed> extends AbstractQueue<E> |
- lock:用于并发控制的可重入独占锁;
- available:获取元素时,如果队列为空,则线程进入该条件队列等待队列非空;
- q:优先级队列,DelayQueue 把元素的存储和维护交给了优先级队列,在上层进行并发控制和延迟处理逻辑;
- leader:标记在取元素的过程中等待的线程,有了这个标记可以减少不必要的竞争;
- E extends Delayed:说明队列元素必须实现 Delay 接口,这个是延时的关键;
从属性可以知道,DelayQueue 主要是用优先级队列 PriorityQueue 来实现,并使用重入锁和条件来控制并发安全。由于优先级队列可以看作是无界的,所以这里只需要一个非空条件就可以了。
Delayed
1 | public interface Delayed extends Comparable<Delayed> { |
Delayed 是一个继承自 Comparable 的接口,并且定义了一个 getDelay 方法,用于表示还有多少时间到期,到期了应返回小于等于 0 的数值。DelayQueue 要求其中的元素必须实现 Delayed 接口,这有两个意义:
- DelayQueue 内部是使用 PriorityQueue 这个优先级队列来存放和维护元素的,也就是说元素必须要有一定的顺序,而 DelayQueue 继承了 Comparable 接口,因此有序性就可以交给使用方了;
- DelayQueue 是一个具有延时特性的队列,而 Delayed 接口的 getDelay 方法就可以描述元素的到期时间,元素通过实现该接口成为延迟对象;
注意:根据 DelayQueue 队列的意义,一般我们是将元素的剩余延迟时间作为元素优先级的,这样的话在队列头的元素一定具有最小的剩余延时时间(最先过期)。这个逻辑是在 Comparable 接口的比较方法中完成。
构造方法
1 | /** |
DelayQueue 构造方法本身没有什么可说的,我们对隐藏的两个信息进行说明:
- DelayQueue 中封装的 PriorityQueue 在实例化时就创建了,使用的是默认的构造方法;
- DelayQueude 中封装的 PriorityQueue 对象是通过其默认构造方法创建的,也就意味着底层使用的是小顶堆,而且是根据元素自然顺序排序;
入队
由于 DelayQueue 内部使用的是优先级队列 PriorityQueue ,可以认为它是无界的,所以不会出现队列满了而阻塞入队线程的情况,因此入队的方法其实都是一样的。
add()
1 | /** |
是对 offer 方法的简单封装调用。
put()
1 | /** |
是对 offer 方法的简单封装调用。
超时等待 offer()
1 | /** |
offer()
1 | /** |
上述方法是入队的核心方法,还是比较简单的,下面对关键点进行说明:
- 在添加元素时需要加锁成功才行,保证了线程安全;
- 添加元素到优先级队列,一般都会成功,因为优先级队列可以认为是无界的;
- 如果添加的元素成为了堆顶元素,就需要特别处理:
- 因为新添加的元素成为了堆顶元素,这个元素可能就要过期,因此需要对等待进行处理;
- 将正在等待堆顶的元素的标志置空和唤醒条件队列上等待的线程来尽可能及时获取堆顶元素;
- 释放锁;
出队
DelayQueue 是一个延时阻塞队列,入队不会出现阻塞的情况,因为内部使用的优先级队列 PriorityQueue 可以认为是无界的。但是出队时可能会遇到队列为空的情况,这时会出现三种情况:阻塞式出队、非阻塞式出队以及阻塞超时出队。下面我们依次说明。
take()
出队阻塞方法。
1 | /** |
出队本身逻辑还是比较简单的,先判断堆顶元素是否为空,为空就直接阻塞等待;不为空也不能直接弹出,还要先判断堆顶元素是否到期了,没到期就等待剩余过期时间后再尝试获取、判断,到期了就直接弹出即可。
出于线程安全性、线程通信等方面的考虑,整个逻辑相对更完整和复杂一些:
- 只有获取到锁才能进行出队操作;
- 判断堆顶元素是否为空,为空的话直接阻塞等待;
- 判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
- 没到期,再根据等待标志 leader 快速判断前面是否有其它线程在等待,有则直接进入等待直到被唤醒,以减少不必要的竞争;
- 如果在此之前没有其它线程等待获取元素,则把自己设置为等待标记,这样可以让后续获取元素的线程直接进入等待;然后当前线程等待剩余的过期时间后会再尝试获取元素;
- 获取到元素之后再尝试唤醒下一个等待的线程;
- 释放锁;
注意:该方法使用自旋的作用,一方面是阻塞式获取,必须要返回一个到期的元素;另一方面是防止假唤醒;
这里我们不难看出,如果所有其他线程因为 leader 标志而阻塞等待,此时刚好进来一个最先过期的元素,那么是不是只能等待线程自动醒来呢?其实不是的,在 offer 方法中会判断这种情况,如果加入的是最先过期的元素,那么会唤醒一个阻塞线程,以通知尽快获取元素。
poll()
出队非阻塞方法。
1 | /** |
poll 方法相对 take 方法是快速失败,没有元素或元素没有到期不会等待,而是直接返回 null。
同样的套路,DelayQueue 在 take 方法和 poll 方法之间进行了折中,可以超时等待获取元素。具体如下:
1 | /** |
peek()
1 | public E peek() { |
remove()
1 | /** |
使用场景
DelayQueue 主要适用于定时任务。任务调度系统能够相对准确的把握任务的执行时间。为了具有调用行为,存放到 DelayDeque 的元素必须实现 Delayed 接口。
小结
DelayQueue 是一个线程安全的、无界的、延时阻塞队列,它使用的是优先级队列 PriorityQueue + 时间维度(到期时间)来实现。具体来说,是在队列的基础上增加了时间维度的优先级,然后通过锁和条件变量来控制入队和出队流程。
DelayQueue 为了减少不必要的竞争,使用了 Thread leader
来进行优化,作为快速进入条件队列等待的标志。
ScheduledThreadPoolExecutor 同样支持延时任务,它内部使用的阻塞队列是自己定义的内部类 DelayedWorkQueue ,这个内部类的功能和 DelayQueue 基本一样,唯一区别点是 DelayedWorkQueue 没有直接使用优先级队列 PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上是没什么区别的。