当前位置: 首页 > news >正文

CompletableFuture初体验

一、Future和CompletableFuture的区别


  1. Future是Java 5引入的接口,用于表示异步计算的结果。它提供了基本的异步操作功能,当只需要简单的异步执行和结果等待时使用
  2. CompletableFuture是Java 8引入的类,实现了Future和CompletionStage接口,提供了强大的异步编程能力,当需要复杂的异步编程,包括链式操作、异常处理、多任务组合等现代异步编程模式时使用

特性FutureCompletableFuture
版本Java 5Java 8
类型接口类(实现Future接口)
手动完成❌ 不支持✅ 支持complete()方法
链式操作❌ 不支持✅ 支持thenApplythenCompose
异常处理❌ 只能try-catch✅ 支持exceptionallyhandle
组合操作❌ 不支持✅ 支持allOfanyOfthenCombine
回调函数❌ 不支持✅ 支持whenCompletethenAccept
非阻塞获取❌ 只有阻塞的get()✅ 支持getNow()非阻塞获取
超时处理get(timeout, unit)✅ 更丰富的超时控制
取消操作cancel()✅ 继承了cancel()并增强

二、CompletableFuture的常用方法

1. 基础创建方法

方法名描述示例应用场景
supplyAsync()异步执行有返回值的任务CompletableFuture.supplyAsync(() -> "result")异步计算并返回结果
runAsync()异步执行无返回值的任务CompletableFuture.runAsync(() -> System.out.println("done"))异步执行副作用操作
completedFuture()创建已完成的FutureCompletableFuture.completedFuture("value")立即返回结果,常用于测试
failedFuture()创建异常完成的FutureCompletableFuture.failedFuture(new Exception())立即返回异常状态

2. 结果转换方法(同步)

方法名描述示例特点
thenApply()对结果进行转换future.thenApply(s -> s.toUpperCase())同步转换,阻塞当前线程
thenAccept()消费结果,无返回值future.thenAccept(System.out::println)同步消费
thenRun()执行Runnable,不关心结果future.thenRun(() -> log.info("done"))同步执行
thenCompose()扁平化嵌套的CompletableFuturefuture.thenCompose(this::getUser)避免嵌套Future

3. 结果转换方法(异步)

方法名描述示例特点
thenApplyAsync()异步转换结果future.thenApplyAsync(s -> s.toUpperCase())使用线程池异步执行
thenAcceptAsync()异步消费结果future.thenAcceptAsync(System.out::println)异步消费
thenRunAsync()异步执行Runnablefuture.thenRunAsync(() -> log.info("done"))异步执行
thenComposeAsync()异步扁平化future.thenComposeAsync(this::getUserAsync)异步组合

4. 多Future组合方法

方法名描述示例应用场景
thenCombine()组合两个Future的结果f1.thenCombine(f2, (a, b) -> a + b)需要两个结果进行计算
thenCombineAsync()异步组合两个Futuref1.thenCombineAsync(f2, (a, b) -> a + b)异步组合计算
thenAcceptBoth()消费两个Future的结果f1.thenAcceptBoth(f2, (a, b) -> log(a, b))需要两个结果进行操作
thenAcceptBothAsync()异步消费两个Futuref1.thenAcceptBothAsync(f2, (a, b) -> log(a, b))异步消费操作
runAfterBoth()两个Future都完成后执行f1.runAfterBoth(f2, () -> cleanup())等待多个任务完成
runAfterBothAsync()异步等待两个完成后执行f1.runAfterBothAsync(f2, () -> cleanup())异步等待执行
applyToEither()使用最先完成的结果f1.applyToEither(f2, result -> process(result))竞争执行,使用最快结果
applyToEitherAsync()异步使用最先完成的结果f1.applyToEitherAsync(f2, result -> process(result))异步竞争执行
acceptEither()消费最先完成的结果f1.acceptEither(f2, System.out::println)处理最快完成的任务
acceptEitherAsync()异步消费最先完成的结果f1.acceptEitherAsync(f2, System.out::println)异步处理最快任务
runAfterEither()任一完成后执行f1.runAfterEither(f2, () -> notify())任意一个完成即可触发
runAfterEitherAsync()异步任一完成后执行f1.runAfterEitherAsync(f2, () -> notify())异步触发执行

5. 静态组合方法

方法名描述示例使用场景
allOf()等待所有Future完成CompletableFuture.allOf(f1, f2, f3)需要所有任务都完成
anyOf()等待任一Future完成CompletableFuture.anyOf(f1, f2, f3)只需要任意一个完成

6. 异常处理方法

