并发 - ReentrantReadWriteLock

前言

独占锁(排它锁)在同一时刻只允许一个线程进行访问,如 并发 - ReentrantLock 一文中介绍的 ReentrantLock 就是一个独占锁。而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其它写线程都会被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,不仅保证了写操作对读操作的可见性,还使得并发性相比一般的排它锁有更大提升。在读多写少的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

场景

读写锁除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁还能够简化读写交互场景的编码实现。对于一个共享的缓存数据,一般都是读多写少,但是写操作完成之后的更新需要对后续的读操作可见,这样做的目的是使读操作能读取到正确的数据。使用读写锁实现这样的功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取时,后续其它线程的读写操作都会被阻塞,写锁释放之后,所有操作才会继续执行。相比于使用等待-通知机制,更加简单化。

概述

Java 并发包提供读写锁的实现是 ReentrantReadWriteLock ,它支持以下特性:

  • 公平性选择:支持非公平(默认)和公平的锁获取方式,吞吐量还是非公平优于公平
  • 重入性:支持锁重入,包括读锁和写锁
  • 锁降级:同一个线程获取的写锁能够降级为读锁,反之不行。遵循获取写锁、获取读锁、释放写锁、释放读锁

读写状态

读写锁 ReentrantReadWriteLock 同样是基于 AQS 实现的锁功能,而读写状态就是 AQS 的同步状态。在 ReentrantLock 中同步状态表示锁被一个线程持有的次数,而读写锁需要在同步状态上维护多个读线程和一个写线程的信息,这就使得同步状态的设计成为读写锁实现的关键。由于同步状态 state 是一个整型变量,4 个字节 32 位,因此读写锁将该变量切分成了两部分,高 16 位表示读,低 16 位表示写,划分方式如下图所示:

上图中的同步状态表示一个线程已经获取了写锁且重入了 2 次,同时也连续获取了两次读锁。读写锁是通过位运算来确定读和写各自的状态的。下面对状态的变化过程进行说明。

假设当前同步状态 state 的值为 S

  • 获取写状态

    S & (1 << 16 -1) -> 将高16位全部抹去

  • 获取读状态

    S>>>16 -> 无符号补0,右移16位

  • 更新操作

    写状态增加 1 时 -> S + 1
    读状态增加 1 时 -> S + (1<<16),也就是 S + 0x00010000

注意:读写锁 ReentrantReadWriteLock 虽然使用同步状态 state 的高低位来表示读写状态,但是同步队列依然是共用一个。

源码分析

ReentrantReadWriteLock 的类继承关系类图如下:

由继承关系图可知,读写锁 ReentrantReadWriteLock 是通过内部类 Sync 继承 AQS 来行使同步器的职能。由于该读写锁支持公平和非公平模式,因此通过继承内部类 Sync 的方式定义了非公平模式的 NonfairSync 和公平模式的 FairSync 。读写锁的实现依赖组合的 Sync ,也就是说 ReadLock 和 WriteLock 获取和释放锁的功能是交给 Sync 去实现的,公平模式下使用 FairSync ,非公平模式下使用 NonfairSync 。

读写锁 ReentrantReadWriteLock 组合关系如下图所示:

ReentrantReadWriteLock 分为读锁 ReadLock 和写锁 WriteLock 。读锁是共享锁,可被多个线程同时占有;写锁是独占锁,同时只能有一个线程占有,且写锁被线程占有后其它线程既不能获取读锁也不能获取写锁,但占有写锁的线程可以在不释放写锁的情况下继续获取读锁,这是锁降级的特点。

代码结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;

/**
* 读锁
*/
private final ReentrantReadWriteLock.ReadLock readerLock;

/**
* 写锁
*/
private final ReentrantReadWriteLock.WriteLock writerLock;

/**
* 获取写锁
*
* @return
*/
public ReentrantReadWriteLock.WriteLock writeLock() {
return writerLock;
}

/**
* 获取读锁
*
* @return
*/
public ReentrantReadWriteLock.ReadLock readLock() {
return readerLock;
}

