当前位置: 首页 > news >正文

Springboot 异步场景 使用注解 @Async 及 自定义线程池分模块使用

目录

  • 前言
  • 一、Springboot项目如何开启异步?
  • 二、存在的问题
  • 三、自定义线程池
  • 四、自定义线程池使用
  • 五、阻塞队列和拒绝策略


前言

当开发中遇到不影响主流程任务时,使用异步去处理。
如有以下场景:
1、业务需要生成一个季度的数据进行员工排名(涉及到的数据很多),数据查询、组装、按规则排名耗时比较长,并且开发方案能接受延时查看具体排名信息数据。在数据变动时,需要调用重新排名的方法,故把排名方法设置为异步。


一、Springboot项目如何开启异步?

启动类 上添加或者 自定义线程池 上添加注解:@EnableAsync。
执行方法上加上注解 @Async。

启动类配置
在这里插入图片描述
Controller
在这里插入图片描述

Service
在这里插入图片描述
打印信息
在这里插入图片描述
到这里就可以正常使用异步了。

二、存在的问题

虽然在 Spring 框架中,@Async 注解可以用于异步执行方法。

但是Spring 会自动创建一个默认的线程池用于执行方法。这个默认线程池是由 SimpleAsyncTaskExecutor 管理的,它为每个任务创建一个新的线程。虽然这可以工作,但可能会遇到以下问题:

  1. 无限制的线程创建:SimpleAsyncTaskExecutor 会为每个任务创建一个新的线程,而没有最大线程数的限制。如果异步任务的数量非常多,这可能导致大量的线程被创建,消耗大量的系统资源,最终可能导致 OutOfMemoryError 或降低系统性能。
  2. 线程管理:由于每次调用都会创建新线程,没有线程复用,这可能会导致线程管理上的开销,尤其是在高并发场景下。
  3. 调试和监控困难:默认线程池创建的线程名称没有明确的命名规则,这可能会使得在日志中或监控工具中跟踪异步任务变得困难。
  4. 资源竞争:大量的线程可能会引起CPU和内存资源的激烈竞争,尤其是在JVM和操作系统层面上的上下文切换。
  5. 安全性问题:如果异步任务执行的时间过长,而默认线程池没有适当的管理策略,可能会因为线程过多而影响到系统的稳定性和安全性。

三、自定义线程池

鉴于以上问题,建议使用自定义线程池。

import cn.hutool.core.thread.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;/*** 全局异步任务配置类*/
@Slf4j
@EnableAsync
@Configuration
public class GlobalAsyncConfig implements AsyncConfigurer {// 从 application.yml 中注入配置参数@Value("${async.executor.core-pool-size:10}")private int corePoolSize;@Value("${async.executor.max-pool-size:50}")private int maxPoolSize;@Value("${async.executor.keep-alive-seconds:60}")private int keepAliveSeconds;@Value("${async.executor.queue-capacity:200}")private int queueCapacity;/*** 创建主异步线程池** @return Executor*/@Bean(name = "asyncExecutor")public Executor asyncExecutor() {return createThreadPool("async-pool-", "MainAsyncTask");}/*** 创建邮件发送专用线程池(可选扩展)** @return Executor*/@Bean(name = "emailExecutor")public Executor emailExecutor() {return createThreadPool("email-pool-", "EmailTask");}/*** 创建线程池通用方法** @param namePrefix 线程名称前缀* @param taskType   任务类型描述(用于日志区分)* @return ThreadPoolTaskExecutor*/private Executor createThreadPool(String namePrefix, String taskType) {// 使用 Spring 提供的 ThreadPoolTaskExecutor,相较于原生 ThreadPoolExecutor,// 更加适合与 Spring 的 @Async 注解配合使用,并且支持更丰富的配置选项。ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 设置核心线程数,线程池初始化时创建的线程数量executor.setCorePoolSize(corePoolSize);// 设置最大线程数,当任务队列满时,线程池最多可扩容到的线程数量executor.setMaxPoolSize(maxPoolSize);// 设置非核心线程空闲存活时间(单位为秒)executor.setKeepAliveSeconds(keepAliveSeconds);// 设置任务队列容量,用于缓存待执行的任务executor.setQueueCapacity(queueCapacity);// 设置线程工厂,用于创建具有指定命名前缀的线程,便于日志追踪和问题定位executor.setThreadFactory(createThreadFactory(namePrefix));// 设置拒绝策略:当线程池和任务队列都已满时,由调用线程(即提交任务的线程)自己执行该任务executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 允许核心线程在空闲时超时并被回收,有助于节省资源(适用于负载波动较大的场景)executor.setAllowCoreThreadTimeOut(true);// 设置线程名称前缀,方便在日志中识别不同线程池中的线程executor.setThreadNamePrefix("[" + taskType + "] ");// 必须显式调用 initialize() 来启动线程池executor.initialize();// 返回配置完成的线程池实例return executor;}/*** 创建线程工厂(统一命名格式)** @param prefix 线程名称前缀* @return ThreadFactory*/private ThreadFactory createThreadFactory(String prefix) {return new ThreadFactoryBuilder().setNamePrefix(prefix).build();}@Overridepublic Executor getAsyncExecutor() {return asyncExecutor();}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return new GlobalAsyncUncaughtExceptionHandler();}/*** 异步任务全局异常处理器,这里可以单独放一个类文件(这里鉴于篇幅写在一起)*/@Slf4jpublic static class GlobalAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {@Overridepublic void handleUncaughtException(Throwable ex, Method method, Object... params) {log.error("[Async Task Error] Method: {}, Params: {}", method.getName(), Arrays.deepToString(params), ex);}}
}

