当前位置: 首页 > ai >正文

并发编程——13 线程池ThreadPoolExecutor实战及其原理分析

1 线程池简介

  • 线程池(Thread Pool)是基于池化思想管理线程的工具,常用于多线程服务器(如Tomcat)。它会预先维护多个线程,等待统一分配可并发执行的任务;

  • 为什么需要线程池?如果无限制地创建线程,会带来一系列问题:

    • 额外开销大:创建、销毁线程,以及调度线程都会消耗系统资源;

    • 性能降低:线程过多会让计算机整体性能下降,甚至因过度调度导致系统不稳定;

  • 线程池通过预先维护线程+统一管理的模式,解决了上述问题,具体优势有:

    • 降低资源消耗:利用池化技术重复使用已创建的线程,避免了频繁创建、销毁线程的损耗;
    • 提高响应速度:任务到达时,无需等待线程创建,可直接用已有的线程执行任务;
    • 提高线程的可管理性:线程是稀缺资源,无限制创建会导致资源调度失衡、系统稳定性下降。线程池可以统一分配、调优和监控线程,保障系统稳定;
    • 提供更多强大功能:线程池具备可扩展性,能添加多样功能。例如ScheduledThreadPoolExecutor(定时线程池),支持任务延期执行或定期执行。

2 线程池的使用

2.1 创建线程池

