并发 - 线程池

前言

最初的时候并没有线程池的概念,而是先有线程。每个任务都需要对应一个线程,任务少的情况没有太大问题,任务过多就出现了各种性能和资源开销问题,更重要的是可创建线程的数量是有上限的,不可能无限的创建。在并发环境下,系统不能够确定在某一时刻有多少任务需要执行,有多少资源需要投入。

针对上述问题,于是诞生了线程池,用来平衡线程与系统资源之间的关系。线程池解决问题思路如下:

  1. 对于反复创建线程开销问题,线程池采用固定数量的线程一直保持工作状态并能反复执行任务。
  2. 对于创建过多线程占用太多资源问题,线程池会根据需要创建线程,灵活地控制线程数量,避免使用过多资源。

概述

线程池是一种管理线程和任务的工具,是应用场景最多的并发框架之一,几乎所有需要异步或并发执行任务的应用程序都可以使用线程池,合理地使用线程池可以带来可观得性能提升和响应速度。具体好处如下:

  • 解耦:线程的创建与任务执行完全分开。
  • 降低资源消耗:线程的复用降低了线程创建和销毁带来的资源消耗。
  • 提高响应速度:大多情况下(线程池预热后),到达的任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,提升了响应速度。
  • 便于线程管理:线程是稀缺资源,不能无限制地创建,使用线程池可以对线程进行统一分配、调优和监控。

关于线程池的概述就介绍这么多,本篇文章介绍的线程池核心是 JDK 中提供的 ThreadPoolExecutor 类,具体涉及的接口和实现类如下图所示:

需要说明的是,关于Scheduled类型的线程池继承体系本篇文章没有介绍到,它是基于本篇文章着重介绍的 ThreadPoolExecutor 的扩展实现,支持时间纬度执行任务。

总体设计

线程池的设计没有办法直接采用一般意义上池化资源方案,而是采用生产者 - 消费者模式,将任务和线程解耦并不直接关联,从而良好的缓冲任务、复用线程,缓冲任务通过阻塞队列实现,工作线程从阻塞队列中获取任务以实现线程复用。线程池的使用方是生产者,线程池本身是消费者。至于为什么线程池没有采用一般意义上池化资源的设计方法,这个取决于线程对象的特殊性,线程有着特殊的生命周期,一旦一个线程启动执行任务就不能再次启动了。

任务执行

任务的执行不一定非要通过开启新线程,任务在线程执行之前它也是一个实现类,也有对应的方法。因此我们可以定义出方法级别调用和线程级别调用。

线程级别调用

1
2
3
new Thread(() -> {
//...
}).start();

方法级别调用

1
2
3
((Runnable) () -> {
//..
}).run();

线程池对任务的处理最终是通过方法级别调用的来完成,在后面分析源码的时候我们可以看到。

Executor 框架

Executor 是一个异步任务的框架,根据一组执行策略进行调用、调度、执行和控制,目的是提供一种将任务提交任务执行分离的机制。

两级调度模型

最早 Java 的线程既是任务体也是执行机制,从 JDK5 开始,把任务体和执行机制进行了分离。任务体包括 Runnable 和 Callable,而执行机制由 Executor 框架提供,即 Executor是 Runnable 和 Callable 的调度容器

Java 线程会被一对一映射为操作系统线程,在 Java 线程启动时创建对应的操作系统线程,同样地,当该 Java 线程终止时对应的操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的 CPU 。对于计算复杂的应用,我们通常会将其拆解为若干个任务并交给 Java 多个线程,这个动作是由用户级别的调度器 Executor 框架完成的,它会将这若干个任务映射为对应数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上。由此可见,创建一个线程远比创建一个对象要复杂得多,不仅要在 JVM 堆中分配内存,还需要调用操作系统内核 API 来为线程分配资源,因此应该避免频繁创建和销毁。

这个过程属于两级调度模型,对应的示意图如下:

从示意图可看出,应用程序通过 Executor 框架控制上层的调度。而下层的调度由操作系统内核控制,应用程序是无法控制的。

Executor 框架结构

Executor 框架主要由三大部分组成,具体如下:

  • 任务体
    包括 Runnable 接口和 Callable 接口及其实现。
  • 任务的执行
    包括任务执行机制的核心接口 Executor,继承 Executor 的 ExecutorService 接口和它的实现们。
  • 异步计算结果
    包括核心接口 Future 以及对应的实现们,特别是 FutureTask 实现类。是对具体 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。

Executor 框架成员

Executor 框架是线程池实现的基础,它的主要成员有 ThreadPoolExecutorScheduledThreadPoolExecutorExecutorsRunnableCallable 以及 Future

下面正式进入到代码层面的介绍,定时任务实现类 ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor ,用于实现定时执行,本文暂不介绍它的实现。

Executor 接口

1
2
3
4
5
6
7
public interface Executor {

/**
* @param Runable 接口
*/
void execute(Runnable command);
}

Executor 接口仅定义了一个方法,参数是 Runnable 类型,该方法的目的是将任务提交和任务执行细节解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需将任务提交到执行器 Executor 中,由执行器完成线程的调配和任务的执行。需要注意的是,该接口是没有返回值的,也就意味着无法判断提交的任务是否被线程池执行成功。

ExecutorService 接口

ExecutorService 接口继承自 Executor 接口,一般我们自定义线程池时使用的就是这个接口,该接口中定义的方法加上继承过来的 execute 方法在很多场景中已经可以满足需求了。

该接口中的方法如下图所示:

上图中的方法大致分类如下:

  1. 向线程池提交任务方法

    submit 方法和前文中的 Executor 接口中的 execute 方法有所不同,虽然也是向线程池提交任务,但是有返回值 Future ,并且参数类型不仅支持 Runnable 类型还支持 Callable 类型。

  2. 执行任务方法

    invokeAll 方法用于执行多个任务,同时支持设置超时时间。invokeAny 方法用于执行多个方法中的一个即可,任务执行完成就可以返回,同样支持设置超时时间。这两类方法的底层需要依赖 execute 方法。

  3. 关闭线程池方法

    shutdownshutdownNow 方法用于关闭线程池。

  4. 判断线程池是否关闭

    isShutdown 判断线程池是否已经开始了关闭工作,即是否执行了 shutdown 或者 shutdownNow 方法。注意,该方法返回 true 并不代表线程池此时已经彻底关闭了,仅说明线程池开始了关闭的流程,此时线程池中可能依然有线程在执行任务,队列中仍有等待被执行的任务。

  5. 判断线程池是否终止方法

    isTerminatedawaitTermination 方法用于判断线程池是否终止。只有在调用关闭方法后才有调用的意义。

Future

Future 的继承体系如下图所示:

由上图的UML可知,FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,因此 Executor.execute 方法支持将 FutureTask 提交到线程池。接下来介绍 AbstractExecutorService 抽象实现类就能很清晰看出 FutureTask 的作用。

AbstractExecutorService 实现

AbstractExecutorService 抽象类实现自 ExecutorService 接口,在其基础上实现了几个常用的方法,这些方法供子类进行调用。将执行任务的流程串联起来,保证下层的实现(如 ThreadPoolExecutor)只需关注执行任务的方法即可。具体方法如下:

由于 invokeAll 方法和 invokeAny 方法更多的是执行将任务提交给线程池前的工作,它们并没有将任务提交给线程池,需要通过 Executor 中的 execute 方法实现,而 execute 方法最终要交给具体子类实现。因此,不再对这两类方法展开说明。下面重点介绍下 newTaskFor 方法和 submit 方法。

newTaskFor 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
--- AbstractExecutorService
/**
* 将 Runnable 包装成 FutureTask
*
* @param runnable 任务
* @param value 任务执行成功的返回值
* @return Future
*/
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}