/**
* Sync 继承自 AQS ,执行所有同步机制
* 根据 ReentrantReadWriteLock 构造函数传入的布尔值决定要构造哪一种 Sync 实例
*/
final Sync sync;

/**
* 默认创建非公平的 ReentrantReadWriteLock
*/
public ReentrantReadWriteLock() {
this(false);
}

/**
* 根据传入的公平策略创建 ReentrantReadWriteLock
* 说明:创建 ReentrantReadWriteLock 对象同时,会依次创建对应模式的 AQS 对象、读锁对象、写锁对象
*
* @param fair 公平策略
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}

/**
* 静态内部类 Sync ,继承 AQS
* 1 具体子类包括:非公平模式 NonfairSync 和 公平模式 FairSync
* 2 实现 AQS 中的独占和共享模式的两对方法
*/
abstract static class Sync extends AbstractQueuedSynchronizer {...}

/**
* 非公平版本的 Sync
*/
static final class NonfairSync extends Sync {...}

/**
* 公平版本的 Sync
*/
static final class FairSync extends Sync {...}

/**
* 读锁
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;

/**
* 使用外层的 ReentrantReadWriteLock 的 AQS 管理同步状态
*/
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

// 省略其它方法
}

/**
* 写锁
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;

/**
* 使用外层的 ReentrantReadWriteLock 的 AQS 管理同步状态
*/
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

// 省略其它方法
}

通过前文分析的 ReentrantReadWriteLock UML 类图和相关的组合关系图,不难发现与上述代码结构是一一对应的。下面我们依次对读锁和写锁依赖的 AQS 相关实现进行介绍,理解了相关的实现后也就基本理解了读写锁的实现。

Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/**
* 静态内部类 Sync ,继承 AQS
* 具体子类包括:非公平模式 NonfairSync 和 公平模式 FairSync
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

/**
* 将同步状态 state 分为两段:高 16 位用于共享模式;低 16 位用于独占模式
*/
static final int SHARED_SHIFT = 16;

// 读锁的值操作单位
// 由于高 16 位用于读锁,因此每次操作基于 1 左移 16 位的值,也就是从高 16 位的末尾进行计算
// 即,同步状态 state 加减 1 << 16 => 1 00000000 00000000
// 写锁是低 16 位,直接对同步状态 state 加减
static final int SHARED_UNIT = (1 << SHARED_SHIFT);

// 锁持有次数溢出的阈值,即 2^16 -1
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

// 独占模式掩码,即 1 << 16 -1 => 11111111 11111111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/**
* 取 c 的高 16 位的值,代表读锁的获取次数,包括重入
* 注意:该值是所有线程获取次数总和,包括每个线程重入情况
*/
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}

/**
* 取 c 的低 16 位的值,代表写锁的重入次数(写锁是独占模式)
*/
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}


//----------------------- 🌟线程读锁计数器 --------------------/

// 用于记录每个线程持有的读锁次数(包括读锁重入)
static final class HoldCounter {
// 线程持有读锁次数
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

// ThreadLocal 的子类,保存线程变量副本 HoldCounter
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
/**
* 初始化 HoldCounter
*
* @return
*/
@Override
public HoldCounter initialValue() {
return new HoldCounter();
}
}

// 当前线程读锁计数器
// 说明:使用 ThreadLocal 来记录当前线程持有的读锁次数
private transient ThreadLocalHoldCounter readHolds;

// 最后获取读锁的线程读锁计数器
// 说明:缓存最后一个获取读锁的线程持有读锁的次数,这里不是全局的概念,所以不管哪个线程获取到读锁后,就把这个值占为已用
private transient HoldCounter cachedHoldCounter;


// 首个获取读锁的线程(并且其未释放读锁)读锁计数器
// 说明:
// 1 这里不是全局的概念,该值被设置的条件是,当获取读锁时此时读锁没有线程持有。等这个 firstReader 代表的线程释放掉读锁以后,会有新的线程占用这个属性,也就是这个"第一个"是动态的。
// 2 在读锁不产生竞争的情况下,记录读锁重入次数是非常方便的
// 3 如果一个线程使用了 firstReader,那么它就不需要占用 cachedHoldCounter 变量了
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;


//-------------------- 线程读锁计数器🌟 --------------------/

