并发 - 线程池工具类

前言

上一篇文章 线程池 对线程池的原理进行了说明,并对线程池的源码进行了深入分析,本篇文章对线程池工具类 Executors 进行分析,需要说明的是 Executors 中除了并行计算的 WorkStealingPool 线程池,其它的都是直接基于 ThreadPoolExecutor 来实现的。本篇文章主要说明 Executors 基于 ThreadPoolExecutor 创建的线程池。

FixedThreadPool

FixedThreadPool 属于固定线程数的线程池,使用 Executors.newFixedThreadPool() 方法创建。

构造方法

  • 指定线程数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    /**
    * 创建固定数量的线程池
    *
    * @param nThreads 核心线程数 = 最大线程数
    * @return
    */
    public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>());
    }
  • 指定线程数和线程工厂
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * 创建固定数量的线程池
    *
    * @param nThreads 核心线程数 = 最大线程数
    * @param threadFactory 线程工厂
    * @return
    */
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(nThreads, nThreads,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFactory);
    }

特点

核心线程数和最大线程数一致,并且使用的任务队列为无界队列。线程池中的线程数随着任务的提交会从 0 增加到核心线程数 nThreads,完成预热之后,线程池中的线程数将会保持 nThreads,之后的任务提交一律放入任务队列中,由空闲的核心线程从队列取出并执行。如果有工作线程退出「一般是执行任务异常退出」,线程池将会创建新的工作线程以补足执行的数目 nThreads 。此外,由于使用的是无界队列,隐藏的默认拒绝策略是无效的,并且默认情况下线程池是不会回收核心线程数内的线程,keepAliveTime 同样是个无效参数。

运行示意图

适用场景

适用于为了满足资源管理的需求,而需要限制线程数量的应用场景。

存在问题

由于使用了无界的任务队列,当大量的任务提交到线程池,可能会造成任务堆积,而线程池的拒绝策略又处于失效状态,从而导致 OOM 。

SingleThreadExecutor

SingleThreadExecutor 属于固定线程数的线程池,使用 Executors.newSingleThreadExecutor() 方法创建。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 创建单个线程的线程池
*
* @return
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

/**
* 创建单个线程的线程池
*
* @param threadFactory 线程工厂
* @return
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

特点

SingleThreadExecutor 和 FixedThreadPool 基本一致,区别在于前者的核心线程数和最大线程数固定为 1 ,并且是一个包装 ThreadPoolExecutor 的线程池,支持调用 finalize() 方法通知垃圾收集器时关闭线程池。

运行示意图

适用场景

保证了所有任务都是被顺序执行,任意时间点最多会有一个任务处于活动状态。

存在问题

和 FixedThreadPool 是一样的问题,使用了无界的任务队列,当大量的任务提交到线程池,可能会造成任务堆积,而线程池的拒绝策略又处于失效状态,从而导致 OOM 。

CachedThreadPool

CachedThreadPool 属于缓冲线程池,会根据需要创建新线程。使用 Executors.newCachedThreadPool() 方法创建。

构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}


public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

特点

核心线程数为 0 ,最大线程数为 Integer.MAX_VALUE,可以认为是无界的。使用的任务队列是没有容量的 SynchronousQueue ,即线程池使用这个队列意味着每次都要创建新的线程来处理任务。 keepAliveTime 被设置为 60L,单位为 TimeUnit.SECONDS ,意味着 CachedThreadPool 中的空闲线程等待任务的最大时长为 60s 。

CachedThreadPool 总体上有以下几个特点:

  1. 无核心线程数,且最大的线程数是 Integer.MAX_VALUE
  2. 任务队列并不会存储任务,如果有空闲线程则队列会把任务交给空闲线程执行,如果没有空闲线程则迫使线程池尝试创建一个新的线程执行任务。这个特点是任务队列 SynchronousQueue 提供的。
  3. 由于 keepAliveTime 被设置为 60L,因此会在该时间内缓存线程,被缓存的线程会等待 SynchronousQueue 队列中的任务。
  4. 线程池长时间闲置得话也不会消耗什么资源,因为线程池中的线程都是可回收的,和固定线程数的线程池不同,核心线程会不断轮询任务(不开启回收核心线程)。

SynchronousQueue

一个比较特殊的阻塞队列,其本身不存储元素。每个插入操作必须等待另一个线程执行移除操作,反之亦然。如果使用该阻塞队列,只有当两个线程执行相反模式的操作才能配对成功,否则先执行的一方只能等待。下图是对线程池使用该队列处理任务过程的描述:

运行示意图

适用场景

处理大量短时间任务,或者负载较轻的服务器。

存在问题

由于允许创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程最终超过了操作系统的上限而无法创建新线程,容易导致 CPU 飙升和 OOM。

Executors 创建线程问题

Executors 工具类创建的线程池都会存在一定的风险,相比较而言手动创建线程池更加合理,因为可以根据不同的场景对线程池进行定制,来提升程序的性能和减少资源消耗。

小结

无论是使用 Executors 工具类还是定制线程池,都应该避免任务大量堆积,否则可能出现 OOM ;还应该避免过度创建新线程,否则可能由于创建大量线程导致系统崩溃。