当前位置: 首页 > ai >正文

Java并发编程-线程池(二)

文章目录

  • 线程池的实现原理
    • execute(Runnable command)
      • **1. 阶段一:尝试创建核心线程**
      • **2. 阶段二:尝试将任务加入队列**
      • **3. 阶段三:尝试创建非核心线程或拒绝任务**
      • **关键机制与设计思想**

线程池的实现原理

当向线程池提交一个任务之后,线程池是如何处理这个任务的呢?根据刚刚讲的线程池参数的含义,我们来看一下线程池 的主要处理流程。

从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程是这样的, 这个很关键,面试必问。

在这里插入图片描述

  1. 判断核心线程池是否已满,即线程数是否达到corePoolSize

如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程已经满了,则进入下个流程。

  • 判断工作队列是否已经满。 BlockingQueue

如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。

  • 判断线程池是否已满,即线程数是否达到maxPoolSize

如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

再来看看 ThreadPoolExecutor执行execute()方法的图:

在这里插入图片描述

按照我们上面说的, ThreadPoolExecutor执行execute方法也会分为这几种情况。

  1. 如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。

  2. 如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

  3. 如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。

  4. 如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。

ThreadPoolExecutor采取上述步骤的总体设计思路,是为了在执行execute()方法时,尽可能地避免获取全局锁, 因为很明显这是一个严重的瓶颈。

在ThreadPoolExecutor完成预热之后 , 也就是当前运行的线程数大于等于corePoolSize,几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

通过流程分析,我们很直观地了解了线程池的工作原理,接下来, 我们再通过源代码来看看具体是如何实现的

execute(Runnable command)

//高3位表示状态,低29位表示线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {//如果当前工作线程数小于核心线程数(corePoolSize),//则尝试创建新线程作为核心线程并立即执行任务if (addWorker(command, true)) // 以核心模式创建Workerreturn;c = ctl.get(); // 若创建失败(如线程池已关闭),重新获取ctl值}if (isRunning(c) && workQueue.offer(command)) { // 检查状态并尝试入队int recheck = ctl.get();if (! isRunning(recheck) && remove(command))// 线程池已关闭且任务成功移除reject(command); // 拒绝任务else if (workerCountOf(recheck) == 0) //无存活线程addWorker(null, false); //创建非核心线程防止队列任务积压}else if (!addWorker(command, false))// 以非核心模式创建Workerreject(command); // 抛出RejectedExecutionException异常
}

1. 阶段一:尝试创建核心线程

int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true)) // 以核心模式创建Workerreturn;c = ctl.get(); // 若创建失败(如线程池已关闭),重新获取ctl值
}
  • 目标:如果当前工作线程数小于核心线程数(corePoolSize),则尝试创建新线程作为核心线程并立即执行任务。

  • 关键操作

    • workerCountOf(c):解析 ctl 变量(高3位表示状态,低29位表示线程数)获取当前工作线程数。

    • addWorker(command, true):尝试以核心线程限制(corePoolSize)创建工作者(Worker),command 作为首任务。

    • 失败处理:若线程池已关闭(状态非 RUNNING)或并发冲突导致创建失败,则进入阶段二。

2. 阶段二:尝试将任务加入队列

if (isRunning(c) && workQueue.offer(command)) { // 检查状态并尝试入队int recheck = ctl.get(); // 重新获取状态以应对并发变化if (!isRunning(recheck) && remove(command)) // 线程池已关闭且任务成功移除reject(command); // 拒绝任务else if (workerCountOf(recheck) == 0) // 无存活线程(例如核心线程超时被回收)addWorker(null, false); // 创建非核心线程(后续从队列取任务)
}
  • 目标:当核心线程已满,尝试将任务加入阻塞队列(如 LinkedBlockingQueue)。

  • 关键操作

    • 双重状态检查

      1. 初始入队前校验线程池是否处于运行状态(isRunning(c))。

      2. 入队后再次校验(recheck),避免在入队期间线程池被关闭。

    • 处理极端情况

      • 若线程池已关闭且成功从队列移除任务,则触发拒绝策略。

      • 若所有工作线程意外终止(例如异常退出),则新建非核心线程(参数 firstTask = null),强制处理队列中的积压任务。此处的 null 表示新线程不立即执行提交的command,而是直接从队列中获取任务(通过 Worker.run() 中的循环逻辑)。此时创建的非核心线程虽然无初始任务,但会主动消费队列中积累的任务,确保队列不积压。

      while (running) {Job job = null;synchronized (jobs) {if (jobs.isEmpty()) jobs.wait(); // 从队列取任务job = jobs.removeFirst();}if (job != null) job.run();
      }
      

3. 阶段三:尝试创建非核心线程或拒绝任务

else if (!addWorker(command, false)) // 以非核心模式创建Workerreject(command); // 队列已满且线程数达到maximumPoolSize,拒绝任务
  • 目标:当队列已满时,尝试创建非核心线程(线程数上限为 maximumPoolSize)。

  • 关键操作

    • addWorker(command, false):以非核心模式创建工作者,直接执行当前任务。

    • 失败条件

      • 线程数已达 maximumPoolSize

      • 线程池已关闭(非 RUNNING 状态)。

    • 拒绝策略:调用 RejectedExecutionHandler.rejectedExecution(),默认抛出 RejectedExecutionException

关键机制与设计思想

  1. 全局锁(Global Lock)的应用

    • addWorker() 方法内部通过锁(如 ReentrantLock)同步线程池状态修改操作(例如增减线程、更新 ctl),确保原子性。
  2. 减少锁竞争优化

    • 在核心线程已预热的情况下,多数任务直接通过队列处理(阶段二),避免了频繁获取锁的性能损耗。
  1. Worker执行逻辑(补充)

    • 每个 Worker 启动后执行 runWorker() 方法,循环从队列中获取任务。
http://www.xdnf.cn/news/5910.html

相关文章:

  • 今日行情明日机会——20250513
  • 期货反向跟单软件—持仓上限控制功能
  • gcc和g++
  • 闭包原理与常见陷阱
  • 装饰器在Python中的作用及在PyTorchMMDetection中的实战应用
  • Python -将MP4文件转为GIF图片
  • MyBatis 批量新增与删除功能完整教程
  • SpringBoot的外部化配置
  • 软件测试(1) 软件测试概述
  • 【Qt开发】信号与槽
  • 【技术追踪】InverseSR:使用潜在扩散模型进行三维脑部 MRI 超分辨率重建(MICCAI-2023)
  • Ansible安装与核心模块实战指南
  • 如何正确地写出单例模式
  • 嵌入式软件--stm32 DAY7 I2C通讯上
  • 码蹄集——分解、数组最大公约数、孪生质数、卡罗尔数、阶乘数
  • PY32系列单片机离线烧录器,可配置选项字节和上机台批量烧录
  • The Deep Learning Compiler: A Comprehensive Survey (深度学习编译器:全面调查)
  • milvus+flask山寨《从零构建向量数据库》第7章case2
  • FPGA图像处理(六)------ 图像腐蚀and图像膨胀
  • 【图像处理基石】遥感图像分析入门
  • stm32f103rct6中使用串口1 DMA通信程序含异常处理
  • 数据验证库pydantic的用法
  • 力扣热题——统计平衡排列的数目
  • 进程间通信分类
  • 数组练习题
  • 采购流程规范化如何实现?日事清流程自动化助力需求、采购、财务高效协作
  • 动态查找滚动容器(通用方案)
  • 故障诊断模型评估——混淆矩阵,如何使样本量一致(上)
  • 深入浅出之STL源码分析8_三个指针
  • PostgreSQL 恢复信息函数