/**
* 将 Callable 包装成 FutureTask
*
* @param callable 任务
* @return Future
*/
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}

从上面代码可以看出,newTaskFor 方法用于将 Runnable 和 Callable 类型的任务统一包装成 FutureTask ,FutureTask 又间接继承了 Runnable 接口。我们知道,Runnable 的 void run() 方法是没有返回值的, 而 Callable 的 V call() 方法是有返回值的,但 Executor 中的 void execute(Runnable command)方法是不关心返回结果的,它只负责处理 Runnable 类型的任务。综上,不难看出 newTaskFor 方法就是为了屏蔽不同类型任务的差异,以达到统一交给 Executor.execute 执行的目的。下面我们继续看提交任务的另外一种方式。

submit 提交任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
--- AbstractExecutorService
/**
* 提交 Runnable 任务,不需要返回结果。
*/
public Future<?> submit(Runnable task) {
// 非空检查
if (task == null) throw new NullPointerException();
// 使用 FutureTask 封装任务
RunnableFuture<Void> ftask = newTaskFor(task, null);
// 使用 execute 方法执行任务
execute(ftask);
// 返回 FutureTask
return ftask;
}

/**
* 提交 Runnable 任务,任务执行成功的返回结果为 result
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}

/**
* 提交 Callable 任务,任务执行成功返回结果是Callable#call 方法返回值
*
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
// 非空检查
if (task == null) throw new NullPointerException();
// 使用 FutureTask 封装任务
RunnableFuture<T> ftask = newTaskFor(task);
// 使用 execute 方法执行任务
execute(ftask);
// 返回 FutureTask
return ftask;
}

Runnable 的 void run() 方法没有返回值,但是有的时候我们需要返回值,这个时候 submit 方法就可以实现,只需在该方法的第二个参数传入预期结果,当任务执行完成后会自动返回。

1
<T> Future<T> submit(Runnable task, T result)

此外,我们可以看出 submit 方法提交任务的能力是通过 execute 方法实现的。定义于最上层接口 Executor 中的 void execute(Runnable command) 方法不会返回执行结果,想要执行结果就需要通过 FutureTask 包装任务,然后将包装后的任务 FutureTask 交给 Executor.execute 方法执行,执行后的结果也会保存到 FutureTask 中。关于 Future 的继承体系不展开分析,下面概述下 submit 提交任务的原理。

  1. 调用 newTaskFor 方法将 Runnable 和 Callable 类型的任务统一包装成 FutureTask 对象。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    // Callable
    public FutureTask(Callable<V> callable) {
    if (callable == null)
    throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;
    }
    // Runnable
    public FutureTask(Runnable runnable, V result) {
    // 将 Runnable 适配成 Callable
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;
    }
    public static <T> Callable<T> callable(Runnable task, T result) {
    if (task == null)
    throw new NullPointerException();
    return new RunnableAdapter<T>(task, result);
    }
    static final class RunnableAdapter<T> implements Callable<T> {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
    this.task = task;
    this.result = result;
    }
    // 任务执行还是调用 run 方法,返回结果是传入的预期值
    public T call() {
    task.run();
    return result;
    }
    }
    包装的本质是将任务统一适配为Callable类型,因为Callable类型任务可以通过call方法返回执行结果。
  2. 当任务执行的时候,FutureTask 中的 run 方法会执行,这个过程是最关键的一步。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public void run() {
    if (state != NEW ||
    !UNSAFE.compareAndSwapObject(this, runnerOffset,
    null, Thread.currentThread()))
    return;
    try {
    Callable<V> c = callable;
    if (c != null && state == NEW) {
    V result;
    boolean ran;
    try {
    // 被适配的Runnable 和 Callable 方法级别调用
    result = c.call();
    ran = true;
    } catch (Throwable ex) {
    result = null;
    ran = false;
    setException(ex);
    }
    if (ran)
    // 将执行结果保存到 FutureTask 中
    set(result);
    }
    } finally {
    // runner must be non-null until state is settled to
    // prevent concurrent calls to run()
    runner = null;
    // state must be re-read after nulling runner to prevent
    // leaked interrupts
    int s = state;
    if (s >= INTERRUPTING)
    handlePossibleCancellationInterrupt(s);
    }
    }
  3. 通过FutureTask#get方法从 FutureTask 中取出任务执行结果
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    /**
    * @throws CancellationException {@inheritDoc}
    */
    public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
    s = awaitDone(false, 0L);
    // 从 FutureTask 中取出任务执行结果
    return report(s);
    }

    /**
    * @throws CancellationException {@inheritDoc}
    */
    public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
    throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
    (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
    throw new TimeoutException();
    // 从 FutureTask 中取出任务执行结果
    return report(s);
    }

提交任务方式

最上层接口 Executor 中的 void execute(Runnable) 不需要获取结果,不会使用 FutureTask 包装任务。抽象实现类 AbstractExecutorService 中的 Future<?> submit() 需要获取结果,因此使用了 FutureTask 包装任务。

需要获取任务结果用 submit 方法,不需要获取结果用 execute 方法。

运行机制

上图展示了线程池的运行机制,线程池运行机制主要分成两个部分,线程管理和任务管理。下面对线程池的主要处理流程进行说明:

  1. 主线程提交任务到线程池。
  2. 如果当前线程池中的线程少于核心线程数,则创建新的线程来执行任务。
  3. 如果线程池中的线程达到或超过核心线程数,则将任务加入到阻塞队列中。
  4. 如果在第 2 步中无法将任务加入阻塞队列,则依据最大线程数创建新的线程来处理任务。
  5. 如果在第 3 步创建新线程会使线程池中线程数超出最大线程数,任务将被拒绝并使用饱和策略处理(拒绝策略)。
  6. 处理完任务的线程会自旋获取新的任务去执行,当线程获取不到任务时,线程会被回收(一般针对非核心线程)。

其中第 1 步和第 3 步涉及到创建线程,该过程需要获取全局锁,因为关闭线程池也需要获取这个全局锁。当线程池完成了预热即核心线程数创建完毕,在一定程度上就不需要频繁创建线程,也就降低了获取全局锁的频次,对于线程池来说全局锁是一个严重的可伸缩瓶颈。关于流程中的概念下文会陆续说明。

ThreadPoolExecutor

线程池核心实现就在 ThreadPoolExecutor 实现类中,该类实现了线程池所需的各个方法,包括最核心的 execute 方法。开发者可以基于该实现类来进行功能上的扩展,定时任务实现类 ScheduledThreadPoolExecutor 就是基于 ThreadPoolExecutor 扩展的功能。

在详细介绍 ThreadPoolExecutor 运行机制之前,我们先对其核心概念,属性、方法等进行简单介绍。

核心参数

1
2
3
4
5
6
7
8
 public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数
