并发 - ArrayBlockingQueue

前言

ArrayBlockingQueue 是 Java 并发包下一个以数组实现的阻塞队列,它是线程安全的。

数据模型

ArrayBlockingQueue 是使用一个固定长度的数组来存储元素的,并使用一系列属性来描述这个数组。虽然这个数组是固定大小,但是使用了复用数组的设计思想,它的空间可以循环写入。循环写入不是无条件地覆盖式写入,而是采用等待可用空间地方式进行写入。从工作原理来说,它有以下特点:

  • 循环数组有一个写指针 putIndex,表示在数组 items 中的要写入的位置。如果写指针 putIndex 已经指向了数组末尾,那么此时再写入元素,写指针 putIndex 就会重新指向数组的头部,下次添加元素就可以复用数组了。
  • 循环数组同时有一个读指针 takeIndex,表示在数组 items 中要读取的位置。如果读指针 takeIndex 指向了数组末尾,那么也会把读指针 takeIndex 重新指向数组头部,下次从数组头部开始继续读取元素。

下图展示了循环数组的写指针工作机制,读指针也是类似的。

循环写入非覆盖式写入,具体体现在下图,当写指针 putIndex 追上读指针 takeIndex(也就是队列满了) ,那么就必须阻塞等待读指针向前推进,只有读指针 takeIndex 向前推进了才能继续写。

同理,当读指针 takeIndex 追上写指针 putIndex(也就是队列空了),那么也必须阻塞等待写指针向前推进,保证队列不为空才能继续读取。

源码分析

ArrayBlockingQueue 相关源码结构如下:

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* 存放元素的数组
*/
final Object[] items;

/**
* 取元素时的下标;
* 每从队列中取出一个元素,takeIndex 递增 1,到数组尾部会重置为 0
*/
int takeIndex;

/**
* 添加元素的下标
* 每向队列添加一个元素,putIndex 递增 1,满了会重置为 0
*/
int putIndex;

/**
* 队列的元素个数
*/
int count;

/* 并发控制使用经典双条件算法。 */

/**
* 可重入独占锁
*/
final ReentrantLock lock;

/**
* 当队列为空时,会调用 notEmpty 的 wait 方法,让当前线程等待
*/
private final Condition notEmpty;

/**
* 当队列满了,会调用 notFull 的 wait 方法,让当前线程等待
*/
private final Condition notFull;

/* 并发控制使用经典双条件算法。 */
}

循环数组本身被设计为一个 Object 类型的素组 items ,然后 ArrayBlockingQueue 设计了以下的变量来描述循环数组的状态:

  • count: 记录队列中元素的个数。
  • putIndex: 记录循环数组的写指针,即循环数组接下来写数据时应该写入的位置。
  • takeIndex: 记录循环数组的读指针,即循环数组接下来读数据时应该读取的位置。
  • lock: 可重入独占锁,用来保证队列安全的。
  • notEmpty: 数组非空的等待条件,当队列为空时,会调用 notEmpty 的 wait 方法,让当前线程等待。
  • notFull: 数组非满的等待条件,当队列满了,会调用 notFull 的 wait 方法,让当前线程等待。

构造函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
*
* 根据指定的大小创建 fair 类型的 ArrayBlockingQueue;这个公平指的是锁的公平性
*
* @param capacity the capacity of this queue 队列大小
* @throws IllegalArgumentException if {@code capacity < 1} 大小 < 1 抛出异常
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
// 阻塞队列的大小必须 > 0
if (capacity <= 0)
throw new IllegalArgumentException();

// 初始化指定大小的 Object 数组
this.items = new Object[capacity];

// 创建可重入锁,是否公平取决于 fair
lock = new ReentrantLock(fair);

// 创建锁的条件对象,用来等待队列非空
notEmpty = lock.newCondition();

// 创建锁的条件对象,用来等待队列非满
notFull = lock.newCondition();
}

添加元素

  • add(): 在不超出队列容量的情况下,在队列尾部插入指定的元素,成功返回 true,如果队列已满则抛出异常。其底层使用的是 offer 方法。
  • offer(): 在不超出队列容量的情况下,在队列尾部插入指定的元素,成功时返回 true,如果队列已满则返回 false。
  • put(): 在队列尾部添加指定元素,如果队列满了,则阻塞等待,直到空间可用。

