当前位置: 首页 > news >正文

Java并发编程-线程池(四)

文章目录

  • 线程池实现原理
    • Worker
      • Worker 核心设计总结
    • runWorker(Worker w)
    • 总结

线程池实现原理

上一篇我们看了 addWork 方法,那接下来就让我们详细看看内部类Worker。

Worker

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{final Thread thread; //worker自己的线程Runnable firstTask;Worker(Runnable firstTask) {setState(-1); // 默认禁止中断this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);}public void run() {runWorker(this); // 调用ThreadPoolExecutor的runWorker方法(后面会讲)}protected boolean isHeldExclusively() {return getState() != 0;}protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) { //仅当状态为0时获取锁,不可重入setExclusiveOwnerThread(Thread.currentThread());//课后习题return true;}return false;}protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;}public void lock()        { acquire(1); }public boolean tryLock()  { return tryAcquire(1); }public void unlock()      { release(1); }public boolean isLocked() { return isHeldExclusively(); }void interruptIfStarted() {//调用链:shutdownNow()->interruptWorkers()->interruptIfStarted()Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {// 中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过try {t.interrupt();} catch (SecurityException ignore) {}}}
}

Worker类也继承了AQS,主要目的是利用state实现线程执行任务时的中断控制,确保线程池的优雅关闭机制。利用AQS的state字段来区分线程的初始状态、运行状态及中断权限。

  • 初始化锁状态:在Worker的构造函数中调用setState(-1),将状态设置为非0值(-1),表示**默认禁止中断** 。这样可以避免新创建的Worker线程在未执行任务前(即未调用runWorker方法时)被中断干扰。

    • 实际任务执行时:在runWorker方法中,通过调用unlock()方法将状态从-1重置为0,表示允许中断。
  • shutdownNow()的中断条件:运行中/运行结束 && 线程初始化过 && 没有被中断过

  • shutdown()的中断条件:

final void runWorker(Worker w) {...w.unlock(); // 允许中断...w.lock();...w.unlock();...
}
//调用链:shutdown()->interruptIdleWorkers()
private void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;if (!t.isInterrupted() && w.tryLock()) {// 中断条件:没有被中断 && 没有任务在执行try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}}} finally {mainLock.unlock();}
}

Worker 核心设计总结

  • 同步机制:通过AQS的自定义锁管理,实现任务的原子执行和中断控制。

  • 状态标记:利用AQS的state字段区分线程的初始状态、运行状态及中断权限。

  • 线程池管理:支持线程池在关闭时区分处理空闲线程和运行线程,确保任务完整性。

Worker会循环获取工作队列里的任务来执行。我们可以从Worker类的run()方法里看到这点:

runWorker(Worker w)

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断//标识线程是否因异常(如任务抛出未捕获错误)而非正常退出boolean completedAbruptly = true; try {// 循环获取工作队列里的任务来执行while (task != null || (task = getTask()) != null) {// 当前有任务 或者 能从队列中拿到任务w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||//若线程池正在关闭(STOP 状态)(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&// Thread.interrupted() 会清除当前线程的中断状态(返回历史值),// 若为 true 且线程池已停止,需重新标记中断防止代码忽略此请求。!wt.isInterrupted())//在强制关闭(如 shutdownNow())时,快速中断正在处理的任务wt.interrupt();try {//空方法(钩子),可被子类覆写以添加日志、监控或自定义逻辑(如任务执行时间统计)beforeExecute(wt, task);Throwable thrown = null;try {task.run(); // 执行任务} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;//避免任务对象被错误复用或内存泄漏w.completedTasks++;//统计Worker完成的任务数(用于线程池监控或扩容策略)w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }private void processWorkerExit(Worker w, boolean completedAbruptly) {if (completedAbruptly)//补偿操作:若线程异常终止(未正常调整计数),//需主动减少工作线程计数(workerCount)以保持准确性decrementWorkerCount();final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {completedTaskCount += w.completedTasks;//累计完成数workers.remove(w);//移除失效线程} finally {mainLock.unlock();}//当前线程退出后可能满足线程池终止条件(如所有线程退出且任务队列为空)//尝试将线程池状态过渡到TERMINATED(需符合SHUTDOWN/STOP状态且无活动线程和待处理任务)tryTerminate();int c = ctl.get();if (runStateLessThan(c, STOP)) {//线程池仍允许处理任务(如RUNNING/SHUTDOWN)if (!completedAbruptly) {//若允许核心线程超时(allowCoreThreadTimeOut),最小值为 0;//否则为 corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;if (min == 0 && ! workQueue.isEmpty())min = 1;// 队列非空时至少保留一个线程处理任务if (workerCountOf(c) >= min)return; //当前线程数足够,无需补充}addWorker(null, false);//补充新Worker}
}

总结

ThreadPoolExecutor中线程执行任务的示意图
在这里插入图片描述

线程池中的线程执行任务分两种情况。

  1. 在execute()方法中创建一个线程时,会让这个线程执行当前任务。

  2. 这个线程执行完上图中1的任务后,会反复从BlockingQueue获取任务来执行。

http://www.xdnf.cn/news/489565.html

相关文章:

  • Reth(冗余以太网接口) 和Bridge-Aggregation(链路聚合接口)区别
  • 一个进程中可以有多个 WebView2 控件,它们各自有独立的用户数据目录,COOKIE共享
  • 内存泄漏系列专题分析之十六:高通相机CamX内存泄漏内存占用分析--chi-cdk部分ION内存拆解方法
  • 跳转传参的使用
  • Java生产环境设限参数教学
  • 第六章 进阶10 实习生的焦虑
  • 一文讲透面向对象编程OOP特点及应用场景
  • 深入探索Java微服务架构:Spring Cloud与Kubernetes的整合实践
  • 敏感数据加密和模糊匹配
  • 使用CherryStudio +SiliconFlow 部署独立的deepseek+知识库
  • 文本数据词汇级增强
  • Python 之类型注解
  • MCU开发学习记录16* - 看门狗学习与实践(HAL库) - IWDG与WWDG -STM32CubeMX
  • java加强 -IO流
  • 基于React的高德地图api教程005:圆形标记的绘制、删除、修改
  • 【AI学习】AI大模型技术发展研究月报的生成提示词
  • 【Linux】序列化与反序列化、会话与进程组、守护进程
  • 投影仪基础知识及选购方向小记③
  • 曝光融合(Exposure Fusion)
  • 【大模型系列篇】驱动编码助手Cursor与Windsurf工作的隐藏算法解读
  • 小结:jvm 类加载过程
  • 车道线检测----Lane-ATT
  • Linux自有服务
  • LLM学习笔记(四)信息论
  • 公路水运安全员B证主要考核内容有哪些
  • 中级统计师-统计学基础知识-第一章
  • C++ lambda表达式
  • 构建稳定的金字塔模式生态:从自然法则到系统工程
  • LVGL常见面试题
  • 腾讯云MCP数据智能处理:简化数据探索与分析的全流程指南