当前位置: 首页 > news >正文

Java并发编程第十篇(ThreadPoolExecutor线程池组件分析)

Java并发系列: JUC.ThreadPoolExecutor

  • 一,JUC.ThreadPoolExecutor概况
    • 1.1 线程池的意义
    • 1.2 继承关系
  • 二,源码解析
    • 2.1 静态变量
    • 2.2 属性
    • 2.3 内部类 Worker
  • 三,方法调用
    • 3.1 runWorker
    • 3.2 getTask
    • 3.3 execute
    • 3.4 addWorker

一,JUC.ThreadPoolExecutor概况

ThreadPoolExecutor作为开发中最常用的线程池, 也作为面试中被问到的最高频的并发组件之一, 我们有必要来聊聊它的作用以及内部构造。

1.1 线程池的意义

在讲解线程池之前, 有些读者可能存在这样的疑惑: 为什么需要线程池, 线程池有什么优越性?

关于这个问题, 主要从两个角度来进行解答:

  • 减少开销
    在大部分JVM上, 用户线程与操作系统内核线程是1:1的关系, 也就是说每次创建回收线程都要进行内核调用, 开销较大。那么有了线程池, 就可以重复使用线程资源, 大幅降低创建和回收的频率。此外, 也能一定程度上避免有人在写BUG时, 大量创建线程导致资源耗尽。

  • 便于管理
    线程池可以帮你维护线程ID、线程状态等信息, 也可以帮你统计任务执行状态等信息。

理解了线程池的意义, 那么本文的主角便是JUC提供的线程池组件: ThreadPoolExecutor.

请注意, 有人会将JUC中的ThreadPoolExecutor与Spring Framework中的ThreadPoolTaskExecutor混淆。这是两个不同的组件, ThreadPoolTaskExecutor可以理解为对ThreadPoolExecutor做的一层封装, 主要就是为了支持线程池的Bean化, 将其交给Spring Context来管理, 防止滥用线程池。而内部的核心逻辑还是由ThreadPoolExecutor处理。关于这一点, 简单了解即可。

从宏观上看, 开发者将任务提交给ThreadPoolExecutor, ThreadPoolExecutor分配工作线程(Worker)来执行任务, 任务完成后, 工作线程回到ThreadPoolExecutor, 等待后续任务。

根据这段描述, 产生了三个比较值得探究的问题:

  1. ThreadPoolExecutor自身有哪些状态, 如何维护这些状态?
  2. ThreadPoolExecutor如何维护内部的工作线程?
  3. ThreadPoolExecutor处理任务的整体逻辑是什么样的?

源码之前, 了无秘密。在读源码的过程中, 脑海中带着这三个问题。

1.2 继承关系

ThreadPoolExecutor继承了AbstractExecutorService, AbstractExecutorService实现了ExecutorService接口。ExecutorService接口继承了Executor接口, 整体呈现出了这样的关系:

ThreadPoolExecutor → AbstractExecutorService → ExecutorService → Executor

从上往下来看:

public interface Executor {void execute(Runnable command);
}

Executor 接口中只声明了一个execute方法, 用于执行提交的任务。

ExecutorService扩展了Executor的语义, 增加了多种多样的操作。

public interface ExecutorService extends Executor {void shutdown();List<Runnable> shutdownNow();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

而AbstractExecutorService则是对ExecutorService中声明的方法进行默认实现, 方便子类进行调用。比如ThreadPoolExecutor就直接使用了AbstractExecutorService的submit方法。 AbstractExecutorService也是一个比较核心的类, 但它不是本文的重点, 所以不会详细讲解。

二,源码解析

2.1 静态变量

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

这几个变量很关键, 在注释中也已经有了比较详细的解释。我这里就以更直白的方式加以介绍, 顺便帮你温习一些计算机基础知识。

先看下半部分的五个变量, 从命名上可以判定这些值代表了ThreadPoolExecutor的状态。

这些状态值涉及到了二进制移位操作, 我们知道int类型在Java中的二进制表示是以补码存储的。-1的二进制表示是32个1的序列, COUNT_BITS值是常数, 为32-3=29。因此RUNNING的二进制表示是高三位为111, 低29位都为0的序列。

我们用同样的方式表示出其余四个状态:

  • RUNNING: 11100000 00000000 00000000 00000000
  • SHUTDOWN: 00000000 00000000 00000000 00000000
  • STOP: 00100000 00000000 00000000 00000000
  • TIDYING: 01000000 00000000 00000000 00000000
  • TERMINATED: 01100000 00000000 00000000 00000000

不难发现, 这五个状态可以理解为目前只用到了高三位, 这是因为ThreadPoolExecutor只用一个int变量来同时保存线程池状态以及工作线程数这两个信息, 线程状态使用高三位, 工作线程数使用低29位。CAPACITY这个变量就表示为工作线程的最大数量 (2^29 - 1)。

这种将两种状态存储在一个二进制序列中的做法, 在业务代码中相对比较少见, 在底层源码中很常见。比如ReentrantReadWriteLock中, 用一个int来组合表示读锁和写锁的个数, 比如在ZooKeeper中, 用一个long来组合表示epoch和事务个数。

这几种状态的含义是:

  • RUNNING: 接受新任务, 也能处理阻塞队列里的任务。
  • SHUTDOWN: 不接受新任务, 但是处理阻塞队列里的任务。
  • STOP: 不接受新任务, 不处理阻塞队列里的任务, 中断处理过程中的任务。
  • TIDYING: 当所有的任务都执行完了, 当前线程池已经没有工作线程, 这时线程池将会转换为TIDYING状态, 并且将要调用terminated方法。
  • TERMINATED: terminated方法调用完成。

这几个状态之间的变化如图所示:

在这里插入图片描述

2.2 属性

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 相关方法
private static int ctlOf(int rs, int wc) { return rs | wc; }

首先着重介绍的是AtomicInteger类型的属性ctl

ctl就是上文所说的, 组合了线程池状态以及池中工作线程数两个信息的变量。它初始化时调用了ctlOf方法, 可以看到ctlOf只是一个或操作。这就说明, 线程池在初始化时, 状态被标记为RUNNING, 工作线程数为0。

读到这里, 有一些读者可能会存在疑惑: 为啥非要用一个int值来组合表示两种状态? 用两个值表示, 清清楚楚不行吗?

可以, 当然可以。但使用一个变量的好处是: 如果需要对两个状态值进行同步修改, 直接通过位操作就可以了, 省去了加锁操作。因为在操作系统级别, 对int的修改本身就是原子的。

再来看看其他属性, 属性往往会透露出这个类是如何组织的。

private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock = new ReentrantLock();private final HashSet<Worker> workers = new HashSet<Worker>();private final Condition termination = mainLock.newCondition();private int largestPoolSize;private long completedTaskCount;private volatile ThreadFactory threadFactory;private volatile RejectedExecutionHandler handler;private volatile long keepAliveTime;private volatile boolean allowCoreThreadTimeOut;private volatile int corePoolSize;private volatile int maximumPoolSize;

一个个属性来看:

  • workQueue: BlockingQueue<Runnable>类型, 用来存储积压任务的阻塞队列。Callable类型的任务会被父类AbstractExecutorService转化为FutureTaskRunnable的子类)。
  • mainLock: ReentrantLock类型, 对线程池的一些操作需要状态同步, 所以需要用到锁。
  • workers: HashSet<Worker>类型, Worker是对工作线程以及一些状态的封装, workers是用来存储所有Worker的集合。
  • termination: 由mainLock创建的Condition, 用于awaitTermination调用时的线程同步。
  • largestPoolSize: 线程池中最多有过多少个活跃线程。
  • completedTaskCount: 线程池总共处理了多少任务。
  • threadFactory: ThreadFactory接口, 用户可以自定义创建工作线程的工厂。
  • handler: 拒绝策略, 当workQueue满载时将会触发。
  • keepAliveTime: 工作线程空闲时则保持存活的时间。
  • allowCoreThreadTimeOut: 布尔类型, 是否需要保持核心线程始终处于存活。
  • corePoolSize: 核心线程数。当阻塞队列还未满载时, 线程池将保持核心线程数。
  • maximumPoolSize: 最大线程数。当阻塞队列满载时, 线程池将会在核心线程数的基础上创建新线程来处理任务, 直到最大线程数。