// 构造方法中初始化
Sync() {
// 初始化 readHolds 这个 ThreadLocal 属性
readHolds = new ThreadLocalHoldCounter();
// 为了保证 readHolds 的内存可见性
setState(getState()); // ensures visibility of readHolds
}


/**
* 获取读锁是否需要阻塞,交给子类实现
*/
abstract boolean readerShouldBlock();

/**
* 获取写锁是否需要阻塞,交给子类实现
*/
abstract boolean writerShouldBlock();

// 省略获取和释放同步状态方法对,即获取和释放读锁/写锁方法。
// 在分析读锁和写锁时结合分析,这里先不展示源码

}

继承 AQS 的静态内部类 Sync 负责读锁 ReadLock 和写锁 WriteLock 的获取与释放工作,对读写锁 ReentrantReadWriteLock 的公平和非公平支持交给了两个子类实现。下面对 Sync 中的属性和抽象方法进行介绍,这些属作为最基础的数据支持读写锁的运行与统计。

同步状态

  • 读写锁将 int 类型的同步状态 state 同时赋予两种语义,高 16 位表示读锁的持有次数,包括线程重入锁的情况。获取到读锁一次:state + (1<<16),释放掉读锁一次:state - (1<<16)
  • 低 16 位表示写锁的获取次数,因为写锁是独占锁,同时只能被一个线程获取,因此它代表的重入次数。获取写锁一次:state + 1,释放写锁一次:state -1

线程读锁计数器

  1. 每个线程都需要记录获取的读锁次数,这样才能知道到底是不是读锁重入。注意,判断读锁重入和写锁重入完全不一样。写锁属于独占锁,同一时刻写锁只能一个线程持有,因此同步状态的低 16 位的值就是该线程持有写锁的次数(包括重入);读锁属于共享锁,同一时刻允许多个线程持有,而同步状态的高 16 位的值是所有线程持有的总次数(包括各个线程重入),因此不能借助同步状态得出各个读线程持有读锁的次数,也就不能判断是否读锁重入,因此需要线程读锁计数器来辅助完成该诉求。
  2. 读写锁使用 ThreadLocal 维护每个线程读锁计数器,这样就能识别出哪个线程持有多少次读锁,进而可以判断线程是否是读锁重入以及线程持有读锁的次数。此外,读写锁基于性能考虑,又引入 “首个线程读锁计数器”“最后线程读锁计数器”。其实 ThreadLocal<HoldCounter> readHolds 完全可以完成计数,只是 ThreadLocal 内部基于 Map 来查询的,相比直接使用变量记录线程读锁计数信息性能要差了那么一丢丢,不过这两个计数器只能记录一个线程持读锁信息,并且是动态变化的,提升性能的依据是尽可能先用这两个计数器,然后才使用通用的 ThreadLocal<HoldCounter> readHolds 记录线程读锁信息。
  3. “首个线程读锁计数器” 是使用 firstReaderfirstReaderHoldCount 两个属性组合而成的。“最后线程读锁计数器” 是使用 HoldCounter 类型的 cachedHoldCounter 属性表示。

读写公平策略

读写锁 ReentrantReadWriteLock 具体分为读锁 ReadLock 和写锁 WriteLock ,在公平和非公平模式下读锁和写锁的表现不同,因此将具体的实现交给公平和非公平子类实现。

非公平 Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
+--- ReentrantReadWriteLock
/**
* 非公平版本的 Sync
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;

/**
* 获取写锁是否需要阻塞
* @return
*/
final boolean writerShouldBlock() {
// 如果是非公平模式,那么 lock 的时候就可以直接用去抢锁,抢不到再排队
return false; // writers can always barge
}

/**
* 获取读锁是否需要阻塞
* @return
*/
final boolean readerShouldBlock() {
// 判断同步队列中 head 的第一个后继节点是否是来获取写锁的,如果是,就算是非公平模式,也先让该节点获取写锁,避免线程饥饿
return apparentlyFirstQueuedIsExclusive();
// final boolean apparentlyFirstQueuedIsExclusive() {
// Node h, s;
// return (h = head) != null &&
// (s = h.next) != null &&
// !s.isShared() &&
// s.thread != null;
// }
}
}

