当前位置: 首页 > news >正文

JAVA线程池ThreadPoolExecutor说明

1. 核心参数与线程池状态

  • 核心参数‌:

    • corePoolSize‌:核心线程数,即使空闲也不会回收(除非设置allowCoreThreadTimeOut)。
    • maximumPoolSize‌:最大线程数,队列满时允许创建的最大临时线程数,包含核心线程数。
    • workQueue‌:任务队列(如LinkedBlockingQueue),用于缓存未执行的任务。
    • keepAliveTime‌:非核心线程空闲存活时间。
  • 线程池状态‌(通过ctl变量高3位表示):

    • RUNNING‌(111):接收新任务并处理队列任务。
    • SHUTDOWN‌(000):不接收新任务,但处理队列中剩余任务。
    • STOP‌(001):中断所有线程,丢弃队列任务。
    • TIDYING‌(010):所有任务终止,即将执行terminated()钩子方法。
    • TERMINATED‌(011):线程池完全终止。

2. 任务提交:execute() 方法

源码核心逻辑如下:

public void execute(Runnable command) {if (command == null) throw new NullPointerException();int c = ctl.get();// 步骤1:当前线程数 < corePoolSize,创建核心线程if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // true表示创建核心线程return;c = ctl.get(); // 创建失败后重新获取ctl}// 步骤2:尝试将任务加入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 二次检查线程池状态(防止在此期间被关闭)if (!isRunning(recheck) && remove(command))reject(command); // 如果线程池已关闭,回滚任务并拒绝else if (workerCountOf(recheck) == 0)addWorker(null, false); // 确保至少有一个线程处理队列任务}// 步骤3:队列已满,尝试创建非核心线程else if (!addWorker(command, false)) // false表示非核心线程reject(command); // 创建失败(线程数已达maximumPoolSize),触发拒绝策略
}

3. 线程执行逻辑:Worker 与 runWorker()

Worker 类(内部类)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {final Thread thread; // 实际执行任务的线程Runnable firstTask;  // 初始任务(可能为null)Worker(Runnable firstTask) {this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); // 通过线程工厂创建线程}public void run() {runWorker(this); // 线程启动后调用runWorker}
}
核心方法:runWorker(Worker w)
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断boolean completedAbruptly = true;try {// 循环获取任务并执行while (task != null || (task = getTask()) != null) {w.lock(); // 加锁,确保任务执行期间不被中断(除非线程池关闭)// 检查线程池状态(如果已STOP,中断线程)if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task); // 钩子方法(可自定义)try {task.run(); // 实际执行任务} catch (Throwable ex) {afterExecute(task, ex); // 钩子方法(异常处理)throw ex;}afterExecute(task, null); // 钩子方法(正常完成)} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly); // 线程退出处理}
}

4. 获取任务:getTask()

private Runnable getTask() {boolean timedOut = false;for (;;) {int c = ctl.get();// 检查线程池状态if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {decrementWorkerCount();return null; // 线程需要终止}int wc = workerCountOf(c);// 判断是否允许超时(非核心线程或允许核心线程超时)boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 检查线程数是否超过maximumPoolSize或超时if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null; // 减少线程数并返回nullcontinue;}try {// 从队列中取任务(阻塞或超时)Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true; // 超时标记} catch (InterruptedException retry) {timedOut = false;}}
}

5. 线程回收:processWorkerExit()

当 getTask() 返回 null 时,线程会调用此方法清理资源:

private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly) // 若任务执行异常,减少线程数decrementWorkerCount();// 统计已完成的任务数final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;workers.remove(w); // 移除Worker} finally {mainLock.unlock();}// 尝试终止线程池(如果状态为SHUTDOWN且队列为空)tryTerminate();// 补充新线程(如果队列中仍有任务)int c = ctl.get();if (runStateLessThan(c, STOP)) {if (!completedAbruptly) {int min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && !workQueue.isEmpty())min = 1;if (workerCountOf(c) >= min)return;}addWorker(null, false); // 补充新线程处理队列任务}
}

6. 拒绝策略实现

默认实现为 AbortPolicy

public static class AbortPolicy implements RejectedExecutionHandler {public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());}
}

其他策略如 CallerRunsPolicy(由调用者线程执行任务)、DiscardPolicy(静默丢弃)等均可自定义。


总结:源码核心流程

  1. 任务提交:通过 execute() 方法触发线程创建、入队或拒绝。

  2. 线程管理:通过 Worker 封装线程和任务,runWorker() 循环执行任务。

  3. 任务获取getTask() 从队列中获取任务,支持阻塞或超时等待。

  4. 线程回收:超时或异常时,调用 processWorkerExit() 清理资源并补充线程。

  5. 状态控制:通过 ctl 变量的原子操作管理线程池状态和线程数。

http://www.xdnf.cn/news/89425.html

相关文章:

  • 树莓派超全系列教程文档--(40)树莓派config.txt旧版GPIO控制、超频及条件过滤器
  • 【Spring】依赖注入的方式:构造方法、setter注入、字段注入
  • ProxySQL如何支持高并发读写请求
  • ubuntu 安装 redis server
  • 技术能力和关系比较实在没有可比性
  • 【同轴线共焦传感器原理】
  • Tree Shaking 原理
  • [原创](现代Delphi 12指南):[macOS 64bit App开发]:在Mac App Store外创建、部署与公证
  • 【AI面试】分类模型 之 随机森林
  • UWB定位技术在钢铁厂行业中的创新应用与价值实践
  • Linux:简单自定义shell
  • Unity使用反射进行Protobuf(CS/SC)协议,json格式_002
  • Python 常用Web框架对比
  • 乐视系列玩机---乐视2 x620 x628等系列线刷救砖以及刷写第三方twrp 卡刷第三方固件步骤解析
  • Spring 中 @Component, @Repository, @Service的区别
  • 电商场景下Elasticsearch集群与分片(Sharding)的ELK安装配置指南
  • qemu如何支持vpxor %xmm0,%xmm0,%xmm0(百度AI)
  • ACI multipod 一、组网概要
  • 【自然语言处理与大模型】如何知道自己部署的模型的最大并行访问数呢?
  • 「数据可视化 D3系列」入门第十二章:树状图详解与实践
  • Docker 快速入门教程
  • XPath 介绍
  • Ubuntu与Linux的关系
  • Linux虚拟机中 编译Linux源码 记录
  • 给 20GB 文件“排排坐”——详解外部排序
  • 鸿蒙NEXT开发定位工具类 (WGS-84坐标系)(ArkTs)
  • ios开发中xxx.debug.dylib not found
  • MySQL终章(8)JDBC
  • OpenCV --- 图像预处理(六)
  • 小白工具视频转MPG, 功能丰富齐全,无需下载软件,在线使用,超实用