队列 - PriorityQueue

概述

队列是一种先进先出(FIFO)的数据结构。但有些情况下,操作的数据可能带有优先级,出队时需要优先级高的元素先出队列,这种数据结构就是优先级队列,也是我们今天的主角 PriorityQueue

一般来说,优先级队列使用堆来实现,关于堆的简单介绍可以参考这里 堆的简单介绍。PriorityQueue 正是通过堆来实现优先级队列的。

源码分析

PriorityQueue 相关源码结构如下图:

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PriorityQueue<E> extends AbstractQueue<E>
implements java.io.Serializable {
private static final long serialVersionUID = -7720805057305804111L;

/**
* 默认容量
*/
private static final int DEFAULT_INITIAL_CAPACITY = 11;

/**
* 存储元素的数组
*/
transient Object[] queue; // non-private to simplify nested class access

/**
* 优先级队列中的元素数。
*/
private int size = 0;

/**
* 比较器,如果优先级队列使用元素的自然顺序,则为 null。
*/
private final Comparator<? super E> comparator;

/**
* 优先级队列的写次数
*/
transient int modCount = 0; // non-private to simplify nested class access

}

PriorityQueue 主要有以下五个属性,下面对每个属性的作用进行说明:

  • DEFAULT_INITIAL_CAPACITY PriorityQueue 的默认容量为 11;
  • queue: PriorityQueue 核心是使用堆来组织数据,该数组就是用来存放元素的;
  • size: 用来记录优先级队列中元素的个数,也就是数组中的元素个数;
  • comparator: 元素比较器,在 PriorityQueue 中有两种方式比较元素,一种是元素的自然顺序,这个要求元素实现 Comparable 接口;另一种是通过指定的比较器来比较,如果使用元素自然顺序,则该属性可以为 null;
  • modCount: 记录对 PriorityQueue 进行写操作的次数,用于实现 fast-fail

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
* 创建一个默认初始容量为 11 的 PriorityQueue,它根据元素的自然顺序对其元素进行排序。
*/
public PriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}

/**
* 创建一个指定容量大小的 PriorityQueue,它根据元素的自然顺序对其元素进行排序。
*
* @param initialCapacity the initial capacity for this priority queue
* @throws IllegalArgumentException if {@code initialCapacity} is less
* than 1
*/
public PriorityQueue(int initialCapacity) {
this(initialCapacity, null);
}

/**
* 创建一个默认初始容量为 11 的 PriorityQueue,它根据指定的比较器对其元素进行排序。
*
* @param comparator the comparator that will be used to order this
* priority queue. If {@code null}, the {@linkplain Comparable
* natural ordering} of the elements will be used.
* @since 1.8
*/
public PriorityQueue(Comparator<? super E> comparator) {
this(DEFAULT_INITIAL_CAPACITY, comparator);
}

/**
*
* @param initialCapacity 优先级队列的初始容量
* @param comparator 将用于排序此优先级队列的比较器。如果为 null,将使用元素的 {@linkplain Comparable 自然排序}。
* @throws IllegalArgumentException if {@code initialCapacity} is less than 1
*/
public PriorityQueue(int initialCapacity,
Comparator<? super E> comparator) {
// Note: This restriction of at least one is not actually needed,
// but continues for 1.5 compatibility
if (initialCapacity < 1)
throw new IllegalArgumentException();

// 创建数组容器
this.queue = new Object[initialCapacity];

// 设置元素比较器,可以为空
this.comparator = comparator;
}

添加元素

