并发 - Java并发工具类

前言

在 JDK 的并发包中提供了几个非常有用的并发工具类。 CountDownLatchCyclicBarrierSemaphore 工具类提供了并发流程控制的手段,它们都是对 AQS 共享模式的应用。本篇文章将介绍其简单使用以及内部原理。

工具类 作用 说明
Semaphore 信号量,通过控制 ‘许可证’ 的数量来协调各个线程,以保证合理的使用公共资源。 线程只有拿到 ‘许可证’ 才能继续运行
CyclicBarrier 循环栅栏,让一组线程到达一个栅栏(同步点)时被阻塞,直到最后一个线程到达栅栏时,被栅栏拦截的线程才会继续运行。 强调一组线程都到达同步点才会继续往下执行
CountDownLatch 门栓,等待多线程完成 强调一个或多个线程等待其它线程完成操作

CountDownLatch

使用例子

场景

加工厂生产产品,产品需要三道工序进行检测,只有三道工序检测通过才能进入下一个环节。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Slf4j
public class CountDownLatchDemo {

/**
* 固定线程数线程池
*/
public static ExecutorService service = Executors.newFixedThreadPool(5);

/**
* 产品质量检测
*
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {

// 需要3个工人进行检测,就用3来初始化一个 CountDownLatch
CountDownLatch latch = new CountDownLatch(3);

for (int i = 1; i <= 3; i++) {
final int no = i;
service.submit(() -> {
try {
// 检测
Thread.sleep((long) (Math.random() * 10000));
log.info("No." + no + " 完成检测。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 调用 countDown() 代表完成。这里指某个员工完成检测任务
latch.countDown();
}
}
);
}

log.info("产品质量检测中.....");
// 调用await() 代表线程阻塞等待其它线程完成,即同步状态 state 减为 0。这里指产品等待检测完成
latch.await();

log.info("产品质量检测完毕,进入下一个环节。");
}
}

打印结果

1
2
3
4
5
[main] INFO com.code.juc.tool.CountDownLatchDemo - 产品质量检测中.....
[pool-1-thread-2] INFO com.code.juc.tool.CountDownLatchDemo - No.2 完成检测。
[pool-1-thread-3] INFO com.code.juc.tool.CountDownLatchDemo - No.3 完成检测。
[pool-1-thread-1] INFO com.code.juc.tool.CountDownLatchDemo - No.1 完成检测。
[main] INFO com.code.juc.tool.CountDownLatchDemo - 产品质量检测完毕,进入下一个环节。

说明

以上例子中,main 线程调用了 latch.await() 进行阻塞等待,即它阻塞在门栓上(叫啥无所谓,中文是门栓、栅栏),只有当条件满足时(其它线程调用 latch.countDown() 递减 state 为0)它才能通过这个门栓。这个例子比较简单,只有一个线程调用 await 方法等待其它线程完成,这属于 一对多 关系。CountDownLatch 还可以实现复杂的 多对多 关系的场景,有 m 个线程在门栓上等待 n 个线程完成任务,直到 n 个线程都完成任务,这 m 个线程才能同时通过门栓。

源码分析

根据 CountDownLatch 的使用例子分析源码,按照执行流程逐一分析。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch. // 继承AQS的内部类
* Uses AQS state to represent count. // 使用 AQS 的状态表示 数量
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

/**
* 有参构造方法
*
* @param count 数量
*/
Sync(int count) {
// 调用父类方法,设置状态值
setState(count);
}

/**
* 获取数量
*
* @return
*/
int getCount() {
// 调用父类方法,获取状态值
return getState();
}

/**
* 覆写父类方法 (获取同步状态(这里表示数量) - 共享方式)
*
* @param acquires
* @return
*/
@Override
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

/**
* 覆写父类方法(释放同步状态(这里表示数量) - 共享方式)
*
* @param releases 没有意义的参数,用不到
* @return
*/
@Override
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (; ; ) {
// 执行递减数量时,如果数量已经是 0 ,则直接返回 false,说明状态已经被其它线程递减为 0 了,当前线程无需唤醒 await() 阻塞的线程(们)
int c = getState();
if (c == 0) {
return false;
}
int nextc = c - 1;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
}
}

/**
* AQS 对象
*/
private final Sync sync;

/**
* 构造方法,需要一个 >= 0 的整数
*
* @param count the number of times {@link #countDown} must be invoked
* before threads can pass through {@link #await}
* @throws IllegalArgumentException if {@code count} is negative
*/
public CountDownLatch(int count) {
if (count < 0) {
throw new IllegalArgumentException("count < 0");
}
this.sync = new Sync(count);
}

// ${省略其它代码}

}

CountDownLatch 类是对 AQS 共享模式的使用。既然是使用 AQS 框架,那么就是一个固定的模式,AQS 已经处理好了同步状态的获取与释放以及阻塞与唤醒,自定义组件只需继承 AQS 以及根据同步状态获取方式(独占/共享)实现模版方法即可。前面也说了,AQS 准备好了一切,只需要条件触发就可以执行对应的任务,而实现的模版方法正是触发条件。

