一、Future和CompletableFuture的区别
Future是Java 5引入的接口,用于表示异步计算的结果。它提供了基本的异步操作功能,当只需要简单的异步执行和结果等待时使用 CompletableFuture是Java 8引入的类,实现了Future和CompletionStage接口,提供了强大的异步编程能力,当需要复杂的异步编程,包括链式操作、异常处理、多任务组合等现代异步编程模式时使用
特性 Future CompletableFuture 版本 Java 5 Java 8 类型 接口 类(实现Future接口) 手动完成 ❌ 不支持 ✅ 支持complete()
方法 链式操作 ❌ 不支持 ✅ 支持thenApply
、thenCompose
等 异常处理 ❌ 只能try-catch ✅ 支持exceptionally
、handle
等 组合操作 ❌ 不支持 ✅ 支持allOf
、anyOf
、thenCombine
等 回调函数 ❌ 不支持 ✅ 支持whenComplete
、thenAccept
等 非阻塞获取 ❌ 只有阻塞的get()
✅ 支持getNow()
非阻塞获取 超时处理 ✅ get(timeout, unit)
✅ 更丰富的超时控制 取消操作 ✅ cancel()
✅ 继承了cancel()
并增强
二、CompletableFuture的常用方法
1. 基础创建方法方法名 描述 示例 应用场景 supplyAsync()
异步执行有返回值的任务 CompletableFuture.supplyAsync(() -> "result")
异步计算并返回结果 runAsync()
异步执行无返回值的任务 CompletableFuture.runAsync(() -> System.out.println("done"))
异步执行副作用操作 completedFuture()
创建已完成的Future CompletableFuture.completedFuture("value")
立即返回结果,常用于测试 failedFuture()
创建异常完成的Future CompletableFuture.failedFuture(new Exception())
立即返回异常状态
2. 结果转换方法(同步)方法名 描述 示例 特点 thenApply()
对结果进行转换 future.thenApply(s -> s.toUpperCase())
同步转换,阻塞当前线程 thenAccept()
消费结果,无返回值 future.thenAccept(System.out::println)
同步消费 thenRun()
执行Runnable,不关心结果 future.thenRun(() -> log.info("done"))
同步执行 thenCompose()
扁平化嵌套的CompletableFuture future.thenCompose(this::getUser)
避免嵌套Future
3. 结果转换方法(异步)方法名 描述 示例 特点 thenApplyAsync()
异步转换结果 future.thenApplyAsync(s -> s.toUpperCase())
使用线程池异步执行 thenAcceptAsync()
异步消费结果 future.thenAcceptAsync(System.out::println)
异步消费 thenRunAsync()
异步执行Runnable future.thenRunAsync(() -> log.info("done"))
异步执行 thenComposeAsync()
异步扁平化 future.thenComposeAsync(this::getUserAsync)
异步组合
4. 多Future组合方法方法名 描述 示例 应用场景 thenCombine()
组合两个Future的结果 f1.thenCombine(f2, (a, b) -> a + b)
需要两个结果进行计算 thenCombineAsync()
异步组合两个Future f1.thenCombineAsync(f2, (a, b) -> a + b)
异步组合计算 thenAcceptBoth()
消费两个Future的结果 f1.thenAcceptBoth(f2, (a, b) -> log(a, b))
需要两个结果进行操作 thenAcceptBothAsync()
异步消费两个Future f1.thenAcceptBothAsync(f2, (a, b) -> log(a, b))
异步消费操作 runAfterBoth()
两个Future都完成后执行 f1.runAfterBoth(f2, () -> cleanup())
等待多个任务完成 runAfterBothAsync()
异步等待两个完成后执行 f1.runAfterBothAsync(f2, () -> cleanup())
异步等待执行 applyToEither()
使用最先完成的结果 f1.applyToEither(f2, result -> process(result))
竞争执行,使用最快结果 applyToEitherAsync()
异步使用最先完成的结果 f1.applyToEitherAsync(f2, result -> process(result))
异步竞争执行 acceptEither()
消费最先完成的结果 f1.acceptEither(f2, System.out::println)
处理最快完成的任务 acceptEitherAsync()
异步消费最先完成的结果 f1.acceptEitherAsync(f2, System.out::println)
异步处理最快任务 runAfterEither()
任一完成后执行 f1.runAfterEither(f2, () -> notify())
任意一个完成即可触发 runAfterEitherAsync()
异步任一完成后执行 f1.runAfterEitherAsync(f2, () -> notify())
异步触发执行
5. 静态组合方法方法名 描述 示例 使用场景 allOf()
等待所有Future完成 CompletableFuture.allOf(f1, f2, f3)
需要所有任务都完成 anyOf()
等待任一Future完成 CompletableFuture.anyOf(f1, f2, f3)
只需要任意一个完成
6. 异常处理方法方法名 描述 示例 特点 exceptionally()
异常时提供默认值 future.exceptionally(ex -> "default")
只处理异常情况 exceptionallyAsync()
异步异常处理 future.exceptionallyAsync(ex -> "default")
异步异常处理 handle()
处理正常结果和异常 future.handle((result, ex) -> ex != null ? "error" : result)
统一处理成功和失败 handleAsync()
异步处理结果和异常 future.handleAsync((result, ex) -> process(result, ex))
异步统一处理 whenComplete()
完成时回调(不改变结果) future.whenComplete((result, ex) -> log(result, ex))
用于日志、清理等副作用 whenCompleteAsync()
异步完成回调 future.whenCompleteAsync((result, ex) -> log(result, ex))
异步副作用处理
7. 结果获取方法方法名 描述 示例 特点 get()
阻塞获取结果 future.get()
会抛出检查异常 get(timeout, unit)
带超时的阻塞获取 future.get(5, TimeUnit.SECONDS)
超时抛出TimeoutException join()
阻塞获取结果 future.join()
抛出运行时异常 getNow(defaultValue)
非阻塞获取 future.getNow("default")
立即返回,未完成则返回默认值 resultNow()
获取已完成的结果 future.resultNow()
Java 19+,未完成抛异常 exceptionNow()
获取异常 future.exceptionNow()
Java 19+,获取异常信息
8. 状态控制方法方法名 描述 示例 使用场景 complete()
手动完成Future future.complete("result")
主动设置结果 completeExceptionally()
手动异常完成 future.completeExceptionally(new Exception())
主动设置异常 cancel()
取消执行 future.cancel(true)
取消未完成的任务 isDone()
检查是否完成 future.isDone()
状态查询 isCompletedExceptionally()
检查是否异常完成 future.isCompletedExceptionally()
异常状态查询 isCancelled()
检查是否被取消 future.isCancelled()
取消状态查询 obtrudeValue()
强制设置结果 future.obtrudeValue("forced")
强制覆盖结果 obtrudeException()
强制设置异常 future.obtrudeException(new Exception())
强制覆盖为异常
9. 高阶组合方法方法名 描述 示例 高级特性 thenCombineAsync(f, fn, executor)
指定线程池的组合 f1.thenCombineAsync(f2, fn, customPool)
自定义执行环境 thenComposeAsync(fn, executor)
指定线程池的组合 future.thenComposeAsync(fn, customPool)
控制执行线程池 handleAsync(fn, executor)
指定线程池的异常处理 future.handleAsync(fn, customPool)
异步异常处理 whenCompleteAsync(action, executor)
指定线程池的完成回调 future.whenCompleteAsync(action, customPool)
异步完成回调 exceptionallyAsync(fn, executor)
指定线程池的异常处理 future.exceptionallyAsync(fn, customPool)
自定义异常处理线程池
10. 实用工具方法方法名 描述 示例 应用场景 orTimeout(timeout, unit)
设置超时 future.orTimeout(5, TimeUnit.SECONDS)
防止永久等待 completeOnTimeout(value, timeout, unit)
超时时完成 future.completeOnTimeout("timeout", 5, SECONDS)
超时提供默认值 copy()
创建副本 future.copy()
创建独立的Future副本 newIncompleteFuture()
创建新的未完成Future future.newIncompleteFuture()
创建同类型新实例 defaultExecutor()
获取默认执行器 CompletableFuture.defaultExecutor()
获取ForkJoinPool minimalCompletionStage()
创建最小完成阶段 future.minimalCompletionStage()
限制API访问 toCompletableFuture()
转换为CompletableFuture stage.toCompletableFuture()
CompletionStage转换
三、CompletableFuture的进阶用法
1、基础链式调用
CompletableFuture . supplyAsync ( ( ) -> "hello" ) . thenApply ( String :: toUpperCase ) . thenApply ( s -> s + " WORLD" ) . thenAccept ( System . out:: println ) ;
2. 异步执行无返回值任务
CompletableFuture . runAsync ( ( ) -> { System . out. println ( "执行清理任务" ) ;
} ) . thenRun ( ( ) -> { System . out. println ( "清理完成" ) ;
} ) ;
3. 两个任务组合
CompletableFuture < String > future1 = CompletableFuture . supplyAsync ( ( ) -> "Hello" ) ;
CompletableFuture < String > future2 = CompletableFuture . supplyAsync ( ( ) -> "World" ) ; future1. thenCombine ( future2, ( a, b) -> a + " " + b) . thenAccept ( System . out:: println ) ;
4. 简单异常处理
CompletableFuture . supplyAsync ( ( ) -> { if ( Math . random ( ) > 0.5 ) { throw new RuntimeException ( "随机异常" ) ; } return "成功结果" ;
} ) . exceptionally ( ex -> "异常时的默认值" ) . thenAccept ( System . out:: println ) ;
5. 等待所有任务完成
CompletableFuture < String > task1 = CompletableFuture . supplyAsync ( ( ) -> "任务1" ) ;
CompletableFuture < String > task2 = CompletableFuture . supplyAsync ( ( ) -> "任务2" ) ;
CompletableFuture < String > task3 = CompletableFuture . supplyAsync ( ( ) -> "任务3" ) ; CompletableFuture . allOf ( task1, task2, task3) . thenRun ( ( ) -> { System . out. println ( "所有任务完成" ) ; System . out. println ( task1. join ( ) ) ; System . out. println ( task2. join ( ) ) ; System . out. println ( task3. join ( ) ) ; } ) ;
6. 任意一个完成就处理
CompletableFuture < String > fast = CompletableFuture . supplyAsync ( ( ) -> { sleep ( 100 ) ; return "快速任务" ;
} ) ;
CompletableFuture < String > slow = CompletableFuture . supplyAsync ( ( ) -> { sleep ( 1000 ) ; return "慢速任务" ;
} ) ; fast. applyToEither ( slow, result -> "最先完成: " + result) . thenAccept ( System . out:: println ) ;
7. 超时控制
CompletableFuture . supplyAsync ( ( ) -> { sleep ( 2000 ) ; return "超时测试" ;
} ) . orTimeout ( 1 , TimeUnit . SECONDS ) . exceptionally ( ex -> "操作超时" ) . thenAccept ( System . out:: println ) ;
8. 完成后回调(不改变结果)
CompletableFuture . supplyAsync ( ( ) -> "处理数据" ) . thenApply ( data -> data + " -> 已处理" ) . whenComplete ( ( result, ex) -> { if ( ex != null ) { System . out. println ( "处理失败: " + ex. getMessage ( ) ) ; } else { System . out. println ( "处理成功: " + result) ; } } ) ;
9. 链式异步调用
CompletableFuture . supplyAsync ( ( ) -> getUserId ( ) ) . thenCompose ( userId -> getUserInfo ( userId) ) . thenCompose ( userInfo -> getUserOrders ( userInfo. getId ( ) ) ) . thenAccept ( orders -> System . out. println ( "订单数量: " + orders. size ( ) ) ) ;
10. 并行处理后合并
CompletableFuture < Integer > calculateA = CompletableFuture . supplyAsync ( ( ) -> { sleep ( 100 ) ; return 10 ;
} ) ;
CompletableFuture < Integer > calculateB = CompletableFuture . supplyAsync ( ( ) -> { sleep ( 200 ) ; return 20 ;
} ) ; calculateA. thenCombine ( calculateB, Integer :: sum ) . thenAccept ( result -> System . out. println ( "计算结果: " + result) ) ;
11. 条件分支处理
CompletableFuture . supplyAsync ( ( ) -> 85 ) . thenCompose ( score -> { if ( score >= 90 ) { return CompletableFuture . completedFuture ( "优秀" ) ; } else if ( score >= 60 ) { return CompletableFuture . completedFuture ( "及格" ) ; } else { return CompletableFuture . completedFuture ( "不及格" ) ; } } ) . thenAccept ( result -> System . out. println ( "评级: " + result) ) ;
12. 批量处理
List < String > tasks = Arrays . asList ( "任务1" , "任务2" , "任务3" ) ; List < CompletableFuture < String > > futures = tasks. stream ( ) . map ( task -> CompletableFuture . supplyAsync ( ( ) -> "完成" + task) ) . collect ( Collectors . toList ( ) ) ; CompletableFuture . allOf ( futures. toArray ( new CompletableFuture [ 0 ] ) ) . thenApply ( v -> futures. stream ( ) . map ( CompletableFuture :: join ) . collect ( Collectors . toList ( ) ) ) . thenAccept ( results -> results. forEach ( System . out:: println ) ) ;
13. 异步消费两个结果
CompletableFuture < String > userTask = CompletableFuture . supplyAsync ( ( ) -> "用户数据" ) ;
CompletableFuture < String > configTask = CompletableFuture . supplyAsync ( ( ) -> "配置数据" ) ; userTask. thenAcceptBoth ( configTask, ( user, config) -> { System . out. println ( "用户: " + user + ", 配置: " + config) ;
} ) ;
14. 等待两个任务完成后执行
CompletableFuture < Void > init1 = CompletableFuture . runAsync ( ( ) -> System . out. println ( "初始化1" ) ) ;
CompletableFuture < Void > init2 = CompletableFuture . runAsync ( ( ) -> System . out. println ( "初始化2" ) ) ; init1. runAfterBoth ( init2, ( ) -> System . out. println ( "所有初始化完成" ) ) ;
15. 手动完成Future
CompletableFuture < String > manualFuture = new CompletableFuture < > ( ) ;
CompletableFuture . runAsync ( ( ) -> { sleep ( 1000 ) ; manualFuture. complete ( "手动完成的结果" ) ;
} ) ; manualFuture. thenAccept ( System . out:: println ) ;
16. 结果转换和处理
CompletableFuture . supplyAsync ( ( ) -> " hello world " ) . thenApply ( String :: trim ) . thenApply ( String :: toUpperCase ) . thenApply ( s -> s. replace ( " " , "_" ) ) . thenAccept ( System . out:: println ) ;
17. 非阻塞获取结果(立即获取)
CompletableFuture < String > future = CompletableFuture . supplyAsync ( ( ) -> { sleep ( 1000 ) ; return "延迟结果" ;
} ) ;
String result = future. getNow ( "默认值" ) ;
System . out. println ( "当前结果: " + result) ;
18. 简单的重试逻辑
CompletableFuture < String > retryableFuture = CompletableFuture . supplyAsync ( ( ) -> { if ( Math . random ( ) > 0.7 ) { return "成功" ; } throw new RuntimeException ( "失败" ) ; } ) . exceptionally ( ex -> { System . out. println ( "第一次失败,重试..." ) ; return CompletableFuture . supplyAsync ( ( ) -> "重试成功" ) . join ( ) ; } ) ; retryableFuture. thenAccept ( System . out:: println ) ;
19. 统一异常处理
CompletableFuture . supplyAsync ( ( ) -> { return 10 / 0 ;
} ) . handle ( ( result, ex) -> { if ( ex != null ) { System . out. println ( "处理异常: " + ex. getCause ( ) . getMessage ( ) ) ; return 0 ; } return result;
} ) . thenAccept ( result -> System . out. println ( "最终结果: " + result) ) ;
20. 简单的数据管道
CompletableFuture . supplyAsync ( ( ) -> Arrays . asList ( 1 , 2 , 3 , 4 , 5 ) ) . thenApply ( list -> list. stream ( ) . mapToInt ( i -> i * 2 ) . sum ( ) ) . thenApply ( sum -> "总和: " + sum) . thenAccept ( System . out:: println ) ;
辅助方法
private static void sleep ( long ms) { try { Thread . sleep ( ms) ; } catch ( InterruptedException e) { Thread . currentThread ( ) . interrupt ( ) ; }
}
四、案例分析
业务场景:
查询A接口耗时 300ms,B接口耗时 200ms,C接口耗时 300ms,D接口耗时 300ms,E接口耗时200ms 任务一需要 A+C,任务二需要 A+B,任务三需要 A+B+C 任务四需要任务一二三都完成后执行D接口 任务五需要任务三完成后执行E接口 要求总耗时不超过 800ms
public class AsyncTaskDemo { private static final long START_TIME = System . currentTimeMillis ( ) ; private static final Executor CUSTOM_EXECUTOR = new ForkJoinPool ( 8 ) ; public static void main ( String [ ] args) { System . out. println ( "=== CompletableFuture 异步任务优化示例 ===\n" ) ; try { System . out. println ( "🚀 阶段1:并行启动校验接口" ) ; CompletableFuture < String > futureA = callInterfaceA ( ) ; CompletableFuture < String > futureB = callInterfaceB ( ) ; CompletableFuture < String > futureC = callInterfaceC ( ) ; CompletableFuture < Void > allValidations = CompletableFuture . allOf ( futureA, futureB, futureC) ; allValidations. get ( ) ; System . out. println ( "🚀 阶段2:组装依赖任务" ) ; CompletableFuture < String > task1 = futureA. thenCombineAsync ( futureC, ( a, c) -> executeTaskOne ( a, c) , CUSTOM_EXECUTOR ) ; CompletableFuture < String > task2 = futureA. thenCombineAsync ( futureB, ( a, b) -> executeTaskTwo ( a, b) , CUSTOM_EXECUTOR ) ; CompletableFuture < String > task3 = CompletableFuture . allOf ( futureA, futureB, futureC) . thenApplyAsync ( v -> executeTaskThree ( futureA. join ( ) , futureB. join ( ) , futureC. join ( ) ) , CUSTOM_EXECUTOR ) ; System . out. println ( "🚀 阶段3:最终任务并行执行" ) ; CompletableFuture < String > task4 = CompletableFuture . allOf ( task1, task2, task3) . thenComposeAsync ( v -> callInterfaceD ( ) . thenApplyAsync ( d -> executeTaskFour ( d, task1. join ( ) , task2. join ( ) , task3. join ( ) ) , CUSTOM_EXECUTOR ) , CUSTOM_EXECUTOR ) ; CompletableFuture < String > task5 = task3. thenComposeAsync ( task3Result -> callInterfaceE ( ) . thenApplyAsync ( e -> executeTaskFive ( e, task3Result) , CUSTOM_EXECUTOR ) , CUSTOM_EXECUTOR ) ; CompletableFuture . allOf ( task4, task5) . get ( 800 , TimeUnit . MILLISECONDS ) ; long totalTime = getCurrentTime ( ) ; System . out. println ( "\n=== 执行结果 ===" ) ; System . out. println ( "总耗时: " + totalTime + "ms (目标: <800ms)" ) ; System . out. println ( "性能状态: " + ( totalTime < 800 ? "✅ 达标" : "❌ 超时" ) ) ; } catch ( Exception e) { System . err. println ( "❌ 执行失败: " + e. getMessage ( ) ) ; } finally { ( ( ForkJoinPool ) CUSTOM_EXECUTOR ) . shutdown ( ) ; } } private static CompletableFuture < String > callInterfaceA ( ) { return createAsyncInterfaceWithFailure ( "接口A" , 300 , "校验结果A" , 0.1 , "接口A校验失败:数据不符合要求" ) ; } private static CompletableFuture < String > callInterfaceB ( ) { return createAsyncInterfaceWithFailure ( "接口B" , 200 , "校验结果B" , 0.1 , "接口B校验失败:数据不符合要求" ) ; } private static CompletableFuture < String > callInterfaceC ( ) { return createAsyncInterfaceWithFailure ( "接口C" , 300 , "校验结果C" , 0.1 , "接口C校验失败:数据不符合要求" ) ; } private static CompletableFuture < String > callInterfaceD ( ) { return createAsyncInterfaceWithFailure ( "接口D" , 300 , "结果D" , 0.3 , "接口D校验失败:数据不符合要求" ) ; } private static CompletableFuture < String > callInterfaceE ( ) { return createAsyncInterfaceWithFailure ( "接口E" , 200 , "结果E" , 0.1 , "接口E校验失败:数据不符合要求" ) ; } private static String executeTaskOne ( String resultA, String resultC) { System . out. printf ( " 🎯 执行任务一,输入: %s, %s - %dms%n" , resultA, resultC, getCurrentTime ( ) ) ; return "任务一完成" ; } private static String executeTaskTwo ( String resultA, String resultB) { System . out. printf ( " 🎯 执行任务二,输入: %s, %s - %dms%n" , resultA, resultB, getCurrentTime ( ) ) ; return "任务二完成" ; } private static String executeTaskThree ( String resultA, String resultB, String resultC) { System . out. printf ( " 🎯 执行任务三,输入: %s, %s, %s - %dms%n" , resultA, resultB, resultC, getCurrentTime ( ) ) ; return "任务三完成" ; } private static String executeTaskFour ( String resultD, String task1Result, String task2Result, String task3Result) { System . out. printf ( " 🎯 执行任务四,输入: %s, %s, %s, %s - %dms%n" , resultD, task1Result, task2Result, task3Result, getCurrentTime ( ) ) ; return "任务四完成" ; } private static String executeTaskFive ( String resultE, String task3Result) { System . out. printf ( " 🎯 执行任务五,输入: %s, %s - %dms%n" , resultE, task3Result, getCurrentTime ( ) ) ; return "任务五完成" ; } private static long getCurrentTime ( ) { return System . currentTimeMillis ( ) - START_TIME ; } private static void safeSleep ( long milliseconds, String interfaceName) { try { Thread . sleep ( milliseconds) ; } catch ( InterruptedException e) { Thread . currentThread ( ) . interrupt ( ) ; throw new RuntimeException ( interfaceName + "被中断" , e) ; } } private static CompletableFuture < String > createAsyncInterfaceWithFailure ( String interfaceName, long delayMs, String result, double failureRate, String failureMessage) { return CompletableFuture . supplyAsync ( ( ) -> { safeSleep ( delayMs, interfaceName) ; if ( Math . random ( ) < failureRate) { System . out. printf ( " ❌ %s失败 - %dms%n" , interfaceName, getCurrentTime ( ) ) ; throw new RuntimeException ( failureMessage) ; } System . out. printf ( " ✅ %s完成: %s - %dms%n" , interfaceName, result, getCurrentTime ( ) ) ; return result; } , CUSTOM_EXECUTOR ) ; } }