前言 在 JDK 的并发包中提供了几个非常有用的并发工具类。 CountDownLatch
、CyclicBarrier
和 Semaphore
工具类提供了并发流程控制的手段,它们都是对 AQS 共享模式 的应用。本篇文章将介绍其简单使用以及内部原理。
工具类
作用
说明
Semaphore
信号量,通过控制 ‘许可证’ 的数量来协调各个线程,以保证合理的使用公共资源。
线程只有拿到 ‘许可证’ 才能继续运行
CyclicBarrier
循环栅栏,让一组线程到达一个栅栏(同步点)时被阻塞,直到最后一个线程到达栅栏时,被栅栏拦截的线程才会继续运行。
强调一组线程都到达同步点才会继续往下执行
CountDownLatch
门栓,等待多线程完成
强调一个或多个线程等待其它线程完成操作
CountDownLatch 使用例子 场景
加工厂生产产品,产品需要三道工序进行检测,只有三道工序检测通过才能进入下一个环节。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Slf 4jpublic class CountDownLatchDemo { public static ExecutorService service = Executors.newFixedThreadPool(5 ); public static void main (String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(3 ); for (int i = 1 ; i <= 3 ; i++) { final int no = i; service.submit(() -> { try { Thread.sleep((long ) (Math.random() * 10000 )); log.info("No." + no + " 完成检测。" ); } catch (InterruptedException e) { e.printStackTrace(); } finally { latch.countDown(); } } ); } log.info("产品质量检测中....." ); latch.await(); log.info("产品质量检测完毕,进入下一个环节。" ); } }
打印结果
1 2 3 4 5 [main] INFO com.code.juc.tool.CountDownLatchDemo - 产品质量检测中..... [pool-1-thread-2] INFO com.code.juc.tool.CountDownLatchDemo - No.2 完成检测。 [pool-1-thread-3] INFO com.code.juc.tool.CountDownLatchDemo - No.3 完成检测。 [pool-1-thread-1] INFO com.code.juc.tool.CountDownLatchDemo - No.1 完成检测。 [main] INFO com.code.juc.tool.CountDownLatchDemo - 产品质量检测完毕,进入下一个环节。
说明
以上例子中,main 线程调用了 latch.await()
进行阻塞等待,即它阻塞在门栓上(叫啥无所谓,中文是门栓、栅栏),只有当条件满足时(其它线程调用 latch.countDown()
递减 state 为0)它才能通过这个门栓。这个例子比较简单,只有一个线程调用 await
方法等待其它线程完成,这属于 一对多
关系。CountDownLatch 还可以实现复杂的 多对多
关系的场景,有 m 个线程在门栓上等待 n 个线程完成任务,直到 n 个线程都完成任务,这 m 个线程才能同时通过门栓。
源码分析 根据 CountDownLatch 的使用例子分析源码,按照执行流程逐一分析。
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } @Override protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } @Override protected boolean tryReleaseShared (int releases) { for (; ; ) { int c = getState(); if (c == 0 ) { return false ; } int nextc = c - 1 ; if (compareAndSetState(c, nextc)) { return nextc == 0 ; } } } } private final Sync sync; public CountDownLatch (int count) { if (count < 0 ) { throw new IllegalArgumentException("count < 0" ); } this .sync = new Sync(count); } }
CountDownLatch 类是对 AQS 共享模式的使用。既然是使用 AQS 框架,那么就是一个固定的模式,AQS 已经处理好了同步状态的获取与释放以及阻塞与唤醒,自定义组件只需继承 AQS 以及根据同步状态获取方式(独占/共享)实现模版方法即可。前面也说了,AQS 准备好了一切,只需要条件触发就可以执行对应的任务,而实现的模版方法正是触发条件。
CountDownLatch 主要有两个核心方法,await
和 countDown
。countDown
方法每次调用都会将 state
减 1 ,直到 state
的值为 0。await
方法可以被多个线程调用,调用 await
方法的线程进入 AQS 的阻塞队列中并挂起,当且仅当 state
为 0 时,线程会从阻塞队列中依次被唤醒过来。
await 等待 await 方法是一个阻塞方法,当且仅当同步状态 state 减至 0,该方法才会返回,否则调用该方法的线程将阻塞。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 --- CountDownLatch public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } --- AbstractQueuedSynchronizer public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
CountDownLatch 的 await 方法简单,直接传入数量值为 1 尝试获取同步状态(其实传入值是没有意义的,用不到)。CountDownLatch 覆写了模版方法即条件,条件成立则 AQS 完成阻塞任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 --- AbstractQueuedSynchronizer private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
CountDownLatch 的 await 方法到此就结束了,下面总结下该方法的核心步骤。
main 线程没有获取到同步状态会进入阻塞队列 main 线程对应的节点入队完成,如上图。需要注意的是,因为 main 线程对应节点入队时阻塞队列为空,因此需要构建阻塞队列,使用一个虚节点作为 head 。如果节点在入队时已经存在阻塞队列,那么直接挂到阻塞队列尾部即可。
尝试获取同步状态 入队后进入for 循环,此时main线程对应的节点的前驱节点是 head,但 tryAcquireShared 返回 -1,此时进入 找大哥
的流程中。找大哥
就是将当前节点的有效 前驱节点等待状态 waitStatus 设置为 -1。这里是将 main 线程对应节点的前驱节点 head 的 waitStatus 设置为 -1。
挂起,等待前置节点唤醒 找到大哥后挂起自己,等待大哥(有效前置节点)唤醒自己。
以上是 main 线程获取同步状态失败后,进入阻塞队列等待唤醒的过程。需要说明的是,CountDownLatch 可以有多个线程等待其它线程完成,例子中只是使用一个线程等待而已。
countDown 唤醒 countDown 方法每次调用都会将同步状态 state 减 1,直到减少至 0 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 --- CountDownLatch public void countDown () { sync.releaseShared(1 ); } --- AbstractQueuedSynchronizer public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); }else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
上面代码只是正常情况下一个完整流程,即 main 线程加入阻塞队列并挂起后,t2、t3、t1 分别执行 countDown
方法递减 state
的值,到了 t1 调用该方法时,刚好 state
的值被减至 0 ,然后线程 t1 执行唤醒阻塞队列中的线程逻辑。下面对该过程进行总结。
至此,唤醒条件已经具备,即 state = 0
,下面我们回到之前线程挂起的代码处,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 --- AbstractQueuedSynchronizer private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
main 线程被唤醒后从 parkAndCheckInterrupt 方法返回,如果没有被中断,则继续尝试获取同步状态,此时可以获取到同步状态(r >= 0 成立)。接下来 main 线程会进入到 setHeadAndPropagate 方法中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 --- AbstractQueuedSynchronizer private void setHeadAndPropagate (Node node, int propagate) { Node h = head; setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0 ) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
setHeadAndPropagate 方法主要做了两件事,node 占领 head 并唤醒 node 后置的有效 节点。由于例子中只有 main 线程进入了阻塞队列,它后面没有等待唤醒的线程节点,但为了研究源码我们假设 main 线程对应节点后面还有一个 线程 t 节点等待唤醒,那么 main 线程会执行 doReleaseShared 方法来唤醒线程 t ,此时 head 是 main 线程对应的节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); }else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
setHeadAndPropagate 方法和 doReleaseShared 方法配合,依次唤醒阻塞的线程,即 执行 doReleaseShared 方法的线程唤醒它的后置阻塞线程,醒来的线程会再次尝试获取同步状态然后进入到 setHeadAndPropagate 方法中先占领 head,然后调用 doReleaseShared 方法继续唤醒它的后置阻塞节点。需要说明的是,AQS 的 doReleaseShared
方法极端场景还是挺多的,这里结合 CountDownLatch 来说明。
我们抛开给出的例子,根据以下场景分析几个特殊的情况
要进行体能测试,每组三个同学进行短跑,在体育老师发出起跑指令前,这三个同学都要在起跑线待着,当体育老师准备完毕后会发出开始跑的指令,那这三个同学就会一起跑向终点的测试仪。这里 CountDownLatch 的数量 为 1,即同步状态为 1 。
h != head 的情况 当 t1 被唤醒后,唤醒 t1 的线程 t 执行到上图中的代码处,还没有退出循环,t1 已经占领了 head(此时图中的 head 要指向 t1 线程对应的节点,且 t1 线程节点 thread 置空,prev 置空。图中没有体现出来),此时 head != h ,线程 t 将会进行下一轮循环。
compareAndSetWaitStatus(h, Node.SIGNAL, 0) 失败 线程 t 进行第二轮循环时,刚好被唤醒的线程 t1 也进入该循环,此时两个线程并发执行,假设线程 t CAS 操作成功,然后退出循环,线程 t1 失败,将会进行下一轮循环。注意,此时虚节点的 next 指针还存在,因为我们假设的是 t1 线程失败了,t 线程成功退出了,t 线程不属于阻塞队列中的线程,它不会维护阻塞队列节点关系,如果是 t1 线程成功并退出循环就会清除它上一个节点的 next ,这里就是虚节点。
执行 else if (ws == 0 &&..) 分支 t1 线程第二次循环时,唤醒的 t2 线程还没有占领 head,此时的 head 还是 t1 线程对应的节点,但是 waitStatus 被之前的 t 线程通过 CAS 设置为 0 了,因此进入到 else if 分支,然后再次把节点 watiStatus 设置为 -3 。执行到 h == head 判断处时,假设 t2 还是没有占领 head ,此时 t1 退出循环,然后清除其前置节点的 next 指针,即虚节点。
compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 失败 进入这个方法的前提是 ws == 0,即 head 的 waitStatus 出现了 0,此时如果 CAS 失败,一般有两种可能,一种是线程并发执行 CAS 只有一个会成功,另一种是其它的线程把该节点的 waitStatus 值修改了,此时能改 head 的状态值的很可能是节点入队引起的修改,因为新节点要把有效的前驱节点状态值设置为 -1 。在 CountDownLatch 中一般不会发生第二种可能,因为一旦唤醒条件成立,就不会再有节点需要入队阻塞了。
剩下的 t2 线程、t3 线程依次会被唤醒,需要注意的是 t3 线程被唤醒占领头节点后也会进入到 doReleaseShared 方法的循环中,此时它对应的节点既是 head 又是 tail,就直接退出循环,结束整个流程了。
小结 CountDownLatch 的构造函数需要一个 int 类型的参数作为数量(用来计数),如果想等待 N 个任务完成(N 个线程执行完任务),就需要传入 N 。CountDownLatch 的 countDown 方法用于将 N 减 1 ,await 方法会阻塞当前调用线程(阻塞在门栓上,门栓是一个同步点的概念),直到 N 减至 0 被阻塞的线程才会继续往下执行。此外,CountDownLatch 还提供了一个带有指定时间的 await 方法,用于等待超时的场景,超过等待时间就不会再等,被阻塞线程继续往下执行。这个方法很简单,就是在 await 方法的基础上增加了超时判断,下面粘贴下代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 --- CountDownLatch public boolean await (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1 , unit.toNanos(timeout)); } --- AbstractQueuedSynchronizer public final boolean tryAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); } private boolean doAcquireSharedNanos (int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L ) return false ; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return true ; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L ) return false ; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this , nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
需要注意,N 值必须 大于等于 0,如果 N 等于 0 ,调用 await 方法时当前线程不会被阻塞,此外 CountDownLatch 不支持重新初始化,也不支持修改数量的值。
CyclicBarrier 使用例子 场景
某个公司部门举办团建活动,需要员工自行拼车前往目的地,司机会在指定的地点等待拼车的 4 个人到齐后才发车。我们假设该部门某个团队有 8 个人,那么就需要拼 2 辆车前往目的地。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 @Slf 4jpublic class CyclicBarrierDemo { public static ExecutorService service = Executors.newFixedThreadPool(8 ); public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4 , () -> log.info("4人已到齐,请系好安全带,现在出发赶往目的地 !" )); for (int i = 0 ; i < 8 ; i++) { service.submit(() -> { try { Thread.sleep((long ) (Math.random() * 10000 )); log.info("到达指定拼车地点 !" ); cyclicBarrier.await(); log.info("出发了 !" ); } catch (InterruptedException | BrokenBarrierException exception) { exception.printStackTrace(); } }); } } }
打印结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [pool-1-thread-6] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-5] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-4] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 4人已到齐,请系好安全带,现在出发赶往目的地 ! [pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-6] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-5] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-4] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-8] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-2] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-3] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 ! [pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 4人已到齐,请系好安全带,现在出发赶往目的地 ! [pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-8] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-2] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 ! [pool-1-thread-3] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
说明
以上例子中,使用循环体和线程池模拟 8 个线程执行任务,其中每 4 个线程为一组,只有这 4 个线程都到达栅栏,例子中是到达指定拼车点,才能继续往下执行,否则都会阻塞在栅栏上等待其它线程到达栅栏。到达栅栏的定义是 线程调用 await 方法。一组线程都到达栅栏后,由最后到达的线程执行及时任务,没有任务则不执行。CyclicBarrier 是可循环使用的栅栏,当一组线程都到齐后,CyclicBarrier 进行下一个循环,下一组线程进行同样的操作。
源码分析 CyclicBarrier 的字面意思是可循环使用的栅栏,因为它的栅栏可以重复使用(通过重置关键属性)。它要做的事情是,让一组线程到达一个栅栏(是一个同步点)时被阻塞,直到最后一个线程到达栅栏时,栅栏才会打开,所有被栅栏拦截的线程才能继续运行。它的功能是通过组合 ReentrantLock
和 Condition
来达到的。我们还是基于使用例子来分析源码。
属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class CyclicBarrier { private static class Generation { boolean broken = false ; } private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private final int parties; private final Runnable barrierCommand; private Generation generation = new Generation(); private int count; public CyclicBarrier (int parties, Runnable barrierAction) { if (parties <= 0 ) throw new IllegalArgumentException(); this .parties = parties; this .count = parties; this .barrierCommand = barrierAction; } public CyclicBarrier (int parties) { this (parties, null ); } }
CyclicBarrier 默认的构造方法的参数表示栅栏拦截的线程数,每个线程调用 await 方法都会告诉 CyclicBarrier 我已经到达栅栏,此时栅栏要把拦截的线程数减 1 ,然后阻塞当前线程,直到要拦截的线程都到达栅栏时,栅栏才会打开,即最后到达的线程唤醒阻塞在栅栏上的线程,然后这组线程都从 await 方法处继续往下执行。 CyclicBarrier 还提供一个高级构造函数,用于在最后一个线程到达栅栏时,优先执行的任务,便于处理复杂的业务场景。注意,执行优先任务先于唤醒阻塞线程 ,代码中所有体现。
下一代栅栏 1 2 3 4 5 6 7 8 9 10 11 12 13 ---CyclicBarrier private void nextGeneration () { trip.signalAll(); count = parties; generation = new Generation(); }
开启下一代栅栏很好理解,因为要开启下一代栅栏了,当前代栅栏上阻塞的线程需要被唤醒,同时初始化好下一代栅栏。
打破栅栏 1 2 3 4 5 6 7 8 9 10 11 12 ---CyclicBarrier private void breakBarrier () { generation.broken = true ; count = parties; trip.signalAll(); }
打破栅栏需要标记当前代的栅栏不可用,并且要唤醒阻塞在这个不可用的栅栏上的线程,因为这里不进行唤醒的话,阻塞的线程将一直挂起。这里重制 count 不明白是干嘛的。
await 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 --- CyclicBarrier public int await () throws InterruptedException, BrokenBarrierException { try { return dowait(false , 0L ); } catch (TimeoutException toe) { throw new Error(toe); } } public int await (long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true , unit.toNanos(timeout)); }
CyclicBarrier 提供了 await 两个重载方法,一个是不带超时机制的方法,另一个是带有超时机制的方法。下面我们分析 CyclicBarrier 核心代码 dowait
方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 --- CyclicBarrier private int dowait (boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this .lock; lock.lock(); try { final Generation g = generation; if (g.broken) throw new BrokenBarrierException(); if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } int index = --count; if (index == 0 ) { boolean ranAction = false ; try { final Runnable command = barrierCommand; if (command != null ) command.run(); ranAction = true ; nextGeneration(); return 0 ; } finally { if (!ranAction) breakBarrier(); } } for (;;) { try { if (!timed) trip.await(); else if (nanos > 0L ) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } if (g.broken) throw new BrokenBarrierException(); if (g != generation) return index; if (timed && nanos <= 0L ) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
涉及到的源码已经分析过了,下面结合使用例子简单分析下过程
CyclicBarrier 整个过程还是挺清晰的,没有使用 CAS 重试等机制,因为栅栏等待线程的 await 方法直接使用了ReentrantLock 锁,线程要到达栅栏必须拿到锁才行,整个过程是串行化的。分析完核心方法后,我们再看下其它几个方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 --- CyclicBarrier public void reset () { final ReentrantLock lock = this .lock; lock.lock(); try { breakBarrier(); nextGeneration(); } finally { lock.unlock(); } } public int getNumberWaiting () { final ReentrantLock lock = this .lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } } public boolean isBroken () { final ReentrantLock lock = this .lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
小结 CyclicBarrier 和 CountDownLatch 还是有点像的,前者强调的是一组线程到达同步点即栅栏,后者强调的是阻塞在同步点的线程等待其它线程完成任务。具体区别如下:
作用不同 CyclicBarrier 要等固定数量线程到达同步点,CountDownLatch 等待的不是线程而是同步状态state递减为 0。前者针对线程,后者针对事件/任务(根据需要调用 countDown 方法)。
重用性不同 CyclicBarrier 可以重复使用,上一代使用完后自动初始化下一代,也可以调用 reset 方法重置。 CountDownLatch 只能使用一次,在同步状态减为 0 后门栓打开后,就不能再次使用,想要使用需要新建实例。
唤起任务数不同 CyclicBarrier 只能唤醒一个任务,CountDownLatch 可以唤醒多个任务
Semaphore 使用例子 场景
有一些加工厂是对环境有很大污染的,如果要生产产品必须要有关机构申请生产许可证,拿到许可证后才可以生产,完成一定规模后需要归还许可证,便于其它工厂可以申请。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf 4jpublic class SemaphoreDemo { public static ExecutorService service = Executors.newFixedThreadPool(10 ); public static void main (String[] args) { Semaphore semaphore = new Semaphore(3 , true ); for (int i = 0 ; i < 6 ; i++) { service.submit(() -> { try { semaphore.acquire(); log.info("拿到了许可证" ); log.warn("凭借许可证处理任务..." ); Thread.sleep((long ) (Math.random() * 10000 )); } catch (InterruptedException e) { e.printStackTrace(); } finally { log.info("归还许可证" ); semaphore.release(); } }); } } }
打印结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [pool-1-thread-1] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-1] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-2] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-2] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-3] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-3] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-3] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证 [pool-1-thread-4] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-4] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-1] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证 [pool-1-thread-5] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-5] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-4] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证 [pool-1-thread-6] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证 [pool-1-thread-6] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务... [pool-1-thread-2] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证 [pool-1-thread-6] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证 [pool-1-thread-5] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
说明
以上例子中,使用循环体和线程池模拟 6 个线程,即 6 个加工厂获取生产许可证。Semaphore 的许可证数量为 3,即监管部门目前只有 3 个生产许可证,此时 6 个工厂只能有其中三个可以获取到,另外 3 个工厂只能等待生产许可证的归还,如果不归还将一直等着。
源码分析 Semaphore 是用来控制同时访问特定资源的线程数量,它通过协调各个线程来保证合理的使用有限的公共资源。Semaphore 也是对 AQS 共享模式的使用,因此套路也是一样的。它接收一个整形的数字 permits,也是 AQS 的 state,表示可用的许可证数量,即允许 permits 个线程获取许可证,也就是最大并发数是 permits。因为是共享模式的使用,因此需要重写对应的模版方法 tryAcquireShared
和 tryReleaseShared
,前者用来判断能否获取到许可证,后者用来判断能否归还许可整(总是返回true)。此外,Semaphore 在此基础上增加了公平和非公平获取同步状态的功能。Semaphore 的用法很简单,它的 acquire
方法获取许可证,release
方法归还许可证,获取不到许可证的线程就加入阻塞队列中,等待其它线程释放许可证。
类结构
前面也提到了,Semaphore 是对 AQS 共享模式的使用,并且支持公平和非公平的状态管理方式,即对同步状态 state 的操作。通过上图的 UML 类图更加清晰,Semaphore 既可以公平实现方式创建对象,又能以非公平方式创建对象。
Sync 内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 public class Semaphore implements java .io .Serializable { private static final long serialVersionUID = -3222578661600680210L ; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits) { setState(permits); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (; ; ) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (; ; ) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } }
Sync 内部类首先对同步状态 state 进行了初始化,先确定同步状态 state 的值,即表示的意义,这里指许可证。第二个是获取同步状态 - tryAcquireShared,这里指获取许可证,Sync 中没有进行实现而是交给了两个子类。第三个是释放同步状态 - tryReleaseShared,这里指归还许可证,Sync 中统一实现了这个逻辑。下面我们分别看下其子类实现。
NonfairSync 内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 public class Semaphore implements java .io .Serializable { private static final long serialVersionUID = -3222578661600680210L ; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits) { setState(permits); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (; ; ) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (; ; ) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } } }
NonfairSync 内部类只做了一件事情,重写 AQS 的 tryAcquireShared 方法,需要注意它的非公平性,也就是不关心阻塞队列中有没有还在等待的线程,直接尝试获取许可证。
FairSync 内部类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 public class Semaphore implements java .io .Serializable { private static final long serialVersionUID = -3222578661600680210L ; private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L ; Sync(int permits) { setState(permits); } final int getPermits () { return getState(); } final int nonfairTryAcquireShared (int acquires) { for (; ; ) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared (int releases) { for (; ; ) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded" ); if (compareAndSetState(current, next)) return true ; } } static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L ; NonfairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { return nonfairTryAcquireShared(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L ; FairSync(int permits) { super (permits); } protected int tryAcquireShared (int acquires) { for (; ; ) { if (hasQueuedPredecessors()) return -1 ; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } }
FairSync 内部类同样只做了一件事情,重写 AQS 的 tryAcquireShared 方法,以公平的方式实现,也就是线程在获取许可证之前,先判断阻塞队列中是否还有等待的线程,有的话就直接返回 -1 进入阻塞队列中等待。
构造方法 1 2 3 4 5 6 7 8 9 10 11 12 13 public Semaphore (int permits) { sync = new NonfairSync(permits); } public Semaphore (int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
和 ReentrantLock 有点类似,实现了公平和非公平方式,默认使用非公平实现。
acquire 系列方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 --- Semaphore public void acquire () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } --- AbstractQueuedSynchronizer public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } --- Semaphore public void acquireUninterruptibly () { sync.acquireShared(1 ); } --- AbstractQueuedSynchronizer public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } private void doAcquireShared (int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; if (interrupted) selfInterrupt(); failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } --- Semaphore public void acquire (int permits) throws InterruptedException { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly (int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.acquireShared(permits); }
通过 acquire 方法也可以看出,AQS 框架在实现共享式获取同步状态时,当且仅当同步状态处理结果小于 0 时,线程才会走入队流程。因为都是共享式实现,AQS 底层处理是一样的,因此后续的入队、找有效前驱节点以及挂起操作和 CountDownLatch 是一样的,就不再分析了。继续看它的释放同步状态的方法。
release 系列方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 --- Semaphore public void release () { sync.releaseShared(1 ); } public void release (int permits) { if (permits < 0 ) throw new IllegalArgumentException(); sync.releaseShared(permits); } --- AbstractQueuedSynchronizer public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; } private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
通过 release 系列方法也可以看出,AQS 框架在实现共享式释放同步状态时,当且仅当同步状态处理结果为 true 时,才会唤醒阻塞队列的线程。因为都是共享式实现,AQS 底层处理是一样的,因此唤醒的流程和 CountDownLatch 是一样的。
例子流程
Semaphore 初始化 3 个 许可证 同一个JVM进程中,某一时刻对 resource 访问的最大并发请求数为3
某一时刻t1、t2、t3获取到许可证,t4进入阻塞队列等待 线程t1、t2、t3拿到许可证去访问资源,此时 Semaphore 中已经没有可用的许可证了,t4只能加入阻塞队列等待许可证的释放。这里 t4 要入队。
t3 访问资源后归还许可证,t4 获取到获取到许可证 这个过程可能会有多种情况,如,t4 在没有挂起之前,t3 已经归还了许可证,此时 t4 直接就可以拿到。如果 t4 不太幸运的话,会挂起然后等待t3来唤醒。Semaphore 的一些特殊情况可以参考 CountDownLatch。
其它线程获取许可证依次类推
小结 Semaphore 使用的注意事项:
获取和释放的许可证数量必须一致,否则随着许可证的获取和归还流程推进,最后会导致许可证数量不够,将出现程序卡死。
在初始化 Semaphore 的时候可以设置释放公平,这个可以根据情景选择,一般设置为 true 更合理,因为 Semaphore 本身就是限制同时请求量的,不针对某个请求的。
获取和释放许可证不一定非要同一个线程来完成,可以是 线程 A 获取,线程 B 释放,逻辑合理即可。
总结 无论是 ReentrantLock,还是 CountDownLatch、CyclicBarrier、Semaphore 等 ,它们都是对 AQS 应用,至于是实现锁的功能,还是实现同步组件根据具体场景进行设计。本质上都离不开同步状态 state
、独占方式 tryAcquire-tryRelease
获取与释放方法,共享方式 tryAcquireShared-tryReleaseShared
获取与释放方法,此外 AQS 也支持自定义同步组件同时实现独占和共享两种方式,以及公平和非公平实现,不同组件表示的意义是不同的。AQS 还提供了 等待队列
机制,ReentrantLock 就基于该机制实现了等待与唤醒机制。