当前位置: 首页 > web >正文

Java22 stream 新特性 窗口算子:GathererOp 和 GatherSink

GathererOp

GathererOp 是 Java Stream API 中 gather() 操作的核心运行时实现。它负责将用户定义的 Gatherer 应用于流中的元素,并生成新的流。

final class GathererOp<T, A, R> extends ReferencePipeline<T, R> {
  • <T, A, R>: 这是类的泛型参数:
    • T: 上游流中元素的类型,也是 Gatherer 消费的元素类型。
    • AGatherer 内部状态累加器的类型。
    • RGatherer 产生的结果元素的类型,也是下游流中元素的类型。
  • extends ReferencePipeline<T, R>GathererOp 继承自 ReferencePipeline
    • ReferencePipeline 是 JDK Stream API 内部用于处理对象类型元素流的基类之一。
    • 这意味着 GathererOp 本身就是一个流操作阶段,它可以连接到其他流操作之前或之后。
    • 它从上游接收类型为 T 的元素(实际上是 P_OUT extends T,如 of 方法所示),并向下游产生类型为 R 的元素。

根据类注释和代码结构,GathererOp 的主要作用是:

  • 实现 gather 操作: 它是 Stream.gather(Gatherer) 方法背后的实际执行者。
  • 管理 Gatherer 的生命周期: 包括初始化、整合元素、合并(并行时)和完成。
  • 支持顺序和并行执行GathererOp 能够根据流的执行模式(顺序或并行)选择合适的求值策略。
  • 融合优化:
    • gather().gather() 融合: 如果一个 gather 操作紧跟着另一个 gather 操作,GathererOp 会尝试将这两个 Gatherer 合并成一个,以减少中间流和操作开销。这是通过 gatherer.andThen(gatherer) 实现的。
    • gather().collect() 融合: 如果 gather 操作后紧跟着一个 collect 终端操作,GathererOp 会覆盖 collect 方法,将 Gatherer 的逻辑与 Collector 的逻辑融合执行,避免生成中间集合,从而提高性能,尤其是在并行流中。
  • 性能: 注释中提到 "The performance-critical code below contains some more complicated encodings",表明其内部实现对性能有较高要求,并可能采用了一些复杂的编码技巧。

静态工厂方法 of