add()

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 将指定元素插入此优先级队列。
*
* @return {@code true} (as specified by {@link Collection#add})
* @throws ClassCastException if the specified element cannot be
* compared with elements currently in this priority queue
* according to the priority queue's ordering
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
// 直接调用 offer 方法
return offer(e);
}

可以看到,add 方法是直接调用 offer 方法的。

offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 将指定元素插入此优先级队列。
*
* @return 成功返回 true,不成功会抛出异常
* @throws ClassCastException 要添加的元素不能和队列中的元素比较
* @throws NullPointerException 要添加的元素为空
*/
public boolean offer(E e) {
// 元素不能为空
if (e == null)
throw new NullPointerException();

// 递增写次数
modCount++;

// 获取优先级队列元素个数
int i = size;

// 元素个数达到最大容量,需要先扩容
if (i >= queue.length)
grow(i + 1);

// 元素个数加 1
size = i + 1;

// 如果队列还没有元素,直接插入到数组第一个位置,不需要进行比较、调整堆
if (i == 0)
queue[0] = e;

// 如果队列有元素,做自下而上的堆调整
else
siftUp(i, e);

return true;
}
  1. 要添加的元素不能为空(空了怎么参与比较呢);
  2. 在添加元素之前,判断队列中元素个数是否达到队列最大容量阈值;
  3. 将队列中元素个数加 1;
  4. 如果队列中没有元素,那么直接将元素放到数组的第一个位置,也就是下标为 0 的位置,这样就不需要执行比较、调整堆的流程了;
  5. 如果队列中有元素,就执行堆的自下而上的调整逻辑;
grow()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 扩容队列
*
* @param minCapacity the desired minimum capacity 所需的最小容量
*/
private void grow(int minCapacity) {
// 队列原始长度
int oldCapacity = queue.length;

// Double size if small; else grow by 50%
// 旧容量小于 64 时,容量翻倍 + 2
// 旧容量大于等于 64 ,容量只增加旧容量的一半
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));

// overflow-conscious code
// 保证新容量的大小不溢出
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);

// 创建出一个新容量大小的新数组并把旧数组元素拷贝过去
queue = Arrays.copyOf(queue, newCapacity);
}

grow 方法是 PriorityQueue 内部使用的方法,当队列中的元素个数达到队列容量时,就会使用该方法进行扩容。PriorityQueue 扩容可总结为 “一策略、一注意事项”,具体如下:

  • 扩容策略:当队列(数组)比较小(小于 64)时,容量扩大一倍 + 2;不小于 64 时,容量扩大一半;
  • 扩展注意事项:计算出新的容量后,需要限制最大容量,不能超过 Integer.MAX_VALUE,这个是由 hugeCapacity 方法控制的:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    /**
    * 最大容量限制
    *
    * @param minCapacity
    * @return
    */
    private static int hugeCapacity(int minCapacity) {
    // 容量不能 < 0
    if (minCapacity < 0) // overflow
    throw new OutOfMemoryError();

    // 最大使用 Integer.MAX_VALU
    return (minCapacity > MAX_ARRAY_SIZE) ?
    Integer.MAX_VALUE :
    MAX_ARRAY_SIZE;
    }
siftUp()
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* @param k the position to fill 计划要插入的位置
* @param x the item to insert 要插入的元素
*/
private void siftUp(int k, E x) {
// 根据是否有比较器,使用不同的方法
if (comparator != null)
// 使用比较器排序
siftUpUsingComparator(k, x);
else
// 按照元素自然顺序
siftUpComparable(k, x);
}

该方法会根据是否指定了元素比较器,进而执行不同的分支自下而上调整堆。由于无论是使用元素自然顺序比较,还是使用比较器比较,逻辑是一致的,下面我们以按照元素自然顺序排序(实现 Comparable 接口)为例进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 按照元素自然顺序排序(实现 Comparable 接口)调整堆
*
* @param k 要填补的位置
* @param x 要插入的元素
*/
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
// 转化类型,这要求元素必须实现 Comparable 接口,否则必然报错
Comparable<? super E> key = (Comparable<? super E>) x;
// 自下向上调整堆
while (k > 0) {
// 找到父节点的位置, 因为元素是从0开始的,所以减1之后再除以2
int parent = (k - 1) >>> 1;

// 父节点的值
Object e = queue[parent];

// 比较插入的元素的值与父节点的值,如果比父节点大,则跳出循环,否则交换位置
// todo 从这里也能看出是小顶堆
if (key.compareTo((E) e) >= 0)
break;

// 与父节点交换位置,父节点位置空出来了
queue[k] = e;

// 此时,元素要插入的位置移到了父节点的位置,继续与父节点再比较以调整堆
k = parent;
}

