当前位置: 首页 > news >正文

SpringMVC异步处理Servlet

使用SpringMVC异步处理Servlet解决的问题

  1. 可以不阻塞有限的tomcat 线程(默认是200~250个,springboot3是200个),确保网络请求可以持续响应
  2. 特定业务使用自定义线程池,可以处理的业务量更大
  3. 对上层业务完全无感知(但如果中间链路有超时,则需要注意,比如 nginx 代理60秒超时)
  4. SpringMVC 的包装比较好,可以完全无感 Servlet 的底层实现

注意事项

  1. springboot3里面会默认开启异步,虽然官方文档说,如果使用了AbstractAnnotationConfigDispatcherServletInitializer则会开启异步支持,但我看代码并没有使用,实际也是开启的(可能还有我不懂的,比如使用了@EnableAsync注解,先保留意见)。SpringMVC 会默认将所有的 Servlet 和 Filter 都注册为支持异步,但是否需要开启异步是根据返回值的类型判断的。
  2. 官方文档说返回结果支持DeferredResult和Callable,实测其实还有其他的类型可以支持,比如CompletableFuture。
  3. 对于 Callable 的返回结果,处理流程如下:
    1. 对于这个类型的返回值,Spring 会采用CallableMethodReturnValueHandler进行处理异步结果,注意这里的结果类型都是Callable。
    2. CallableMethodReturnValueHandler会调用 org.springframework.web.context.request.async.WebAsyncManager#startCallableProcessing(org.springframework.web.context.request.async.WebAsyncTask<?>, java.lang.Object…)处理结果,Callable 是包装成WebAsyncTask执行的。
      1. 其中的拦截器之类的都忽略,但这有非常重要的一点this.taskExecutor.submit(() -> {});,这里是 SpringMVC 会将异步的任务提交到线程池处理,也就是调用Callable.call()来获取结果,也就意味着这个线程池是阻塞的。而 Servlet 线程会在此提交到线程池后释放(假定此处的 servlet 线程是http-nio-7001-exec-1)。
      2. 所谓释放线程,其实就是 servlet 执行结束了,然后 Filter 也会执行结束,相当于整个请求的线程执行全部结束,仅仅是,没有给 response 结果而已。
      3. 这一步的线程池taskExecutor,默认是每次new Thread 的,存在风险,需要替换为执行线程池,可以实现 MVC 配置,并指定线程池org.springframework.web.servlet.config.annotation.WebMvcConfigurer#configureAsyncSupport。
    3. 在异步执行完成后,SpringMVC 会再次触发DispatcherServlet中doDispatch(这也是个坑),在这里直接将结果返回,不再执行 Controller。这是因为 Request 中已经标记request.getDispatcherType()是ASYNC值。
      1. HandlerInterceptor拦截器的实现类需要注意,因为DispatcherServlet中doDispatch会被再次调用,所以preHandle方法在一开始会调用一次,异步执行完成后,发送结果给客户端的时候会重复调用一次,如果不希望执行,可以用DispatcherType.ASYNC.equals(request.getDispatcherType())判断,并跳过执行,注意返回结果是 true即可。
      2. 再次触发的doDispatch,通常是一个新的线程(如:http-nio-7001-exec-2),因为线程不同,所以在 ThreadLocal 中存储的全链路跟踪 TraceId 会丢失,需要采用其他方式,比如放到 Request的Attribute 里面。
  4. WebAsyncTask 和 Callable 一样,也存在线程池问题。
  5. 对于DeferredResult的返回结果,处理流程如下:
    1. DeferredResultMethodReturnValueHandler
    2. org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing
      1. 这里没有线程池,因为DeferredResult 需要你在业务执行的地方setResult,Spring 会在setResult后触发后续链路。
  6. CompletableFuture、Mono、Flux 返回值类型:会被认为是 Reactive 类型的返回值,通过ResponseBodyEmitterReturnValueHandler处理,最终会包装为DeferredResultSubscriber,执行到org.springframework.web.context.request.async.WebAsyncManager#startDeferredResultProcessing中,和DeferredResult的处理方式一样。
    1. 注意,在handleReturnValue 中会有判断,如果异步的返回 Reactive 对象是同步完成态,比如 Mono 只有一个确定的结果,里面没有采用订阅的模式进行异步处理,或者 Future 是完结态,那么handleReturnValue 会直接同步返回结果,而不是异步处理。所以开发的时候要注意:
      1. 一定要自己启动异步线程,在Future 中做业务逻辑,比如直接用CompletableFuture.supplyAsync(()->{}, threadPool)
      2. Publisher.subscribe订阅的时候,启动异步线程threadPool.submit()->{ subscriber.onSubscribe(subscription); --> Subscription.request(x) --> subscriber.onNext(T); },在异步线程中onNext时输出结果,才能确保在 Servlet 层面是异步的,非阻塞的。

结论