2.1.1 ThreadPoolExecutor——推荐使用

  • 利用 ThreadPoolExecutor 提供的构造方法创建线程池:

    在这里插入图片描述

  • 主要看参数最全的构造方法,其他构造方法最终还是会调用该构造方法:

    在这里插入图片描述

    • corePoolSize(核心线程数):线程池初始化时默认无线程,任务到来时才会创建核心线程执行任务;核心线程会长期存活(除非线程池设置了允许核心线程超时);

    • maximumPoolSize(最大线程数):线程池能创建的线程总数上限。当核心线程数已满、任务队列也已满时,若当前线程数小于maximumPoolSize,会创建非核心线程来执行任务;

    • keepAliveTime(空闲时间):非核心线程的存活超时时间。若非核心线程空闲时间超过此值,会被自动终止回收。若corePoolSize == maximumPoolSize(即无核心/非核心线程区分),此参数无效;

    • unit(时间单位):配合keepAliveTime使用,指定时间的单位(如TimeUnit.SECONDS表示秒);

    • workQueue(任务队列):用于保存待执行任务的阻塞队列,常见类型有:

      • ArrayBlockingQueue(有界队列):队列长度有限,队列满时需创建非核心线程;若最大线程数也满,则触发拒绝策略;
      • LinkedBlockingQueue(无界队列):队列长度无限,若任务生产速度远大于消费速度,可能导致内存溢出(OOM);
      • SynchronousQueue(同步队列):队列不存储任务,仅做“任务传递”,队列长度为0;
    • threadFactory(线程工厂):用于创建线程的工厂接口,默认使用Executors.defaultThreadFactory();也可自定义ThreadFactory接口,实现对线程创建的个性化控制(如命名线程、设置优先级等);

    • handler(拒绝策略):当线程池(任务队列已满且最大线程数已满)无法接收新任务时,执行的策略。常见内置策略:

      • AbortPolicy(默认):直接抛出RejectedExecutionException异常,中断程序并提示异常;
        • 该拒绝策略适合需要保证所有任务都能执行完毕的场景,尤其是大量计算型任务。业务逻辑要求任务必须被处理,不能丢弃(如账单生成、数据持久化任务)。多线程的目的是增大吞吐量,但最终每个任务的执行完整性是核心诉求;
        • 通过让调用线程亲自执行任务,一方面避免了任务被直接丢弃,保证了任务的执行性;另一方面,调用线程执行任务时会产生阻塞效应—— 调用者无法快速提交新任务,间接起到了流量削峰的作用,缓解线程池的压力;
      • CallerRunsPolicy:让提交任务的线程(即调用线程)自己执行该任务,缓解任务提交压力。这种情况是需要所有任务都执行完毕,那么就适合大量计算的任务类型去执行
      • DiscardOldestPolicy:丢弃队列中最久的任务,重新提交被拒绝的任务。是否要采用该拒绝策略,还得根据实际业务是否允许丢弃老任务来衡量;
      • DiscardPolicy:直接丢弃任务,不做任何通知。使用此策略,可能会使我们无法发现系统的异常状态,建议是一些无关紧要的业务采用此策略;
      • 也可自定义RejectedExecutionHandler接口实现特殊拒绝逻辑;
  • 以下是一个使用 ThreadPoolExecutor 创建线程池的示例:

    import java.util.concurrent.ArrayBlockingQueue;  
    import java.util.concurrent.ThreadPoolExecutor;  
    import java.util.concurrent.TimeUnit;  
    import java.util.concurrent.RejectedExecutionHandler;  
    import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;  public class ThreadPoolExample {public static void main(String[] args) {// 配置核心参数int corePoolSize = 5; // 核心线程数5int maximumPoolSize = 10; // 最大线程数10ArrayBlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100); // 有界队列,长度100long keepAliveTime = 60L; // 非核心线程空闲60秒后回收TimeUnit unit = TimeUnit.SECONDS; // 时间单位“秒”RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy(); // 拒绝策略:抛异常// 实例化线程池ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,handler);// 通过循环提交 15 个任务,每个任务打印 “任务 ID + 执行线程名称”:for (int i = 0; i < 15; i++) {final int taskId = i;  executor.execute(() -> {System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());});}// 关闭线程池executor.shutdown();try {// 等待所有任务执行完成,超时时间 60 秒。若超时后任务仍未完成,调用shutdownNow()强制关闭if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {// 发起线程池关闭请求,不再接收新任务,等待已有任务执行完毕executor.shutdownNow();}  } catch (InterruptedException e) {// 若等待过程中被中断,也调用shutdownNow()强制关闭executor.shutdownNow();}  System.out.println("All tasks are done or interrupted.");  }  
    }
    

2.1.2 Executors——不推荐使用

  • Executors 是 Java 提供的线程池工具类,提供了多种创建线程池的静态方法,简化了线程池的创建流程。常见方法包括newFixedThreadPoolnewSingleThreadExecutornewCachedThreadPoolnewSingleThreadScheduledExecutor等;

    在这里插入图片描述

  • Executors创建的线程池类型(及特点)

    类型池内线程类型池内线程数量处理特点应用场景
    定长线程池(FixedThreadPool)核心线程固定核心线程空闲时不回收;任务队列无界,若任务过多可能堆积导致OOM;任务数>线程数时队列等待控制线程最大并发数
    定时线程池(ScheduledThreadPool)核心线程+非核心线程核心线程固定,非核心线程无界核心线程闲置时会回收;支持定时/周期性任务执行执行定时、周期性任务
    可缓存线程池(CachedThreadPool)非核心线程不固定(可无限大)优先复用闲置线程;无闲置线程时新建线程;线程闲置60s后回收;任务队列是同步队列(无缓冲)执行数量多、耗时少的线程任务
    单线程化线程池(SingleThreadExecutor)核心线程1个所有任务按顺序在一个线程中执行;无需处理线程同步问题需任务串行执行的场景(但不适合高并发,易引发性能问题)
  • 以下是一个使用 Java 中的 ExecutorService 和 Executors 创建线程池的简单示例:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;public class ThreadPoolExample {public static void main(String[] args) {// 创建固定大小(5个线程)的线程池ExecutorService executor = Executors.newFixedThreadPool(5);// 提交10个任务,每个任务打印“任务ID + 执行线程名称”for (int i = 0; i < 10; i++) {final int taskId = i;executor.submit(() -> {System.out.println("Task " + taskId + " is running by " + Thread.currentThread().getName());});}// 关闭线程池:先停止接收新任务,再等待已有任务执行完毕executor.shutdown();while (!executor.isTerminated()) {// 等待所有任务完成}System.out.println("All tasks are done.");}
    }
    
  • 虽然Executors使用简单,但存在资源耗尽(OOM)的风险,因此阿里等企业的开发手册明确禁止使用,推荐直接用ThreadPoolExecutor自定义线程池。具体弊端如下:

    • FixedThreadPool、SingleThreadExecutor:它们的任务队列是LinkedBlockingQueue(无界队列),允许的请求队列长度为Integer.MAX_VALUE。若任务生产速度远大于消费速度,队列会堆积大量任务,最终导致内存溢出(OOM)
    • CachedThreadPool、ScheduledThreadPool:它们的最大线程数是Integer.MAX_VALUE(可无限创建线程)。若任务并发量极高,会创建大量线程,最终导致内存溢出(OOM)

    在这里插入图片描述

2.2 提交任务

  • 无返回值任务使用public void execute(Runnable command)方法提交;

    • 参数:接收Runnable接口的实现类(Runnable接口只有void run()方法,无返回值);

    • 例:

      executor.execute(() -> { // executor是线程池对象System.out.println("执行无返回值的任务");
      });
      
  • 有返回值任务submit方法的三种重载形式。submit方法会返回Future对象,通过Future可以获取任务的执行结果或控制任务执行(如取消任务、判断任务是否完成);

    • Future<?> submit(Runnable task):提交Runnable任务,获取空结果的Future

      • 参数:Runnable任务(无返回值);

      • 例:

        Future<?> future = executor.submit(() -> {System.out.println("执行Runnable任务");
        });
        // 可通过future判断任务是否完成,或获取空结果(get()返回null)
        
    • Future<T> submit(Runnable task, T result):提交Runnable任务,并指定一个预定义的返回结果

      • 参数:Runnable任务 + 预定义的返回结果result

      • 例:

        String result = "预定义结果";
        Future<String> future = executor.submit(() -> {System.out.println("执行Runnable任务,返回预定义结果");
        }, result);
        // future.get()会返回"预定义结果"
        
    • Future<T> submit(Callable<T> task):提交Callable任务,获取自定义的返回结果(最常用的有返回值提交方式);

      • 参数:Callable<T>接口的实现类(CallableV call() throws Exception方法,支持返回泛型T的结果);

      • 例:

        Future<Integer> future = executor.submit(() -> {System.out.println("执行Callable任务,计算结果");return 1 + 2; // 返回Integer类型结果
        });
        // future.get()会返回3
        

2.3 批量执行任务

  • 线程池通过 invokeAllinvokeAny 系列方法支持批量任务执行,它们的核心区别如下:

    // 提交一批Callable任务,等待所有任务执行完毕后返回结果列表
    // 阻塞直到所有任务完成
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException// 提交一批Callable任务,在指定时间内等待任务执行;超时后未完成的任务会被取消
    // 带超时控制,避免无限阻塞
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)// 提交一批Callable任务,返回最先完成的任务结果,其余未完成的任务会被取消
    // 只关注 “第一个完成的结果”,适合 “只要有一个结果即可” 的场景
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException// 提交一批Callable任务,在指定时间内返回最先完成的结果;超时后若没有任务完成则抛异常
    // 带超时的“优先结果”逻辑
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
    
  • 例:

    public class InvokeAllDemo {public static void main(String[] args) {// 生成 10 个Task任务List<Task> tasks = new ArrayList<>();for (int i = 1; i <= 10; i++) {tasks.add(new Task(i));}// 创建固定大小(5 线程)的线程池ExecutorService threadpool = Executors.newFixedThreadPool(5);try {// 调用 invokeAll(tasks) 批量提交任务,该方法会阻塞直到所有任务完成,然后返回 Future 列表List<Future<Integer>> futures = threadpool.invokeAll(tasks);// 遍历Future获取每个任务的执行结果for (Future<Integer> future : futures){System.out.println(future.get());}} catch (InterruptedException | ExecutionException e) {throw new RuntimeException(e);}finally {// 任务执行完毕后,调用 shutdown() 关闭线程池,释放资源threadpool.shutdown();}}// 内部类 Task 实现 Callable<Integer> 接口,模拟耗时任务(睡眠 1 秒后返回任务索引)static class Task implements Callable<Integer> {private int index;Task(int index) {this.index = index;}@Overridepublic Integer call() throws Exception {Thread.sleep(1000);return index;}}
    }
    

2.4 何执行定时、延时任务

  • ScheduledThreadPoolExecutor是专门用于执行定时、延时、周期性任务的线程池,继承自ThreadPoolExecutor并实现ScheduledExecutorService接口,具备线程池的基础能力,同时扩展了定时任务的执行逻辑;

    // 具备执行定时、延时、周期性任务的线程池
    public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {// 延迟delay时间后,执行一次Runnable任务(无返回值)// 只需延迟执行一次、无需结果的场景(如 “5 秒后打印日志”)public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);// 延迟delay时间后,执行一次Callable任务(有返回值)// 只需延迟执行一次、且需要获取执行结果的场景(如 “10 秒后计算并返回一个数值”)public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    }
    

2.5 执行周期、重复性任务

  • 通过 scheduleAtFixedRatescheduleWithFixedDelay 实现“周期性重复执行任务”的需求,两者的核心区别在于周期的计算方式

    // 周期为“固定时间”——以上一次任务的开始时间为基准,间隔period后开始下一次任务。若任务执行时间超过period,会“叠执行”(下一次任务在当前任务结束后立即开始)
    // 需严格按照固定时间间隔执行的场景(如“每分钟准点统计一次数据”)
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)// 周期为“间隔时间”——以上一次任务的结束时间为基准,间隔delay后开始下一次任务。任务执行时间不影响间隔,始终保证“结束后延迟delay再执行下一次”
    // 需保证两次任务之间有固定间隔的场景(如 “任务执行完后,隔 3 秒再执行下一次”)
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit)
    

2.6 关闭线程池

  • 线程池提供两种关闭方式和一种状态判断方法,用于灵活控制线程池的生命周期:

    • shutdown():关闭线程池时,正在执行的任务和队列中的任务会继续执行完毕,但不再接收新任务;后续新任务会触发拒绝策略;

    • shutdownNow():立即关闭线程池,正在执行的任务和队列中的任务会被中断;同时会返回被中断的队列任务列表,方便开发者对未执行的任务做后续处理(如重试);

    • isTerminated():当正在执行的任务和队列中的任务全部执行完毕时,返回true,用于判断线程池是否真正关闭;

  • shutdownshutdownNow的区别

    对比维度shutdownshutdownNow
    立即关闭线程池
    延时关闭线程池(等任务执行完)
    不再接收新任务
    继续执行完任务队列中的任务
    返回任务队列中的任务
    线程池状态SHUTDOWNSTOP

2.7 线程池的参数设计分析

  • 核心线程数(corePoolSize

    • 核心线程数的设计需结合任务处理时间每秒产生的任务量,通常遵循“二八原则”(按80%的常规场景设计,剩余20%由最大线程数兜底);

    • 例:若一个线程执行1个任务需0.1秒,1秒可执行10个任务;系统80%的时间每秒产生100个任务。为了在1秒内处理完这100个任务,需要 100任务 ÷ 10任务/线程 = 10 个核心线程;

      10任务/线程指的是:每秒每个线程可以执行10个任务;

  • 任务队列长度(workQueue

    • 任务队列用于缓存待执行的任务,长度设计公式为:核心线程数 / 单个任务执行时间 × 2

    • 例:延续上文场景(核心线程数10,单个任务执行时间0.1秒),则队列长度 = 10 / 0.1 × 2 = 200。这个长度能在核心线程忙碌时,暂时缓存足够多的任务,避免任务被直接拒绝;

  • 最大线程数(maximumPoolSize

    • 最大线程数需结合核心线程数、任务队列长度系统每秒最大任务数设计,公式为:(最大任务数 - 任务队列长度) × 单个任务执行时间

    • 例:若系统每秒最大产生1000个任务,结合任务队列长度200,则最大线程数 = (1000 - 200) × 0.1 = 80。这部分线程用于处理“超出核心线程+队列容量”的突发任务,防止任务堆积;

  • 最大空闲时间(keepAliveTime

    • 该参数用于控制非核心线程的空闲回收时间,无固定公式,需根据系统运行环境、硬件压力任务时间间隔经验性设定(例如系统任务间隔较长时,可适当延长空闲时间,避免频繁创建/销毁线程)。

3 线程池原理分析

3.1 线程池执行任务的具体流程

  • 线程池提供两种提交任务的方法,核心区别是是否需要返回结果:

    • execute(Runnable command):无返回值,仅用于提交不需要结果的任务;

    • submit(Runnable task):最终会调用execute方法,但会返回Future对象,用于获取任务执行结果(或控制任务执行状态);

  • 当调用execute提交任务时,线程池会按照以下步骤处理:

    在这里插入图片描述

    • 判断核心线程数是否未满。若当前线程数 < corePoolSize,则创建新的核心线程,并将任务作为该线程的第一个任务执行;

      “当前线程数”指的是线程池中当前正在管理的、处于活跃状态的线程总数(包括核心线程和非核心线程);

      具体来说:

      • 当线程池刚创建时,“当前线程数”为 0(默认不预先创建核心线程,除非调用了prestartAllCoreThreads()等预热方法);
      • 随着任务提交,线程池会创建新线程执行任务,“当前线程数”逐渐增加;
      • 这个数值会动态变化:当任务增多时可能增加(最多到maximumPoolSize),当非核心线程空闲超时后会减少(最低到corePoolSize,核心线程默认不回收);

      举个例子:假设corePoolSize=5maximumPoolSize=10

      • 若当前有 3 个线程在运行,“当前线程数 = 3”,此时< corePoolSize,新任务会触发创建新的核心线程(直到达到 5 个);
      • 若当前有 7 个线程在运行,“当前线程数 = 7”,此时> corePoolSize< maximumPoolSize,只有当队列满了之后,新任务才会触发创建非核心线程;
    • 核心线程数已满,判断任务队列是否未满。若核心线程数已满,将任务加入任务队列(workQueue,等待核心线程空闲后执行;

    • 任务队列已满,判断最大线程数是否未满。若任务队列已满且当前线程数 < maximumPoolSize,则创建新的非核心线程,执行当前任务;

    • 最大线程数也已满。触发拒绝策略(如抛异常、丢弃任务等),拒绝执行当前任务;

  • 注意:

    • 核心线程的创建逻辑:只要线程数小于corePoolSize,即使有空闲核心线程,也会创建新核心线程(保证核心线程的“保底”执行能力);

    • 线程池的“非公平性”:当队列满后提交的任务,可能会比队列中等待的任务“先执行”(因为会创建非核心线程直接执行新任务)。

3.2 线程池的五种状态是如何流转的?

  • 线程池有以下五种状态,每种状态对应不同的行为逻辑:

    状态行为特点
    RUNNING正常运行状态,会接收新任务,并处理队列中已有的任务
    SHUTDOWN不会接收新任务,但会处理队列中已有的任务(优雅关闭)
    STOP不会接收新任务,不会处理队列中已有的任务,且会中断正在处理的任务(强制关闭)
    TIDYING所有任务都已终止,线程池中也没有活跃线程;此时会自动调用terminated()方法
    TERMINATEDterminated()方法执行完毕后,线程池进入最终的终止状态
  • 五种状态不能任意转换,只有以下固定流转路径:

    • RUNNING → SHUTDOWN:从正常运行切换到优雅关闭,停止接收新任务但处理存量任务;

      • 触发条件:手动调用shutdown()方法,或线程池对象被GC时自动调用finalize()间接触发shutdown()
    • (RUNNING 或 SHUTDOWN) → STOP:从正常运行运行/优雅关闭切换到强制关闭,停止接收新任务、中断存量任务;

      • 触发条件:调用shutdownNow()方法;若先调用shutdown()再调用shutdownNow(),也会从SHUTDOWN切换到STOP;
    • SHUTDOWN → TIDYING:优雅关闭完成,进入任务/线程清理状态;

      • 触发条件:任务队列为空线程池中没有活跃线程(所有存量任务处理完毕,线程全部空闲);
    • STOP → TIDYING:强制关闭完成,进入任务/线程清理状态;

      • 触发条件:线程池中没有活跃线程(不管队列是否还有任务,强制关闭后线程全部中断/空闲)。
    • TIDYING → TERMINATED:线程池彻底终止,生命周期结束;

      • 触发条件:terminated()方法执行完毕;

    在这里插入图片描述

  • 线程池的状态流转是其生命周期管理的核心逻辑:

    • RUNNING 是默认的工作状态,保证任务的正常提交和执行;

    • SHUTDOWNSTOP 是关闭过程的中间状态,分别对应优雅关闭和强制关闭的策略,满足不同场景下的资源释放需求;

    • TIDYINGTERMINATED 是关闭完成的最终状态,标志线程池资源彻底回收,避免内存泄漏。

3.3 线程池中的线程是如何关闭的?

  • Java中线程本身没有直接的停止方法,线程池是通过中断(Interruption)机制来间接控制线程的生命周期:

    • 线程池通过调用线程的 interrupt() 方法,给线程发送中断信号;

    • 线程内部可以通过 isInterrupted() 判断是否收到中断信号,进而决定是否停止执行(需业务代码配合处理中断逻辑);

  • 下面这段代码是线程池关闭时(如shutdownNow()方法)用于中断线程的核心逻辑:

    void interruptIfStarted() {Thread t;// 条件判断:线程状态有效(>0)、线程对象非空(存在)、且线程未被中断过if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt(); // 给线程发送中断信号(线程可在业务逻辑中通过isInterrupted()感知这个信号,进而决定是否停止执行)} catch (SecurityException ignore) {// 捕获安全异常(如无权限中断线程),直接忽略}}
    }
    

3.4 线程池为什么一定得是阻塞队列?

  • 核心原因:保证核心线程不自然消亡

    • 线程池需要保留指定数量的核心线程,以应对后续任务的快速执行。如果队列不是阻塞队列,当队列中没有任务时,线程会因无任务可执行而自然结束,无法维持核心线程数;

    • 而阻塞队列的take()poll(timeout)方法具备阻塞等待特性:当队列中没有任务时,线程会阻塞在获取任务的操作上,不会自然消亡,从而保证核心线程的数量稳定;

  • 下面这段代码是线程池线程获取任务的核心逻辑,体现了阻塞队列的作用:

    try {Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();if (r != null)return r;timedOut = true;
    } catch (InterruptedException retry) {timedOut = false;
    }
    
    • workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS):非核心线程调用此方法时,若队列无任务,线程会阻塞keepAliveTime时长;超时后仍无任务,线程会自然消亡(以此实现非核心线程的空闲回收);
    • workQueue.take():核心线程调用此方法时,若队列无任务,线程会一直阻塞等待,直到队列中有新任务才会被唤醒执行,以此保证核心线程不消亡。

3.5 线程发生异常,会被移出线程池吗?

  • 线程在执行任务时发生异常,会被移出线程池;但线程池会通过自动新增线程的机制,维持核心线程数的稳定;

  • 下面是源码片段,线程池处理任务异常的流程如下:

    在这里插入图片描述

    • 任务执行与异常捕获:线程执行任务时(task.run()),若发生异常会被catch (Throwable ex)捕获;

    • 触发线程退出逻辑:异常发生后,最终会执行processWorkerExit(w, completedAbruptly)方法。该方法会让当前线程自然消亡(移出线程池)

    • 新增线程维持核心数processWorkerExit方法内部会判断线程消亡是否由异常导致,若属于核心线程异常消亡,会自动新增一个线程,以此保证核心线程数的固定;

  • 假设所有核心线程在执行任务时都发生异常:

    • 每个异常线程都会触发processWorkerExit,导致自身被移出线程池;

    • processWorkerExit会针对每个消亡的核心线程,自动新增一个线程来补充

    • 最终核心线程数会维持在配置的corePoolSize,不会出现核心线程全部消失的情况。

4 线程池源码分析

4.1 线程池源码的基础属性和方法

  • 线程池通过一个AtomicInteger类型的变量ctl同时存储线程池状态和当前工作线程数量

    • 一个Integer占4个字节,即32bit,即32位;

    • 2个bit能表示4种状态,那5种状态就至少需要三个bit位。所以其中高3位用于表示线程池的5种状态(见3.2 线程池的五种状态是如何流转的?),剩余的低29位用于表示工作线程的数量;

    • 比如在线程池的源码中就是这么来表示的:

      private static final int COUNT_BITS = Integer.SIZE - 3; // 等于29,表示低29位用于存线程数private static final int RUNNING    = -1 << COUNT_BITS;
      private static final int SHUTDOWN   =  0 << COUNT_BITS;
      private static final int STOP       =  1 << COUNT_BITS;
      private static final int TIDYING    =  2 << COUNT_BITS;
      private static final int TERMINATED =  3 << COUNT_BITS;
      
    • 最终各个状态对应的二级制为:

      • RUNNING:11100000 00000000 00000000 00000000
      • SHUTDOWN:00000000 00000000 00000000 00000000
      • STOP:00100000 00000000 00000000 00000000
      • TIDYING:01000000 00000000 00000000 00000000
      • TERMINATED:01100000 00000000 00000000 00000000
    • 比如:假如ctl为:11100000 00000000 00000000 00001010,表示线程池的状态为 RUNNING,线程池中目前在工作的线程有10个;

      这里说的“在工作”意思是线程活着,要么在执行任务,要么在阻塞等待任务;

  • 源码中提供了一系列方法,用来获取线程池状态和工作线程数,即用于操作和判断ctl的状态与线程数:

    // COUNT_BITS的值为29,二进制表示为00000000 00000000 00000000 00011101
    private static final int COUNT_BITS = Integer.SIZE - 3;// 00011111 11111111 11111111 11111111
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 表示低29位的最大值(即线程数的上限)
    
    • 1 << COUNT_BITS:这是左移运算,表示将数字1的二进制形式向左移动 29 位;
      • 二进制中,1原本是 000...0001(32 位,仅最后一位为 1)
      • 左移 29 位后,变成 100...0000(第 30 位为 1,后 29 位全为 0)
      • 这个结果的十进制值是 2^29
    • (1 << COUNT_BITS) - 1:对左移后的结果减 1,会让二进制中 1 后面的所有 0 都变成 1
      • 左移 29 位后是 100...0000(共 32 位,第 30 位为 1,后 29 位为 0)
      • 减 1 后变成 011...1111(第 30 位变为 0,后 29 位全为 1)
      • 这个结果的十进制值是 2^29 - 1
    • 所以:CAPACITY 的值是 2^29 - 1(十进制约为 5.36 亿),它代表:线程池能管理的最大工作线程数是 5.36 亿(因为int的低 29 位最多能表示这么大的数);这个值是线程池的线程数上限,当工作线程数达到CAPACITY时,线程池无法再创建新线程(即使maximumPoolSize配置得更大,也会被CAPACITY限制);
    // 1、拆分状态与线程数
    // 1.1、通过位运算提取ctl的高 3 位(状态)
    private static int runStateOf(int c)     { return c & ~CAPACITY; 
    }
    // 1.2、通过位运算提取ctl的低 29 位(工作线程数)
    private static int workerCountOf(int c)  { return c & CAPACITY; 
    }
    // 1.3、将“状态rs”和 “线程数wc”合并为ctl(通过位或运算)
    // 注意:rs的低29位都得为0,wc的高3位都得为0,
    private static int ctlOf(int rs, int wc) { return rs | wc; 
    }// 2、状态判断方法
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;
    // 2.1、判断状态c是否小于状态s(如RUNNING < SHUTDOWN)
    private static boolean runStateLessThan(int c, int s) {return c < s;
    }
    // 2.2、判断状态c是否大于等于状态s(如STOP >= SHUTDOWN)
    private static boolean runStateAtLeast(int c, int s) {return c >= s;
    }
    // 2.3、判断是否为RUNNING状态(只有RUNNING < SHUTDOWN)
    private static boolean isRunning(int c) {return c < SHUTDOWN;
    }// 3、线程数增减方法
    // 3.1、通过 CAS 原子操作增加工作线程数(对ctl加 1)
    private boolean compareAndIncrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect + 1);
    }
    // 3.2、通过 CAS 原子操作减少工作线程数(对ctl减 1)
    private boolean compareAndDecrementWorkerCount(int expect) {return ctl.compareAndSet(expect, expect - 1);
    }
    

4.2 execute方法

  • 当执行线程池的execute方法(用于提交无返回值的任务)时:

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();// 获取ctl// ctl初始值是ctlOf(RUNNING, 0),表示线程池处于运行中,工作线程数为0int c = ctl.get();// 1、尝试创建核心线程执行任务// 先判断当前工作线程数是否小于核心线程数(corePoolSize)if (workerCountOf(c) < corePoolSize) {// 若满足,调用addWorker(command, true)创建核心线程,并将任务command作为该线程的第一个任务执行if (addWorker(command, true))return; // 若创建成功,直接返回(任务开始执行)// 若创建失败(如线程池状态已改变),重新获取ctl,进入下一阶段c = ctl.get();}// 2、尝试将任务加入阻塞队列// 若核心线程数已满,判断线程池是否处于RUNNING状态。若处于RUNNING,将任务加入阻塞队列(workQueue)if (isRunning(c) && workQueue.offer(command)) {// 加入队列后二次检查线程池状态:int recheck = ctl.get();// 若状态已不是RUNNING,则从队列中移除任务if (! isRunning(recheck) && remove(command))reject(command); // 执行拒绝策略// 若状态仍是RUNNING,但工作线程数为 0(可能核心线程全部异常消亡)else if (workerCountOf(recheck) == 0)addWorker(null, false); // 创建一个非核心线程,用于从队列中获取任务执行}// 3、尝试创建非核心线程执行任务,否则执行拒绝策略else if (!addWorker(command, false)) // 尝试创建非核心线程。若创建成功,任务由新的非核心线程执行reject(command); // 若创建失败(如已达maximumPoolSize),执行拒绝策略
    }
    
  • addWorker(task, boolean core)coretrue时创建核心线程,受corePoolSize限制;为false时创建非核心线程,受maximumPoolSize限制;

  • 拒绝策略(reject(command):当线程池无法接收任务时(队列满且非核心线程数达上限),触发的兜底逻辑(如抛异常、丢弃任务等)。

4.3 addWorker方法

  • addWorker是线程池添加工作线程的核心方法,通过参数coretrue表示核心线程,false表示非核心线程)来区分线程类型,最终实现创建线程并执行任务的逻辑;

  • 在看这个方法之前,不妨先自己来分析一下,什么是添加线程?添加线程实际上就要开启一个线程,不管是核心线程还是非核心线程,其实都只是一个普通的线程,而核心和非核心的区别在于:

    • 如果是要添加核心工作线程,那么就得判断目前的工作线程数是否超过corePoolSize

      • 如果没有超过,则直接开启新的工作线程执行任务
      • 如果超过了,则不会开启新的工作线程,而是把任务进行入队
    • 如果要添加的是非核心工作线程,那就要判断目前的工作线程数是否超过maximumPoolSize

      • 如果没有超过,则直接开启新的工作线程执行任务
      • 如果超过了,则拒绝执行任务
    • 所以在addWorker方法中,首先就要判断工作线程有没有超过限制,如果没有超过限制就再去开启一个线程;

  • addWorker方法中,还得判断线程池的状态,如果线程池的状态不是 RUNNING 状态了,那就没必要要去添加线程了。当然有一种特例,就是线程池的状态是 SHUTDOWN,即队列中有任务,那此时还是需要添加线程来完成剩余的任务。那这种特例是如何产生的呢?

    • 我们前面提到的都是开启新的工作线程,那么工作线程怎么回收呢?不可能开启的工作线程一直都活着,如果任务由多变少,那也就不需要过多的线程资源,所以线程池中会有机制对开启的工作线程进行回收(如何回收的,后文会提到)。我们这里先分析,有没有可能线程池中所有的线程都被回收了?答案的是有的;
    • 首先非核心工作线程被回收是可以理解的,那核心工作线程要不要回收掉呢?其实线程池存在的意义,就是提前生成好线程资源,需要线程的时候直接使用就可以,而不需要临时去开启线程,所以正常情况下,开启的核心工作线程是不用回收掉的,就算暂时没有任务要处理,也不用回收,就让核心工作线程在那等着任务的到来就可以了;
    • 但是!在线程池中有这么一个参数:allowCoreThreadTimeOut,表示是否允许核心工作线程超时,意思就是是否允许核心工作线程被回收,这个参数默认为 false,但是可以调用allowCoreThreadTimeOut(boolean value)来把这个参数改成 true。只要改了,那么核心工作线程也就会被回收了,那这样线程池中的所有工作线程都可能被回收掉,那如果所有工作线程都被回收掉之后,阻塞队列中来了一个任务,这样就形成了特例情况;
  • addWorker源码分析

    private boolean addWorker(Runnable firstTask, boolean core) {// 1、状态与线程数校验retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);// 线程池状态校验:非RUNNING时,仅当“SHUTDOWN且队列为空”才不创建线程;否则根据场景判断if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))return false;// 工作线程数校验:判断是否超过核心/最大线程数限制for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY || // 超过线程数上限(低29位最大值)wc >= (core ? corePoolSize : maximumPoolSize)) // 超过核心/最大线程数return false; if (compareAndIncrementWorkerCount(c)) // 通过 CAS 原子操作增加工作线程数break retry;c = ctl.get(); // 若CAS失败,重新获取ctlif (runStateOf(c) != rs)continue retry;}}// 2、创建Worker对象并启动线程boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// Worker对象:是线程池对工作线程的封装,包含firstTask(线程要执行的第一个任务)和Thread(实际执行任务的线程)w = new Worker(firstTask);// 拿出线程对象,还没有startfinal Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // 线程已启动则抛异常throw new IllegalThreadStateException();workers.add(w); // 将Worker加入线程池管理集合int s = workers.size();if (s > largestPoolSize)largestPoolSize = s; // 记录线程数峰值workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start(); // 启动线程workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w); // 启动失败则回滚(移除Worker、减少线程数)}return workerStarted;
    }
    
  • 所以,对于addWorker方法,核心逻辑就是:

    • 先判断工作线程数是否超过了限制
    • 修改ctl,使得工作线程数+1
    • 构造 Work 对象,并把它添加到workers集合中
    • 启动 Work 对象对应的工作线程

4.4 runWorker方法

  • runWorker是工作线程的核心执行逻辑,定义了线程如何获取任务、执行任务、处理异常、管理生命周期的完整流程。当工作线程启动后,会通过Workerrun方法调用runWorker,进入任务执行循环;

  • Worker的构造方法:

    Worker(Runnable firstTask) {setState(-1);this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
    }
    
  • 在利用 ThreadFactory 创建线程时,会把this,也就是当前 Work 对象作为 Runnable 传给线程,所以工作线程运行时,就会执行 Worker 的run方法:

    public void run() {// run方法再调用runWorker方法,这个方法就是工作线程运行时的执行逻辑runWorker(this);
    }
    
    final void runWorker(Worker w) {// 获取当前工作线程对象(Worker内部封装了Thread)Thread wt = Thread.currentThread();// 获取Worker创建时指定的第一个任务(可能为null)Runnable task = w.firstTask;// 清空firstTask,因为即将开始执行它w.firstTask = null;// 释放Worker的锁,允许中断(Worker构造时state=-1,不允许中断,这里解锁后允许中断)w.unlock();// 标记线程是否"异常退出"(true表示因异常退出循环,false表示正常退出)boolean completedAbruptly = true;try {// 核心循环:不断获取并执行任务// 1. 首先检查是否有初始任务(task != null)// 2. 如果没有,则通过getTask()从任务队列获取任务// 3. getTask()可能返回null(当线程需要退出时)while (task != null || (task = getTask()) != null) {// 获取到任务后,先加锁(防止shutdown时中断正在执行任务的线程)w.lock();// 检查线程池状态,确保在STOP状态时线程被正确中断// 如果线程池已进入STOP状态,但线程未被中断,则中断它// 如果线程已被中断,但线程池不是STOP状态,则清除中断状态并重新检查if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// 执行前的钩子方法(空实现,子类可重写)beforeExecute(wt, task);Throwable thrown = null;try {// 实际执行任务(调用Runnable的run方法)task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 执行后的钩子方法(空实现,子类可重写)afterExecute(task, thrown);}} finally {// 清空task引用,帮助GCtask = null;// 增加该Worker完成的任务计数w.completedTasks++;// 释放Worker锁w.unlock();}}// 循环正常退出(非异常情况):getTask()返回null时执行到这里completedAbruptly = false;} finally {// 处理工作线程退出的逻辑(无论正常还是异常退出)// 1. 从workers集合中移除当前Worker// 2. 根据completedAbruptly决定是否创建新线程替代// 3. 尝试终止线程池(如果满足条件)processWorkerExit(w, completedAbruptly);}
    }
    
    • 任务获取循环:通过 getTask() 从队列获取任务,可能阻塞、超时或被中断
    • 中断处理:确保在STOP状态时正确中断线程,非STOP状态时清除意外中断
    • 任务执行:通过 task.run() 执行实际任务,包含前后钩子方法
    • 异常处理:任务执行异常会导致线程异常退出,由 processWorkerExit 处理
    • 资源清理:无论正常还是异常退出,最终都会执行 processWorkerExit 进行清理
    • 状态维护:通过 completedAbruptly 区分正常退出和异常退出,影响线程回收策略

4.5 processWorkerExit方法

  • processWorkerExit是线程池处理工作线程退出的核心方法,负责完成线程数更新、任务统计、线程移除、状态流转、线程补充等一系列收尾和补偿操作,确保线程池的稳定性;

  • 源码解析:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {// 如果completedAbruptly为true,表示线程因执行任务异常而退出// 这种情况下workerCount还没有被调整,需要手动减1// 如果completedAbruptly为false,表示正常退出(超时或被中断)// 这种情况下workerCount已经在getTask()中调整过了,不需要再次调整if (completedAbruptly)decrementWorkerCount();// 获取线程池的主锁,用于同步操作final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {// 将当前Worker完成的任务数累加到线程池的总完成任务计数中completedTaskCount += w.completedTasks;// 从workers集合中移除当前Worker(线程池不再管理这个Worker)workers.remove(w);} finally {mainLock.unlock();}// 尝试终止线程池:检查是否满足终止条件// 如果线程池状态为SHUTDOWN且工作队列为空,或状态为STOP且没有活动线程// 则可能将状态转换为TERMINATEDtryTerminate();// 获取当前线程池的控制状态int c = ctl.get();// 检查线程池状态是否为RUNNING或SHUTDOWN(小于STOP)if (runStateLessThan(c, STOP)) {// 如果是正常退出(非异常情况)if (!completedAbruptly) {// 计算需要保留的最小线程数// 如果允许核心线程超时,最小数为0,否则为corePoolSizeint min = allowCoreThreadTimeOut ? 0 : corePoolSize;// 特殊情况:如果最小数为0但任务队列不为空,至少保留1个线程处理剩余任务if (min == 0 && !workQueue.isEmpty())min = 1;// 如果当前工作线程数已经达到或超过最小要求数,不需要创建新线程if (workerCountOf(c) >= min)return; // 不需要替补线程}// 以下情况需要创建新线程替补:// 1. 线程异常退出(completedAbruptly为true)// 2. 正常退出但当前线程数不足最小要求数// 创建新的Worker(无初始任务,非核心线程)addWorker(null, false);}
    }
    
  • 总结一下,某个工作线程正常情况下会不停地循环从阻塞队列中获取任务来执行,正常情况下就是通过阻塞来保证线程永远活着,但是会有一些特殊情况:

    • 工作线程在执行过程中被中断退出(比如调用shutdownNow()),那么退出【任务获取循环】,进行善后处理:将ctl中的工作线程数减1,线程自然运行结束。这是主动终止线程的方式;

    • 线程在阻塞队列获取任务时超时(等待时间超过keepAliveTime),那么退出循环后检查线程池状态。如果工作线程数不足(少于corePoolSize):创建新线程替补,当前线程结束;如果工作线程数足够:当前线程直接结束。这是线程池动态调整线程数量的机制,虽然效率上有一定开销,但这是必要的;

    • 执行任务时抛出未捕获的异常。那么立即退出循环,只有在RUNNING状态时:创建新线程替补;非RUNNING状态时可能不创建替补。这是异常恢复机制,保证线程池的处理能力。

4.6 getTask方法

  • getTask是工作线程从阻塞队列获取任务的核心方法,它定义了线程如何判断是否需要获取任务、如何阻塞等待任务、何时因超时而回收的完整逻辑;

  • 源码解析:

    private Runnable getTask() {// 标记上一次poll操作是否超时boolean timedOut = false;// 无限循环,直到获取到任务或确定线程应该退出for (;;) {// 获取线程池的控制状态int c = ctl.get();// 解析出线程池的运行状态int rs = runStateOf(c);// 检查线程池状态,决定是否应该返回null让线程退出// 如果线程池状态 >= SHUTDOWN(非RUNNING状态)// 并且(状态 >= STOP 或者 工作队列为空)if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {// 减少工作线程计数decrementWorkerCount();// 返回null,让调用者线程退出return null;}// 获取当前工作线程数量int wc = workerCountOf(c);// 判断当前线程是否应该被回收(超时退出)// timed为true表示当前线程应该使用超时等待// timed为false表示当前线程应该无限期等待// 如果允许核心线程超时,所有线程都使用超时等待// 或者如果当前线程数超过核心线程数,超出的线程使用超时等待boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;// 检查是否应该让当前线程退出:// 1. 工作线程数超过最大线程数限制,或者(应该超时等待且上次等待已超时)// 2. 并且(工作线程数大于1 或者 工作队列为空)if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {// 使用CAS减少工作线程计数(避免并发问题)if (compareAndDecrementWorkerCount(c))return null; // 线程退出continue; // CAS失败,继续循环重试}try {// 根据timed标志选择不同的获取任务方式:// timed为true:使用poll()带超时等待// timed为false:使用take()无限期等待Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();// 如果成功获取到任务,返回给调用者if (r != null)return r;// 如果poll()超时返回null,标记为超时// 下次循环时会检查这个标记,决定是否退出timedOut = true;} catch (InterruptedException retry) {// 在等待任务时被中断// 重置超时标记,继续循环timedOut = false;// 继续循环,会在下一次循环中检查线程池状态// 如果线程池已关闭,会正常退出// 如果线程池仍在运行,会继续尝试获取任务}}
    }
    
  • 特别注意:只有通过调用线程池的shutdown方法或shutdownNow方法才能真正中断线程池中的线程

    • 因为在 Java 中,线程中断只是设置一个中断标志位,并不是强制杀死线程。被中断的线程仍然有完全的自主权决定如何响应中断;
    • 比如线程遇到了中断异常后,他可以忽略中断,继续运行;也可以响应中断,优雅退出。

4.7 shutdown方法

  • shutdown是线程池的优雅关闭方法:调用后线程池不再接收新任务 ,但会执行完阻塞队列中剩余的任务,最终平稳关闭;

  • 源码解析:

    public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN); // 将线程池状态转为SHUTDOWN,此时线程池不再接收新任务interruptIdleWorkers(); // 中断【阻塞等待任务】的线程(这些线程会因中断退出getTask循环,触发processWorkerExit)onShutdown(); // 子类扩展钩子} finally {mainLock.unlock();}tryTerminate(); // 尝试将状态转为TERMINATED(若所有任务执行完毕)
    }
    
    private void interruptIdleWorkers(boolean onlyOne) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers) {Thread t = w.thread;// 若线程未被中断,且能成功获取Worker的锁(表示线程处于“空闲/阻塞等待”状态)if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt(); // 中断线程} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne) break;}} finally {mainLock.unlock();}
    }
    
  • 不过还有一种情况:

    • 就是目前所有工作线程都在执行任务,但是阻塞队列中还有剩余任务,那逻辑应该就是这些工作线程执行完当前任务后要继续执行队列中的剩余任务;
    • 但是在上面的shutdown方法的逻辑中,发现这些工作线程在执行完当前任务后,就会释放锁,该线程可能会被中断掉,那么队列中剩余的任务怎么办?
    • 工作线程一旦被中断,就会进入processWorkerExit方法(见 4.5 processWorkerExit方法),在这个方法会判断线程池状态是否为 SHUTDOWN,然后会重新生成新的工作线程来执行队列中的剩余任务,那么这样就能保证队列中剩余的任务一定会被执行完。