CountDownLatch 主要有两个核心方法,awaitcountDowncountDown 方法每次调用都会将 state 减 1 ,直到 state 的值为 0。await 方法可以被多个线程调用,调用 await 方法的线程进入 AQS 的阻塞队列中并挂起,当且仅当 state 为 0 时,线程会从阻塞队列中依次被唤醒过来。

await 等待

await 方法是一个阻塞方法,当且仅当同步状态 state 减至 0,该方法才会返回,否则调用该方法的线程将阻塞。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
--- CountDownLatch
public void await() throws InterruptedException {
// 可中断获取同步状态
sync.acquireSharedInterruptibly(1);
}

--- AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 中断则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();

// main 线程调用 await 时,state = 3,条件成立
if (tryAcquireShared(arg) < 0)
// 接下来就是 AQS 的工作了,共享方式可中断获取同步状态
doAcquireSharedInterruptibly(arg);
}

CountDownLatch 的 await 方法简单,直接传入数量值为 1 尝试获取同步状态(其实传入值是没有意义的,用不到)。CountDownLatch 覆写了模版方法即条件,条件成立则 AQS 完成阻塞任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
--- AbstractQueuedSynchronizer
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 1 入队 ,即当前线程加入阻塞队列,共享方式
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 尝试获取前驱节点
final Node p = node.predecessor();
if (p == head) {
// CountDownLatch 实现的条件,state != 0 时,返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}

// 2 找大哥,找到大哥就挂起自己,然后等待大哥唤醒自己。没有找到则继续找,直到找到或其前驱节点是 head 节点,找到则挂起等待,是 head 则尝试获取同步状态。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 线程被中断则抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

CountDownLatch 的 await 方法到此就结束了,下面总结下该方法的核心步骤。

  • main 线程没有获取到同步状态会进入阻塞队列

    main 线程对应的节点入队完成,如上图。需要注意的是,因为 main 线程对应节点入队时阻塞队列为空,因此需要构建阻塞队列,使用一个虚节点作为 head 。如果节点在入队时已经存在阻塞队列,那么直接挂到阻塞队列尾部即可。

  • 尝试获取同步状态

    入队后进入for 循环,此时main线程对应的节点的前驱节点是 head,但 tryAcquireShared 返回 -1,此时进入 找大哥 的流程中。找大哥 就是将当前节点的有效前驱节点等待状态 waitStatus 设置为 -1。这里是将 main 线程对应节点的前驱节点 head 的 waitStatus 设置为 -1。

  • 挂起,等待前置节点唤醒
    找到大哥后挂起自己,等待大哥(有效前置节点)唤醒自己。


以上是 main 线程获取同步状态失败后,进入阻塞队列等待唤醒的过程。需要说明的是,CountDownLatch 可以有多个线程等待其它线程完成,例子中只是使用一个线程等待而已。

countDown 唤醒

countDown 方法每次调用都会将同步状态 state 减 1,直到减少至 0 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
--- CountDownLatch
public void countDown() {
// 释放同步状态
sync.releaseShared(1);
}

--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
//只有当 state 减到 0 时, tryReleaseShared 方法才返回 true,否则仅是将 state 减 1 并返回 false
if (tryReleaseShared(arg)) {
// state == 0 时,唤醒阻塞的线程。 注意,这里是 t1 线程唤醒阻塞的线程即 main 线程
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
// t1 线程执行到这里,唤醒阻塞队列中等待的 main 线程
for (;;) {
// 将当前 head 保存起来,因为其它线程可能会占领它,此时是虚节点
Node h = head;
if (h != null && h != tail) {
// main 线程入队时已经把 head 当作大哥了,即 将 head 的 waitStatus 设置为 -1 (Node.SIGNAL)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// cas 将 head 的 waitStatus 设置 为 0。
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases

// 唤醒 head 下一个有效节点。这里是 main 线程对应的节点
unparkSuccessor(h);

}else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

上面代码只是正常情况下一个完整流程,即 main 线程加入阻塞队列并挂起后,t2、t3、t1 分别执行 countDown 方法递减 state 的值,到了 t1 调用该方法时,刚好 state 的值被减至 0 ,然后线程 t1 执行唤醒阻塞队列中的线程逻辑。下面对该过程进行总结。

至此,唤醒条件已经具备,即 state = 0 ,下面我们回到之前线程挂起的代码处,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
--- AbstractQueuedSynchronizer
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
// 尝试获取前驱节点
final Node p = node.predecessor();
if (p == head) {
// CountDownLatch 实现的条件,state != 0 时,返回 -1
int r = tryAcquireShared(arg);
if (r >= 0) {
// 2 main 占据 head 并继续唤醒后置阻塞的线程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}

if (shouldParkAfterFailedAcquire(p, node) &&
// 1 线程 t1 唤醒阻塞的 main 线程,该方法返回,即 main 线程继续执行尝试再次获取同步状态
parkAndCheckInterrupt())
// 如果线程被中断则抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

main 线程被唤醒后从 parkAndCheckInterrupt 方法返回,如果没有被中断,则继续尝试获取同步状态,此时可以获取到同步状态(r >= 0 成立)。接下来 main 线程会进入到 setHeadAndPropagate 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
--- AbstractQueuedSynchronizer
private void setHeadAndPropagate(Node node, int propagate) {
// 将当前 head 保存起来,因为其它线程可能会占领它
Node h = head; // Record old head for check below
// node 节点占领 head,即 main 线程占领 head
setHead(node);
/*
* 这里条件判断对应的场景比较多,毕竟是 AQS 统一处理方法,因此考虑的情况比较全面。对于 CountDownLatch ,就是唤醒 node 之后的有效节点。
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 当前节点的后置节点
Node s = node.next;
// 如果为 null 或者 是共享方式的节点
if (s == null || s.isShared())
// 接着唤醒阻塞线程 (共享式)。注意,这里是醒来的阻塞线程继续唤醒后置还在阻塞的线程。
doReleaseShared();
}
}

/**
* 占领 head
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}

setHeadAndPropagate 方法主要做了两件事,node 占领 head 并唤醒 node 后置的有效节点。由于例子中只有 main 线程进入了阻塞队列,它后面没有等待唤醒的线程节点,但为了研究源码我们假设 main 线程对应节点后面还有一个 线程 t 节点等待唤醒,那么 main 线程会执行 doReleaseShared 方法来唤醒线程 t ,此时 head 是 main 线程对应的节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doReleaseShared() {
for (;;) {
// 将当前 head 保存起来,因为其它线程可能会占领它
Node h = head;
// h == null 说明阻塞队列为空,h == tail 说明头节已经是最后一个节点或者是刚刚初始化的节点,这对应 CountDownLatch 来说都应该结束。
// 按照例子走到这里,head 就 main线程对应的节点,同时 tail 也是 main 线程对应的节点。不过我们假设了 线程 t ,因此条件是成立的
if (h != null && h != tail) {
// h 的状态,即 main 线程对应节点状态,由入队方法可知,t 线程对应节点会把 main 线程对应节点作为 大哥节点,即 waitStatus 设置为 -1(Node.SIGNAL)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 可能会失败
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases

// 唤醒 h 的后置节点,也就是阻塞队列中的第一个节点。这里是线程 t 对应的节点
unparkSuccessor(h);

}else if (ws == 0 &&
// todo 这里可能会失败
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}

// 线程执行到这里,如果唤醒的线程已经占领了 head,此时 h != head,当前线程继续循环。如果 h == head ,说明,唤醒的线程还没有占领 head,当前线程退出循环
// 这里可能 main 线程执行到这里时,main 线程唤醒的线程 t 已经占领了 head ,此时 h != head
if (h == head) // loop if head changed
break;
}
}

setHeadAndPropagate 方法和 doReleaseShared 方法配合,依次唤醒阻塞的线程,即 执行 doReleaseShared 方法的线程唤醒它的后置阻塞线程,醒来的线程会再次尝试获取同步状态然后进入到 setHeadAndPropagate 方法中先占领 head,然后调用 doReleaseShared 方法继续唤醒它的后置阻塞节点。需要说明的是,AQS 的 doReleaseShared 方法极端场景还是挺多的,这里结合 CountDownLatch 来说明。

我们抛开给出的例子,根据以下场景分析几个特殊的情况

要进行体能测试,每组三个同学进行短跑,在体育老师发出起跑指令前,这三个同学都要在起跑线待着,当体育老师准备完毕后会发出开始跑的指令,那这三个同学就会一起跑向终点的测试仪。这里 CountDownLatch 的数量 为 1,即同步状态为 1 。

  • h != head 的情况

    当 t1 被唤醒后,唤醒 t1 的线程 t 执行到上图中的代码处,还没有退出循环,t1 已经占领了 head(此时图中的 head 要指向 t1 线程对应的节点,且 t1 线程节点 thread 置空,prev 置空。图中没有体现出来),此时 head != h ,线程 t 将会进行下一轮循环。

  • compareAndSetWaitStatus(h, Node.SIGNAL, 0) 失败

    线程 t 进行第二轮循环时,刚好被唤醒的线程 t1 也进入该循环,此时两个线程并发执行,假设线程 t CAS 操作成功,然后退出循环,线程 t1 失败,将会进行下一轮循环。注意,此时虚节点的 next 指针还存在,因为我们假设的是 t1 线程失败了,t 线程成功退出了,t 线程不属于阻塞队列中的线程,它不会维护阻塞队列节点关系,如果是 t1 线程成功并退出循环就会清除它上一个节点的 next ,这里就是虚节点。

  • 执行 else if (ws == 0 &&..) 分支

    t1 线程第二次循环时,唤醒的 t2 线程还没有占领 head,此时的 head 还是 t1 线程对应的节点,但是 waitStatus 被之前的 t 线程通过 CAS 设置为 0 了,因此进入到 else if 分支,然后再次把节点 watiStatus 设置为 -3 。执行到 h == head 判断处时,假设 t2 还是没有占领 head ,此时 t1 退出循环,然后清除其前置节点的 next 指针,即虚节点。

  • compareAndSetWaitStatus(h, 0, Node.PROPAGATE) 失败
    进入这个方法的前提是 ws == 0,即 head 的 waitStatus 出现了 0,此时如果 CAS 失败,一般有两种可能,一种是线程并发执行 CAS 只有一个会成功,另一种是其它的线程把该节点的 waitStatus 值修改了,此时能改 head 的状态值的很可能是节点入队引起的修改,因为新节点要把有效的前驱节点状态值设置为 -1 。在 CountDownLatch 中一般不会发生第二种可能,因为一旦唤醒条件成立,就不会再有节点需要入队阻塞了。

剩下的 t2 线程、t3 线程依次会被唤醒,需要注意的是 t3 线程被唤醒占领头节点后也会进入到 doReleaseShared 方法的循环中,此时它对应的节点既是 head 又是 tail,就直接退出循环,结束整个流程了。

小结

CountDownLatch 的构造函数需要一个 int 类型的参数作为数量(用来计数),如果想等待 N 个任务完成(N 个线程执行完任务),就需要传入 N 。CountDownLatch 的 countDown 方法用于将 N 减 1 ,await 方法会阻塞当前调用线程(阻塞在门栓上,门栓是一个同步点的概念),直到 N 减至 0 被阻塞的线程才会继续往下执行。此外,CountDownLatch 还提供了一个带有指定时间的 await 方法,用于等待超时的场景,超过等待时间就不会再等,被阻塞线程继续往下执行。这个方法很简单,就是在 await 方法的基础上增加了超时判断,下面粘贴下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
--- CountDownLatch
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

--- AbstractQueuedSynchronizer
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
/**
* Acquires in shared timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算出等待的最迟时间
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
// 计算出等待剩余时间
nanosTimeout = deadline - System.nanoTime();
// 超过等待时间,则不再等待,直接返回
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 执行挂起的最小时间粒度
nanosTimeout > spinForTimeoutThreshold)
// 挂起 nanosTimeout 时间后自动醒来
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

需要注意,N 值必须 大于等于 0,如果 N 等于 0 ,调用 await 方法时当前线程不会被阻塞,此外 CountDownLatch 不支持重新初始化,也不支持修改数量的值。

CyclicBarrier

使用例子

场景

某个公司部门举办团建活动,需要员工自行拼车前往目的地,司机会在指定的地点等待拼车的 4 个人到齐后才发车。我们假设该部门某个团队有 8 个人,那么就需要拼 2 辆车前往目的地。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Slf4j
public class CyclicBarrierDemo {

/**
* 固定线程数线程池
*/
public static ExecutorService service = Executors.newFixedThreadPool(8);

public static void main(String[] args) {

// 要等待 4 个同学到齐,到齐后发车,因此这里初始化一个带有 Runnable 参数的 CyclicBarrier
CyclicBarrier cyclicBarrier = new CyclicBarrier(4, () -> log.info("4人已到齐,请系好安全带,现在出发赶往目的地 !"));

// 8个人,需要 2 辆车。这里会循环使用 CyclicBarrier
for (int i = 0; i < 8; i++) {

service.submit(() -> {
try {
// 赶往拼车地点
Thread.sleep((long) (Math.random() * 10000));

log.info("到达指定拼车地点 !");
cyclicBarrier.await();

// 一组人员全部到达后,才能出发。 即 一组线程全部到达栅栏后,被阻塞的线程才能继续执行
log.info("出发了 !");
} catch (InterruptedException | BrokenBarrierException exception) {
exception.printStackTrace();
}
});
}
}
}

打印结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[pool-1-thread-6] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-5] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-4] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 4人已到齐,请系好安全带,现在出发赶往目的地 !
[pool-1-thread-1] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-6] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-5] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-4] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-8] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-2] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-3] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 到达指定拼车地点 !
[pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 4人已到齐,请系好安全带,现在出发赶往目的地 !
[pool-1-thread-7] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-8] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-2] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !
[pool-1-thread-3] INFO com.code.juc.tool.CyclicBarrierDemo - 出发了 !