在非公平模式下,写锁优先尝试抢占锁,抢占失败才会去排队;一般来说,非公平模式下读锁也应该直接尝试抢占锁,但是写锁被定义了更高的优先级,读锁会先判断队列中等待的第一个线程节点是否是获取写锁的,如果是就算是非公平模式也先让该节点获取写锁,避免线程饥饿。

公平 Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
+--- ReentrantReadWriteLock
/**
* 公平版本的 Sync
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;

/**
* 获取写锁是否需要阻塞
* @return
*/
final boolean writerShouldBlock() {
// 那么如果阻塞队列有线程等待的话,就乖乖去排队
return hasQueuedPredecessors();
}

/**
* 判断读是否要阻塞
* @return
*/
final boolean readerShouldBlock() {
// 同步队列中有线程节点在等待
return hasQueuedPredecessors();
}
}

在公平模式下,无论是写锁还是读锁,都遵循先来后到原则。需要说明的是,对于读锁的获取,无论是公平还是非公平模式,它都没有抢占的概念,即使是在非公平模式下,还是需要判断同步队列中的第一个线程节点是否是写线程

至此,读锁和写锁的前置准备已经完成,下面我们进入到读锁和写锁的源码。

读锁

读锁内部持有 ReentrantReadWriteLock 中的 Sync 类型的对象,可能是 FairSync 对象,也可能是 NonfairSync 对象,具体由 ReentrantReadWriteLock 构造函数决定。ReadLock 锁获取与释放功能全部委托给 sync 对象完成。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
+--- ReentrantReadWriteLock
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;

/**
* 使用 AQS 管理同步状态
*/
private final Sync sync;

/**
* 构造方法
*
* @param lock
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}

获取读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
+--- ReadLock
/**
* 获取读锁
*/
public void lock() {
// AQS 模版方法,获取共享同步状态
sync.acquireShared(1);
}

+--- AQS
public final void acquireShared(int arg) {
// 尝试获取读锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

+--- Sync

/**
* AQS 模版方法,获取共享同步状态 - 获取读锁
* 说明:
* 1 读锁是一个支持重入的共享锁,它能被多个线程同时获取,在没有其它写线程访问时(注意非公平模式下同步队列中首个获取写锁的线程节点的情况),读锁总会被成功地获取,而所做的也只是增加读状态。
* 2 如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其它线程获取,则进入等待状态。
* 3 读锁的实现有两部分逻辑,一个是获取读锁,另一个是设置线程的读锁计数器。
*
* @param unused
* @return
*/
protected final int tryAcquireShared(int unused) {

// 1 获取当前线程
Thread current = Thread.currentThread();

// 2 获取同步状态
int c = getState();

// 3 exclusiveCount(c) != 0 ,说明有线程持有写锁。如果不是当前线程持有的写锁,那么当前线程获取读锁失败。
// 由于读写锁的降级,如果当前线程持有写锁,是可以继续获取读锁的
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;

// 4 读锁的获取次数
int r = sharedCount(c);

// 5 获取读锁是否需要被阻塞(需要考虑公平与非公平的情况)
if (!readerShouldBlock() &&
// 判断持有读锁次数是否会溢出 (2^16-1)
r < MAX_COUNT &&
// 使用 CAS 是将 state 属性的高 16 位加 1,低 16 位不变,如果成功就代表获取到了读锁
// c + 1 00000000 00000000
compareAndSetState(c, c + SHARED_UNIT)) {

/* 进入当前代码区域,表示获取到了读锁。下面的逻辑是记录线程读锁计数器,用于标记当前线程持读锁次数,为判断是否读锁重入以及线程获取读锁次数做基础数据准备 */

// 5.1 r == 0 说明当前线程是第一个获取读锁的线程,或者是在它之前的读锁都已经释放了
// 记录 firstReader 为当前线程,及其持有的读锁数量:1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;

// 5.2 当前线程重入锁,加 1 即可
} else if (firstReader == current) {
firstReaderHoldCount++;

// 5.3 当前线程不是第一个获取读锁,并且已经有其它线程获取了读锁
// - 使用 readHolds 保存当前线程持有的读锁次数
// - 将当前线程持有读锁信息更新为 cachedHoldCounter 的值,该变量用于记录最后一个获取读锁的线程持锁信息
} else {

// 获取最后一个获取读锁的线程信息。
Sync.HoldCounter rh = cachedHoldCounter;

// 如果 cachedHoldCounter 缓存的不是当前线程,则将当前线程持有读锁信息缓存到 HoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();

// cachedHoldCounter 缓存的是当前线程,但 count 为 0
else if (rh.count == 0)
readHolds.set(rh);

// 将当前线程持有读锁次数 count 加 1
rh.count++;
}

// return 大于 0 代表获取到了共享锁
return 1;
}

// 进入下面方法,可能是以下三种情况:
// - compareAndSetState(c, c + SHARED_UNIT) 存在竞争,CAS 失败
// - 公平模式 FairSync 下同步队列中有其它线程节点在等待锁
// - 非公平模式 NonFairSync 下,同步队列中第一个线程节点(head.next)是获取写锁的,为了避免写锁饥饿,获取读锁的线程不应该和它竞争
return fullTryAcquireShared(current);
}

