并发 - LinkedBlockingQueue

概述

LinkedBlockingQueue 是一个基于链表(单链表)实现的先进先出的阻塞队列,采用双锁技术,针对入队和出队操作分别使用了一把可重入的独占锁,在并发度上有一定的提升。

源码分析

LinkedBlockingQueue 相关的源码结构如下:

数据结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/**
* 链表节点
*/
static class Node<E> {
/**
* 存放的元素
*/
E item;

/**
* 后继指针
*/
Node<E> next;

/**
* 节点封装元素
*/
Node(E x) {
item = x;
}
}

/**
* 队列的容量,默认容量是 Integer.MAX_VALUE
*/
private final int capacity;

/**
* 队列中元素的个数
*/
private final AtomicInteger count = new AtomicInteger();

/**
* 队列的头节点,其封装的元素一直都是 null
*/
transient Node<E> head;

/**
* 队列的尾节点,last.next 一直是 null
*/
private transient Node<E> last;

/**
* 出队 take、poll 使用的锁
*/
private final ReentrantLock takeLock = new ReentrantLock();

/**
* 等待出队的条件,即队列非空条件
*/
private final Condition notEmpty = takeLock.newCondition();

/**
* 入队 put、offer 使用的锁
*/
private final ReentrantLock putLock = new ReentrantLock();

/**
* 等待入队的条件,即队列非满条件
*/
private final Condition notFull = putLock.newCondition();
}

LinkedBlockingQueue 用于存放元素的结构是一个单向链表,链表中的每个节点是一个 Node 结构。此外,LinkedBlockingQueue 定义了以下的变量来描述和维护这个单向链表:

  • capacity: 队列的容量,即约束单向链表节点数量;
  • count: 队列中元素的个数,即单向链表有效节点的数量;
  • head: 单向链表的头节点,不存储元素,起到获取链表节点的作用;
  • last: 单向链表的尾节点,用于添加元素时构建链表;
  • takeLock、notEmpty: 队列元素出队的锁和出队的等待条件;
  • putLock、notFull: 元素入队的锁和入队的等待条件;

相比 ArrayBlockingQueue 内部定义一个循环数组,出队和入队用的是同一把锁,LinkedBlockingQueue 内部则定义了一个链表结构,入队用的是 putLock 锁,出队用的是 takeLock 锁。可以看出,后者在并发度上更好一些。

这里我们可能有个疑问,那就是两把锁是怎么保证线程安全的呢?这个问题的答案留在后文。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 默认无界的队列
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* 创建给定容量的队列
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
// 容量大小不能 <= 0
if (capacity <= 0) throw new IllegalArgumentException();

// 设置队列容量大小
this.capacity = capacity;

// 初始化链表
last = head = new Node<E>(null);
}

从 LinkedBlockingQueue 的构造方法可知:

  • LinkedBlockingQueue 队列默认是无界的,也就是队列大小为 Integer.MAX_VALUE 。在使用的时候,我们最好传入容量,不然会有内存溢出的风险。
  • LinkedBlockingQueue 队列内部的链表是在构造方法中初始化的

添加元素

添加元素的方法,主要的流程是判断队列满的情况的处理,以及添加元素成功后尝试唤醒阻塞等待元素的线程。添加元素是在队列尾部添加。

put()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