add()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* 如果可以在不超出队列容量的情况下立即插入指定元素,则在此队列的尾部插入指定元素,
* 成功时返回 {@code true},如果此队列已满则抛出 {@code IllegalStateException}。
*
* @param e the element to add 要添加的元素
* @return {@code true} (as specified by {@link Collection#add}) 添加成功返回 true
* @throws IllegalStateException if this queue is full 队列满了
* @throws NullPointerException if the specified element is null 要添加的元素 e 不能为空
*/
public boolean add(E e) {
// 调用父类的 add 方法
return super.add(e);
/**
* public boolean add(E e) {
* // 调用 offer 方法
* if (offer(e))
* return true;
* else
* throw new IllegalStateException("Queue full");
* }
*/
}

offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 如果可以在不超出队列容量的情况下立即插入指定元素,则在此队列的尾部插入指定元素,成功时返回 {@code true},
* 如果此队列已满则返回 {@code false}。此方法通常比方法 {@link add} 更可取,后者仅通过抛出异常可能无法插入元素。
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 检查要插入的元素 e 不能为空
checkNotNull(e);

// 加锁
final ReentrantLock lock = this.lock;
lock.lock();
try {

// 如果队列满了,直接返回 false ,不插入元素
if (count == items.length)
return false;

// 如果队列没有满,则向队列的尾部插入元素 e ,插入成功后返回 true
else {
enqueue(e);
return true;
}

// 释放锁
} finally {
lock.unlock();
}
}

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 检查要添加的元素 e 不能为空
checkNotNull(e);

// 加获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
// 判断队列是否满了,满了则阻塞在 notFull 等待被唤醒
while (count == items.length)
notFull.await();

// 队列不满,则在队列尾部插入元素
enqueue(e);

// 释放锁
} finally {
lock.unlock();
}
}

enqueue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 在 putIndex 下标位置添加元素;该方法仅在获取到锁时调用
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;

// 获取全局数组
final Object[] items = this.items;

// 在 putIndex 下标位置添加元素
items[putIndex] = x;

// 向后移动 putIndex ,如果队列满了,则重置 putIndex 为 0
if (++putIndex == items.length)
putIndex = 0;

// 数组元素个数累加
count++;

// 尝试唤醒阻塞在 notEmpty 上等待的线程(如果有的话)
notEmpty.signal();
}

该方法是添加元素的最底层方法,涉及到写指针 putIndex 的更新,以及尝试唤醒阻塞在 notEmpty 上等待的线程(如果有的话)。需要注意的是,判断队列是否满了的条件是:putIndex==items.length

获取元素

  • remove(): 从队列中删除指定元素(如果存在)。
  • poll(): 尝试删除并返回队列头部的元素,队列为空返回 null 即可。
  • take(): 阻塞式删除并返回队列头部的元素,如果队列为空会一直等待,直到有元素才返回。
  • peek(): 返回队列头部元素,但不从队列中删除该元素,队列为空则返回 null。

remove()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
 public boolean remove(Object o) {
// 要删除的元素为空,则返回 false,不进行处理
if (o == null) return false;

// 获取全局数组
final Object[] items = this.items;

// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 如果队列非空
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;

// 遍历数组,找到要删除的元素
do {
if (o.equals(items[i])) {
// 删除该元素并移动元素
removeAt(i);
return true;
}

// 数组是循环使用的
if (++i == items.length)
i = 0;

// 追上 putIndex ,说明已经遍历完了
} while (i != putIndex);
}

// 队列为空,或者没有找到要删除的元素
return false;