2.3 内部类 Worker

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in. Null if factory fails. */final Thread thread;/** Initial task to run. Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this);}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()      { acquire(1); }public boolean tryLock() { return tryAcquire(1); }public void unlock()    { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}}
}

继承关系

  • 继承AQS: 说明Worker内部存在同步需求。当某个WorkerworkQueue中获取一个任务后, 便持有锁, 直到将任务在当前线程内执行完后, 再释放锁。这里加锁的作用是表示Worker是否处于工作中, 不接收中断信号。
  • 实现Runnable: Worker本身就是一个异步的任务调度者。

在构造函数中, 将AQS的state初始化为-1, 是为了在初始化期间不接受中断信号, 直到runWorker方法开始运行, state将会被修改为0, 此时相当于锁被释放的状态, 可以接受中断信号。这部分逻辑可以从interruptIfStarted方法中理解。

三,方法调用

3.1 runWorker

Worker中最主要的run方法, 也就是调用runWorker方法。

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
  • 若当前线程从workQueue中获取任务, 首先加锁, 标记当前工作线程正在执行任务, 不接收中断信号。
  • 接下来判断线程池状态, 若线程池状态>= STOP, 则对当前线程调用中断。
  • 任务处理有两个细节:
    • 调用了task的run方法而不是start方法, 表示依然在当前线程中处理, 而非新启线程。
    • task.run()方法的前后, 有beforeExecuteafterExecute这两个钩子方法。
  • getTask方法返回了null, 那么将会跳出循环, 调用processWorkerExit方法来对Worker进行回收。

3.2 getTask

private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}
  • 5-12行: 获取线程池状态, 若是STOP, TIDYING, TERMINATED, 或是SHUTDOWN且工作队列为空, 那么返回null, 代表当前worker可以回收。
  • 14-24行: 如果当前工作线程数超过了最大线程数, 或达到了核心线程数的回收条件, 则开始尝试回收当前worker。
  • 26-35行: 从阻塞队列里获取任务。timedtrue时使用poll超时获取, 否则使用take阻塞等待。poll超时也会导致新一轮的回收判断。

getTask方法是线程池动态维护工作线程的核心。

3.3 execute

最重要的execute方法。

public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (!isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);
}

这段逻辑可以用一张流程图辅助理解。

在这里插入图片描述

简单来说: 当向线程池提交一个任务, 如果当前线程数小于核心线程数, 那么就新增worker。如果新增失败, 则进入下面的流程。向阻塞队列offer一个任务, 如果阻塞队列已满, 那么继续尝试创建worker(奔着最大线程数去), 如果已经达到了最大线程数, 那么触发拒绝策略。而如果成功提交到了阻塞队列, 这时再判断线程池的状态, 如果处于非RUNNING状态, 那么尝试移除任务, 如果成功移除该任务, 就触发拒绝策略。

3.4 addWorker

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// Recheck while holding lock.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
  • 外层retry循环和内层循环通过CAS操作compareAndIncrementWorkerCount保证了只有一个线程能成功增加worker计数并跳出循环去创建Worker, 这保证了worker数量不会超限。
  • 既然CAS已经保证了线程安全, 为什么还要获取mainLock呢? 这是为了同步对workers这个HashSet的操作, 因为HashSet本身不是线程安全的, 需要防止在add时其他地方发生remove操作。
http://www.xdnf.cn/news/1187677.html

相关文章:

  • 锁相环技术简介(面向储能变流器应用)
  • 机器学习(一)KNN,K近邻算法(K-Nearest Neighbors)
  • [硬件电路-85]:一款高集成度热电制冷器(TEC)控制器芯片ADN8835ACPZ
  • 工程师实践出真知
  • 【Spring WebFlux】为什么 Spring 要拥抱响应式
  • java面试题(一)
  • 基于深度学习的图像分类:使用DenseNet实现高效分类
  • 解决 Delete ␍ prettier/prettier问题的方案
  • TwinCAT3编程入门1
  • 理解Spring中的IoC
  • 探索 MyBatis-Plus
  • [2025CVPR-图象分类方向]SPARC:用于视觉语言模型中零样本多标签识别的分数提示和自适应融合
  • TDengine 转化函数 TO_UNIXTIMESTAMP 用户手册
  • S7-1500 与 ET200MP 的组态控制通信(Configuration Control)功能实现详解(下)
  • 【vue3+vue-pdf-embed】实现PDF+图片预览
  • 文件被删除了怎么恢复?恢复方法总结与重点注意事项
  • Mysql 日志 binlog redolog
  • deepseek本地部署,轻松实现编程自由
  • 在线事务型的业务、实时分析类业务、离线处理类型的业务
  • 数据赋能(332)——安全与合规——保密管理
  • MJ11032G和MJ11033G是对管由onsemi/安森美公司研发的一款高性能、低功耗的达林顿晶体管
  • Node.js(三)之Express
  • Zero-Shot TrackingT0:对象分割+运动感知记——当“切万物”武士学会运动记忆,目标跟踪稳如老狗
  • ESP32学习笔记_Components(1)——使用LED Strip组件点亮LED灯带
  • 图论水题日记
  • MC_GearInPos电子齿轮
  • ISIS高级特性LSP的分片扩展
  • Cacti 前台命令注入漏洞(CVE-2022-46169)
  • 深入解析Linux匿名管道机制与应用
  • 浅析PCIe 6.0 ATS地址转换功能