方法名描述示例特点
exceptionally()异常时提供默认值future.exceptionally(ex -> "default")只处理异常情况
exceptionallyAsync()异步异常处理future.exceptionallyAsync(ex -> "default")异步异常处理
handle()处理正常结果和异常future.handle((result, ex) -> ex != null ? "error" : result)统一处理成功和失败
handleAsync()异步处理结果和异常future.handleAsync((result, ex) -> process(result, ex))异步统一处理
whenComplete()完成时回调(不改变结果)future.whenComplete((result, ex) -> log(result, ex))用于日志、清理等副作用
whenCompleteAsync()异步完成回调future.whenCompleteAsync((result, ex) -> log(result, ex))异步副作用处理

7. 结果获取方法

方法名描述示例特点
get()阻塞获取结果future.get()会抛出检查异常
get(timeout, unit)带超时的阻塞获取future.get(5, TimeUnit.SECONDS)超时抛出TimeoutException
join()阻塞获取结果future.join()抛出运行时异常
getNow(defaultValue)非阻塞获取future.getNow("default")立即返回,未完成则返回默认值
resultNow()获取已完成的结果future.resultNow()Java 19+,未完成抛异常
exceptionNow()获取异常future.exceptionNow()Java 19+,获取异常信息

8. 状态控制方法

方法名描述示例使用场景
complete()手动完成Futurefuture.complete("result")主动设置结果
completeExceptionally()手动异常完成future.completeExceptionally(new Exception())主动设置异常
cancel()取消执行future.cancel(true)取消未完成的任务
isDone()检查是否完成future.isDone()状态查询
isCompletedExceptionally()检查是否异常完成future.isCompletedExceptionally()异常状态查询
isCancelled()检查是否被取消future.isCancelled()取消状态查询
obtrudeValue()强制设置结果future.obtrudeValue("forced")强制覆盖结果
obtrudeException()强制设置异常future.obtrudeException(new Exception())强制覆盖为异常

9. 高阶组合方法

方法名描述示例高级特性
thenCombineAsync(f, fn, executor)指定线程池的组合f1.thenCombineAsync(f2, fn, customPool)自定义执行环境
thenComposeAsync(fn, executor)指定线程池的组合future.thenComposeAsync(fn, customPool)控制执行线程池
handleAsync(fn, executor)指定线程池的异常处理future.handleAsync(fn, customPool)异步异常处理
whenCompleteAsync(action, executor)指定线程池的完成回调future.whenCompleteAsync(action, customPool)异步完成回调
exceptionallyAsync(fn, executor)指定线程池的异常处理future.exceptionallyAsync(fn, customPool)自定义异常处理线程池

10. 实用工具方法

方法名描述示例应用场景
orTimeout(timeout, unit)设置超时future.orTimeout(5, TimeUnit.SECONDS)防止永久等待
completeOnTimeout(value, timeout, unit)超时时完成future.completeOnTimeout("timeout", 5, SECONDS)超时提供默认值
copy()创建副本future.copy()创建独立的Future副本
newIncompleteFuture()创建新的未完成Futurefuture.newIncompleteFuture()创建同类型新实例
defaultExecutor()获取默认执行器CompletableFuture.defaultExecutor()获取ForkJoinPool
minimalCompletionStage()创建最小完成阶段future.minimalCompletionStage()限制API访问
toCompletableFuture()转换为CompletableFuturestage.toCompletableFuture()CompletionStage转换

三、CompletableFuture的进阶用法


1、基础链式调用

CompletableFuture.supplyAsync(() -> "hello").thenApply(String::toUpperCase).thenApply(s -> s + " WORLD").thenAccept(System.out::println);

2. 异步执行无返回值任务

CompletableFuture.runAsync(() -> {System.out.println("执行清理任务");// 清理逻辑
}).thenRun(() -> {System.out.println("清理完成");
});

3. 两个任务组合

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "World");future1.thenCombine(future2, (a, b) -> a + " " + b).thenAccept(System.out::println); // 输出: Hello World

4. 简单异常处理

CompletableFuture.supplyAsync(() -> {if (Math.random() > 0.5) {throw new RuntimeException("随机异常");}return "成功结果";
}).exceptionally(ex -> "异常时的默认值").thenAccept(System.out::println);

5. 等待所有任务完成

CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "任务1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "任务2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> "任务3");CompletableFuture.allOf(task1, task2, task3).thenRun(() -> {System.out.println("所有任务完成");System.out.println(task1.join());System.out.println(task2.join());System.out.println(task3.join());});

6. 任意一个完成就处理

CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> {sleep(100);return "快速任务";
});
CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {sleep(1000);return "慢速任务";
});fast.applyToEither(slow, result -> "最先完成: " + result).thenAccept(System.out::println);

