当前位置: 首页 > backend >正文

CompleteableFuture的异步任务编排

为什么会有CompleteableFuture

Java 的 1.5 版本引入了 Future,可以把它简单的理解为运算结果的占位符, 它提供了两个方法来获取运算结果。

  • get():调用该方法线程将会无限期等待运算结果。
  • get(longmeout, TimeUnit unit):调用该方法线程将仅在指定时间 timeout 内等待结果,如果等待超时就会抛出 TimeoutException 异常。

Future 可以使用 Runnable 或 Callable 实例来完成提交的任务,它存在如 下几个问题:

  • 阻塞 调用 get() 方法会一直阻塞,直到等待直到计算完成,它没有提供任 何方法可以在完成时通知, 同时也不具有附加回调函数的功能。
  • 链式调用和结果聚合处理 在很多时候我们想链接多个 Future 来完成耗时 较长的计算, 此时需要合并结果并将结果发送到另一个任务中, 该接口很难完成 这种处理。
  • 异常处理 Future 没有提供任何异常处理的方式。

JDK1.8 才新加入的一个实现类 CompletableFuture ,很好的解决了这些问题, CompletableFuture 实现了 Future , CompletionStage两个接口。 实现了Future 接口,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。

CompleteableFuture使用示例

假设现在有两个服务,一个是查询职员信息,一个是查询统计信息。现在用CompleteableFuture进行任务编排。

@RestController
@RequestMapping("/test")
@Slf4j
public class FutureTestController {@Autowiredprivate EmployeeService employeeService;@Autowiredprivate StatisticsService statisticsService;@RequestMapping(value = "/demo1")public void demo1(@RequestParam("id") Long id) throws Exception {StopWatch stopWatch = new StopWatch();stopWatch.start();//调用职员服务获取基本信息CompletableFuture<Employee> employeeFuture = CompletableFuture.supplyAsync(()-> employeeService.getEmployeeInfo(id));//模拟耗时操作Thread.sleep(300);//调用统计服务获取统计信息CompletableFuture<Statistics> statisticsFuture = CompletableFuture.supplyAsync(() -> statisticsService.getStatisticsInfo(id));//获取职员信息Employee employee = employeeFuture.get(2, TimeUnit.SECONDS);//获取统计信息Statistics statistics = statisticsFuture.get();stopWatch.stop();log.info("职员信息为:{},统计信息为:{}",employee.toString(),statistics.toString());log.info("耗时为:{}毫秒",stopWatch.getTotalTimeMillis());}
}

以上代码展示了CompleteableFuture的使用方法,有几个地方需要注意:

1. 默认线程池

CompletableFuture默认使用ForkJoinPool.commonPool() 作为线程池。在这里插入图片描述
ForkJoinPool工作原理
ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。

在ForkJoinPool中使用阻塞型任务时需要注意以下几点:

  • 防止线程饥饿:当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止。为了避免这种情况,应该避免在ForkJoinPool中提交大量的阻塞型任务。
  • 使用特定的线程池:为了最大程度地利用ForkJoinPool的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被ForkJoinPool的窃取机制所影响。例如,可以使用ThreadPoolExecutor来创建一个线程池,然后将这个线程池作为ForkJoinPool的执行器,这样就可以使用ThreadPoolExecutor来处理阻塞型任务,而使用ForkJoinPool来处理非阻塞型任务。
  • 不要阻塞工作线程:如果在ForkJoinPool中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降。为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用CompletableFuture等异步编程工具来处理阻塞型任务。

由上可知:如果任务阻塞或执行时间过长,可能会导致线程池耗尽,影响其他任务的执行。
我们也可以使用专门的线程池来处理阻塞型任务

        //核心线程数int corePoolSize = 2;//最大线程数int maxPoolSize = 10;//非核心线程空闲存活时间(固定线程池可设为0)long keepAliveTime = 0L;//有界队列BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);//默认拒绝策略RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();//创建线程池ExecutorService executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,workQueue,rejectedExecutionHandler);//调用职员服务获取基本信息CompletableFuture<Void> employeeFuture = CompletableFuture.runAsync(()->{//模拟耗时操作try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}},executor);

2. 异常处理

CompletableFuture 异常处理机制,跟我们使用的传统try…catch有点不一样。当运行时出现了异常,可以通过 exceptionally 进行补偿。
在这里插入图片描述

    @Testvoid exceptionally() {CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{throw new RuntimeException("运行时异常!");});future.exceptionally(ex->{System.err.println("异常:"+ex.getMessage());return -1;}).join();}

