并发 - ThreadLocal

概述

ThreadLocal 提供了线程局部变量,它是将线程需要访问的数据存储在线程对象自身中,从而避免多线程的数据竞争问题。ThreadLocal 实现原理如下图所示:

理解 ThreadLocal 原理,其实就是理解 ThreadThreadLocal 以及 ThreadLocalMap 三者之间的关系。线程类 Thread 内部持有 ThreadLocalMap 的成员变量,而 ThreadLocalMap 是 ThreadLocal 的内部类,ThreadLocal 对外暴露操作数据的 API 都是操作 ThreadLocalMap 中的数据。总得来说,线程 Thread 在向 ThreadLocal 中设置值时,其实都是向自己所持有的 ThreadLocalMap 中设置数据;读的时候同理,先是从线程自身中取出持有的 ThreadLocalMap ,然后再根据 ThreadLocal 引用作为 key 取出对应的元素 Entry 进而取出 value。因此 ThreadLocal 可以轻松实现变量的线程隔离,毕竟变量都是维护在各个线程中的,自然没有竞争。

源码分析

Thread

根据前文中的 ThreadLocal 实现原理图可以发现 ThreadLocal 是 Thread 使用的一个工具,下面我们剥离出 Thread 中相关 ThreadLocal 信息。

1
2
3
4
5
6
7
8
public class Thread implements Runnable {

/** 与此线程相关的 ThreadLocal 值,这个映射由 ThreadLocal 类维护 */
ThreadLocal.ThreadLocalMap threadLocals = null;

/** 与此线程相关的 InheritableThreadLocal 值。这个映射由 InheritableThreadLocal「ThreadLocal 的子类」 类维护。*/
ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
}

Thread 中维护了两个 ThreadLocalMap 类型的变量,threadLocals 用于存储当前线程使用的 ThreadLocal 相关信息;inheritableThreadLocals 用于存储当前线程使用的 InheritableThreadLocal 相关信息。使用 InheritableThreadLocal 可以实现多个线程访问 ThreadLocal 的值,即父线程中创建一个 InheritableThreadLocal 的实例,然后在子线程中就可以得到这个 InheritableThreadLocal 中设置的值。关于共享线程的 ThreadLocal 数据将在下面的内容中分析。

ThreadLocal

ThreadLocal 相关的 UML 类图如下所示:

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 1 作为 ThreadLocal 实例的变量,私有、不可变。每当创建 ThreadLocal 时这个值都会累加 HASH_INCREMENT
* 2 主要为了多个 ThreadLocal 实例的情况下,让哈希码能均匀的分布在2的N次方的数组里, 即 Entry[] table
*/
private final int threadLocalHashCode = nextHashCode();

/**
* 二进制:-10011110001101110111100110111001
* 十进制:-2654435769
* 说明:
* 1 Entry[] table 的大小必须是 2^N,那 2^N-1 的二进制表示就是低位连续的N个1,
* 2 key.threadLocalHashCode & (len-1) 的值就是 threadLocalHashCode 的低 N 位,这里使用位运算实现取模,和 HashMap 计算下标类似
* 3 要求 threadLocalHashCode 的值要均匀,这里给出 0x61c88647 就能达到。
* 原因:
* 取该值与fibonacci hashing(斐波那契散列法)以及黄金分割有关,目的就是为了让哈希码能均匀的分布在2的n次方的数组里, 也就是Entry[] table中。
*/
private static final int HASH_INCREMENT = 0x61c88647;

/**
* 作为 ThreadLocal 类变量
* <p>
* 基于当前 ThreadLocal ,下一个要给出的哈希码,自动更新,从 0 开始计数。
* 每次获取当前值并加上固定的 HASH_INCREMENT
*/
private static AtomicInteger nextHashCode = new AtomicInteger();

/**
* 返回下一个 ThreadLocal 的 hash
*/
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

ThreadLocal 关键属性分为两类。一类是 ThreadLocal 实例变量,另一类是 ThreadLocal 类变量。其中类变量在 ThreadLocal 类加载过程就进行初始化,也就是 nextHashCode 的初始化,后续实例变量 threadLocalHashCode 随着对象的创建而调用 nextHashCode 方法进行赋值,可以看出每次递增为 HASH_INCREMENT ,至于为什么选择这个值,是和斐波那契散列法以及黄金分割有关,目的就是为了让哈希码能均匀的分布在 2^n 次方的数组里,这个和 HashMap 中 hash 实现目的是一致的。

