当前位置: 首页 > news >正文

线程池详解,生命周期,线程池种类,预热

为什么需要线程池

  1. ​​线程开销大​​:频繁创建/销毁线程会消耗系统资源(内存、CPU)。
  2. ​​资源管理​​:无限制创建线程可能导致内存溢出(OOM)或 CPU 过载。
  3. ​​任务调度优化​​:复用线程、控制并发数量、管理任务队列。

线程池参数

corePoolSize​​

默认保留的线程数,即使空闲也不会销毁(除非设置allowCoreThreadTimeOut)。

​​maximumPoolSize​​

线程池允许的最大线程数。当队列满时,会创建新线程直到达到此值。

​​keepAliveTime​​

非核心线程的空闲时间超过此值会被销毁。

​​workQueue​​

当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
任务队列类型决定线程池行为:
​​直接提交队列​​:SynchronousQueue(无容量,每次插入需等待消费)。
​​有界队列​​:如ArrayBlockingQueue(固定大小)。
​​无界队列​​:如LinkedBlockingQueue(可能导致 OOM)。

RejectedExecutionHandler​​

拒绝策略(当队列和线程池全满时触发):
AbortPolicy(默认):抛出RejectedExecutionException。
CallerRunsPolicy:由提交任务的线程直接执行。
DiscardOldestPolicy:丢弃队列中最旧的任务。
DiscardPolicy:直接丢弃新任务。

Executors 工具类预定义的线程池

固定数量线程池 newFixedThreadPool​​

Executors.newFixedThreadPool(int nThreads);
特点:固定核心线程数 + 无界队列(LinkedBlockingQueue)。
问题:任务堆积可能导致 OOM。

​​缓存线程池 newCachedThreadPool​​

Executors.newCachedThreadPool();
特点:核心线程数为0,最大线程数为Integer.MAX_VALUE,空闲线程60秒回收。
问题:可能创建大量线程,导致 CPU/内存耗尽。

单例线程池 newSingleThreadExecutor​​

Executors.newSingleThreadExecutor();
特点:单线程 + 无界队列。
问题:任务堆积可能导致 OOM。

​​定时线程池 newScheduledThreadPool​​

Executors.newScheduledThreadPool(int corePoolSize);
特点:支持定时/周期性任务,使用DelayedWorkQueue。
阿里开发规范建议手动创建ThreadPoolExecutor,避免使用Executors默认实现(防止无界队列或过多线程导致资源耗尽)

线程池工作流程

  1. ​​提交任务​​:调用execute(Runnable command)。
    ​​2. 流程判断​​:
    当前线程数 < corePoolSize → 创建新线程执行任务。
    线程数 ≥ corePoolSize → 任务加入队列。
    队列已满且线程数 < maximumPoolSize → 创建新线程。
    队列和线程池均满 → 触发拒绝策略。
    在这里插入图片描述

线程池状态和生命周期

生命周期

在这里插入图片描述

线程池状态

1.RUNNING(运行状态):在这个状态下,线程池可以接收新的任务提交,并且能够处理已添加到任务队列中的任务。这是线程池的初始状态,也是最活跃的状态。这是最正常的状态

2.SHUTDOWN(关闭状态)当调用了线程池的shutdown()方法后,线程池进入此状态。此时,线程池不再接受任何新的任务提交,但它会继续执行已经存在于任务队列中的任务直到它们全部完成。

3.STOP(停止状态):如果调用了shutdownNow()方法,线程池会进入STOP状态。此时,线程池不仅不会接受新的任务,而且会尽力去中断正在执行的任务,并且清空任务队列。

4.TIDYING(整理状态):当所有的任务(包括正在执行的和队列中的)都已完成,并且所有的worker线程(除了finalizer线程)都已经结束,线程池就会从SHUTDOWN或STOP状态转换到TIDYING状态。在这个状态中,会执行terminated()钩子方法进行一些清理工作。

