Java中的线程池详解

多线程、高并发是现在后端开发人员绕不去的一个坎。这篇博客打算就线程池进行深入地探讨。

前言

碰到一个新东西首先问自己三个问题:这东西是什么,它诞生的背景是啥以及它能解决什么问题(带来的好处)。

线程池,顾名思义,是基于池化思想诞生出来的用来管理线程的一个东西。

诞生的背景是因为现在几乎所有的程序都是多线程的,而线程虽然比较轻量级,但是线程在比较多的情况下,对服务器的压力还是比较大的,那么如果很多线程被频繁创建销毁,也是比较浪费资源的一种做法。于是就有了线程池这么一个概念,线程池本身用来维护多个线程,并且抽象出了对应的概念,让程序员可以专注在业务上,而任务的执行、线程的管理则由线程池进行负责。

个人认为的线程池的优点有如下几点:

  • 能够提升任务执行的速度:因为(大部分情况下)线程池是利用现成的线程去执行,比启动一个线程去执行任务省去了创建线程的过程,速度会有提升。
  • 能够避免线程无限创建导致系统崩溃
  • 能够有效降低资源的消耗,利用现有的线程去执行任务,可以有效降低线程的创建和销毁带来的资源消耗。

JDK线程池

JDK中当然也有对应的线程池的实现,即ThreadPoolExecutor类。

首先看到这个类,我第一个想到的问题是,为什么要带个Executor,而不能直接叫ThreadPool?其实看一眼继承关系应该就能理解了:

image-20210114101401067

线程池的体系结构

Executor

最原始的接口就是Executor,官方的文档中是这么说的:

This interface provides a way of decoupling task submission from the mechanics of how each task will be run, including details of thread use, scheduling, etc.

该接口提供了一种将任务提交与每个任务将如何运行的机制分离的方法,包括线程使用,调度等的详细信息。

也就是这个接口提供了任务的提交和执行两者的解耦。这个接口方法也比较简单,只有一个void execute(Runnable command);,也很好理解,这里就不多展开了。

ExecutorService

这个接口继承了上面的Executor,并且着手添加了不少的方法。总得来说,提供了方法用来终止线程池、提供方法用来生成Future对象来追踪一个或者多个异步的任务、提供了方法来执行一组任务。

ExecutorService可以被关闭,关闭了就会拒绝掉所有的新任务。ExecutorService提供了两种不同的关闭方法:

  • shutdown()能够让之前的任务继续执行完。
  • shutdownNow()不会让等待的线程执行,并且会尝试去关闭正在执行的线程。

以上的两个方法其实可以一起使用,首先使用shutdown()不接受新的任务,然后经过一段时间之后使用shutdownNow()

终止了之后,Executor就不会有正在执行的任务了,也不会有等待的任务,更不会有新的任务被提交了。如果一个ExecutorService没有被使用,那么应该关闭它用来回收资源。

submit()“继承”了父接口的Executor.execute(Runnable),它会返回一个Future对象,利用Future对象可以取消任务的执行,或者是等它执行完毕。方法invokeAnyinvokeAll执行一组任务,然后等待至少一个或全部完成。

AbstractExecutorService

这个抽象类简单来说就是提供了ExecutorService执行线程的默认实现。这个抽象类使用RunnableFuture实现了submit, invokeAnyinvokeAll这三个方法,而RunnableFuture则是通过newTaskFor这个方法获得的,而newTaskFor又是通过利用的FutureTask进行实现的(一环套一环),由于本文的关键并不是它,所以暂时跳过。

ThreadPoolExecutor

这个类本质上是一个ExecutorService,能够使用池子中一个或者多个线程来执行提交的任务,通常我们使用Executors的工厂方法来创建ThreadPoolExecutor。

线程池解决了两个不同的问题:

  • 由于减少了每个任务的开销,它们在执行大量的异步任务的时候具有良好的性能。
  • 提供了一种能够对在执行任务时消耗的资源(包括线程)进行绑定和管理的方法。

当然线程池也维护了一些基本的信息,像当前已经执行完成的任务数量等等。

为了能够广泛使用,该类在设计上提供了很多可调整的参数和可扩展的Hook。我们推荐程序员使用Executors类的工厂方法来创建线程池,如:

  • Executors.newCachedThreadPool(),创建一个无边界的线程池,线程可以自动回收。
  • Executors.newFixedThreadPool(int),具有固定数量线程的线程池
  • Executors.newSingleThreadExecutor(),单个线程的线程池。