读锁获取使用 AQS 的共享模式获取同步状态,整个流程如下:

  1. 判断写锁是否被其它线程占有(支持锁降级获取读锁),如果被其它线程占有直接获取读锁失败。
  2. 根据具体的公平或非公平模式判断获取读锁是否需要阻塞,阻塞的话会进入后续二次确认方法,即判断是否是重入获取读锁,重入获取读锁不需要阻塞。
  3. 获取读锁成功后,记录线程读锁计数器。

获取读锁的注意事项如下:

  • 获取读锁前提条件是写锁没有被其它线程持有,当前线程持有写锁是可以继续获取读锁的,这是读写锁的锁降级特性。
  • 在公平模式下,获取读锁时同步队列中有等待的线程节点,如果此时不是重入获取读锁,那么获取锁失败。
  • 在非公平模式下,获取读锁时同步队列中第一个线程节点是获取写锁的情况,此时如果不是重入获取读锁,那么获取锁失败。写锁被定义更高的优先级。
  • 获取锁成功后,需要记录当前线程读锁计数器。线程读锁计数器有两个作用,一个是用于判断线程是否是重入读锁,另一个是提供当前线程获取读锁的次数

fullTryAcquireShared

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
+--- Sync
/**
* 这段代码与 tryAcquireShared 中的代码在一定程度上是冗余的,但由于没有使用重试和惰性读取保持计数之间的交互使 tryAcquireShared 复杂化,所以总体上更简单。
*
* @param current 当前线程
* @return
*/
final int fullTryAcquireShared(Thread current) {

// 记录线程获取读锁的次数
Sync.HoldCounter rh = null;

// for 循环
for (; ; ) {
// 1 获取同步状态
int c = getState();

// 2 如果其它线程获取了写锁,那么当前线程是不能获取到读锁的,只能去同步队列中排队
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;

// else we hold the exclusive lock; blocking here
// would cause deadlock.

// 3 获取读锁应该阻塞,说明同步队列中有其它线程在等待。
// 注意: 既然是获取读锁应该阻塞,那么进入有什么用呢? 是用来处理读锁重入的
} else if (readerShouldBlock()) {

// firstReader 线程重入锁,暂不做操作,直接执行后面的 CAS
if (firstReader == current) {
// assert firstReaderHoldCount > 0;

// 非 firstReader 线程重入锁,则继续判断其它情况重入锁
} else {
if (rh == null) {

// 判断是否是 cachedHoldCounter 重入锁,如果也不是,那就是既不是 firstReader 可重入也不是 lastReader 可重入,
// 这是只需从 ThreadLocal 取出当前线程持有读锁信息,如果没有占有,则进行兜底操作,让线程去排队
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {

// 那么到 ThreadLocal 中获取当前线程的 HoldCounter
// 注意,如果当前线程从来没有初始化过 ThreadLocal 中的值,get() 会执行初始化
rh = readHolds.get();

// 如果发现 count == 0,也就是说是上一行代码初始化的,之前该线程并没有持有读锁,那么执行 remove 操作清空信息,因为接下来该线程要入队等待了
// 然后往下两三行,乖乖排队去
if (rh.count == 0)
readHolds.remove();
}
}

// 非重入,去同步队列中排队
if (rh.count == 0)
return -1;
}
}


if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 这里 CAS 成功,那么就意味着成功获取读锁了
// 下面需要做的是设置 firstReader 或 cachedHoldCounter,以及 readHolds,记录线程读锁信息
if (compareAndSetState(c, c + SHARED_UNIT)) {

// 注意这里 c 是上面的快照,上面修改的不是 c 而是 state
// 如果发现 sharedCount(c) 等于 0,也就是当前没有线程持有读锁,就将当前线程设置为 firstReader
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;

// 如果是重 firstReader 重入,直接累加持有读锁的次数即可
} else if (firstReader == current) {
firstReaderHoldCount++;

// 将 cachedHoldCounter 设置为当前线程持有读锁信息,并且使用 ThreadLocal 记录当前线程持有读锁信息
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);

// 累加当前线程持读锁次数
rh.count++;

// 更新 cachedHoldCounter 为当前线程持有读锁信息
cachedHoldCounter = rh; // cache for release
}