5.TERMINATED(终止状态):在terminated()方法执行完毕后,线程池就进入了TERMINATED状态。这时,线程池已经完全终止,所有资源都被释放,线程池生命周期结束。

为什么 shutdownNow() 不能立即停止所有线程

​​原因​​:
Thread.interrupt() 只是发送中断信号,如果任务未响应中断(如未检查 Thread.interrupted()),线程可能继续执行。
解决​​:
编写可中断的任务(例如在循环中检查 Thread.interrupted())。

核心线程池回收

在 Java 线程池中,​​核心线程(Core Threads)默认不会因空闲而被回收​​,即使没有任务执行,它们也会一直存活。这是为了快速响应新任务,避免反复创建线程的开销。

通过 allowCoreThreadTimeOut(true) 方法,可以允许核心线程在空闲超时后被回收。配置需满足以下条件:
​​开启核心线程超时机制​​:显式调用 allowCoreThreadTimeOut(true)。
​​设置超时时间​​:通过 keepAliveTime 参数指定空闲时间阈值。

ThreadPoolExecutor executor = new ThreadPoolExecutor(4,      // corePoolSize8,      // maximumPoolSize60,     // keepAliveTime(单位由下一个参数指定)TimeUnit.SECONDS,new LinkedBlockingQueue<>(100)
);// 开启核心线程超时回收
executor.allowCoreThreadTimeOut(true);

线程池参数配置

CPU 密集型任务​

  1. 特点​​:
    任务主要消耗 CPU 计算资源(例如加密解密、复杂算法)。
  2. 公式​​:
    线程数=CPU核心数+1
  3. 原理​​:
    避免过多线程导致频繁上下文切换。
    多 1 个线程用于应对线程阻塞的意外情况。

​​IO 密集型任务​

  1. 特点​​:
    任务主要等待 IO 操作(例如网络请求、数据库读写、文件操作)。
  2. ​​公式​​:
    在这里插入图片描述
    ​​3. 简化版​​(经验值):
    线程数=2×CPU核心数

线程池监控

通过 ThreadPoolExecutor 提供的方法实时监控:

// 获取当前线程池中的线程总数(包括空闲线程)
int poolSize = executor.getPoolSize(); // 获取活跃线程数(正在执行任务的线程)
int activeCount = executor.getActiveCount(); // 获取队列中等待的任务数
int queueSize = executor.getQueue().size();

动态调整参数

修改核心线程数 setCorePoolSize

在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。

对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;

对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:
在这里插入图片描述
源码:

    /*** Sets the core number of threads.  This overrides any value set* in the constructor.  If the new value is smaller than the* current value, excess existing threads will be terminated when* they next become idle.  If larger, new threads will, if needed,* be started to execute any queued tasks.** @param corePoolSize the new core size* @throws IllegalArgumentException if {@code corePoolSize < 0}*         or {@code corePoolSize} is greater than the {@linkplain*         #getMaximumPoolSize() maximum pool size}* @see #getCorePoolSize*/public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}

修改最大线程数 setMaximumPoolSize

修改逻辑:
1.首先是参数合法性校验。
2.然后用传递进来的值,覆盖原来的值。
3.判断工作线程是否是大于最大线程数,如果大于,则对空闲线程发起中断请求。
源码:

    /*** Sets the maximum allowed number of threads. This overrides any* value set in the constructor. If the new value is smaller than* the current value, excess existing threads will be* terminated when they next become idle.** @param maximumPoolSize the new maximum* @throws IllegalArgumentException if the new maximum is*         less than or equal to zero, or*         less than the {@linkplain #getCorePoolSize core pool size}* @see #getMaximumPoolSize*/public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();this.maximumPoolSize = maximumPoolSize;if (workerCountOf(ctl.get()) > maximumPoolSize)interruptIdleWorkers();}

修改队列长度