说明

以上例子中,使用循环体和线程池模拟 8 个线程执行任务,其中每 4 个线程为一组,只有这 4 个线程都到达栅栏,例子中是到达指定拼车点,才能继续往下执行,否则都会阻塞在栅栏上等待其它线程到达栅栏。到达栅栏的定义是 线程调用 await 方法。一组线程都到达栅栏后,由最后到达的线程执行及时任务,没有任务则不执行。CyclicBarrier 是可循环使用的栅栏,当一组线程都到齐后,CyclicBarrier 进行下一个循环,下一组线程进行同样的操作。

源码分析

CyclicBarrier 的字面意思是可循环使用的栅栏,因为它的栅栏可以重复使用(通过重置关键属性)。它要做的事情是,让一组线程到达一个栅栏(是一个同步点)时被阻塞,直到最后一个线程到达栅栏时,栅栏才会打开,所有被栅栏拦截的线程才能继续运行。它的功能是通过组合 ReentrantLockCondition 来达到的。我们还是基于使用例子来分析源码。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public class CyclicBarrier {
/**
* 栅栏所处的代。栅栏上阻塞的线程被唤醒或者栅栏被重置,就开启新的一代
*/
private static class Generation {
// 栅栏是否被打破,默认为 false
boolean broken = false;
}

/**
* 锁
*/
private final ReentrantLock lock = new ReentrantLock();
/**
* 锁对应的条件,阻塞线程在栅栏或者唤醒阻塞在栅栏上的线程
*/
private final Condition trip = lock.newCondition();
/**
* 栅栏要拦截的线程数
*/
private final int parties;
/**
* 一组线程都到达栅栏后优先执行的任务,即如果设置这个这个任务,那么被阻塞在栅栏上的线程要等这个任务结束后才能被唤醒。注意,这个任务是被最后到达的线程执行
*/
private final Runnable barrierCommand;
/**
* 当前栅栏所处的代,如果第一次就是 1 代,如果第2次使用就是 2 代
*/
private Generation generation = new Generation();

/**
* 还要等待的线程数,即还没有到栅栏的线程数。这个初始值 是 parties 值,每个线程到栅栏就减 1
*/
private int count;

// CyclicBarrier 高级构造函数,支持优先执行任务
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

// CyclicBarrier 默认的构造方法
public CyclicBarrier(int parties) {
this(parties, null);
}

// ${省略其它代码}

}

