Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)
Java并发系列: JUC.ThreadPoolExecutor
- 一,JUC.ThreadPoolExecutor概况
- 1.1 线程池的意义
- 1.2 继承关系
- 二,源码解析
- 2.1 静态变量
- 2.2 属性
- 2.3 内部类 Worker
- 三,方法调用
- 3.1 runWorker
- 3.2 getTask
- 3.3 execute
- 3.4 addWorker
一,JUC.ThreadPoolExecutor概况
ThreadPoolExecutor作为开发中最常用的线程池, 也作为面试中被问到的最高频的并发组件之一, 我们有必要来聊聊它的作用以及内部构造。
1.1 线程池的意义
在讲解线程池之前, 有些读者可能存在这样的疑惑: 为什么需要线程池, 线程池有什么优越性?
关于这个问题, 主要从两个角度来进行解答:
-
减少开销
在大部分JVM上, 用户线程与操作系统内核线程是1:1的关系, 也就是说每次创建回收线程都要进行内核调用, 开销较大。那么有了线程池, 就可以重复使用线程资源, 大幅降低创建和回收的频率。此外, 也能一定程度上避免有人在写BUG时, 大量创建线程导致资源耗尽。 -
便于管理
线程池可以帮你维护线程ID、线程状态等信息, 也可以帮你统计任务执行状态等信息。
理解了线程池的意义, 那么本文的主角便是JUC提供的线程池组件: ThreadPoolExecutor.
请注意, 有人会将JUC中的ThreadPoolExecutor与Spring Framework中的ThreadPoolTaskExecutor混淆。这是两个不同的组件, ThreadPoolTaskExecutor可以理解为对ThreadPoolExecutor做的一层封装, 主要就是为了支持线程池的Bean化, 将其交给Spring Context来管理, 防止滥用线程池。而内部的核心逻辑还是由ThreadPoolExecutor处理。关于这一点, 简单了解即可。
从宏观上看, 开发者将任务提交给ThreadPoolExecutor, ThreadPoolExecutor分配工作线程(Worker)来执行任务, 任务完成后, 工作线程回到ThreadPoolExecutor, 等待后续任务。
根据这段描述, 产生了三个比较值得探究的问题:
- ThreadPoolExecutor自身有哪些状态, 如何维护这些状态?
- ThreadPoolExecutor如何维护内部的工作线程?
- ThreadPoolExecutor处理任务的整体逻辑是什么样的?
源码之前, 了无秘密。在读源码的过程中, 脑海中带着这三个问题。
1.2 继承关系
ThreadPoolExecutor继承了AbstractExecutorService, AbstractExecutorService实现了ExecutorService接口。ExecutorService接口继承了Executor接口, 整体呈现出了这样的关系:
ThreadPoolExecutor → AbstractExecutorService → ExecutorService → Executor
从上往下来看:
public interface Executor {void execute(Runnable command);
}
Executor 接口中只声明了一个execute方法, 用于执行提交的任务。
ExecutorService扩展了Executor的语义, 增加了多种多样的操作。
public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}
而AbstractExecutorService则是对ExecutorService中声明的方法进行默认实现, 方便子类进行调用。比如ThreadPoolExecutor就直接使用了AbstractExecutorService的submit方法。 AbstractExecutorService也是一个比较核心的类, 但它不是本文的重点, 所以不会详细讲解。
二,源码解析
2.1 静态变量
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这几个变量很关键, 在注释中也已经有了比较详细的解释。我这里就以更直白的方式加以介绍, 顺便帮你温习一些计算机基础知识。
先看下半部分的五个变量, 从命名上可以判定这些值代表了ThreadPoolExecutor的状态。
这些状态值涉及到了二进制移位操作, 我们知道int类型在Java中的二进制表示是以补码存储的。-1的二进制表示是32个1的序列, COUNT_BITS值是常数, 为32-3=29。因此RUNNING的二进制表示是高三位为111, 低29位都为0的序列。
我们用同样的方式表示出其余四个状态:
- RUNNING:
11100000 00000000 00000000 00000000
- SHUTDOWN:
00000000 00000000 00000000 00000000
- STOP:
00100000 00000000 00000000 00000000
- TIDYING:
01000000 00000000 00000000 00000000
- TERMINATED:
01100000 00000000 00000000 00000000
不难发现, 这五个状态可以理解为目前只用到了高三位, 这是因为ThreadPoolExecutor只用一个int变量来同时保存线程池状态以及工作线程数这两个信息, 线程状态使用高三位, 工作线程数使用低29位。CAPACITY这个变量就表示为工作线程的最大数量 (2^29 - 1
)。
这种将两种状态存储在一个二进制序列中的做法, 在业务代码中相对比较少见, 在底层源码中很常见。比如ReentrantReadWriteLock中, 用一个int来组合表示读锁和写锁的个数, 比如在ZooKeeper中, 用一个long来组合表示epoch和事务个数。
这几种状态的含义是:
- RUNNING: 接受新任务, 也能处理阻塞队列里的任务。
- SHUTDOWN: 不接受新任务, 但是处理阻塞队列里的任务。
- STOP: 不接受新任务, 不处理阻塞队列里的任务, 中断处理过程中的任务。
- TIDYING: 当所有的任务都执行完了, 当前线程池已经没有工作线程, 这时线程池将会转换为TIDYING状态, 并且将要调用
terminated
方法。 - TERMINATED:
terminated
方法调用完成。
这几个状态之间的变化如图所示:
2.2 属性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 相关方法
private static int ctlOf(int rs, int wc) { return rs | wc; }
首先着重介绍的是AtomicInteger类型的属性ctl
。
ctl
就是上文所说的, 组合了线程池状态以及池中工作线程数两个信息的变量。它初始化时调用了ctlOf
方法, 可以看到ctlOf
只是一个或操作。这就说明, 线程池在初始化时, 状态被标记为RUNNING, 工作线程数为0。
读到这里, 有一些读者可能会存在疑惑: 为啥非要用一个int值来组合表示两种状态? 用两个值表示, 清清楚楚不行吗?
可以, 当然可以。但使用一个变量的好处是: 如果需要对两个状态值进行同步修改, 直接通过位操作就可以了, 省去了加锁操作。因为在操作系统级别, 对int的修改本身就是原子的。
再来看看其他属性, 属性往往会透露出这个类是如何组织的。
private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;
一个个属性来看:
- workQueue:
BlockingQueue<Runnable>
类型, 用来存储积压任务的阻塞队列。Callable类型的任务会被父类AbstractExecutorService
转化为FutureTask
(Runnable
的子类)。 - mainLock:
ReentrantLock
类型, 对线程池的一些操作需要状态同步, 所以需要用到锁。 - workers:
HashSet<Worker>
类型,Worker
是对工作线程以及一些状态的封装,workers
是用来存储所有Worker
的集合。 - termination: 由
mainLock
创建的Condition
, 用于awaitTermination
调用时的线程同步。 - largestPoolSize: 线程池中最多有过多少个活跃线程。
- completedTaskCount: 线程池总共处理了多少任务。
- threadFactory:
ThreadFactory
接口, 用户可以自定义创建工作线程的工厂。 - handler: 拒绝策略, 当
workQueue
满载时将会触发。 - keepAliveTime: 工作线程空闲时则保持存活的时间。
- allowCoreThreadTimeOut: 布尔类型, 是否需要保持核心线程始终处于存活。
- corePoolSize: 核心线程数。当阻塞队列还未满载时, 线程池将保持核心线程数。
- maximumPoolSize: 最大线程数。当阻塞队列满载时, 线程池将会在核心线程数的基础上创建新线程来处理任务, 直到最大线程数。
2.3 内部类 Worker
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock() { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock() { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}
继承关系
- 继承AQS: 说明
Worker
内部存在同步需求。当某个Worker
从workQueue
中获取一个任务后, 便持有锁, 直到将任务在当前线程内执行完后, 再释放锁。这里加锁的作用是表示Worker
是否处于工作中, 不接收中断信号。 - 实现Runnable:
Worker
本身就是一个异步的任务调度者。
在构造函数中, 将AQS的state
初始化为-1, 是为了在初始化期间不接受中断信号, 直到runWorker
方法开始运行, state
将会被修改为0, 此时相当于锁被释放的状态, 可以接受中断信号。这部分逻辑可以从interruptIfStarted
方法中理解。
三,方法调用
3.1 runWorker
Worker
中最主要的run
方法, 也就是调用runWorker
方法。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
- 若当前线程从
workQueue
中获取任务, 首先加锁, 标记当前工作线程正在执行任务, 不接收中断信号。 - 接下来判断线程池状态, 若线程池状态
>= STOP
, 则对当前线程调用中断。 - 任务处理有两个细节:
- 调用了task的
run
方法而不是start
方法, 表示依然在当前线程中处理, 而非新启线程。 - 在
task.run()
方法的前后, 有beforeExecute
和afterExecute
这两个钩子方法。
- 调用了task的
- 若
getTask
方法返回了null
, 那么将会跳出循环, 调用processWorkerExit
方法来对Worker
进行回收。
3.2 getTask
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
- 5-12行: 获取线程池状态, 若是
STOP
,TIDYING
,TERMINATED
, 或是SHUTDOWN
且工作队列为空, 那么返回null
, 代表当前worker可以回收。 - 14-24行: 如果当前工作线程数超过了最大线程数, 或达到了核心线程数的回收条件, 则开始尝试回收当前worker。
- 26-35行: 从阻塞队列里获取任务。
timed
为true
时使用poll
超时获取, 否则使用take
阻塞等待。poll
超时也会导致新一轮的回收判断。
getTask
方法是线程池动态维护工作线程的核心。
3.3 execute
最重要的execute
方法。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}
这段逻辑可以用一张流程图辅助理解。
简单来说: 当向线程池提交一个任务, 如果当前线程数小于核心线程数, 那么就新增worker。如果新增失败, 则进入下面的流程。向阻塞队列offer一个任务, 如果阻塞队列已满, 那么继续尝试创建worker(奔着最大线程数去), 如果已经达到了最大线程数, 那么触发拒绝策略。而如果成功提交到了阻塞队列, 这时再判断线程池的状态, 如果处于非RUNNING状态, 那么尝试移除任务, 如果成功移除该任务, 就触发拒绝策略。
3.4 addWorker
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
- 外层
retry
循环和内层循环通过CAS操作compareAndIncrementWorkerCount
保证了只有一个线程能成功增加worker计数并跳出循环去创建Worker, 这保证了worker数量不会超限。 - 既然CAS已经保证了线程安全, 为什么还要获取
mainLock
呢? 这是为了同步对workers
这个HashSet
的操作, 因为HashSet
本身不是线程安全的, 需要防止在add
时其他地方发生remove
操作。