前言 ArrayBlockingQueue 是 Java 并发包下一个以数组 实现的阻塞队列,它是线程安全 的。
数据模型 ArrayBlockingQueue 是使用一个固定长度的数组来存储元素的,并使用一系列属性来描述这个数组。虽然这个数组是固定大小,但是使用了复用数组 的设计思想,它的空间可以循环写入。循环写入不是无条件地覆盖式写入,而是采用等待可用空间地方式进行写入。从工作原理来说,它有以下特点:
循环数组有一个写指针 putIndex,表示在数组 items 中的要写入的位置。如果写指针 putIndex 已经指向了数组末尾,那么此时再写入元素,写指针 putIndex 就会重新指向数组的头部,下次添加元素就可以复用数组了。
循环数组同时有一个读指针 takeIndex,表示在数组 items 中要读取的位置。如果读指针 takeIndex 指向了数组末尾,那么也会把读指针 takeIndex 重新指向数组头部,下次从数组头部开始继续读取元素。
下图展示了循环数组的写指针工作机制,读指针也是类似的。
循环写入非覆盖式写入,具体体现在下图,当写指针 putIndex 追上读指针 takeIndex(也就是队列满了) ,那么就必须阻塞等待读指针向前推进,只有读指针 takeIndex 向前推进了才能继续写。
同理,当读指针 takeIndex 追上写指针 putIndex(也就是队列空了),那么也必须阻塞等待写指针向前推进,保证队列不为空才能继续读取。
源码分析 ArrayBlockingQueue 相关源码结构如下:
数据结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public class ArrayBlockingQueue <E > extends AbstractQueue <E > implements BlockingQueue <E >, java .io .Serializable { final Object[] items; int takeIndex; int putIndex; int count; final ReentrantLock lock; private final Condition notEmpty; private final Condition notFull; }
循环数组本身被设计为一个 Object 类型的素组 items ,然后 ArrayBlockingQueue 设计了以下的变量来描述循环数组的状态:
count: 记录队列中元素的个数。
putIndex: 记录循环数组的写指针,即循环数组接下来写数据时应该写入的位置。
takeIndex: 记录循环数组的读指针,即循环数组接下来读数据时应该读取的位置。
lock: 可重入独占锁,用来保证队列安全的。
notEmpty: 数组非空的等待条件,当队列为空时,会调用 notEmpty 的 wait 方法,让当前线程等待。
notFull: 数组非满的等待条件,当队列满了,会调用 notFull 的 wait 方法,让当前线程等待。
构造函数 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public ArrayBlockingQueue (int capacity) { this (capacity, false ); } public ArrayBlockingQueue (int capacity, boolean fair) { if (capacity <= 0 ) throw new IllegalArgumentException(); this .items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
添加元素
add(): 在不超出队列容量的情况下,在队列尾部插入指定的元素,成功返回 true,如果队列已满则抛出异常。其底层使用的是 offer 方法。
offer(): 在不超出队列容量的情况下,在队列尾部插入指定的元素,成功时返回 true,如果队列已满则返回 false。
put(): 在队列尾部添加指定元素,如果队列满了,则阻塞等待,直到空间可用。
add() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public boolean add (E e) { return super .add(e); }
offer() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public boolean offer (E e) { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lock(); try { if (count == items.length) return false ; else { enqueue(e); return true ; } } finally { lock.unlock(); } }
put() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public void put (E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
enqueue() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private void enqueue (E x) { final Object[] items = this .items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0 ; count++; notEmpty.signal(); }
该方法是添加元素的最底层方法,涉及到写指针 putIndex 的更新,以及尝试唤醒阻塞在 notEmpty 上等待的线程(如果有的话)。需要注意的是,判断队列是否满了的条件是:putIndex==items.length
。
获取元素
remove(): 从队列中删除指定元素(如果存在)。
poll(): 尝试删除并返回队列头部的元素,队列为空返回 null 即可。
take(): 阻塞式删除并返回队列头部的元素,如果队列为空会一直等待,直到有元素才返回。
peek(): 返回队列头部元素,但不从队列中删除该元素,队列为空则返回 null。
remove() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 public boolean remove (Object o) { if (o == null ) return false ; final Object[] items = this .items; final ReentrantLock lock = this .lock; lock.lock(); try { if (count > 0 ) { final int putIndex = this .putIndex; int i = takeIndex; do { if (o.equals(items[i])) { removeAt(i); return true ; } if (++i == items.length) i = 0 ; } while (i != putIndex); } return false ; } finally { lock.unlock(); } } --- void removeAt (final int removeIndex) { final Object[] items = this .items; if (removeIndex == takeIndex) { items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); } else { final int putIndex = this .putIndex; for (int i = removeIndex; ; ) { int next = i + 1 ; if (next == items.length) next = 0 ; if (next != putIndex) { items[i] = items[next]; i = next; } else { items[i] = null ; this .putIndex = i; break ; } } count--; if (itrs != null ) itrs.removedAt(removeIndex); } notFull.signal(); }
注意: 删除元素相对复杂一些,移动数组元素时要考虑数组是循环使用的。
poll() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public E poll () { final ReentrantLock lock = this .lock; lock.lock(); try { return (count == 0 ) ? null : dequeue(); } finally { lock.unlock(); } }
上述方法是快速获取的方式,如果没有就立即返回 null。ArrayBlockingQueue 还支持超时获取元素,尽最大努力获取到元素。该方法是在 poll() 方法的基础上增加了超时等待条件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public E poll (long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) { if (nanos <= 0 ) return null ; nanos = notEmpty.awaitNanos(nanos); } return dequeue(); } finally { lock.unlock(); } }
take() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public E take () throws InterruptedException { final ReentrantLock lock = this .lock; lock.lockInterruptibly(); try { while (count == 0 ) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } }
take() 方法相比超时等待的 poll() 方法,前者没有等待时间,会一直等待,直到队列有元素或线程被中断。
peek() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public E peek () { final ReentrantLock lock = this .lock; lock.lock(); try { return itemAt(takeIndex); } finally { lock.unlock(); } }
dequeue() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private E dequeue () { final Object[] items = this .items; @SuppressWarnings ("unchecked" ) E x = (E) items[takeIndex]; items[takeIndex] = null ; if (++takeIndex == items.length) takeIndex = 0 ; count--; if (itrs != null ) itrs.elementDequeued(); notFull.signal(); return x; }
该方法是获取元素的最底层方法,涉及到读指针 takeIndex 的更新,以及尝试唤醒阻塞在 notFull 上等待的线程(如果有的话)。
使用场景 典型的使用场景就是生产者-消费者问题。
小结 本篇文章简单介绍了 ArrayBlockingQueue 的模型,并分析了核心的源码。核心点有以下两点:
以固定大小数组作为元素的容器,并定义系列描述数组的属性。这个固定大小的数组是可以循环使用的。存取数据遵循:队头取、队尾插,从读写指针也可以看出。
基本的增、删、查都要先加锁才能操作,使用的锁是 Reentrant 保证了安全性,并使用等待通知机制来实现阻塞的功能。也就是,同一时刻只有一个线程可操作队列。