了解了核心属性后,下面以几个方法做为入口分析 ThreadLocal 源码。

Set 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
+--- ThreadLocal
/**
* 设置当前线程变量的值 value
*
* @param value
*/
public void set(T value) {
// 1 获取当前线程
Thread t = Thread.currentThread();

// 2 根据当前线程获取其成员变量 threadLocals 所指向的 ThreadLocalMap 对象
ThreadLocalMap map = getMap(t);

// 3 判断当前线程的 ThreadLocalMap 是否为空
if (map != null)
// 3.1 如果不为空,说明当前线程内部已经有ThreadLocalMap对象了
// 那么直接将当前对应的 ThreadLocal 对象的引用作为键,存入的 value 作为值存储到 ThreadLocalMap 中
map.set(this, value);
else
// 3.2 创建一个 ThreadLocalMap 对象并将值存入到该对象中,并赋值给当前线程的threadLocals成员变量
createMap(t, value);
}

ThreadLocal 中的增、删、查方法基本都会有三个步骤。下面对 Set 方法进行小结:

  1. 获取当前线程
  2. 根据当前线程获取其持有的 ThreadLocalMap 映射表
  3. 操作映射表 ThreadLocalMap ,如果映射表存在就调用映射表的 set 方法放入数据,映射表不存在则先创建映射表再放入数据

Get 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
+--- ThreadLocal
/**
* 获取当前线程的线程副本变量
*
* @return
*/
public T get() {
// 1 获取当前线程
Thread t = Thread.currentThread();

// 2 根据当前线程获取其 ThreadLocalMap 实例
ThreadLocalMap map = getMap(t);

// 3 如果当前线程的 ThreadLocalMap 不为空
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T) e.value;
return result;
}
}

// 4 如果当前线程的 ThreadLocalMap 为空或 ThreadLocalMap 中没有当前 ThreadLocal 对应的元素,则调用 initialValue() 方法初始化值
return setInitialValue();
}

下面对 Get 方法进行小结:

  1. 获取当前线程
  2. 根据当前线程获取其持有的 ThreadLocalMap 映射表
  3. 操作映射表 ThreadLocalMap ,如果映射表存在就调用映射表的 getEntry 方法查询数据;映射表不存在或从映射表中没有找到当前 ThreadLocal 对应的元素,则调用 setInitialValue 方法完成映射表的创建或初始值的获取

Get 方法相比 Set 方法会涉及查询不到元素的情况,如果当前线程持有的映射表还没有或者找不到元素,那么 ThreadLocal 会尝试以初始化值的方式进行兜底处理,初始值的情况通过方法 initialValue() 交给具体实现。

setInitialValue 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
+--- ThreadLocal
/**
* 设置初始化值
*
* @return the initial value
*/
private T setInitialValue() {

// 1 调用 initialValue() 方法获取指定的初始化值,默认为 null
T value = initialValue();

// 2 获取当前线程
Thread t = Thread.currentThread();

// 3 获取当前线程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);

// 4 如果 ThreadLocalMap 不为空,则直接将初始值设置到 ThreadLocalMap 中
if (map != null)
map.set(this, value);

// 5 如果 ThreadLocalMap 为空,则创建 ThreadLocalMap 对象,并设置初始值
else
createMap(t, value);
return value;
}

/**
* 初始化值,子类覆盖
*
* @return
*/
protected T initialValue() {
return null;
}

Remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
+--- ThreadLocal
/**
* 清理
*/
public void remove() {
// 1 获取当前线程持有的 ThreadLocalMap
ThreadLocalMap m = getMap(Thread.currentThread());

// 2 删除 ThreadLocalMap 中当前 ThreadLocal 相关的信息
// 包括清理对应的引用和值
if (m != null)
m.remove(this);
}

下面对 Remove 方法进行小结:

  1. 获取当前线程
  2. 根据当前线程获取其持有的 ThreadLocalMap 映射表
  3. 操作映射表 ThreadLocalMap ,如果映射表存在就调用映射表的 remove 方法根据当前 ThreadLocal 清理对应数据,可以防止内存泄漏

createMap 方法

1
2
3
4
5
6
7
8
9
10
+--- ThreadLocal
/**
* 为线程 t 初始化 ThreadLocalMap 对象
*
* @param t 当前线程
* @param firstValue 值
*/
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