默认队列不可变​​
Java 提供的 BlockingQueue 实现类(如 ArrayBlockingQueue、LinkedBlockingQueue)在初始化后容量固定,无法直接修改(以final 修饰)。
​​线程池依赖队列接口​​
ThreadPoolExecutor 仅依赖 BlockingQueue 接口,因此可通过自定义队列实现动态调整。
解决(基于 LinkedBlockingQueue 的简单实现,提供setCapacity方法):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {private final ReentrantLock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private volatile int capacity;public ResizableCapacityLinkedBlockingQueue(int initialCapacity) {super(initialCapacity);this.capacity = initialCapacity;}// 动态调整容量public void setCapacity(int newCapacity) {lock.lock();try {if (newCapacity <= 0) throw new IllegalArgumentException();this.capacity = newCapacity;if (size() < newCapacity) {notFull.signalAll(); // 唤醒等待的生产者}} finally {lock.unlock();}}@Overridepublic boolean offer(E e) {lock.lock();try {while (size() >= capacity) {if (!notFull.await(100, TimeUnit.MILLISECONDS)) {return false; // 超时后拒绝任务}}return super.offer(e);} catch (InterruptedException ex) {Thread.currentThread().interrupt();return false;} finally {lock.unlock();}}
}

线程池预热

​​默认行为​​:线程池按需创建线程(提交任务时才创建,直到达到 corePoolSize)。
​​预热目的​​:提前创建核心线程,确保任务到达时线程已就绪,避免首次请求因线程创建产生延迟。

预热方法

  1. 启动单个核心线程​
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
executor.prestartCoreThread();  // 启动一个核心线程
  1. 启动所有核心线程​
int startedThreads = executor.prestartAllCoreThreads(); // 启动全部核心线程
System.out.println("已启动的线程数:" + startedThreads);

动态调整与二次预热

若业务中动态调整了 corePoolSize,需重新预热

// 调整核心线程数
executor.setCorePoolSize(10);// 再次预热新增的核心线程
int newStartedThreads = executor.prestartAllCoreThreads();
System.out.println("新增预热线程数:" + (newStartedThreads - started));

springboot线程池种类

默认线程池(SimpleAsyncTaskExecutor)​

​​1. 类型​​:
SimpleAsyncTaskExecutor
​​2. 特点​​:
​​非线程池​​:每次执行任务时创建新线程,不会复用线程。​​适用于轻量级异步任务​​,但频繁创建线程可能导致性能问题。
​​3. 触发场景​​:
使用 @Async 注解但未显式指定线程池时默认使用。

@Async  // 默认使用 SimpleAsyncTaskExecutor
public void asyncTask() {// 异步逻辑
}

固定大小线程池(ThreadPoolTaskExecutor)​

​类型​​:ThreadPoolTaskExecutor
​​特点​​:
​​核心线程数固定​​,队列满时创建新线程(直到达到最大线程数)。
适用于 ​​高并发且任务执行时间较短​​ 的场景(如 HTTP 请求处理)。

@Configuration
public class AsyncConfig {@Bean("customThreadPool")public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("custom-pool-");executor.initialize();return executor;}
}// 使用指定线程池
@Async("customThreadPool")
public void asyncTask() {// 异步逻辑
}

定时任务线程池

ScheduledThreadPoolTaskExecutor

适用场景​​:需要 ​​周期性执行​​ 或 ​​延迟执行​​ 的简单任务(如每5秒执行一次)。

