队列 - DelayQueue

概述

DelayQueue 是一个线程安全的、无界的、延时阻塞的队列。线程安全体现在使用 ReentrantLock 保证并发安全;无界队列体现在内部使用 PriorityQueue 优先级队列作为容器维护元素;延时阻塞体现在队列元素必须实现 Delayed 接口,使元素对象成为延时对象。此外,Delayed 接口继承了 Comparable 接口,这样元素的优先级也就能保证了。

DelayQueue 保证只有在延迟期满时才能从中取出元素,否则就要阻塞等待剩余的到期时间。DelayQueue 相关的 UML 类图如下:

源码分析

DelayQueue 相关源码结构如下:

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {

// 保证线程安全的锁
private final transient ReentrantLock lock = new ReentrantLock();

// 优先队列
private final PriorityQueue<E> q = new PriorityQueue<E>();

// 标记取元素时是否有线程在排队,减少不必要的竞争
// leader 不为空,说明有线程在等待元素,后来的线程乖乖地等着就行了,直到被唤醒
private Thread leader = null;

// 是否可获取元素的条件变量
private final Condition available = lock.newCondition();
}
  • lock:用于并发控制的可重入独占锁;
  • available:获取元素时,如果队列为空,则线程进入该条件队列等待队列非空;
  • q:优先级队列,DelayQueue 把元素的存储和维护交给了优先级队列,在上层进行并发控制和延迟处理逻辑;
  • leader:标记在取元素的过程中等待的线程,有了这个标记可以减少不必要的竞争;
  • E extends Delayed:说明队列元素必须实现 Delay 接口,这个是延时的关键;

从属性可以知道,DelayQueue 主要是用优先级队列 PriorityQueue 来实现,并使用重入锁和条件来控制并发安全。由于优先级队列可以看作是无界的,所以这里只需要一个非空条件就可以了。

Delayed

1
2
3
4
5
6
7
8
9
10
public interface Delayed extends Comparable<Delayed> {
/**
* 在给定的时间单位内返回与此对象关联的剩余延迟
*
* @param unit 时间单位
* @return 剩余的延迟;零或负值表示延迟已经过去
*
*/
long getDelay(TimeUnit unit);
}

Delayed 是一个继承自 Comparable 的接口,并且定义了一个 getDelay 方法,用于表示还有多少时间到期,到期了应返回小于等于 0 的数值。DelayQueue 要求其中的元素必须实现 Delayed 接口,这有两个意义:

  • DelayQueue 内部是使用 PriorityQueue 这个优先级队列来存放和维护元素的,也就是说元素必须要有一定的顺序,而 DelayQueue 继承了 Comparable 接口,因此有序性就可以交给使用方了;
  • DelayQueue 是一个具有延时特性的队列,而 Delayed 接口的 getDelay 方法就可以描述元素的到期时间,元素通过实现该接口成为延迟对象;

注意:根据 DelayQueue 队列的意义,一般我们是将元素的剩余延迟时间作为元素优先级的,这样的话在队列头的元素一定具有最小的剩余延时时间(最先过期)。这个逻辑是在 Comparable 接口的比较方法中完成。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 创建一个空的 DelayQueue
*/
public DelayQueue() {}