CyclicBarrier 默认的构造方法的参数表示栅栏拦截的线程数,每个线程调用 await 方法都会告诉 CyclicBarrier 我已经到达栅栏,此时栅栏要把拦截的线程数减 1 ,然后阻塞当前线程,直到要拦截的线程都到达栅栏时,栅栏才会打开,即最后到达的线程唤醒阻塞在栅栏上的线程,然后这组线程都从 await 方法处继续往下执行。 CyclicBarrier 还提供一个高级构造函数,用于在最后一个线程到达栅栏时,优先执行的任务,便于处理复杂的业务场景。注意,执行优先任务先于唤醒阻塞线程 ,代码中所有体现。

下一代栅栏

1
2
3
4
5
6
7
8
9
10
11
12
13
---CyclicBarrier
/**
* 开启下一代栅栏
* 1 唤醒阻塞在上一代栅栏上的线程
* 2 重置 count 和 generation
*/
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}

开启下一代栅栏很好理解,因为要开启下一代栅栏了,当前代栅栏上阻塞的线程需要被唤醒,同时初始化好下一代栅栏。

打破栅栏

1
2
3
4
5
6
7
8
9
10
11
12
---CyclicBarrier
/**
* 打破栅栏
*/
private void breakBarrier() {
// 设置栅栏已破标志
generation.broken = true;
// 重置 count
count = parties;
// 唤醒阻塞在栅栏上的线程
trip.signalAll();
}