@Configuration
public class ExecutorConfig {@Beanpublic ScheduledThreadPoolTaskExecutor taskExecutor() {ScheduledThreadPoolTaskExecutor executor = new ScheduledThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setThreadNamePrefix("scheduled-executor-");return executor;}
}// 手动提交任务
@Autowired
private ScheduledThreadPoolTaskExecutor executor;public void startTask() {executor.scheduleAtFixedRate(() -> {// 每5秒执行一次}, 0, 5, TimeUnit.SECONDS);
}

常用方法:
(1) schedule(Runnable task, long delay, TimeUnit unit)​​
​​作用​​:提交单次延迟任务。
​​参数​​:
task:要执行的任务(Runnable)。
delay:延迟时间。
unit:时间单位(如 TimeUnit.SECONDS)。
​​返回值​​:ScheduledFuture<?>(可用于取消任务或检查状态)。

ScheduledFuture<?> future = executor.schedule(() -> System.out.println("Delayed task"), 5, TimeUnit.SECONDS
);

(2) schedule(Callable task, long delay, TimeUnit unit)​​
​​作用​​:提交单次延迟任务并返回结果。
​​参数​​:Callable 任务(可返回结果)。
​​示例​​:

ScheduledFuture<String> future = executor.schedule(() -> "Result after 5 seconds",5,TimeUnit.SECONDS
);
String result = future.get(); // 阻塞获取结果

(3) scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)​​
​​作用​​:按 ​​固定频率​​ 执行任务(无论前一次任务是否完成)。
​​参数​​:
initialDelay:首次执行延迟时间。
period:后续执行间隔时间。

executor.scheduleAtFixedRate(() -> System.out.println("Fixed rate task"),0,    // 立即开始10,   // 每隔10秒执行一次TimeUnit.SECONDS
);

(4) scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)​​
​​作用​​:按 ​​固定延迟​​ 执行任务(前一次任务结束后,延迟指定时间再执行)。

executor.scheduleWithFixedDelay(() -> System.out.println("Fixed delay task"),0,    // 立即开始10,   // 每次任务结束后延迟10秒TimeUnit.SECONDS
);

ThreadPoolTaskScheduler​

适用场景​​:需要 ​​Cron 表达式​​ 或复杂调度规则的任务(如每天凌晨执行)。(在 Spring Boot 中,使用 @Scheduled 注解的任务默认通过 ​​单线程的调度器​​ 执行,可配置Bean(taskScheduler)改为使用配置的线程池)

@Configuration
@EnableScheduling
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(5); // 设置线程池大小scheduler.setThreadNamePrefix("scheduler-");return scheduler;}
}// 使用 Cron 表达式
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
public void dailyTask() {// 任务逻辑
}

ThreadPoolTaskScheduler vs ScheduledThreadPoolTaskExecutor

在这里插入图片描述

Fork/Join 线程池(ForkJoinPool)​

类型​​:ForkJoinPool
​​特点​​:
基于 ​​分治算法​​,适合 ​​CPU 密集型并行计算​​。
自动拆分任务并分配到工作线程。
​​使用场景​​:并行流(parallelStream)或手动提交任务。

