线程池详解,生命周期,线程池种类,预热
为什么需要线程池
- 线程开销大:频繁创建/销毁线程会消耗系统资源(内存、CPU)。
- 资源管理:无限制创建线程可能导致内存溢出(OOM)或 CPU 过载。
- 任务调度优化:复用线程、控制并发数量、管理任务队列。
线程池参数
corePoolSize
默认保留的线程数,即使空闲也不会销毁(除非设置allowCoreThreadTimeOut)。
maximumPoolSize
线程池允许的最大线程数。当队列满时,会创建新线程直到达到此值。
keepAliveTime
非核心线程的空闲时间超过此值会被销毁。
workQueue
当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
任务队列类型决定线程池行为:
直接提交队列:SynchronousQueue(无容量,每次插入需等待消费)。
有界队列:如ArrayBlockingQueue(固定大小)。
无界队列:如LinkedBlockingQueue(可能导致 OOM)。
RejectedExecutionHandler
拒绝策略(当队列和线程池全满时触发):
AbortPolicy(默认):抛出RejectedExecutionException。
CallerRunsPolicy:由提交任务的线程直接执行。
DiscardOldestPolicy:丢弃队列中最旧的任务。
DiscardPolicy:直接丢弃新任务。
Executors 工具类预定义的线程池
固定数量线程池 newFixedThreadPool
Executors.newFixedThreadPool(int nThreads);
特点:固定核心线程数 + 无界队列(LinkedBlockingQueue)。
问题:任务堆积可能导致 OOM。
缓存线程池 newCachedThreadPool
Executors.newCachedThreadPool();
特点:核心线程数为0,最大线程数为Integer.MAX_VALUE,空闲线程60秒回收。
问题:可能创建大量线程,导致 CPU/内存耗尽。
单例线程池 newSingleThreadExecutor
Executors.newSingleThreadExecutor();
特点:单线程 + 无界队列。
问题:任务堆积可能导致 OOM。
定时线程池 newScheduledThreadPool
Executors.newScheduledThreadPool(int corePoolSize);
特点:支持定时/周期性任务,使用DelayedWorkQueue。
阿里开发规范建议手动创建ThreadPoolExecutor,避免使用Executors默认实现(防止无界队列或过多线程导致资源耗尽)
线程池工作流程
- 提交任务:调用execute(Runnable command)。
2. 流程判断:
当前线程数 < corePoolSize → 创建新线程执行任务。
线程数 ≥ corePoolSize → 任务加入队列。
队列已满且线程数 < maximumPoolSize → 创建新线程。
队列和线程池均满 → 触发拒绝策略。
线程池状态和生命周期
生命周期
线程池状态
1.RUNNING(运行状态):在这个状态下,线程池可以接收新的任务提交,并且能够处理已添加到任务队列中的任务。这是线程池的初始状态,也是最活跃的状态。这是最正常的状态
2.SHUTDOWN(关闭状态)当调用了线程池的shutdown()
方法后,线程池进入此状态。此时,线程池不再接受任何新的任务提交,但它会继续执行已经存在于任务队列中的任务直到它们全部完成。
3.STOP(停止状态):如果调用了shutdownNow()
方法,线程池会进入STOP状态。此时,线程池不仅不会接受新的任务,而且会尽力去中断正在执行的任务,并且清空任务队列。
4.TIDYING(整理状态):当所有的任务(包括正在执行的和队列中的)都已完成,并且所有的worker线程(除了finalizer线程)都已经结束,线程池就会从SHUTDOWN或STOP状态转换到TIDYING状态。在这个状态中,会执行terminated()
钩子方法进行一些清理工作。
5.TERMINATED(终止状态):在terminated()
方法执行完毕后,线程池就进入了TERMINATED状态。这时,线程池已经完全终止,所有资源都被释放,线程池生命周期结束。
为什么 shutdownNow() 不能立即停止所有线程
原因:
Thread.interrupt() 只是发送中断信号,如果任务未响应中断(如未检查 Thread.interrupted()),线程可能继续执行。
解决:
编写可中断的任务(例如在循环中检查 Thread.interrupted())。
核心线程池回收
在 Java 线程池中,核心线程(Core Threads)默认不会因空闲而被回收,即使没有任务执行,它们也会一直存活。这是为了快速响应新任务,避免反复创建线程的开销。
通过 allowCoreThreadTimeOut(true) 方法,可以允许核心线程在空闲超时后被回收。配置需满足以下条件:
开启核心线程超时机制:显式调用 allowCoreThreadTimeOut(true)。
设置超时时间:通过 keepAliveTime 参数指定空闲时间阈值。
ThreadPoolExecutor executor = new ThreadPoolExecutor(4, // corePoolSize8, // maximumPoolSize60, // keepAliveTime(单位由下一个参数指定)TimeUnit.SECONDS,new LinkedBlockingQueue<>(100)
);// 开启核心线程超时回收
executor.allowCoreThreadTimeOut(true);
线程池参数配置
CPU 密集型任务
- 特点:
任务主要消耗 CPU 计算资源(例如加密解密、复杂算法)。 - 公式:
线程数=CPU核心数+1 - 原理:
避免过多线程导致频繁上下文切换。
多 1 个线程用于应对线程阻塞的意外情况。
IO 密集型任务
- 特点:
任务主要等待 IO 操作(例如网络请求、数据库读写、文件操作)。 - 公式:
3. 简化版(经验值):
线程数=2×CPU核心数
线程池监控
通过 ThreadPoolExecutor 提供的方法实时监控:
// 获取当前线程池中的线程总数(包括空闲线程)
int poolSize = executor.getPoolSize(); // 获取活跃线程数(正在执行任务的线程)
int activeCount = executor.getActiveCount(); // 获取队列中等待的任务数
int queueSize = executor.getQueue().size();
动态调整参数
修改核心线程数 setCorePoolSize
在运行期线程池使用方调用此方法设置corePoolSize之后,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。
对于当前值小于当前工作线程数的情况,说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求以实现回收,多余的worker在下次idel的时候也会被回收;
对于当前值大于原始值且当前队列中有待执行任务,则线程池会创建新的worker线程来执行队列任务,setCorePoolSize具体流程如下:
源码:
/*** Sets the core number of threads. This overrides any value set* in the constructor. If the new value is smaller than the* current value, excess existing threads will be terminated when* they next become idle. If larger, new threads will, if needed,* be started to execute any queued tasks.** @param corePoolSize the new core size* @throws IllegalArgumentException if {@code corePoolSize < 0}* or {@code corePoolSize} is greater than the {@linkplain* #getMaximumPoolSize() maximum pool size}* @see #getCorePoolSize*/public void setCorePoolSize(int corePoolSize) {if (corePoolSize < 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();int delta = corePoolSize - this.corePoolSize;this.corePoolSize = corePoolSize;if (workerCountOf(ctl.get()) > corePoolSize)interruptIdleWorkers();else if (delta > 0) {// We don't really know how many new threads are "needed".// As a heuristic, prestart enough new workers (up to new// core size) to handle the current number of tasks in// queue, but stop if queue becomes empty while doing so.int k = Math.min(delta, workQueue.size());while (k-- > 0 && addWorker(null, true)) {if (workQueue.isEmpty())break;}}}
修改最大线程数 setMaximumPoolSize
修改逻辑:
1.首先是参数合法性校验。
2.然后用传递进来的值,覆盖原来的值。
3.判断工作线程是否是大于最大线程数,如果大于,则对空闲线程发起中断请求。
源码:
/*** Sets the maximum allowed number of threads. This overrides any* value set in the constructor. If the new value is smaller than* the current value, excess existing threads will be* terminated when they next become idle.** @param maximumPoolSize the new maximum* @throws IllegalArgumentException if the new maximum is* less than or equal to zero, or* less than the {@linkplain #getCorePoolSize core pool size}* @see #getMaximumPoolSize*/public void setMaximumPoolSize(int maximumPoolSize) {if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)throw new IllegalArgumentException();this.maximumPoolSize = maximumPoolSize;if (workerCountOf(ctl.get()) > maximumPoolSize)interruptIdleWorkers();}
修改队列长度
默认队列不可变
Java 提供的 BlockingQueue 实现类(如 ArrayBlockingQueue、LinkedBlockingQueue)在初始化后容量固定,无法直接修改(以final 修饰)。
线程池依赖队列接口
ThreadPoolExecutor 仅依赖 BlockingQueue 接口,因此可通过自定义队列实现动态调整。
解决(基于 LinkedBlockingQueue 的简单实现,提供setCapacity方法):
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class ResizableCapacityLinkedBlockingQueue<E> extends LinkedBlockingQueue<E> {private final ReentrantLock lock = new ReentrantLock();private final Condition notFull = lock.newCondition();private volatile int capacity;public ResizableCapacityLinkedBlockingQueue(int initialCapacity) {super(initialCapacity);this.capacity = initialCapacity;}// 动态调整容量public void setCapacity(int newCapacity) {lock.lock();try {if (newCapacity <= 0) throw new IllegalArgumentException();this.capacity = newCapacity;if (size() < newCapacity) {notFull.signalAll(); // 唤醒等待的生产者}} finally {lock.unlock();}}@Overridepublic boolean offer(E e) {lock.lock();try {while (size() >= capacity) {if (!notFull.await(100, TimeUnit.MILLISECONDS)) {return false; // 超时后拒绝任务}}return super.offer(e);} catch (InterruptedException ex) {Thread.currentThread().interrupt();return false;} finally {lock.unlock();}}
}
线程池预热
默认行为:线程池按需创建线程(提交任务时才创建,直到达到 corePoolSize)。
预热目的:提前创建核心线程,确保任务到达时线程已就绪,避免首次请求因线程创建产生延迟。
预热方法
- 启动单个核心线程
ThreadPoolExecutor executor = new ThreadPoolExecutor(...);
executor.prestartCoreThread(); // 启动一个核心线程
- 启动所有核心线程
int startedThreads = executor.prestartAllCoreThreads(); // 启动全部核心线程
System.out.println("已启动的线程数:" + startedThreads);
动态调整与二次预热
若业务中动态调整了 corePoolSize,需重新预热
// 调整核心线程数
executor.setCorePoolSize(10);// 再次预热新增的核心线程
int newStartedThreads = executor.prestartAllCoreThreads();
System.out.println("新增预热线程数:" + (newStartedThreads - started));
springboot线程池种类
默认线程池(SimpleAsyncTaskExecutor)
1. 类型:
SimpleAsyncTaskExecutor
2. 特点:
非线程池:每次执行任务时创建新线程,不会复用线程。适用于轻量级异步任务,但频繁创建线程可能导致性能问题。
3. 触发场景:
使用 @Async 注解但未显式指定线程池时默认使用。
@Async // 默认使用 SimpleAsyncTaskExecutor
public void asyncTask() {// 异步逻辑
}
固定大小线程池(ThreadPoolTaskExecutor)
类型:ThreadPoolTaskExecutor
特点:
核心线程数固定,队列满时创建新线程(直到达到最大线程数)。
适用于 高并发且任务执行时间较短 的场景(如 HTTP 请求处理)。
@Configuration
public class AsyncConfig {@Bean("customThreadPool")public TaskExecutor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(10);executor.setMaxPoolSize(50);executor.setQueueCapacity(100);executor.setThreadNamePrefix("custom-pool-");executor.initialize();return executor;}
}// 使用指定线程池
@Async("customThreadPool")
public void asyncTask() {// 异步逻辑
}
定时任务线程池
ScheduledThreadPoolTaskExecutor
适用场景:需要 周期性执行 或 延迟执行 的简单任务(如每5秒执行一次)。
@Configuration
public class ExecutorConfig {@Beanpublic ScheduledThreadPoolTaskExecutor taskExecutor() {ScheduledThreadPoolTaskExecutor executor = new ScheduledThreadPoolTaskExecutor();executor.setCorePoolSize(5);executor.setThreadNamePrefix("scheduled-executor-");return executor;}
}// 手动提交任务
@Autowired
private ScheduledThreadPoolTaskExecutor executor;public void startTask() {executor.scheduleAtFixedRate(() -> {// 每5秒执行一次}, 0, 5, TimeUnit.SECONDS);
}
常用方法:
(1) schedule(Runnable task, long delay, TimeUnit unit)
作用:提交单次延迟任务。
参数:
task:要执行的任务(Runnable)。
delay:延迟时间。
unit:时间单位(如 TimeUnit.SECONDS)。
返回值:ScheduledFuture<?>(可用于取消任务或检查状态)。
ScheduledFuture<?> future = executor.schedule(() -> System.out.println("Delayed task"), 5, TimeUnit.SECONDS
);
(2) schedule(Callable task, long delay, TimeUnit unit)
作用:提交单次延迟任务并返回结果。
参数:Callable 任务(可返回结果)。
示例:
ScheduledFuture<String> future = executor.schedule(() -> "Result after 5 seconds",5,TimeUnit.SECONDS
);
String result = future.get(); // 阻塞获取结果
(3) scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit)
作用:按 固定频率 执行任务(无论前一次任务是否完成)。
参数:
initialDelay:首次执行延迟时间。
period:后续执行间隔时间。
executor.scheduleAtFixedRate(() -> System.out.println("Fixed rate task"),0, // 立即开始10, // 每隔10秒执行一次TimeUnit.SECONDS
);
(4) scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit)
作用:按 固定延迟 执行任务(前一次任务结束后,延迟指定时间再执行)。
executor.scheduleWithFixedDelay(() -> System.out.println("Fixed delay task"),0, // 立即开始10, // 每次任务结束后延迟10秒TimeUnit.SECONDS
);
ThreadPoolTaskScheduler
适用场景:需要 Cron 表达式 或复杂调度规则的任务(如每天凌晨执行)。(在 Spring Boot 中,使用 @Scheduled 注解的任务默认通过 单线程的调度器 执行,可配置Bean(taskScheduler)改为使用配置的线程池)
@Configuration
@EnableScheduling
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(5); // 设置线程池大小scheduler.setThreadNamePrefix("scheduler-");return scheduler;}
}// 使用 Cron 表达式
@Scheduled(cron = "0 0 3 * * ?") // 每天凌晨3点执行
public void dailyTask() {// 任务逻辑
}
ThreadPoolTaskScheduler vs ScheduledThreadPoolTaskExecutor
Fork/Join 线程池(ForkJoinPool)
类型:ForkJoinPool
特点:
基于 分治算法,适合 CPU 密集型并行计算。
自动拆分任务并分配到工作线程。
使用场景:并行流(parallelStream)或手动提交任务。
ForkJoinPool pool = new ForkJoinPool(4); // 指定并行度
pool.submit(() -> {IntStream.range(0, 100).parallel().forEach(i -> {// 并行处理逻辑});
});
ForkJoinPool核心机制
工作窃取算法(Work-Stealing)
双端队列(Deque)
每个工作线程维护一个双端队列(任务队列),存放自己的任务。
头部(Head):线程从头部取出任务执行(LIFO 或 FIFO 取决于模式)。
尾部(Tail):其他线程从尾部窃取任务(FIFO 顺序)。
窃取流程
当某线程的任务队列为空时,随机选择另一个线程的队列尾部窃取任务,避免全局竞争。
任务拆分与合并
ForkJoinTask
抽象类,表示可拆分的任务,常用子类:
RecursiveAction:无返回值的任务(如排序)。
RecursiveTask:有返回值的任务(如累加)。
关键方法
fork():将子任务异步提交到当前线程的队列。
join():等待子任务完成并获取结果(可能触发窃取)。
ForkJoinPool使用场景:
- 并行流(Parallel Streams)
List<Integer> list = Arrays.asList(1, 2, 3, 4);
list.parallelStream().forEach(System.out::println); // 使用 ForkJoinPool.commonPool(),默认线程数为 CPU 核心数 - 1
- CompletableFuture 的默认执行器
CompletableFuture 在执行异步任务时,若未指定自定义 Executor 默认使用ForkJoinPool.commonPool() - 并行数组操作
说明:Java 8 的 Arrays 类提供的并行方法(如排序、填充)利用 ForkJoinPool 分解任务。
int[] array = {3, 1, 4, 2};
Arrays.parallelSort(array); // 并行排序
- 显式创建 ForkJoinPool
说明:通过 Executors.newWorkStealingPool() 或直接构造 ForkJoinPool 实例。
注意事项
公共池共享性:ForkJoinPool.commonPool() 是全局共享的,可能被多个功能(如并行流和 CompletableFuture)竞争使用。
资源控制:若需避免资源争用,可为关键任务创建独立的 ForkJoinPool。
默认并行度:公共池的并行度默认等于 CPU 核心数减一(可通过 JVM 参数调整)。
ForkJoinPool vs ThreadPoolTaskExecutor
设计目标与核心思想
特性 | ForkJoinPool | ThreadPoolTaskExecutor |
---|---|---|
设计目的 | 支持 分治算法(Divide-and-Conquer)和 并行计算,适合递归分解任务。 | 提供通用的 异步任务执行能力,适用于独立、短生命周期的任务。 |
核心思想 | 工作窃取(Work-Stealing):空闲线程从其他线程的队列中窃取任务。 | 共享任务队列:所有线程从同一个队列获取任务。 |
适用场景 | 处理可递归拆分的任务(如并行流、大数据处理)。 | 处理独立的、无依赖的异步任务(如 HTTP 请求、简单异步操作)。 |
任务处理机制
特性 | ForkJoinPool | ThreadPoolTaskExecutor |
---|---|---|
任务队列 | 每个线程维护自己的 双端队列(Deque),优先处理本地队列的任务。 | 所有线程共享一个 阻塞队列(如 LinkedBlockingQueue)。 |
任务调度 | 任务可递归拆分为子任务(ForkJoinTask),子任务提交到本地队列。 | 任务不可拆分,直接提交到共享队列。 |
线程利用率 | 通过 工作窃取 减少线程空闲时间,提高 CPU 利用率。 | 依赖队列容量和线程数配置,可能因队列竞争导致资源浪费。 |
性能对比