当前位置: 首页 > ds >正文

线程池详细解析(一)

java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理的使用线程池能够带来三个好处。

1. 降低资源消耗:频繁的创建线程和销毁线程会产生不少的资源上的消耗,通过重复利用已创建的线程可以降低资源消耗。

2.提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。

3.提高线程的可管理性:通过线程池来对所有创建的线程进行资源的管理,对其可以进行统一的资源分配,调优和监控等功能,从而提高了稳定性

实现原理:

当任务向线程池提交一个任务之后,线程池是如何处理这个任务的呢?

下面我们看一个示意图:

上图我们可以分析出

1.当提交任务的时候,线程池会先判断当前核心线程是否已满(占用完),如果未占满则直接使用核心线程

2.当核心线程占满的时候就会将其放入队列中进行等待,放入之前也会判断队列是否已满,未满则放入队列

3.如果已经满了,线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的线程(这里的新线程是一个临时线程如果线程使用完毕之后则会回收这个线程)。如果都满了则会交给饱和策略来处理这个任务

下面我们看看线程池的几个关键参数:

核心线程数 (corePoolSize)

  • 本质:线程池的常驻工作兵力

  • 行为特点

    • 即使线程空闲也不会销毁(除非设置 allowCoreThreadTimeOut=true)

    • 新任务优先使用核心线程执行

最大线程数 (maximumPoolSize)

  • 本质:线程池的最大兵力上限

  • 行为特点

    • 当核心线程满且队列满时,临时创建新线程(直到达到此值)

    • 空闲超时后销毁(由 keepAliveTime 控制)

线程存活时间 (keepAliveTime)

  • 本质非核心线程的空闲存活时间

  • 行为特点

    • 线程空闲超过此时间后自动销毁

    • 仅作用于超过 corePoolSize 的线程

  • 单位TimeUnit(秒/毫秒/纳秒)

任务队列 (workQueue)

  • 本质任务缓冲区的容器

  • 常见实现及适用场景

队列类型特性适用场景
SynchronousQueue零容量队列,直接传递任务高响应要求,拒绝任务转移
ArrayBlockingQueue有界固定容量队列流量可控的系统
LinkedBlockingQueue可选有界/无界队列(默认无界)吞吐量优先,允许任务堆积
PriorityBlockingQueue带优先级的队列任务有优先级区分时
  • 危险点
    ⚠️ 无界队列可能导致 OOM(堆积过多任务)
    ⚠️ 有界队列需配合合理的拒绝策略

拒绝策略 (RejectedExecutionHandler)

  • 触发条件:线程数达最大且队列满时

  • 四大内置策略

策略名行为适用场景
AbortPolicy(默认)抛出 RejectedExecutionException需要快速失败感知的系统
CallerRunsPolicy由提交任务的线程直接执行保证任务不丢失的慢速降级
DiscardPolicy静默丢弃想要加入的任务可容忍丢失的场景(如日志)
DiscardOldestPolicy丢弃队列头部的任务并重试新任务优先处理新任务的场景

源码分析:

接下来我们选几个线程池比较核心的方法进行一些源码分析

内部属性:

下面我们来看看没有介绍过的线程池内部的属性

    private final AtomicInteger ctl;private static final int COUNT_BITS = 29;private static final int CAPACITY = 536870911;private static final int RUNNING = -536870912;private static final int SHUTDOWN = 0;private static final int STOP = 536870912;private static final int TIDYING = 1073741824;private static final int TERMINATED = 1610612736;private final BlockingQueue<Runnable> workQueue;private final ReentrantLock mainLock;private final HashSet<Worker> workers;private final Condition termination;

ctl表示两个状态值--线程池状态以及工作线程最大数量(线程池总线程数)

COUNT_BITS 表示在Integer的几位表示线程数

而后续的几个int变量值都是表示的是线程池状态值

workQueue则是线程池内部的任务队列用来存放任务

mainLock线程池内部的锁用来保证同步

termination目的是当线程池进行关闭的时候会将调用关闭方法的线程放入到Condition队列中进行等待,当线程池完全关闭之后唤醒该线程

核心方法execute