// 最后找到应该插入的位置,放入元素
queue[k] = key;
}

上述方法的逻辑纯粹是堆添加元素时自下而上的调整堆的过程,使堆满足堆的特性。下面对该方法的细节之处进行说明:

  • 参数 k 表示目标元素 x 要插入的位置,这个 k 起始值其实就是数组最后一个元素的下个位置。注意,数组的下标是从 0 开始的,因此在从下往上找父节点时,先要减 1 再除以 2 才能获取正确的父节点的下标;
  • 从代码来看,如果比父节点小就与父节点交换位置,直到出现比父节点大或者比较到了根节点为止。这也可以看出,默认情况下 PriorityQueue 是一个小顶堆;

获取元素

peek()

1
2
3
4
5
6
7
8
9
/**
* 返回堆顶元素
*
* @return
*/
@SuppressWarnings("unchecked")
public E peek() {
return (size == 0) ? null : (E) queue[0];
}

获取堆顶元素但不删除;

poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 出队
*
* @return
*/
@SuppressWarnings("unchecked")
public E poll() {
// 队列为空
if (size == 0)
return null;

// 队列元素个数减 1
int s = --size;

// 队列写操作次数递增
modCount++;

// 获取队列首元素
E result = (E) queue[0];

// 获取队列末元素
E x = (E) queue[s];

// 将队列末元素删除
queue[s] = null;

// 出队后队列中还有元素(因为移除的是堆顶元素,此时堆的特性不能满足,必须需要调整)
if (s != 0)
// 将队列末元素移到队列首,再做自上而下的调整
siftDown(0, x);

// 返回弹出的元素
return result;
}
  1. 队列为空,直接返回 null 即可;
  2. 队列非空,队列元素个数减 1;
  3. 将队列首元素弹出(此时堆顶元素就没了,堆的特性可能被破坏);
  4. 取出队列末元素,将其作为参考的新的堆顶元素,为后续堆调整做准备;
  5. 如果队列在弹出元素后就为空了,那么就不需要调整堆了,否则需要做自上而下的堆调整;

注意:在出队的逻辑中,没有涉及到队列缩容的逻辑。

siftDown()
1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 在位置 k 处插入项 x,即自上而下的堆调整。
*
* @param k the position to fill 要填补的位置
* @param x the item to insert 要插入的元素
*/
private void siftDown(int k, E x) {
// 根据是否有比较器,选择不同的方法进行元素的比较,进而调整堆
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}

同样地,该方法会根据是否指定了元素比较器,进而执行不同的分支自上而下调整堆。由于无论是使用元素自然顺序比较,还是使用比较器比较,逻辑是一致的,这次我们以指定比较器的方式进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 根据比较器,自上而下调整堆
*
* 本质:将要插入的元素放入到它应该在的位置
*
* @param k 要填补的位置
* @param x 要插入的元素
*/
@SuppressWarnings("unchecked")
private void siftDownUsingComparator(int k, E x) {
// 调整的时候,是按照二分的思想往下寻找子节点的,这里只比较一半就可以了
int half = size >>> 1;

// 不断往下比较
while (k < half) {
// 寻找子节点位置,这里加 1 是因为元素从 0 号位置开始
int child = (k << 1) + 1;

// 左子节点的值
Object c = queue[child];

// 右子节点的位置
int right = child + 1;

// 如果右子节点存在,那么取左右节点中较小的值
if (right < size &&
comparator.compare((E) c, (E) queue[right]) > 0)
c = queue[child = right];

// 如果要插入元素比子节点都小,则结束,说明找到对应的位置了
if (comparator.compare(x, (E) c) <= 0)
break;

// 如果比较小的子节点还大,则交换位置
queue[k] = c;

// 指针移动到子节点位置,继续往下调整堆
k = child;
}

// 找到正确的位置,放入元素
queue[k] = x;
}