7. 超时控制

CompletableFuture.supplyAsync(() -> {sleep(2000); // 模拟耗时操作return "超时测试";
}).orTimeout(1, TimeUnit.SECONDS).exceptionally(ex -> "操作超时").thenAccept(System.out::println);

8. 完成后回调(不改变结果)

CompletableFuture.supplyAsync(() -> "处理数据").thenApply(data -> data + " -> 已处理").whenComplete((result, ex) -> {if (ex != null) {System.out.println("处理失败: " + ex.getMessage());} else {System.out.println("处理成功: " + result);}});

9. 链式异步调用

CompletableFuture.supplyAsync(() -> getUserId()).thenCompose(userId -> getUserInfo(userId)).thenCompose(userInfo -> getUserOrders(userInfo.getId())).thenAccept(orders -> System.out.println("订单数量: " + orders.size()));

10. 并行处理后合并

CompletableFuture<Integer> calculateA = CompletableFuture.supplyAsync(() -> {sleep(100);return 10;
});
CompletableFuture<Integer> calculateB = CompletableFuture.supplyAsync(() -> {sleep(200);return 20;
});calculateA.thenCombine(calculateB, Integer::sum).thenAccept(result -> System.out.println("计算结果: " + result));

11. 条件分支处理

CompletableFuture.supplyAsync(() -> 85) // 模拟分数.thenCompose(score -> {if (score >= 90) {return CompletableFuture.completedFuture("优秀");} else if (score >= 60) {return CompletableFuture.completedFuture("及格");} else {return CompletableFuture.completedFuture("不及格");}}).thenAccept(result -> System.out.println("评级: " + result));

12. 批量处理

List<String> tasks = Arrays.asList("任务1", "任务2", "任务3");List<CompletableFuture<String>> futures = tasks.stream().map(task -> CompletableFuture.supplyAsync(() -> "完成" + task)).collect(Collectors.toList());CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).thenApply(v -> futures.stream().map(CompletableFuture::join).collect(Collectors.toList())).thenAccept(results -> results.forEach(System.out::println));

13. 异步消费两个结果

CompletableFuture<String> userTask = CompletableFuture.supplyAsync(() -> "用户数据");
CompletableFuture<String> configTask = CompletableFuture.supplyAsync(() -> "配置数据");userTask.thenAcceptBoth(configTask, (user, config) -> {System.out.println("用户: " + user + ", 配置: " + config);
});

14. 等待两个任务完成后执行

CompletableFuture<Void> init1 = CompletableFuture.runAsync(() -> System.out.println("初始化1"));
CompletableFuture<Void> init2 = CompletableFuture.runAsync(() -> System.out.println("初始化2"));init1.runAfterBoth(init2, () -> System.out.println("所有初始化完成"));

15. 手动完成Future

CompletableFuture<String> manualFuture = new CompletableFuture<>();// 在其他地方手动完成
CompletableFuture.runAsync(() -> {sleep(1000);manualFuture.complete("手动完成的结果");
});manualFuture.thenAccept(System.out::println);

16. 结果转换和处理

CompletableFuture.supplyAsync(() -> "  hello world  ").thenApply(String::trim).thenApply(String::toUpperCase).thenApply(s -> s.replace(" ", "_")).thenAccept(System.out::println); // 输出: HELLO_WORLD

17. 非阻塞获取结果(立即获取)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {sleep(1000);return "延迟结果";
});// 立即获取,如果未完成返回默认值
String result = future.getNow("默认值");
System.out.println("当前结果: " + result);

18. 简单的重试逻辑

CompletableFuture<String> retryableFuture = CompletableFuture.supplyAsync(() -> {if (Math.random() > 0.7) {return "成功";}throw new RuntimeException("失败");}).exceptionally(ex -> {System.out.println("第一次失败,重试...");return CompletableFuture.supplyAsync(() -> "重试成功").join();});retryableFuture.thenAccept(System.out::println);

19. 统一异常处理

CompletableFuture.supplyAsync(() -> {return 10 / 0; // 故意制造异常
}).handle((result, ex) -> {if (ex != null) {System.out.println("处理异常: " + ex.getCause().getMessage());return 0;}return result;
}).thenAccept(result -> System.out.println("最终结果: " + result));

20. 简单的数据管道

CompletableFuture.supplyAsync(() -> Arrays.asList(1, 2, 3, 4, 5)).thenApply(list -> list.stream().mapToInt(i -> i * 2).sum()).thenApply(sum -> "总和: " + sum).thenAccept(System.out::println);

辅助方法

private static void sleep(long ms) {try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}