/**
* 根据给定集合创建 DelayQueue
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}

DelayQueue 构造方法本身没有什么可说的,我们对隐藏的两个信息进行说明:

  • DelayQueue 中封装的 PriorityQueue 在实例化时就创建了,使用的是默认的构造方法;
  • DelayQueude 中封装的 PriorityQueue 对象是通过其默认构造方法创建的,也就意味着底层使用的是小顶堆,而且是根据元素自然顺序排序;

入队

由于 DelayQueue 内部使用的是优先级队列 PriorityQueue ,可以认为它是无界的,所以不会出现队列满了而阻塞入队线程的情况,因此入队的方法其实都是一样的。

add()

1
2
3
4
5
6
7
8
9
10
/**
* 将指定元素插入此延迟队列。
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return offer(e);
}

是对 offer 方法的简单封装调用。

put()

1
2
3
4
5
6
7
8
9
10
/**
*
* 将指定元素插入此延迟队列。由于队列是无限的,这个方法永远不会阻塞。
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}

是对 offer 方法的简单封装调用。

超时等待 offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
*
* 将指定元素插入此延迟队列。由于队列是无限的,这个方法永远不会阻塞。
*
* @param e the element to add
* @param timeout This parameter is ignored as the method never blocks 此参数被忽略,因为该方法从不阻塞
* @param unit This parameter is ignored as the method never blocks 此参数被忽略,因为该方法从不阻塞
* @return {@code true}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}

offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 将指定元素插入此延迟队列。
*
* @param e the element to add
* @return {@code true}
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

try {
// 将元素加入到优先队列中,会自动保证优先级
q.offer(e);

// 如果添加的元素是堆顶元素
if (q.peek() == e) {
// leader 需要置空,因为现在堆顶出现了新的元素,可能添加的时候就要过期,因此需要让等待的线程尝试获取下,
// 因此 leader 这个等待获取堆顶元素的标记可以置空了
leader = null;

// 唤醒可取条件队列上等待的线程,让其尝试获取元素
available.signal();
}

return true;

// 释放锁
} finally {
lock.unlock();
}
}

上述方法是入队的核心方法,还是比较简单的,下面对关键点进行说明:

  1. 在添加元素时需要加锁成功才行,保证了线程安全;
  2. 添加元素到优先级队列,一般都会成功,因为优先级队列可以认为是无界的;
  3. 如果添加的元素成为了堆顶元素,就需要特别处理:
    • 因为新添加的元素成为了堆顶元素,这个元素可能就要过期,因此需要对等待进行处理;
    • 将正在等待堆顶的元素的标志置空和唤醒条件队列上等待的线程来尽可能及时获取堆顶元素;
  4. 释放锁;

出队

DelayQueue 是一个延时阻塞队列,入队不会出现阻塞的情况,因为内部使用的优先级队列 PriorityQueue 可以认为是无界的。但是出队时可能会遇到队列为空的情况,这时会出现三种情况:阻塞式出队、非阻塞式出队以及阻塞超时出队。下面我们依次说明。

take()

出队阻塞方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* 检索并移除队列的头部元素,如有必要,则阻塞等待直到队列上有一个具有到期的元素。
*
* @return the head of this queue 队列头部元素
* @throws InterruptedException {@inheritDoc} 中断异常
*/
public E take() throws InterruptedException {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
// 自旋
for (; ; ) {

// 获取堆顶元素
E first = q.peek();

// 为空则进入等待,阻塞式获取元素
if (first == null)
available.await();

// 堆顶不为空
else {

// 获取元素的超时时间
long delay = first.getDelay(NANOSECONDS);
// 判断是否到期,到期直接弹出即可
if (delay <= 0)
return q.poll();

/* 执行到这里,说明堆顶元素还没有到期,不能弹出*/

// 要进入等待了,需要设置为 null
first = null; // don't retain ref while waiting

// 检验前面是否已有等待获取元素的线程,有的话就直接进入等待(快速进入等待)
if (leader != null)
available.await();

// 还没有等待获取元素的线程,就把自己设置为等待线程,然后进入超时等待
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待的时间是,堆顶元素剩余的过期时间
available.awaitNanos(delay);

} finally {
// 唤醒后就把 leader 置空
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 没有等待元素的线程,并且队列还有数据,就唤醒下一个线程来取(如果有的话)
if (leader == null && q.peek() != null)
available.signal();

// 释放锁
lock.unlock();
}
}

出队本身逻辑还是比较简单的,先判断堆顶元素是否为空,为空就直接阻塞等待;不为空也不能直接弹出,还要先判断堆顶元素是否到期了,没到期就等待剩余过期时间后再尝试获取、判断,到期了就直接弹出即可。

出于线程安全性、线程通信等方面的考虑,整个逻辑相对更完整和复杂一些:

  1. 只有获取到锁才能进行出队操作;
  2. 判断堆顶元素是否为空,为空的话直接阻塞等待;
  3. 判断堆顶元素是否到期,到期了直接调用优先级队列的poll()弹出元素;
  4. 没到期,再根据等待标志 leader 快速判断前面是否有其它线程在等待,有则直接进入等待直到被唤醒,以减少不必要的竞争
  5. 如果在此之前没有其它线程等待获取元素,则把自己设置为等待标记,这样可以让后续获取元素的线程直接进入等待;然后当前线程等待剩余的过期时间后会再尝试获取元素;
  6. 获取到元素之后再尝试唤醒下一个等待的线程;
  7. 释放锁;

注意:该方法使用自旋的作用,一方面是阻塞式获取,必须要返回一个到期的元素;另一方面是防止假唤醒;

这里我们不难看出,如果所有其他线程因为 leader 标志而阻塞等待,此时刚好进来一个最先过期的元素,那么是不是只能等待线程自动醒来呢?其实不是的,在 offer 方法中会判断这种情况,如果加入的是最先过期的元素,那么会唤醒一个阻塞线程,以通知尽快获取元素。

poll()

出队非阻塞方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* 检索并删除队列的头部元素,如果队列没有具有到期的元素,则返回 null
*
* @return 返回到期的元素,否则返回 null
*/
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

try {
// 从优先队列中获取元素
E first = q.peek();

// 如果为空,或者没有到期,则返回空
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;

// 到期则取出堆顶元素
else
return q.poll();

// 释放锁
} finally {
lock.unlock();
}
}