在这里插入图片描述

3. 超时处理

如果是JDK8,使用 get() 方法并捕获 TimeoutException

	CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// 程序会一直等待future.join(); try {Integer result = future.get(3, TimeUnit.SECONDS);} catch (TimeoutException e) {System.out.println("Task timed out");// 取消任务future.cancel(true); } catch (Exception e) {e.printStackTrace();}

如果是Java 9 或更高版本,可以直接使用 orTimeout 和 completeOnTimeout 方法:

	CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return 1;}).orTimeout(3, TimeUnit.SECONDS); // 3秒超时future.exceptionally(ex -> {System.err.println("Timeout: " + ex.getMessage());return -1;}).join();

4. 线程上下文传递

CompletableFuture 默认不会传递线程上下文(如 ThreadLocal),这可能导致上下文丢失

 ThreadLocal<String> threadLocal = new ThreadLocal<>();threadLocal.set("主线程");CompletableFuture.runAsync(() -> {System.out.println(threadLocal.get()); // 输出 null}).join();

使用CompletableFuture 的supplyAsync 或 runAsync时,手动传递上下文。

 ThreadLocal<String> threadLocal = new ThreadLocal<>();threadLocal.set("主线程");ExecutorService executor = Executors.newFixedThreadPool(1);CompletableFuture.runAsync(() -> {threadLocal.set("子线程");System.out.println(threadLocal.get()); // 输出子线程}, executor).join();

5. 回调地狱

  • CompletableFuture 的回调地狱指的是在异步编程中,过度依赖回调方法(如 thenApply、thenAccept 等)导致代码嵌套过深、难以维护的现象。
    当多个异步任务需要顺序执行或依赖前一个任务的结果时,如果直接嵌套回调,代码会变得臃肿且难以阅读。反例如下:
CompletableFuture.supplyAsync(() -> 1).thenApply(result -> {System.out.println("Step 1: " + result);return result + 1;}).thenApply(result -> {System.out.println("Step 2: " + result);return result + 1;}).thenAccept(result -> {System.out.println("Step 3: " + result);});

通过链式调用和方法拆分,保持代码简洁:

CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1).thenApply(this::step1).thenApply(this::step2);future.thenAccept(this::step3);// 拆分逻辑到单独方法
private int step1(int result) {System.out.println("Step 1: " + result);return result + 1;
}private int step2(int result) {System.out.println("Step 2: " + result);return result + 1;
}private void step3(int result) {System.out.println("Step 3: " + result);
}

6. 任务编排,执行顺序混乱

任务编排时,如果任务之间有依赖关系,可能会导致任务无法按预期顺序执行。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> result = future1.thenCombine(future2, (a, b) -> a + b);
result.join(); // 可能不会按预期顺序执行

使用 thenCompose 或 thenApply 来确保任务顺序。

CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = future1.thenApply(a -> a + 2);
future2.join(); // 确保顺序执行
http://www.xdnf.cn/news/7741.html

相关文章:

  • python读写bin文件
  • 《算法笔记》11.7小节——动态规划专题->背包问题 问题 C: 货币系统
  • SCAU18923--二叉树的直径
  • NC65联查单据问题总结
  • 宽带卫星通信介绍
  • 今日行情明日机会——20250520
  • 基于双通道频谱分析的振动信号故障诊断1
  • 波峰波谷策略
  • 野火鲁班猫(arrch64架构debian)从零实现用MobileFaceNet算法进行实时人脸识别(三)用yolov5-face算法实现人脸检测
  • 【BIO、NIO、AIO的区别?】
  • 【嵌入式】I2S音频接口3分钟入门
  • 独热编码笔记
  • 字符画生成(伟大的CSDN)
  • windows安装WS,实测可行
  • 2.4.1死锁的概念
  • 中小型制造业信息化战略规划指南
  • SpringBoot 自动配置
  • 【课堂笔记】指数族与广义线性模型(GLMs)
  • zipkin+micrometer实现链路追踪
  • Java 01简单集合
  • Vue3——Pinia
  • 编译原理的部分概念
  • docker常用指令总结
  • A1-A2 英语学习系列 第四集 中国版
  • HarmonyOS5云服务技术分享--ArkTS开发Node环境
  • 不同消息队列保证高可用实现方案
  • C#入门系列【基础类型大冒险】从0到1,解锁编程世界的“元素周期表”
  • 50个Java+SpringBoot+Vue毕业设计选题(含技术栈+核心功能)
  • sqli-labs第十三关——’)POST报错注入
  • go.mod:5: unknown directive: toolchain