只有 Callable 用了简单线程池,存在线程问题,如果使用需要指定线程池,但也只有Callable使用最简单,return 即可。

另外,推荐采用DeferredResult和CompletableFuture类型的返回值,需要自己在线程池中处理业务并赋值结果。

代码示例

package com.test;import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import reactor.core.publisher.Mono;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Slf4j
@RestController
@RequestMapping("/restapi/testAsync")
public class TestAsyncToolboxController {private static final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10,100,60,TimeUnit.SECONDS,new ArrayBlockingQueue<>(10000),new ThreadPoolExecutor.AbortPolicy());@RequestMapping("/test")public DeferredResult<Object> test(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());DeferredResult<Object> deferredResult = new DeferredResult<>();threadPoolExecutor.submit(() -> {try {log.info("TestAsyncToolboxController.test 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {deferredResult.setResult("exception");throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());deferredResult.setResult("success");});return deferredResult;}@RequestMapping("/test3")public CompletableFuture<String> test3(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test3 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> {try {log.info("TestAsyncToolboxController.test3 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test3 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test3 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());return "success";}, threadPoolExecutor);return stringCompletableFuture;}@RequestMapping("/test4")public Mono<String> test4(HttpServletRequest request, HttpServletResponse response) {log.info("TestAsyncToolboxController.test4 1 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());try {return Mono.from(new Publisher<String>() {@Overridepublic void subscribe(Subscriber<? super String> subscriber) {threadPoolExecutor.submit(() -> {try {log.info("TestAsyncToolboxController.test4 2 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());TimeUnit.SECONDS.sleep(5);log.info("TestAsyncToolboxController.test4 3 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());} catch (InterruptedException e) {throw new RuntimeException(e);}log.info("TestAsyncToolboxController.test4 4 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());Subscription subscription = new Subscription() {@Overridepublic void request(long n) {log.info("TestAsyncToolboxController.test4 7 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());subscriber.onNext("success");}@Overridepublic void cancel() {log.info("TestAsyncToolboxController.test4 8 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}};log.info("TestAsyncToolboxController.test4 10 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());subscriber.onSubscribe(subscription);log.info("TestAsyncToolboxController.test4 9 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());});log.info("TestAsyncToolboxController.test4 6 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}});} finally {log.info("TestAsyncToolboxController.test4 5 isAsyncStarted={}, isAsyncSupported={}", request.isAsyncStarted(), request.isAsyncSupported());}}
}

其他

Servlet 的异步性能和 Reactor 的性能是否存在较大差异?为什么 Servlet 依然健在,且没有明显的被 Reactor 模式替换的迹象?

参考

https://docs.spring.io/spring-framework/reference/web/webmvc/mvc-ann-async.html
https://blog.csdn.net/sinat_33543436/article/details/88971367

http://www.xdnf.cn/news/975367.html

相关文章:

  • Wyn 商业智能与 3D 大屏的深度融合应用
  • 在ARM 架构的 Mac 上 更新Navicat到17后连接Oracle时报错:未加载 Oracle 库。
  • 高频面试之6Hive
  • 机器学习算法——集成学习
  • 电路图识图基础知识-变频器控制电动机系统解析(二十四)
  • 渗透测试PortSwigger Labs:遭遇html编码和转义符的反射型XSS
  • uniapp 云打包 iOS 应用上传到 app store 商店的过程
  • ZZU-ARM汇编语言实验 34
  • 【Rust UDP编程】rust udp编程方法解析与应用实战
  • uni-app bitmap.load() 返回 code=-100
  • XSP30是一款2~3节串联锂电池/锂离子电池升降压充电管理IC
  • Snipaste:轻量级截图,高效编辑
  • Spring中@Value注解:原理、加载顺序与实战指南
  • Springboot项目的目录结构
  • 西门子 SINAMICS S200伺服,重塑汽车焊接工艺新标准
  • 技术革新,EtherCAT转CAN网关,新能源汽车电池产线再升级
  • 汽车租赁小程序开发指南
  • Spark提交流程
  • SQL 注入:iBatis与修复
  • Charles里怎么进行断点调试
  • TripGenie:畅游济南旅行规划助手:团队工作纪实(十四)
  • 附加模块--Qt SQL模块功能及架构解析
  • SpringCloud系列 - Nacos 配置中心(二)
  • Hadoop 2.7.7 单机伪分布式安装与配置教程(JDK 8)
  • 如何设计三高架构
  • 小米玄戒O1架构深度解析(二):多核任务调度策略详解
  • 【系统设计【1】】系统设计面试方法论:从0到百万用户的需求到架构的推演
  • RPG24.设置武器伤害(二):将效果应用于目标
  • defaultdict 在python中的作用
  • 传输层协议 TCP 介绍 -- TCP协议格式,确认应答机制,超时重传机制,连接管理机制,滑动窗口,流量控制,拥塞控制,延迟应答,捎带应答