poll 方法相对 take 方法是快速失败,没有元素或元素没有到期不会等待,而是直接返回 null。

同样的套路,DelayQueue 在 take 方法和 poll 方法之间进行了折中,可以超时等待获取元素。具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
*
* 检索并删除队列的头部元素,如有必要,等待直到队列上有一个具有到期的元素,或指定的等待时间过期。
*
* @return the head of this queue, or {@code null} if the
* specified waiting time elapses before an element with
* an expired delay becomes available
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 计算超时等待的时长
long nanos = unit.toNanos(timeout);

// 获取锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
// 自旋
for (; ; ) {
// 获取堆顶元素
E first = q.peek();

// 队列为空,则进入超时等待
if (first == null) {
// 等待时间到了,还是为空,则返回 null
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);

// 队列非空
} else {
// 获取元素的超时时间
long delay = first.getDelay(NANOSECONDS);

// 判断是否到期,到期直接弹出即可
if (delay <= 0)
return q.poll();


/* 执行到这里,说明堆顶元素还没有到期,还不能弹出*/

// 判断超时时间到了没
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting

// 当前超时小于到期剩余时间,或者前面已经有等待的线程了
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);

else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);

// 计算剩余等待时长
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}

peek()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public E peek() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

try {
// 获取堆顶元素但不删除
return q.peek();

// 释放锁
} finally {
lock.unlock();
}
}

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 从队列中删除指定元素(如果存在),无论它是否已过期。
*/
public boolean remove(Object o) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

// 从优先级队列中删除该元素
try {
return q.remove(o);

// 释放锁
} finally {
lock.unlock();
}
}

使用场景

DelayQueue 主要适用于定时任务。任务调度系统能够相对准确的把握任务的执行时间。为了具有调用行为,存放到 DelayDeque 的元素必须实现 Delayed 接口。

小结

DelayQueue 是一个线程安全的、无界的、延时阻塞队列,它使用的是优先级队列 PriorityQueue + 时间维度(到期时间)来实现。具体来说,是在队列的基础上增加了时间维度的优先级,然后通过锁和条件变量来控制入队和出队流程。

DelayQueue 为了减少不必要的竞争,使用了 Thread leader 来进行优化,作为快速进入条件队列等待的标志。

ScheduledThreadPoolExecutor 同样支持延时任务,它内部使用的阻塞队列是自己定义的内部类 DelayedWorkQueue ,这个内部类的功能和 DelayQueue 基本一样,唯一区别点是 DelayedWorkQueue 没有直接使用优先级队列 PriorityQueue,而是使用数组又实现了一遍优先级队列,本质上是没什么区别的。