打破栅栏需要标记当前代的栅栏不可用,并且要唤醒阻塞在这个不可用的栅栏上的线程,因为这里不进行唤醒的话,阻塞的线程将一直挂起。这里重制 count 不明白是干嘛的。

await

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--- CyclicBarrier

// 不带超时机制的方法,例子中使用的就是这个
public int await() throws InterruptedException, BrokenBarrierException {
try {
// false , 0
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

// 带有超时机制的方法,如果超过等待时间,当前线程没有被唤醒则 抛出 TimeoutException
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

CyclicBarrier 提供了 await 两个重载方法,一个是不带超时机制的方法,另一个是带有超时机制的方法。下面我们分析 CyclicBarrier 核心代码 dowait 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
--- CyclicBarrier
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
// 使用可重入锁
final ReentrantLock lock = this.lock;

// 加锁
lock.lock();
try {

// 获取标志着当前栅栏的 代
final Generation g = generation;

// 检查当前代的栅栏是否被打破,如果当前代的栅栏被打破需要 抛出 BrokenBarrierException 异常
if (g.broken)
throw new BrokenBarrierException();

// 检查当前线程中断状态,如果被中断了,则要抛出 InterruptedException 异常,并且打破栅栏
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// 递减 count 的值
int index = --count;

// 如果 count 递减后的值为 0 ,说明当前代的栅栏要拦截的最后一个线程也到达栅栏
if (index == 0) { // tripped

// 标志优先任务是否失败,默认是 false
boolean ranAction = false;

try {
// 如果指定了优先任务,就交给最后到达的线程执行
final Runnable command = barrierCommand;
if (command != null)
command.run();

// 设置标志
ranAction = true;

// 唤醒当前代的栅栏上阻塞的任务,并开启下一代 (栅栏可以重复使用)
nextGeneration();

return 0;

} finally {

// 如果执行优先任务失败,则打破栅栏
if (!ranAction)
breakBarrier();
}
}

// ---------------------- 执行到这里的线程不是最后一个线程,因此需要阻塞,等待最后一个线程到来并唤醒自己 ---------------/
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 不带超时机制
if (!timed)
// 释放锁,加入等待队列 (ConditionObject)
trip.await();
// 带超时机制,并且超时时间 > 0
else if (nanos > 0L)
// 释放锁,加入等待队列 (ConditionObject),如果到时间还没有被唤醒则不再阻塞
nanos = trip.awaitNanos(nanos);

// 执行到这里说明,线程进入等待队列后被中断了
} catch (InterruptedException ie) {
// 栅栏仍是进入等待队列的前的栅栏,此时应该打破栅栏,并且抛出中断异常
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// 1 g != generation,说明新一代的栅栏生成了,即最后一个线程也到达了栅栏,此时只需复位被中断线程的中断标志
// 2 栅栏被打破了(一定要抛出异常),被打破异常交由后续逻辑处理,此时只需复位被中断线程的中断标志
Thread.currentThread().interrupt();
}
}

// 线程被唤醒后,还没从 await 方法返回栅栏就被打破了,直接抛出异常
if (g.broken)
throw new BrokenBarrierException();

/**
* 这个方法很重要,被唤醒后的线程正常逻辑都会从该方法返回出去
* 1 最后一个线程到达后会做三件事:执行优先任务、唤醒当前代的栅栏上阻塞的线程、开启栅栏的下一代
* 2 当前所在的方法是加了 ReentrantLock 锁的,因此我们要知道以下信息:
* 1)最后一个到达线程在没有执行完三件事前,是不会释放锁的
* 2)唤醒的阻塞线程并不能马上从 await 方法返回,它需要先去竞争锁,获取锁后才能从 await 方法返回
* 3)即使最后一个线程开启了栅栏的下一代,在它没有释放锁前,其它组的线程也要阻塞,比如例子中的后四个线程
* 3 被最后一个线程唤醒的线程执行到这里时,新一代的栅栏一定已经存在了。注意,这里说的是被最后一个线程唤醒的线程,并不是由于超时机制醒来的线程
*/
if (g != generation)
return index;

// 超时机制醒来的线程,如果发现已经超时了,则打破栅栏,抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

涉及到的源码已经分析过了,下面结合使用例子简单分析下过程

CyclicBarrier 整个过程还是挺清晰的,没有使用 CAS 重试等机制,因为栅栏等待线程的 await 方法直接使用了ReentrantLock 锁,线程要到达栅栏必须拿到锁才行,整个过程是串行化的。分析完核心方法后,我们再看下其它几个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
--- CyclicBarrier
// 重置栅栏
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

// 在栅栏上等待线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

// 判断栅栏是否被打破
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

小结

CyclicBarrier 和 CountDownLatch 还是有点像的,前者强调的是一组线程到达同步点即栅栏,后者强调的是阻塞在同步点的线程等待其它线程完成任务。具体区别如下:

  • 作用不同
    CyclicBarrier 要等固定数量线程到达同步点,CountDownLatch 等待的不是线程而是同步状态state递减为 0。前者针对线程,后者针对事件/任务(根据需要调用 countDown 方法)。
  • 重用性不同
    CyclicBarrier 可以重复使用,上一代使用完后自动初始化下一代,也可以调用 reset 方法重置。 CountDownLatch 只能使用一次,在同步状态减为 0 后门栓打开后,就不能再次使用,想要使用需要新建实例。
  • 唤起任务数不同
    CyclicBarrier 只能唤醒一个任务,CountDownLatch 可以唤醒多个任务

Semaphore

使用例子

场景

有一些加工厂是对环境有很大污染的,如果要生产产品必须要有关机构申请生产许可证,拿到许可证后才可以生产,完成一定规模后需要归还许可证,便于其它工厂可以申请。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Slf4j
public class SemaphoreDemo {

/**
* 固定线程数线程池
*/
public static ExecutorService service = Executors.newFixedThreadPool(10);

public static void main(String[] args) {

// 有 3 个许可证书,每个加工厂公平获取。
Semaphore semaphore = new Semaphore(3, true);

// 有 6 个加工厂想要获取
for (int i = 0; i < 6; i++) {
service.submit(() -> {
try {
// 获取许可证
semaphore.acquire();
log.info("拿到了许可证");

// 处理任务
log.warn("凭借许可证处理任务...");
Thread.sleep((long) (Math.random() * 10000));

} catch (InterruptedException e) {
e.printStackTrace();

} finally {
log.info("归还许可证");
semaphore.release();
}
});
}
}
}

打印结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[pool-1-thread-1] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-1] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-2] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-2] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-3] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-3] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-3] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
[pool-1-thread-4] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-4] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-1] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
[pool-1-thread-5] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-5] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-4] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
[pool-1-thread-6] INFO com.code.juc.tool.SemaphoreDemo - 拿到了许可证
[pool-1-thread-6] WARN com.code.juc.tool.SemaphoreDemo - 凭借许可证处理任务...
[pool-1-thread-2] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
[pool-1-thread-6] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证
[pool-1-thread-5] INFO com.code.juc.tool.SemaphoreDemo - 归还许可证