同样地,上述方法的逻辑存粹是堆删除元素时自上而下的调整堆的过程,使堆满足堆的特性。下面堆该方法的细节之处进行说明:

  • 参数 k 表示目标元素要插入的位置,这个 k 起始值是被删除的元素所在的下标。从上往下找子节点时先从左子节点开始,由于数组的下标是从 0 开始的,因此要先除以 2 再加 1 才能获取正确的左子节点下标;
  • 从代码来看,使用的是比较器,因此 PriorityQueue 是小顶堆还是大顶堆取决于比较器;

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
*
* 从队列中移除指定的元素,如果元素不存在,则返回 false
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
// 获取指定元素在队列中的下标
int i = indexOf(o);

// 如果元素不存在,则返回 false
if (i == -1)
return false;

// 如果元素存在,则从队列中移除该元素
else {
removeAt(i);
return true;
}
}

从 PriorityQueue 中移除指定的元素,需要执行两步操作:

  • 调用 indexOf 方法找到要移除的元素所在的下标;
  • 调用 removeAt 方法将该下标所在元素置空,即删除该元素,然后从这个下标开始调整堆;
indexOf()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 返回指定元素在优先级队列(数组)中的下标
*
* @param o
* @return 返回 -1 表示队列中没有该元素
*/
private int indexOf(Object o) {
if (o != null) {
// 遍历数组
for (int i = 0; i < size; i++)
if (o.equals(queue[i]))
return i;
}
return -1;
}
removeAt()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 从队列中移除下标为 i 的元素。
*/
@SuppressWarnings("unchecked")
private E removeAt(int i) {
// assert i >= 0 && i < size;
// 优先级队列写操作次数累加
modCount++;

// 优先级队列数量减 1
int s = --size;

// 如果是最后一个元素,直接删除即可
if (s == i) // removed last element
queue[i] = null;

// 如果不是最后一个元素,则需要从删除的位置,即下标 i 位置开始调整
else {
E moved = (E) queue[s];
queue[s] = null;
siftDown(i, moved);

// 如果 moved 确实插入到了 i 位置,那么再尝试 自下而上调整堆
if (queue[i] == moved) {
siftUp(i, moved);
if (queue[i] != moved)
return moved;
}
}
return null;
}

删除元素过程涉及到的堆调整,和添加元素或弹出堆顶元素类似,唯一区别是前面两种是从数组末尾或数组的首开始调整,而上述方法是从中间位置开始调整的。逻辑都是一样的,唯一区别是在调整的位置。

小结

PriorityQueue 是一个优先级队列,内部使用数组作为存储数据的容器,元素的优先级依靠堆排序实现,排序的规则即可以根据元素自然顺序(元素实现 Comparable 接口),也可以根据自定义的元素比较器 Comparator。此外,该队列是一个无界队列(最大容量是 Integer.MAX_VALUE ,只会扩容不会缩容。

优先级队列 PriorityQueue 默认情况下是小顶堆,然而可以通过传入自定义的 Comparator 来实现大顶堆。注意,使用元素自然顺序只能实现小顶堆功能,从文中源码部分也能看出。具体如下:

构造小顶堆:PriorityQueue small=new PriorityQueue<>();
构造大顶堆:PriorityQueue small=new PriorityQueue<>(Collections.reverseOrder());

优先级队列 PriorityQueue 性能差,主要体现在添加元素、弹出元素时都有可能破坏堆的特性,为了保证堆的特性就需要调整堆。性能是和元素量成反比的。

优先级队列 PriorityQueue 是线程不安全的,从源码中也可以看出,没有使用锁机制,也没有使用 CAS 机制保证并发安全。与它对应的是 PriorityBlockingQueue 队列,这个队列是线程安全的,但是两者源码极其相似,后者使用了 ReentrantLock lock 和对应的 Condition notEmpty 以保证线程安全,其它的都类似。