CompletableFuture高级编程指南
目录
基础概念
什么是CompletableFuture?
异步编程核心概念
创建CompletableFuture
基本创建方式
异步任务创建
指定执行器
转换和链式操作
thenApply - 结果转换
thenAccept - 结果消费
thenRun - 后续操作
异步转换操作
组合多个CompletableFuture
thenCompose - 顺序组合(扁平化)
thenCombine - 并行组合
allOf - 等待所有完成
anyOf - 等待任一完成
异常处理机制
exceptionally - 异常恢复
handle - 统一处理结果和异常
whenComplete - 无侵入式监听
超时处理
Java 9+ 的现代超时处理
Java 8 兼容的超时处理
线程池管理
选择合适的线程池
高级应用模式
重试模式
限流控制模式
最佳实践
1. 合理选择线程池
2. 完善的异常处理
3. 超时和监控
常见陷阱
1. 忘记处理异常
2. 阻塞主线程
3. 线程池资源泄露
4. 嵌套CompletableFuture
实战案例
电商订单处理系统
总结
基础概念
什么是CompletableFuture?
CompletableFuture是Java 8引入的异步编程核心类,它实现了Future
和CompletionStage
接口,为异步编程提供了更强大和灵活的解决方案。
相比传统Future的优势
特性 | 传统Future | CompletableFuture |
---|---|---|
手动完成 | ❌ 不支持 | ✅ 支持 |
链式操作 | ❌ 不支持 | ✅ 支持 |
异常处理 | ❌ 基础支持 | ✅ 完整机制 |
组合操作 | ❌ 不支持 | ✅ 丰富的API |
超时处理 | 需要额外代码 | ✅ 内置支持 |
回调函数 | ❌ 不支持 | ✅ 完整支持 |
异步编程核心概念
异步编程的核心在于:
- 非阻塞操作:调用后立即返回,不等待执行完成
- 回调机制:任务完成后执行特定的处理逻辑
- 并行执行:多个任务同时进行,提高效率
- 事件驱动:基于事件完成的响应式处理模型
创建CompletableFuture
基本创建方式
// 创建空的CompletableFuture
CompletableFuture<String> future = new CompletableFuture<>();// 手动完成
future.complete("操作完成");// 创建已完成的Future
CompletableFuture<String> completed = CompletableFuture.completedFuture("已完成的结果");
异步任务创建
// 无返回值的异步任务
CompletableFuture<Void> runTask = CompletableFuture.runAsync(() -> {System.out.println("执行异步任务:" + Thread.currentThread().getName());// 执行具体业务逻辑
});// 有返回值的异步任务
CompletableFuture<String> supplyTask = CompletableFuture.supplyAsync(() -> {System.out.println("执行供应任务:" + Thread.currentThread().getName());return "任务执行结果";
});
指定执行器
// 创建自定义线程池
ExecutorService customExecutor = Executors.newFixedThreadPool(10, r -> {Thread t = new Thread(r);t.setName("business-thread-" + t.getId());t.setDaemon(true);return t;
});// 使用自定义线程池执行
CompletableFuture<String> customTask = CompletableFuture.supplyAsync(() -> {System.out.println("使用自定义线程池:" + Thread.currentThread().getName());return "自定义执行器结果";
}, customExecutor);
转换和链式操作
thenApply - 结果转换
CompletableFuture<String> originalTask = CompletableFuture.supplyAsync(() -> "Hello");// 字符串转换
CompletableFuture<String> stringTransform = originalTask.thenApply(s -> s + " World");// 类型转换
CompletableFuture<Integer> lengthTransform = originalTask.thenApply(String::length);// 复杂对象转换
CompletableFuture<UserInfo> userTransform = originalTask.thenApply(name -> {UserInfo user = new UserInfo();user.setName(name);user.setCreateTime(LocalDateTime.now());return user;
});
thenAccept - 结果消费
CompletableFuture<Void> consumeTask = CompletableFuture.supplyAsync(() -> "处理结果").thenAccept(result -> {// 消费结果,不返回新值System.out.println("处理完成:" + result);saveToDatabase(result);sendNotification(result);});
thenRun - 后续操作
CompletableFuture<Void> followupTask = CompletableFuture.supplyAsync(() -> "完成任务").thenRun(() -> {// 不接收参数,执行后续清理工作System.out.println("执行清理操作");cleanupResources();logCompletion();});
异步转换操作
// 异步版本的转换操作
CompletableFuture<String> asyncTransform = CompletableFuture.supplyAsync(() -> "源数据").thenApplyAsync(data -> {// 在新线程中执行转换System.out.println("转换线程:" + Thread.currentThread().getName());return processData(data);}).thenAcceptAsync(result -> {// 在新线程中执行消费System.out.println("消费线程:" + Thread.currentThread().getName());handleResult(result);});
组合多个CompletableFuture
thenCompose - 顺序组合(扁平化)
public class UserService {public CompletableFuture<String> getUserInfo(String userId) {return CompletableFuture.supplyAsync(() -> {// 模拟获取用户基本信息return "User-" + userId;});}public CompletableFuture<String> getUserOrders(String userInfo) {return CompletableFuture.supplyAsync(() -> {// 基于用户信息获取订单return userInfo + "-Orders";});}// 链式组合:用户信息 -> 订单信息public CompletableFuture<String> getUserOrderInfo(String userId) {return getUserInfo(userId).thenCompose(this::getUserOrders); // 避免嵌套Future}
}
thenCombine - 并行组合
public class DataAggregator {public CompletableFuture<CombinedResult> aggregateData(String id) {CompletableFuture<String> userData = getUserData(id);CompletableFuture<String> orderData = getOrderData(id);CompletableFuture<String> preferenceData = getPreferenceData(id);// 两两组合return userData.thenCombine(orderData, (user, orders) -> new PartialResult(user, orders)).thenCombine(preferenceData, (partial, preferences) -> new CombinedResult(partial.user, partial.orders, preferences));}private static class PartialResult {String user, orders;PartialResult(String user, String orders) {this.user = user;this.orders = orders;}}private static class CombinedResult {String user, orders, preferences;CombinedResult(String user, String orders, String preferences) {this.user = user;this.orders = orders;this.preferences = preferences;}}
}
allOf - 等待所有完成
public class BatchProcessor {public CompletableFuture<List<String>> processAllTasks(List<String> taskIds) {// 创建所有异步任务List<CompletableFuture<String>> tasks = taskIds.stream().map(this::processTask).collect(Collectors.toList());// 等待所有任务完成CompletableFuture<Void> allCompleted = CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0]));// 收集所有结果return allCompleted.thenApply(v -> tasks.stream().map(CompletableFuture::join).collect(Collectors.toList()));}private CompletableFuture<String> processTask(String taskId) {return CompletableFuture.supplyAsync(() -> {// 模拟任务处理simulateWork();return "Task-" + taskId + "-Completed";});}
}
anyOf - 等待任一完成
public class RaceProcessor {public CompletableFuture<String> getFirstAvailableResult(List<String> sources) {List<CompletableFuture<String>> futures = sources.stream().map(this::fetchFromSource).collect(Collectors.toList());// 返回最先完成的结果CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(