线程中的 ThreadLocalMap 使用的是延迟初始化,即第一次调用 get() 或者 set() 方法的时候才会进行初始化。

getMap 方法

1
2
3
4
5
+--- ThreadLocal
// 返回线程 t 的 threadLocals
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}

上述方法用于获取每个线程持有自身的 ThreadLocalMap 实例,因此它们是不存在并发竞争的,这就是我们说的每个线程都有自己的变量副本。

至此,ThreadLocal 相关属性以及暴露的方法已经分析完毕。可以看出,ThreadLocal 中并没有持有 ThreadLocalMap 的引用,该引用是在 Thread 类中。此外,ThreadLocal 暴露的 API 操作都是基于 ThreadLocalMap 的。因此,我们理解了 ThreadLocalMap 才算是掌握了 ThreadLocal 。下面我们重点对 ThreadLocalMap 进行分析。

ThreadLocalMap

ThreadLocalMap 是 ThreadLocal 的静态内部类,本质上是一个 Map ,和 HashMap 类似,依然是 key-value 的形式,具体由 Entry 结构封装。

属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 数组初始化容量
*/
private static final int INITIAL_CAPACITY = 16;

/**
* 存储数据的 Entry 数组,长度是 2 的幂。
*/
private Entry[] table;

/**
* 数组中元素的个数,初始值为 0
*/
private int size = 0;

/**
* 数组扩容阈值,默认为0,创建了 ThreadLocalMap 对象后会被重新设置
*/
private int threshold; // Default to 0

/**
* 设置调整大小阈值,以保持在最坏 2/3 的负载因子。
*/
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

/* ThreadLocalMap使用 开放地址法-线性探测法 来解决哈希冲突 */
// 向后一个位置找,注意从头开始的情况
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
// 向前一个位置找,注意跳到尾部的情况
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

整体属性类似 HashMap ,各个值的含义已经详细注释就不再说明。需要注意的是,由于 ThreadLocalMap 使用的是线性探测法解决 hash 冲突,因此定义向前和向后探测的方法以便于寻找合适的位置及定位元素。

存储结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
+--- ThreadLocalMap
static class Entry extends WeakReference<ThreadLocal<?>> {
// 与传入 ThreadLocal 关联的值,也就是线程局部变量值
Object value;

/**
* Entry的 key 是对 ThreadLocal 的弱引用,当抛弃掉 ThreadLocal 对象时,垃圾收集器会忽略这个 key 的引用而清理掉 ThreadLocal 对象。
*
* @param k
* @param v
*/
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

一个线程可以使用多个 ThreadLocal 将不同的值存储到当前线程的 ThreadLocalMap 中。熟悉 HashMap 存储结构的可以发现,ThreadLocalMap 中的存储结构有两大不同点:

  • 没有链表相关的指针,更没有树节点的左右链接
  • 元素节点结构 Entry 继承了 WeakReference 类,以实现弱引用功能

作为一个映射表,必然会有元素冲突的可能,虽然和 HashMap 结构类似,但是没有 HashMap 中链表和树形结构,那么 ThreadLocalMap 是怎么解决 hash 冲突的呢?答案是线性探测法。关于 Entry 继承 WeakReference 的原因我们在后面的文章中进行分析。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
+--- ThreadLocalMap
/**
* 构造方法
*
* @param firstKey ThreadLocal 引用
* @param firstValue 设置的值
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 1 初始化 Entry 数组,大小为 16
table = new Entry[INITIAL_CAPACITY];

// 2 获取 ThreadLocal 的 hash 值(该值是累加的)
// 计算当前 key 对应的数组下标位置, 和 HashMap 的位运算代替取模原理一样
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);

// 3 将 Entry 对象存入到数组指定的位置
table[i] = new Entry(firstKey, firstValue);

// 4 记录数组元素个数
size = 1;

// 5 初始化扩容阈值
setThreshold(INITIAL_CAPACITY);
}

当线程 Thread 持有的 ThreadLocalMap 还没有初始化时,在使用 ThreadLocal 存储或获取数据时都会先创建一个 ThreadLocalMap 对象然后挂载到当前线程 Thread 上,后续不管该线程使用了多少个 ThreadLocal ,都会使用创建的 ThreadLocalMap 进行储存相应值,有冲突就使用线性探测法解决。

了解了 ThreadLocalMap 的底层数据结构后,下面我们依然从它的核心操作方法出发分析底层实现。

getEntry 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
+--- ThreadLocalMap
/**
* 获取与 key 关联的 Entry
*
* @param key the thread local object
* @return the entry associated with key, or null if no such
*/
private Entry getEntry(ThreadLocal<?> key) {
// 1 根据 key 计算下标
int i = key.threadLocalHashCode & (table.length - 1);

// 2 根据下标获取 Entry
Entry e = table[i];
if (e != null && e.get() == key)
return e;

// 3 通过下标直接得到的 Entry 不是要找的,那么就线性探测找
else
return getEntryAfterMiss(key, i, e);
}

获取线程局部变量值的 getEntry 方法是 ThreadLocal 暴露的 get() 方法的底层实现,它的主要流程如下:

  • 根据 ThreadLocal 引用计算对应的数组下标,这个和 HashMap 类似
  • 根据计算得到的下标尝试直接获取对应的 Entry,如果当前 Entry 为空或对应的 key 不是当前传入的 key ,那么进行线性探测获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
+--- ThreadLocalMap
/**
* getEntry方法的版本,用于在键未在其直接散列槽中找到时使用。
* 注意,该方法会清理无效 Entry
*
* @param key the thread local object
* @param i the table index for key's hash code
* @param e the entry at table[i]
* @return the entry associated with key, or null if no such
*/
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
// 1 获取 Table 信息
Entry[] tab = table;
int len = tab.length;

// 2 线性探测
while (e != null) {
// 获取 Entry 的 key
ThreadLocal<?> k = e.get();

// 2.1 命中,直接返回
if (k == key)
return e;
// 2.2 无效 Entry,执行连续段删除
if (k == null)
expungeStaleEntry(i);

// 2.3 获取下个位置
else
i = nextIndex(i, len);
e = tab[i];
}

// 3 遍历完也没查到,返回 null
return null;
}

之所以要进行线性探测获取其实是和链表查询一样,可能目标元素由于 hash 冲突放到了后续位置。注意:查询流程也就是 get() 方法可能也会清理无效的元素,以防止内存泄漏,但不能保证。关于连续段清理无效元素逻辑暂略过。