// 返回大于 0 的数,代表获取到了读锁
return 1;
}
}
}

上述方法在一定程度上是对 tryAcquireShared 方法的冗余,主要是对并发获取读锁失败以及重入获取锁的处理。具体作用如下:

  1. tryAcquireShared 方法中 CAS 获取同步状态失败后增加获取读锁成功的机会,尽可能不进入同步队列。
  2. 处理 tryAcquireShared 中因获取读锁需要阻塞的情况(上述方法只会处理重入读锁的情况,因为重入读锁不需要阻塞,非重入就需要阻塞,也就是获取读锁再次失败)
    • 在非公平模式 NonFairSync 情况下,如果同步队列中 head.next 是获取写锁的节点,那么如果该线程不是重入读锁则获取失败,如果是重入读锁则获取成功,因为重入优先级更高。
    • 在公平模式 FairSync 情况下,如果同步队列中有线程节点等待,那么如果不是重入读锁则获取失败,如果是重入读锁则获取成功,同样地,因为重入优先级更高。

释放读锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
+--- ReadLock
/**
* 释放读锁
*/
public void unlock() {
sync.releaseShared(1);
}

+--- AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
+--- Sync
/**
* AQS模版方法,释放共享同步状态 - 释放读锁
* 说明:
* 读锁的每次释放均减少读状态,减少的值是 1<<16
*
* @param unused
* @return
*/
protected final boolean tryReleaseShared(int unused) {
// 1 获取当前线程
Thread current = Thread.currentThread();

// 2 如果当前线程是 firstReader ,说明当前线程是第一个读线程
if (firstReader == current) {

// 如果 firstReaderHoldCount 等于 1 ,那么本次解锁后就不再持有锁了,需要把 firstReader 置为 null
// 没有设置 firstReaderHoldCount = 0 ,是因为没必要,其他线程使用的时候自己会重新设置该值
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;

// 3 当前线程不是首个获取读锁的线程
} else {

// 判断当前线程是不是最后获取读锁的线程,不是的话要到 ThreadLocal 中取
Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();

// 获取计数
int count = rh.count;
if (count <= 1) {
// 将 ThreadLocal remove 掉,防止内存泄漏。因为已经不再持有读锁了
readHolds.remove();
// 防止释放锁和获取锁次数不匹配
if (count <= 0)
throw unmatchedUnlockException();
}

// count 减 1
--rh.count;
}


// 4 将同步状态 state 的高 16 位减 1,如果发现读锁和写锁都释放完了,那么唤醒后继的等待线程节点
for (; ; ) {

// 获取同步状态 state
int c = getState();

// nextc 是 state 高 16 位减 1 后的值
int nextc = c - SHARED_UNIT;

// 如果 nextc == 0,那就是 state 全部 32 位都为 0,也就是读锁和写锁都没有被占有
if (compareAndSetState(c, nextc))
// 释放读锁对读操作没有影响,但是如果现在读锁和写锁都是空闲的,那么释放读锁可能允许等待的写操作继续进行。
return nextc == 0;
}
}

