揭秘 CompletedFuture 的设计精髓(深入实现分析)
CompletionStage 接口说明见 Java异步编程:CompletionStage接口详解-CSDN博客
基本流程理解见:揭秘 CompletedFuture 的设计精髓(基础)-CSDN博客
成员变量:
-
volatile Object result
- 作用:存储异步任务的结果或异常(通过
AltResult
封装)。 - 编码规则:
- 正常结果直接存储(
null
编码为NIL
)。 - 异常通过
AltResult
包装(如new AltResult(ex)
)。
- 正常结果直接存储(
- 原子性保证:通过
RESULT.compareAndSet(this, null, r)
实现无锁更新。
- 作用:存储异步任务的结果或异常(通过
-
volatile Completion stack
- 作用:维护依赖操作的 Treiber 栈结构,所有依赖此结果的阶段(
Completion
)按 LIFO 顺序触发。 Completion
类型:- 单源依赖(
UniCompletion
):如thenApply
、thenAccept
。 - 双源依赖(
BiCompletion
):如thenCombine
、thenAcceptBoth
。 - 信号处理(
Signaller
):用于阻塞等待线程(如 get() 的阻塞唤醒)。
- 单源依赖(
- 作用:维护依赖操作的 Treiber 栈结构,所有依赖此结果的阶段(
Treiber 栈,实际上可以理解为一个 从头部CAS插入和删除的链表
Completion
是一个链表节点,有next
字段。
stack
是一个由Completion.next
构成的单向链表(栈),如下:
stack → C3 → C2 → C1 → null
每个 Completion 节点代表一个未执行的操作(如
thenApply
),一旦完成就从栈中弹出并执行。
实际上,CompletableFuture形成了一个有向无环图(DAG)结构:
- 单输入操作(如thenApply):形成链式结构
- 双输入操作(如thenCombine):形成汇聚结构(通过CoCompletion转发到同一个节点)
- 分支操作(如thenApplyAsync多次调用):形成扇出结构
每个CompletableFuture都有自己的stack,存储依赖于它的Completion链表。
Completion
Completion
作为 CompletableFuture
的依赖操作载体,通过 Treiber 栈组织依赖关系,结合无锁化和异步触发机制,实现了高效的异步任务流水线。其子类(如 UniApply
、BiApply
)针对不同场景封装逻辑,而 tryFire
和 postComplete
构成了核心的触发链路。
-
继承关系:
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask
- 作为
ForkJoinTask
,支持异步任务提交(通过ForkJoinPool
)。 - 作为
Runnable
,可直接被线程池执行(如executor.execute(completion)
)。 AsynchronousCompletionTask
是标记接口,用于调试和监控。
- 作为
-
关键字段与方法:
volatile Completion next; // Treiber栈的链表指针 abstract CompletableFuture<?> tryFire(int mode); // 触发依赖操作 abstract boolean isLive(); // 判断节点是否有效
next
:构建 Treiber 栈结构,链式存储依赖关系。tryFire
:核心抽象方法,由子类实现具体逻辑(如应用函数、合并结果等)。isLive
:用于清理无效节点(如已取消的任务)
Completion
的子类分为两类:单源依赖(UniCompletion
)和多源依赖(BiCompletion
)。
(1) 单源依赖:UniCompletion
abstract static class UniCompletion<T,V> extends Completion {Executor executor; // 执行器(可能为null)CompletableFuture<V> dep; // 依赖此操作的目标CompletableFutureCompletableFuture<T> src; // 源CompletableFuture// ...
}
-
典型子类:
UniApply
:对应thenApply
,应用函数Function<T,U>
。UniAccept
:对应thenAccept
,消费结果Consumer<T>
。UniRun
:对应thenRun
,执行Runnable
。
-
触发流程:
- 当
src
(源)完成时,调用tryFire
:if (src.result != null && dep.result == null) {d.completeValue(fn.apply(src.result)); // 执行函数并完成依赖 }
- 当
(2) 双源依赖:BiCompletion
java
abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { CompletableFuture<U> snd; // 第二个源 // ... }
-
典型子类:
BiApply
:对应thenCombine
,合并两个结果BiFunction<T,U,V>
。BiAccept
:对应thenAcceptBoth
,双消费BiConsumer<T,U>
。BiRun
:对应runAfterBoth
,双源完成后执行Runnable
。
-
触发流程:
- 需检查两个源(
src
和snd
)是否均完成:
- 需检查两个源(
if (src.result != null && snd.result != null) {d.completeValue(fn.apply(src.result, snd.result));
}
Completion
和 CompletableFuture
共同构建了一个依赖链的 Treiber 栈结构:
-
依赖注册:
- 当调用
thenApply、thenCombine
等方法时,创建对应的Completion
节点。 - 节点被推入源
CompletableFuture
的stack
(通过unipush
或bipush
)。
void unipush(Completion c) { while (!tryPushStack(c)) { /* CAS竞争入栈 */ } }
- 当调用
-
结果触发:
- 源完成时(
complete、completeExceptionally
),调用postComplete()
遍历栈:java
while ((h = stack) != null) {stack = h.next; // 弹出节点h.tryFire(SYNC); // 触发依赖操作 }
tryFire
会根据模式(SYNC、ASYNC
)决定同步执行或提交到线程池。
- 源完成时(
-
异步执行:
- 若
Completion
包含executor
,其run()
方法会委托给线程池:public void run() { tryFire(ASYNC); } // 通过executor异步执行
- 若
设计模式与优势
-
观察者模式:
CompletableFuture
是被观察者,Completion
是观察者节点。- 观察者(依赖操作)在源完成时被通知(通过
postComplete
)。
-
无锁栈管理:Treiber 栈通过
volatile
和 CAS(STACK.compareAndSet
)实现线程安全。 -
灵活的任务链:支持单源、多源、异步、异常处理等多种组合方式。
Completion
与 CompletableFuture
的嵌套关系
当调用 thenApply
时,实际上会生成一个新的 CompletableFuture
作为目标 Future(dep
),用于保存当前阶段的操作结果。例如:
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<Integer> future2 = future.thenApply(s -> s.length());
future.thenApply(...)
创建了一个新的CompletableFuture<Integer>
(future2
)。- 这个新 Future 是
UniApply
的目标(dep
字段),而future
是源(src
字段)。
源 Future (future) 目标 Future (future2)| |v v[stack] → [UniApply] → [UniApply.dep = future2]
UniApply
节点:src
:源 Future(future
)。dep
:目标 Future(future2
)。fn
:用户传入的函数(s -> s.length()
)。
当 future
完成时,UniApply
节点的 tryFire(...)
会执行函数,并将结果写入 future2
。此时,如果 future2
后续有 thenApply
,会继续创建新的 Completion 节点,推入 future2
的栈中。
CompletableFuture<Integer> future3 = future.thenApply(s -> s.length()) // future2.thenApply(i -> i * 2); // future3
-
结构:
future → [UniApply(dep=future2)] → future2 → [UniApply(dep=future3)] → future3
-
执行流程:
future
完成后,触发UniApply
节点。UniApply
执行s -> s.length()
,结果写入future2
。future2
完成后,触发其栈中的UniApply
节点。UniApply
执行i -> i * 2
,结果写入future3
。
如果多次调用就会形成一个嵌套链表,这也是postComplete为什么会那样写,因为要循环处理这样的结构需要回溯。
f2 = f1.ThenApply(a)
f3 = f2.ThenApply(b)
f4 = f1.ThenApply(c)
f5 = f4.ThenApply(c)f1.stack → [ThenApply(dep=f2)] → [ThenApply(dep=f4)] → null
f2.stack → [ThenApply(dep=f3)] → null
f4.stack → [ThenApply(dep=f5)] → null
thenApply
全流程源码级解析
CompletableFuture<Integer> src = CompletableFuture.supplyAsync(() -> 1); CompletableFuture<String> dep = src.thenApply(i -> i.toString());
- 流程分解:
thenApply
创建UniApply
节点,推入src.stack
。src
完成后,触发UniApply.tryFire
,将i.toString()
结果写入dep
。- 若
thenApplyAsync
指定了线程池,则通过executor.execute(node)
异步执行。
thenApply
方法定义:
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn); // 同步执行
}
创建依赖阶段 (uniApplyStage
)
private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = newIncompleteFuture(); // 创建目标FutureObject r;if ((r = result) != null) // 源已完成的快速路径d.uniApplyNow(r, e, f); // 直接同步执行函数elseunipush(new UniApply<T,V>(e, d, this, f)); // 创建依赖节点并入栈return d;
}
- 关键步骤:
newIncompleteFuture()
:创建目标CompletableFuture<V>
(dep
)。- 结果检查:若源已完成(
result != null
),直接同步执行函数。 - 未完成时:创建
UniApply
节点并推入栈(unipush
)。
构建 UniApply
节点
@SuppressWarnings("serial")
static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn;UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); // 父类初始化this.fn = fn;}// ...
}
- 继承结构:
UniApply
→UniCompletion
→Completion
。
- 关键字段:
executor
:异步执行时使用的线程池(可能为null
)。dep
:依赖此操作的目标 Future(dep
)。src
:源 Future(src
)。fn
:用户传入的函数。
推入 Treiber 栈 (unipush
)
final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) { // CAS竞争入栈if (result != null) { // 源突然完成时终止循环NEXT.set(c, null); // 清理next指针break;}}if (result != null)c.tryFire(SYNC); // 立即触发}
}// CAS入栈实现
final boolean tryPushStack(Completion c) {Completion h = stack;NEXT.set(c, h); // c.next = hreturn STACK.compareAndSet(this, h, c); // stack = c
}
- 无锁栈操作:
- 通过
STACK.compareAndSet
原子性更新栈顶。 - 若入栈过程中源突然完成(
result != null
),直接触发tryFire(SYNC)
。
- 通过
complete时发生了什么
源完成触发依赖 (postComplete
)
当源 Future (src
) 完成时(如调用 complete(42)
):
postComplete()
的作用是:
遍历当前 future 和其依赖 future 的 Completion 栈,依次弹出 Completion 并执行,直到所有 Completion 处理完毕。
它利用 CAS 实现无锁并发,利用栈结构支持链式 Completion,通过tryFire
触发下游操作,最终实现高效的异步回调链执行机制。
public boolean complete(T value) {boolean triggered = completeValue(value); // CAS设置resultpostComplete(); // 触发所有依赖节点return triggered;
}final void postComplete() {CompletableFuture<?> f = this;while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) { // 弹出栈顶if (t != null) {if (f != this) {pushStack(h); // 处理嵌套依赖continue;}NEXT.compareAndSet(h, t, null); // 断开链表}f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发操作}}
}
- 核心流程:
- 循环弹出栈顶节点(
h
),调用h.tryFire(NESTED)
。 tryFire
返回非空时,处理新生成的依赖链(如级联thenApply
)。
- 循环弹出栈顶节点(
(h = f.stack) != null || (f != this && (h = (f = this).stack) != null)
是 两阶段扫描:
- 第一阶段:从当前 future (
f
) 的栈中取 Completion; - 第二阶段:如果当前栈空了,回到原始 future (
this
) 再次取一次; - 目的:确保所有嵌套 Completion 都能被执行。
t != null && f != this
表示:
- 当前 Completion 有后续节点(
t != null
); - 但当前处理的是嵌套 future(
f != this
); - 所以要把当前 Completion 压回去,留给后续处理。
否则会导致后续 Completion 被跳过,导致任务丢失。
为什么需要 postComplete()
递归处理?
当 dep
完成时,需要触发 dep
自己的栈。例如:
CompletableFuture<Integer> future3 = future2.thenApply(i -> i * 2);
future2
完成后,UniApply
节点的tryFire(...)
会返回future3
。postComplete()
会切换到future3
的栈,继续触发后续操作。
f = (d = h.tryFire(NESTED)) == null ? this : d;
d = h.tryFire(...)
:返回dep
(如future2
)。f = d
:切换到dep
的栈,继续处理它的 Completion 节点。
执行依赖操作 (UniApply.tryFire
)
final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d;CompletableFuture<T> a;Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null; // 参数未就绪时退出try {if (mode <= 0 && !claim()) // 检查是否可执行(CAS标记)return null;if (r instanceof AltResult) { // 异常处理if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);return d.postFire(a, mode);}r = null;}@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t)); // 应用函数并完成目标Future} catch (Throwable ex) {d.completeThrowable(ex);}src = null; dep = null; fn = null; // 清理引用return d.postFire(a, mode); // 继续触发后续依赖
}
- 关键步骤:
claim()
:通过 CAS 标记任务已被认领,防止重复执行。- 异常检查:若源结果包含异常,直接传播到
dep
。 - 函数应用:
f.apply(t)
执行用户逻辑,结果写入dep
。 - 清理资源:断开与源 Future 的引用,防止内存泄漏。
异步执行路径
若调用 thenApplyAsync
(指定 executor
):
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(defaultExecutor(), fn); // 传递线程池
}// UniApply执行时:
if (e != null) {e.execute(new UniApply<T,V>(null, d, this, f)); // 提交到线程池
} else {// 同步执行(同前)
}
- 异步触发:
UniApply
作为Runnable
被提交到线程池。- 其
run()
方法调用tryFire(ASYNC)
,逻辑与同步模式一致。
最终结果传递
- 同步完成:
d.completeValue(f.apply(t))
直接设置结果,触发dep
的postComplete()
。
- 异步完成:
- 由线程池线程调用
tryFire(ASYNC)
,其余逻辑相同。
- 由线程池线程调用
设计总结
-
依赖链结构:
- 每个
thenApply
创建一个UniApply
节点,形成 Treiber 栈链。 - 栈结构确保依赖按注册顺序 LIFO 触发(后注册先执行)。
- 每个
-
无锁化实现:
- 通过
volatile
和 CAS 实现栈操作的线程安全。 claim()
方法避免多个线程重复执行同一节点。
- 通过
-
异常传播:
- 异常通过
AltResult
包装,在tryFire
中自动传播到下游。
- 异常通过
-
资源管理:
- 完成后的节点会清理
src
、dep
引用,辅助 GC 回收。 postFire
中调用cleanStack()
移除无效节点。
- 完成后的节点会清理
双源 Completion(如 thenCombine
)的结构与执行流程
🔹 示例代码:
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> f3 = f1.thenCombine(f2, (i, s) -> s + i);
🔹 结构图:
f1.stack → [BiApply(dep=f3, snd=f2)] → null
f2.stack → [CoCompletion(base=BiApply)] → null
实现类关系:
BiApply<T,U,V>
:处理两个源的结果,执行BiFunction<T,U,V>
。CoCompletion
:辅助类,指向第一个源的 Completion,用于第二个源完成后唤醒整个操作。
三、thenCombine
的完整执行流程详解
f1.bipush(f2, new BiApply<>(e, d, this, b, f));
bipush(...)
内部会将BiApply
推入f1
的栈,并创建CoCompletion
推入f2
的栈。
class CoCompletion extends Completion {BiCompletion<?,?,?> base;final CompletableFuture<?> tryFire(int mode) {return base.tryFire(mode); // 转发给 BiApply}
}
- 这样当
f2
完成时,也能触发BiApply
。
🔹 步骤 3:任意源完成都会尝试触发
当 f1
或 f2
完成时,各自调用 postComplete()
,尝试从各自的栈中取出 Completion 并执行。
以 f1
完成为例:
f1
的栈中有BiApply
节点。BiApply.tryFire(...)
会检查f1.result != null && f2.result != null
。- 如果两个源都完成了,则执行
fn.apply(t, s)
并完成f3
。
如果 f1
先完成,但 f2
还没完成:
if ((r instanceof AltResult && ((AltResult)r).ex != null) ||(s instanceof AltResult && ((AltResult)s).ex != null)) {d.completeThrowable(...); // 异常处理
} else if (r == null || s == null) {return false; // 有一个未完成,不能执行
}
不执行,等待 f2
完成后再次触发。
双源 Completion 并不是简单的链表结构,而是一种 双向监听 + 协作完成机制。
🔹 原因:
- Java 的
CompletableFuture
支持 任意顺序完成,即:f1
先完成,f2
后完成;f2
先完成,f1
后完成;
- 所以每个 Completion 都要能独立判断是否可以执行。
🔹 设计方案:
- 主节点:
BiApply
节点被推入f1
的栈。 - 辅助节点:
CoCompletion
节点被推入f2
的栈,指向BiApply
。 - 协作逻辑:
- 当
f1
完成时,检查f2
是否也完成,若完成则执行。 - 若
f2
未完成,则等待其完成后再触发。 - 同理,
f2
完成时也会触发CoCompletion
,再转发到BiApply
。
- 当
f1.stack → [BiApply(dep=f3, src=f1, snd=f2, fn)]
f2.stack → [CoCompletion(base=BiApply)]
当 f1
完成时:
BiApply.tryFire(...)
检查f2
是否也完成。- 如果是,执行
fn.apply(...)
, 完成f3
。
当 f2
完成时:
CoCompletion.tryFire(...)
转发到BiApply.tryFire(...)
。- 同样检查
f1
是否完成,若完成则执行。
🔹 问题背景:
BiApply
是属于f1
的 Completion。f2
也需要知道这个操作的存在,否则它完成时无法触发BiApply
。
🔹 解决方案:
- 在
f2
上注册一个轻量级代理节点CoCompletion
,它不执行任何操作,只是把控制权转交给BiApply
。
class CoCompletion extends Completion {BiCompletion<?,?,?> base;final CompletableFuture<?> tryFire(int mode) {return base.tryFire(mode); // 转发到 BiApply}final boolean isLive() { return base != null && base.dep != null; }
}
f1
完成 → 直接执行BiApply
。f2
完成 → 触发CoCompletion
→ 转发到BiApply
。- 确保无论哪个源先完成,都能正确处理。
总体理解:
tryFire(...)
是所有 Completion 的统一入口,负责判断是否可执行,并实际执行用户逻辑。- 单源 Completion(如
thenApply
)只需要一个 Completion 节点。 - 双源 Completion(如
thenCombine
)需要两个 Completion 节点:- 主节点
BiApply
注册在第一个源上。 - 代理节点
CoCompletion
注册在第二个源上,仅转发控制权。
- 主节点
postComplete/tryFire 的逻辑解析
tryFire 和 postCompletion 这两个函数之前进行过一些分析,但因为两者逻辑上是令人困惑的,所以这里做一次总结。
解释令人困惑的代码:
while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h); // 关键:为什么要放回去?continue;}NEXT.compareAndSet(h, t, null);}f = (d = h.tryFire(NESTED)) == null ? this : d; // 关键:返回值含义}
}
postComplete
方法负责在CompletableFuture完成后触发所有依赖的Completion。它的核心逻辑是:
- 从stack中弹出Completion节点
- 调用每个Completion的
tryFire(NESTED)
方法 - 处理可能的链式传播
tryFire方法有三种模式:
- SYNC (0): 同步模式,直接执行
- ASYNC (1): 异步模式,通过线程池执行
- NESTED (-1): 嵌套模式,用于避免无限递归
NESTED模式是为了解决递归调用问题。当一个Completion完成后,可能触发其他Completion的完成,形成链式反应。如果每次都直接调用postComplete,会导致栈溢出。postComplete调用tryFire时使用NESTED模式
tryFire在NESTED模式下
情况1:返回null。表示这个Completion处理完毕,没有需要继续传播的CompletableFuture。f 被设置为this,继续处理当前CompletableFuture的其他Completion
情况2:返回 Completion中dep字段指向的CompletableFuture,即下一个需要处理的CompletableFuture。
postComplete通过循环而非递归来处理这个返回值
final CompletableFuture<V> tryFire(int mode) {// ... 执行逻辑后return dep.postFire(src, mode); // 返回dep,即下一个CompletableFuture}final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {Object r;if ((r = a.result) == null)a.cleanStack();// NEST == -1 不走这if (mode >= 0 && (r != null || a.result != null))a.postComplete();}if (result != null && stack != null) {if (mode < 0)return this;elsepostComplete();}return null;}
"放回去"的原因
if (f != this) {pushStack(h);continue;
}
这里的逻辑是:
- 当前处理的不是最初的CompletableFuture (this)
- 而是在处理传播链中的其他CompletableFuture (f)
- 将Completion h放回到 当前CompletableFuture(this) 的stack上
- 这样确保所有需要处理的Completion最终都在原始的this上排队处理
举一个例子,用Completion结构表示:
f1.stack -> [Completion1{dep:f2}] -> [Completion2{dep:f3}] -> null
f2.stack -> [Completion3{dep:f4}] -> [Completion4{dep:f5}] -> null
f4.stack -> [Completion5{dep:f6}] -> null
postComplete执行过程
如果没有"放回去"机制,执行路径是:
f1(this)
f1 → f2 → f4 → f6
f1 → f3 (结束)
遗漏的依赖:
- Comp4{dep:f5}:f2→f5的依赖链
当f != this时(即当前处理的不是原始的f1),执行pushStack(h)将未处理的Completion放回f1的stack中。经过"放回去"机制,f1.stack动态变化:
初始:f1.stack → [Comp1{f2}] → [Comp2{f3}] → null
处理Comp1后:
f1.stack → [Comp2{f3}] → null
现在 f=f2,f2.stack → [Comp3{f4}] → [Comp4{f5}] → null
处理f2的栈时:Comp3{f4}会被放回f1.stack
- f1.stack → [Comp3{f4}] → [Comp2{f3}] → null
- f2.stack →[Comp4{f5}] → null
- 现在还在 f = f2,因此继续处理Comp4{f5},因为这是最后一个,不会压入 f1的栈,直接执行。
最终保证所有依赖链都被正确处理。
这里是为了满足能够通过循环遍历图,如果使用递归则没必要这样处理。
为什么需要claim()
虽然stack是通过CAS操作维护的链表,但这只保证了链表结构的一致性,不能保证Completion执行的唯一性。问题在于:
- 多线程竞争执行: 同一个Completion可能被多个线程同时触发
- 重复执行风险: 没有claim()的话,一个Completion可能被执行多次
claim()方法使用ForkJoinTask的tag位作为原子标记:
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)return true;executor = null; // disablee.execute(this);}return false;
}
这里使用了ForkJoinTask的tag字段作为CAS标记,确保只有一个线程能成功claim这个Completion。
CAS操作在这里分两个层面:
- stack链表的CAS: 保证链表结构操作的原子性(如tryPushStack、STACK.compareAndSet)
- Completion执行的CAS: 通过claim()方法保证每个Completion只被执行一次
例如
CompletableFuture<String> cf1 = new CompletableFuture<>();
CompletableFuture<Integer> cf2 = cf1.thenApply(String::length);// 在不同线程中
cf1.complete("hello"); // 线程A
cf1.complete("world"); // 线程B (尝试但不会成功)
这种情况,两个线程会取执行 包含 cf2 的Completion。另一种情况是thencombine,这里有天然的多线程操作后继 Completion的操作。
stack的CAS保证了链表操作的原子性,而claim()保证了业务逻辑执行的唯一性,两者配合实现了完整的并发安全保障。
在 tryFire 方法中,关键代码是:
try {if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t));}
} catch (Throwable ex) {d.completeThrowable(ex);
}
claim 在ASYNC模式没有被调用,可以通过整个链路来理解:
完整的 thenApplyAsync 调用链路
- 用户调用
future.thenApplyAsync(fn)
- thenApplyAsync 调用 uniApplyStage(defaultExecutor(), fn)
- uniApplyStage 创建新的 CompletableFuture 和 UniApply Completion。uniApplyStage 中如果有结果会进行 uniApplyNow,直接计算(根据是不是异步,交给线程池)。
- unipush中,如果有结果会进行tryFire(SYNC),这之后的调用和postComplete类似,但会调用到postComplete,因为要触发之后的调用,之后使用NEST模式,不会再递归调用postComplete。
- tryFire(SYNC) ,判断(mode <= 0 && !claim()) 会通过,因此返回null,把执行交给线程池 e,claim返回 flase,导致 tryFire 直接 return null。
- UniApply Completion 通过 unipush 方法添加到源 CompletableFuture 的栈中
- 当源 CompletableFuture 完成时:
- 调用 postComplete() 方法
- postComplete() 弹出栈中的 Completion 并调用 tryFire(NESTED)
- UniApply.tryFire(NESTED) 通过 claim() 方法提交自身到执行器
- 执行器执行 Completion 的 run() 方法
- run() 调用 tryFire(ASYNC)
- tryFire(ASYNC) 直接执行函数而不再检查 claim()
- 执行结果用于完成目标 CompletableFuture
现在我们可以清楚地理解:
-
tryFire(ASYNC) 的调用时机:当 Completion 被执行器异步执行时,通过 run() 或 exec() 方法调用 tryFire(ASYNC)
-
claim() 方法的作用:
- 使用 CAS 操作确保 Completion 只被一个线程处理
- 如果有执行器,将 Completion 提交到执行器异步执行,并返回 false
-
为什么 ASYNC 模式不需要 claim:
- 当以 ASYNC 模式调用 tryFire 时,Completion 已经在执行器线程中运行
- claim() 的两个目的(确保单线程处理和提交到执行器)在 ASYNC 模式下都不再需要
tryFire
tryFire方法有三种模式:
- SYNC (0): 同步模式,直接在当前线程执行
- ASYNC (1): 异步模式,在线程池中执行
- NESTED (-1): 嵌套模式,用于避免递归调用栈溢出
SYNC模式的调用时机
1.1 UniCompletion中的SYNC调用
在unipush方法中,当CompletableFuture已经完成时:
if (result != null)c.tryFire(SYNC);
这种情况下,由于源CompletableFuture已经有结果,可以立即同步执行Completion。
1.2 BiCompletion中的SYNC调用
在bipush方法中,当两个源都已完成时:
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {while (result == null) {if (tryPushStack(c)) {if (b.result == null)b.unipush(new CoCompletion(c));else if (result != null) // 检查自己有没有完成c.tryFire(SYNC);return;}}b.unipush(c);}}
当都完成时,直接同步触发BiCompletion。
1.3 Or操作中的SYNC调用
在orpush方法中:
final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null);break;}}if (result != null)c.tryFire(SYNC);elseb.unipush(new CoCompletion(c)); // 这里也会判断 b 有没有结束,实现了 or}}final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null);break;}}if (result != null)c.tryFire(SYNC);}}
当任一源已完成时,立即同步执行OrCompletion。
ASYNC模式的调用时机
UniCompletion中的ASYNC调用
在UniCompletion的claim方法中:
final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)return true;executor = null;e.execute(this); // 这里会异步调用run()方法,最终调用tryFire(ASYNC)}return false;
}
当Completion需要在指定的Executor中异步执行时,会调用tryFire(ASYNC)。
Completion继承自ForkJoinTask,其exec方法:
public final boolean exec() { tryFire(ASYNC); return false; }
public final void run() { tryFire(ASYNC); }
当Completion作为ForkJoinTask执行时,会调用tryFire(ASYNC)。
UniCompletion的执行流程
以UniApply为例:
final CompletableFuture<V> tryFire(int mode) {// 检查前置条件if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {// 处理异常结果if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);break tryComplete;}r = null;}try {// 关键:模式判断if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t));}} catch (Throwable ex) {d.completeThrowable(ex);}}// 清理引用src = null; dep = null; fn = null;return d.postFire(a, mode);
}
模式判断逻辑:
- mode <= 0(SYNC或NESTED):需要调用claim()来确保独占执行
- mode > 0(ASYNC):直接执行,因为已经在异步环境中
BiCompletion的执行流程
以BiApply为例:
final CompletableFuture<V> tryFire(int mode) {// 检查两个源都已完成if ((a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null || (f = fn) == null|| !d.biApply(r, s, f, mode > 0 ? null : this))return null;// 清理引用src = null; snd = null; dep = null; fn = null;return d.postFire(a, b, mode);
}
在biApply方法中:
final <R,S> boolean biApply(Object r, Object s,BiFunction<? super R,? super S,? extends T> f,BiApply<R,S,T> c) {// 处理异常...try {if (c != null && !c.claim()) // 当mode <= 0时,c不为nullreturn false;@SuppressWarnings("unchecked") R rr = (R) r;@SuppressWarnings("unchecked") S ss = (S) s;completeValue(f.apply(rr, ss));} catch (Throwable ex) {completeThrowable(ex);}return true;
}
关键差异:
- mode > 0(ASYNC):传入null作为Completion参数,跳过claim()检查
- mode <= 0(SYNC/NESTED):传入this,需要通过claim()确保独占执行
Canceller、Signaller、Timeout、anyOf与allOf
在前面的讨论中,我们已经了解了CompletableFuture的基本工作机制,包括uni、bi和or操作的执行流程。接下来,我将对CompletableFuture中的其他重要组件和特性进行详细分析。
1. Canceller - 超时取消机制
Canceller
是一个实现了BiConsumer<Object, Throwable>
的内部类,主要用于取消计划任务:
static final class Canceller implements BiConsumer<Object, Throwable> {final Future<?> f;Canceller(Future<?> f) { this.f = f; }public void accept(Object ignore, Throwable ex) {if (f != null) // currently never nullf.cancel(false);}
}
-
主要用途:用于取消超时任务。当CompletableFuture以其他方式完成时,不再需要超时触发。
-
工作流程:
- 当设置超时时(如使用
orTimeout
或completeOnTimeout
方法),会创建一个定时任务 - 同时通过
whenComplete
注册一个Canceller - 如果CompletableFuture在超时前完成,Canceller会取消定时任务
- 这避免了不必要的超时任务继续占用系统资源
- 当设置超时时(如使用
-
使用示例:在
arrangeTimeout
方法中可以看到完整流程
private <U> void arrangeTimeout(long nanoDelay, Timeout<U> onTimeout) {ForkJoinPool e = ASYNC_POOL;if (result == null) {ScheduledForkJoinTask<Void> t = new ScheduledForkJoinTask<Void>(nanoDelay, 0L, true, onTimeout, null, e);whenComplete(new Canceller(t)); // 注册Canceller取消器e.scheduleDelayedTask(t);}
}
2. Timeout - 超时处理机制
Timeout
是一个实现了Runnable
的内部类,负责在超时发生时执行操作:
static final class Timeout<U> implements Runnable {final CompletableFuture<U> f;final U value;final boolean exceptional;Timeout(CompletableFuture<U> f, U value, boolean exceptional) {this.f = f; this.value = value; this.exceptional = exceptional;}public void run() {if (f != null && !f.isDone()) {if (exceptional)f.completeExceptionally(new TimeoutException());elsef.complete(value);}}
}
Timeout的工作机制
-
两种工作模式:
- 异常模式(
exceptional=true
):超时时以TimeoutException
异常完成Future - 默认值模式(
exceptional=false
):超时时以指定的默认值完成Future
- 异常模式(
-
对应的公共API:
orTimeout(timeout, unit)
:超时抛异常completeOnTimeout(value, timeout, unit)
:超时使用默认值
-
执行流程:
- 创建Timeout实例,包含操作模式和可能的默认值
- 将其作为定时任务提交到调度器
- 超时触发时,执行run()方法完成CompletableFuture
3. Signaller - 阻塞等待机制
Signaller
是一个特殊的Completion
子类,实现了ForkJoinPool.ManagedBlocker
接口,用于支持阻塞等待操作:
-
设计:
- 实现
ManagedBlocker
接口,确保在ForkJoinPool中阻塞时不会导致线程饥饿 - 支持中断和超时等待
- 使用LockSupport进行线程挂起和唤醒
- 实现
-
等待过程:
- 将Signaller实例推入CompletableFuture的栈
- 将当前线程记录在Signaller中
- 通过
LockSupport.park
或LockSupport.parkNanos
挂起线程 - 当Future完成时,触发Signaller的
tryFire
方法唤醒等待线程
-
唤醒机制:
- 当CompletableFuture完成时,会调用
postComplete
处理栈中的依赖 - Signaller的
tryFire
方法会解除线程的阻塞状态
- 当CompletableFuture完成时,会调用
static final class Signaller extends Completionimplements ForkJoinPool.ManagedBlocker {long nanos; // remaining wait time if timedfinal long deadline; // non-zero if timedfinal boolean interruptible;boolean interrupted;volatile Thread thread;Signaller(boolean interruptible, long nanos, long deadline) {this.thread = Thread.currentThread();this.interruptible = interruptible;this.nanos = nanos;this.deadline = deadline;}final CompletableFuture<?> tryFire(int ignore) {Thread w; // no need to atomically claimif ((w = thread) != null) {thread = null;LockSupport.unpark(w);}return null;}public boolean isReleasable() {if (Thread.interrupted())interrupted = true;return ((interrupted && interruptible) ||(deadline != 0L &&(nanos <= 0L ||(nanos = deadline - System.nanoTime()) <= 0L)) ||thread == null);}public boolean block() {while (!isReleasable()) {if (deadline == 0L)LockSupport.park(this);elseLockSupport.parkNanos(this, nanos);}return true;}final boolean isLive() { return thread != null; }}
4. anyOf - 任意完成机制
anyOf
静态方法创建一个在任何输入CompletableFuture完成时完成的新CompletableFuture:
-
核心类:
AnyOf
Completion类 -
设计特点:
- 监听多个输入Future中的任何一个
- 当任一输入完成时,使用相同结果完成输出Future
- 完成后清理其他输入Future的栈,避免资源泄漏
-
执行流程:
- 创建一个新的CompletableFuture作为结果
- 检查所有输入Future,如有已完成则直接使用其结果
- 对每个输入Future添加AnyOf监听器
- 当任一输入完成时,触发AnyOf.tryFire方法
- tryFire调用completeRelay传递结果,并清理其他输入的栈
-
优化处理:
- 特殊处理空数组和单元素数组的情况
- 完成后清理其他Future的栈,避免内存泄漏
- 结果处理使用encodeRelay,保证异常正确传播
anyOf的实现分析
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {int n; Object r;if ((n = cfs.length) <= 1)return (n == 0)? new CompletableFuture<Object>(): uniCopyStage(cfs[0]);for (CompletableFuture<?> cf : cfs)if ((r = cf.result) != null)return new CompletableFuture<Object>(encodeRelay(r));cfs = cfs.clone();CompletableFuture<Object> d = new CompletableFuture<>();for (CompletableFuture<?> cf : cfs)cf.unipush(new AnyOf(d, cf, cfs));// If d was completed while we were adding completions, we should// clean the stack of any sources that may have had completions// pushed on their stack after d was completed.if (d.result != null)for (int i = 0, len = cfs.length; i < len; i++)if (cfs[i].result != null)for (i++; i < len; i++)if (cfs[i].result == null)cfs[i].cleanStack();return d;
}
-
边界条件处理:
- 如果传入的CompletableFuture数组长度为0,返回一个新的未完成的CompletableFuture
- 如果长度为1,则直接返回该CompletableFuture的副本
-
快速完成检查:
- 遍历所有传入的CompletableFuture,检查是否已有完成的,如果有则立即创建一个已完成的CompletableFuture并返回
-
注册依赖:
- 克隆输入数组(防止修改原数组)
- 创建一个新的结果CompletableFuture
- 为每个源CompletableFuture注册AnyOf完成处理器
-
清理:
- 如果在注册过程中结果已经完成,清理那些依然未完成的CompletableFuture的栈
-
AnyOf Completion类:
static class AnyOf extends Completion {CompletableFuture<Object> dep; CompletableFuture<?> src;CompletableFuture<?>[] srcs;AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,CompletableFuture<?>[] srcs) {this.dep = dep; this.src = src; this.srcs = srcs;}final CompletableFuture<Object> tryFire(int mode) {// assert mode != ASYNC;CompletableFuture<Object> d; CompletableFuture<?> a;CompletableFuture<?>[] as;Object r;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (as = srcs) == null)return null;src = null; dep = null; srcs = null;if (d.completeRelay(r)) {for (CompletableFuture<?> b : as)if (b != a)b.cleanStack();if (mode < 0)return d;elsed.postComplete();}return null;}final boolean isLive() {CompletableFuture<Object> d;return (d = dep) != null && d.result == null;} }
tryFire
方法在源CompletableFuture完成时被调用- 将结果传递给依赖的CompletableFuture
- 清理其他所有源CompletableFuture的栈(这是关键优化,防止无用的栈积累)
5. allOf - 全部完成机制
allOf
静态方法创建一个在所有输入CompletableFuture完成时完成的新CompletableFuture:
-
核心方法:
andTree
- 递归构建完成树 -
设计特点:
- 使用分治法构建二叉树结构
- 通过BiRelay节点连接子树
- 任何异常都会导致结果Future异常完成
-
执行流程:
- 递归地将输入数组分成左右两半
- 对每半递归调用andTree,构建子树
- 使用BiRelay连接左右子树
- 只有当两个子树都完成时,父节点才会完成
- 如有任何一个Future异常完成,结果Future也异常完成
-
优化处理:
- 特殊处理空数组的情况,直接返回完成的Future
- 使用二分法平衡树结构,提高并行度和效率
- 优先传播异常,保证快速失败
BiRelay是一个特殊的BiCompletion,用于协调两个输入CompletableFuture,只有当两者都完成时才完成依赖的Future。
allOf的实现分析
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
}
allOf实际上是调用了andTree
方法:
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,int lo, int hi) {CompletableFuture<Void> d = new CompletableFuture<Void>();if (lo > hi) // emptyd.result = NIL;else {CompletableFuture<?> a, b; Object r, s, z; Throwable x;int mid = (lo + hi) >>> 1;if ((a = (lo == mid ? cfs[lo] :andTree(cfs, lo, mid))) == null ||(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :andTree(cfs, mid+1, hi))) == null)throw new NullPointerException();if ((r = a.result) == null || (s = b.result) == null)a.bipush(b, new BiRelay<>(d, a, b));else if ((r instanceof AltResult&& (x = ((AltResult)(z = r)).ex) != null) ||(s instanceof AltResult&& (x = ((AltResult)(z = s)).ex) != null))d.result = encodeThrowable(x, z);elsed.result = NIL;}return d;
}
allOf实现详解:
-
构建二叉树:
- 采用分治策略,将输入的CompletableFuture数组递归地组织成一个完全二叉树
- 这种树形结构使得任务的并行度最大化
-
处理边界条件:
- 如果范围为空(lo > hi),则返回一个已完成的结果
-
递归构建子树:
- 通过中点(mid)划分数组为两部分
- 递归处理左子树和右子树
-
状态检查和依赖设置:
- 检查左右子树是否都已完成
- 如果有任一子树未完成,则设置双向依赖(bipush)
- 如果都已完成,检查是否有异常,然后相应地完成结果
-
BiRelay类:
static final class BiRelay<T,U> extends BiCompletion<T,U,Void> {BiRelay(CompletableFuture<Void> dep,CompletableFuture<T> src, CompletableFuture<U> snd) {super(null, dep, src, snd);}final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d;CompletableFuture<T> a;CompletableFuture<U> b;Object r, s, z; Throwable x;if ( (a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null)return null;if (d.result == null) {if ((r instanceof AltResult&& (x = ((AltResult)(z = r)).ex) != null) ||(s instanceof AltResult&& (x = ((AltResult)(z = s)).ex) != null))d.completeThrowable(x, z);elsed.completeNull();}src = null; snd = null; dep = null;return d.postFire(a, b, mode);} }
encodeRelay/completeRelay
encodeRelay
主要用于以下场景:
- 当一个CompletableFuture依赖于另一个CompletableFuture的结果时,使用
encodeRelay
确保异常结果被正确包装为CompletionException
,从而保持CompletableFuture的异常处理一致性。
假设有两个CompletableFuture f1
和f2
,f2
依赖于f1
的结果:
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Error in f1");
});CompletableFuture<Integer> f2 = f1.thenApply(result -> {// Do something with resultreturn result * 2;
});
在这种情况下,如果f1
异常完成,f2
需要正确处理该异常。encodeRelay
确保f1
的异常结果被传递给f2
时,被正确包装为CompletionException
。
completeRelay 则设置这个包装结果
final boolean completeRelay(Object r) {
return RESULT.compareAndSet(this, null, encodeRelay(r));
}
关键场景分析
1. 超时处理场景
// 5秒后超时并抛出异常
CompletableFuture<String> future = someAsyncOperation().orTimeout(5, TimeUnit.SECONDS);// 5秒后超时并返回默认值
CompletableFuture<String> future = someAsyncOperation().completeOnTimeout("Default value", 5, TimeUnit.SECONDS);
工作流程:
- 调用arrangeTimeout创建定时任务
- 通过whenComplete注册Canceller取消器
- 如果原始Future在超时前完成,Canceller取消定时任务
- 否则,Timeout.run()方法在超时时执行,完成Future
2. 阻塞等待场景
String result = future.get(); // 可能阻塞直到完成
String result = future.get(10, TimeUnit.SECONDS); // 带超时的阻塞
工作流程:
- 创建Signaller实例并记录当前线程
- 将Signaller推入Future的栈中
- 通过LockSupport.park挂起当前线程
- 当Future完成时,触发栈中的Signaller.tryFire
- tryFire唤醒等待线程,继续执行
3. 组合多个Future场景
// 等待所有Future完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2, future3);// 等待任一Future完成
CompletableFuture<Object> anyDone = CompletableFuture.anyOf(future1, future2, future3);
工作流程(allOf):
- 通过andTree递归构建二叉树结构
- 每个节点使用BiRelay连接两个子节点
- 只有当左右子树都完成时,父节点才会完成
- 如有异常,优先传播异常
工作流程(anyOf):
- 创建一个新的输出Future
- 为每个输入Future添加AnyOf监听器
- 当任一输入完成,将结果传递给输出Future
- 清理其他Future的栈,避免资源泄漏
总结
CompletableFuture的这些高级特性体现了其强大的异步编程能力:
- 超时机制:通过Timeout和Canceller优雅地实现超时控制,同时避免资源浪费
- 阻塞等待:Signaller结合ManagedBlocker提供安全的阻塞等待,防止线程饥饿
- 组合操作:anyOf和allOf通过巧妙的树状结构,高效实现了多Future协调
这些机制共同构建了CompletableFuture强大且灵活的异步编程模型,使其成为Java中处理复杂异步流程的核心工具。
CompletableFuture 中获取结果的流程详解
CompletableFuture 提供了几种获取计算结果的方法:get()
、get(timeout, unit)
、join()
和getNow()
。这些方法的内部实现都涉及 waitingGet
、timedGet
、reportGet
和 reportJoin
等私有方法。下面详细分析它们的执行流程。
1. get()
方法的执行流程
当外部使用者调用 CompletableFuture.get()
方法时:
public T get() throws InterruptedException, ExecutionException {Object r;if ((r = result) == null)r = waitingGet(true);return (T) reportGet(r, "get");
}
执行步骤:
- 检查
result
字段是否为 null - 如果
result
不为 null,表示任务已完成,直接获取结果 - 如果
result
为 null,调用waitingGet(true)
等待任务完成 - 最后调用
reportGet
方法处理结果并返回
2. waitingGet
方法分析
waitingGet
方法实现了阻塞等待任务完成的核心逻辑:
private Object waitingGet(boolean interruptible) {if (interruptible && Thread.interrupted())return null;Signaller q = null;boolean queued = false;Object r;while ((r = result) == null) {if (q == null) {q = new Signaller(interruptible, 0L, 0L);if (Thread.currentThread() instanceof ForkJoinWorkerThread)ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);}else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interrupted) {q.thread = null;cleanStack();return null;}else {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interrupted = true;}}}if (q != null) {q.thread = null;if (q.interrupted)Thread.currentThread().interrupt();}postComplete();return r;
}
执行步骤:
- 检查线程是否被中断(如果
interruptible
为 true) - 创建一个
Signaller
实例作为等待节点 - 如果当前线程是
ForkJoinWorkerThread
,尝试帮助处理异步阻塞任务 - 将
Signaller
节点推入等待栈中 - 调用
ForkJoinPool.managedBlock(q)
阻塞当前线程 - 当任务完成后(
result
不为 null),处理中断状态并调用postComplete()
- 返回结果
最后一段代码的目的是保留中断状态:
- 当线程在 waitingGet 中等待时,如果发生了中断,Signaller 检测到中断并设置 interrupted = true。
- 同时 Thread.interrupted() 方法会清除线程的实际中断状态。
- 方法结束时,该代码段通过 Thread.currentThread().interrupt() 恢复线程的中断状态。
3. Signaller
类的作用
Signaller
是 CompletableFuture
中实现线程等待和唤醒机制的关键类:
Signaller
同时扩展了Completion
和实现了ForkJoinPool.ManagedBlocker
接口- 它被添加到
CompletableFuture
的等待栈中 - 当
CompletableFuture
完成时,会调用tryFire()
方法唤醒等待的线程 isReleasable()
判断是否可以结束阻塞状态block()
实现了线程的阻塞逻辑,使用LockSupport.park/parkNanos
4. get(timeout, unit)
和 timedGet
方法
带超时参数的 get
方法:
public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {long nanos = unit.toNanos(timeout);Object r;if ((r = result) == null)r = timedGet(nanos);return (T) reportGet(r, "get");
}
timedGet
方法与 waitingGet
类似,但增加了超时处理:
private Object timedGet(long nanos) throws TimeoutException {long d = System.nanoTime() + nanos;long deadline = (d == 0L) ? 1L : d; // 避免为0boolean interrupted = false, queued = false;Signaller q = null;Object r = null;for (;;) { // 检查中断、结果、超时的顺序很重要if (interrupted || (interrupted = Thread.interrupted()))break;else if ((r = result) != null)break;else if (nanos <= 0L)break;else if (q == null) {q = new Signaller(true, nanos, deadline);if (Thread.currentThread() instanceof ForkJoinWorkerThread)ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);}else if (!queued)queued = tryPushStack(q);else {try {ForkJoinPool.managedBlock(q);interrupted = q.interrupted;nanos = q.nanos;} catch (InterruptedException ie) {interrupted = true;}}}// 清理和恢复if (q != null) {q.thread = null;if (r == null)cleanStack();}// 根据不同情况返回结果或抛出异常if (r != null) {if (interrupted)Thread.currentThread().interrupt();postComplete();return r;} else if (interrupted)return null;elsethrow new TimeoutException();
}
与 waitingGet
相比,timedGet
增加了:
- 计算超时截止时间
- 使用超时参数创建
Signaller
- 如果超时返回 null 并抛出
TimeoutException
5. reportGet
和 reportJoin
方法
这两个方法处理结果并根据不同情况抛出异常:
private static Object reportGet(Object r, String details)throws InterruptedException, ExecutionException {if (r == null) // 按约定,null表示中断throw new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw new CancellationException(details, (CancellationException)x);if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw wrapInExecutionException(x);}return r;
}private static Object reportJoin(Object r, String details) {if (r instanceof AltResult) {Throwable x;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw new CancellationException(details, (CancellationException)x);if (x instanceof CompletionException)throw (CompletionException)x;throw wrapInCompletionException(x);}return r;
}
关键区别:
reportGet
抛出受检异常ExecutionException
,用于get()
方法reportJoin
抛出非受检异常CompletionException
,用于join()
和getNow()
方法- 对于
null
值的处理不同
6. join()
和 getNow()
方法
public T join() {Object r;if ((r = result) == null)r = waitingGet(false);return (T) reportJoin(r, "join");
}public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r, "getNow");
}
join()
类似于get()
,但使用waitingGet(false)
表示不可中断,并使用reportJoin
getNow()
不会等待,如果结果不可用则返回指定的默认值
7. 完整流程总结
- 用户调用
get()/join()/getNow()
方法获取结果 - 检查
result
字段是否为 null - 如果为 null,通过
waitingGet
或timedGet
进入等待状态 - 创建
Signaller
对象并添加到等待栈中 - 当前线程通过
ForkJoinPool.managedBlock
或直接LockSupport.park
阻塞 - 当 CompletableFuture 完成时,会调用
postComplete()
唤醒等待的线程 - 线程醒来后通过
reportGet
或reportJoin
处理结果 - 根据结果类型返回值或抛出相应异常