long keepAliveTime, // 空闲线程存活时间
TimeUnit unit, // 空闲线程存活时间的单位
BlockingQueue<Runnable> workQueue, // 阻塞队列
ThreadFactory threadFactory, // 线程工厂
RejectedExecutionHandler handler // 饱和策略
) {//...}
  • corePoolSize

    核心线程数,线程池的基本大小。当提交一个任务到线程池时,线程池会创建一个基本线程来执行任务,即使其它空闲的基本线程能够执行新任务也会创建线程,只有线程池预热完毕(线程池中线程数达到核心线程数)才不再创建核心线程。

特别说明:

  1. 核心线程并不是特指某一个或某几个线程,而是针对设置的核心线程数而言,任何一个线程都可以是核心线程。
  2. corePoolSize 表示的是线程池的常驻线程数,如果设置为 0 则表示在没有任何任务时需要销毁线程池。如果大于 0 ,即使没有任务时也会保证线程池的线程数等于此值。
  3. 关于此值设置的合理性,如果设置的较小,则会频繁的创建和销毁线程(非核心线程);设置过大,则会浪费资源。
  • maximumPoolSize

    最大线程数,线程池允许创建的最大线程数,最大线程数 = 核心线程数 + 非核心线程数。此值只有在任务比较多且阻塞队列放不下时才会用到。

  • keepAliveTime

    空闲线程存活时间,线程池中的线程空闲时间超过该值也没任务可做那么就需要回收销毁。如果设置为 0,表示一旦空闲立即回收。该参数一般只会对非核心线程起作用,核心线程不会因为空闲太长时间而被关闭,当最大线程数等于核心线程数时,那么线程池在空闲的时候也不会销毁任何线程。但是可通过调用 allowCoreThreadTimeOut(true) 方法使核心线程数内的线程也可以被回收。

  • unit

    和 keepAliveTime 参数一起使用,是时间单位。如:天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)等。

  • workQueue

    用于存放等待执行的任务的阻塞队列,是 BlockingQueue 接口的实现。当线程池中的线程数大于等于核心线程数时才会用到该队列,注意和有没有空闲核心线程无关。

  • threadFactory

    线程工厂,线程池中的线程就由它创建。如果没有设置就使用默认的线程工厂。

  • handler

    饱和策略(拒绝策略),当阻塞队列和线程池都满了,说明线程池处于饱和状态,需要采取一种策略处理提交的新任务,默认是直接抛出异常。

通过配置不同的参数,就可以创建出行为不同的线程池,这也是线程池高度灵活性的基础。

核心属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
--- ThreadPoolExecutor

//======= 约定使用32位表示线程池状态和数量,高3位表示状态 ,低29位表示数量 =============/

/**
* 线程池初始化状态码,状态为 RUNNING,线程数为 0
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* COUNT_BITS 为 29 (0001 1101)
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 线程池允许最大线程池临界值,1 * 2^29 = 536870912
* 过程:(1)001 (2)左移29位得到001后跟29个0 -> 0010 0000 0000 0000 0000 0000 0000 0000 (3)减去1得0001 1111 1111 1111 1111 1111 1111 1111
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 运行状态:111 00000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 关闭状态:000 00000000000000000000000000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 停止状态:001 00000000000000000000000000000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 整理状态:010 00000000000000000000000000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 终止状态:011 00000000000000000000000000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;

// 获取线程池的状态。将整数 c 的低 29 位置为 0 就得到了线程池的状态
private static int runStateOf(int c) {
return c & ~CAPACITY;
}

// 用于计算线程池中线程数量。将整数 c 的高 3 位置为 0,就得到了线程池中的线程数
private static int workerCountOf(int c) {
return c & CAPACITY;
}

// 获取线程池状态码
private static int ctlOf(int rs, int wc) {
return rs | wc;
}

/**
* 比较状态
*
* @param c
* @param s
* @return
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}

/**
* 当前线程池是否处于运行状态
*
* @param c
* @return
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}

/**
* 增加线程池中的线程数量
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}

/**
* 减少线程池中的线程数量
* Attempts to CAS-decrement the workerCount field of ctl.
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}

private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}


/**
* 线程池阻塞队列
*/
private final BlockingQueue<Runnable> workQueue;

/**
* 线程池全局锁
*/
private final ReentrantLock mainLock = new ReentrantLock();

/**
* 1.用于保存和移除线程池创建的Worker,用来控制线程的生命周期。
* 2.对于垃圾回收来说,即使Worker中封装的thread完成了任务的执行(包括异常情况),但是如果Worker不被回收那么thread仍然被强引用着。
* 3.该Hash表是线程不安全的,操作时需要加全局锁
*/
private final HashSet<Worker> workers = new HashSet<>();

/**
* 全局锁条件 - 条件队列
*/
private final Condition termination = mainLock.newCondition();

/**
* 追踪线程池最大值,仅在获取到全局锁条件下执行
*/
private int largestPoolSize;

/**
* 线程池完成任务数量
*/
private long completedTaskCount;

/**
* 线程工厂
*/
private volatile ThreadFactory threadFactory;

/**
* 饱和策略
*/
private volatile RejectedExecutionHandler handler;

/**
* 保活时间,即最大允许空闲时间
*/
private volatile long keepAliveTime;

/**
* 是否允许核心线程被回收
*/
private volatile boolean allowCoreThreadTimeOut;

/**
* 核心线程池数,不会被回收,即 workers的最小值。除非设置 allowCoreThreadTimeOut 。
*/
private volatile int corePoolSize;

/**
* 最大线程数
*/
private volatile int maximumPoolSize;

