前言
最初的时候并没有线程池的概念,而是先有线程。每个任务都需要对应一个线程,任务少的情况没有太大问题,任务过多就出现了各种性能和资源开销问题,更重要的是可创建线程的数量是有上限的,不可能无限的创建。在并发环境下,系统不能够确定在某一时刻有多少任务需要执行,有多少资源需要投入。
针对上述问题,于是诞生了线程池,用来平衡线程与系统资源之间的关系。线程池解决问题思路如下:
- 对于反复创建线程开销问题,线程池采用固定数量的线程一直保持工作状态并能反复执行任务。
- 对于创建过多线程占用太多资源问题,线程池会根据需要创建线程,灵活地控制线程数量,避免使用过多资源。
概述
线程池是一种管理线程和任务的工具,是应用场景最多的并发框架之一,几乎所有需要异步或并发执行任务的应用程序都可以使用线程池,合理地使用线程池可以带来可观得性能提升和响应速度。具体好处如下:
- 解耦:线程的创建与任务执行完全分开。
- 降低资源消耗:线程的复用降低了线程创建和销毁带来的资源消耗。
- 提高响应速度:大多情况下(线程池预热后),到达的任务可以不需要等待线程创建就能立即执行,消除了线程创建所带来的延迟,提升了响应速度。
- 便于线程管理:线程是稀缺资源,不能无限制地创建,使用线程池可以对线程进行统一分配、调优和监控。
关于线程池的概述就介绍这么多,本篇文章介绍的线程池核心是 JDK 中提供的 ThreadPoolExecutor
类,具体涉及的接口和实现类如下图所示:
需要说明的是,关于Scheduled类型的线程池继承体系本篇文章没有介绍到,它是基于本篇文章着重介绍的 ThreadPoolExecutor 的扩展实现,支持时间纬度执行任务。
总体设计
线程池的设计没有办法直接采用一般意义上池化资源方案,而是采用生产者 - 消费者模式,将任务和线程解耦并不直接关联,从而良好的缓冲任务、复用线程,缓冲任务通过阻塞队列实现,工作线程从阻塞队列中获取任务以实现线程复用。线程池的使用方是生产者,线程池本身是消费者。至于为什么线程池没有采用一般意义上池化资源的设计方法,这个取决于线程对象的特殊性,线程有着特殊的生命周期,一旦一个线程启动执行任务就不能再次启动了。
任务执行
任务的执行不一定非要通过开启新线程,任务在线程执行之前它也是一个实现类,也有对应的方法。因此我们可以定义出方法级别调用和线程级别调用。
线程级别调用
1 | new Thread(() -> { |
方法级别调用
1 | ((Runnable) () -> { |
线程池对任务的处理最终是通过方法级别调用的来完成,在后面分析源码的时候我们可以看到。
Executor 框架
Executor 是一个异步任务的框架,根据一组执行策略进行调用、调度、执行和控制,目的是提供一种将任务提交和任务执行分离的机制。
两级调度模型
最早 Java 的线程既是任务体也是执行机制,从 JDK5 开始,把任务体和执行机制进行了分离。任务体包括 Runnable 和 Callable,而执行机制由 Executor 框架提供,即 Executor是 Runnable 和 Callable 的调度容器。
Java 线程会被一对一映射为操作系统线程,在 Java 线程启动时创建对应的操作系统线程,同样地,当该 Java 线程终止时对应的操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的 CPU 。对于计算复杂的应用,我们通常会将其拆解为若干个任务并交给 Java 多个线程,这个动作是由用户级别的调度器 Executor 框架完成的,它会将这若干个任务映射为对应数量的线程。在底层,操作系统内核将这些线程映射到硬件处理器上。由此可见,创建一个线程远比创建一个对象要复杂得多,不仅要在 JVM 堆中分配内存,还需要调用操作系统内核 API 来为线程分配资源,因此应该避免频繁创建和销毁。
这个过程属于两级调度模型,对应的示意图如下:
从示意图可看出,应用程序通过 Executor 框架控制上层的调度。而下层的调度由操作系统内核控制,应用程序是无法控制的。
Executor 框架结构
Executor 框架主要由三大部分组成,具体如下:
- 任务体
包括 Runnable 接口和 Callable 接口及其实现。 - 任务的执行
包括任务执行机制的核心接口 Executor,继承 Executor 的 ExecutorService 接口和它的实现们。 - 异步计算结果
包括核心接口 Future 以及对应的实现们,特别是 FutureTask 实现类。是对具体 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。
Executor 框架成员
Executor 框架是线程池实现的基础,它的主要成员有 ThreadPoolExecutor
、ScheduledThreadPoolExecutor
、Executors
、Runnable
、Callable
以及 Future
。
下面正式进入到代码层面的介绍,定时任务实现类 ScheduledThreadPoolExecutor
继承自 ThreadPoolExecutor
,用于实现定时执行,本文暂不介绍它的实现。
Executor 接口
1 | public interface Executor { |
Executor 接口仅定义了一个方法,参数是 Runnable 类型,该方法的目的是将任务提交和任务执行细节解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需将任务提交到执行器 Executor 中,由执行器完成线程的调配和任务的执行。需要注意的是,该接口是没有返回值的,也就意味着无法判断提交的任务是否被线程池执行成功。
ExecutorService 接口
ExecutorService 接口继承自 Executor 接口,一般我们自定义线程池时使用的就是这个接口,该接口中定义的方法加上继承过来的 execute 方法在很多场景中已经可以满足需求了。
该接口中的方法如下图所示:
上图中的方法大致分类如下:
- 向线程池提交任务方法
submit 方法和前文中的 Executor 接口中的 execute 方法有所不同,虽然也是向线程池提交任务,但是有返回值 Future ,并且参数类型不仅支持 Runnable 类型还支持 Callable 类型。
- 执行任务方法
invokeAll 方法用于执行多个任务,同时支持设置超时时间。invokeAny 方法用于执行多个方法中的一个即可,任务执行完成就可以返回,同样支持设置超时时间。这两类方法的底层需要依赖 execute 方法。
- 关闭线程池方法
shutdown 和 shutdownNow 方法用于关闭线程池。
- 判断线程池是否关闭
isShutdown 判断线程池是否已经开始了关闭工作,即是否执行了 shutdown 或者 shutdownNow 方法。注意,该方法返回 true 并不代表线程池此时已经彻底关闭了,仅说明线程池开始了关闭的流程,此时线程池中可能依然有线程在执行任务,队列中仍有等待被执行的任务。
- 判断线程池是否终止方法
isTerminated 和 awaitTermination 方法用于判断线程池是否终止。只有在调用关闭方法后才有调用的意义。
Future
Future 的继承体系如下图所示:
由上图的UML可知,FutureTask 通过 RunnableFuture 间接实现了 Runnable 接口,因此 Executor.execute
方法支持将 FutureTask 提交到线程池。接下来介绍 AbstractExecutorService 抽象实现类就能很清晰看出 FutureTask 的作用。
AbstractExecutorService 实现
AbstractExecutorService 抽象类实现自 ExecutorService 接口,在其基础上实现了几个常用的方法,这些方法供子类进行调用。将执行任务的流程串联起来,保证下层的实现(如 ThreadPoolExecutor)只需关注执行任务的方法即可。具体方法如下:
由于 invokeAll 方法和 invokeAny 方法更多的是执行将任务提交给线程池前的工作,它们并没有将任务提交给线程池,需要通过 Executor 中的 execute 方法实现,而 execute 方法最终要交给具体子类实现。因此,不再对这两类方法展开说明。下面重点介绍下 newTaskFor 方法和 submit 方法。
newTaskFor 方法
1 | --- AbstractExecutorService |
从上面代码可以看出,newTaskFor 方法用于将 Runnable 和 Callable 类型的任务统一包装成 FutureTask ,FutureTask 又间接继承了 Runnable 接口。我们知道,Runnable 的 void run() 方法是没有返回值的, 而 Callable 的 V call() 方法是有返回值的,但 Executor 中的 void execute(Runnable command)方法是不关心返回结果的,它只负责处理 Runnable 类型的任务。综上,不难看出 newTaskFor 方法就是为了屏蔽不同类型任务的差异,以达到统一交给 Executor.execute 执行的目的。下面我们继续看提交任务的另外一种方式。
submit 提交任务
1 | --- AbstractExecutorService |
Runnable 的 void run() 方法没有返回值,但是有的时候我们需要返回值,这个时候 submit 方法就可以实现,只需在该方法的第二个参数传入预期结果,当任务执行完成后会自动返回。
1 | <T> Future<T> submit(Runnable task, T result) |
此外,我们可以看出 submit 方法提交任务的能力是通过 execute 方法实现的。定义于最上层接口 Executor 中的 void execute(Runnable command) 方法不会返回执行结果,想要执行结果就需要通过 FutureTask 包装任务,然后将包装后的任务 FutureTask 交给 Executor.execute 方法执行,执行后的结果也会保存到 FutureTask 中。关于 Future 的继承体系不展开分析,下面概述下 submit 提交任务的原理。
- 调用 newTaskFor 方法将 Runnable 和 Callable 类型的任务统一包装成 FutureTask 对象。包装的本质是将任务统一适配为Callable类型,因为Callable类型任务可以通过call方法返回执行结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// Callable
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW;
}
// Runnable
public FutureTask(Runnable runnable, V result) {
// 将 Runnable 适配成 Callable
this.callable = Executors.callable(runnable, result);
this.state = NEW;
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
// 任务执行还是调用 run 方法,返回结果是传入的预期值
public T call() {
task.run();
return result;
}
} - 当任务执行的时候,FutureTask 中的 run 方法会执行,这个过程是最关键的一步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 被适配的Runnable 和 Callable 方法级别调用
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// 将执行结果保存到 FutureTask 中
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
} - 通过FutureTask#get方法从 FutureTask 中取出任务执行结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
// 从 FutureTask 中取出任务执行结果
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
// 从 FutureTask 中取出任务执行结果
return report(s);
}
提交任务方式
最上层接口 Executor 中的 void execute(Runnable) 不需要获取结果,不会使用 FutureTask 包装任务。抽象实现类 AbstractExecutorService 中的 Future<?> submit() 需要获取结果,因此使用了 FutureTask 包装任务。
需要获取任务结果用 submit 方法,不需要获取结果用 execute 方法。
运行机制
上图展示了线程池的运行机制,线程池运行机制主要分成两个部分,线程管理和任务管理。下面对线程池的主要处理流程进行说明:
- 主线程提交任务到线程池。
- 如果当前线程池中的线程少于核心线程数,则创建新的线程来执行任务。
- 如果线程池中的线程达到或超过核心线程数,则将任务加入到阻塞队列中。
- 如果在第 2 步中无法将任务加入阻塞队列,则依据最大线程数创建新的线程来处理任务。
- 如果在第 3 步创建新线程会使线程池中线程数超出最大线程数,任务将被拒绝并使用饱和策略处理(拒绝策略)。
- 处理完任务的线程会自旋获取新的任务去执行,当线程获取不到任务时,线程会被回收(一般针对非核心线程)。
其中第 1 步和第 3 步涉及到创建线程,该过程需要获取全局锁,因为关闭线程池也需要获取这个全局锁。当线程池完成了预热即核心线程数创建完毕,在一定程度上就不需要频繁创建线程,也就降低了获取全局锁的频次,对于线程池来说全局锁是一个严重的可伸缩瓶颈。关于流程中的概念下文会陆续说明。
ThreadPoolExecutor
线程池核心实现就在 ThreadPoolExecutor 实现类中,该类实现了线程池所需的各个方法,包括最核心的 execute 方法。开发者可以基于该实现类来进行功能上的扩展,定时任务实现类 ScheduledThreadPoolExecutor
就是基于 ThreadPoolExecutor
扩展的功能。
在详细介绍 ThreadPoolExecutor
运行机制之前,我们先对其核心概念,属性、方法等进行简单介绍。
核心参数
1 | public ThreadPoolExecutor(int corePoolSize, // 核心线程数 |
- corePoolSize
核心线程数,线程池的基本大小。当提交一个任务到线程池时,线程池会创建一个基本线程来执行任务,即使其它空闲的基本线程能够执行新任务也会创建线程,只有线程池预热完毕(线程池中线程数达到核心线程数)才不再创建核心线程。
特别说明:
- 核心线程并不是特指某一个或某几个线程,而是针对设置的核心线程数而言,任何一个线程都可以是核心线程。
- corePoolSize 表示的是线程池的常驻线程数,如果设置为 0 则表示在没有任何任务时需要销毁线程池。如果大于 0 ,即使没有任务时也会保证线程池的线程数等于此值。
- 关于此值设置的合理性,如果设置的较小,则会频繁的创建和销毁线程(非核心线程);设置过大,则会浪费资源。
maximumPoolSize
最大线程数,线程池允许创建的最大线程数,最大线程数 = 核心线程数 + 非核心线程数。此值只有在任务比较多且阻塞队列放不下时才会用到。
keepAliveTime
空闲线程存活时间,线程池中的线程空闲时间超过该值也没任务可做那么就需要回收销毁。如果设置为 0,表示一旦空闲立即回收。该参数一般只会对非核心线程起作用,核心线程不会因为空闲太长时间而被关闭,当最大线程数等于核心线程数时,那么线程池在空闲的时候也不会销毁任何线程。但是可通过调用 allowCoreThreadTimeOut(true) 方法使核心线程数内的线程也可以被回收。
unit
和 keepAliveTime 参数一起使用,是时间单位。如:天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)等。
workQueue
用于存放等待执行的任务的阻塞队列,是 BlockingQueue 接口的实现。当线程池中的线程数大于等于核心线程数时才会用到该队列,注意和有没有空闲核心线程无关。
threadFactory
线程工厂,线程池中的线程就由它创建。如果没有设置就使用默认的线程工厂。
handler
饱和策略(拒绝策略),当阻塞队列和线程池都满了,说明线程池处于饱和状态,需要采取一种策略处理提交的新任务,默认是直接抛出异常。
通过配置不同的参数,就可以创建出行为不同的线程池,这也是线程池高度灵活性的基础。
核心属性
1 | --- ThreadPoolExecutor |
上面的属性都很重要,其中还包含了部分属性的操作方法,这些都会在下面的源码分析中不断出现。
构造方法
1 | // Public constructors and methods 构造方法们 |
创建线程池时如果不指定线程工厂则会使用默认的线程工厂,默认线程工厂创建的线程都属于同一个线程组,拥有相同的优先级,并且都是非守护线程,具体代码实现如下:
1 | --- Executors |
生命周期
设计思想
线程池采用的是 Integer.SIZE 32 位的整数来存放线程池的状态和池中的线程数,其中高 3 位表示线程池状态即可以表示 7 种状态,低 29 位表示线程数即可以存放 5 亿多个线程。这种设计思想对整数赋予了双重角色,通过高低位的不同,既表示线程池状态,又表示工作线程数目,这是一个典型的高效优化。要知道用一个变量存储两个值,可以避免在做相关决策时出现不一致的情况,省去了占用锁资源去维护两个变量的一致性。这种方式在其它框架中也多有使用,如 Dubbo 协议就使用 16 个字节共 128 位,每一位用来表示不同意义的数值。
线程池状态
线程池的状态表示如下图所示:
注意,线程池的状态并非用户显示配置(用户调用关闭方法除外),而是随着线程池的运行由内部自行维护,和线程的执行密切相关,下面分别说明线程池的状态及其状态流转。
RUNNING
- 状态说明
线程池处于 RUNNING 状态允许接受新的任务,处理任务队列中的任务。 - 状态转换
线程池一旦被创建就处于 RUNNING 状态,并且线程池中的线程数为 0 。
SHUTDOWN
- 状态说明
线程池处于 SHUTDOWN 状态时,不再接收新任务,但能处理任务队列中的任务。 - 状态转换
调用线程池的shutdown()方法时,线程池由RUNNING -> SHUTDOWN 。
STOP
- 状态说明
线程池处在 STOP 状态时,不能接收新任务,也不处理任务队列中的任务,并且会中断正在处理任务的线程。 - 状态转换
调用线程池的shutdownNow()方法时,线程池由(RUNNING or SHUTDOWN ) -> STOP 。
TIDYING
- 状态说明
所有的任务已终止,线程池中线程数为 0 ,线程池会变为TIDYING状态(线程池内部自动更新状态)。当线程池变为TIDYING状态后,会紧接着执行钩子方法terminated()。若用户需要在线程池变为TIDYING时,进行相应的处理,可以通过重写terminated()方法来实现。 - 状态转换
当线程池在 SHUTDOWN 状态下时,阻塞队列为空并且线程池中线程数为 0 时,就会由 SHUTDOWN -> TIDYING。 当线程池在 STOP 状态下,线程池中线程数为 0 时,就会由STOP -> TIDYING。
TERMINATED
- 状态说明
线程池彻底终止,就变成 TERMINATED 状态。 - 状态转换
线程池处在TIDYING状态时,执行完 terminated() 方法之后,就会由 TIDYING -> TERMINATED。
下面进行小结,线程池状态及流转(线程池的生命周期)如下图所示:
任务执行机制
任务调度
任务调度是线程池的主要入口,所有任务的调度都是由execute方法完成的,当用户提交了一个任务后,任务调度阶段将决定如何执行该任务。
1 | public void execute(Runnable command) { |
execute 方法逻辑体现了提交任务到线程池的流程,上面代码已经详细注释。需要强调的是,符合将任务加入阻塞队列中的情况,会进行双重检查线程池的状态,因为是直接将任务入队,和前后两种情况不一样,即使任务成功排队,也有可能出现线程池关闭或线程池为空的情况。
下面通过一张图进行阐述正常流(不考虑线程池关闭等情况)的流程:
任务缓冲
任务缓冲是线程池管理任务的核心部分,通过一个阻塞队列来实现。线程池的本质是对任务和线程的管理,而做到这一点关键的思想是将任务和线程解耦,阻塞队列缓冲任务,工作线程自旋从阻塞队列中获取任务。
阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。
线程池中的阻塞队列参数非常重要,不同的阻塞队列对线程池有不同影响,下面对线程池常用的阻塞队列进行说明。
ArrayBlockingQueue
基于数组结构的有界阻塞队列,该队列按照先进先出原则对元素进行排序。LinkedBlockingQueue
基于链表结构的无界阻塞队列,该队列按照先进先出规则对元素进行排序。此队列的默认长度为 Integer.MAX_VALUE,使用该队列作为任务队列有容量危险。SynchronousQueue
一个比较特殊的阻塞队列,其本身不存储元素。每个插入操作必须等待另一个线程执行移除操作,反之亦然。如果使用该阻塞队列,只有当两个线程执行相反模式的操作才能配对成功,否则先执行的一方只能等待。下图是对线程池使用该队列处理任务过程的描述:PriorityBlockingQueue
支持优先级排序的无界阻塞队列,默认自然排序规则,不能保证同优先级元素的顺序。DelayQueue
一个实现延迟获取元素的无界队列,在创建元素时可以指定多久才能从队列中移除,只有延时期满后才能从队列中获取元素。LinkedBlockingDeque
一个由链表结构构成的双向阻塞队列。队列头部和尾部都可以添加和移除元素。
任务申请
任务执行有两种情况,一种是任务直接交给新创建的线程执行。另一种是线程执行 getTask 方法从任务队列中获取任务并执行,执行完任务的线程会继续尝试从任务队列中申请任务再去执行。第一种情况仅出现在用户提交任务到线程池,线程池为该任务创建线程的时候。第二种情况是线程执行任务最多的情况,包括线程池存在的线程执行任务,创建的非核心线程执行任务。
任务申请的核心方法 getTask 是配合 Worker线程 工作的,用于 Worker线程 拉取任务队列,下面对该方法进行分析。
1 | /** |
上述方法用于从任务队列中不断拉取待执行的任务,具体执行流程如下图所示:
下面对主要逻辑进行说明:
- 该方法返回 null 时,表示当前线程可以被回收了,包括核心线程。这也是该方法多次判断的原因,控制线程池中线程数量,进而控制线程池的状态。
- 在没有设置 allowCoreThreadTimeOut 时,核心线程数的线程会阻塞等待任务,不会被回收。
- 超时回收,在 keepAliveTime 对应的具体时间内都没有任务,应该回收非核心线程。
- 以下情况需要返回 null,回收当前线程。
- 线程池处于 STOP 状态。
- 线程池处于 SHUTDOWN 状态,且阻塞队列为空。
- 线程池中的线程数大于最大线程数。
- 线程获取任务超时再次重试时,仍为可回收线程。
getTask 方法还是比较复杂的,整个逻辑中进行了多次判断,目的是控制线程的数量,进而维护线程池的状态。需要特殊说明的是,当线程获取任务超时时并没有立刻回收该线程,而是让线程重试,这么做是为了防止该线程可能会成为核心线程,避免误回收,如果误回收在后续流程中还需要重新创建线程,因此重试一次代价会小一些。
任务执行
任务执行是 Worker线程 的工作,我们会在下面详细介绍。
任务拒绝
拒绝策略
线程池的拒绝策略属于一种限流保护机制,防止线程池崩溃。线程池拒绝任务的时机如下:
- 执行关闭方法后线程池处于关闭状态及以上状态
- 线程池处于运行状态,但是没有能力(阻塞队列已满,线程数达到最大值)处理新提交的任务了。
JDK 内置了 4 种拒绝策略,默认使用 AbortPolicy 策略。拒绝策略如下图所示:
AbortPolicy
1 | /** |
AbortPolicy 策略是线程池默认的拒绝策略,在任务不能再提交到线程池时抛出异常,能够及时反馈程序的运行状态。对于比较核心的业务推荐使用此拒绝策略,因为当系统不能承载更大的并发流量时,业务方能够及时地通过异常发现。
CallerRunsPolicy
1 | /** |
CallerRunsPolicy 策略是由提交任务的线程处理任务,此策略适合让所有任务都执行完毕。
DiscardPolicy
1 | /** |
DiscardPolicy 策略会直接丢弃任务,并且不会抛出异常。此策略会导致业务方无法发现异常,不建议核心业务采用此策略。
DiscardOldestPolicy
1 | /** |
DiscardOldestPolicy 策略会丢弃队列最前面的任务,然后重新提交被拒绝的任务。这种策略存在丢失任务的风险。
自定义拒绝策略只需要实现 RejectedExecutionHandler 接口,重写 rejectedExecution 方法即可。如果不自定义拒绝策略,线程池将使用默认的拒绝策略。
Worker线程管理
前文在介绍任务执行机制的时候涉及到 Worker线程,线程池维护的线程模块其实就是一组 Worker对象 ,下面我们就来看看 ThreadPoolExecutor 的内部类 Worker 。
Worker线程
1 | private final class Worker extends AbstractQueuedSynchronizer implements Runnable { |
线程池在创建线程时,会将线程封装成工作线程Worker,目的是管理线程的状态并维护线程的生命周期。
工作线程Worker 比较特别,下面对其关键点进行说明:
- 继承了 AQS ,实现了一套独占锁机制。
1.Worker 并没有直接使用可重入锁 ReentrantLock ,而是通过继承 AQS 实现了不可重入的独占锁,目的就是通过不可重入的特性判断 Worker 中封装线程的执行状态。
2.在线程执行任务期间会加 Worker非重入锁,表示当前线程正在执行任务中,并不是处于空闲状态,不应该中断该线程。
3.如果线程不是独占锁的状态则表明该线程处于空闲状态,可以对该线程进行中断
实现了 Runnable 接口,它是一个任务体并重写的 run 方法,该方法是线程池执行任务的关键。
在创建 Worker 成功后,紧接着就会启动 Worker 封装的真实 Thread ,启动成功后 Worker 中的 run 方法就会执行。
内部封装了实际执行任务的线程。
内部封装的线程是线程池的工厂创建出来的,它的使命就是执行 Worker 中的 run 方法中的任务。那业务任务谁来执行? 同样地,也是该线程执行,只不过它使用的是方法级别的调用。
内部封装了初始化任务体
Worker 使用 firstTask 保存传入的第一个任务,该任务允许为null。如果该任务非空,那么线程就会在启动后优先执行这个任务,一般对应于核心线程的创建和未达到最大线程数情况下的非核心线程的创建;如果该任务为空,对应于非核心线程的创建,用于去执行任务队列中的任务。
线程复用
一个 Worker 对应线程池中的一个线程,线程复用的逻辑实现是在 Worker 类中的 run 方法中执行 runWorker 方法。由上面的第 2、3 两个说明,很容易得出,当 Worker 中的线程启动后会执行 Worker 这个任务体的 run 方法,进而该线程就会执行 runWorker 方法,然后进入到 while 自旋,实现线程的复用。
线程回收
线程池管理着线程的生命周期,需要对长时间空闲的线程、启动失败的线程以及执行任务出现异常的线程进行回收。线程池使用了HashSet这个Hash表去持有Worker的引用,这样可以通过添加引用和移除引用的操作来控制线程的生命周期。
前文对线程池的任务执行机制进行了介绍,下图是 Worker 执行任务的模型:
新增线程
如果说 execute 方法逻辑体现了提交任务到线程池的流程,那么 addWorker 方法则体现了线程池执行任务的开端,即接收任务、创建线程、启动线程。
1 | private boolean addWorker(Runnable firstTask, boolean core) { |
线程池通过上述方法增加线程,该方法仅完成创建线程并使它运行,最后返回是否成功。至于是哪种情况下增加线程,该方法并不关心。下图是新增Worker线程的流程图:
还需要强调一点,该方法只是创建并启动线程,线程还没有执行任务。再分析执行任务逻辑之前,先来看看创建 Worker 的异常流程,addWorkerFailed
方法。
1 | private void addWorkerFailed(Worker w) { |
方法名非常直观,就是执行 addWorker 失败的处理方法。该方法主要做了以下工作:
- 从 Worker 缓存集合中移除启动失败的 Worker 便于 GC 。
- 递减线程池中线程数,在校验是否允许创建 Worker 流程中递增了线程数,这里需要递减。
- 尝试终止线程池,新增线程失败的原因可能是线程池状态处于[SHUTDOWN,TERMINATED],这种情况下要尝试更新线程池的状态为终止状态。
执行任务
Worker 中的线程启动成功后,其 run 方法会调用 runWorker 方法:
1 | /** |
runWorker 方法是执行提交任务和阻塞队列中等待任务的核心实现,接下来我们分析它的具体实现。
1 | final void runWorker(Worker w) { |
线程执行任务的流程如下图所示:
执行任务逻辑已经详细注释,下面对该方法简要分析:
- 线程执行任务有两个途径,通过取 Worker 的 firstTask 或者调用 getTask 方法从任务队列中取出待执行的任务。
- 线程复用得益于对线程的封装,封装后的线程不再局限于执行当前任务,而是while循环不断地通过getTask()方法获取任务,然后执行任务,从而实现了线程的复用。
- 线程在执行任务前会先申请对应 Worker 独占锁,标志自己处于工作状态,不应该中断该线程,这是对线程封装的好处。
- 当线程池状态大于等于 STOP 状态,要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
- 线程通过调用任务的 run 方法来执行对应的任务,而不是启动线程,这个正是前文特别说明的方法级别调用。
- 当 Worker 封装的线程退出循环后,执行 processWorkerExit() 方法对该线程进行回收。
- 可以通过重写 beforeExecute() 和 afterExecute() 方法来实现 ThreadPoolExecutor 的扩展功能。
再谈线程复用
线程池会使用一定数量的线程去执行任务,通常线程数量远小于任务数量,针对这种情况线程池通过线程复用的方式让同一个线程去执行不同的任务。我们知道线程池是将线程和任务解耦,摆脱了一个任务必须一个线程的限制,这也是线程复用的必要条件。线程池使用Worker对线程的封装,也就是Worker线程,线程启动后会去执行一个循环任务,该任务可以执行线程的首个任务和轮询任务队列中的任务,线程通过调用任务的 run
方法实现任务的执行。
线程复用的逻辑主要在 runWorker 方法中,该方法是 Worker 类的 run 方法中的逻辑,Worker 中封装的线程启动后会执行 Worker 的 run 方法进而执行 runWorker 方法。整个逻辑简化后的代码如下:
1 | runWorker(Worker w) { |
线程回收
线程池中线程的销毁依赖JVM自动回收,Worker 线程结束任务或异常退出后,Worker 会主动清除自身在线程池中的引用,这意味着线程池可以回收该线程了。
1 | private void processWorkerExit(Worker w, boolean completedAbruptly) { |
线程回收流程如下图所示:
线程数是否够用:针对核心线程数的,如果线程池设置核心线程数为 0 或允许回收核心线程,那么就要确保在任务队列非空的情况下至少有一个线程;如果不允许回收核心线程,且设置的核心线程数 > 0 ,那么要尽量填充核心线程数至上限。
需要注意的是,线程销毁工作不是只有 processWorkerExit
方法才能完成,前文介绍的新增Worker线程逻辑中对异常流处理的 addWorkerFailed
方法也可以做到。这两者销毁线程的时机不同,前者是线程执行任务的逻辑中销毁,后者是创建线程后启动失败的处理。线程是在执行任务的逻辑中由于执行异常被销毁,那么就需要补一个线程来替代该销毁的线程。
上述 processWorkerExit
方法在将Worker线程移除线程池后也就完成了线程的回收工作,但由于执行该方法的原因很多,线程正常退出getTask
方法或者执行任务异常都会执行该方法,因此在该方法中需要额外完成两个工作。一是使线程池自适应当前状态,另一个是根据需要创建线程。
至此,processWorkerExit
执行完之后Worker线程被销毁,该线程的整个生命周期结束。下面对整个过程使用流程图的形式进行总结,流程图如下:
关闭线程池
调用线程池的 shutdown
或 shutdownNow
方法来关闭线程池,两者的原理有点差异,下面我们分别说明这两个方法。
shutdown
1 | public void shutdown() { |
shutdown() 方法可以安全地关闭一个线程池,体现在下面几个方面:
- 只是将线程池的状态置为 SHUTDOWN ,这意味着线程池不能接收新的任务,再有新的任务被提交则根据拒绝策略进行处理。
- 会执行完正在执行的任务和队列中等待的任务,任务全部结束后才会彻底关闭线程池。
- 尝试中断线程池中所有闲置的线程,便于尝试回收这些线程。
- 调用tryTerminate尝试终止线程池,用于将线程池的状态更新为 TERMINATED 。
shutdownNow
1 | public List<Runnable> shutdownNow() { |
shutdownNow() 方法表示立即关闭线程池,工作如下:
- 将线程池状态置为 STOP 状态,这意味着线程池接下来就会终止了,不仅不接收新的任务,而且也不会执行队列中等待的任务了,正在执行的任务可以执行完毕。
- 中断所有Worker线程,包括空闲和非空闲。
- 清空阻塞队列并返回等待执行的任务备份。
- 调用tryTerminate尝试终止线程池,用于将线程池的状态更新为 TERMINATED 。
tryTerminate()
对于 tryTerminate() 方法的调用,前文中的新增线程失败逻辑、线程退出while逻辑以及两种关闭线程池的方法都会调用了该方法,也就是说几乎每个线程最后消亡的时候都会调用tryTerminate() 方法,但最后只会有一个线程真正执行到终止线程池的地方。其中线程退出调用的意义重大:线程池的状态是内部自行维护的。下面我们来看看这个方法的具体逻辑。
1 | final void tryTerminate() { |
tryTerminate() 方法主要根据线程池状态判断是否终止线程池,下面进行简单总结:
- 判断线程池是否可以终止,原则是线程池处于关闭状态、队列中没有任务的情况下可以终止。
- interruptIdleWorkers()方法的执行表示线程池具备终止条件,向任意空闲线程发送中断信号防止
getTask
方法中存在核心线程执行workQueue.take()
时一直阻塞,导致线程无法回收。 - 符合终止线程池的条件时,先获取全局锁,然后先将线程池状态置为 TIDYING 状态,设置成功后会执行 terminated() 钩子方法,最后将线程池状态设置为 TERMINATED 状态,完成线程池状态更新后释放全局锁。
下面我们来简单分析一下interruptIdleWorkers
方法。
1 | +--- ThreadPoolExecutor |
前文也进行了说明,Worker 继承了AQS,在Worker线程处理任务时会申请Worker独占锁,interruptIdleWorkers
在进行中断时会使用 tryLock() 来判断该Worker线程是否正在处理任务,如果 tryLock() 返回true,说明该Worker线程处于空闲状态,可以被中断。
注意事项:
- 线程池中多处执行
tryTerminate
方法的目的是将符合条件的线程池终止,前文也提到线程池的状态是内部自行维护的,并非人为设置。如用户执行shutdown
和shutdownNow
方法只是将线程池的状态设置为 SHUTDOWN 和 STOP ,后续的 TIDYING 和 TERMINATED 状态的设置就在于此。 tryTerminate
方法中的interruptIdleWorkers(ONLY_ONE)
的作用是防止线程池在终止的过程中getTask
方法中存在执行workQueue.take()
阻塞的线程,因为此时线程池不允许再有新的任务添加到阻塞队列中,这样一来线程将一直阻塞下去,线程池永远都终止不了。
线程池中虽然多处使用中断来期望中断任务的执行,但由于 Java 中不推荐强行停止线程的机制的限制,因为强制的让一个线程被动的退出是很不安全的,内部的数据不一致会对程序造成不可预知的后果。即使调用了 shutdownNow 方法,如果被中断的线程对于中断信号不敏感,那么依然有可能导致任务不会停止。
线程池配置
线程池太大或太小都会导致麻烦,选择一个合适的线程池是非常有必要的。调整线程池中的数量是为了充分并合理地使用 CPU 和内存资源,从而最大限度地提高程序性能。通常我们需要根据任务执行的性质来选择对应的策略。
CPU 密集型任务
如果任务主要进行大量复杂的计算,例如加密、解密、压缩等,那么意味着 CPU 的处理能力是稀缺的资源,应当分配较少的线程,通常按照 CPU 核数 或者 CPU 核数 + 1 进行设置。 计算任务会占用大量的 CPU 资源,CPU 的每个核工作基本都是高负荷的,如果设置过多的线程,每个线程都会尝试抢占 CPU 资源,这就造成了不必要的上下文切换(CPU并没有太多空闲),性能反而由于线程数量过多导致性能下降。
IO 密集型任务
I/O 操作比较多的任务,如数据库操作、文件读写、网络通信等,一般不会消耗太多 CPU 资源,但是普遍需要较长时间的等待,对于这类任务可以配置适当多的线程,如 CPU 核数 * 2 。由于 IO 读写速度相比于 CPU 的速度是比较慢的,设置过少的线程数是不能充分利用 CPU 资源。
合适线程数
Brain Goetz 推荐的计算方法如下:
线程数 = CPU核数 × 目标CPU利用率 ×(1 + 平均等待时间/平均工作时间)
通过上面的公式可以大致计算出一个合理的线程数(核心线程数和最大线程数统称)。如果任务平均等待时间长则线程数就应该多,对应于 IO 密集型任务。如果平均工作时间长则线程数就应该少,对应于 CPU 密集型任务。
线程数太少可能会使得程序整体性能降低,线程数太多可能会消耗内存资源以及造成不必要的上下文切换。想用准确定制线程池需要做的工作很多,除了考虑线程数还可以合理使用线程池的阻塞队列实现任务的调度,还可以根据业务等纬度实现线程池隔离。
线程池监控
线程池提供了一些用于获取属性的方法,这些属性可以用来对线程池进行监控。
线程池还提供了一些用于设置核心属性的方法,使用方可以通过这些方法动态设置线程池的核心策略,线程池内部会处理好当前状态并做到平滑修改。
动态设置核心线程数
1 | +--- ThreadPoolExecutor |
动态设置最大线程数
1 | +--- ThreadPoolExecutor |
动态设置空闲时间
1 | +--- ThreadPoolExecutor |
允许核心线程超时回收
1 | +--- ThreadPoolExecutor |
小结
本篇文章对线程池核心点进行了详细分析,先是简单介绍了线程池产生的背景,接着说明了线程池的优势,最后对线程池源码进行了分析。从任务提交到线程池,到线程池创建线程并处理任务,到最后线程被回收,最后简单介绍了线程池的配置以及线程池的监控。