读锁释放过程比较简单,主要还是对应的两个操作,具体如下:

  • 更新当前释放读锁的线程对应的读锁计数器,如果是完全释放锁,则需要销毁对应的读锁计数器。
  • 更新同步状态的高 16 位的值,表示释放读锁。如果是完全释放锁,则当前线程去唤醒同步队列中的线程节点。注意,此时同步队列中既可能有写线程节点,也可能有读线程节点,可以想下锁降级的阻塞场景。此外,共享模式是传播性唤醒,需要好好体会下。

写锁

写锁是一个支持重入的排它锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取写锁时,读锁已经被获取或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。

写锁内部持有 ReentrantReadWriteLock 中的 Sync 类型的对象,可能是 FairSync 对象,也可能是 NonfairSync 对象,具体由 ReentrantReadWriteLock 构造函数决定。ReadLock 锁获取与释放功能全部委托给 sync 对象完成。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
+--- ReentrantReadWriteLock
/**
* 写锁
* 1 写锁是独占锁
* 2 如果有读锁被占用,写锁获取要进入同步队列中等待
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;

protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
}

获取写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
+--- WriteLock
/**
* 获取写锁
*/
public void lock() {
sync.acquire(1);
}

+--- AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

+--- Sync
/**
* AQS 模版方法,获取独占同步状态 - 获取写锁
* 说明:
* 1 该方法除了重入条件(当前线程是获取了写锁的线程)之外,增加了一个读锁是否存在的判断。
* 2 如果存在读锁,则写锁不能被获取,原因在于,读写锁要确保写锁的操作对读锁可见,如果允许读锁在已经被获取的情况下对写锁的获取,
* 那么正在运行的其它读线程就无法感知到当前写线程的操作。因此,只有等待其它读线程都释放了读锁,写锁才能被当前线程获取。
* 3 写锁一旦被获取,则其它读写线程的后续访问都被阻塞。
*
* @param acquires
* @return
*/
protected final boolean tryAcquire(int acquires) {

// 1 获取当前线程
Thread current = Thread.currentThread();

// 2 获取同步状态 state
int c = getState();

// 3 根据 state 获取写锁的持有次数
int w = exclusiveCount(c);

// 4 c != 0 表示要么有线程持有读锁,要么有线程持有写锁
// 由于该方法是获取写锁,因此下面只能是写锁重入分支(存在持有读锁的情况直接失败)
if (c != 0) {
// c != 0 && w == 0: 写锁可用,但是有线程持有读锁(也可能是自己持有,但由于不支持锁升级,因此不能获取写锁)
if (w == 0 ||
// c != 0 && w !=0 && current != getExclusiveOwnerThread(): 非重入,其他线程持有写锁
current != getExclusiveOwnerThread())
// 存在读锁或者当前获取线程不是已经获取写锁的线程
return false;

// 判断写锁持有次数是否超过阈值(65535)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 能到这里的,只可能是写锁重入,更新同步状态即可
setState(c + acquires);
return true;
}

// 执行到这里,此时 state == 0 ,读锁和写锁都没有被获取
// 5 获取写锁,这里判断是否需要阻塞(这里考虑到公平还是非公平)
if (writerShouldBlock() ||
// 不需要阻塞,则更新 state
!compareAndSetState(c, c + acquires))
return false;

// 6 当前线程独占锁
setExclusiveOwnerThread(current);

return true;
}

写锁获取使用 AQS 的独占模式获取同步状态的流程,整个流程如下:

  1. 判断读锁是否被线程持有(包括当前线程自身),如果被持有则获取写锁直接失败。
  2. 判断是否是重入获取写锁,如果不是直接获取写锁失败。
  3. 根据具体的公平或非公平模式判断获取写锁是否需要阻塞,如果不需要阻塞则尝试获取写锁,成功后当前线程独占锁。

释放写锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
+--- WriteLock
/**
* 写锁释放
*/
public void unlock() {
sync.release(1);
}