/**
* 默认的饱和策略,直接抛出异常
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

上面的属性都很重要,其中还包含了部分属性的操作方法,这些都会在下面的源码分析中不断出现。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// Public constructors and methods 构造方法们
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
// 当没有指定线程工厂时,使用默认的线程创建工厂
Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

创建线程池时如果不指定线程工厂则会使用默认的线程工厂,默认线程工厂创建的线程都属于同一个线程组,拥有相同的优先级,并且都是非守护线程,具体代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
--- Executors
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;

DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
// 线程组
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}

public Thread newThread(Runnable r) {
// 创建线程
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
// 设置为非守护线程
if (t.isDaemon())
t.setDaemon(false);
// 设置优先级
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}

生命周期

设计思想

线程池采用的是 Integer.SIZE 32 位的整数来存放线程池的状态和池中的线程数,其中高 3 位表示线程池状态即可以表示 7 种状态,低 29 位表示线程数即可以存放 5 亿多个线程。这种设计思想对整数赋予了双重角色,通过高低位的不同,既表示线程池状态,又表示工作线程数目,这是一个典型的高效优化。要知道用一个变量存储两个值,可以避免在做相关决策时出现不一致的情况,省去了占用锁资源去维护两个变量的一致性。这种方式在其它框架中也多有使用,如 Dubbo 协议就使用 16 个字节共 128 位,每一位用来表示不同意义的数值。

线程池状态

线程池的状态表示如下图所示:

注意,线程池的状态并非用户显示配置(用户调用关闭方法除外),而是随着线程池的运行由内部自行维护,和线程的执行密切相关,下面分别说明线程池的状态及其状态流转。

RUNNING

  • 状态说明
    线程池处于 RUNNING 状态允许接受新的任务,处理任务队列中的任务。
  • 状态转换
    线程池一旦被创建就处于 RUNNING 状态,并且线程池中的线程数为 0 。

SHUTDOWN

  • 状态说明
    线程池处于 SHUTDOWN 状态时,不再接收新任务,但能处理任务队列中的任务。
  • 状态转换
    调用线程池的shutdown()方法时,线程池由RUNNING -> SHUTDOWN 。

STOP

  • 状态说明
    线程池处在 STOP 状态时,不能接收新任务,也不处理任务队列中的任务,并且会中断正在处理任务的线程。
  • 状态转换
    调用线程池的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN ) -> STOP 。

TIDYING

  • 状态说明
    所有的任务已终止,线程池中线程数为 0 ,线程池会变为TIDYING状态(线程池内部自动更新状态)。当线程池变为TIDYING状态后,会紧接着执行钩子方法terminated()。若用户需要在线程池变为TIDYING时,进行相应的处理,可以通过重写terminated()方法来实现。
  • 状态转换
    当线程池在 SHUTDOWN 状态下时,阻塞队列为空并且线程池中线程数为 0 时,就会由 SHUTDOWN -> TIDYING。 当线程池在 STOP 状态下,线程池中线程数为 0 时,就会由STOP -> TIDYING。

TERMINATED

  • 状态说明
    线程池彻底终止,就变成 TERMINATED 状态。
  • 状态转换
    线程池处在TIDYING状态时,执行完 terminated() 方法之后,就会由 TIDYING -> TERMINATED。

下面进行小结,线程池状态及流转(线程池的生命周期)如下图所示:

任务执行机制

任务调度

任务调度是线程池的主要入口,所有任务的调度都是由execute方法完成的,当用户提交了一个任务后,任务调度阶段将决定如何执行该任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public void execute(Runnable command) {
// 任务体不允许为 null
if (command == null)
throw new NullPointerException();

// 获取线程池的状态码,该值包含了线程池的状态和线程数
int c = ctl.get();

// 1 如果当前线程数少于核心线程数,则创建一个 Worker 来执行任务,即创建一个线程并将 command 作为该线程的第一个任务
if (workerCountOf(c) < corePoolSize) {
// 返回 false 说明线程池不允许创建线程,可能原因:(1)线程池关闭(2)当前线程数已经达到临界值
if (addWorker(command, true))
return;
// 创建失败,重读线程池状态码
c = ctl.get();
}

// 2 如果线程池处于运行状态,则尝试将任务添加到阻塞队列 workQueue 中
if (isRunning(c) && workQueue.offer(command)) {
// 再次获取线程池状态码
int recheck = ctl.get();
// 双重检查,再次判断线程池状态。如果线程状态变了(非运行状态)就需要从阻塞队列移除任务,同时执行拒绝策略。防止线程池关闭。
if (!isRunning(recheck) && remove(command))
reject(command);

// 如果线程池状态仍然是运行状态,并且线程池为空则创建一个非核心线程来执行任务,防止线程提交到阻塞队列后线程都关闭了。
// 一般这种情况是设置核心线程数为 0
else if (workerCountOf(recheck) == 0)
addWorker(null, false);

// 3 如果任务队列满了,则根据 maximumPoolSize 创建非核心线程。如果创建失败,说明当前线程数已经达到 maximumPoolSize 或线程池关闭,需要执行拒绝策略
} else if (!addWorker(command, false))
reject(command);
}

execute 方法逻辑体现了提交任务到线程池的流程,上面代码已经详细注释。需要强调的是,符合将任务加入阻塞队列中的情况,会进行双重检查线程池的状态,因为是直接将任务入队,和前后两种情况不一样,即使任务成功排队,也有可能出现线程池关闭或线程池为空的情况。

下面通过一张图进行阐述正常流(不考虑线程池关闭等情况)的流程:

任务缓冲

任务缓冲是线程池管理任务的核心部分,通过一个阻塞队列来实现。线程池的本质是对任务和线程的管理,而做到这一点关键的思想是将任务和线程解耦,阻塞队列缓冲任务,工作线程自旋从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。

线程池中的阻塞队列参数非常重要,不同的阻塞队列对线程池有不同影响,下面对线程池常用的阻塞队列进行说明。

  • ArrayBlockingQueue
    基于数组结构的有界阻塞队列,该队列按照先进先出原则对元素进行排序。

  • LinkedBlockingQueue
    基于链表结构的无界阻塞队列,该队列按照先进先出规则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,使用该队列作为任务队列有容量危险。

  • SynchronousQueue
    一个比较特殊的阻塞队列,其本身不存储元素。每个插入操作必须等待另一个线程执行移除操作,反之亦然。如果使用该阻塞队列,只有当两个线程执行相反模式的操作才能配对成功,否则先执行的一方只能等待。下图是对线程池使用该队列处理任务过程的描述:

  • PriorityBlockingQueue
    支持优先级排序的无界阻塞队列,默认自然排序规则,不能保证同优先级元素的顺序。

  • DelayQueue
    一个实现延迟获取元素的无界队列,在创建元素时可以指定多久才能从队列中移除,只有延时期满后才能从队列中获取元素。

  • LinkedBlockingDeque
    一个由链表结构构成的双向阻塞队列。队列头部和尾部都可以添加和移除元素。

任务申请

任务执行有两种情况,一种是任务直接交给新创建的线程执行。另一种是线程执行 getTask 方法从任务队列中获取任务并执行,执行完任务的线程会继续尝试从任务队列中申请任务再去执行。第一种情况仅出现在用户提交任务到线程池,线程池为该任务创建线程的时候。第二种情况是线程执行任务最多的情况,包括线程池存在的线程执行任务,创建的非核心线程执行任务。

任务申请的核心方法 getTask 是配合 Worker线程 工作的,用于 Worker线程 拉取任务队列,下面对该方法进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
 /**
* @return 返回null 表示可以对当前线程进行回收
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?

for (; ; ) {

// 获取线程池状态码
int c = ctl.get();
// 获取线程池状态
int rs = runStateOf(c);

// 线程池状态为SHUTDOWN且队列为空 或 线程池状态为 STOP,就回收线程。这个条件不仅可以回收非核心线程,也可以回收核心线程。todo 核心线程唯一回收条件
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少线程池中的线程数
decrementWorkerCount();
return null;
}

// 线程池中的线程数
int wc = workerCountOf(c);

// 是否需要进行超时控制。即允许核心线程数内的线程回收,或线程池中的线程数超过了核心线程数,那么有可能发生超时关闭
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

// 控制线程池中线程数的关键
//1. wc > maximumPoolSize ,可能是在此方法执行阶段同时执行 setMaximumPoolSize 方法修改了最大值。
//2. timed && timedOut 如果为true,表示当前操作需要进行超时控制,且线程上一轮获取任务超时
//3. 结果:如果线程池中的线程数大于最大线程数或获取任务超时(不设置 allowCoreThreadTimeOut,核心线程没有超时概念),并且(线程数 > 1 或 任务队列为空),则应该回收当前线程。
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
// 减少工作线程数
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}

try {
// 根据timed来判断:
// 1. 如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务则返回null
// 2. 否则通过take方法获取任务,如果队列为空则take方法会阻塞直到队列不为空
// 3. 注意,真正响应中断是在 poll() 方法或者 take() 方法中
Runnable r = timed ?
// 超时获取任务,因为线程超时要被回收。如果线程在等待的过程发生了中断,会抛出中断异常
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
// 不需要超时,如果线程在等待的过程发生了中断,会抛出中断异常
workQueue.take();
if (r != null)
return r;

// 如果 r == null ,说明获取任务超时
timedOut = true;
} catch (InterruptedException retry) {
// 获取任务时当前线程发生中断,重置超时标记并重试
timedOut = false;
}
}
}

上述方法用于从任务队列中不断拉取待执行的任务,具体执行流程如下图所示:

下面对主要逻辑进行说明:

  1. 该方法返回 null 时,表示当前线程可以被回收了,包括核心线程。这也是该方法多次判断的原因,控制线程池中线程数量,进而控制线程池的状态。
  2. 在没有设置 allowCoreThreadTimeOut 时,核心线程数的线程会阻塞等待任务,不会被回收。
  3. 超时回收,在 keepAliveTime 对应的具体时间内都没有任务,应该回收非核心线程。
  4. 以下情况需要返回 null,回收当前线程。
    • 线程池处于 STOP 状态。
    • 线程池处于 SHUTDOWN 状态,且阻塞队列为空。
    • 线程池中的线程数大于最大线程数。
    • 线程获取任务超时再次重试时,仍为可回收线程。

getTask 方法还是比较复杂的,整个逻辑中进行了多次判断,目的是控制线程的数量,进而维护线程池的状态。需要特殊说明的是,当线程获取任务超时时并没有立刻回收该线程,而是让线程重试,这么做是为了防止该线程可能会成为核心线程,避免误回收,如果误回收在后续流程中还需要重新创建线程,因此重试一次代价会小一些。

任务执行

任务执行是 Worker线程 的工作,我们会在下面详细介绍。

任务拒绝

拒绝策略

线程池的拒绝策略属于一种限流保护机制,防止线程池崩溃。线程池拒绝任务的时机如下:

  1. 执行关闭方法后线程池处于关闭状态及以上状态
  2. 线程池处于运行状态,但是没有能力(阻塞队列已满,线程数达到最大值)处理新提交的任务了。

JDK 内置了 4 种拒绝策略,默认使用 AbortPolicy 策略。拒绝策略如下图所示:

AbortPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 丢弃任务并抛出异常(默认策略)
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() {
}

/**
* 直接抛出异常
*
* @param r 任务
* @param e te
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

AbortPolicy 策略是线程池默认的拒绝策略,在任务不能再提交到线程池时抛出异常,能够及时反馈程序的运行状态。对于比较核心的业务推荐使用此拒绝策略,因为当系统不能承载更大的并发流量时,业务方能够及时地通过异常发现。

CallerRunsPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* 由提交任务的线程自己来执行任务
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() {
}

/**
* 只要线程池没有被关闭,就由提交任务的线程自己来执行这个任务。
*
* @param r
* @param e
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
// 线程池没有关闭
if (!e.isShutdown()) {
// 方法级别调用
r.run();
}
}
}

CallerRunsPolicy 策略是由提交任务的线程处理任务,此策略适合让所有任务都执行完毕。

DiscardPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 直接忽略任务
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() {
}

/**
* 直接忽略
*
* @param r
* @param e
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}

DiscardPolicy 策略会直接丢弃任务,并且不会抛出异常。此策略会导致业务方无法发现异常,不建议核心业务采用此策略。

DiscardOldestPolicy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 将阻塞队列头的任务扔掉,然后将当前任务提交到线程池尝试执行。
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() {
}

/**
* 将队列中旧任务移除,并将当前任务提交到线程池
*
* @param r
* @param e
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}

DiscardOldestPolicy 策略会丢弃队列最前面的任务,然后重新提交被拒绝的任务。这种策略存在丢失任务的风险。

自定义拒绝策略只需要实现 RejectedExecutionHandler 接口,重写 rejectedExecution 方法即可。如果不自定义拒绝策略,线程池将使用默认的拒绝策略。

Worker线程管理

前文在介绍任务执行机制的时候涉及到 Worker线程,线程池维护的线程模块其实就是一组 Worker对象 ,下面我们就来看看 ThreadPoolExecutor 的内部类 Worker 。

Worker线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;

/**
* Worker持有的线程,即任务执行的真正线程
*/
final Thread thread;
/**
* 主线程提交任务到线程池,任务就会存放到这里。
*/
Runnable firstTask;
/**
* 用于存放当前线程完成的任务数。注意和 completedTaskCount 的区别
*/
volatile long completedTasks;