如果你不想使用上面的预定义的线程池,而执意要自己创建的话,请遵守以下的规则:

  • 核心池大小(corePoolSize)和最大池大小(maximumPoolSize)。线程池是可以根据corePoolSize和maximumPoolSize调整自己的大小的(通过getPoolSize())。当一个任务通过execute方法提交的时候,如果当前正在运行的线程少于corePoolSize ,那么一个新的线程就会被创建,并执行对应的任务——就算其他的线程是空闲的,也会创建新线程。如果当前的线程已经多于corePoolSize ,但是少于maximumPoolSize,那么只有当前的队列已经满的情况下,线程才会被创建。如果你设置了corePoolSize==maximumPoolSize,那么就相当于设置了一个固定大小的线程池,如果把maximumPoolSize设置成一个非常大的数字,那么基本上就相当于允许池进行任意数量的并发任务。虽然它们俩是在构造的时候设置,但是也可以之后进行修改。
  • 按需构造。默认情况下,只有在新任务到达的时候才启动核心线程,但是可以通过方法进行改变。如果使用非空队列进行构造的话,你可能需要预启动线程。
  • 创造新的线程。线程池中线程的创建是通过线程工厂。如果没有自己指定一个,那么就使用Executors.defaultThreadFactory(),它会默认把所有的线程都属于一个线程组,所有线程都默认的优先级并且所有的线程都是非守护线程。如果你自己的线程工厂出了问题,比如返回了一个null,那么对应的线程池也会执行不了任务。线程应该具备“modifyThread”的运行时权限。如果工作线程或者其他线程没有这个权限,服务将会被降级:配置可能无法及时被更改、可能无法关闭线程池。注:权限这块没了解过,应该跟那个SecurityManager有关。
  • 保活时间。如果当前的线程池有超过corePoolSize个线程,那么这些超过的线程会在keepAliveTime之后终止。显然这是为了减少资源的消耗。如果之后又变得活跃了,那么新的线程就会被创建。当前这个时间也可以被动态调整。当然也可以设置一个超大的时间,让线程不会终止。甚至allowCoreThreadTimeOut(true)可以让核心线程也受到该参数的影响。
  • 队列。任何的BlockingQueue都可以被用来作为任务的暂存地,队列和池大小进行交互:
    • 如果当前执行任务的线程数少于corePoolSize ,那么线程池一定会新建线程,而不去理会队列。
    • 如果执行任务的线程数大于等于corePoolSize,那么线程池优先把任务放到queue里面。
    • 如果队列也放不下了,那么看能不能在不超过maximumPoolSize的前提下创建线程来做任务。如果已经到了maximumPoolSize,那么就拒绝。
  • 有以下三种排队策略:
    • 直接交接。queue一个很好的默认选择是SynchronousQueue,它直接把任务交给线程,而不会保留任务。如果没有线程可以运行任务,那么尝试暂存任务是会导致失败的。这个queue在处理那种可能存在内部依赖的任务组合的时候,可以避免死锁的发生。使用这个等待队列的话,通常建议设置maximumPoolSize为超大的一个数,但是这么做通常也会有一个隐患,就是当任务到达的速度太快的时候,会导致线程无限增长。
    • 无界队列。使用无界队列(如不指定参数的LinkedBlockingQueue)的话,在所有核心线程都在执行任务的时候,新的任务就需要等待了。并且由于队列是无限的,所以maximumPoolSize就完全失去了作用。这对于那些彼此之间相互独立的线程来说非常有用,比如web服务器。尽管这种排队方式对于那种短期的突发线程非常有用,但是如果一直来任务,那么队列也可能爆满。
    • 有界队列。使用有界队列(如ArrayBlockingQueue)能够有效防止资源被耗尽的风险,但是调整和控制会更加困难,在这里,如何抉择线程池的最大大小和队列的大小就成了一个trade off问题:使用大队列和小的线程池可以减少CPU使用率、OS的资源和上下文切换,但是吞吐量就下来了,比如任务频繁阻塞(读取IO等);反之的话,会让CPU繁忙,但是可能会遇到不可接受的调度开销(极端点就是把时间全用在上下文切换上了),这同样会降低吞吐量。
  • 拒绝任务。如果线程池达到最大的线程数,且队列也满了;或者是线程池当前已经是关闭状态,那么在这两个情况下对于提交的任务就会采取拒绝的策略。无论是哪种情况,调用execute方法都会有以下四个拒绝策略:
    • AbortPolicy,默认策略。handler抛出一个运行时的异常——RejectedExecutionException。
    • CallerRunsPolicy,谁调用了execute方法,那么就让那个线程自己执行这个任务。虽然这个机制很简单,但是它将降低新任务的提交速度。
    • DiscardPolicy,简单丢弃。
    • DiscardOldestPolicy,如果线程池此时还在运行状态中,那么队列中的头部的那个任务会被丢弃,并且重新尝试这个步骤,注意,重新尝试仍然可能会重复这一步骤。
    • 当然也可以自己定义。
  • hook方法。此类提供了beforeExecute和afterExecute方法,显然是在任务执行前和执行后被调用的。这些方法可以用来初始化环境,或者做一些统计的工作等等。如果这些hook方法出现异常,那么内部的工作线程可能会失败。
  • 队列维护。方法getQueue()可以访问工作队列,能够进行监视和调试。强烈建议不要将这个方法用于别的目的。remove(Runnable)和purge()方法能够在取消大量排队任务的时候进行回收。
  • 结束。如果线程池没有任何的引用,并且里面没有任何的线程,那么就会自动关闭。这个的前提就是corePoolSize必须设置为0,并且设置合适的过期时间。