ForkJoinPool pool = new ForkJoinPool(4); // 指定并行度
pool.submit(() -> {IntStream.range(0, 100).parallel().forEach(i -> {// 并行处理逻辑});
});

ForkJoinPool核心机制​

​工作窃取算法(Work-Stealing)​​

​​双端队列(Deque)​​
每个工作线程维护一个双端队列(任务队列),存放自己的任务。
​​头部(Head)​​:线程从头部取出任务执行(LIFO 或 FIFO 取决于模式)。
​​尾部(Tail)​​:其他线程从尾部窃取任务(FIFO 顺序)。
​​窃取流程​​
当某线程的任务队列为空时,随机选择另一个线程的队列尾部窃取任务,避免全局竞争。

​任务拆分与合并​​

​​ForkJoinTask​​
抽象类,表示可拆分的任务,常用子类:
​​RecursiveAction​​:无返回值的任务(如排序)。
​​RecursiveTask​​:有返回值的任务(如累加)。
​​关键方法​​
fork():将子任务异步提交到当前线程的队列。
join():等待子任务完成并获取结果(可能触发窃取)。

ForkJoinPool使用场景:

  1. 并行流(Parallel Streams)​
List<Integer> list = Arrays.asList(1, 2, 3, 4);
list.parallelStream().forEach(System.out::println); // 使用 ForkJoinPool.commonPool(),默认线程数为 ​​CPU 核心数 - 1​​
  1. CompletableFuture 的默认执行器​
    CompletableFuture 在执行异步任务时,若未指定自定义 Executor 默认使用ForkJoinPool.commonPool()
  2. ​​并行数组操作​​
    ​​说明​​:Java 8 的 Arrays 类提供的并行方法(如排序、填充)利用 ForkJoinPool 分解任务。
int[] array = {3, 1, 4, 2};
Arrays.parallelSort(array); // 并行排序
  1. 显式创建 ForkJoinPool​​
    ​​说明​​:通过 Executors.newWorkStealingPool() 或直接构造 ForkJoinPool 实例。
    注意事项
    公共池共享性​​:ForkJoinPool.commonPool() 是全局共享的,可能被多个功能(如并行流和 CompletableFuture)竞争使用。
    ​​资源控制​​:若需避免资源争用,可为关键任务创建独立的 ForkJoinPool。
    默认并行度​​:公共池的并行度默认等于 CPU 核心数减一(可通过 JVM 参数调整)。

ForkJoinPool vs ThreadPoolTaskExecutor

设计目标与核心思想​

特性ForkJoinPoolThreadPoolTaskExecutor
​​设计目的​​支持 ​​分治算法​​(Divide-and-Conquer)和 ​​并行计算​​,适合递归分解任务。提供通用的 ​​异步任务执行能力​​,适用于独立、短生命周期的任务。
核心思想​工作窃取(Work-Stealing)​​:空闲线程从其他线程的队列中窃取任务。共享任务队列​​:所有线程从同一个队列获取任务。
​​适用场景​​ 处理可递归拆分的任务(如并行流、大数据处理)。处理独立的、无依赖的异步任务(如 HTTP 请求、简单异步操作)。

任务处理机制​

特性​​ ​​ForkJoinPool​​ThreadPoolTaskExecutor​​
任务队列每个线程维护自己的 ​​双端队列(Deque)​​,优先处理本地队列的任务。所有线程共享一个 ​​阻塞队列(如 LinkedBlockingQueue)。
任务调度任务可递归拆分为子任务(ForkJoinTask),子任务提交到本地队列。任务不可拆分,直接提交到共享队列。
​​ 线程利用率​​通过 ​​工作窃取​​ 减少线程空闲时间,提高 CPU 利用率。依赖队列容量和线程数配置,可能因队列竞争导致资源浪费。

性能对比​
在这里插入图片描述

http://www.xdnf.cn/news/322021.html

相关文章:

  • day18 python聚类分析对数据集模型性能影响
  • Content-Type使用场景及示例
  • 阿里云2核2g安装nexus
  • KL散度(Kullback-Leibler Divergence):概率分布差异的量化利器
  • 同步 / 异步、阻塞 / 非阻塞
  • 基于STM32、HAL库的SCD41-D-R2 气体传感器驱动程序设计
  • 数据中心机电建设
  • 【论文阅读】Attentive Collaborative Filtering:
  • 【MongoDB篇】MongoDB的分片操作!
  • FAST-LIO笔记
  • 【北京迅为】iTOP-4412精英版使用手册-第十章 QtE5.7系统编译
  • [OpenManus]部署笔记
  • Mkdocs文档引用相对地址的一些问题
  • 使用OpenCV的VideoCapture播放视频文件示例
  • 偏导数和梯度
  • shell-sed
  • MCP 规范新版本特性全景解析与落地实践
  • 图片文件转base64存储在数据库
  • redis端口漏洞未授权访问漏洞
  • Rust 中 Arc 的深度分析:从原理到性能优化实践
  • 2020年NCA CCF-C,改进灰狼算法RSMGWO+大规模函数优化,深度解析+性能实测
  • 鸿蒙开发——4.ArkTS快速入门指南
  • 我的世界云端服务器具体是指什么?
  • Laravel 12 实现验证码功能
  • 代码随想录算法训练营第三十四天
  • WordPress个人博客搭建(三):WordPress网站优化
  • RabbitMq学习(第一天)
  • 5.7 react 路由
  • Go语言八股之并发详解
  • 管家婆实用贴-如何在Excel中清除空格