/**
* Worker 唯一的构造方法
*
* @param firstTask 任务,可能为 null
*/
Worker(Runnable firstTask) {
// 设置同步状态值为 -1,防止在启动线程之前,线程就被中断。因为AQS中默认的 state 为 0,Worker中实现的 tryAcquire 方法内存值就是 0,修改值为 1
setState(-1);
this.firstTask = firstTask;
// 使用工厂创建线程,注意创建出来的线程的任务体就是 Worker 本身。这意味着当线程启动时,Worker#run方法就会执行
this.thread = getThreadFactory().newThread(this);
}

/**
* Worker 实现了 Runnable 接口,重写了run() 方法。
*/
public void run() {
// 这里调用了外部类的 runWorker 方法
runWorker(this);
}

// ------- Worker继承了AQS类,下面的核心方法是重写了AQS的方法,使用独占锁获得执行权,不支持锁的重入 -----------------/

protected boolean isHeldExclusively() {
return getState() != 0;
}
/**
* 独占式获取资源。AQS 中默认的 state 为 0。
*
* @param unused
* @return
*/
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

/**
* 释放资源
*
* @param unused
* @return
*/
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}

/**
* lock
*/
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}

/**
* unlock
*/
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}

/**
* 中断线程
*/
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}

线程池在创建线程时,会将线程封装成工作线程Worker,目的是管理线程的状态并维护线程的生命周期

工作线程Worker 比较特别,下面对其关键点进行说明:

  • 继承了 AQS ,实现了一套独占锁机制。

    1.Worker 并没有直接使用可重入锁 ReentrantLock ,而是通过继承 AQS 实现了不可重入的独占锁,目的就是通过不可重入的特性判断 Worker 中封装线程的执行状态。
    2.在线程执行任务期间会加 Worker非重入锁,表示当前线程正在执行任务中,并不是处于空闲状态,不应该中断该线程。
    3.如果线程不是独占锁的状态则表明该线程处于空闲状态,可以对该线程进行中断

  1. 实现了 Runnable 接口,它是一个任务体并重写的 run 方法,该方法是线程池执行任务的关键。

    在创建 Worker 成功后,紧接着就会启动 Worker 封装的真实 Thread ,启动成功后 Worker 中的 run 方法就会执行。

  2. 内部封装了实际执行任务的线程。

    内部封装的线程是线程池的工厂创建出来的,它的使命就是执行 Worker 中的 run 方法中的任务。那业务任务谁来执行? 同样地,也是该线程执行,只不过它使用的是方法级别的调用。

  3. 内部封装了初始化任务体

    Worker 使用 firstTask 保存传入的第一个任务,该任务允许为null。如果该任务非空,那么线程就会在启动后优先执行这个任务,一般对应于核心线程的创建和未达到最大线程数情况下的非核心线程的创建;如果该任务为空,对应于非核心线程的创建,用于去执行任务队列中的任务。

  4. 线程复用

    一个 Worker 对应线程池中的一个线程,线程复用的逻辑实现是在 Worker 类中的 run 方法中执行 runWorker 方法。由上面的第 2、3 两个说明,很容易得出,当 Worker 中的线程启动后会执行 Worker 这个任务体的 run 方法,进而该线程就会执行 runWorker 方法,然后进入到 while 自旋,实现线程的复用。

  5. 线程回收

    线程池管理着线程的生命周期,需要对长时间空闲的线程、启动失败的线程以及执行任务出现异常的线程进行回收。线程池使用了HashSet这个Hash表去持有Worker的引用,这样可以通过添加引用和移除引用的操作来控制线程的生命周期。

前文对线程池的任务执行机制进行了介绍,下图是 Worker 执行任务的模型:

新增线程

