概述 LinkedBlockingQueue 是一个基于链表(单链表) 实现的先进先出的阻塞队列,采用双锁技术 ,针对入队和出队操作分别使用了一把可重入的独占锁,在并发度上有一定的提升。
源码分析 LinkedBlockingQueue 相关的源码结构如下:
数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public class LinkedBlockingQueue <E > extends AbstractQueue <E > implements BlockingQueue <E >, java .io .Serializable { static class Node <E > { E item; Node<E> next; Node(E x) { item = x; } } private final int capacity; private final AtomicInteger count = new AtomicInteger(); transient Node<E> head; private transient Node<E> last; private final ReentrantLock takeLock = new ReentrantLock(); private final Condition notEmpty = takeLock.newCondition(); private final ReentrantLock putLock = new ReentrantLock(); private final Condition notFull = putLock.newCondition(); }
LinkedBlockingQueue 用于存放元素的结构是一个单向链表 ,链表中的每个节点是一个 Node 结构。此外,LinkedBlockingQueue 定义了以下的变量来描述和维护 这个单向链表:
capacity: 队列的容量,即约束单向链表节点数量;
count: 队列中元素的个数,即单向链表有效节点的数量;
head: 单向链表的头节点,不存储元素,起到获取链表节点的作用;
last: 单向链表的尾节点,用于添加元素时构建链表;
takeLock、notEmpty: 队列元素出队的锁和出队的等待条件;
putLock、notFull: 元素入队的锁和入队的等待条件;
相比 ArrayBlockingQueue 内部定义一个循环数组,出队和入队用的是同一把锁,LinkedBlockingQueue 内部则定义了一个链表结构,入队用的是 putLock 锁,出队用的是 takeLock 锁。可以看出,后者在并发度上更好一些。
这里我们可能有个疑问,那就是两把锁是怎么保证线程安全的呢?这个问题的答案留在后文。
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public LinkedBlockingQueue () { this (Integer.MAX_VALUE); } public LinkedBlockingQueue (int capacity) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .capacity = capacity; last = head = new Node<E>(null ); }
从 LinkedBlockingQueue 的构造方法可知:
LinkedBlockingQueue 队列默认是无界的,也就是队列大小为 Integer.MAX_VALUE
。在使用的时候,我们最好传入容量,不然会有内存溢出的风险。
LinkedBlockingQueue 队列内部的链表是在构造方法中初始化的 。
添加元素 添加元素的方法,主要的流程是判断队列满的情况的处理,以及添加元素成功后尝试唤醒阻塞等待元素的线程。添加元素是在队列尾部添加。
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public void put (E e) throws InterruptedException { if (e == null ) throw new NullPointerException(); int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); }
put 方法主要包括以下流程:
获取可中断的入队锁;
判断队列是否满了,满了则进入非满条件队列进行等待;
元素入队,链表的尾插操作;
元素入队后,如果队列还没满,那么可以继续添加元素,则唤醒在非满条件队列中等待的线程;
释放可中断的入队锁;
如果在添加元素之前,队列是空的,那么尝试唤醒正在 poll/take 等待中的线程;
enqueue() 1 2 3 4 5 6 7 8 9 10 11 12 private void enqueue (Node<E> node) { last = last.next = node; }
signalNotEmpty() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void signalNotEmpty () { final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { notEmpty.signal(); } finally { takeLock.unlock(); } }
offer() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public boolean offer (E e) { if (e == null ) throw new NullPointerException(); final AtomicInteger count = this .count; if (count.get() == capacity) return false ; int c = -1 ; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this .putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return c >= 0 ; }
可以看出上述方法和 put 方法基本一致,唯一区别点在,如果队列满了直接返回失败,不阻塞等待。
LinkedBlockingQueue 基于 offer 方法进行了优化,支持超时添加元素的功能,是在 put 方法和 offer 方法之间取了个折中。具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 public boolean offer (E e, long timeout, TimeUnit unit) throws InterruptedException { if (e == null ) throw new NullPointerException(); long nanos = unit.toNanos(timeout); int c = -1 ; final ReentrantLock putLock = this .putLock; final AtomicInteger count = this .count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { if (nanos <= 0 ) return false ; nanos = notFull.awaitNanos(nanos); } enqueue(new Node<E>(e)); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0 ) signalNotEmpty(); return true ; }
获取元素 获取元素的方法,主要的流程是判断队列空的情况的处理,以及获取元素成功后尝试唤醒阻塞添加元素的线程。获取元素是从队列头部获取,也就是链表的第一个有效节点(非头节点)。
take() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public E take () throws InterruptedException { E x; int c = -1 ; final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
take 方法主要包括以下流程:
获取可中断的出队锁;
判断队列是否空的,空了则进入非空条件队列进行等待;
元素出队,链表操作;
元素出队后,如果队列还是非空的,那么可以继续出队,则唤醒在非空条件队列中等待的线程;
释放可中断的出队锁;
如果在获取元素之前,队列是满的,那么尝试唤醒正在 put/offer 等待中的线程;
dequeue() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 private E dequeue () { Node<E> h = head; Node<E> first = h.next; h.next = h; head = first; E x = first.item; first.item = null ; return x; }
signalNotFull() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void signalNotFull () { final ReentrantLock putLock = this .putLock; putLock.lock(); try { notFull.signal(); } finally { putLock.unlock(); } }
poll() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public E poll () { final AtomicInteger count = this .count; if (count.get() == 0 ) return null ; E x = null ; int c = -1 ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { if (count.get() > 0 ) { x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
上述方法和 take 方法基本一致,唯一区别点是,队列为空时直接返回,不会阻塞等待元素。
LinkedBlockingQueue 基于 poll 方法进行了优化,支持超时获取元素的功能,是在 take 和 poll 方法之间取个折中。具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 public E poll (long timeout, TimeUnit unit) throws InterruptedException { E x = null ; int c = -1 ; long nanos = unit.toNanos(timeout); final AtomicInteger count = this .count; final ReentrantLock takeLock = this .takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0 ) { if (nanos <= 0 ) return null ; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1 ) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
peek() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public E peek () { if (count.get() == 0 ) return null ; final ReentrantLock takeLock = this .takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null ) return null ; else return first.item; } finally { takeLock.unlock(); } }
遍历链表操作 前面介绍了 LinkedBlockingQueue 的入队和出队操作,它们也是最核心的方法。其中入队时都会先获取入队锁,获取成功才可以执行后续的入队流程;出队时也是一样,先获取出队锁,获取成功才可以执行后续的出队流程。这两类操作各自用到了一把锁,这两把锁各自独立。
在涉及到遍历链表的操作时,需要同时使用两把锁,也就是入队锁和出队锁 ,因为遍历链表的过程不能有出队和入队的操作,否则就不安全了。具体方法包括:判断是否包含 contains 、删除元素 remove、清除 clear、toString、toArray 等。
相比其他操作,入队和出队使用的更频繁。入队和出队虽然各自使用一把锁,但是结合等待通知机制 ,这两个类型操作是安全的。这两类操作外的遍历链表操作,会同时使用两把锁,因此总体还是线程安全的。
至此,我们就知道了 LinkedBlockingQueue 为什么可以保证线程安全,并且并发度相对更大。
ArrayBlockingQueue VS LinkedBlockingQueue
ArrayBlockingQueue 是有界的,而 LinkedBlockingQueue 默认是无界的。在使用 LinkedBlockingQueue 时要考虑内存实际使用问题,防止内存溢出问题的发生。
ArrayBlockingQueue 内部使用一个锁来控制元素的添加和取出操作,而 LinkedBlockingQueue 则是使用两个锁来控制。可以看出,前者不管是添加还是获取元素,都可能被阻塞,而后者在添加和获取时分别使用了两个锁,只会相同操作有阻塞可能,因此性能方面后者更强。
ArrayBlockingQueue 内部使用的是固定内存,而 LinkedBlockingQueue 内部使用的是动态内存,无论是分配内存还是释放内存(甚至GC),动态内存的性能自然都会比固定内存要差。
ArrayBlockingQueue 双锁问题 LinkedBlockingQueue 使用两个锁来保证并发安全,性能更好。那为什么 ArrayBlockingQueue 不使用双锁实现呢?
在不考虑设计的情况下,ArrayBlockingQueue 完全可以按照 LinkedBlockingQueue 的双锁设计思路无脑式实现。但是不要忘了,忽略了最初的设计,那么就失去了初衷,存在即合理嘛。
此外,ArrayBlockingQueue 还支持公平的特性,这是 LinkedBlockingQueue 双锁没有实现的。
小结 LinkedBlockingQueue 是一个基于链表(单链表)实现的先进先出的阻塞队列。在入队和出队时(不含删除、清除),分别使用两把独占锁保证安全,也提高了并发度,其他操作基本都会同时使用两把锁,因此线程安全问题没问题。
LinkedBlockingQueue 虽然性能比 ArrayBlockingQueue 好,但是在内存处理上却不如后者好。此外,ConcurrentLinkedQueue 不再使用锁机制,而是采用 CAS 的方式,在性能上更强于 LinkedBlockingQueue 。