    @SuppressWarnings("unchecked")static <P_IN, P_OUT extends T, T, A, R> Stream<R> of(ReferencePipeline<P_IN, P_OUT> upstream,Gatherer<T, A, R> gatherer) {// 当一个 gather 操作附加到另一个 gather 操作上时,// 我们可以将它们融合成一个if (upstream.getClass() == GathererOp.class) {return new GathererOp<>(((GathererOp<P_IN, Object, P_OUT>) upstream).gatherer.andThen(gatherer),(GathererOp<?, ?, P_IN>) upstream);} else {return new GathererOp<>((ReferencePipeline<?, T>) upstream,gatherer);}}
  • 这是创建 GathererOp 实例的入口。当在流上调用 .gather() 时,内部会调用此方法。
  • 参数:
    • upstream: 上游的 ReferencePipeline,即前一个流操作。
    • gatherer: 用户提供的 Gatherer 接口的实现。
  • 逻辑:
    • 它首先检查上游操作是否也是一个 GathererOp
    • 如果是 (upstream.getClass() == GathererOp.class): 这意味着连续调用了 .gather()。此时,它会使用 upstream.gatherer.andThen(gatherer) 将两个 Gatherer 组合成一个新的 Gatherer,然后用这个组合后的 Gatherer 和更上游的流(upstream.upstream(),通过第二个构造函数间接获取)创建一个新的 GathererOp。这就是 gather().gather() 融合。
    • 如果不是: 则直接使用传入的 upstream 和 gatherer 创建一个新的 GathererOp

构造函数

GathererOp 有两个私有构造函数:

  1. private GathererOp(ReferencePipeline<?, T> upstream, Gatherer<T, A, R> gatherer):

    • 用于处理第一个(或非融合的).gather() 调用。
    • 它调用父类 ReferencePipeline 的构造函数,并根据 gatherer.integrator() 的类型(是否为 Integrator.Greedy)设置操作标志(opFlags),例如是否为短路操作。
    • 保存传入的 gatherer
  2. private GathererOp(Gatherer<T, A, R> gatherer, GathererOp<?, ?, T> upstream):

    • 用于融合连续的 .gather() 调用。此时,gatherer 参数是已经通过 andThen 组合后的 Gatherer
    • upstream 参数是前一个 GathererOp。它会追溯到这个 upstream GathererOp 的上游(即 upstream.upstream())作为新 GathererOp 的直接上游。
    • 同样设置操作标志并保存 gatherer

核心成员变量

  • final Gatherer<T, A, R> gatherer;: 存储与此 GathererOp 关联的 Gatherer 实例。

内部类

a. NodeBuilder<X>

static final class NodeBuilder<X> implements Consumer<X> {// ...
}
  • 作用: 在并行流评估中,NodeBuilder 充当元素的惰性累加器。每个并行任务可能会使用一个 NodeBuilder 来收集其处理的元素。最终,这些 NodeBuilder 的内容会被构建成 Node<X> 对象(Stream API 内部用于表示一批数据的结构),然后这些 Node 可以被合并。
  • 主要方法:
    • accept(X x): 向构建器中添加一个元素。它会维护一个 rightMost 的 SpinedBuffer.Builder 来高效追加。
    • join(NodeBuilder<X> that): 将另一个 NodeBuilder 的内容合并到当前构建器。为了避免创建过深的 Node 树(不平衡的 Concat-trees),对于小的 NodeBuilder,它会直接将元素追加到当前 rightMost 构建器中,而不是总是创建一个 Nodes.conc 节点。LINEAR_APPEND_MAX 控制这个阈值。
    • build(): 从累积的元素中构建一个 Node<X>

b. GatherSink<T, A, R>

static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {// ...
}
  • 作用GatherSink 是连接 Gatherer 逻辑和下游流操作(或终端操作)的桥梁。它实现了 Sink<T> 接口来接收上游元素,并实现了 Gatherer.Downstream<R> 接口供 Gatherer 的 integrator 和 finisher 推送结果。
  • 实现接口:
    • Sink<T>: 使其能够作为流操作链中的一个环节,接收上游元素。
      • begin(long size): 在处理开始前调用,用于初始化 Gatherer 的状态 (state = gatherer.initializer().get()) 和下游 Sink
      • accept(T t): 每当上游产生一个元素 t 时调用。它会调用 gatherer.integrator().integrate(state, t, this) 来处理元素并更新状态。proceed 标志会根据 integrate 的返回值更新,用于支持短路。
      • cancellationRequested(): 判断是否应停止处理。这是实现短路的关键,它会检查自身的 proceed 和 downstreamProceed 标志以及下游 sink.cancellationRequested()
      • end(): 在所有元素处理完毕后调用。它会调用 gatherer.finisher().accept(state, this) 来执行最终处理,并通知下游 sink.end()
    • Gatherer.Downstream<R>: 作为 Gatherer 的输出通道。
      • isRejecting(): 返回 !downstreamProceed,表示下游是否已拒绝接收更多元素。
      • push(R r)Gatherer 通过此方法将处理结果 r 推送出去。GatherSink 会将 r 传递给下游的 sink.accept(r)
  • 状态变量:
    • sink: 下游的 Sink
    • gatherer: 当前的 Gatherer
    • integrator: 缓存的 gatherer.integrator(),用于优化。
    • stateGatherer 的内部状态。
    • proceed: 表示 Gatherer 本身是否希望继续处理。
    • downstreamProceed: 表示下游 Sink 是否希望继续接收元素。

关键方法的覆盖

  • opIsStateful():

    boolean opIsStateful() {// TODOreturn true;
    }
    

    目前总是返回 true,表示 GathererOp 是一个有状态的操作。注释中提到了未来可能的优化方向,即根据 Gatherer 的具体特性(是否有 initializercombinerfinisher)来判断其是否真正有状态。有状态操作在并行流中通常需要更复杂的处理(例如,不能简单地流水线化)。

  • opWrapSink(int flags, Sink<R> downstream):

    Sink<T> opWrapSink(int flags, Sink<R> downstream) {return new GatherSink<>(gatherer, downstream);
    }
    

    这是 AbstractPipeline 中的一个核心方法,用于将下游的 Sink 包装成当前操作需要的 Sink。在这里,它创建并返回一个 GatherSink 实例。

  • opEvaluateParallel(PipelineHelper<R> unused1, Spliterator<I> spliterator, IntFunction<R[]> unused2): 此方法定义了 GathererOp 在并行流中的求值逻辑。它利用了 evaluate 辅助方法,并使用 NodeBuilder 来收集并行任务的结果,最后将这些结果合并成一个 Node<R>

  • opEvaluateParallelLazy(PipelineHelper<R> helper, Spliterator<P_IN> spliterator): 用于并行流的惰性求值。注释提到,只有非常特定类型的 Gatherer(无初始化器、有组合器、无完成器)才能直接、高效地表示为 Spliterator。目前的实现是先通过 opEvaluateParallel 完全求值得到一个 Node,然后再从这个 Node 创建 Spliterator,这意味着它不是真正的惰性并行。

  • collect(...) 方法 (两个重载):

    @Override
    public <CR, CA> CR collect(Collector<? super R, CA, CR> c) {// ...return evaluate(...);
    }@Override
    public <RR> RR collect(Supplier<RR> supplier,BiConsumer<RR, ? super R> accumulator,BiConsumer<RR, RR> combiner) {// ...return evaluate(...);
    }
    

    这两个方法覆盖了 Stream 接口的 collect 终端操作。它们实现了 gather().collect() 的融合。通过调用内部的 evaluate 方法,将 Gatherer 的逻辑和 Collector 的逻辑一起执行,避免了先完成 gather 生成中间结果再进行 collect 的开销。

私有辅助方法 evaluate

evaluate 方法是 GathererOp 的“大脑”和“执行引擎”。它位于整个操作的核心,负责统筹和调度,根据不同的运行环境(串行/并行)和 Gatherer 的特性,选择最合适的执行策略来完成 gather 和下游 collect 操作。

它的核心职责可以概括为:接收一个数据源 (Spliterator) 和一个终端收集器 (Collector) 的组件,然后智能地选择并执行一条最优路径,最终返回收集器的结果。

这个方法是 GathererOp 中两个主要的终端操作 collect 的最终实现。你会发现 collect 方法的主要工作就是解析 Collector,然后将所有组件(spliteratorgatherercollector 的各个部分)打包传递给 evaluate

// ... existing code ...private <CA, CR> CR evaluate(final Spliterator<T> spliterator,final boolean parallel,final Gatherer<T, A, R> gatherer,final Supplier<CA> collectorSupplier,final BiConsumer<CA, ? super R> collectorAccumulator,final BinaryOperator<CA> collectorCombiner,final Function<CA, CR> collectorFinisher) {
// ... existing code ...
  • spliterator: 数据源。
  • parallel: 一个布尔标志,指示当前流是否处于并行模式。这是决定执行路径的第一个关键分叉点。
  • gatherer: 用户提供的 Gatherer 实例。
  • collectorSuppliercollectorAccumulatorcollectorCombinercollectorFinisher: 这四个参数是下游 Collector 的四个核心函数,被解构后传入。evaluate 方法将它们与 Gatherer 的逻辑进行融合。

evaluate 的执行逻辑与决策树

evaluate 内部的执行逻辑可以看作一个清晰的决策树:

evaluate(...)
|
+-- 1. 是并行流吗 (is parallel?)|+-- 否 (No) -> **串行路径**|   ||   +-- 创建一个 `Sequential` 实例|   ||   +-- 调用 `sequential.evaluateUsing(spliterator)`|   ||   +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **并行路径**|+-- 2. Gatherer 是否可合并 (has combiner?)|+-- 否 (No) -> **Hybrid 策略**|   ||   +-- 创建一个 `Hybrid` 任务|   ||   +-- `invoke()` 执行任务|   ||   +-- `get()` 获取共享的 `Sequential` 实例|   ||   +-- 调用 `sequential.get()` 返回最终结果|+-- 是 (Yes) -> **Parallel 策略**|+-- 创建一个 `Parallel` 任务|+-- `invoke()` 执行任务|+-- `get()` 获取根任务合并后的 `Sequential` 实例|+-- 调用 `sequential.get()` 返回最终结果

现在我们把这个决策树和代码对应起来看:

1. 串行路径 (!parallel)
// ... existing code ...if (!parallel)return new Sequential().evaluateUsing(spliterator).get();
// ... existing code ...

这是最简单直接的路径。如果流不是并行的,evaluate 会:

  1. new Sequential(): 创建一个 Sequential 实例。在这个实例的构造函数中,Gatherer 的状态 (state) 和 Collector 的容器 (collectorState) 都被初始化了。
  2. .evaluateUsing(spliterator): 驱动 Sequential 实例开始处理 spliterator 中的所有元素。如我们之前分析的,这个过程将 Gatherer 的输出直接推送给 Collector 的累加器。
  3. .get(): 在所有元素处理完毕后,调用 Sequential 的 get 方法,该方法会依次执行 Gatherer 的 finisher 和 Collector 的 finisher,产出最终结果。
2. 并行路径 (parallel)

当 parallel 为 true 时,evaluate 进入并行处理逻辑。它首先要做的就是判断应该使用 Hybrid 还是 Parallel 策略。

// ... existing code ...// Parallel section starts here:final var combiner = gatherer.combiner();// ... (Hybrid 和 Parallel 类的定义) ...if (combiner == Gatherer.defaultCombiner()) {// NO COMBINER -> HYBRIDreturn new Hybrid(spliterator).invoke().get();} else {// HAS COMBINER -> PARALLELreturn new Parallel(spliterator).invoke().get();}
// ... existing code ...

(注:为了清晰,将 Hybrid 和 Parallel 的调用逻辑移到了一起,实际代码中它们的定义在调用之前)

  • gatherer.combiner(): 这是决策的关键。evaluate 获取 Gatherer 的合并器。
  • if (combiner == Gatherer.defaultCombiner()): 判断 Gatherer 是否提供了有效的合并器。
    • true (不可合并): 选择 Hybrid 策略。创建一个 Hybrid 根任务,并通过 invoke() 启动 ForkJoin 计算。计算完成后,invoke() 返回根任务,我们再调用 .get() 获取最终的 Sequential 实例(在 Hybrid 模式下,所有任务共享这一个实例),最后调用 Sequential 实例的 .get() 方法获取最终结果。
    • false (可合并): 选择 Parallel 策略。创建一个 Parallel 根任务,并通过 invoke() 启动。在计算过程中,每个叶子任务有自己的 Sequential 实例,结果会逐级向上合并。invoke() 完成后,返回的根任务中包含了完全合并后的 Sequential 实例。最后同样调用这个实例的 .get() 方法获取最终结果。

总结

evaluate 方法是 GathererOp 实现高性能和高灵活性的基石。它通过对 parallel 标志和 gatherer.combiner() 的判断,精确地将执行流导向三个专门设计的执行器之一:

  • Sequential: 用于串行流,实现了 Gatherer 和 Collector 的零开销融合。
  • Hybrid: 用于并行的、但 Gatherer 不可合并的场景。它通过“上游并行,下游串行”的混合模式,在保证正确性的前提下最大化并行度。
  • Parallel: 用于并行的、且 Gatherer 可合并的场景。它实现了彻底的分治并行计算,将性能发挥到极致。

这个方法完美地体现了Java Stream API在设计上的深思熟虑,即如何将一个复杂的操作分解,并为不同的场景提供高度优化的执行路径。

GatherSink

GatherSink 是 Gatherer 操作在串行流(Sequential Stream)或作为并行处理中叶子节点的执行体。它扮演着一个“适配器”和“状态管理器”的关键角色,连接上游的元素流和下游的 Sink

  • 角色:它是一个 Sink(接收器)。在 Stream 的流水线模型中,每个操作都会包装下游的 Sink,形成一个链条。GatherSink 正是 gather() 操作的 Sink 实现。
  • 双重身份:它实现了两个关键接口:
    1. Sink<T>: 使其能被上游操作调用,接收上游流过来的元素 T
    2. Gatherer.Downstream<R>: 使其能被 Gatherer 的 integrator 和 finisher 调用,将处理后的结果 R 推送给下游。
  • 核心目标
    1. 管理 Gatherer 的生命周期:调用 initializer 创建初始状态,在接收每个元素时调用 integrator,在流结束时调用 finisher
    2. 管理 Gatherer 的状态(state)。
    3. 处理短路(Short-Circuiting):当 Gatherer 或下游 Sink 不再需要更多元素时,能够有效地向上游传递“取消”信号。

我们来逐个部分解析 GatherSink 的实现。

// ... existing code ...static final class GatherSink<T, A, R> implements Sink<T>, Gatherer.Downstream<R> {private final Sink<R> sink;private final Gatherer<T, A, R> gatherer;private final Integrator<A, T, R> integrator; // Optimization: reuseprivate A state;private boolean proceed = true;private boolean downstreamProceed = true;GatherSink(Gatherer<T, A, R> gatherer, Sink<R> sink) {this.gatherer = gatherer;this.sink = sink;this.integrator = gatherer.integrator(); // 提前获取,避免重复调用}
// ... existing code ...
  • sink: 指向下游的 SinkGatherSink 处理完元素后,通过这个 sink 将结果传递下去。
  • gatherer: 当前操作所使用的 Gatherer 实例。
  • integrator: 这是从 gatherer 中提前获取的 integrator。这是一个性能优化,因为 integrator 是 accept 方法中的热点代码,提前缓存可以避免在每次调用时都通过 gatherer.integrator() 访问。
  • stateGatherer 的状态对象,由 initializer 创建。
  • proceed: 一个布尔标志,表示当前 Gatherer 是否还想继续处理上游的元素。如果 integrator 返回 false,这个标志就会被设为 false
  • downstreamProceed: 一个布尔标志,表示下游 Sink 是否还想接收元素。它通过调用 sink.cancellationRequested() 来更新。

这两个 proceed 标志是实现短路机制的核心。

Sink<T> 接口实现 - 与上游交互

这是 GatherSink 作为标准 Sink 的行为。

// ... existing code ...// java.util.stream.Sink contract below:@Overridepublic void begin(long size) {final var initializer = gatherer.initializer();if (initializer != Gatherer.defaultInitializer()) // 优化:如果不是默认的无操作initializerstate = initializer.get();sink.begin(-1); // GathererOp 通常不知道输出大小,所以传递-1}@Overridepublic void accept(T t) {// ... 性能优化注释 ...proceed &= integrator.integrate(state, t, this);}@Overridepublic boolean cancellationRequested() {return cancellationRequested(proceed && downstreamProceed);}private boolean cancellationRequested(boolean knownProceed) {// 高性能敏感区域return !(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false)));}@Overridepublic void end() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher()) // 优化:如果不是默认的无操作finisherfinisher.accept(state, this);sink.end();state = null; // 帮助GC}
// ... existing code ...
  • begin(long size): 在流处理开始时调用。它会调用 gatherer 的 initializer 来创建初始状态 state,然后通知下游 sink 处理开始。
  • accept(T t): 这是最核心的方法,每当上游传来一个元素 t 时被调用。
    • 它调用 integrator.integrate(state, t, this)integrator 会处理这个元素,可能会更新 state,也可能会通过 this(即 Gatherer.Downstream)向下游推送零个或多个结果。
    • integrator 返回一个布尔值,表示它自己是否还想接收更多元素。
    • proceed &= ... 这个写法非常巧妙。它确保一旦 proceed 变为 false,它就再也不会变回 true。这是一个比 if (!integrator.integrate(...)) proceed = false; 更高效的无分支写法。
  • cancellationRequested(): 上游操作会调用这个方法来查询是否应该停止发送元素。
    • 它同时检查 proceed (自己是否想继续) 和 downstreamProceed (下游是否想继续)。
    • cancellationRequested(boolean knownProceed) 内部的逻辑 !(knownProceed && (!sink.cancellationRequested() || (downstreamProceed = false))) 稍微复杂,我们分解一下:
      • 如果 knownProceed 是 false(意味着 proceed 或 downstreamProceed 已经是 false),则直接返回 true(请求取消)。
      • 如果 knownProceed 是 true,则检查 !sink.cancellationRequested()
      • 如果下游没有请求取消,!sink.cancellationRequested() 为 true,整个 && 表达式为 true,取反后为 false(不请求取消)。
      • 如果下游请求了取消,!sink.cancellationRequested() 为 false。由于 || 的短路特性,会执行 downstreamProceed = false,将下游的取消状态缓存起来。然后 || 表达式结果为 false,整个 && 表达式为 false,取反后为 true(请求取消)。
  • end(): 在流处理结束时调用。它会调用 gatherer 的 finisher 来处理最终的状态,可能会向下游推送最后的元素。然后通知下游 sink 处理结束。

Gatherer.Downstream<R> 接口实现 - 与 Gatherer 内部交互

这是 GatherSink 作为 integrator 和 finisher 的回调通道的行为。

// ... existing code ...// Gatherer.Sink contract below:@Overridepublic boolean isRejecting() {return !downstreamProceed;}@Overridepublic boolean push(R r) {var p = downstreamProceed;if (p)sink.accept(r); // 将结果推给下游return !cancellationRequested(p);}}
// ... existing code ...
  • push(R r)integrator 或 finisher 通过调用这个方法来产出结果。
    • 它首先检查 downstreamProceed,如果下游还在接收,就调用 sink.accept(r) 将结果 r 推送下去。
    • 然后它返回一个布尔值,告诉调用者(integrator/finisher)处理完这次 push 后,整个流水线是否还希望继续。这个返回值基于 cancellationRequested 的结果,这样 integrator 就可以在 push 之后立即知道是否应该短路。
  • isRejecting(): 允许 integrator 查询下游是否已经拒绝接收更多元素。这对于某些复杂的 Gatherer 很有用,它们可能在 push 之前就想知道下游的状态。

总结

GatherSink 是连接 Stream 流水线和 Gatherer 逻辑的核心桥梁。它通过实现 Sink 和 Gatherer.Downstream 两个接口,完美地扮演了双重角色:

  1. 对上游,它是一个标准的 Sink,遵循 begin -> accept* -> end 的生命周期,并能通过 cancellationRequested 向上游传递短路信号。
  2. 对 Gatherer 内部,它是一个 Downstream 回调,为 integrator 和 finisher 提供了推送结果 (push) 和查询下游状态 (isRejecting) 的能力。

它的设计充满了性能优化的考量,例如缓存 integrator、使用无分支的 &= 操作、以及精巧的 cancellationRequested 逻辑,确保了 gather 操作在串行模式下的高效执行。

Sequential

Sequential 类是 GathererOp 中 evaluate 方法的核心组件之一,它专门用于 串行(Sequential) 执行模式。它的设计目标是将一个 Gatherer 和一个下游的 Collector 融合在一起,形成一个单一的、高效的串行处理单元。

  • 角色:它是一个融合了 Gatherer 逻辑和 Collector 逻辑的处理器。它同时扮演了多个角色,通过实现 Consumer<T> 和 Gatherer.Downstream<R> 接口来完成。
  • 核心目标
    1. 避免中间结果物化:常规的 stream.gather(...).collect(...) 会先由 gather 操作完全处理完所有元素,生成一个中间的 Stream<R>,然后再由 collect 操作来消费这个中间流。Sequential 类通过融合,使得 Gatherer 产生一个结果 R 后,能立即被 Collector 的 accumulator 消费,从而避免了创建和存储整个中间结果集,极大地提升了效率和降低了内存消耗。
    2. 统一处理逻辑:它为串行流和 Hybrid 并行模式的串行处理阶段提供了一个统一的、可复用的执行体。

我们来逐个部分解析 Sequential 的实现。

// ... existing code ...// Sequential is the fusion of a Gatherer and a Collector which can// be evaluated sequentially.final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {A state;CA collectorState;boolean proceed;Sequential() {if (initializer != Gatherer.defaultInitializer())state = initializer.get();collectorState = collectorSupplier.get();proceed = true;}
// ... existing code ...
  • implements Consumer<T>, Gatherer.Downstream<R>: 这个类的双重身份。
    • Consumer<T>: 使其 accept(T) 方法可以被 Spliterator.forEachRemaining 或 tryAdvance 调用,从而接收上游流过来的原始元素 T
    • Gatherer.Downstream<R>: 使其 push(R) 方法可以被 Gatherer 的 integrator 调用,从而接收 Gatherer 处理后产生的结果 R
  • stateGatherer 的状态对象,由 gatherer.initializer() 创建。
  • collectorStateCollector 的状态对象(即累加容器),由 collectorSupplier.get() 创建。
  • proceed: 一个布尔标志,用于处理非 greedy 模式下的短路。当 integrator 返回 false 时,它会被设为 false,从而终止 evaluateUsing 中的循环。

evaluateUsing(Spliterator<T> spliterator) - 驱动方法

// ... existing code ...@ForceInlineSequential evaluateUsing(Spliterator<T> spliterator) {if (greedy)spliterator.forEachRemaining(this);elsedo {} while (proceed && spliterator.tryAdvance(this));return this;}
// ... existing code ...
  • 这是驱动整个串行处理流程的入口。
  • if (greedy): 如果 Gatherer 是 greedy 的(即不会短路),就一次性调用 spliterator.forEachRemaining(this)。这会遍历 spliterator 中的所有元素,并对每个元素调用 this.accept(T) 方法。这是最高效的处理方式。
  • else: 如果 Gatherer 可能会短路,则使用 do-while 循环和 spliterator.tryAdvance(this)tryAdvance 一次只处理一个元素,并且循环条件会检查 proceed 标志。一旦 proceed 变为 false,循环就会立即终止,实现短路。

accept(T t) - 实现 Consumer<T>

// ... existing code ...@Overridepublic void accept(T t) {/** ...*/var ignore = integrator.integrate(state, t, this)|| (!greedy && (proceed = false));}
// ... existing code ...
  • 当 evaluateUsing 拉取上游元素时,此方法被调用。
  • integrator.integrate(state, t, this): 这是核心调用。它将上游元素 tGatherer 的当前状态 state 交给 integrator 处理。this 作为 Downstream 被传入,以便 integrator 可以通过 push 方法输出结果。
  • || (!greedy && (proceed = false)): 这是一个精巧的短路实现。
    • integrator.integrate 返回一个布尔值,true 表示希望继续,false 表示不希望继续。
    • 如果 integrator 返回 true,由于 || 的短路特性,后面的部分不会执行,proceed 保持 true
    • 如果 integrator 返回 false,则会执行 || 后面的部分。!greedy 条件确保这只在非贪婪模式下发生,然后 proceed = false 被执行,进行短路。

push(R r) - 实现 Gatherer.Downstream<R>

// ... existing code ...@Overridepublic boolean push(R r) {collectorAccumulator.accept(collectorState, r);return true;}
// ... existing code ...
  • 当 integrator 内部决定要产出一个结果 r 时,它会调用 downstream.push(r),也就是这个方法。
  • collectorAccumulator.accept(collectorState, r)这就是融合的关键所在Gatherer 产生的结果 r,没有被放入任何中间集合,而是被直接传递给了 Collector 的累加器。
  • return true;: 在这个融合的场景下,Collector 本身是不能短路的(这是 Collector 的规范),所以它总是告诉 Gatherer 的 integrator “可以继续推数据给我”。

get() - 获取最终结果

// ... existing code ...@SuppressWarnings("unchecked")public CR get() {final var finisher = gatherer.finisher();if (finisher != Gatherer.<A, R>defaultFinisher())finisher.accept(state, this);// IF collectorFinisher == null -> IDENTITY_FINISHreturn (collectorFinisher == null)? (CR) collectorState: collectorFinisher.apply(collectorState);}
// ... existing code ...
  • 在所有元素都被 evaluateUsing 处理完毕后,调用此方法来获取最终结果。
  • gatherer.finisher().accept(state, this): 首先,调用 Gatherer 的 finisher,让它可以处理最终的状态,并可能通过 push 输出最后的元素。
  • collectorFinisher.apply(collectorState): 然后,调用 Collector 的 finisher,对累加容器 collectorState 进行最终的转换,得到最终结果 CR。如果 Collector 有 IDENTITY_FINISH 特性,则直接返回累加容器本身。

总结

Sequential 类是一个高度优化的、专门用于串行执行的融合处理器。它通过巧妙地实现 Consumer 和 Downstream 接口,将 Gatherer 的输出无缝对接到 Collector 的输入,避免了不必要的中间数据结构,并将 Gatherer 和 Collector 的整个生命周期(初始化、累加、终结)紧密地结合在一起,形成了一个高效的单一处理循环。这对于提升串行流的性能至关重要。

Hybrid

Hybrid 是 Gatherer 操作在并行流(Parallel Stream)中,针对没有提供 combiner 的有状态 Gatherer 的一种特殊执行策略。它的名字“Hybrid”(混合)恰如其分地描述了它的工作模式:上游并行处理,下游串行

Hybrid 的设计借鉴了 ForEachOrderedTask,它将任务分解成一个链表式的结构,确保了即使任务被 ForkJoinPool 中的不同线程并行执行,最终结果的处理顺序也和原始 Spliterator 的顺序一致。

其工作流程可以概括为:

  1. 任务分裂(compute:将原始的 Spliterator 递归地分裂成更小的块,形成一个任务树。关键在于,它会维护一个 leftPredecessor(左前驱)的引用,构建出一个逻辑上的任务链表。
  2. 上游并行处理(compute - greedy 分支):对于叶子任务,如果 Gatherer 是 greedy 的(即它会消耗所有上游元素),Hybrid 会先执行完所有上游操作(如 mapfilter),并将结果缓冲到一个 NodeBuilder 中。这一步是完全并行的。
  3. 串行化处理(onCompletion:当一个任务完成时(无论是完成了上游处理,还是它本身就是叶子节点),onCompletion 方法会被调用。这个方法会按照任务链表的顺序,依次执行 localResult.evaluateUsing(s),即调用 Sequential 实例来串行地处理自己持有的那一小块数据。
  4. 短路支持(cancelled:通过一个共享的 AtomicBoolean cancelled 标志,实现非 greedy 模式下的短路。一旦下游的 Sequential 处理器表示不再需要数据 (proceed 变为 false),就会设置 cancelled 标志,后续的任务在执行前会检查此标志,从而避免不必要的计算。

为什么需要Hybrid

场景:一个无法并行合并的 Gatherer

让我们以 Gatherers.windowSliding(3) 为例。这是一个典型的有状态无法并行合并的 Gatherer

  • 有状态:它需要维护一个内部队列(大小最多为3),记录最近看到的元素,以便生成滑动窗口。
  • 无法并行合并:假设我们有一个流 [1, 2, 3, 4, 5, 6]
    • 线程A处理 [1, 2, 3],它的最终状态是队列 [1, 2, 3],并输出了窗口 [1, 2, 3]
    • 线程B处理 [4, 5, 6],它的最终状态是队列 [4, 5, 6],并输出了窗口 [4, 5, 6]
    • 现在,我们如何合并这两个结果?我们丢失了 [2, 3, 4] 和 [3, 4, 5] 这两个跨越了数据块边界的窗口。没有一个通用的 combiner 能猜到需要这样去合并。因此,它没有提供 combiner

对于这样一个 Gatherer,如果没有 Hybrid 策略,在并行流中,我们只有两个选择,但都不理想:

  1. 完全串行执行

    • 做法:放弃并行,整个流从头到尾都在一个线程里执行。
    • 优点:能得到正确的结果。
    • 缺点:性能极差。如果流是这样的:source.filter(...).map(...).gather(windowSliding(3)),那么 filter 和 map 这两个本可以高效并行的无状态操作,也被迫在单线程里执行,完全浪费了多核CPU的优势。
  2. 常规的并行执行(会出错)

    • 做法:像处理 map 或 filter 一样,把数据分块,让每个线程独立处理自己的那部分 gather 逻辑。
    • 优点:速度快。
    • 缺点:结果是错误的!就像上面例子展示的,我们会丢失所有跨越数据块边界的窗口。

这就是 Hybrid 策略要解决的核心矛盾:我们既想要利用多核CPU并行处理 filter 和 map 等上游操作来提升性能,又必须保证 windowSliding 这个 Gatherer 本身是按顺序处理元素的以确保结果正确。

Hybrid(混合)策略的名字完美地诠释了它的解决方案:把一个流处理过程拆分成两部分,分别对待

  1. 上游并行(Upstream Parallelism)

    • Hybrid 任务在 ForkJoinPool 中被分裂成许多小任务,分布到不同CPU核心上。
    • 每个任务负责一小块数据。它会首先执行完所有上游的无状态操作(如 filtermap)。比如,线程A处理 source 的前1000个元素,它会先对这1000个元素进行 filter 和 map。线程B同时对后1000个元素做同样的事。
    • 这一步是完全并行的,极大地利用了CPU资源。
    • 处理完的结果被临时缓冲起来(存入 NodeBuilder)。
  2. 下游串行(Downstream Serial)

    • 当上游的并行计算完成后,Hybrid 任务会进入 onCompletion 阶段。
    • Hybrid 内部通过一个精巧的链表结构 (next 指针) 保证了,即使各个任务块的并行计算完成时间是乱序的,它们向下游提交数据(即执行 gather 逻辑)的顺序也严格遵循原始数据流的顺序
    • 所有任务共享同一个 Sequential 实例,这个实例包含了 Gatherer 的状态。
    • 任务A处理完后,把它的缓冲结果交给 Sequential 实例处理;然后,任务B才能把它缓冲的结果交过来。这就确保了 windowSliding 看到的元素流是 [..., element_N, element_N+1, ...] 这样连续不断的,从而可以正确地生成所有窗口。

所以,Hybrid 策略的必要性在于:

  • 对于那些有状态不可合并的 Gatherer,我们不能使用常规的并行模式,否则结果会出错。
  • 但我们又不想因为这一个 Gatherer 操作而放弃整个流的并行化潜力,让 mapfilter 等无辜的操作也跟着串行执行,这会造成巨大的性能损失。

Hybrid 提供了一个两全其美的方案:它像一个聪明的调度器,将可以并行的部分(上游无状态操作)充分并行化,然后将必须串行的部分(有状态的 Gatherer 逻辑)严格串行化,并在这两者之间建立了一座桥梁,从而在保证结果正确性的前提下,最大化地压榨了多核CPU的性能。

类的定义与关键成员变量

// ... existing code ...@SuppressWarnings("serial")final class Hybrid extends CountedCompleter<Sequential> {private final long targetSize;private final Hybrid leftPredecessor;private final AtomicBoolean cancelled;private final Sequential localResult;private Spliterator<T> spliterator;private Hybrid next;private static final VarHandle NEXT = MhUtil.findVarHandle(MethodHandles.lookup(), "next", Hybrid.class);
// ... existing code ...
  • extends CountedCompleter<Sequential>: 继承自 CountedCompleter,这是 ForkJoin 框架中用于处理完成依赖关系的核心类。它的结果类型是 Sequential,即下游的串行处理器。
  • targetSizeForkJoin 任务分裂的建议目标大小。
  • leftPredecessor: 指向其在逻辑顺序上的前一个任务。这是保证顺序执行的关键。
  • cancelled: 一个共享的 AtomicBoolean,用于在非 greedy 模式下实现短路。所有分裂出的任务共享同一个 cancelled 实例。
  • localResult: 下游的串行处理器。所有任务也共享同一个 Sequential 实例,因为处理逻辑必须是串行的。
  • spliterator: 当前任务负责处理的数据片段。
  • next: 指向其在逻辑顺序上的后一个任务。通过 VarHandle 进行原子更新,用于在任务完成时触发下一个任务的执行。

compute() - 任务分裂与上游并行化

Hybrid.compute() 的执行流程可以分为两个主要阶段:

  1. 任务分裂阶段(Task Splitting):通过一个 while 循环,递归地将大的数据块(Spliterator)分解成更小的、适合并行处理的子任务。这个阶段的核心是构建一个逻辑上的任务链表,为后续的有序处理做准备。
  2. 叶子任务处理阶段(Leaf Task Processing):当任务块小到不再适合继续分裂时,循环停止。此时,当前任务成为“叶子任务”,它需要真正地处理自己所持有的那一小块数据。“存入临时 nb”就发生在这个阶段

 

// ... existing code ...@Overridepublic void compute() {var task = this;Spliterator<T> rightSplit = task.spliterator, leftSplit;long sizeThreshold = task.targetSize;boolean forkRight = false;
// ... existing code ...
  • 初始化task 指向当前执行的 Hybrid 实例。rightSplit 是当前任务需要处理的数据源。sizeThreshold 是决定是否继续分裂的阈值。
1. 任务分裂阶段 (while 循环)
// ... existing code ...while ((greedy || !cancelled.get())&& rightSplit.estimateSize() > sizeThreshold&& (leftSplit = rightSplit.trySplit()) != null) {var leftChild = new Hybrid(task, leftSplit, task.leftPredecessor);var rightChild = new Hybrid(task, rightSplit, leftChild);/* leftChild and rightChild were just created and not* fork():ed yet so no need for a volatile write*/leftChild.next = rightChild;// ... (pending count 和 fork 逻辑 和 ForEachOrdered一样) ...// 这部分是 ForkJoin 框架的控制逻辑,用于管理任务依赖和执行// 核心思想是交替地 fork 左/右子任务,让当前线程继续处理另一半if (forkRight) {rightSplit = leftSplit;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;}
// ... existing code ...
  • 循环条件
    • (greedy || !cancelled.get()): 如果不是 greedy 模式(即可能会短路),则每次分裂前都检查是否已被取消。
    • rightSplit.estimateSize() > sizeThreshold: 数据块足够大,值得分裂。
    • (leftSplit = rightSplit.trySplit()) != null: 数据源支持分裂。
  • 创建子任务:创建 leftChild 和 rightChild 两个新的 Hybrid 任务。
  • 构建链表leftChild.next = rightChild; 这一行至关重要。它在任务树的兄弟节点之间建立了一个逻辑上的先后关系。这保证了即使 leftChild 和 rightChild 被不同线程并行执行,我们也能在将来按正确的顺序处理它们的结果。
  • fork()rightChild.fork() 或 leftChild.fork() 将一个子任务提交给 ForkJoinPool,让其他空闲线程可以“窃取”并执行它。当前线程则继续在循环中处理剩下的那一半数据。

这个分裂过程会一直持续,直到数据块小得不能再分,此时 while 循环退出,进入叶子任务处理阶段。

2. 叶子任务处理阶段 (关键部分)
// ... existing code .../** ...* IMPORTANT: Currently we only perform the processing of this* upstream data if we know the operation is greedy -- as we cannot* safely speculate on the cost/benefit ratio of parallelizing* the pre-processing of upstream data under short-circuiting.*/if (greedy && task.getPendingCount() > 0) {// Upstream elements are bufferedNodeBuilder<T> nb = new NodeBuilder<>();rightSplit.forEachRemaining(nb); // Run the upstreamtask.spliterator = nb.build().spliterator();}task.tryComplete();}
// ... existing code ...
  • if (greedy && ...): 这个优化只在 greedy 模式下进行。greedy 意味着 Gatherer 必须处理完所有上游元素才能完成,不会短路。这使得我们可以安全地预处理所有数据。
  • NodeBuilder<T> nb = new NodeBuilder<>();: 创建一个临时的缓冲区。
  • rightSplit.forEachRemaining(nb);这就是实现上游并行的魔法所在!
    • rightSplit 是什么?它不是原始的数据源,而是经过了上游所有操作(如 filtermap)层层包装后的 Spliterator
    • forEachRemaining(nb) 的作用是:拉动 rightSplit,让它开始吐出元素。每吐出一个元素,上游的 mapfilter 等操作就会被执行。
    • nb 是一个 Sink(通过 accept 方法),它接收这些经过上游操作处理后的结果,并把它们缓冲在内部。
    • 为什么能并行? 因为此时,多个不同的叶子任务(Hybrid 实例)正在不同的CPU核心上同时执行它们的 compute 方法。每个叶子任务都在调用 forEachRemaining,从而驱动它所负责的那一小块数据流过上游的 map/filter 流水线。这就实现了对上游无状态操作的并行处理。
  • task.spliterator = nb.build().spliterator();: 处理完成后,nb 中包含了当前任务块所有处理好的元素。我们用 nb.build() 将其固化成一个 Node,然后替换掉任务原来的 spliterator。现在,task.spliterator 指向的是一块已经过上游并行处理、结果被缓存的内存数据
  • task.tryComplete();: 通知 ForkJoin 框架,当前任务的 compute 阶段已经完成。这会触发 onCompletion 的执行。

总结与流程梳理

我们把整个流程串起来看:

  1. 一个大的 Hybrid 任务开始执行 compute
  2. while 循环将任务分裂成许多小的叶子任务,这些任务被 fork() 到 ForkJoinPool 中,由不同的线程并行执行。同时,通过 next 指针构建了一个逻辑顺序链。
  3. 并行阶段:每个叶子任务在其 compute 方法中,调用 forEachRemaining并行地执行 gather 之前的所有 mapfilter 等操作,并将结果缓冲到各自的 NodeBuilder 中。
  4. 串行阶段的准备:当一个叶子任务的 compute 完成后,它调用 tryComplete(),这会触发 onCompletion
  5. 串行阶段onCompletion 方法是按 next 链表顺序被调用的。它会把自己缓冲好的数据(task.spliterator)交给下游唯一的、共享的 Sequential 处理器进行串行处理。

所以,“存入临时 nb”这个步骤,本质上是将上游的并行计算和下游的串行处理解耦开。它允许我们在上游尽情地利用多核并行计算,然后把计算结果(乱序到达的)先暂存起来,最后再由 onCompletion 机制像接力赛一样,一棒一棒地、按正确的顺序,把这些暂存的结果交给下游的串行处理器。

onCompletion() - 串行化处理与任务串联

// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {var s = spliterator;spliterator = null; // GC// 1. 执行串行处理if (s != null&& (greedy || !cancelled.get())&& !localResult.evaluateUsing(s).proceed // 调用 Sequential 处理器&& !greedy)cancelled.set(true); // 如果需要,设置短路标志// 2. 触发下一个任务@SuppressWarnings("unchecked")var leftDescendant = (Hybrid) NEXT.getAndSet(this, null);if (leftDescendant != null) {leftDescendant.tryComplete();}}
// ... existing code ...
  • 执行串行处理:这是 Hybrid 模式的核心。当一个任务完成其 compute 阶段后,onCompletion 被调用。它会调用共享的 localResult (一个 Sequential 实例) 的 evaluateUsing 方法,把自己持有的 spliterator (对于 greedy 模式,这是缓冲后的 Node 的 spliterator) 交给它处理。由于所有任务共享同一个 localResult,并且通过 next 指针的串联保证了 onCompletion 的调用顺序,这里的处理实际上是串行的。
  • 触发下一个任务NEXT.getAndSet(this, null) 是一个原子操作,它获取并清空 next 引用。如果 next 任务(即 leftDescendant)存在,就调用它的 tryComplete()。这就像一个接力赛,当前任务完成处理后,拍一下下一个任务的肩膀,让它开始处理。

总结

Hybrid 类是一个非常精巧的设计,它在面对“有状态且不可并行合并”的 Gatherer 时,找到了一条兼顾并行性能和顺序要求的中间道路:

  • 它将问题分解为**上游(Upstream)下游(Downstream)**两部分。
  • 上游gather 之前的操作)通过 ForkJoin 和 NodeBuilder 缓冲,实现了最大程度的并行计算
  • 下游gather 和 collect 操作)通过共享的 Sequential 实例和 CountedCompleter 的有序完成机制,实现了严格的串行处理

Hybrid 模式是 GathererOp 能够高效融入并行流体系,同时又不破坏复杂有状态操作的顺序依赖性的关键所在。

Parallel

Parallel 类是 GathererOp 为可并行合并的 Gatherer 操作量身定制的并行执行策略。当一个 Gatherer 提供了有效的 combiner(合并器)时,就会采用此策略。它借鉴了 AbstractShortCircuitTask 的设计思想,实现了高效的并行处理和结果合并,并支持短路操作。

  • 角色:一个基于 ForkJoin 框架的并行任务,用于执行那些状态可以被并行计算然后合并的 Gatherer
  • 核心目标
    1. 完全并行化:与 Hybrid 不同,Parallel 策略旨在将 Gatherer 的逻辑(包括状态的累积)也完全并行化。每个工作线程都会有一个独立的 Gatherer 状态。
    2. 状态合并:在并行计算完成后,通过用户提供的 gatherer.combiner() 和下游 collector.combiner() 将各个线程的独立状态合并成最终结果。
    3. 支持短路:能够处理非 greedy 的 Gatherer,当某个分支决定短路时,能有效地取消后续不必要的计算。

我们来逐个部分解析 Parallel 的实现。

// ... existing code ...@SuppressWarnings("serial")final class Parallel extends CountedCompleter<Sequential> {private Spliterator<T> spliterator;private Parallel leftChild; // Only non-null if rightChild isprivate Parallel rightChild; // Only non-null if leftChild isprivate Sequential localResult;private volatile boolean canceled;private long targetSize; // lazily initialized// ... 构造函数 ...
// ... existing code ...
  • extends CountedCompleter<Sequential>: 继承自 CountedCompleter,这是 ForkJoin 框架中用于管理依赖任务完成计数的关键类。Sequential 是其最终的计算结果类型。
  • spliterator: 当前任务需要处理的数据分片。
  • leftChildrightChild: 指向分裂出的左右子任务,用于构建任务树。
  • localResult每个 Parallel 任务实例都拥有一个独立的 Sequential 实例。这个 Sequential 实例负责处理当前任务分片的数据,并持有该分片的局部 Gatherer 状态和 Collector 状态。这是与 Hybrid 模式(共享同一个 localResult)的根本区别。
  • canceled: 一个 volatile 的布尔值,用于实现短路。当一个任务需要取消时,它会通知其兄弟任务和父任务。
  • targetSize: 任务分裂的阈值,延迟初始化。

compute() - 任务分裂与执行

// ... existing code ...@Overridepublic void compute() {Spliterator<T> rs = spliterator, ls;long sizeEstimate = rs.estimateSize();final long sizeThreshold = getTargetSize(sizeEstimate);Parallel task = this;boolean forkRight = false;boolean proceed;while ((proceed = (greedy || !task.isRequestedToCancel()))&& sizeEstimate > sizeThreshold&& (ls = rs.trySplit()) != null) {final var leftChild = task.leftChild = new Parallel(task, ls);final var rightChild = task.rightChild = new Parallel(task, rs);task.setPendingCount(1);if (forkRight) {rs = ls;task = leftChild;rightChild.fork();} else {task = rightChild;leftChild.fork();}forkRight = !forkRight;sizeEstimate = rs.estimateSize();}if (proceed)task.doProcess();task.tryComplete();}
// ... existing code ...
  • 任务分裂循环 (while):
    • 逻辑与 Hybrid 类似,通过 trySplit 将大任务递归地分解成小任务。
    • isRequestedToCancel(): 在每次分裂前检查是否已被其他任务请求取消,以实现短路。
    • fork(): 将其中一个子任务提交给 ForkJoinPool,让其他线程窃取执行,当前线程继续处理另一半。
  • 叶子任务处理:
    • 当任务不再分裂时,循环退出。
    • if (proceed): 如果没有被取消,则调用 task.doProcess()

doProcess():

private void doProcess() {if (!(localResult = new Sequential()).evaluateUsing(spliterator).proceed&& !greedy)cancelLaterTasks();
}
  • localResult = new Sequential()关键点! 每个叶子任务都会创建一个全新的 Sequential 实例来处理自己的数据分片。
  • evaluateUsing(spliterator): 调用 Sequential 的方法,完成对当前数据分片的 Gatherer 和 Collector 的处理。
  • cancelLaterTasks(): 如果 evaluateUsing 返回 false(表示当前分支短路了),则调用此方法去通知其他相关的任务取消执行。

merge(Sequential l, Sequential r) - 结果合并

// ... existing code ...Sequential merge(Sequential l, Sequential r) {/** Only join the right if the left side didn't short-circuit,* or when greedy*/if (greedy || (l != null && r != null && l.proceed)) {l.state = combiner.apply(l.state, r.state);l.collectorState =collectorCombiner.apply(l.collectorState, r.collectorState);l.proceed = r.proceed;return l;}return (l != null) ? l : r;}
// ... existing code ...
  • 这是 Parallel 策略的核心。当左右子任务都完成后,父任务会调用此方法来合并它们的结果。
  • if (greedy || ...): 检查是否需要合并。如果不是 greedy 模式,且左边的任务已经短路了 (l.proceed 为 false),则无需合并右边的结果。
  • l.state = combiner.apply(l.state, r.state): 调用用户提供的 Gatherer 合并器,合并两个子任务的 Gatherer 状态。
  • l.collectorState = collectorCombiner.apply(...): 调用下游 Collector 的合并器,合并两个子任务的 Collector 状态。

onCompletion

// ... existing code ...@Overridepublic void onCompletion(CountedCompleter<?> caller) {spliterator = null; // GC assistanceif (leftChild != null) {/* ... */localResult = merge(leftChild.localResult, rightChild.localResult);leftChild = rightChild = null; // GC assistance}}
// ... existing code ...
  • 当一个任务的所有子任务都完成时(即 pendingCount 减到0),ForkJoin 框架会调用此方法。
  • if (leftChild != null): 这判断当前任务是否是一个“父任务”(即它曾经分裂过)。叶子任务的 leftChild 为 null,不会进入此逻辑。
  • localResult = merge(...): 调用 merge 方法,将左右子任务的 localResult 合并,并将合并后的结果存入当前父任务的 localResult 中。这个过程会从叶子节点开始,逐级向上合并,直到根任务。

总结

Parallel 类实现了一种经典的分治(Divide and Conquer)并行计算模式:

  1. 分解 (Divide)compute 方法将数据源递归地分裂成小块,构建出一个任务树。
  2. 解决 (Conquer): 树的每个叶子节点独立地、并行地处理自己的数据块,生成一个局部的结果(一个包含局部状态的 Sequential 对象)。
  3. 合并 (Combine)onCompletion 和 merge 方法协同工作,将子任务的局部结果两两合并,自底向上地在任务树中传递,直到根节点产生最终的、完全合并的结果。

这种策略的适用前提是 Gatherer 和 Collector 都必须是可结合的(Associative),即 (a op b) op c 等价于 a op (b op c),这样才能保证无论分裂和合并的顺序如何,最终结果都是一致的。

http://www.xdnf.cn/news/18716.html

相关文章:

  • 机器人控制基础:串级PID控制算法的参数如何整定?
  • 【读论文】Qwen-Image技术报告解读
  • iperf2 vs iperf3:UDP 发包逻辑差异与常见问题
  • 力扣(组合)
  • 人工智能时代下普遍基本收入(UBI)试验的实践与探索——以美国硅谷试点为例
  • LeetCode Hot 100 第二天
  • Java—— 配置文件Properties
  • 【Java SE】抽象类、接口与Object类
  • 秋招面试准备
  • 设计模式详解
  • TypeScript变量声明讲解
  • 个人思考与发展
  • 快速了解命令行界面(CLI)的行编辑模式
  • docker:compose
  • 【PSINS工具箱】MATLAB例程,平面上的组合导航,观测量为位置、速度、航向角,共5维。状态量为经典的15维
  • ModbusTCP与EtherNet/IP协议转换:工控机驱动步进电机完整教程
  • 智慧矿山误报率↓83%!陌讯多模态融合算法在矿用设备监控的落地优化
  • 安装即是已注册,永久可用!
  • Sql server的行转列
  • 数据结构:顺序表
  • C# 项目“交互式展厅管理客户端“针对的是“.NETFramework,Version=v4.8”,但此计算机上没有安装它。
  • 玳瑁的嵌入式日记D24-0823(数据结构)
  • 【基础-判断】使用http模块发起网络请求时,必须要使用on(‘headersReceive’)订阅请求头,请求才会成功。
  • 游戏广告投放数据分析项目:拆解投放的“流量密码”
  • 图像边缘检测
  • qwen2.5vl(2):lora 微调训练及代码讲解
  • Android Studio下载gradle文件很慢的捷径之路
  • 个人禁食伴侣FastTrack
  • 数据库类型与应用场景全解析:从传统关系型到新兴向量数据库
  • MySQL深分页的处理方案