+-- AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
+--- Sync
/**
* AQS 模版方法,释放独占同步状态 - 释放写锁
*
* @param releases
* @return
*/
protected final boolean tryRelease(int releases) {
// 当前线程是否占有锁,否则没有资格尝试释放写锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

// 计算同步状态剩余值
int nextc = getState() - releases;

// 写锁重入次数是否为 0 ,为 0 表示可以释放
boolean free = exclusiveCount(nextc) == 0;

// 完全释放
if (free)
// 清空独占线程
setExclusiveOwnerThread(null);

// 更新 state
setState(nextc);
return free;
}

写锁的释放与 ReentrantLock 的释放过程基本类似,每次释放均减少写状态,当写状态为 0 时表示写锁可以被释放。

锁降级

ReentrantReadWriteLock 锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指持有写锁的线程在不释放写锁的同时,再获取到读锁,随后释放写锁,最后释放读锁

锁降级示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
 ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
public volatile Boolean updateFlag = Boolean.FALSE;
/**
* 读取缓存,一旦缓存被修改破坏,需要更新
*/
private void processData() {

// 获取读锁,该方法主要是读取缓存数据
readLock.lock();

// 共享数据发生改变,需要重新计算缓存数据
if (updateFlag) {
// 必须先释放掉读锁,后续加写锁更新缓存
readLock.unlock();

// 1 获取写锁,用于只有一个线程更新缓存
writeLock.lock();

try {
if (updateFlag) {
// 更新缓存值
cacheData = caculateCacheData();
updateFlag = Boolean.FALSE;
}

// 2 获取读锁
readLock.lock();
} finally {
// 3 释放写锁
writeLock.unlock();
}

// 以上 1、2、3 步完成锁降级,即写锁降级为读锁
}

try {
// 使用缓存
System.out.println("print cache: " + cacheData);
} finally {
readLock.unlock();
}
}

上述示例中,缓存数据可用时,每个线程只需获取读锁然后访问,数据访问完成后释放读锁。但当共享的缓存数据被破坏,此时所有访问 processData 方法的线程都能感知到,但只有一个线程能够获取写锁然后更新缓存,其它线程都会被阻塞。当线程更新完缓存数据后,会接着获取读锁,随后才会释放写锁,完成锁的降级。

锁降级中的读锁获取是否有必要?答案是必要的,主要是为了保证数据的可见性。如果线程计算完缓存后没有获取读锁而是直接释放掉了写锁,那么此时如果存在另一个线程 t 获取了写锁并修改了缓存,那么当前线程就无法感知线程 t 的数据更新。如果当前线程在释放掉写锁前获取读取,也就是遵循锁降级的步骤,则线程 t 就无法获取写锁,直到当前线程访问数据并释放掉读锁后,线程 t 才能有机会获取写锁更新缓存数据。

注意: ReentrantReadWriteLock 不支持锁升级,即持有读锁时再获取写锁,随后释放读锁。不支持的目的是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新数据,则其更新对其它获取到读锁的线程是不可见的。此外,是为了避免发生死锁,试想一个线程先获取读锁,然后再获取写锁,那么该线程会由于获取写锁失败进入同步队列中等待,可能之后就不会被唤醒了。

小结

ReentrantReadWriteLock 读写锁正如其名,具体分为读锁和写锁。无论是读锁还是写锁,整个获取与释放锁的流程都是交给实现 AQS 的 Sync 类型的对象完成,准确来说是公平 Sync 或者非公平 Sync 对象。对于读锁和写锁的语义,是将同步状态 state 划分为高低位,高 16 位表示读锁状态,低 16 位表示写锁状态。写锁的获取和释放锁类似重入锁 ReentrantLock 过程,唯一不同的是写锁需要考虑读锁的占有情况。读锁的获取和释放比较复杂,复杂的主要原因是读锁允许多个线程同时获取且支持可重入,此时同步状态的高 16 位的值没办法表示各个线程持有读锁的情况,因此读写锁新增了线程读锁计数器的概念,有了这个概念就可以很轻松判断读线程重入锁的情况以及实时获取当前线程持有读锁的次数。