如果说 execute 方法逻辑体现了提交任务到线程池的流程,那么 addWorker 方法则体现了线程池执行任务的开端,即接收任务、创建线程、启动线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
private boolean addWorker(Runnable firstTask, boolean core) {

//------------------------------- 1 创建线程前的检测工作 -------------------------------------/
// for 跳出标志
retry:
for (; ; ) {

//------------------------- 1.1 创建线程前,对线程池状态和队列进行检查,判断是否还可以创建线程 ----------------------/

// 获取线程池状态码
int c = ctl.get();

// 获取线程池状态
int rs = runStateOf(c);

/**
*
* 如果线程池状态范围是:[SHUTDOWN,TERMINATED],出现下列任一种情况都不允许创建Worker:
* 1 firstTask != null
* 2 workQueue 为空
*
*小结:
* 1 线程池处于 SHUTDOWN 状态时,不允许提交任务,但是已经存在的任务需要继续执行。
* 1.1 当 firstTask == null 时且阻塞队列不为空,说明非提交任务创建线程,执行阻塞队列中的任务,允许创建 Worker
* 1.2 当 firstTask == null 但阻塞队列为空,不能创建 Worker
* 1.3 当 firstTask != null 时,不能创建
* 2 线程池状态大于 SHUTDOWN 状态时,不允许提交任务,且中断正在执行的任务。
*/
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;


//---------------------------- 2 创建线程前,对线程池中线程数检查,判断是否还可以创建线程 ---------------------/

for (; ; ) {
// 获取线程池线程数
int wc = workerCountOf(c);

// 判断线程池线程数是否达到边界值:1 临界值 2 核心线程数或最大线程数
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;

// 增加线程池中线程数如果成功,则表示创建 Worker 前的校验工作完成,可以进行创建 Worker 流程了。
if (compareAndIncrementWorkerCount(c))
break retry;

// 增加线程数失败,说明可能其它线程也在尝试创建Worker,就需要回到起点,重新校验。

//并发影响,需要重新获取线程池状态码
c = ctl.get();

//线程池状态是否改变,改变了则需要重头校验,否则只需要再次校验线程数即可
if (runStateOf(c) != rs)
continue retry;
}

}


//---------------------------------- 创建 Worker 流程 ------------------------------------/

// Worker 中的线程是否启动的标志
boolean workerStarted = false;

// Worker 是否添加到 workers 集合中的标志
boolean workerAdded = false;

Worker w = null;
try {

// 创建 Worker,将任务传入。注意,如果是非提交任务创建Worker的话,firstTask 为null
w = new Worker(firstTask);

// 将创建的Worker中的线程临时保存到 t,这个是真正的线程,Worker 只是对线程进行了包装。
final Thread t = w.thread;

// Worker 中的线程创建成功
if (t != null) {

// 加锁,注意这个锁的粒度是全局的。也就是说,当这里获取到锁,线程池不能关闭,因为线程池关闭也需要锁。
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();

try {

// 再次获取线程池状态
int rs = runStateOf(ctl.get());

// 如果线程池是运行状态,或者是关闭状态且传入的任务为null(不接收新任务,但是会继续执行任务队列中的任务),符合条件。
// 此外都不符合条件,线程池不会维护当前创建的Worker线程,该Worker线程由于没有被引用最后会被JVM回收
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {

// 提前检查新创建的Worker中的线程是否是启动状态
if (t.isAlive())
throw new IllegalThreadStateException();

// 将新创建的 Worker 加入到 workers 集合,意味着线程池持有当前 Worker 的引用,当前 Worker 不会被 GC。
workers.add(w);

// 更新 largestPoolSize 的值,该值用于追踪线程池中出现过的最大线程数量
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;

// 更新标记值
workerAdded = true;
}
} finally {
// 全局锁释放,注意全局锁释放的时机
mainLock.unlock();
}

// Worker线程只有添加到Worker集合后才能启动线程
if (workerAdded) {
// 启动Worker中的线程,这一步的意义重大
t.start();
// 标记线程启动成功
workerStarted = true;
}
}
} finally {
// 线程加入线程池失败或启动失败,需要清理工作
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}

线程池通过上述方法增加线程,该方法仅完成创建线程并使它运行,最后返回是否成功。至于是哪种情况下增加线程,该方法并不关心。下图是新增Worker线程的流程图:

还需要强调一点,该方法只是创建并启动线程,线程还没有执行任务。再分析执行任务逻辑之前,先来看看创建 Worker 的异常流程,addWorkerFailed 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void addWorkerFailed(Worker w) {
// 获得全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {

if (w != null)
// 从 workers 缓存中移除启动失败的 Worker
workers.remove(w);

// 减少线程池中线程数,因为在此之前递增了
decrementWorkerCount();

// 尝试终止线程池
tryTerminate();
} finally {
mainLock.unlock();
}
}

方法名非常直观,就是执行 addWorker 失败的处理方法。该方法主要做了以下工作:

  1. 从 Worker 缓存集合中移除启动失败的 Worker 便于 GC 。
  2. 递减线程池中线程数,在校验是否允许创建 Worker 流程中递增了线程数,这里需要递减。
  3. 尝试终止线程池,新增线程失败的原因可能是线程池状态处于[SHUTDOWN,TERMINATED],这种情况下要尝试更新线程池的状态为终止状态。

执行任务

Worker 中的线程启动成功后,其 run 方法会调用 runWorker 方法:

1
2
3
4
5
6
7
/**
* Worker 实现了 Runnable 接口,重写了run() 方法。
*/
public void run() {
// 这里调用了外部类的 runWorker 方法
runWorker(this);
}

runWorker 方法是执行提交任务和阻塞队列中等待任务的核心实现,接下来我们分析它的具体实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
final void runWorker(Worker w) {
// 当前线程,即 w 中的线程
Thread wt = Thread.currentThread();
// 获取该线程的第一个任务,可能没有。如果有的话,优先执行该任务。
Runnable task = w.firstTask;
w.firstTask = null;

// 将 state 值由由 -1 设置为 0,这样就可以允许中断了 。
w.unlock(); // allow interrupts

boolean completedAbruptly = true;
try {
// 循环调用getTask() 方法从任务队列中获取任务并执行
while (task != null || (task = getTask()) != null) {

// 申请Worker非重入锁,标志着自己处于工作状态。
w.lock();

/**
* 该if判断保证了:如果线程池正在停止,需要确保当前线程是中断状态,否则要保证当前线程不是中断状态。
*
* 出现以下任何一种情况都需要中断线程:
* 1 如果线程池状态大于等于 STOP,并且当前线程没有被中断
* 2 如果当前线程被中断了并且线程池状态大于等于 STOP 状态(恢复中断标识)
* 使用interrupted()方法判断线程是否被中断,该方法会清除中断标志位,既确保了在线程RUNNING或者SHUTDOWN状态时线程是非中断状态的,又支持了线程池是STOP状态下的判断
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
// 中断当前线程,进行中断标志复位
wt.interrupt();

try {
// ThreadPoolExecutor 的扩展方法
beforeExecute(wt, task);

Throwable thrown = null;
try {

// 执行目标任务,方法级别调用。
task.run();

} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// ThreadPoolExecutor 的扩展方法
afterExecute(task, thrown);
}
} finally {
// 置空 task,为下一个任务做准备
task = null;
// 更新Worker线程完成任务数量
w.completedTasks++;
// 释放 Worker非重入锁
w.unlock();
}
}

// while 循环没有出现异常,completedAbruptly 才会被设置为 false
completedAbruptly = false;
} finally {
/**
* 线程退出 while 循环后需要进行回收,可能情况如下:
* 1 任务队列中已经没有要执行的任务了
* 2 任务执行过程出现异常
*/
processWorkerExit(w, completedAbruptly);
}
}

线程执行任务的流程如下图所示:

执行任务逻辑已经详细注释,下面对该方法简要分析:

  1. 线程执行任务有两个途径,通过取 Worker 的 firstTask 或者调用 getTask 方法从任务队列中取出待执行的任务。
  2. 线程复用得益于对线程的封装,封装后的线程不再局限于执行当前任务,而是while循环不断地通过getTask()方法获取任务,然后执行任务,从而实现了线程的复用。
  3. 线程在执行任务前会先申请对应 Worker 独占锁,标志自己处于工作状态,不应该中断该线程,这是对线程封装的好处。
  4. 当线程池状态大于等于 STOP 状态,要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  5. 线程通过调用任务的 run 方法来执行对应的任务,而不是启动线程,这个正是前文特别说明的方法级别调用
  6. 当 Worker 封装的线程退出循环后,执行 processWorkerExit() 方法对该线程进行回收。
  7. 可以通过重写 beforeExecute() 和 afterExecute() 方法来实现 ThreadPoolExecutor 的扩展功能。

再谈线程复用

线程池会使用一定数量的线程去执行任务,通常线程数量远小于任务数量,针对这种情况线程池通过线程复用的方式让同一个线程去执行不同的任务。我们知道线程池是将线程和任务解耦,摆脱了一个任务必须一个线程的限制,这也是线程复用的必要条件。线程池使用Worker对线程的封装,也就是Worker线程,线程启动后会去执行一个循环任务,该任务可以执行线程的首个任务轮询任务队列中的任务,线程通过调用任务的 run 方法实现任务的执行。

线程复用的逻辑主要在 runWorker 方法中,该方法是 Worker 类的 run 方法中的逻辑,Worker 中封装的线程启动后会执行 Worker 的 run 方法进而执行 runWorker 方法。整个逻辑简化后的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
runWorker(Worker w) {
// 线程首个任务
    Runnable task = w.firstTask;
// 轮询任务队列中的任务
    while (task != null || (task = getTask()) != null) {
        try {
// 线程执行任务的 run 方法,即方法级别的调用
            task.run();
        } finally {
            task = null;
        }
    }
}

线程回收

线程池中线程的销毁依赖JVM自动回收,Worker 线程结束任务或异常退出后,Worker 会主动清除自身在线程池中的引用,这意味着线程池可以回收该线程了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 线程执行任务抛出了异常
// 注意,如果非异常退出,那么在 getTask 方法中会递减线程池中线程数量
if (completedAbruptly)
// 减少线程池中线程数量
decrementWorkerCount();

// 获取全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累计线程池完成的任务数量
completedTaskCount += w.completedTasks;
// 将线程引用移出线程池
workers.remove(w);
} finally {
// 释放全局锁
mainLock.unlock();
}