这里首先给出一个整体的流程,方便有一个印象。

用户的任务到达之后,线程池有两种策略,一种是直接拒绝执行,另外一种是接受。接受了之后也有两种策略,一种是目前任务较多,那么就暂时放到一个缓冲的队列中;另外一种则是直接分配线程执行任务。

我们所说的线程池,其实除了真正的线程池以外,还包含了一个任务管理分配器,两者合在一起,才是ThreadPoolExecutor真正的功能。

线程池的生命周期

线程池里面主要有两个control state,而这两个控制状态是由一个原子整数(AtomicInteger)包裹在一起的,分别是有效的线程数量(workerCount)和当前的运行状态(runState)。

因为int是32位的,所以把其中的29位分给了线程数量,即线程池中最大的线程数量是229-1个,大约是5亿左右,在可见的未来不用担心会有问题。

workerCount的值并不是活着的线程的值,举个例子,如果ThreadFactory在创建线程的时候失败了就会导致两者不同。用户可见的线程池大小指的是,当前工作集的大小。

线程池的生命周期有以下状态:

  • RUNNING:接受新的任务,并且执行队列中的任务。
  • SHUTDOWN:不接受新的任务,但是执行队列中的任务。
  • STOP:不接受新的任务、不执行队列中的任务,并且终止正在执行的任务。
  • TIDYING(中文整理的意思):所有任务终止,workerCount为0,线程会执行terminated()的hook方法。
  • TERMINATED:完成terminated()方法。

上面这五种状态分别对应了五种数字,且这五个数字依次递增,它们之间的相互转化如图所示:

注意都是单向流动的。从SHUTDOWN状态到TIDYING状态并没有想象中那么简单。

线程池的执行过程

