CompleteableFuture的异步任务编排
为什么会有CompleteableFuture
Java 的 1.5 版本引入了 Future,可以把它简单的理解为运算结果的占位符, 它提供了两个方法来获取运算结果。
- get():调用该方法线程将会无限期等待运算结果。
- get(longmeout, TimeUnit unit):调用该方法线程将仅在指定时间 timeout 内等待结果,如果等待超时就会抛出 TimeoutException 异常。
Future 可以使用 Runnable 或 Callable 实例来完成提交的任务,它存在如 下几个问题:
- 阻塞 调用 get() 方法会一直阻塞,直到等待直到计算完成,它没有提供任 何方法可以在完成时通知, 同时也不具有附加回调函数的功能。
- 链式调用和结果聚合处理 在很多时候我们想链接多个 Future 来完成耗时 较长的计算, 此时需要合并结果并将结果发送到另一个任务中, 该接口很难完成 这种处理。
- 异常处理 Future 没有提供任何异常处理的方式。
JDK1.8 才新加入的一个实现类 CompletableFuture ,很好的解决了这些问题, CompletableFuture 实现了 Future , CompletionStage两个接口。 实现了Future 接口,意味着可以像以前一样通过阻塞或者轮询的方式获得结果。
CompleteableFuture使用示例
假设现在有两个服务,一个是查询职员信息,一个是查询统计信息。现在用CompleteableFuture进行任务编排。
@RestController
@RequestMapping("/test")
@Slf4j
public class FutureTestController {@Autowiredprivate EmployeeService employeeService;@Autowiredprivate StatisticsService statisticsService;@RequestMapping(value = "/demo1")public void demo1(@RequestParam("id") Long id) throws Exception {StopWatch stopWatch = new StopWatch();stopWatch.start();//调用职员服务获取基本信息CompletableFuture<Employee> employeeFuture = CompletableFuture.supplyAsync(()-> employeeService.getEmployeeInfo(id));//模拟耗时操作Thread.sleep(300);//调用统计服务获取统计信息CompletableFuture<Statistics> statisticsFuture = CompletableFuture.supplyAsync(() -> statisticsService.getStatisticsInfo(id));//获取职员信息Employee employee = employeeFuture.get(2, TimeUnit.SECONDS);//获取统计信息Statistics statistics = statisticsFuture.get();stopWatch.stop();log.info("职员信息为:{},统计信息为:{}",employee.toString(),statistics.toString());log.info("耗时为:{}毫秒",stopWatch.getTotalTimeMillis());}
}
以上代码展示了CompleteableFuture的使用方法,有几个地方需要注意:
1. 默认线程池
CompletableFuture默认使用ForkJoinPool.commonPool() 作为线程池。
ForkJoinPool工作原理
ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者 submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务。如此一来,所有的工作线程都不会闲下来了。
在ForkJoinPool中使用阻塞型任务时需要注意以下几点:
- 防止线程饥饿:当一个线程在执行一个阻塞型任务时,它将会一直等待任务完成,这时如果没有其他线程可以窃取任务,那么该线程将一直被阻塞,直到任务完成为止。为了避免这种情况,应该避免在ForkJoinPool中提交大量的阻塞型任务。
- 使用特定的线程池:为了最大程度地利用ForkJoinPool的性能,可以使用专门的线程池来处理阻塞型任务,这些线程不会被ForkJoinPool的窃取机制所影响。例如,可以使用ThreadPoolExecutor来创建一个线程池,然后将这个线程池作为ForkJoinPool的执行器,这样就可以使用ThreadPoolExecutor来处理阻塞型任务,而使用ForkJoinPool来处理非阻塞型任务。
- 不要阻塞工作线程:如果在ForkJoinPool中使用阻塞型任务,那么需要确保这些任务不会阻塞工作线程,否则会导致整个线程池的性能下降。为了避免这种情况,可以将阻塞型任务提交到一个专门的线程池中,或者使用CompletableFuture等异步编程工具来处理阻塞型任务。
由上可知:如果任务阻塞或执行时间过长,可能会导致线程池耗尽,影响其他任务的执行。
我们也可以使用专门的线程池来处理阻塞型任务
//核心线程数int corePoolSize = 2;//最大线程数int maxPoolSize = 10;//非核心线程空闲存活时间(固定线程池可设为0)long keepAliveTime = 0L;//有界队列BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);//默认拒绝策略RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.AbortPolicy();//创建线程池ExecutorService executor = new ThreadPoolExecutor(corePoolSize,maxPoolSize,keepAliveTime,TimeUnit.SECONDS,workQueue,rejectedExecutionHandler);//调用职员服务获取基本信息CompletableFuture<Void> employeeFuture = CompletableFuture.runAsync(()->{//模拟耗时操作try {Thread.sleep(10000);} catch (InterruptedException e) {throw new RuntimeException(e);}},executor);
2. 异常处理
CompletableFuture 异常处理机制,跟我们使用的传统try…catch有点不一样。当运行时出现了异常,可以通过 exceptionally 进行补偿。
@Testvoid exceptionally() {CompletableFuture<Integer> future=CompletableFuture.supplyAsync(()->{throw new RuntimeException("运行时异常!");});future.exceptionally(ex->{System.err.println("异常:"+ex.getMessage());return -1;}).join();}
3. 超时处理
如果是JDK8,使用 get() 方法并捕获 TimeoutException
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return 1;});// 程序会一直等待future.join(); try {Integer result = future.get(3, TimeUnit.SECONDS);} catch (TimeoutException e) {System.out.println("Task timed out");// 取消任务future.cancel(true); } catch (Exception e) {e.printStackTrace();}
如果是Java 9 或更高版本,可以直接使用 orTimeout 和 completeOnTimeout 方法:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return 1;}).orTimeout(3, TimeUnit.SECONDS); // 3秒超时future.exceptionally(ex -> {System.err.println("Timeout: " + ex.getMessage());return -1;}).join();
4. 线程上下文传递
CompletableFuture 默认不会传递线程上下文(如 ThreadLocal),这可能导致上下文丢失
ThreadLocal<String> threadLocal = new ThreadLocal<>();threadLocal.set("主线程");CompletableFuture.runAsync(() -> {System.out.println(threadLocal.get()); // 输出 null}).join();
使用CompletableFuture 的supplyAsync 或 runAsync时,手动传递上下文。
ThreadLocal<String> threadLocal = new ThreadLocal<>();threadLocal.set("主线程");ExecutorService executor = Executors.newFixedThreadPool(1);CompletableFuture.runAsync(() -> {threadLocal.set("子线程");System.out.println(threadLocal.get()); // 输出子线程}, executor).join();
5. 回调地狱
- CompletableFuture 的回调地狱指的是在异步编程中,过度依赖回调方法(如 thenApply、thenAccept 等)导致代码嵌套过深、难以维护的现象。
当多个异步任务需要顺序执行或依赖前一个任务的结果时,如果直接嵌套回调,代码会变得臃肿且难以阅读。反例如下:
CompletableFuture.supplyAsync(() -> 1).thenApply(result -> {System.out.println("Step 1: " + result);return result + 1;}).thenApply(result -> {System.out.println("Step 2: " + result);return result + 1;}).thenAccept(result -> {System.out.println("Step 3: " + result);});
通过链式调用和方法拆分,保持代码简洁:
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> 1).thenApply(this::step1).thenApply(this::step2);future.thenAccept(this::step3);// 拆分逻辑到单独方法
private int step1(int result) {System.out.println("Step 1: " + result);return result + 1;
}private int step2(int result) {System.out.println("Step 2: " + result);return result + 1;
}private void step3(int result) {System.out.println("Step 3: " + result);
}
6. 任务编排,执行顺序混乱
任务编排时,如果任务之间有依赖关系,可能会导致任务无法按预期顺序执行。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 2);
CompletableFuture<Integer> result = future1.thenCombine(future2, (a, b) -> a + b);
result.join(); // 可能不会按预期顺序执行
使用 thenCompose 或 thenApply 来确保任务顺序。
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> 1);
CompletableFuture<Integer> future2 = future1.thenApply(a -> a + 2);
future2.join(); // 确保顺序执行