当前位置: 首页 > news >正文

揭秘 CompletedFuture 的设计精髓(深入实现分析)

CompletionStage 接口说明见 Java异步编程:CompletionStage接口详解-CSDN博客

基本流程理解见:揭秘 CompletedFuture 的设计精髓(基础)-CSDN博客

成员变量:

  1. volatile Object result

    • 作用:存储异步任务的结果或异常(通过 AltResult 封装)。
    • 编码规则
      • 正常结果直接存储(null 编码为 NIL)。
      • 异常通过 AltResult 包装(如 new AltResult(ex))。
    • 原子性保证:通过 RESULT.compareAndSet(this, null, r) 实现无锁更新。
  2. volatile Completion stack

    • 作用:维护依赖操作的 Treiber 栈结构,所有依赖此结果的阶段(Completion)按 LIFO 顺序触发。
    • Completion 类型
      • 单源依赖UniCompletion):如 thenApplythenAccept
      • 双源依赖BiCompletion):如 thenCombinethenAcceptBoth
      • 信号处理Signaller):用于阻塞等待线程(如 get() 的阻塞唤醒)。

Treiber 栈,实际上可以理解为一个 从头部CAS插入和删除的链表

Completion 是一个链表节点,有 next 字段。

stack 是一个由 Completion.next 构成的单向链表(栈),如下:

stack → C3 → C2 → C1 → null

每个 Completion 节点代表一个未执行的操作(如 thenApply),一旦完成就从栈中弹出并执行。

实际上,CompletableFuture形成了一个有向无环图(DAG)结构:

  • 单输入操作(如thenApply):形成链式结构
  • 双输入操作(如thenCombine):形成汇聚结构(通过CoCompletion转发到同一个节点)
  • 分支操作(如thenApplyAsync多次调用):形成扇出结构

每个CompletableFuture都有自己的stack,存储依赖于它的Completion链表。

Completion 

Completion 作为 CompletableFuture 的依赖操作载体,通过 Treiber 栈组织依赖关系,结合无锁化和异步触发机制,实现了高效的异步任务流水线。其子类(如 UniApplyBiApply)针对不同场景封装逻辑,而 tryFire 和 postComplete 构成了核心的触发链路。

  • 继承关系

    abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask

    • 作为 ForkJoinTask,支持异步任务提交(通过 ForkJoinPool)。
    • 作为 Runnable,可直接被线程池执行(如 executor.execute(completion))。
    • AsynchronousCompletionTask 是标记接口,用于调试和监控。
  • 关键字段与方法

    volatile Completion next;  // Treiber栈的链表指针
    abstract CompletableFuture<?> tryFire(int mode);  // 触发依赖操作
    abstract boolean isLive(); // 判断节点是否有效
  • next:构建 Treiber 栈结构,链式存储依赖关系。
  • tryFire:核心抽象方法,由子类实现具体逻辑(如应用函数、合并结果等)。
  • isLive:用于清理无效节点(如已取消的任务)

Completion 的子类分为两类:单源依赖UniCompletion)和多源依赖BiCompletion)。

(1) 单源依赖:UniCompletion

abstract static class UniCompletion<T,V> extends Completion {Executor executor;          // 执行器(可能为null)CompletableFuture<V> dep;   // 依赖此操作的目标CompletableFutureCompletableFuture<T> src;   // 源CompletableFuture// ...
}
  • 典型子类

    • UniApply:对应 thenApply,应用函数 Function<T,U>
    • UniAccept:对应 thenAccept,消费结果 Consumer<T>
    • UniRun:对应 thenRun,执行 Runnable
  • 触发流程

    • 当 src(源)完成时,调用 tryFire
      if (src.result != null && dep.result == null) {d.completeValue(fn.apply(src.result)); // 执行函数并完成依赖
      }

(2) 双源依赖:BiCompletion

java

abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { CompletableFuture<U> snd; // 第二个源 // ... }

  • 典型子类

    • BiApply:对应 thenCombine,合并两个结果 BiFunction<T,U,V>
    • BiAccept:对应 thenAcceptBoth,双消费 BiConsumer<T,U>
    • BiRun:对应 runAfterBoth,双源完成后执行 Runnable
  • 触发流程

    • 需检查两个源(src 和 snd)是否均完成:
if (src.result != null && snd.result != null) {d.completeValue(fn.apply(src.result, snd.result));
}

Completion 和 CompletableFuture 共同构建了一个依赖链的 Treiber 栈结构

  1. 依赖注册

    • 当调用 thenApply、thenCombine 等方法时,创建对应的 Completion 节点。
    • 节点被推入源 CompletableFuture 的 stack(通过 unipush 或 bipush)。

    void unipush(Completion c) { while (!tryPushStack(c)) { /* CAS竞争入栈 */ } }

  2. 结果触发

    • 源完成时(complete、completeExceptionally),调用 postComplete() 遍历栈:

      java

      while ((h = stack) != null) {stack = h.next;  // 弹出节点h.tryFire(SYNC); // 触发依赖操作
      }
    • tryFire 会根据模式(SYNC、ASYNC)决定同步执行或提交到线程池。
  3. 异步执行

    • 若 Completion 包含 executor,其 run() 方法会委托给线程池:

      public void run() { tryFire(ASYNC); } // 通过executor异步执行


设计模式与优势

  1. 观察者模式

    • CompletableFuture 是被观察者,Completion 是观察者节点。
    • 观察者(依赖操作)在源完成时被通知(通过 postComplete)。
  2. 无锁栈管理:Treiber 栈通过 volatile 和 CAS(STACK.compareAndSet)实现线程安全。

  3. 灵活的任务链:支持单源、多源、异步、异常处理等多种组合方式。



    Completion 与 CompletableFuture 的嵌套关系

    当调用 thenApply 时,实际上会生成一个新的 CompletableFuture 作为目标 Futuredep),用于保存当前阶段的操作结果。例如:

    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "hello"); CompletableFuture<Integer> future2 = future.thenApply(s -> s.length());

    • future.thenApply(...) 创建了一个新的 CompletableFuture<Integer>future2)。
    • 这个新 Future 是 UniApply 的目标(dep 字段),而 future 是源(src 字段)。

    源 Future (future)        目标 Future (future2)|                          |v                          v[stack] → [UniApply] → [UniApply.dep = future2]

    • UniApply 节点
      • src:源 Future(future)。
      • dep:目标 Future(future2)。
      • fn:用户传入的函数(s -> s.length())。

    当 future 完成时,UniApply 节点的 tryFire(...) 会执行函数,并将结果写入 future2。此时,如果 future2 后续有 thenApply,会继续创建新的 Completion 节点,推入 future2 的栈中。

    CompletableFuture<Integer> future3 = future.thenApply(s -> s.length()) // future2.thenApply(i -> i * 2);    // future3

    • 结构

      future → [UniApply(dep=future2)] → future2 → [UniApply(dep=future3)] → future3

    • 执行流程

      1. future 完成后,触发 UniApply 节点。
      2. UniApply 执行 s -> s.length(),结果写入 future2
      3. future2 完成后,触发其栈中的 UniApply 节点。
      4. UniApply 执行 i -> i * 2,结果写入 future3

    如果多次调用就会形成一个嵌套链表,这也是postComplete为什么会那样写,因为要循环处理这样的结构需要回溯。

    f2 = f1.ThenApply(a)
    f3 = f2.ThenApply(b)
    f4 = f1.ThenApply(c)
    f5 = f4.ThenApply(c)f1.stack → [ThenApply(dep=f2)] → [ThenApply(dep=f4)] → null
    f2.stack → [ThenApply(dep=f3)] → null
    f4.stack → [ThenApply(dep=f5)] → null

      thenApply 全流程源码级解析

      CompletableFuture<Integer> src = CompletableFuture.supplyAsync(() -> 1); CompletableFuture<String> dep = src.thenApply(i -> i.toString());

      • 流程分解
        1. thenApply 创建 UniApply 节点,推入 src.stack
        2. src 完成后,触发 UniApply.tryFire,将 i.toString() 结果写入 dep
        3. 若 thenApplyAsync 指定了线程池,则通过 executor.execute(node) 异步执行。

      • thenApply 方法定义:
      public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) {return uniApplyStage(null, fn); // 同步执行
      }


      创建依赖阶段 (uniApplyStage)

      private <V> CompletableFuture<V> uniApplyStage(Executor e, Function<? super T,? extends V> f) {if (f == null) throw new NullPointerException();CompletableFuture<V> d = newIncompleteFuture(); // 创建目标FutureObject r;if ((r = result) != null)   // 源已完成的快速路径d.uniApplyNow(r, e, f); // 直接同步执行函数elseunipush(new UniApply<T,V>(e, d, this, f)); // 创建依赖节点并入栈return d;
      }
      • 关键步骤
        • newIncompleteFuture():创建目标 CompletableFuture<V>dep)。
        • 结果检查:若源已完成(result != null),直接同步执行函数。
        • 未完成时:创建 UniApply 节点并推入栈(unipush)。

      构建 UniApply 节点

      @SuppressWarnings("serial")
      static final class UniApply<T,V> extends UniCompletion<T,V> {Function<? super T,? extends V> fn;UniApply(Executor executor, CompletableFuture<V> dep,CompletableFuture<T> src,Function<? super T,? extends V> fn) {super(executor, dep, src); // 父类初始化this.fn = fn;}// ...
      }
      • 继承结构
        • UniApply → UniCompletion → Completion
      • 关键字段
        • executor:异步执行时使用的线程池(可能为 null)。
        • dep:依赖此操作的目标 Future(dep)。
        • src:源 Future(src)。
        • fn:用户传入的函数。

      推入 Treiber 栈 (unipush)

      final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) { // CAS竞争入栈if (result != null) {  // 源突然完成时终止循环NEXT.set(c, null); // 清理next指针break;}}if (result != null)c.tryFire(SYNC); // 立即触发}
      }// CAS入栈实现
      final boolean tryPushStack(Completion c) {Completion h = stack;NEXT.set(c, h);         // c.next = hreturn STACK.compareAndSet(this, h, c); // stack = c
      }

      • 无锁栈操作
        • 通过 STACK.compareAndSet 原子性更新栈顶。
        • 若入栈过程中源突然完成(result != null),直接触发 tryFire(SYNC)

      complete时发生了什么

      源完成触发依赖 (postComplete)

      当源 Future (src) 完成时(如调用 complete(42)):

      postComplete() 的作用是:
      遍历当前 future 和其依赖 future 的 Completion 栈,依次弹出 Completion 并执行,直到所有 Completion 处理完毕
      它利用 CAS 实现无锁并发,利用栈结构支持链式 Completion,通过 tryFire 触发下游操作,最终实现高效的异步回调链执行机制。

      public boolean complete(T value) {boolean triggered = completeValue(value); // CAS设置resultpostComplete(); // 触发所有依赖节点return triggered;
      }final void postComplete() {CompletableFuture<?> f = this;while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) { // 弹出栈顶if (t != null) {if (f != this) {pushStack(h); // 处理嵌套依赖continue;}NEXT.compareAndSet(h, t, null); // 断开链表}f = (d = h.tryFire(NESTED)) == null ? this : d; // 触发操作}}
      }

      • 核心流程
        • 循环弹出栈顶节点(h),调用 h.tryFire(NESTED)
        • tryFire 返回非空时,处理新生成的依赖链(如级联 thenApply)。

      (h = f.stack) != null || (f != this && (h = (f = this).stack) != null)  是 两阶段扫描

      1. 第一阶段:从当前 future (f) 的栈中取 Completion;
      2. 第二阶段:如果当前栈空了,回到原始 future (this) 再次取一次;
      3. 目的:确保所有嵌套 Completion 都能被执行。

      t != null && f != this  表示:

      • 当前 Completion 有后续节点(t != null);
      • 但当前处理的是嵌套 future(f != this);
      • 所以要把当前 Completion 压回去,留给后续处理。

      否则会导致后续 Completion 被跳过,导致任务丢失。

      为什么需要 postComplete() 递归处理?

      当 dep 完成时,需要触发 dep 自己的栈。例如:

      CompletableFuture<Integer> future3 = future2.thenApply(i -> i * 2);

      • future2 完成后,UniApply 节点的 tryFire(...) 会返回 future3
      • postComplete() 会切换到 future3 的栈,继续触发后续操作。

      f = (d = h.tryFire(NESTED)) == null ? this : d;

      • d = h.tryFire(...):返回 dep(如 future2)。
      • f = d:切换到 dep 的栈,继续处理它的 Completion 节点。


      执行依赖操作 (UniApply.tryFire)

      final CompletableFuture<V> tryFire(int mode) {CompletableFuture<V> d;CompletableFuture<T> a;Object r; Throwable x; Function<? super T,? extends V> f;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null; // 参数未就绪时退出try {if (mode <= 0 && !claim()) // 检查是否可执行(CAS标记)return null;if (r instanceof AltResult) { // 异常处理if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);return d.postFire(a, mode);}r = null;}@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t)); // 应用函数并完成目标Future} catch (Throwable ex) {d.completeThrowable(ex);}src = null; dep = null; fn = null; // 清理引用return d.postFire(a, mode); // 继续触发后续依赖
      }

      • 关键步骤
        • claim():通过 CAS 标记任务已被认领,防止重复执行。
        • 异常检查:若源结果包含异常,直接传播到 dep
        • 函数应用f.apply(t) 执行用户逻辑,结果写入 dep
        • 清理资源:断开与源 Future 的引用,防止内存泄漏。

      异步执行路径

      若调用 thenApplyAsync(指定 executor):

      public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) {return uniApplyStage(defaultExecutor(), fn); // 传递线程池
      }// UniApply执行时:
      if (e != null) {e.execute(new UniApply<T,V>(null, d, this, f)); // 提交到线程池
      } else {// 同步执行(同前)
      }

      • 异步触发
        • UniApply 作为 Runnable 被提交到线程池。
        • 其 run() 方法调用 tryFire(ASYNC),逻辑与同步模式一致。

      最终结果传递

      • 同步完成
        • d.completeValue(f.apply(t)) 直接设置结果,触发 dep 的 postComplete()
      • 异步完成
        • 由线程池线程调用 tryFire(ASYNC),其余逻辑相同。

      设计总结

      1. 依赖链结构

        • 每个 thenApply 创建一个 UniApply 节点,形成 Treiber 栈链。
        • 栈结构确保依赖按注册顺序 LIFO 触发(后注册先执行)。
      2. 无锁化实现

        • 通过 volatile 和 CAS 实现栈操作的线程安全。
        • claim() 方法避免多个线程重复执行同一节点。
      3. 异常传播

        • 异常通过 AltResult 包装,在 tryFire 中自动传播到下游。
      4. 资源管理

        • 完成后的节点会清理 srcdep 引用,辅助 GC 回收。
        • postFire 中调用 cleanStack() 移除无效节点。

      双源 Completion(如 thenCombine)的结构与执行流程

      🔹 示例代码:

      CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> 100); CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> f3 = f1.thenCombine(f2, (i, s) -> s + i);

      🔹 结构图:

      f1.stack → [BiApply(dep=f3, snd=f2)] → null

      f2.stack → [CoCompletion(base=BiApply)] → null

      实现类关系:

      • BiApply<T,U,V>:处理两个源的结果,执行 BiFunction<T,U,V>
      • CoCompletion:辅助类,指向第一个源的 Completion,用于第二个源完成后唤醒整个操作。

      三、thenCombine 的完整执行流程详解

      f1.bipush(f2, new BiApply<>(e, d, this, b, f));

      • bipush(...) 内部会将 BiApply 推入 f1 的栈,并创建 CoCompletion 推入 f2 的栈。

      class CoCompletion extends Completion {BiCompletion<?,?,?> base;final CompletableFuture<?> tryFire(int mode) {return base.tryFire(mode); // 转发给 BiApply}
      }

      • 这样当 f2 完成时,也能触发 BiApply

      🔹 步骤 3:任意源完成都会尝试触发

      当 f1 或 f2 完成时,各自调用 postComplete(),尝试从各自的栈中取出 Completion 并执行。

      以 f1 完成为例:

      • f1 的栈中有 BiApply 节点。
      • BiApply.tryFire(...) 会检查 f1.result != null && f2.result != null
      • 如果两个源都完成了,则执行 fn.apply(t, s) 并完成 f3

      如果 f1 先完成,但 f2 还没完成:

      if ((r instanceof AltResult && ((AltResult)r).ex != null) ||(s instanceof AltResult && ((AltResult)s).ex != null)) {d.completeThrowable(...); // 异常处理
      } else if (r == null || s == null) {return false; // 有一个未完成,不能执行
      }

      不执行,等待 f2 完成后再次触发。

      双源 Completion 并不是简单的链表结构,而是一种 双向监听 + 协作完成机制

      🔹 原因:

      • Java 的 CompletableFuture 支持 任意顺序完成,即:
        • f1 先完成,f2 后完成;
        • f2 先完成,f1 后完成;
      • 所以每个 Completion 都要能独立判断是否可以执行。

      🔹 设计方案:

      • 主节点BiApply 节点被推入 f1 的栈。
      • 辅助节点CoCompletion 节点被推入 f2 的栈,指向 BiApply
      • 协作逻辑
        • 当 f1 完成时,检查 f2 是否也完成,若完成则执行。
        • 若 f2 未完成,则等待其完成后再触发。
        • 同理,f2 完成时也会触发 CoCompletion,再转发到 BiApply

      f1.stack → [BiApply(dep=f3, src=f1, snd=f2, fn)]

      f2.stack → [CoCompletion(base=BiApply)]

      当 f1 完成时:

      • BiApply.tryFire(...) 检查 f2 是否也完成。
      • 如果是,执行 fn.apply(...), 完成 f3

      当 f2 完成时:

      • CoCompletion.tryFire(...) 转发到 BiApply.tryFire(...)
      • 同样检查 f1 是否完成,若完成则执行。

      🔹 问题背景:

      • BiApply 是属于 f1 的 Completion。
      • f2 也需要知道这个操作的存在,否则它完成时无法触发 BiApply

      🔹 解决方案:

      • 在 f2 上注册一个轻量级代理节点 CoCompletion,它不执行任何操作,只是把控制权转交给 BiApply
       
      

      class CoCompletion extends Completion {BiCompletion<?,?,?> base;final CompletableFuture<?> tryFire(int mode) {return base.tryFire(mode); // 转发到 BiApply}final boolean isLive() { return base != null && base.dep != null; }
      }

      • f1 完成 → 直接执行 BiApply
      • f2 完成 → 触发 CoCompletion → 转发到 BiApply
      • 确保无论哪个源先完成,都能正确处理。

      总体理解:

      • tryFire(...) 是所有 Completion 的统一入口,负责判断是否可执行,并实际执行用户逻辑。
      • 单源 Completion(如 thenApply)只需要一个 Completion 节点。
      • 双源 Completion(如 thenCombine)需要两个 Completion 节点:
        • 主节点 BiApply 注册在第一个源上。
        • 代理节点 CoCompletion 注册在第二个源上,仅转发控制权。

      postComplete/tryFire 的逻辑解析

      tryFire 和 postCompletion 这两个函数之前进行过一些分析,但因为两者逻辑上是令人困惑的,所以这里做一次总结。

      解释令人困惑的代码:

      while ((h = f.stack) != null ||(f != this && (h = (f = this).stack) != null)) {CompletableFuture<?> d; Completion t;if (STACK.compareAndSet(f, h, t = h.next)) {if (t != null) {if (f != this) {pushStack(h);  // 关键:为什么要放回去?continue;}NEXT.compareAndSet(h, t, null);}f = (d = h.tryFire(NESTED)) == null ? this : d;  // 关键:返回值含义}
      }
      

      postComplete方法负责在CompletableFuture完成后触发所有依赖的Completion。它的核心逻辑是:

      1. 从stack中弹出Completion节点
      2. 调用每个Completion的tryFire(NESTED)方法
      3. 处理可能的链式传播

      tryFire方法有三种模式:

      • SYNC (0): 同步模式,直接执行
      • ASYNC (1): 异步模式,通过线程池执行
      • NESTED (-1): 嵌套模式,用于避免无限递归

      NESTED模式是为了解决递归调用问题。当一个Completion完成后,可能触发其他Completion的完成,形成链式反应。如果每次都直接调用postComplete,会导致栈溢出。postComplete调用tryFire时使用NESTED模式

      tryFire在NESTED模式下

      情况1:返回null。表示这个Completion处理完毕,没有需要继续传播的CompletableFuture。f 被设置为this,继续处理当前CompletableFuture的其他Completion

      情况2返回 Completion中dep字段指向的CompletableFuture,即下一个需要处理的CompletableFuture。

      postComplete通过循环而非递归来处理这个返回值

          final CompletableFuture<V> tryFire(int mode) {// ... 执行逻辑后return dep.postFire(src, mode);  // 返回dep,即下一个CompletableFuture}final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {if (a != null && a.stack != null) {Object r;if ((r = a.result) == null)a.cleanStack();// NEST == -1 不走这if (mode >= 0 && (r != null || a.result != null))a.postComplete();}if (result != null && stack != null) {if (mode < 0)return this;elsepostComplete();}return null;}

      "放回去"的原因

      if (f != this) {pushStack(h);continue;
      }
      

      这里的逻辑是:

      • 当前处理的不是最初的CompletableFuture (this)
      • 而是在处理传播链中的其他CompletableFuture (f)
      • 将Completion h放回到 当前CompletableFuture(this) 的stack上
      • 这样确保所有需要处理的Completion最终都在原始的this上排队处理

      举一个例子,用Completion结构表示:

      f1.stack -> [Completion1{dep:f2}] -> [Completion2{dep:f3}] -> null
      f2.stack -> [Completion3{dep:f4}] -> [Completion4{dep:f5}] -> null
      f4.stack -> [Completion5{dep:f6}] -> null
      

      postComplete执行过程

      如果没有"放回去"机制,执行路径是:

      f1(this)
      f1 → f2 → f4 → f6
      f1 → f3 (结束)
      

      遗漏的依赖

      • Comp4{dep:f5}:f2→f5的依赖链

      当f != this时(即当前处理的不是原始的f1),执行pushStack(h)将未处理的Completion放回f1的stack中。经过"放回去"机制,f1.stack动态变化:

      初始:f1.stack → [Comp1{f2}] → [Comp2{f3}] → null

      处理Comp1后

      f1.stack → [Comp2{f3}] → null

      现在 f=f2,f2.stack → [Comp3{f4}] → [Comp4{f5}] → null

      处理f2的栈时:Comp3{f4}会被放回f1.stack

      • f1.stack → [Comp3{f4}] → [Comp2{f3}] → null
      • f2.stack →[Comp4{f5}] → null
      • 现在还在 f = f2,因此继续处理Comp4{f5},因为这是最后一个,不会压入 f1的栈,直接执行。

      最终保证所有依赖链都被正确处理。

      这里是为了满足能够通过循环遍历图,如果使用递归则没必要这样处理。

       为什么需要claim()

      虽然stack是通过CAS操作维护的链表,但这只保证了链表结构的一致性,不能保证Completion执行的唯一性。问题在于:

      1. 多线程竞争执行: 同一个Completion可能被多个线程同时触发
      2. 重复执行风险: 没有claim()的话,一个Completion可能被执行多次

      claim()方法使用ForkJoinTask的tag位作为原子标记:

      final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)return true;executor = null; // disablee.execute(this);}return false;
      }
      

      这里使用了ForkJoinTask的tag字段作为CAS标记,确保只有一个线程能成功claim这个Completion。

      CAS操作在这里分两个层面:

      1. stack链表的CAS: 保证链表结构操作的原子性(如tryPushStack、STACK.compareAndSet)
      2. Completion执行的CAS: 通过claim()方法保证每个Completion只被执行一次

      例如

      CompletableFuture<String> cf1 = new CompletableFuture<>();
      CompletableFuture<Integer> cf2 = cf1.thenApply(String::length);// 在不同线程中
      cf1.complete("hello");  // 线程A
      cf1.complete("world");  // 线程B (尝试但不会成功)

      这种情况,两个线程会取执行 包含 cf2 的Completion。另一种情况是thencombine,这里有天然的多线程操作后继 Completion的操作。

      stack的CAS保证了链表操作的原子性,而claim()保证了业务逻辑执行的唯一性,两者配合实现了完整的并发安全保障。

      在 tryFire 方法中,关键代码是:

      try {if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t));}
      } catch (Throwable ex) {d.completeThrowable(ex);
      }
      

      claim 在ASYNC模式没有被调用,可以通过整个链路来理解:

       完整的 thenApplyAsync 调用链路

      1. 用户调用 future.thenApplyAsync(fn)
      2. thenApplyAsync 调用 uniApplyStage(defaultExecutor(), fn)
      3. uniApplyStage 创建新的 CompletableFuture 和 UniApply Completion。uniApplyStage 中如果有结果会进行 uniApplyNow,直接计算(根据是不是异步,交给线程池)。
        1. unipush中,如果有结果会进行tryFire(SYNC),这之后的调用和postComplete类似,但会调用到postComplete,因为要触发之后的调用,之后使用NEST模式,不会再递归调用postComplete。
        2. tryFire(SYNC) ,判断(mode <= 0 && !claim()) 会通过,因此返回null,把执行交给线程池 e,claim返回 flase,导致 tryFire 直接 return null。
      4. UniApply Completion 通过 unipush 方法添加到源 CompletableFuture 的栈中
      5. 当源 CompletableFuture 完成时:
        • 调用 postComplete() 方法
        • postComplete() 弹出栈中的 Completion 并调用 tryFire(NESTED)
        • UniApply.tryFire(NESTED) 通过 claim() 方法提交自身到执行器
        • 执行器执行 Completion 的 run() 方法
        • run() 调用 tryFire(ASYNC)
        • tryFire(ASYNC) 直接执行函数而不再检查 claim()
        • 执行结果用于完成目标 CompletableFuture

      现在我们可以清楚地理解:

      1. tryFire(ASYNC) 的调用时机:当 Completion 被执行器异步执行时,通过 run() 或 exec() 方法调用 tryFire(ASYNC)

      2. claim() 方法的作用

        • 使用 CAS 操作确保 Completion 只被一个线程处理
        • 如果有执行器,将 Completion 提交到执行器异步执行,并返回 false
      3. 为什么 ASYNC 模式不需要 claim

        • 当以 ASYNC 模式调用 tryFire 时,Completion 已经在执行器线程中运行
        • claim() 的两个目的(确保单线程处理和提交到执行器)在 ASYNC 模式下都不再需要

      tryFire

      tryFire方法有三种模式:

      • SYNC (0): 同步模式,直接在当前线程执行
      • ASYNC (1): 异步模式,在线程池中执行
      • NESTED (-1): 嵌套模式,用于避免递归调用栈溢出

      SYNC模式的调用时机

      1.1 UniCompletion中的SYNC调用

      在unipush方法中,当CompletableFuture已经完成时:

      if (result != null)c.tryFire(SYNC);
      

      这种情况下,由于源CompletableFuture已经有结果,可以立即同步执行Completion。

      1.2 BiCompletion中的SYNC调用

      在bipush方法中,当两个源都已完成时:

          final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {while (result == null) {if (tryPushStack(c)) {if (b.result == null)b.unipush(new CoCompletion(c));else if (result != null) // 检查自己有没有完成c.tryFire(SYNC);return;}}b.unipush(c);}}

      当都完成时,直接同步触发BiCompletion。

      1.3 Or操作中的SYNC调用

      在orpush方法中:

          final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null);break;}}if (result != null)c.tryFire(SYNC);elseb.unipush(new CoCompletion(c)); // 这里也会判断 b 有没有结束,实现了 or}}final void unipush(Completion c) {if (c != null) {while (!tryPushStack(c)) {if (result != null) {NEXT.set(c, null);break;}}if (result != null)c.tryFire(SYNC);}}

      当任一源已完成时,立即同步执行OrCompletion。

      ASYNC模式的调用时机

      UniCompletion中的ASYNC调用

      在UniCompletion的claim方法中:

      final boolean claim() {Executor e = executor;if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {if (e == null)return true;executor = null;e.execute(this); // 这里会异步调用run()方法,最终调用tryFire(ASYNC)}return false;
      }
      

      当Completion需要在指定的Executor中异步执行时,会调用tryFire(ASYNC)。

      Completion继承自ForkJoinTask,其exec方法:

      public final boolean exec() { tryFire(ASYNC); return false; }
      public final void run() { tryFire(ASYNC); }
      

      当Completion作为ForkJoinTask执行时,会调用tryFire(ASYNC)。

       UniCompletion的执行流程

      以UniApply为例:

      final CompletableFuture<V> tryFire(int mode) {// 检查前置条件if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (f = fn) == null)return null;tryComplete: if (d.result == null) {// 处理异常结果if (r instanceof AltResult) {if ((x = ((AltResult)r).ex) != null) {d.completeThrowable(x, r);break tryComplete;}r = null;}try {// 关键:模式判断if (mode <= 0 && !claim())return null;else {@SuppressWarnings("unchecked") T t = (T) r;d.completeValue(f.apply(t));}} catch (Throwable ex) {d.completeThrowable(ex);}}// 清理引用src = null; dep = null; fn = null;return d.postFire(a, mode);
      }
      

      模式判断逻辑

      • mode <= 0(SYNC或NESTED):需要调用claim()来确保独占执行
      • mode > 0(ASYNC):直接执行,因为已经在异步环境中

      BiCompletion的执行流程

      以BiApply为例:

      final CompletableFuture<V> tryFire(int mode) {// 检查两个源都已完成if ((a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null || (f = fn) == null|| !d.biApply(r, s, f, mode > 0 ? null : this))return null;// 清理引用src = null; snd = null; dep = null; fn = null;return d.postFire(a, b, mode);
      }
      

      在biApply方法中:

      final <R,S> boolean biApply(Object r, Object s,BiFunction<? super R,? super S,? extends T> f,BiApply<R,S,T> c) {// 处理异常...try {if (c != null && !c.claim())  // 当mode <= 0时,c不为nullreturn false;@SuppressWarnings("unchecked") R rr = (R) r;@SuppressWarnings("unchecked") S ss = (S) s;completeValue(f.apply(rr, ss));} catch (Throwable ex) {completeThrowable(ex);}return true;
      }
      

      关键差异

      • mode > 0(ASYNC):传入null作为Completion参数,跳过claim()检查
      • mode <= 0(SYNC/NESTED):传入this,需要通过claim()确保独占执行

      Canceller、Signaller、Timeout、anyOf与allOf

      在前面的讨论中,我们已经了解了CompletableFuture的基本工作机制,包括uni、bi和or操作的执行流程。接下来,我将对CompletableFuture中的其他重要组件和特性进行详细分析。

      1. Canceller - 超时取消机制

      Canceller是一个实现了BiConsumer<Object, Throwable>的内部类,主要用于取消计划任务:

      static final class Canceller implements BiConsumer<Object, Throwable> {final Future<?> f;Canceller(Future<?> f) { this.f = f; }public void accept(Object ignore, Throwable ex) {if (f != null) // currently never nullf.cancel(false);}
      }
      

      1. 主要用途:用于取消超时任务。当CompletableFuture以其他方式完成时,不再需要超时触发。

      2. 工作流程

        • 当设置超时时(如使用orTimeoutcompleteOnTimeout方法),会创建一个定时任务
        • 同时通过whenComplete注册一个Canceller
        • 如果CompletableFuture在超时前完成,Canceller会取消定时任务
        • 这避免了不必要的超时任务继续占用系统资源
      3. 使用示例:在arrangeTimeout方法中可以看到完整流程

      private <U> void arrangeTimeout(long nanoDelay, Timeout<U> onTimeout) {ForkJoinPool e = ASYNC_POOL;if (result == null) {ScheduledForkJoinTask<Void> t = new ScheduledForkJoinTask<Void>(nanoDelay, 0L, true, onTimeout, null, e);whenComplete(new Canceller(t));  // 注册Canceller取消器e.scheduleDelayedTask(t);}
      }
      

      2. Timeout - 超时处理机制

      Timeout是一个实现了Runnable的内部类,负责在超时发生时执行操作:

      static final class Timeout<U> implements Runnable {final CompletableFuture<U> f;final U value;final boolean exceptional;Timeout(CompletableFuture<U> f, U value, boolean exceptional) {this.f = f; this.value = value; this.exceptional = exceptional;}public void run() {if (f != null && !f.isDone()) {if (exceptional)f.completeExceptionally(new TimeoutException());elsef.complete(value);}}
      }
      

      Timeout的工作机制

      1. 两种工作模式

        • 异常模式(exceptional=true):超时时以TimeoutException异常完成Future
        • 默认值模式(exceptional=false):超时时以指定的默认值完成Future
      2. 对应的公共API

        • orTimeout(timeout, unit):超时抛异常
        • completeOnTimeout(value, timeout, unit):超时使用默认值
      3. 执行流程

        • 创建Timeout实例,包含操作模式和可能的默认值
        • 将其作为定时任务提交到调度器
        • 超时触发时,执行run()方法完成CompletableFuture

      3. Signaller - 阻塞等待机制

      Signaller是一个特殊的Completion子类,实现了ForkJoinPool.ManagedBlocker接口,用于支持阻塞等待操作:

      1. 设计

        • 实现ManagedBlocker接口,确保在ForkJoinPool中阻塞时不会导致线程饥饿
        • 支持中断和超时等待
        • 使用LockSupport进行线程挂起和唤醒
      2. 等待过程

        • 将Signaller实例推入CompletableFuture的栈
        • 将当前线程记录在Signaller中
        • 通过LockSupport.parkLockSupport.parkNanos挂起线程
        • 当Future完成时,触发Signaller的tryFire方法唤醒等待线程
      3. 唤醒机制

        • 当CompletableFuture完成时,会调用postComplete处理栈中的依赖
        • Signaller的tryFire方法会解除线程的阻塞状态

          static final class Signaller extends Completionimplements ForkJoinPool.ManagedBlocker {long nanos;                    // remaining wait time if timedfinal long deadline;           // non-zero if timedfinal boolean interruptible;boolean interrupted;volatile Thread thread;Signaller(boolean interruptible, long nanos, long deadline) {this.thread = Thread.currentThread();this.interruptible = interruptible;this.nanos = nanos;this.deadline = deadline;}final CompletableFuture<?> tryFire(int ignore) {Thread w; // no need to atomically claimif ((w = thread) != null) {thread = null;LockSupport.unpark(w);}return null;}public boolean isReleasable() {if (Thread.interrupted())interrupted = true;return ((interrupted && interruptible) ||(deadline != 0L &&(nanos <= 0L ||(nanos = deadline - System.nanoTime()) <= 0L)) ||thread == null);}public boolean block() {while (!isReleasable()) {if (deadline == 0L)LockSupport.park(this);elseLockSupport.parkNanos(this, nanos);}return true;}final boolean isLive() { return thread != null; }}

      4. anyOf - 任意完成机制

      anyOf静态方法创建一个在任何输入CompletableFuture完成时完成的新CompletableFuture:

      1. 核心类AnyOf Completion类

      2. 设计特点

        • 监听多个输入Future中的任何一个
        • 当任一输入完成时,使用相同结果完成输出Future
        • 完成后清理其他输入Future的栈,避免资源泄漏
      3. 执行流程

        • 创建一个新的CompletableFuture作为结果
        • 检查所有输入Future,如有已完成则直接使用其结果
        • 对每个输入Future添加AnyOf监听器
        • 当任一输入完成时,触发AnyOf.tryFire方法
        • tryFire调用completeRelay传递结果,并清理其他输入的栈
      4. 优化处理

        • 特殊处理空数组和单元素数组的情况
        • 完成后清理其他Future的栈,避免内存泄漏
        • 结果处理使用encodeRelay,保证异常正确传播

      anyOf的实现分析

      public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {int n; Object r;if ((n = cfs.length) <= 1)return (n == 0)? new CompletableFuture<Object>(): uniCopyStage(cfs[0]);for (CompletableFuture<?> cf : cfs)if ((r = cf.result) != null)return new CompletableFuture<Object>(encodeRelay(r));cfs = cfs.clone();CompletableFuture<Object> d = new CompletableFuture<>();for (CompletableFuture<?> cf : cfs)cf.unipush(new AnyOf(d, cf, cfs));// If d was completed while we were adding completions, we should// clean the stack of any sources that may have had completions// pushed on their stack after d was completed.if (d.result != null)for (int i = 0, len = cfs.length; i < len; i++)if (cfs[i].result != null)for (i++; i < len; i++)if (cfs[i].result == null)cfs[i].cleanStack();return d;
      }
      

      1. 边界条件处理

        • 如果传入的CompletableFuture数组长度为0,返回一个新的未完成的CompletableFuture
        • 如果长度为1,则直接返回该CompletableFuture的副本
      2. 快速完成检查

        • 遍历所有传入的CompletableFuture,检查是否已有完成的,如果有则立即创建一个已完成的CompletableFuture并返回
      3. 注册依赖

        • 克隆输入数组(防止修改原数组)
        • 创建一个新的结果CompletableFuture
        • 为每个源CompletableFuture注册AnyOf完成处理器
      4. 清理

        • 如果在注册过程中结果已经完成,清理那些依然未完成的CompletableFuture的栈
      5. AnyOf Completion类
         

         
        static class AnyOf extends Completion {CompletableFuture<Object> dep; CompletableFuture<?> src;CompletableFuture<?>[] srcs;AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src,CompletableFuture<?>[] srcs) {this.dep = dep; this.src = src; this.srcs = srcs;}final CompletableFuture<Object> tryFire(int mode) {// assert mode != ASYNC;CompletableFuture<Object> d; CompletableFuture<?> a;CompletableFuture<?>[] as;Object r;if ((a = src) == null || (r = a.result) == null|| (d = dep) == null || (as = srcs) == null)return null;src = null; dep = null; srcs = null;if (d.completeRelay(r)) {for (CompletableFuture<?> b : as)if (b != a)b.cleanStack();if (mode < 0)return d;elsed.postComplete();}return null;}final boolean isLive() {CompletableFuture<Object> d;return (d = dep) != null && d.result == null;}
        }
        
        • tryFire方法在源CompletableFuture完成时被调用
        • 将结果传递给依赖的CompletableFuture
        • 清理其他所有源CompletableFuture的栈(这是关键优化,防止无用的栈积累)

      5. allOf - 全部完成机制

      allOf静态方法创建一个在所有输入CompletableFuture完成时完成的新CompletableFuture:

      1. 核心方法andTree - 递归构建完成树

      2. 设计特点

        • 使用分治法构建二叉树结构
        • 通过BiRelay节点连接子树
        • 任何异常都会导致结果Future异常完成
      3. 执行流程

        • 递归地将输入数组分成左右两半
        • 对每半递归调用andTree,构建子树
        • 使用BiRelay连接左右子树
        • 只有当两个子树都完成时,父节点才会完成
        • 如有任何一个Future异常完成,结果Future也异常完成
      4. 优化处理

        • 特殊处理空数组的情况,直接返回完成的Future
        • 使用二分法平衡树结构,提高并行度和效率
        • 优先传播异常,保证快速失败

      BiRelay是一个特殊的BiCompletion,用于协调两个输入CompletableFuture,只有当两者都完成时才完成依赖的Future。

      allOf的实现分析

      public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {return andTree(cfs, 0, cfs.length - 1);
      }
      

      allOf实际上是调用了andTree方法:

      static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,int lo, int hi) {CompletableFuture<Void> d = new CompletableFuture<Void>();if (lo > hi) // emptyd.result = NIL;else {CompletableFuture<?> a, b; Object r, s, z; Throwable x;int mid = (lo + hi) >>> 1;if ((a = (lo == mid ? cfs[lo] :andTree(cfs, lo, mid))) == null ||(b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :andTree(cfs, mid+1, hi))) == null)throw new NullPointerException();if ((r = a.result) == null || (s = b.result) == null)a.bipush(b, new BiRelay<>(d, a, b));else if ((r instanceof AltResult&& (x = ((AltResult)(z = r)).ex) != null) ||(s instanceof AltResult&& (x = ((AltResult)(z = s)).ex) != null))d.result = encodeThrowable(x, z);elsed.result = NIL;}return d;
      }
      

      allOf实现详解:

      1. 构建二叉树

        • 采用分治策略,将输入的CompletableFuture数组递归地组织成一个完全二叉树
        • 这种树形结构使得任务的并行度最大化
      2. 处理边界条件

        • 如果范围为空(lo > hi),则返回一个已完成的结果
      3. 递归构建子树

        • 通过中点(mid)划分数组为两部分
        • 递归处理左子树和右子树
      4. 状态检查和依赖设置

        • 检查左右子树是否都已完成
        • 如果有任一子树未完成,则设置双向依赖(bipush)
        • 如果都已完成,检查是否有异常,然后相应地完成结果
      5. BiRelay类

        static final class BiRelay<T,U> extends BiCompletion<T,U,Void> {BiRelay(CompletableFuture<Void> dep,CompletableFuture<T> src, CompletableFuture<U> snd) {super(null, dep, src, snd);}final CompletableFuture<Void> tryFire(int mode) {CompletableFuture<Void> d;CompletableFuture<T> a;CompletableFuture<U> b;Object r, s, z; Throwable x;if (   (a = src) == null || (r = a.result) == null|| (b = snd) == null || (s = b.result) == null|| (d = dep) == null)return null;if (d.result == null) {if ((r instanceof AltResult&& (x = ((AltResult)(z = r)).ex) != null) ||(s instanceof AltResult&& (x = ((AltResult)(z = s)).ex) != null))d.completeThrowable(x, z);elsed.completeNull();}src = null; snd = null; dep = null;return d.postFire(a, b, mode);}
        }
        

      encodeRelay/completeRelay

      encodeRelay主要用于以下场景:

      • 当一个CompletableFuture依赖于另一个CompletableFuture的结果时,使用encodeRelay确保异常结果被正确包装为CompletionException,从而保持CompletableFuture的异常处理一致性。

      假设有两个CompletableFuture f1f2f2依赖于f1的结果:

      CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {throw new RuntimeException("Error in f1");
      });CompletableFuture<Integer> f2 = f1.thenApply(result -> {// Do something with resultreturn result * 2;
      });
      

      在这种情况下,如果f1异常完成,f2需要正确处理该异常。encodeRelay确保f1的异常结果被传递给f2时,被正确包装为CompletionException

      completeRelay 则设置这个包装结果

          final boolean completeRelay(Object r) {

              return RESULT.compareAndSet(this, null, encodeRelay(r));

          }

      关键场景分析

      1. 超时处理场景

      // 5秒后超时并抛出异常
      CompletableFuture<String> future = someAsyncOperation().orTimeout(5, TimeUnit.SECONDS);// 5秒后超时并返回默认值
      CompletableFuture<String> future = someAsyncOperation().completeOnTimeout("Default value", 5, TimeUnit.SECONDS);
      

      工作流程:

      1. 调用arrangeTimeout创建定时任务
      2. 通过whenComplete注册Canceller取消器
      3. 如果原始Future在超时前完成,Canceller取消定时任务
      4. 否则,Timeout.run()方法在超时时执行,完成Future

      2. 阻塞等待场景

      String result = future.get();  // 可能阻塞直到完成
      String result = future.get(10, TimeUnit.SECONDS);  // 带超时的阻塞
      

      工作流程:

      1. 创建Signaller实例并记录当前线程
      2. 将Signaller推入Future的栈中
      3. 通过LockSupport.park挂起当前线程
      4. 当Future完成时,触发栈中的Signaller.tryFire
      5. tryFire唤醒等待线程,继续执行

      3. 组合多个Future场景

      // 等待所有Future完成
      CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2, future3);// 等待任一Future完成
      CompletableFuture<Object> anyDone = CompletableFuture.anyOf(future1, future2, future3);
      

      工作流程(allOf):

      1. 通过andTree递归构建二叉树结构
      2. 每个节点使用BiRelay连接两个子节点
      3. 只有当左右子树都完成时,父节点才会完成
      4. 如有异常,优先传播异常

      工作流程(anyOf):

      1. 创建一个新的输出Future
      2. 为每个输入Future添加AnyOf监听器
      3. 当任一输入完成,将结果传递给输出Future
      4. 清理其他Future的栈,避免资源泄漏

      总结

      CompletableFuture的这些高级特性体现了其强大的异步编程能力:

      1. 超时机制:通过Timeout和Canceller优雅地实现超时控制,同时避免资源浪费
      2. 阻塞等待:Signaller结合ManagedBlocker提供安全的阻塞等待,防止线程饥饿
      3. 组合操作:anyOf和allOf通过巧妙的树状结构,高效实现了多Future协调

      这些机制共同构建了CompletableFuture强大且灵活的异步编程模型,使其成为Java中处理复杂异步流程的核心工具。

      CompletableFuture 中获取结果的流程详解

      CompletableFuture 提供了几种获取计算结果的方法:get()get(timeout, unit)join()getNow()。这些方法的内部实现都涉及 waitingGettimedGetreportGet 和 reportJoin 等私有方法。下面详细分析它们的执行流程。

      1. get() 方法的执行流程

      当外部使用者调用 CompletableFuture.get() 方法时:

      public T get() throws InterruptedException, ExecutionException {Object r;if ((r = result) == null)r = waitingGet(true);return (T) reportGet(r, "get");
      }
      

      执行步骤:

      1. 检查 result 字段是否为 null
      2. 如果 result 不为 null,表示任务已完成,直接获取结果
      3. 如果 result 为 null,调用 waitingGet(true) 等待任务完成
      4. 最后调用 reportGet 方法处理结果并返回

      2. waitingGet 方法分析

      waitingGet 方法实现了阻塞等待任务完成的核心逻辑:

      private Object waitingGet(boolean interruptible) {if (interruptible && Thread.interrupted())return null;Signaller q = null;boolean queued = false;Object r;while ((r = result) == null) {if (q == null) {q = new Signaller(interruptible, 0L, 0L);if (Thread.currentThread() instanceof ForkJoinWorkerThread)ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);}else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interrupted) {q.thread = null;cleanStack();return null;}else {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interrupted = true;}}}if (q != null) {q.thread = null;if (q.interrupted)Thread.currentThread().interrupt();}postComplete();return r;
      }
      

      执行步骤:

      1. 检查线程是否被中断(如果 interruptible 为 true)
      2. 创建一个 Signaller 实例作为等待节点
      3. 如果当前线程是 ForkJoinWorkerThread,尝试帮助处理异步阻塞任务
      4. 将 Signaller 节点推入等待栈中
      5. 调用 ForkJoinPool.managedBlock(q) 阻塞当前线程
      6. 当任务完成后(result 不为 null),处理中断状态并调用 postComplete()
      7. 返回结果

      最后一段代码的目的是保留中断状态:

      • 当线程在 waitingGet 中等待时,如果发生了中断,Signaller 检测到中断并设置 interrupted = true。
      • 同时 Thread.interrupted() 方法会清除线程的实际中断状态。
      • 方法结束时,该代码段通过 Thread.currentThread().interrupt() 恢复线程的中断状态。

      3. Signaller 类的作用

      Signaller 是 CompletableFuture 中实现线程等待和唤醒机制的关键类:

      • Signaller 同时扩展了 Completion 和实现了 ForkJoinPool.ManagedBlocker 接口
      • 它被添加到 CompletableFuture 的等待栈中
      • 当 CompletableFuture 完成时,会调用 tryFire() 方法唤醒等待的线程
      • isReleasable() 判断是否可以结束阻塞状态
      • block() 实现了线程的阻塞逻辑,使用 LockSupport.park/parkNanos

      4. get(timeout, unit) 和 timedGet 方法

      带超时参数的 get 方法:

      public T get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {long nanos = unit.toNanos(timeout);Object r;if ((r = result) == null)r = timedGet(nanos);return (T) reportGet(r, "get");
      }
      

      timedGet 方法与 waitingGet 类似,但增加了超时处理:

      private Object timedGet(long nanos) throws TimeoutException {long d = System.nanoTime() + nanos;long deadline = (d == 0L) ? 1L : d; // 避免为0boolean interrupted = false, queued = false;Signaller q = null;Object r = null;for (;;) { // 检查中断、结果、超时的顺序很重要if (interrupted || (interrupted = Thread.interrupted()))break;else if ((r = result) != null)break;else if (nanos <= 0L)break;else if (q == null) {q = new Signaller(true, nanos, deadline);if (Thread.currentThread() instanceof ForkJoinWorkerThread)ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q);}else if (!queued)queued = tryPushStack(q);else {try {ForkJoinPool.managedBlock(q);interrupted = q.interrupted;nanos = q.nanos;} catch (InterruptedException ie) {interrupted = true;}}}// 清理和恢复if (q != null) {q.thread = null;if (r == null)cleanStack();}// 根据不同情况返回结果或抛出异常if (r != null) {if (interrupted)Thread.currentThread().interrupt();postComplete();return r;} else if (interrupted)return null;elsethrow new TimeoutException();
      }
      

      与 waitingGet 相比,timedGet 增加了:

      1. 计算超时截止时间
      2. 使用超时参数创建 Signaller
      3. 如果超时返回 null 并抛出 TimeoutException

      5. reportGet 和 reportJoin 方法

      这两个方法处理结果并根据不同情况抛出异常:

      private static Object reportGet(Object r, String details)throws InterruptedException, ExecutionException {if (r == null) // 按约定,null表示中断throw new InterruptedException();if (r instanceof AltResult) {Throwable x, cause;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw new CancellationException(details, (CancellationException)x);if ((x instanceof CompletionException) &&(cause = x.getCause()) != null)x = cause;throw wrapInExecutionException(x);}return r;
      }private static Object reportJoin(Object r, String details) {if (r instanceof AltResult) {Throwable x;if ((x = ((AltResult)r).ex) == null)return null;if (x instanceof CancellationException)throw new CancellationException(details, (CancellationException)x);if (x instanceof CompletionException)throw (CompletionException)x;throw wrapInCompletionException(x);}return r;
      }
      

      关键区别:

      • reportGet 抛出受检异常 ExecutionException,用于 get() 方法
      • reportJoin 抛出非受检异常 CompletionException,用于 join() 和 getNow() 方法
      • 对于 null 值的处理不同

      6. join() 和 getNow() 方法

      public T join() {Object r;if ((r = result) == null)r = waitingGet(false);return (T) reportJoin(r, "join");
      }public T getNow(T valueIfAbsent) {Object r;return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r, "getNow");
      }
      
      • join() 类似于 get(),但使用 waitingGet(false) 表示不可中断,并使用 reportJoin
      • getNow() 不会等待,如果结果不可用则返回指定的默认值

      7. 完整流程总结

      1. 用户调用 get()/join()/getNow() 方法获取结果
      2. 检查 result 字段是否为 null
      3. 如果为 null,通过 waitingGet 或 timedGet 进入等待状态
      4. 创建 Signaller 对象并添加到等待栈中
      5. 当前线程通过 ForkJoinPool.managedBlock 或直接 LockSupport.park 阻塞
      6. 当 CompletableFuture 完成时,会调用 postComplete() 唤醒等待的线程
      7. 线程醒来后通过 reportGet 或 reportJoin 处理结果
      8. 根据结果类型返回值或抛出相应异常

      http://www.xdnf.cn/news/772183.html

      相关文章:

    • 九.C++ 对引用的学习
    • 统计随机行走的结构占比
    • JDK21深度解密 Day 12:大规模迁移JDK21方法论
    • PAT-甲级JAVA题解(更新中...)
    • RGB888色彩格式转RGB565格式
    • 海外tk抓包简单暴力方式
    • 从 Windows 7 到 AnduinOS:安装、故障排除与远程控制指南
    • NLP学习路线图(十八):Word2Vec (CBOW Skip-gram)
    • 光伏功率预测 | BiLSTM多变量单步光伏功率预测(Matlab完整源码和数据)
    • 文件索引:数组、二叉树、二叉排序树、平衡树、红黑树、B树、B+树
    • 并查集(上)
    • javaFX eclipse配置
    • Redis数据类型操作命令
    • 考研系列—操作系统:(补充)第七章、输入输出系统
    • 第12次12: 修改和删除收货地址
    • 普通二叉树 —— 最近公共祖先问题解析(Leetcode 236)
    • 专业C++Qt开发服务,助力您的软件项目腾飞!
    • 二叉树的构建与逆构建/二叉查找树与替罪羊树
    • BUUCTF[HCTF 2018]WarmUp 1题解
    • 《人性的弱点》能带给我们什么?
    • C++哈希表:冲突解决与高效查找
    • uni-id-pages login-by-google实现
    • 05.MySQL表的约束
    • 使用免费wordpress成品网站模板需要注意点什么
    • NX847NX855美光固态闪存NX862NX865
    • 向量空间的练习题目
    • 前端高频面试题2:JavaScript/TypeScript
    • SOC-ESP32S3部分:26-物联网MQTT连云
    • 《深度剖析:基于Meta的GameFormer构建自博弈AI游戏代理》
    • 趋势因子均值策略思路