4.8 shutdownNow方法

  • shutdownNow是线程池的强制关闭方法:调用后线程池立即中断所有工作线程 (无论是否在执行任务),并返回阻塞队列中剩余的任务,最终快速关闭线程池;

  • 源码解析:

    public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(STOP); // 将线程池状态转为STOPinterruptWorkers(); // 中断所有工作线程tasks = drainQueue(); // 取出阻塞队列中剩余的任务} finally {mainLock.unlock();}tryTerminate(); // 尝试将状态转为TERMINATEDreturn tasks;
    }
    
    private void interruptWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)w.interruptIfStarted(); // 中断Worker对应的线程} finally {mainLock.unlock();}
    }void interruptIfStarted() {Thread t;// 若线程已启动(state ≥ 0)且未被中断,执行中断if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt(); // 中断线程} catch (SecurityException ignore) {}}
    }
    

4.9 mainLock

  • 在上述源码中,发现很多地方都会用到mainLock,它是线程池中的全局可重入锁(ReentrantLock,用于保证线程池核心组件(如workers集合、线程状态)操作的并发安全性
  • 线程池是多线程共享的资源,若不做并发控制,会出现以下问题:
    • 场景1:线程A调用execute提交任务,线程B调用shutdown关闭线程池。若无锁保护,可能出现“线程池已标记为SHUTDOWN,但线程A仍在提交新任务”的矛盾,或“shutdown时未中断所有工作线程”的不完整关闭;
    • 场景2:多个线程同时操作workers集合(如添加/移除Worker、统计线程数);若无锁保护,会出现集合操作的线程安全问题(如ConcurrentModificationException);
  • 在源码中,mainLock主要用于以下关键操作的并发控制:
    • 线程池状态变更:如shutdownshutdownNow时切换线程池状态(RUNNING→SHUTDOWN→STOP);
    • workers集合操作:如添加WorkeraddWorker)、移除WorkerprocessWorkerExit)、遍历WorkerinterruptIdleWorkers);
    • 任务队列操作:如shutdownNow时取出队列剩余任务(drainQueue)。