/**
*
* 在此队列的尾部插入指定元素,如有必要,等待空间可用。
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
// 要添加的元素不能为空
if (e == null) throw new NullPointerException();

// 初始值为 -1
int c = -1;

// 创建一个节点存元素 e
Node<E> node = new Node<E>(e);

// 获取入队锁
final ReentrantLock putLock = this.putLock;

// 获取队列中元素个数
final AtomicInteger count = this.count;
putLock.lockInterruptibly();

try {
// 队列满了,则进入非满条件队列进行等待
while (count.get() == capacity) {
notFull.await();
}

// 入队,链表操作,尾插法
enqueue(node);

// 获取并更新队列中元素个数
c = count.getAndIncrement();

// 如果 c+1 < 队列容量,说明队列还可以继续添加元素,则唤醒在非满条件队列中等待的线程
// 这里 c+1 是因为前面已经加入了一个元素
if (c + 1 < capacity)
notFull.signal();

// 释放入队锁
} finally {
putLock.unlock();
}

// c == 0 说明原来queue是空的, 现在添加了元素,所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
if (c == 0)
signalNotEmpty();
}

put 方法主要包括以下流程:

  • 获取可中断的入队锁;
  • 判断队列是否满了,满了则进入非满条件队列进行等待;
  • 元素入队,链表的尾插操作;
  • 元素入队后,如果队列还没满,那么可以继续添加元素,则唤醒在非满条件队列中等待的线程;
  • 释放可中断的入队锁;
  • 如果在添加元素之前,队列是空的,那么尝试唤醒正在 poll/take 等待中的线程;
enqueue()
1
2
3
4
5
6
7
8
9
10
11
12

/**
* 元素入队底层操作
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
// 链表尾插法
last = last.next = node;
}
signalNotEmpty()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* 唤醒等待获取元素的线程。仅从 put/offer 调用(通常不会锁定 takeLock。)
*/
private void signalNotEmpty() {
// 唤醒前先拿到出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();

try {
// 唤醒阻塞在非空条件队列中的某个线程,以使其继续获取元素
notEmpty.signal();

// 释放出队锁
} finally {
takeLock.unlock();
}
}

offer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* 在不超出队列容量的情况下立即在此队列的尾部插入指定元素,成功时返回 true,如果此队列已满则返回 false。
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
// 要添加的元素不能为空
if (e == null) throw new NullPointerException();

// 队列中元素的个数
final AtomicInteger count = this.count;

// 如果队列已满,直接返回 false
if (count.get() == capacity)
return false;

int c = -1;

// 创建一个节点存元素 e
Node<E> node = new Node<E>(e);

// 获取入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {

// 如果队列没有满,则入队元素,即链表尾插法
if (count.get() < capacity) {
enqueue(node);

// 获取并更新队列中元素个数
c = count.getAndIncrement();

// 如果 c+1 < 队列容量,说明队列还可以继续添加元素,则唤醒在非满条件队列中等待的线程
// 这里 c+1 是因为前面已经加入了一个元素
if (c + 1 < capacity)
notFull.signal();
}

// 释放入队锁
} finally {
putLock.unlock();
}

// c == 0 说明原来queue是空的, 现在添加了元素,所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
if (c == 0)
signalNotEmpty();

// 如果 count.get() < capacity 不成立,直接释放锁,此时 c==-1
return c >= 0;
}

可以看出上述方法和 put 方法基本一致,唯一区别点在,如果队列满了直接返回失败,不阻塞等待。

LinkedBlockingQueue 基于 offer 方法进行了优化,支持超时添加元素的功能,是在 put 方法和 offer 方法之间取了个折中。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 在此队列的尾部插入指定元素,如有必要,等待指定的等待时间以使空间可用。
*
* 和 put 方法基本类似,唯一区别是,该方法不会一直阻塞等待,有个最大等待时长
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

// 要添加的元素不能为空
if (e == null) throw new NullPointerException();

// 超时等待时间
long nanos = unit.toNanos(timeout);

int c = -1;

// 获取入队锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();

try {
// 如果队列满了,则等待指定的时间,尽最大努力添加元素
while (count.get() == capacity) {
// 超时了还没有空的空间,那么就不添加了
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}

// 入队,可能条件如下:
// 1)队列没有满 2)在指定的超时时间内有空的空间了,线程提前被唤醒
enqueue(new Node<E>(e));

// 获取并更新队列中元素个数
c = count.getAndIncrement();

// 如果 c+1 < 队列容量,说明队列还可以继续添加元素,则唤醒在非满条件队列中等待的线程
// 这里 c+1 是因为前面已经加入了一个元素
if (c + 1 < capacity)
notFull.signal();

// 释放入队锁
} finally {
putLock.unlock();
}