说明

以上例子中,使用循环体和线程池模拟 6 个线程,即 6 个加工厂获取生产许可证。Semaphore 的许可证数量为 3,即监管部门目前只有 3 个生产许可证,此时 6 个工厂只能有其中三个可以获取到,另外 3 个工厂只能等待生产许可证的归还,如果不归还将一直等着。

源码分析

Semaphore 是用来控制同时访问特定资源的线程数量,它通过协调各个线程来保证合理的使用有限的公共资源。Semaphore 也是对 AQS 共享模式的使用,因此套路也是一样的。它接收一个整形的数字 permits,也是 AQS 的 state,表示可用的许可证数量,即允许 permits 个线程获取许可证,也就是最大并发数是 permits。因为是共享模式的使用,因此需要重写对应的模版方法 tryAcquireSharedtryReleaseShared ,前者用来判断能否获取到许可证,后者用来判断能否归还许可整(总是返回true)。此外,Semaphore 在此基础上增加了公平和非公平获取同步状态的功能。Semaphore 的用法很简单,它的 acquire 方法获取许可证,release 方法归还许可证,获取不到许可证的线程就加入阻塞队列中,等待其它线程释放许可证。

类结构

前面也提到了,Semaphore 是对 AQS 共享模式的使用,并且支持公平和非公平的状态管理方式,即对同步状态 state 的操作。通过上图的 UML 类图更加清晰,Semaphore 既可以公平实现方式创建对象,又能以非公平方式创建对象。