application.yml配置(已有默认值,看个人需求配置)

# 全局线程池相关配置
async:executor:core-pool-size: 10max-pool-size: 50keep-alive-seconds: 60queue-capacity: 200

四、自定义线程池使用

在业务开发中,建议每块业务区分线程池使用,模块互不影响,便于日志收集。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

五、阻塞队列和拒绝策略

Java 中常用的阻塞队列(BlockingQueue)

队列类型特点适用场景
ArrayBlockingQueue基于数组、有界、FIFO内存敏感、任务量可控的系统
LinkedBlockingQueue基于链表、可有界也可无界、FIFO通用型,吞吐量要求较高
PriorityBlockingQueue支持优先级排序、无界需要按优先级处理的任务(如报警、日志级别)
SynchronousQueue不存储元素,插入必须等待取出高并发、低延迟场景,任务直接由消费者线程执行

🚫 线程池拒绝策略(RejectedExecutionHandler)

策略类名行为说明使用建议
AbortPolicy抛出异常 RejectedExecutionException默认策略,适用于不能丢失任务的场景
CallerRunsPolicy由调用线程自己执行任务减缓提交速度,适合临时过载时
DiscardOldestPolicy丢弃队列中最老的任务,尝试重新提交当前任务可接受部分任务丢失,希望保留最新任务
DiscardPolicy默默丢弃任务,不做任何处理可容忍任务丢失,如非关键日志、监控等任务

在这里插入图片描述
默认使用的是LinkedBlockingQueue队列,建议初始化大小,防止内存溢出。

http://www.xdnf.cn/news/476047.html

相关文章:

  • 一分钟了解机器学习
  • 专业版降重指南:如何用Python批量替换同义词?自动化操作不香嘛?
  • STM32 ADC+DMA+TIM触发采样实战:避坑指南与源码解析
  • 宇宙中是否存在量子现象?
  • Jenkins的流水线执行shell脚本执行jar命令后项目未启动未输出日志问题处理
  • #跟着若城学鸿蒙# web篇-运动和方向传感器监测
  • 【愚公系列】《Manus极简入门》042-投资策略分析师:“投资智慧导航”
  • 武汉火影数字全息剧秀制作:科技与艺术的梦幻联动
  • RabbitMQ 消息模式实战:从简单队列到复杂路由(三)
  • 通信安全堡垒:profinet转ethernet ip主网关提升冶炼安全与连接
  • PCL PolygonMesh 与 TextureMesh 源码阅读与简单测试
  • 数据结构进阶:AVL树与红黑树
  • SRS流媒体服务器(5)源码分析之RTMP握手
  • Python中in和is关键字详解和使用
  • C语言实现简单的--队列
  • Redis解析
  • C#将1GB大图裁剪为8张图片
  • 100G QSFP28 BIDI光模块一览:100G单纤高速传输方案|易天光通信
  • 组件导航 (Navigation)+flutter项目搭建-混合开发+分栏
  • Android 中 权限分类及申请方式
  • HNU工训--计算机串口数据收发与测量
  • 安科瑞AcrelEMS3.0企业微电网智慧能源平台-安科瑞 蒋静
  • .NET Core liunx二进制文件安装
  • 22、能源监控与优化 - 数据中心模拟 - /能源管理组件/data-center-energy-monitoring
  • CSS面试题汇总
  • 中文分词与数据可视化02
  • 接触感知 钳位电路分析
  • [模型部署] 3. 性能优化
  • 我的 PDF 工具箱:CodeBuddy 打造 PDFMagician 的全过程记录
  • Java 并发编程归纳总结(可重入锁 | JMM | synchronized 实现原理)