// c == 0 说明原来queue是空的, 现在添加了元素,所以这里 signalNotEmpty 一下, 唤醒正在 poll/take 等待中的线程
if (c == 0)
signalNotEmpty();

return true;
}

获取元素

获取元素的方法,主要的流程是判断队列空的情况的处理,以及获取元素成功后尝试唤醒阻塞添加元素的线程。获取元素是从队列头部获取,也就是链表的第一个有效节点(非头节点)。

take()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* 出队
*
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {
E x;
int c = -1;

// 队列中元素的个数
final AtomicInteger count = this.count;

// 获取可中断出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();

try {
// 如果队列为空,则进入 notEmpty 等待队列进行等待队列非空
while (count.get() == 0) {
notEmpty.await();
}

// 出队,链表操作
x = dequeue();

// 获取并更新队列中元素的个数
c = count.getAndDecrement();

// 说明队列中还有元素,唤醒在 notEmpty 等待队列进行等待的线程
if (c > 1)
notEmpty.signal();

// 释放出队锁
} finally {
takeLock.unlock();
}

// c == capacity 说明队列在本次出队之前是满的,现在出队了一个元素,有空的空间了,可以唤醒 put/offer 操作阻塞的线程(如果有的话)
if (c == capacity)
signalNotFull();

return x;
}

take 方法主要包括以下流程:

  • 获取可中断的出队锁;
  • 判断队列是否空的,空了则进入非空条件队列进行等待;
  • 元素出队,链表操作;
  • 元素出队后,如果队列还是非空的,那么可以继续出队,则唤醒在非空条件队列中等待的线程;
  • 释放可中断的出队锁;
  • 如果在获取元素之前,队列是满的,那么尝试唤醒正在 put/offer 等待中的线程;
dequeue()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 出队底层操作
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head; // head 是一个虚节点
Node<E> first = h.next; // 获取虚节点后一个节点,也就是真正的节点
h.next = h; // help GC
head = first; // 重新设置 head
E x = first.item; // 获取出队的值
first.item = null; // 置空元素
return x;
}
signalNotFull()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 唤醒等待添加元素的线程。仅从 take/poll 调用
*/
private void signalNotFull() {
// 唤醒前先拿到入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock();


try {
// 唤醒阻塞在非满条件队列中的某个线程
notFull.signal();

// 释放入队锁
} finally {
putLock.unlock();
}
}

poll()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 出队
*
* @return
*/
public E poll() {
// 获取队列中元素的个数
final AtomicInteger count = this.count;

// 如果队列为空,则直接返回 null
if (count.get() == 0)
return null;

E x = null;
int c = -1;

// 获取出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();

try {

// 如果队列不为空,那么出队
if (count.get() > 0) {
// 出队
x = dequeue();

// 获取并更新队列中元素的个数
c = count.getAndDecrement();

// 说明队列中还有元素,唤醒在 notEmpty 等待队列进行等待的线程
if (c > 1)
notEmpty.signal();
}

// 释放出队锁
} finally {
takeLock.unlock();
}

// c == capacity 说明队列在本次出队之前是满的,现在出队了一个元素,有空的空间了,可以唤醒 put/offer 操作阻塞的线程(如果有的话)
if (c == capacity)
signalNotFull();

return x;
}

上述方法和 take 方法基本一致,唯一区别点是,队列为空时直接返回,不会阻塞等待元素。