// 释放锁
} finally {
lock.unlock();
}
}
---
/**
* 删除数组下标 removeIndex 处的元素并移动元素使元素连续。仅在获取到锁时调用;
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;

// 获取全局数组
final Object[] items = this.items;

// 如果要删除下标正好是下一个要取出的元素对应的下标,那么同取出元素操作逻辑
if (removeIndex == takeIndex) {
// removing front item; just advance
// 置空
items[takeIndex] = null;

// 更新 takeIndex 下标
if (++takeIndex == items.length)
takeIndex = 0;

// 更新队列元素个数
count--;

// 更新迭代器状态
if (itrs != null)
itrs.elementDequeued();

// 如果要删除的下标非 takeIndex ,那么说明就是"中间位置",需要移动元素以覆盖删除的位置
} else {
// an "interior" remove
// 以要删除的下标 removeIndex 为基准,"向前"移动元素,直到追上 putIndex
// todo 注意数组循环使用的情况
final int putIndex = this.putIndex;
for (int i = removeIndex; ; ) {
int next = i + 1;
// 从头开始
if (next == items.length)
next = 0;
// 没有追上 putIndex,就向前移动
if (next != putIndex) {
items[i] = items[next];
i = next;

// 后退 putIndex
} else {
items[i] = null;
this.putIndex = i;
break;
}
}

// 更新队列元素个数
count--;

// 更新迭代器
if (itrs != null)
itrs.removedAt(removeIndex);
}

// 尝试唤醒阻塞在 notFull 上等待的线程
notFull.signal();
}

注意:删除元素相对复杂一些,移动数组元素时要考虑数组是循环使用的。

poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 尝试获取队列头部的元素,队列为空返回 null 即可
*
* @return
*/
public E poll() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

try {
// 队列为空,返回 null ;队列非空返回 takeIndex 下标位置的元素
return (count == 0) ? null : dequeue();

// 释放锁
} finally {
lock.unlock();
}
}

上述方法是快速获取的方式,如果没有就立即返回 null。ArrayBlockingQueue 还支持超时获取元素,尽最大努力获取到元素。该方法是在 poll() 方法的基础上增加了超时等待条件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 尝试超时获取元素
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 获取元素的超时时间
long nanos = unit.toNanos(timeout);

// 获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
// 如果队列为空,等待 nanos 纳秒时长
while (count == 0) {
// 等待 nanos 纳秒后,队列还是为空
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}

// 执行到这里有两种可能: 1)队列非空,直接获取 2)队列为空,但在等待时长内队列又有元素了
return dequeue();

// 释放锁
} finally {
lock.unlock();
}
}

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 阻塞式获取元素
*
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {
// 获取可中断锁
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();

try {
// 队列为空,则阻塞等待,直到队列有元素
while (count == 0)
notEmpty.await();

// 获取 takeIndex 下标位置的元素
return dequeue();

// 释放锁
} finally {
lock.unlock();
}
}

take() 方法相比超时等待的 poll() 方法,前者没有等待时间,会一直等待,直到队列有元素或线程被中断。

peek()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 读取队列头部元素,但不移除该元素;队列为空,则返回 null
*
* @return
*/
public E peek() {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();

// 直接从数组中获取下标为 takeIndex 的元素
try {
return itemAt(takeIndex); // null when queue is empty

// 释放锁
} finally {
lock.unlock();
}
}

dequeue()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 取出 takeIndex 下标位置的元素;该方法仅在获取到锁时调用;
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;

// 获取全局数组
final Object[] items = this.items;

// 获取 takeIndex 下标位置的元素
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];

// 设置 null,便于 GC
items[takeIndex] = null;

// 更新 takeIndex ,如果到达了数组的末尾,则重置 takeIndex 为 0 ,即下次从数组头开始取元素
if (++takeIndex == items.length)
takeIndex = 0;

// 数组元素个数递减
count--;

// 更新迭代器状态
if (itrs != null)
itrs.elementDequeued();

// 尝试唤醒阻塞在 notFull 上等待的线程(如果有的话)
notFull.signal();

// 返回元素
return x;
}

该方法是获取元素的最底层方法,涉及到读指针 takeIndex 的更新,以及尝试唤醒阻塞在 notFull 上等待的线程(如果有的话)。

使用场景

典型的使用场景就是生产者-消费者问题。

小结

本篇文章简单介绍了 ArrayBlockingQueue 的模型,并分析了核心的源码。核心点有以下两点:

  • 以固定大小数组作为元素的容器,并定义系列描述数组的属性。这个固定大小的数组是可以循环使用的。存取数据遵循:队头取、队尾插,从读写指针也可以看出。
  • 基本的增、删、查都要先加锁才能操作,使用的锁是 Reentrant 保证了安全性,并使用等待通知机制来实现阻塞的功能。也就是,同一时刻只有一个线程可操作队列。