// 尝试终止线程池
tryTerminate();

int c = ctl.get();

// 如果线程池状态小于 STOP 状态,说明还可以处理任务
if (runStateLessThan(c, STOP)) {

// 1. 当前线程处理任务没有出现异常
if (!completedAbruptly) {
// 获取核心线程数,如果设置了允许回收核心线程数,则返回 0,否则取核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

// 1.1 如果 allowCoreThreadTimeOut=true,并且任务队列中有任务,至少保留一个worker线程
if (min == 0 && !workQueue.isEmpty())
min = 1;

// 1.2 如果 allowCoreThreadTimeOut=false,线程池中线程数不能少于 corePoolSize

// 线程池中线程数大于等于 min ,说明无需创建线程。
if (workerCountOf(c) >= min)
return; // replacement not needed
}

// 执行到这里的可能情况:
// 1 线程池中没有线程执行任务队列中的任务,需要创建线程取执行。(核心线程数设置为 0 或 允许回收核心线程)
// 2 线程池中线程数小于核心线程数,需要创建线程补充核心线程数。(核心线程数 > 0)
// 3 当前线程执行任务过程出现异常,而且当前线程被回收了,为了确保有线程执行任务,这里需要创建线程。
addWorker(null, false);
}
}

线程回收流程如下图所示:

线程数是否够用:针对核心线程数的,如果线程池设置核心线程数为 0 或允许回收核心线程,那么就要确保在任务队列非空的情况下至少有一个线程;如果不允许回收核心线程,且设置的核心线程数 > 0 ,那么要尽量填充核心线程数至上限。

需要注意的是,线程销毁工作不是只有 processWorkerExit 方法才能完成,前文介绍的新增Worker线程逻辑中对异常流处理的 addWorkerFailed 方法也可以做到。这两者销毁线程的时机不同,前者是线程执行任务的逻辑中销毁,后者是创建线程后启动失败的处理。线程是在执行任务的逻辑中由于执行异常被销毁,那么就需要补一个线程来替代该销毁的线程

上述 processWorkerExit 方法在将Worker线程移除线程池后也就完成了线程的回收工作,但由于执行该方法的原因很多,线程正常退出getTask方法或者执行任务异常都会执行该方法,因此在该方法中需要额外完成两个工作。一是使线程池自适应当前状态,另一个是根据需要创建线程。

至此,processWorkerExit 执行完之后Worker线程被销毁,该线程的整个生命周期结束。下面对整个过程使用流程图的形式进行总结,流程图如下:

关闭线程池

调用线程池的 shutdownshutdownNow 方法来关闭线程池,两者的原理有点差异,下面我们分别说明这两个方法。

shutdown

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void shutdown() {
// 全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为 SHUTDOWN
advanceRunState(SHUTDOWN);
// 尝试中断线程池所有中闲置的线程
interruptIdleWorkers();
// hook
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}

// 尝试终止线程池
tryTerminate();
}

shutdown() 方法可以安全地关闭一个线程池,体现在下面几个方面:

  1. 只是将线程池的状态置为 SHUTDOWN ,这意味着线程池不能接收新的任务,再有新的任务被提交则根据拒绝策略进行处理。
  2. 会执行完正在执行的任务和队列中等待的任务,任务全部结束后才会彻底关闭线程池。
  3. 尝试中断线程池中所有闲置的线程,便于尝试回收这些线程。
  4. 调用tryTerminate尝试终止线程池,用于将线程池的状态更新为 TERMINATED 。

shutdownNow

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
// 全局锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置线程池状态为 STOP
advanceRunState(STOP);
// 尝试中断线程池中所有启动状态的线程「通过 Worker 中的 同步状态 state >= 0 判断是已启动的线程」
interruptWorkers();
// 将阻塞队列中正在等待的所有任务进行备份,然后清空阻塞队列并返回备份。有了这个备份,可以根据需要做补救措施。
tasks = drainQueue();
} finally {
mainLock.unlock();
}

// 尝试终止线程池
tryTerminate();
return tasks;
}

shutdownNow() 方法表示立即关闭线程池,工作如下:

  1. 将线程池状态置为 STOP 状态,这意味着线程池接下来就会终止了,不仅不接收新的任务,而且也不会执行队列中等待的任务了,正在执行的任务可以执行完毕。
  2. 中断所有Worker线程,包括空闲和非空闲。
  3. 清空阻塞队列并返回等待执行的任务备份。
  4. 调用tryTerminate尝试终止线程池,用于将线程池的状态更新为 TERMINATED 。

tryTerminate()