http://www.xdnf.cn/news/19550.html

相关文章:

  • 顶级科学家的AI使用指南:从工具到合作伙伴
  • 华清远见25072班I/O学习day3
  • Redis分层缓存
  • DELPHI 利用OpenSSL实现加解密,证书(X.509)等功能
  • 犀牛派A1上使用Faster Whisper完成音频转文字
  • 哈尔滨云前沿服务器托管与租用服务
  • 科普:为什么在开发板上运行 Qt 程序时需要在命令后加 -platform linuxfb
  • Linux文本处理工具完全指南:cut、sort、uniq、tr、sed与awk详解
  • odps链接表并预测出现程序阻塞导致任务未完成问题排查
  • 信创服务器总死机原因及解决办法
  • WPF曲线自定义控件 - CurveHelper
  • Java-Spring入门指南(二)利用IDEA手把手教你如何创建第一个Spring系统
  • ChatDOC工具测评:AI驱动PDF/Word文档处理,支持敏感内容隐私保护与表格提取分析
  • Memento:基于记忆无需微调即可让大语言模型智能体持续学习的框架
  • keycloak中对接oidc协议时设置prompt=login
  • lesson52:CSS进阶指南:雪碧图与边框技术的创新应用
  • 公司电脑监控软件应该怎么选择?五款超实用的公司电脑监控软件推荐
  • 高性能多线程 PHP 图像处理库 PHP-VIPS:颠覆你对图像处理的认知
  • 从零开始学习C#上位机开发学习进阶路线,窥探工业自动化和物联网应用
  • 硬件开发1-51单片机1
  • Windows 电脑发现老是自动访问外网的域名排障步骤
  • 渗透测试-FastJson漏洞原理与复现
  • 【51单片机】【protues仿真】基于51单片机脉搏体温检测仪系统
  • 2024 年 AI 技术全景图:大模型轻量化、多模态融合如何重塑产业边界?
  • 数据库索引失效的原因+示例
  • (线上问题排查)3.线上API接口响应慢?一套高效排查与定位问题的心法
  • OpenCV-Python Tutorial : A Candy from Official Main Page(五)
  • Roo Code自定义Mode(模式)
  • 基于单片机智能家居环境监测报警系统Proteus仿真(含全部资料)
  • Cesium 加载桥梁3DTiles数据时,出现部分区域发暗、部分正常的现象