并发编程——14 线程池参数动态化
1 线程池在业务中的实践
1.1 业务场景
-
在当今的互联网业界,为了最大程度利用 CPU 的多核性能,并行运算的能力是不可或缺的。通过线程池管理线程获取并发性是一个非常基础的操作,下面看两个典型的使用线程池获取并发性的场景。
-
场景一:快速响应用户请求
-
描述:当用户发起实时请求(如查看商品信息,需聚合价格、优惠、库存、图片等信息)时,服务追求短响应时间;
-
分析:从用户体验看,响应越快越好。而商品信息聚合这类功能复杂,存在调用级联等情况。此时用线程池,把各调用封装成任务并行执行,能缩短总体响应时间。为了最大程度提升响应速度,不设置队列缓冲并发任务,调高
corePoolSize
(核心线程数)和maxPoolSize
(最大线程数),尽可能创建多的线程快速执行任务。如下图所示,串行执行时获取价格、库存、图片是依次进行,耗时久;并行执行时这些操作同时开展,能更快返回结果;
-
-
场景2:快速处理批量任务
-
描述:像统计报表(计算全国各门店有某属性的商品以辅助营销策略)这类离线大量计算任务,需要快速执行;
-
分析:这类场景任务量大,虽也希望快,但不需要瞬时完成,更关注单位时间内处理更多任务(吞吐量优先)。所以要用多线程并行计算,但要设置队列缓冲并发任务,调整合适的
corePoolSize
。因为线程数过多会引发线程上下文切换频繁,降低处理速度和吞吐量。如下图所示,串行执行批量任务时,任务依次进行,总耗时是各任务耗时之和;并行执行时,任务先缓存到队列,然后线程并行处理队列里的任务,能更高效完成,提升吞吐量;
-
1.2 实际问题及方案思考
-
线程池使用面临的核心的问题在于:线程池的参数并不好配置。一方面其运行机制难理解,合理配置需要强依赖开发人员的经验与知识;另一方面,线程池执行情况和任务类型(如IO密集型、CPU密集型)关联性强,不同任务运行表现差异大,业界缺乏成熟普适的经验策略;
-
关于线程池配置不合理引发的故障,下面举一些例子:
-
Case1:XX页面展示接口大量调用降级
- 事故描述:XX页面展示接口出现大量调用降级情况,数量级达到几十到上百;
- 事故原因:该服务展示接口内部逻辑采用线程池进行并行计算,由于没有预先估算好调用的流量,使得最大线程数设置偏小。当任务量超过线程池承载能力时,大量抛出
RejectedExecutionException
异常,从而触发接口降级条件。如下图:请求进入展示接口后生成多个任务,线程池因容量不足,只能处理部分任务,其余任务被拒绝;
-
Case2:XX业务服务不可用S2级故障
- 事故描述:XX业务提供的服务执行时间过长,作为上游服务整体超时,进而导致大量下游服务调用失败;
- 事故原因:该服务处理请求的内部逻辑使用线程池做资源隔离,但队列设置过长,且最大线程数设置失效。当请求数量增加时,大量任务堆积在队列中,任务执行时间被大幅拉长,最终造成下游服务大量调用超时。如下图:请求生成任务后,任务都在队列中缓冲等待执行,而核心线程数设置过小,导致任务执行速率低;
-
-
业务中要使用线程池,而线程池使用不当又会导致故障,那么我们怎样才能更好地使用线程池呢?
2 线程池参数动态化
2.1 引出
-
在日常项目开发中,线程池用于处理并发场景以提升任务处理效率。但传统方式下,线程池参数难以精准预设,只能在服务运行时不断调整参数,且每次调整都需重启服务,这会对服务可用性造成影响,因此需要一种不重启服务就能动态调整参数的方案;
-
那如何实现在不重启服务的前提下,动态调整线程池参数呢?
-
旧流程:修改线程池参数 → 重新发布服务 → 查看服务是否运行正常 → 结束。流程繁琐,且重新发布意味着服务会中断或重启;
-
新流程:仅需修改线程池参数即可完成,无需重启服务,大幅简化了参数调整的成本,提升了服务的灵活性;
-
2.2 线程池可调整的参数
- 线程池构造参数有8个,但最核心的3个是:
corePoolSize
(核心线程数)、maximumPoolSize
(最大线程数)、workQueue
(任务队列)。这3个参数直接决定了线程池的任务分配和线程分配策略,结合不同业务场景,又衍生出两种典型队列选择逻辑:-
场景1:并行执行子任务,提高响应速度。这类场景追求即时响应(如前文快速响应用户请求场景),应使用同步队列。同步队列不缓存任务,任务到达后会立即尝试执行,避免任务在队列中等待的时间开销,保障响应速度;
-
场景2:并行执行大批量任务,提升吞吐量。这类场景追求单位时间处理更多任务(如前文快速处理批量任务场景),应使用有界队列。有界队列会缓存大批量任务,通过队列缓冲来削峰填谷,同时要声明队列容量,防止任务无限制堆积导致内存溢出等问题。
-
2.3 实现思路
-
JDK提供的
ThreadPoolExecutor
类,通过多个public
的setter
方法(如setCorePoolSize
、setMaximumPoolSize
、setKeepAliveTime
等),支持在运行时动态修改线程池核心参数: -
以
setCorePoolSize
为例,修改后线程池会根据新核心线程数、当前工作线程数与原始值的对比,采取不同策略:-
若新核心线程数 < 当前工作线程数:说明有多余的
worker
线程,会尝试将它们中断,回收多余线程; -
若新核心线程数 > 原始值,且任务队列中有等待任务:会创建新的
Worker
线程,加快任务执行;
-
-
示例代码:
public class DynamicThreadPool {// 初始化ThreadPoolExecutorprivate ThreadPoolExecutor executor;// 指定核心线程数、最大线程数、存活时间、任务队列等基础参数public DynamicThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {this.executor = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);}// 调用ThreadPoolExecutor的setCorePoolSize和setMaximumPoolSize,动态修改核心线程数和最大线程数,无需重启服务public void adjustThreadPool(int newCorePoolSize, int newMaximumPoolSize) {this.executor.setCorePoolSize(newCorePoolSize);this.executor.setMaximumPoolSize(newMaximumPoolSize);}// 向线程池提交任务(调用execute方法)public void submitTask(Runnable task) {this.executor.execute(task);}// 打印线程池当前状态(核心线程数、最大线程数、活跃线程数),用于观察参数调整效果public void print(){System.out.println("核心线程数:" + executor.getCorePoolSize()+ " " +"最大线程数:" + executor.getMaximumPoolSize()+" " + "活跃线程数:" + executor.getActiveCount());}// 关闭线程池public void shutdown() {this.executor.shutdown();}// 测试public static void main(String[] args) throws InterruptedException {// 初始化任务队列和DynamicThreadPool,核心 / 最大线程数均为 10BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(20);DynamicThreadPool dynamicThreadPool = new DynamicThreadPool(10, 10, 10, TimeUnit.SECONDS, workQueue);// 提交 30 个任务(每个任务休眠 10 秒,模拟耗时操作)for (int i = 0; i < 30; i++) {dynamicThreadPool.submitTask(() -> {log.info(Thread.currentThread().getName() + "开始执行任务");try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}});}// 休眠 1 秒后,打印“修改前”的线程池状态Thread.sleep(1000);System.out.println("======修改前=======");dynamicThreadPool.print();// 动态调整线程池参数。调用adjustThreadPool,将核心线程数改为 5、最大线程数改为 8dynamicThreadPool.adjustThreadPool(5, 8);System.out.println("======修改后=======");dynamicThreadPool.print();// 打印 “修改后” 的线程池状态,再休眠 10 秒后再次打印,观察参数调整对线程池运行的影响Thread.sleep(10000);dynamicThreadPool.print();// 关闭线程池dynamicThreadPool.shutdown();} }
2.4 实现方案
2.4.1 基于 Nacos 配置中心动态调整线程池参数
-
借助Nacos的
Listener
机制,在Spring Bean初始化时,开启对Nacos中线程池配置文件(如threadPool.yml
)的监听。当Nacos中的配置发生变更时,通过监听逻辑实时更新线程池的核心参数(无需重启服务);nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(),new Listener() {@Overridepublic Executor getExecutor() {return null;}@Overridepublic void receiveConfigInfo(String configInfo) {}});
-
nacosConfigManager.getConfigService()
:获取Nacos的配置服务实例,用于与Nacos服务器交互(获取配置、监听配置等); -
.addListener(...)
:为指定配置添加监听器,当配置发生变更时触发回调逻辑;- 第一个参数
"threadPool.yml"
:要监听的配置文件名(Nacos中存储的配置标识); - 第二个参数
nacosConfigProperties.getGroup()
:配置所属的分组(Nacos中配置的组织方式,默认分组为DEFAULT_GROUP
); - 第三个参数
new Listener(){...}
:匿名内部类实现的监听器,包含配置变更时的处理逻辑;
- 第一个参数
-
Listener
接口有两个核心方法,用于定义配置变更的处理规则:-
getExecutor()
方法:- 返回值:
Executor
线程池实例(可为null
); - 作用:指定执行
receiveConfigInfo
回调方法的线程池;- 若返回
null
:默认使用Nacos客户端内部的线程池执行回调; - 若返回自定义线程池:则在该线程池中执行回调,避免回调逻辑阻塞Nacos内部线程;
- 若返回
- 返回值:
-
receiveConfigInfo(String configInfo)
方法:- 参数
configInfo
:变更后的配置内容(字符串形式,如threadPool.yml
的最新文本内容); - 作用:配置变更时的核心处理逻辑。在实际业务中,这里会编写解析
configInfo
的代码(如将字符串转为YAML/JSON对象),然后提取新的线程池参数(如corePoolSize
、maxPoolSize
),最后调用线程池的setter
方法完成动态更新;
- 参数
-
-
-
示例代码:
@Configuration @Data public class MyDynamicThreadPool implements InitializingBean {// 通过@Value注解,从配置中注入线程池的初始参数@Value("${threadPool.corePoolSize}")private int corePoolSize; // 核心线程数@Value("${threadPool.maxPoolSize}")private int maxPoolSize; // 最大线程数@Value("${threadPool.queueCapacity}")private int queueCapacity; // 队列容量@Value("${threadPool.keepAliveSeconds}")private int keepAliveSeconds; // 线程存活时间private static ThreadPoolTaskExecutor threadPoolTaskExecutor;@Autowiredprivate NacosConfigManager nacosConfigManager;@Autowiredprivate NacosConfigProperties nacosConfigProperties;// 实现InitializingBean接口,在 Bean 初始化时:@Overridepublic void afterPropertiesSet() throws Exception {// 创建ThreadPoolTaskExecutor实例threadPoolTaskExecutor = new ThreadPoolTaskExecutor();// 配置初始线程池参数threadPoolTaskExecutor.setCorePoolSize(corePoolSize);threadPoolTaskExecutor.setMaxPoolSize(maxPoolSize);threadPoolTaskExecutor.setQueueCapacity(queueCapacity);threadPoolTaskExecutor.setKeepAliveSeconds(keepAliveSeconds);// 设置线程名前缀threadPoolTaskExecutor.setThreadNamePrefix( "SHISAN--");// 设置拒绝策略threadPoolTaskExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("队列已满,丢弃任务");}});threadPoolTaskExecutor.initialize();// 监听 Nacos 中threadPool.yml配置的变更nacosConfigManager.getConfigService().addListener("threadPool.yml", nacosConfigProperties.getGroup(),new Listener() {@Overridepublic Executor getExecutor() {return null;}// 当配置变更时,receiveConfigInfo方法会被触发@Overridepublic void receiveConfigInfo(String configInfo) {System.out.println("动态修改前-->");print();// 解析新的配置内容(通过 YAML 解析工具将配置字符串转为Map)Yaml yaml = new Yaml();InputStream inputStream = new ByteArrayInputStream(configInfo.getBytes());Map<String, Object> dataMap = yaml.load(inputStream);JSONObject pool = new JSONObject(dataMap).getJSONObject("threadPool");// 调用ThreadPoolTaskExecutor的setter方法,动态更新线程池参数threadPoolTaskExecutor.setCorePoolSize(pool.getInteger("corePoolSize"));threadPoolTaskExecutor.setMaxPoolSize(pool.getInteger("maxPoolSize"));threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("keepAliveSeconds"));threadPoolTaskExecutor.setQueueCapacity(pool.getInteger("queueCapacity"));System.out.println("动态修改后-->");print();}});}// 提交任务到线程池执行public void execute(Runnable runnable){threadPoolTaskExecutor.execute(runnable);}// 打印线程池当前状态(核心线程数、最大线程数、阻塞队列已用 / 总容量、活跃线程数),用于观察参数调整效果public void print(){System.out.println("核心线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getCorePoolSize()+ " " +"最大线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getMaximumPoolSize()+" " + "阻塞队列数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getQueue().size()+ "/" + queueCapacity+" " + "活跃线程数:" + threadPoolTaskExecutor.getThreadPoolExecutor().getActiveCount());} }
2.4.2 使用DynamicTp——基于配置中心的轻量级动态可监控线程池
-
DynamicTp 是基于配置中心实现的轻量级动态线程池管理工具,核心功能涵盖:
-
动态调参:支持不重启服务,动态调整线程池参数(如核心线程数、最大线程数等);
-
通知报警:当线程池运行状态异常(如队列积压、线程活跃度过高等)时,通过企微、钉钉、 Lark、邮件等渠道发送告警;
-
运行监控:对线程池的运行状态(如活跃线程数、队列长度等)进行监控,结合监控系统(Prometheus、InfluxDB + Grafana 等)可视化展示;
-
三方包线程池管理:能管理第三方中间件(如 Tomcat、Dubbo、RocketMQ 等)的线程池;
-
-
官网:首页 | dynamictp;
-
架构分为三大核心模块,相互协作实现线程池的动态化、可监控管理:
-
配置中心:支持多种主流配置中心(Nacos、Apollo、Zookeeper、Consul 等),用于存储线程池的配置(如创建/删除线程池配置、修改参数、告警相关配置等),是“动态调参”的配置来源;
-
SpringBoot 服务:承载业务自定义线程池(如 DtpExecutor、OrderedDtpExecutor 等)和第三方中间件线程池(如 Tomcat、Dubbo 等的线程池),并基于配置中心的配置,实现任务增强(对任务执行的扩展)、运行监控(采集线程池运行指标)、通知告警(异常时触发告警)、动态调参(应用配置中心的参数变更);
-
监控模块:通过指标采集(如 JsonLog、MicroMeter、Endpoint 等方式),将线程池运行指标对接监控系统(Prometheus、InfluxDB、Grafana 等),实现线程池状态的可视化监控;
-
-
核心配置:
spring:dynamic:tp:enabled: true # 是否启用 dynamictp,默认trueexecutors: # 动态线程池配置,都有默认值,采用默认值的可以不配置该项,减少配置量- threadPoolName: dtpExecutor1 # 线程池名称,必填threadPoolAliasName: 测试线程池 # 线程池别名,可选executorType: common # 线程池类型 common、eager、ordered、scheduled、priority,默认 commoncorePoolSize: 5 # 核心线程数,默认1maximumPoolSize: 8 # 最大线程数,默认cpu核数queueCapacity: 2000 # 队列容量,默认1024queueType: VariableLinkedBlockingQueue # 任务队列,查看源码QueueTypeEnum枚举类,默认VariableLinkedBlockingQueuerejectedHandlerType: CallerRunsPolicy # 拒绝策略,查看RejectedTypeEnum枚举类,默认AbortPolicykeepAliveTime: 10 # 空闲线程等待超时时间,默认60threadNamePrefix: Fox # 线程名前缀,默认dtpallowCoreThreadTimeOut: false # 是否允许核心线程池超时,默认falsewaitForTasksToCompleteOnShutdown: true # 参考spring线程池设计,优雅关闭线程池,默认trueawaitTerminationSeconds: 5 # 优雅关闭线程池时,阻塞等待线程池中任务执行时间,默认3,单位(s)preStartAllCoreThreads: false # 是否预热所有核心线程,默认false