Sync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;

/**
* 继承 AQS 的内部类对象
*/
private final Sync sync;

/**
* 信号量的同步实现。使用 AQS 的同步状态 state 表示许可证。分为 公平和非公平两种实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

// 构造方法,设置 AQS 的同步状态 state。对于 Semaphore 来说表示许可证
Sync(int permits) {
setState(permits);
}

// 获取同步状态,即许可证
final int getPermits() {
return getState();
}

// 非公平实现,共享式获取许可证。返回剩余许可证数量
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
// 减少 acquires 个许可证
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 重写 AQS 的共享式释放同步状态方法,即归还许可证。该方法总是返回 true
// 注意,也就是说调用释放同步状态的方法,只要释放的值不小于 0 ,就会尝试唤醒等待的线程。这也要求使用时要保证许可证的成对
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
// 归还 releases 个许可证 (注意,如果不获取先释放的话,许可证会变多的)
int next = current + releases;
// 这里判断归还数量不能小于 0
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新 state (AQS 中的方法 )
if (compareAndSetState(current, next))
return true;
}
}

// ${省略其它代码}

}

Sync 内部类首先对同步状态 state 进行了初始化,先确定同步状态 state 的值,即表示的意义,这里指许可证。第二个是获取同步状态 - tryAcquireShared,这里指获取许可证,Sync 中没有进行实现而是交给了两个子类。第三个是释放同步状态 - tryReleaseShared,这里指归还许可证,Sync 中统一实现了这个逻辑。下面我们分别看下其子类实现。

NonfairSync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;

/**
* 继承 AQS 的内部类对象
*/
private final Sync sync;

/**
* 信号量的同步实现。使用 AQS 的同步状态 state 表示许可证。分为 公平和非公平两种实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

// 构造方法,设置 AQS 的同步状态 state。对于 Semaphore 来说表示许可证
Sync(int permits) {
setState(permits);
}

// 获取同步状态,即许可证
final int getPermits() {
return getState();
}

// 非公平实现要执行的方法,共享式获取许可证。返回剩余许可证数量
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
// 减少 acquires 个许可证
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 重写 AQS 的共享式释放同步状态方法,即归还许可证。该方法总是返回 true
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
// 归还 releases 个许可证 (注意,如果不获取先释放的话,许可证会变多的)
int next = current + releases;
// 这里判断归还数量不能小于 0
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新 state (AQS 中的方法 )
if (compareAndSetState(current, next))
return true;
}
}

/**
* 非公平实现
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

// 构造方法
NonfairSync(int permits) {
super(permits);
}

// 重写 AQS 的共享式获取同步状态的方法,这里是非公平方式获取许可证
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

// ${省略其它代码}

}

NonfairSync 内部类只做了一件事情,重写 AQS 的 tryAcquireShared 方法,需要注意它的非公平性,也就是不关心阻塞队列中有没有还在等待的线程,直接尝试获取许可证。

FairSync 内部类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;

/**
* 继承 AQS 的内部类对象
*/
private final Sync sync;

/**
* 信号量的同步实现。使用 AQS 的同步状态 state 表示许可证。分为 公平和非公平两种实现
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

// 构造方法,设置 AQS 的同步状态 state。对于 Semaphore 来说表示许可证
Sync(int permits) {
setState(permits);
}

// 获取同步状态,即许可证
final int getPermits() {
return getState();
}

// 非公平实现,共享式获取许可证。返回剩余许可证数量
final int nonfairTryAcquireShared(int acquires) {
for (; ; ) {
int available = getState();
// 减少 acquires 个许可证
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// 重写 AQS 的共享式释放同步状态方法,即归还许可证。该方法总是返回 true
protected final boolean tryReleaseShared(int releases) {
for (; ; ) {
int current = getState();
// 归还 releases 个许可证 (注意,如果不获取先释放的话,许可证会变多的)
int next = current + releases;
// 这里判断归还数量不能小于 0
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
// CAS 更新 state (AQS 中的方法 )
if (compareAndSetState(current, next))
return true;
}
}



/**
* 非公平实现
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

// 构造方法
NonfairSync(int permits) {
super(permits);
}

// 重写 AQS 的共享式获取同步状态的方法,这里是非公平方式获取许可证
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

}

/**
* 公平实现
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

// 构造方法
FairSync(int permits) {
super(permits);
}

// 重写 AQS 的共享式获取同步状态的方法, 这里是 公平方式获取许可证。返回剩余许可证数
protected int tryAcquireShared(int acquires) {
for (; ; ) {
// 是否有线程在排队等待许可证
if (hasQueuedPredecessors())
return -1;
int available = getState();
// 减少 acquires 个许可证
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

// ${省略其它代码}

}

FairSync 内部类同样只做了一件事情,重写 AQS 的 tryAcquireShared 方法,以公平的方式实现,也就是线程在获取许可证之前,先判断阻塞队列中是否还有等待的线程,有的话就直接返回 -1 进入阻塞队列中等待。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 以非公平方式创建 Semaphore
*/
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