我们直接从提交一个任务,也即是execute方法开始看看一个任务是怎么执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void execute(Runnable command) {
// 1. null判断
if (command == null)
throw new NullPointerException();

int c = ctl.get();
// 2. 判断是否需要增加线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}

// 3. 如果现在不需要增加线程,即目前的worker已经有corePoolSize的数量,那么判断状态+插入队列中
if (isRunning(c) && workQueue.offer(command)) {
// 重新检查一遍,因为可能现在已经变了
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
// 当前没有线程,那么就启动一个
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 到这里就是我们没办法往queue里面放入任务,那就尝试一下直接执行任务
else if (!addWorker(command, false))
// 失败的话就只能调用失败的策略了
reject(command);
}

首先进来进行非空判断,很简单,跳过。

然后第一步是对当前线程数和corePoolSize进行比较,显然如果没到corePoolSize,就调用addWorker方法。

在详解这个方法以前,我们首先来看看Worker它究竟是个什么东西。

Worker

ThreadPoolExecutor一共有5个内部类,其中的四个对应了四种拒绝策略,还有一个就是Worker这个类了。

这个类主要维护线程运行任务的中断控制状态,以及其他的一些不太重要的信息。它继承了AbstractQueuedSynchronizer以获得简单的锁的功能。通过锁的机制,可以防止旨在唤醒工作线程等待任务的中断,而不是中断正在运行的任务。(上面那句是真的翻译不来…)为什么没有使用ReentrantLock这个可重入的锁,而是自己实现了一个非可重入的锁,是因为不想在setCorePoolSize()等方法获取锁。此外,为了防止在任务开始之前中断,这个类的初始状态是负数,并在启动的时候会清除这个负数。这个在之后的部分会详解。

worker本身一共有三个重要的属性:

1
2
3
4
5
6
/** Thread this worker is running in.  Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;

从上面可以看到,worker本身实际上是对线程和所要执行第一个任务的一个封装。 如果firstTask是空,那么就直接通过调用getTask()方法来从workQueue里面获取任务执行;如果不是空,那么就先把任务执行掉,然后再执行getTask()方法。

addWorker()

首先先来翻译下,在这个函数上面的官方文档。

根据当前的线程池状态和给定的core和max线程大小,判断当前的worker是否添加。如果可以添加,那么就调整worker count,并且创建一个worker,执行firstTask作为其第一个任务。如果线程池已经关闭了,那么该方法就会返回false。如果线程工厂创建线程失败,也返回false。如果线程创建失败,我们可以进行干净地回滚。

firstTask新线程应首先运行的任务,当阻塞队列满的时候,可以绕过队列

core如果是true,则使用corePoolSize作为边界,否则使用maximumPoolSize。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
retry:
for (;;) {
// 获取整数,通过这个整数来获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);

// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;

for (;;) {
// 获取当前的worker数量
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
// 如果成功增加了worker数量
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}

========首先经过上面的逻辑 能过确保各种限制条件通过且原子增加了worker的数量============

之后的逻辑就是获取了一个锁,并且进行一系列判断,通过的话就把work加到works(一个hashset)里面。加完之后就启动worker里面的thread,也就是调用其start方法,这样线程就启动了,addWorker基本就这样结束了。

阻塞队列BlockingQueue

阻塞队列也是理解线程池执行过程的关键。因为线程池本质上就管理两个部分,一个是用户提交的任务,另外一个就是线程,这两者之间是通过阻塞队列这个数据结构来发生关系的:用户把任务放到阻塞队列中,线程池分配线程来完成这个任务。一个非常典型的生产者消费者模型。

BlockingQueue最最基本的功能是,当队列已经没有元素了,但是仍然试图从队列中获取的时候会被阻塞(或者其他操作,见下);同理当队列中已经放满了元素,但是仍然试图放入元素的时候也会阻塞(或者其他操作,见下)。

BlockingQueue的方法有四种表现形式,具体来说就是它们以不同的方式进行处理的操作:

  1. 抛出异常
  2. 返回特殊值(null或false等,具体视情况而定)
  3. 无限期地阻塞当前线程,直到操作成功为止
  4. 仅在给定的最大时间限制内放弃。
Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Remove remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

BlockingQueue并不支持null作为元素,因为是用它来指示结果的。并且它可能会受容量限制,当然也可能不受容量控制。

BlockingQueue其实也实现了Collection接口,所以可以用remove来删除元素,但是不是很推荐使用。

BlockingQueue是线程安全的,所有的方法都在其内部使用了相对应的锁或者其他形式来保证并发安全,但是addAll,containsAll,retainAll和removeAll这些批量操作则不一定保证线程安全。

BlockingQueue本身只是一个队列,所以并不能被“关闭”,但是在实际过程中,可以往队列中放入一个特殊的对象,来指示已经结束了。当消费者取到这个对象的时候就会知道这是最后一个。

阻塞队列具体实现

实现的话多种多样,我这里挑选了最简单的一个基于数组实现的来讲一讲吧。最核心的部分就是内部有一把锁,并且该锁带了两个条件:

1
2
3
4
5
6
7
8
/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

然后是offer()的逻辑:

1
2
3
4
5
6
7
8
9
10
11
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try{
...
把元素放入到队列中
notEmpty.signal();
...
}finally{
lock.unlock();
}

然后是poll()的逻辑:

1
2
3
4
5
6
7
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : 取出元素并且 notFull.signal();
} finally {
lock.unlock();
}

如果你对ReentrantLock和其背后的Condition很了解,那么这个阻塞队列也是相当相当简单的。

线程池的回收过程

之前提到过,worker故意没有使用ReentrantLock,而是自己去继承了AQS,其主要目的就是为了实现一个不可重入的锁。为什么要这么设计?就是为了能够在线程进行回收的时候,好判断其状态。

如果目前的线程获取到了锁在执行任务,由于其不可重入性,继续调用其tryLock()将无法获得锁;反之如果线程是空闲的,那么就可以通过tryLock()来获取锁,获得之后就可以回收线程池里面的空闲线程了。