LinkedBlockingQueue 基于 poll 方法进行了优化,支持超时获取元素的功能,是在 take 和 poll 方法之间取个折中。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/**
* 超时出队
*
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;

// 超时等待时长
long nanos = unit.toNanos(timeout);

// 获取队列元素的个数
final AtomicInteger count = this.count;

// 获取可中断出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();

try {
// 队列为空,则进入超时等待
while (count.get() == 0) {
// 等待超时也没有元素
if (nanos <= 0)
return null;
// 超时等待
nanos = notEmpty.awaitNanos(nanos);
}

// 出队,可能情况如下:
// 1)队列没有空 2)在指定的超时时间内队列又有元素了,线程提前被唤醒
x = dequeue();

// 获取并更新队列中元素的个数
c = count.getAndDecrement();

// 说明队列中还有元素,唤醒在 notEmpty 等待队列进行等待的线程
if (c > 1)
notEmpty.signal();

// 释放出队锁
} finally {
takeLock.unlock();
}

// c == capacity 说明队列在本次出队之前是满的,现在出队了一个元素,有空的空间了,可以唤醒 put/offer 操作阻塞的线程(如果有的话)
if (c == capacity)
signalNotFull();

return x;
}

peek()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 获取但不移除队列头部元素
*
* @return
*/
public E peek() {
// 队列为空,直接返回 null
if (count.get() == 0)
return null;

// 获取出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();


try {
// 获取有效节点
Node<E> first = head.next;

// 返回元素
if (first == null)
return null;
else
return first.item;

// 释放出队列锁
} finally {
takeLock.unlock();
}
}

遍历链表操作

前面介绍了 LinkedBlockingQueue 的入队和出队操作,它们也是最核心的方法。其中入队时都会先获取入队锁,获取成功才可以执行后续的入队流程;出队时也是一样,先获取出队锁,获取成功才可以执行后续的出队流程。这两类操作各自用到了一把锁,这两把锁各自独立。

在涉及到遍历链表的操作时,需要同时使用两把锁,也就是入队锁和出队锁,因为遍历链表的过程不能有出队和入队的操作,否则就不安全了。具体方法包括:判断是否包含 contains 、删除元素 remove、清除 clear、toString、toArray 等。

相比其他操作,入队和出队使用的更频繁。入队和出队虽然各自使用一把锁,但是结合等待通知机制,这两个类型操作是安全的。这两类操作外的遍历链表操作,会同时使用两把锁,因此总体还是线程安全的。

至此,我们就知道了 LinkedBlockingQueue 为什么可以保证线程安全,并且并发度相对更大。

ArrayBlockingQueue VS LinkedBlockingQueue

  1. ArrayBlockingQueue 是有界的,而 LinkedBlockingQueue 默认是无界的。在使用 LinkedBlockingQueue 时要考虑内存实际使用问题,防止内存溢出问题的发生。
  2. ArrayBlockingQueue 内部使用一个锁来控制元素的添加和取出操作,而 LinkedBlockingQueue 则是使用两个锁来控制。可以看出,前者不管是添加还是获取元素,都可能被阻塞,而后者在添加和获取时分别使用了两个锁,只会相同操作有阻塞可能,因此性能方面后者更强。
  3. ArrayBlockingQueue 内部使用的是固定内存,而 LinkedBlockingQueue 内部使用的是动态内存,无论是分配内存还是释放内存(甚至GC),动态内存的性能自然都会比固定内存要差。

ArrayBlockingQueue 双锁问题

LinkedBlockingQueue 使用两个锁来保证并发安全,性能更好。那为什么 ArrayBlockingQueue 不使用双锁实现呢?

在不考虑设计的情况下,ArrayBlockingQueue 完全可以按照 LinkedBlockingQueue 的双锁设计思路无脑式实现。但是不要忘了,忽略了最初的设计,那么就失去了初衷,存在即合理嘛。

此外,ArrayBlockingQueue 还支持公平的特性,这是 LinkedBlockingQueue 双锁没有实现的。

小结

LinkedBlockingQueue 是一个基于链表(单链表)实现的先进先出的阻塞队列。在入队和出队时(不含删除、清除),分别使用两把独占锁保证安全,也提高了并发度,其他操作基本都会同时使用两把锁,因此线程安全问题没问题。

LinkedBlockingQueue 虽然性能比 ArrayBlockingQueue 好,但是在内存处理上却不如后者好。此外,ConcurrentLinkedQueue 不再使用锁机制,而是采用 CAS 的方式,在性能上更强于 LinkedBlockingQueue 。