/**
* 可选择公平/非公平方式创建 Semaphore
*/
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

和 ReentrantLock 有点类似,实现了公平和非公平方式,默认使用非公平实现。

acquire 系列方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
--- Semaphore
/**
* 可响应中断地获取许可证(获取一个许可证)
*/
public void acquire() throws InterruptedException {
// AQS 的方法
sync.acquireSharedInterruptibly(1);
}

--- AbstractQueuedSynchronizer
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// Semaphore 实现的获取同步状态,公平还是不公平看Semaphore的具体实现
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
/**
* Acquires in shared interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// Semaphore 实现的获取同步状态,公平还是不公平看Semaphore的具体实现
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}


--- Semaphore
/**
* 获取许可证(获取一个许可证),对中断不敏感
*/
public void acquireUninterruptibly() {
// AQS 的方法
sync.acquireShared(1);
}

--- AbstractQueuedSynchronizer
public final void acquireShared(int arg) {
// Semaphore 实现的获取同步状态,公平还是不公平看Semaphore的具体实现
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// Semaphore 实现的获取同步状态,公平还是不公平看Semaphore的具体实现
int r = tryAcquireShared(arg);
if (r >= 0) {
// 内部会调用 doReleaseShared 方法
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

--- Semaphore
/**
* 从 Semaphore 获取给定数量的许可证,不够就阻塞等待,对中断敏感
*/
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}

/**
* 从 Semaphore 获取给定数量的许可证,不够就阻塞等待,对中断不敏感
*/
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}

通过 acquire 方法也可以看出,AQS 框架在实现共享式获取同步状态时,当且仅当同步状态处理结果小于 0 时,线程才会走入队流程。因为都是共享式实现,AQS 底层处理是一样的,因此后续的入队、找有效前驱节点以及挂起操作和 CountDownLatch 是一样的,就不再分析了。继续看它的释放同步状态的方法。

release 系列方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
--- Semaphore
/**
* 归还许可证
*/
public void release() {
// AQS 方法
sync.releaseShared(1);
}

/**
* 归还给定数量的许可证到 Semaphore
*/
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
// AQS 方法
sync.releaseShared(permits);
}

--- AbstractQueuedSynchronizer
public final boolean releaseShared(int arg) {
// Semaphore 实现的释放同步状态方法
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

通过 release 系列方法也可以看出,AQS 框架在实现共享式释放同步状态时,当且仅当同步状态处理结果为 true 时,才会唤醒阻塞队列的线程。因为都是共享式实现,AQS 底层处理是一样的,因此唤醒的流程和 CountDownLatch 是一样的。

例子流程

  • Semaphore 初始化 3 个 许可证

    同一个JVM进程中,某一时刻对 resource 访问的最大并发请求数为3

  • 某一时刻t1、t2、t3获取到许可证,t4进入阻塞队列等待

    线程t1、t2、t3拿到许可证去访问资源,此时 Semaphore 中已经没有可用的许可证了,t4只能加入阻塞队列等待许可证的释放。这里 t4 要入队。

  • t3 访问资源后归还许可证,t4 获取到获取到许可证

    这个过程可能会有多种情况,如,t4 在没有挂起之前,t3 已经归还了许可证,此时 t4 直接就可以拿到。如果 t4 不太幸运的话,会挂起然后等待t3来唤醒。Semaphore 的一些特殊情况可以参考 CountDownLatch。

  • 其它线程获取许可证依次类推

小结

Semaphore 使用的注意事项:

  1. 获取和释放的许可证数量必须一致,否则随着许可证的获取和归还流程推进,最后会导致许可证数量不够,将出现程序卡死。
  2. 在初始化 Semaphore 的时候可以设置释放公平,这个可以根据情景选择,一般设置为 true 更合理,因为 Semaphore 本身就是限制同时请求量的,不针对某个请求的。
  3. 获取和释放许可证不一定非要同一个线程来完成,可以是 线程 A 获取,线程 B 释放,逻辑合理即可。

总结

无论是 ReentrantLock,还是 CountDownLatch、CyclicBarrier、Semaphore 等 ,它们都是对 AQS 应用,至于是实现锁的功能,还是实现同步组件根据具体场景进行设计。本质上都离不开同步状态 state、独占方式 tryAcquire-tryRelease 获取与释放方法,共享方式 tryAcquireShared-tryReleaseShared 获取与释放方法,此外 AQS 也支持自定义同步组件同时实现独占和共享两种方式,以及公平和非公平实现,不同组件表示的意义是不同的。AQS 还提供了 等待队列 机制,ReentrantLock 就基于该机制实现了等待与唤醒机制。