对于 tryTerminate() 方法的调用,前文中的新增线程失败逻辑、线程退出while逻辑以及两种关闭线程池的方法都会调用了该方法,也就是说几乎每个线程最后消亡的时候都会调用tryTerminate() 方法,但最后只会有一个线程真正执行到终止线程池的地方。其中线程退出调用的意义重大:线程池的状态是内部自行维护的。下面我们来看看这个方法的具体逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
final void tryTerminate() {
for (; ; ) {

// 线程池状态码
int c = ctl.get();

// 以下几种情况不能终止线程池,直接返回(STOP 状态可不会直接返回)
//1. 线程池是运行状态 RUNNING
//2. 大于等于 TIDYING 状态,无需再次终止
//3. SHUTDOWN 状态且阻塞队列非空,这种情况需要执行完任务队列中的任务
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;

// 执行到这里,说明已经具备终止线程池的条件,只差线程回收了。
// 线程池中线程数量不为 0,向任意空闲线程发出中断信号,所有被阻塞的线程(执行poll/take)最终都会被一个个唤醒,回收。
if (workerCountOf(c) != 0) { // Eligible to terminate
// 这里既不是中断所有线程,也不是中断所有空闲线程,而是中断任意一个空闲线程,原因如下:
// 1. tryTerminate() 方法多处被调用,需要中断线程逻辑在上层已经进行了处理,如 shutdown 方法调用时会中断所有空闲线程
// 2. interruptIdleWorkers(ONLY_ONE) 方法用在 tryTerminate() 方法中主要为了唤醒 getTask()方法中存在执行workQueue.take()等待的线程,防止一直等待造成线程无法回收。
// 即使有多个线程阻塞等待,唤醒任意一个也足够了,被唤醒的线程在退出while循环后会再次调用tryTerminate()方法,继续中断阻塞等待线程。此外线程退出后进入到processWorkerExit()方法中
// 会要申请全局锁的,如果全部唤醒会出现竞争锁的情况。
interruptIdleWorkers(ONLY_ONE);
return;
}

/*------------ 执行到这里,说明线程池中线程都已经退出了,接下来就可以终止线程池了------------*/

// 全局锁
final ReentrantLock mainLock = this.mainLock;

// 终止线程池时加全局锁,保证CAS执行成功,即线程池状态依次更新为 TIDYING 和 TERMINATED 。
// 这里可能发生并发问题,如两个线程都通过了 workerCountOf(c) != 0 条件,执行到这里就需要加锁。
mainLock.lock();
try {
// 设置线程池状态码为 TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 状态设置成功后执行 terminated() 钩子方法
terminated();
} finally {

// 设置线程池状态码为 TERMINATED 终止状态
ctl.set(ctlOf(TERMINATED, 0));

// 主要为 awaitTermination 方法服务,唤醒等待线程池终止的线程
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}

tryTerminate() 方法主要根据线程池状态判断是否终止线程池,下面进行简单总结:

  1. 判断线程池是否可以终止,原则是线程池处于关闭状态、队列中没有任务的情况下可以终止。
  2. interruptIdleWorkers()方法的执行表示线程池具备终止条件,向任意空闲线程发送中断信号防止 getTask 方法中存在核心线程执行 workQueue.take()时一直阻塞,导致线程无法回收
  3. 符合终止线程池的条件时,先获取全局锁,然后先将线程池状态置为 TIDYING 状态,设置成功后会执行 terminated() 钩子方法,最后将线程池状态设置为 TERMINATED 状态,完成线程池状态更新后释放全局锁。

下面我们来简单分析一下interruptIdleWorkers方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
+--- ThreadPoolExecutor
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}

/**
* 中断所有闲置的Worker
*
* @param onlyOne 是否仅中断一个
*/
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
// 全局锁,涉及到 workers 操作线程池都会加该锁
mainLock.lock();
try {
// 遍历 workers ,对每个非中断线程进行中断操作。
for (Worker w : workers) {
Thread t = w.thread;
// 如果线程非中断状态,且能 tryLock() 成功,说明该线程闲置,需要进行中断
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}

前文也进行了说明,Worker 继承了AQS,在Worker线程处理任务时会申请Worker独占锁,interruptIdleWorkers 在进行中断时会使用 tryLock() 来判断该Worker线程是否正在处理任务,如果 tryLock() 返回true,说明该Worker线程处于空闲状态,可以被中断。

注意事项:

  1. 线程池中多处执行 tryTerminate 方法的目的是将符合条件的线程池终止,前文也提到线程池的状态是内部自行维护的,并非人为设置。如用户执行 shutdownshutdownNow 方法只是将线程池的状态设置为 SHUTDOWNSTOP ,后续的 TIDYINGTERMINATED 状态的设置就在于此。
  2. tryTerminate 方法中的 interruptIdleWorkers(ONLY_ONE) 的作用是防止线程池在终止的过程中 getTask 方法中存在执行 workQueue.take() 阻塞的线程,因为此时线程池不允许再有新的任务添加到阻塞队列中,这样一来线程将一直阻塞下去,线程池永远都终止不了。

线程池中虽然多处使用中断来期望中断任务的执行,但由于 Java 中不推荐强行停止线程的机制的限制,因为强制的让一个线程被动的退出是很不安全的,内部的数据不一致会对程序造成不可预知的后果。即使调用了 shutdownNow 方法,如果被中断的线程对于中断信号不敏感,那么依然有可能导致任务不会停止。

线程池配置

线程池太大或太小都会导致麻烦,选择一个合适的线程池是非常有必要的。调整线程池中的数量是为了充分并合理地使用 CPU 和内存资源,从而最大限度地提高程序性能。通常我们需要根据任务执行的性质来选择对应的策略。

CPU 密集型任务

如果任务主要进行大量复杂的计算,例如加密、解密、压缩等,那么意味着 CPU 的处理能力是稀缺的资源,应当分配较少的线程,通常按照 CPU 核数 或者 CPU 核数 + 1 进行设置。 计算任务会占用大量的 CPU 资源,CPU 的每个核工作基本都是高负荷的,如果设置过多的线程,每个线程都会尝试抢占 CPU 资源,这就造成了不必要的上下文切换(CPU并没有太多空闲),性能反而由于线程数量过多导致性能下降。

IO 密集型任务

I/O 操作比较多的任务,如数据库操作、文件读写、网络通信等,一般不会消耗太多 CPU 资源,但是普遍需要较长时间的等待,对于这类任务可以配置适当多的线程,如 CPU 核数 * 2 。由于 IO 读写速度相比于 CPU 的速度是比较慢的,设置过少的线程数是不能充分利用 CPU 资源。

合适线程数

Brain Goetz 推荐的计算方法如下:

线程数 = CPU核数 × 目标CPU利用率 ×(1 + 平均等待时间/平均工作时间)

通过上面的公式可以大致计算出一个合理的线程数(核心线程数和最大线程数统称)。如果任务平均等待时间长则线程数就应该多,对应于 IO 密集型任务。如果平均工作时间长则线程数就应该少,对应于 CPU 密集型任务。

线程数太少可能会使得程序整体性能降低,线程数太多可能会消耗内存资源以及造成不必要的上下文切换。想用准确定制线程池需要做的工作很多,除了考虑线程数还可以合理使用线程池的阻塞队列实现任务的调度,还可以根据业务等纬度实现线程池隔离。

线程池监控

线程池提供了一些用于获取属性的方法,这些属性可以用来对线程池进行监控。

线程池还提供了一些用于设置核心属性的方法,使用方可以通过这些方法动态设置线程池的核心策略,线程池内部会处理好当前状态并做到平滑修改。

动态设置核心线程数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
+--- ThreadPoolExecutor
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
// 计算核心线程数变化值
int delta = corePoolSize - this.corePoolSize;

// 覆盖原来的corePoolSize
this.corePoolSize = corePoolSize;

//线程池的线程数大于变更的核心线程数,说明有多余的worker线程,此时会向空闲的worker线程发起中断请求以实现回收
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();

// 核心线程数大于原来值,尝试增加核心线程
else if (delta > 0) {

// 取 任务数和 delta 两者的最小值
int k = Math.min(delta, workQueue.size());

// 预先创建足够多的新Worker以达到核心线程数,并处理队列中的任务。队列空了则停止
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}

动态设置最大线程数

1
2
3
4
5
6
7
8
9
10
11
12
+--- ThreadPoolExecutor
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();

// 覆盖原来的 maximumPoolSize
this.maximumPoolSize = maximumPoolSize;

// 如果是设置小了的话,此时会向空闲的worker线程发起中断请求以实现回收
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}

动态设置空闲时间

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
+--- ThreadPoolExecutor
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();

if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
// 计算超时时间
long keepAliveTime = unit.toNanos(time);
// 计算差值
long delta = keepAliveTime - this.keepAliveTime;
// 覆盖原来的 keepAliveTime
this.keepAliveTime = keepAliveTime;
// 如果时间设置比原来小,则向空闲的worker线程发起中断请求以实现回收
if (delta < 0)
interruptIdleWorkers();
}

允许核心线程超时回收

1
2
3
4
5
6
7
8
9
10
11
12
13
+--- ThreadPoolExecutor
public void allowCoreThreadTimeOut(boolean value) {
// 核心线程必须要有保活时间
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");

if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
// 允许回收则立即中断空闲线程
if (value)
interruptIdleWorkers();
}
}

小结

本篇文章对线程池核心点进行了详细分析,先是简单介绍了线程池产生的背景,接着说明了线程池的优势,最后对线程池源码进行了分析。从任务提交到线程池,到线程池创建线程并处理任务,到最后线程被回收,最后简单介绍了线程池的配置以及线程池的监控。