四、案例分析


业务场景:

  1. 查询A接口耗时 300ms,B接口耗时 200ms,C接口耗时 300ms,D接口耗时 300ms,E接口耗时200ms
  2. 任务一需要 A+C,任务二需要 A+B,任务三需要 A+B+C
  3. 任务四需要任务一二三都完成后执行D接口
  4. 任务五需要任务三完成后执行E接口
  5. 要求总耗时不超过 800ms

public class AsyncTaskDemo {private static final long START_TIME = System.currentTimeMillis();private static final Executor CUSTOM_EXECUTOR = new ForkJoinPool(8);public static void main(String[] args) {System.out.println("=== CompletableFuture 异步任务优化示例 ===\n");try {// 关键优化1:校验接口并行执行System.out.println("🚀 阶段1:并行启动校验接口");CompletableFuture<String> futureA = callInterfaceA();CompletableFuture<String> futureB = callInterfaceB();CompletableFuture<String> futureC = callInterfaceC();// 等待所有校验接口完成,如果任何一个失败则快速失败CompletableFuture<Void> allValidations = CompletableFuture.allOf(futureA, futureB, futureC);allValidations.get(); // 阻塞等待所有校验完成// 关键优化2:智能依赖任务组合(只有校验通过才执行)System.out.println("🚀 阶段2:组装依赖任务");CompletableFuture<String> task1 = futureA.thenCombineAsync(futureC, (a, c) -> executeTaskOne(a, c), CUSTOM_EXECUTOR);CompletableFuture<String> task2 = futureA.thenCombineAsync(futureB,(a, b) -> executeTaskTwo(a, b), CUSTOM_EXECUTOR);CompletableFuture<String> task3 = CompletableFuture.allOf(futureA, futureB, futureC).thenApplyAsync(v -> executeTaskThree(futureA.join(), futureB.join(), futureC.join()), CUSTOM_EXECUTOR);// 关键优化3:最终任务并行执行System.out.println("🚀 阶段3:最终任务并行执行");// 任务四:等待任务1、2、3完成后调用接口DCompletableFuture<String> task4 = CompletableFuture.allOf(task1, task2, task3).thenComposeAsync(v -> callInterfaceD().thenApplyAsync(d -> executeTaskFour(d, task1.join(), task2.join(), task3.join()), CUSTOM_EXECUTOR), CUSTOM_EXECUTOR);// 任务五:等待任务3完成后调用接口E(与任务4并行)CompletableFuture<String> task5 = task3.thenComposeAsync(task3Result -> callInterfaceE().thenApplyAsync(e -> executeTaskFive(e, task3Result), CUSTOM_EXECUTOR), CUSTOM_EXECUTOR);// 等待所有任务完成CompletableFuture.allOf(task4, task5).get(800, TimeUnit.MILLISECONDS);// 输出结果long totalTime = getCurrentTime();System.out.println("\n=== 执行结果 ===");// System.out.println("任务四结果: " + task4.get());// System.out.println("任务五结果: " + task5.get());System.out.println("总耗时: " + totalTime + "ms (目标: <800ms)");System.out.println("性能状态: " + (totalTime < 800 ? "✅ 达标" : "❌ 超时"));} catch (Exception e) {System.err.println("❌ 执行失败: " + e.getMessage());} finally {((ForkJoinPool) CUSTOM_EXECUTOR).shutdown();}}/*** 校验接口A - 耗时300ms*/private static CompletableFuture<String> callInterfaceA() {return createAsyncInterfaceWithFailure("接口A", 300, "校验结果A", 0.1, "接口A校验失败:数据不符合要求");}/*** 校验接口B - 耗时200ms*/private static CompletableFuture<String> callInterfaceB() {return createAsyncInterfaceWithFailure("接口B", 200, "校验结果B", 0.1, "接口B校验失败:数据不符合要求");}/*** 校验接口C - 耗时300ms (带30%失败概率)*/private static CompletableFuture<String> callInterfaceC() {return createAsyncInterfaceWithFailure("接口C", 300, "校验结果C", 0.1, "接口C校验失败:数据不符合要求");}/*** 业务接口D - 耗时300ms*/private static CompletableFuture<String> callInterfaceD() {return createAsyncInterfaceWithFailure("接口D", 300, "结果D", 0.3, "接口D校验失败:数据不符合要求");}/*** 业务接口E - 耗时200ms*/private static CompletableFuture<String> callInterfaceE() {return createAsyncInterfaceWithFailure("接口E", 200, "结果E", 0.1, "接口E校验失败:数据不符合要求");}/*** 执行任务一 - 需要接口A和C的校验结果*/private static String executeTaskOne(String resultA, String resultC) {System.out.printf("  🎯 执行任务一,输入: %s, %s - %dms%n", resultA, resultC, getCurrentTime());return "任务一完成";}/*** 执行任务二 - 需要接口A和B的校验结果*/private static String executeTaskTwo(String resultA, String resultB) {System.out.printf("  🎯 执行任务二,输入: %s, %s - %dms%n", resultA, resultB, getCurrentTime());return "任务二完成";}/*** 执行任务三 - 需要接口A、B、C的校验结果*/private static String executeTaskThree(String resultA, String resultB, String resultC) {System.out.printf("  🎯 执行任务三,输入: %s, %s, %s - %dms%n", resultA, resultB, resultC, getCurrentTime());return "任务三完成";}/*** 执行任务四 - 需要接口D的结果和任务一二三的结果*/private static String executeTaskFour(String resultD, String task1Result, String task2Result, String task3Result) {System.out.printf("  🎯 执行任务四,输入: %s, %s, %s, %s - %dms%n", resultD, task1Result, task2Result, task3Result, getCurrentTime());return "任务四完成";}/*** 执行任务五 - 需要接口E的结果和任务三的结果*/private static String executeTaskFive(String resultE, String task3Result) {System.out.printf("  🎯 执行任务五,输入: %s, %s - %dms%n", resultE, task3Result, getCurrentTime());return "任务五完成";}private static long getCurrentTime() {return System.currentTimeMillis() - START_TIME;}// ==================== 通用工具方法 ====================/*** 安全的线程休眠方法,统一处理中断异常* @param milliseconds 休眠时间(毫秒)* @param interfaceName 接口名称,用于异常信息*/private static void safeSleep(long milliseconds, String interfaceName) {try {Thread.sleep(milliseconds);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException(interfaceName + "被中断", e);}}/*** 创建可能失败的异步接口调用* @param interfaceName 接口名称* @param delayMs 延迟时间* @param result 成功时的返回结果* @param failureRate 失败概率 (0.0 - 1.0)* @param failureMessage 失败时的异常信息* @return CompletableFuture*/private static CompletableFuture<String> createAsyncInterfaceWithFailure(String interfaceName,long delayMs,String result,double failureRate,String failureMessage) {return CompletableFuture.supplyAsync(() -> {safeSleep(delayMs, interfaceName);// 模拟随机失败if (Math.random() < failureRate) {System.out.printf("  ❌ %s失败 - %dms%n", interfaceName, getCurrentTime());throw new RuntimeException(failureMessage);}System.out.printf("  ✅ %s完成: %s - %dms%n", interfaceName, result, getCurrentTime());return result;}, CUSTOM_EXECUTOR);}}
http://www.xdnf.cn/news/1416151.html

相关文章:

  • (9.1)Python测试之记录
  • Shell 编程 —— 正则表达式与文本处理器
  • 函数,数组与正则表达式
  • Android原生HttpURLConnection上传图片方案
  • 打造智能写作工作流:n8n + 蓝耘MaaS平台完整实战指南
  • Apollo学习之决策模块
  • 【Linux手册】Unix/Linux 信号:原理、触发与响应机制实战
  • Ajax笔记(下)
  • 在.NET标准库中进行数据验证的方法
  • Java视觉跟踪入门:使用OpenCV实现实时对象追踪
  • 【开题答辩全过程】以 基于php的校园兼职求职网站为例,包含答辩的问题和答案
  • 【Android】使用Handler做多个线程之间的通信
  • 【Flask】测试平台开发,应用管理模块实现-第十一篇
  • 【lucene核心】impacts的由来
  • 旧物回收小程序:科技赋能,开启旧物新生之旅
  • 山东省信息技术应用创新开展进程(一)
  • 《C++进阶之STL》【红黑树】
  • OS+MySQL+(其他)八股小记
  • 【macOS】垃圾箱中文件无法清理的常规方法
  • 应用平台更新:可定制目录、基于Git的密钥管理与K8s项目自动化管理
  • Qt中的信号与槽机制的主要优点
  • LeetCode 142. 环形链表 II - 最优雅解法详解
  • 阿里云代理商:轻量应用服务是什么?怎么用轻量应用服务器搭建个人博客?
  • Linux性能调试工具之ftrace
  • JSP 输出语法全面解析
  • 制造业生产线连贯性动作识别系统开发
  • MCP SDK 学习二
  • 【开题答辩全过程】以 基于Java的网络购物平台设计与实现为例,包含答辩的问题和答案
  • 集合-单列集合(Collection)
  • Docker中使用Compose配置现有网络