set 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
+--- ThreadLocalMap
/**
* 设置与 key 关联的值
* 说明:
* ThreadLocal 的 set 方法,就是为了将指定的值存入到指定线程的 ThreadLocalMap 对象中,具体还是通过 ThreadLocalMap 的 set 方法
*
* @param key the thread local object
* @param value the value to be set
*/
private void set(ThreadLocal<?> key, Object value) {

// 1 获取数组信息
Entry[] tab = table;
int len = tab.length;

// 2 计算当前 ThreadLocal 对象引用作为键在数组中的下标
int i = key.threadLocalHashCode & (len - 1);

// 3 根据获取到的下标进行线性探测,寻找空的位置。
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {

// 3.1 获取当前 Entry 中的 key ,即 ThreadLocal 的引用,注意是弱引用。
ThreadLocal<?> k = e.get();

// 3.2 判断和当前的 ThreadLocal 对象是否是同一个对象,如果是,那么直接进行值替换,并结束方法,
if (k == key) {
e.value = value;
return;
}

// 3.3 判断当前 Entry 是否失效,即是否被回收(弱引用),如果失效,说明当前位置可以重新使用,就使用新的 key-value 将其替换
// 该过程还会进行连续段删除其它无效的 entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

// 4 找到某个空位置,直接将键值对设置到该位置上。
tab[i] = new Entry(key, value);
int sz = ++size;

// 5 尝试随机清理无效 entry ,如果没有可清理的 entry 且数组元素大小 >= 扩容阈值,则进行 rehash
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

设置线程局部变量值的 set 方法稍微复杂点,下面对其主要流程进行说明:

  1. 根据传入的 key 计算数组下标位置
  2. 根据计算得到的下标进行线性探测,寻找空的位置以存放元素
    • 检测到存在相同 key 的元素,进行值覆盖,然后结束流程即可
    • 检测到无效的元素,则重用无效元素位置,并尝试清理无效的元素
    • 在某个空位置存放数据
  3. 如果不是值覆盖或重用无效元素位置的情况,那么需要判断是否需要 rehash

下面我们对上述关联的核心方法进行拆解分析。

replaceStaleEntry 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
+--- ThreadLocalMap
/**
* 替换无效 Entry ,并尝试删除其它无效的 Entry。
*
* @param key key
* @param value value
* @param staleSlot 无效 entry 的位置
*/
private void replaceStaleEntry(ThreadLocal<?> key, Object value, int staleSlot) {
// 1 获取数组信息
Entry[] tab = table;
int len = tab.length;
Entry e;

// 标志 Table 中是否存在无效 Entry。slotToExpunge != staleSlot 说明 table 中存在无效 Entry 需要进行清理,否则说明没有。
int slotToExpunge = staleSlot;

// 2 根据传入的无效 Entry 的位置 staleSlot 向前扫描一段连续非空的 Entry ,并记录最后一个无效的 Entry 的位置。或者扫描完也没有找到。
for (int i = prevIndex(staleSlot, len); (e = tab[i]) != null; i = prevIndex(i, len))
// 2.1 如果是无效 Entry ,则更新删除标记 slotToExpunge
if (e.get() == null)
slotToExpunge = i;


// 3 根据传入的无效 Entry 向后扫描一段连续的 Entry(根据线性探测,具有相同的 key 的元素一定是从后找的) ,以寻找是否有相同 key 的 Entry,以及在需要时更新删除标记位 slotToExpunge
// 找到相同 key 的元素或末尾的空槽,以最先出现的为准
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {

// 3.1 获取当前 Entry 的 key
ThreadLocal<?> k = e.get();

// 3.2 如果找到了具有相同的 key ,则更新其值。也就是更新其值并将其与传入的无效 Entry 替换,即与 table[staleSlot] 进行替换
if (k == key) {
// 3.2.1 更新为传入的 value
e.value = value;

// 3.2.2 使用相同 key 的 Entry 重用失效位置以避免新创建一个 Entry,以维持散列表的顺序。
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

// 3.2.3 如果向前查找没有找到无效的 Entry,则更新删除标记位 slotToExpunge 为当前位置 i,此时 i 位置对应的是无效 Entry
if (slotToExpunge == staleSlot)
slotToExpunge = i;

// 3.3.4 将新过时的槽或在其上面遇到的任何其他过时槽发送到expungeStaleEntry,以删除或重新散列运行中的所有其他条目。
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// 如果向前查找没有找到无效 entry,并且当前向后扫描的entry无效,则更新 slotToExpunge 为当前值 i
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// 4 执行到这里,说明 table 中不存在相同 key 的 Entry,此时只需直接重用无效位置即可
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// 5 slotToExpunge != staleSlot 说明 table 中存在无效 Entry 需要进行清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

重用无效 Entry 位置有两种可能:

  • 如果扫描到的 Table 中存在和要插入的 key 相同的 Entry,那么就使用更新 value 后的该 Entry 替换无效 Entry 以避免新创建 Entry,同时该 Entry 成为了无效 Entry,只需等待后续删除即可。
  • 如果扫描到的 Table 中不存在和要插入的 key 相同的 Entry ,那么直接创建 Entry 替换无效 Entry 即可。

此外,上述方法还会对无效 Entry 进行清理,触发条件就是检测到其它无效 Entry 的存在。

rehash 操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
+--- ThreadLocalMap
/**
* rehash 操作
* 1 扫描整个 Table ,删除无效的 Entry
* 2 执行清理后,如果还需要扩容,则将表扩大一倍
*/
private void rehash() {
// 1 清理所有无效 Entry
expungeStaleEntries();

// 2 threshold - threshold / 4 = 1en / 2,降低扩容阈值是因为上面做了一次全清理 size 可能会减小
if (size >= threshold - threshold / 4)
resize();
}

/**
* Table 容量扩大为原来 2 倍
*/
private void resize() {
// 1 获取 Table 信息
Entry[] oldTab = table;
int oldLen = oldTab.length;

// 2 容量扩大为原来 2 倍
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;

// 3 重新映射旧数组中的元素
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();

// 清理无效 Entry
if (k == null) {
e.value = null; // Help the GC

// 线性探测重新设置值
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}

// 4 设置新的阈值
setThreshold(newLen);
size = count;
table = newTab;
}

新增数据后,如果达到阈值就会触发 rehash 流程,进行数组扩容和数据重新映射,这个没什么可说的。

remove 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
+--- ThreadLocalMap
/**
* 删除
*/
private void remove(ThreadLocal<?> key) {
// 1 获取 Table 信息
Entry[] tab = table;
int len = tab.length;

// 2 计算 key 对应的下标
int i = key.threadLocalHashCode & (len - 1);

// 3 进行线性探测,查找正确的 Entry
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {

// 找到对应的 Entry
if (e.get() == key) {
// 调用 WeakReference.clear() 方法清除对应的引用
e.clear();

// 连续段删除 Entry
expungeStaleEntry(i);
return;
}
}
}

上述方法是 ThreadLocal 暴露的 remove() 方法的底层实现,用来清理当前 ThreadLocal 关联的信息。具体清理信息如下:

  • 清除对当前 ThreadLocal 的弱引用
  • 清除当前 ThreadLcoal 对应的 Entry 中的 value
  • 清除当前 ThreadLocal 对应的 Entry

expungeStaleEntry 方法

expungeStaleEntry 方法用于彻底删除指定位置的 Entry 所以信息,接着向后连续段扫描删除无效 Entry ,并对可能存在 hash 冲突的 Entry 进行 rehash 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
+--- ThreadLocalMap
/**
* 删除过期的元素,是连续段方式的删除
*
*
*/
private int expungeStaleEntry(int staleSlot) {
// 1 获取 Table 信息
Entry[] tab = table;
int len = tab.length;

// 2 清理 staleSlot 位置的无效 Entry ,并递减元素个数
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;

// Rehash until we encounter null
Entry e;
int i;

// 3 从 stateSlot 开始向后扫描一段连续的非空的 Entry
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {

// 3.1 获取当前 Entry 的 k
ThreadLocal<?> k = e.get();

// 3.2 如果遇到key为null,表示无效entry,进行清理.
if (k == null) {
// value 置空
e.value = null;
// 元素置空
tab[i] = null;
// 元素个数递减
size--;

// 3.3 如果 key 不为 null ,则对可能存在 hash 冲突的 Entry 进行 rehash
} else {

// 计算 key 对应的下标,如果与现在所在位置不一致,认为存在 hash 冲突,置空当前 table[i] ,
// 并从 h 开始向后线性探测到第一个空的 slot,把当前的 entry 移动过去。
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;

// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}

// 下一个为空的下标
return i;
}

上述方法用于删除过期的元素,具体删除策略如下:

  • 根据传入的 stateSlot 清理对应的无效 Entry,这个是绝对删除。
  • 根据传入的 stateSlot 向后扫描一段连续非空的 Entry ,对可能存在 hash 冲突的 Entry 进行 rehash ,并且清理遇到的无效 Entry 。

expungeStaleEntries 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* 扫描整个 Table ,删除无效的 Entry
*/
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
// 无效 Entry ,则调用 expungeStaleEntry 方法删除对应位置及连坐删除 Entry
if (e != null && e.get() == null)
expungeStaleEntry(j);
}
}

该方法是在 expungeStaleEntry 方法的基础上,对整个 Table 中无效元素进行清理,一定会清理截止到当前 Table 中所有的无效 Entry 。需要说明的是,ThreadLocal 的所有一些列操作都是单线程的,也就是当前线程。

cleanSomeSlots 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
+--- ThreadLocalMap
/**
* 启发式的扫描清除,扫描次数由传入的参数n决定
*
* @param i 从i向后开始扫描(不包括i,因为索引为i的Slot肯定为null)
* @param n 控制扫描次数,正常情况下为 log2(n) ,如果找到了无效 Entry ,会将 n 重置为 table 的长度 len 进行阶段删除
* @return true if any stale entries have been removed.
*/
private boolean cleanSomeSlots(int i, int n) {
// 1 删除标志
boolean removed = false;

// 2 数组信息
Entry[] tab = table;
int len = tab.length;

// 从 i 位置向后遍历,删除无效的 Entry
do {
i = nextIndex(i, len);
Entry e = tab[i];

// 如果当前 Entry 无效,则进行清理并进行连坐
if (e != null && e.get() == null) {
n = len;
removed = true;
i = expungeStaleEntry(i);
}

// 无符号的右移动,可以用于控制扫描次数在log2(n)
} while ((n >>>= 1) != 0);

return removed;
}

启发式扫描删除具有随机性,启发式思想也用在其他代码中,如 Redis 中的定期删除逻辑。

自此,ThreadLocalMap 的源码实现分析完毕。

共享数据

一般来说 ThreadLocal 数据都是线程独享的,但是 ThreadLocal 继承体系支持子线程共享父线程的变量副本数据。共享线程的 ThreadLocal 数据可以使用 InheritableThreadLocal 来实现。通过在父线程中创建一个 InheritableThreadLocal 的实例,然后在子线程中就可以获取该实例中设置的值。

使用示例

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws InterruptedException {

// 1 创建 InheritableThreadLocal 对象
ThreadLocal<String> threadLocal = new InheritableThreadLocal<>();

// 2 创建的 ThreadLocalMap 挂载到当前 Thread 的 inheritableThreadLocals 属性
threadLocal.set("小芒果!");

// 3 创建并启动子线程
Runnable runnable = () -> {
// 打印子线程 ThreadLocal 的线程变量值
System.out.println(Thread.currentThread().getName() + "-> " + threadLocal.get());
};
Thread thread = new Thread(runnable);
thread.start();

// 打印父线程(这里是主线程) ThreadLocal 的线程变量值
System.out.println(Thread.currentThread().getName() + "-> " + threadLocal.get());
Thread.sleep(3000);
}

打印结果

1
2
3
4
main-> 小芒果!
Thread-0-> 小芒果!

Process finished with exit code 0

实现原理

线程变量传递逻辑很简单,它隐藏在 Thread 的 init 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
+--- Thread
/**
* 线程初始化
*/
private void init(ThreadGroup g, Runnable target, String name,
long stackSize) {
init(g, target, name, stackSize, null, true);
}

// 省略无关代码
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {

// 获取初始化当前线程的线程
Thread parent = currentThread();

// 如果 parent 线程使用了 InheritableThreadLocal ,那么就把 parent 的 inheritableThreadLocals 给当前线程的 inheritableThreadLocals 。
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 设置到当前线程的 inheritableThreadLocals 属性中
this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;

/* Set thread ID */
tid = nextThreadID();
}

从线程初始化逻辑中可以看出,如果父线程 ThreadLocalMap 类型的变量 inheritableThreadLocals 不为空,那么就把父线程的该属性设置给当前线程的 inheritableThreadLocals 属性。具体设置逻辑在 ThreadLocal 的 createInheritedMap 方法中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
+--- ThreadLocal
// 继承式创建 ThreadLocal
static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
return new ThreadLocalMap(parentMap);
}

// 仅由 createInheritedMap 方法调用
private ThreadLocalMap(ThreadLocal.ThreadLocalMap parentMap) {
// 1 获取 parentMap 的 Table 信息
ThreadLocal.ThreadLocalMap.Entry[] parentTable = parentMap.table;
int len = parentTable.length;
setThreshold(len);

// 2 创建新的 Table
table = new ThreadLocal.ThreadLocalMap.Entry[len];

// 3 将 parentMap 中的数据依次映射到新创建的 Table 中
for (int j = 0; j < len; j++) {
// 3.1 获取当前 Entry
ThreadLocal.ThreadLocalMap.Entry e = parentTable[j];
if (e != null) {

// 3.2 获取 Entry 中的 key
@SuppressWarnings("unchecked")
ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();

// 3.3 Entry 有效的话就进行映射
if (key != null) {

// 用于获取父线程变量值的转换,默认就是父线程 Entry.value 值
Object value = key.childValue(e.value);

// 线性探测存储元素
ThreadLocal.ThreadLocalMap.Entry c = new ThreadLocal.ThreadLocalMap.Entry(key, value);
int h = key.threadLocalHashCode & (len - 1);
while (table[h] != null)
h = nextIndex(h, len);
table[h] = c;

size++;
}
}
}
}

通过以上逻辑可以发现,如果要使用线程变量共享需要使用到 InheritableThreadLocal 类,线程 Thread 的映射属性使用 inheritableThreadLocals 而非 threadLocals

InheritableThreadLocal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class InheritableThreadLocal<T> extends ThreadLocal<T> {
/**
* 默认是 Entry 的 value 值,使用方可继承 InheritableThreadLocal 覆盖该方法
*
* @param parentValue the parent thread's value
* @return the child thread's initial value
*/
protected T childValue(T parentValue) {
return parentValue;
}

/**
* 获取当前线程 t 的 ThreadLocalMap ,注意取的是 inheritableThreadLocals 属性
*
* @param t the current thread
*/
ThreadLocalMap getMap(Thread t) {
return t.inheritableThreadLocals;
}

/**
* 创建当前线程 t 的 ThreadLocalMap ,注意设置的是 inheritableThreadLocals 属性
*
* @param t the current thread
* @param firstValue value for the initial entry of the table.
*/
void createMap(Thread t, T firstValue) {
t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
}
}

InheritableThreadLocal 继承了 ThreadLocal ,重写了以上三个方法,主要是将映射表挂在到 Thread 的 inheritableThreadLocals 属性上,用以实现线程变量的共享。注意,线程变量独享使用的是 Thread 中的 threadLocals 属性。InheritableThreadLocal 对比 ThreadLocal 唯一不同是子线程会继承父线程变量,并支持自定义赋值函数。

问题

内存泄漏

ThreadLocalMap 使用 ThreadLocal 的弱引用做为元素 Entry 的 key ,如果一个 ThreadLocal 没有外部强引用来引用它,那么系统 GC 时这个 ThreadLocal 会被回收。此时,ThreadLocalMap 中就会出现 key 为 null 的 Entry ,这样就没有办法访问这些 Entry,这些 Entry 理论上属于无效的,应该被 GC 回收。但是,如果存在持有这些 Entry 的线程迟迟不结束(如使用线程池),那么这些 key 为 null 的 Entry 的 value 就会一直存在一条强引用链 Thread Ref -> Thread -> ThreaLocalMap -> Entry -> value,导致无法回收,造成内存泄漏。

ThreadLocal 为了尽可能避免内存泄漏问题,在 get()set() 以及 remove() 方法中都有清除线程的 ThreadLocalMap 中 key 为 null 的 value 逻辑。注意,其中 get()、set() 两个方法都不能完全防止内存泄漏,存在无效 Entry 无法扫描到的情况,因为只有在线性探测流程中才会尝试连续段清理无效 Entry 。最好的方式是每次使用完 ThreadLocal 都手动 remove 一下

此外,以下措施会增加内存泄漏的风险:

  • 使用 static 的 ThreadLocal ,这种方式延长了 ThreadLocal 的生命周期
  • 分配使用了 ThreadLocal ,而不再调用其 get()set() 以及 remove() 方法

为什么使用弱引用

  • key 使用强引用:ThreadLocal 对象没有外部引用,但是 ThreadLocalMap 持有 ThreadLocal 的强引用,如果没有手动删除,ThreadLocal 不会被回收,这会导致 Entry 内存泄漏。
  • key 使用弱引用:ThreadLocal 对象没有外部引用,由于 ThreadLocalMap 持有 ThreadLocal 的弱引用,即使没有手动删除,ThreadLcoal 也可以被回收。value 在下一次执行方法时被清除。

Entry 继承 WeakReference,并且使用 ThreadLocal 的弱引用作为 key,这样可以将 ThreadLocal 对象的生命周期和线程生命周期解绑。持有弱引用可以使得 ThreadLocal 在没有其他强引用的时候被回收掉,这样可以避免因为线程得不到销毁导致 ThreadLocal 对象无法被回收。

最佳实践

每次使用完 ThreadLocal 都应该调用它的 remove() 方法,进行数据清理,防止内存泄漏。

应用场景

  • 多线程之间需要拥有同一个信息,那么通常可以采用 initialValue() 方法进行初始化,直接将需要拥有的变量副本存储到 ThreadLocal 中。
  • 多个线程中存储不同的信息,那么需要使用 set() 方法设置变量副本到 ThreadLocal 中。

上述描述存储到 ThreadLocal 中是不对的,需要注意,数据其实是存储到线程持有的 ThreadLocalMap 对象中,该对象是一个散列表。

小结

本篇文章对 ThreadLocal 进行了深入的分析,它能保证数据安全是因为每个线程都有自己的线程变量,不会发生多个线程共享变量的情况。首先对 ThreadLocal 的关系进行了介绍,从总体上认识 ThreadLocal 。接着对 ThreadLocal 进行了分析,从其属性到暴露的 API 。然后对 ThreadLocal 底层的支持类 ThreadLocalMap 进行了分析,同样从属性到支撑上层的方法的分析。最后对父子线程就 ThreadLocal 如何共享线程变量进行了分析。在文章的最后,对内存泄漏问题进行了介绍。

参考资料
https://www.cnblogs.com/lqlqlq/p/13302901.html