下面我们看看核心方法execute,揭秘线程池是如何运行任务的:

    public void execute(Runnable var1) {if (var1 == null) {throw new NullPointerException();} else {int var2 = this.ctl.get();if (workerCountOf(var2) < this.corePoolSize) {if (this.addWorker(var1, true)) {return;}var2 = this.ctl.get();}if (isRunning(var2) && this.workQueue.offer(var1)) {int var3 = this.ctl.get();if (!isRunning(var3) && this.remove(var1)) {this.reject(var1);} else if (workerCountOf(var3) == 0) {this.addWorker((Runnable)null, false);}} else if (!this.addWorker(var1, false)) {this.reject(var1);}}}

首先对任务进行判空处理,随后获取线程池的状态变量进行判断当前线程数是否小于核心线程数,如果小于核心线程数则直接调用addWorker创建核心线程来执行当前任务

下面我们看看addWorker方法是如何创建核心线程来执行任务的

    private boolean addWorker(Runnable var1, boolean var2) {while(true) {int var3 = this.ctl.get();int var4 = runStateOf(var3);if (var4 >= 0 && (var4 != 0 || var1 != null || this.workQueue.isEmpty())) {return false;}while(true) {int var5 = workerCountOf(var3);if (var5 >= 536870911 || var5 >= (var2 ? this.corePoolSize : this.maximumPoolSize)) {return false;}if (this.compareAndIncrementWorkerCount(var3)) {boolean var18 = false;boolean var19 = false;Worker var20 = null;try {var20 = new Worker(var1);Thread var6 = var20.thread;if (var6 != null) {ReentrantLock var7 = this.mainLock;var7.lock();try {int var8 = runStateOf(this.ctl.get());if (var8 < 0 || var8 == 0 && var1 == null) {if (var6.isAlive()) {throw new IllegalThreadStateException();}this.workers.add(var20);int var9 = this.workers.size();if (var9 > this.largestPoolSize) {this.largestPoolSize = var9;}var19 = true;}} finally {var7.unlock();}if (var19) {var6.start();var18 = true;}}} finally {if (!var18) {this.addWorkerFailed(var20);}}return var18;}var3 = this.ctl.get();if (runStateOf(var3) != var4) {break;}}}}

首先说明一下参数:第一个参数表示任务,第二个参数Boolean类型则是表示启用何种线程进行执行true表示核心线程,false表示临时线程。

首先进入方法中就是两个while嵌套循环,第一个循环则是判断当前线程池的状态是否可以继续创建线程执行任务否则返回false,第二个则是首先判断当前线程数是否可以继续创建线程否则返回false,随后开始使用CAS来修改状态值增加1个线程数。随后则创建任务线程,在线程池中任务线程则是一个Worker对象,创建完成之后将其加入到线程池的线程集合中,然后更改线程集合的大小,这里的操作都是采用lock全局锁的机制进行创建的保证了线程同步,上述都完成之后则开始调用线程的start方法正式运行线程并且修改标志位,如果出现了异常则会调用addWorkerFailed进行回滚的操作,这里的回滚操作则是线程数减一,状态位更改等操作。

随后我们接着看execute之后的代码逻辑

 if (isRunning(var2) && this.workQueue.offer(var1)) {int var3 = this.ctl.get();if (!isRunning(var3) && this.remove(var1)) {this.reject(var1);} else if (workerCountOf(var3) == 0) {this.addWorker((Runnable)null, false);}} else if (!this.addWorker(var1, false)) {this.reject(var1);}

这里如果当前核心线程数已经占满那么就会将其丢入阻塞队列中也就是任务队列,如果成功之后则会再次检查当前线程池的状态标志位是否可以执行任务,如果不可以则将任务从中移除,并且执行拒绝策略,如果当前线程数为0则创建一个临时线程去执行(自定义线程池如果核心线程为0则会出现这些情况)。如果队列已经满了则会创建临时线程去执行,如果也不行就会执行拒绝策略。

Worker类:

我们来看一看Worker对象的内部构造

    private final class Worker extends AbstractQueuedSynchronizer implements Runnable {private static final long serialVersionUID = 6138294804551838833L;final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable var2) {this.setState(-1);this.firstTask = var2;this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);}public void run() {ThreadPoolExecutor.this.runWorker(this);}protected boolean isHeldExclusively() {return this.getState() != 0;}protected boolean tryAcquire(int var1) {if (this.compareAndSetState(0, 1)) {this.setExclusiveOwnerThread(Thread.currentThread());return true;} else {return false;}}。。。。。}

可以看出它是存在线程池内部的内部类,封装了线程池中的工作线程,并通过继承 AbstractQueuedSynchronizer(AQS)实现了独特的锁机制,封装工作线程和任务我们可以理解但为什么内部还要实现AQS的锁机制呢?

首先我们可以看到当Worker进行初始化的时候将state设置为-1

  Worker(Runnable var2) {this.setState(-1);this.firstTask = var2;this.thread = ThreadPoolExecutor.this.getThreadFactory().newThread(this);}

这里的-1就是防止线程在初始化过程中被中断(state=-1)。通过AQS的 state 值(-101)区分线程的不同阶段(未启动、空闲、忙碌)。线程池的shutdown方法会遍历所有的线程根据AQS的state不同进行对应的中断操作。换句话来说worker内部采用AQS的更多目的是通过AQS的state来表示当前Worker的一个状态,随后根据这些状态来执行不同的业务逻辑处理,而不是真正的去使用锁。在后续文章中将会源码分析shutdown方法来看看是如何中断线程的。

因此Worker 集成锁机制的核心目的是:

  1. 启动保护:防止线程在初始化过程中被中断(state=-1)。
  2. 状态区分:通过 state 值(-101)区分线程的不同阶段(未启动、空闲、忙碌)。
  3. 精确中断:配合 shutdown() 和 shutdownNow() 实现不同的中断策略,既保证任务完整性,又能快速响应关闭请求。

总结execute方法:

到此execute方法已经解析完毕,可以看上述流程图对整个方法的业务逻辑有一个清晰的认知。接下来我们将会讲一讲线程池的其他方法。

线程池实际上使用了两种锁:一个是全局锁lock还有一个是每个Worker线程内部自带的AQS实现锁,全局锁的目的更主要是为了线程同步,而AQS锁的目的则更主要的是为了实现Worker 的状态控制。

http://www.xdnf.cn/news/10823.html

相关文章:

  • 空间智能重塑未来治理
  • ProxyPin抓APK数据包
  • 3.需求分析与测试用例设计方法
  • 为什么使用 ./ 表示当前目录:深入解析路径表示法的起源与原理
  • 太极APP:免Root,畅享Xposed模块的神奇魅力
  • 数值与字典解决方案二十七讲:两列数据相互去掉重复值后合并
  • 每天总结一个html标签——a标签
  • Docker安装Redis集群(3主3从+动态扩容、缩容)保姆级教程含踩坑及安装中遇到的问题解决
  • 判断用户输入昵称是否存在(Python)
  • Python中的 __name__ 属性全解析
  • 【机器人编程基础】python中的算术运算符
  • AI Agent工程实践:从提示词到自主智能
  • world quant教程学习
  • FreeRTOS实时操作系统学习笔记
  • (aaai2024) Omni-Kernel Network for Image Restoration
  • Linux多路TTS混音播放:让多个语音同时清晰可听
  • 系统思考:成长与投资不足
  • ISBN书号查询接口如何用PHP实现调用?
  • NVMe协议简介之AXI总线更新
  • Flask+LayUI开发手记(七):头像的上传及突破static目录限制
  • 鸿蒙进阶——Mindspore Lite AI框架源码解读之模型加载详解(二)
  • D. Gellyfish and Camellia Japonica【Codeforces Round 1028 (Div. 2)】
  • 【存储基础】【VFS】inodedentrysuper_block以及它们之间的关系
  • 【AUTOSAR SystemServices】深入解析StbM模块:功能定义、工作原理与代码实现
  • Eigen库介绍以及模块划分和相关示例代码
  • 论文略读:LIMO: Less is More for Reasoning
  • Spring Boot中保存前端上传的图片
  • TASK OA 案例hook
  • Node.js 项目调试指南
  • 【小沐杂货铺】基于Three.JS构建IFC模型浏览器(WebGL、CAD、Revit、IFC)