当前位置: 首页 > ds >正文

Java Stream sort算子实现:SortedOps

SortedOps 是 Java Stream API 内部一个至关重要的 final 类,它的核心职责是为 Stream 流水线提供排序功能。当在代码中调用 stream.sorted() 时,背后就是由 SortedOps 类来驱动和实现具体的排序逻辑。

这个类被设计为 final 并且构造函数是 private 的,这意味着它是一个纯粹的工具类,不能被实例化或继承,只能通过其静态方法来使用。

下面我们分几个部分来详细解析它。

静态工厂方法 (Entry Points)

SortedOps 提供了一系列静态的 make 方法,作为创建排序操作的入口。这些方法根据不同的 Stream 类型(引用类型、intlongdouble)返回一个实现了排序逻辑的 Stream 操作实例。

// ... existing code .../*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {return new OfRef<>(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T* @param comparator the comparator to order elements by*/static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,Comparator<? super T> comparator) {return new OfRef<>(upstream, comparator);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {return new OfInt(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {return new OfLong(upstream);}/*** Appends a "sorted" operation to the provided stream.** @param <T> the type of both input and output elements* @param upstream a reference stream with element type T*/static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {return new OfDouble(upstream);}
// ... existing code ...
  • makeRef: 用于对象流 (Stream<T>)。它有两个重载版本:一个用于自然排序(要求元素实现 Comparable 接口),另一个接受一个 Comparator 用于自定义排序。
  • makeIntmakeLongmakeDouble: 分别用于原始类型流 IntStreamLongStreamDoubleStream。它们总是使用对应原始类型的自然顺序进行排序。

这些 make 方法内部会 new 一个对应的内部类实例(如 OfRefOfInt 等),这些内部类才是排序操作的核心。

StatefulOp 内部类

SortedOps 包含四个核心的静态内部类:OfRefOfIntOfLongOfDouble。它们都继承自 ...Pipeline.StatefulOp,这表明 sorted() 是一个有状态的中间操作 (Stateful Intermediate Operation)

“有状态”意味着该操作需要处理完上游(upstream)的所有元素后,才能向下游(downstream)传递第一个元素。对于排序来说,这是显而易见的——不看到所有元素,就无法确定哪个元素是最小的。这也意味着排序操作需要一个缓冲区来存储所有流元素,可能会消耗大量内存。

我们以 OfRef 为例来分析其关键方法:

// ... existing code ...private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {/*** Comparator used for sorting*/private final boolean isNaturalSort;private final Comparator<? super T> comparator;// ... existing code ...@Overridepublic Sink<T> opWrapSink(int flags, Sink<T> sink) {Objects.requireNonNull(sink);// If the input is already naturally sorted and this operation// also naturally sorted then this is a no-opif (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)return sink;else if (StreamOpFlag.SIZED.isKnown(flags))return new SizedRefSortingSink<>(sink, comparator);elsereturn new RefSortingSink<>(sink, comparator);}@Overridepublic <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,Spliterator<P_IN> spliterator,IntFunction<T[]> generator) {// If the input is already naturally sorted and this operation// naturally sorts then collect the outputif (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {return helper.evaluate(spliterator, false, generator);}else {// @@@ Weak two-pass parallel implementation; parallel collect, parallel sortT[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);Arrays.parallelSort(flattenedData, comparator);return Nodes.node(flattenedData);}}}
// ... existing code ...
  • opWrapSink(...): 这个方法用于串行流 (Sequential Stream) 的处理。它负责包装下游的 Sink(可以理解为元素的消费者)。

    1. 优化: 它首先检查上游流是否已经通过 StreamOpFlag.SORTED 标记为“已排序”。如果上游已按自然顺序排好,并且当前操作也是自然排序,那么这个排序操作就什么都不用做(no-op),直接返回下游的 sink。这是一个非常重要的性能优化,避免了对已排序的流进行重复排序。
    2. Sized vs. Unsized: 接着,它检查流是否通过 StreamOpFlag.SIZED 标记为“大小已知”。
      • 如果大小已知,它会创建一个 SizedRefSortingSink,这个 Sink 会预先分配一个足够大的数组来存放所有元素,效率更高。
      • 如果大小未知,它会创建一个 RefSortingSink,这个 Sink 内部使用 ArrayList 这种可动态调整大小的结构来存储元素。
  • opEvaluateParallel(...): 这个方法用于并行流 (Parallel Stream) 的处理。

    1. 优化: 同样,它也会检查流是否已经排序,如果是,则跳过排序步骤。
    2. 并行实现: 如果需要排序,这里的实现被注释为 "Weak two-pass parallel implementation"(弱两阶段并行实现)。
      • 阶段一 (Collect): 并行地将流中的所有元素收集到一个数组中。
      • 阶段二 (Sort): 调用 Arrays.parallelSort() 对这个大数组进行并行排序。
    3. 最后,将排序好的数组包装成一个 Node 对象返回。这种实现方式简单,但缺点是必须在内存中缓冲所有数据,对于巨大的流,内存开销会非常大。

OfIntOfLongOfDouble 的实现与 OfRef 非常相似,只是它们针对原始类型做了特化,使用原始类型数组(如 int[])和 SpinedBuffer(一种为 Stream 设计的高效可增长块状缓冲区)来优化性能和内存,避免了自动装箱/拆箱的开销。

Sink 实现:排序的执行者

Sink 是实际执行操作的组件。SortedOps 定义了多种 Sink 实现,用于在 end() 方法被调用时执行排序。

我们来看 SizedRefSortingSink 和 RefSortingSink 的核心逻辑:

// ... existing code ...private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {private T[] array;private int offset;
// ... existing code ...@Override@SuppressWarnings("unchecked")public void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);array = (T[]) new Object[(int) size]; // 预分配数组}@Overridepublic void end() {Arrays.sort(array, 0, offset, comparator); // 排序downstream.begin(offset);if (!cancellationRequestedCalled) {for (int i = 0; i < offset; i++)downstream.accept(array[i]); // 推送给下游}else {for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)downstream.accept(array[i]); // 支持短路}downstream.end();array = null; // 释放内存}@Overridepublic void accept(T t) {array[offset++] = t; // 接受并存入数组}}private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {private ArrayList<T> list;
// ... existing code ...@Overridepublic void begin(long size) {if (size >= Nodes.MAX_ARRAY_SIZE)throw new IllegalArgumentException(Nodes.BAD_SIZE);list = (size >= 0) ? new ArrayList<>((int) size) : new ArrayList<>(); // 初始化 ArrayList}@Overridepublic void end() {list.sort(comparator); // 排序downstream.begin(list.size());if (!cancellationRequestedCalled) {list.forEach(downstream::accept); // 推送给下游}
// ... existing code ...downstream.end();list = null; // 释放内存}@Overridepublic void accept(T t) {list.add(t); // 接受并存入列表}}
// ... existing code ...

这两个 Sink 的工作流程遵循一个清晰的模式:

  1. begin(size): 在处理开始时被调用,用于初始化内部的存储结构(固定大小的数组或 ArrayList)。
  2. accept(t): 每当从上游接收到一个元素时被调用,它将元素存入缓冲区。
  3. end(): 当所有元素都接收完毕后被调用。这是排序发生的时刻。它调用 Arrays.sort() 或 list.sort() 对缓冲区内的所有元素进行排序。
  4. 排序完成后,它会遍历排好序的缓冲区,并将每个元素通过 downstream.accept(t) 推送给流水线中的下一个操作。
  5. 短路 (Short-circuiting)sorted 操作本身不能短路,但它会配合下游的短路操作(如 limit(n))。cancellationRequestedCalled 标志位记录了下游是否请求提前终止。如果是,那么在 end() 方法中推送数据时,会不断检查 downstream.cancellationRequested(),以便尽快停止。
  6. 最后,清空内部缓冲区(array = null 或 list = null),以便垃圾回收器可以回收这部分内存。

总结

SortedOps 是 Java Stream sorted() 操作的幕后功臣。它通过一套精心设计的内部类和 Sink 实现,高效地完成了对不同类型、不同场景(串行/并行、大小已知/未知)的流的排序任务。

核心要点:

  • 有状态操作sorted() 是有状态的,需要缓冲所有元素,可能导致高内存占用。
  • 优化: 对已排序的流进行再次排序是几乎没有成本的。
  • 并行实现: 并行排序通过“先收集再并行排序”的两阶段方式实现,简单但内存开销大。
  • Sink 模式: 使用 beginacceptend 的 Sink 模式来处理元素流,将数据收集和处理逻辑清晰地分离开。
  • 原始类型特化: 为 intlongdouble 提供了专门的实现,以避免装箱开销,提升性能。
http://www.xdnf.cn/news/18129.html

相关文章:

  • 《设计模式》装饰模式
  • AI可行性分析:数据×算法×反馈=成功
  • 基于GIS的无人机模拟飞行控制系统设计与实现
  • K8S的ingress
  • 模式组合应用-桥接模式(一)
  • VS Code配置MinGW64编译GLPK(GNU Linear Programming Kit)开源库
  • 一键检测接口是否存活:用 Python/Shell 写个轻量级监控脚本
  • 《MySQL 数据库备份与视图创建全流程:从数据迁移到高效查询实战》
  • 【AI论文】NextStep-1:迈向大规模连续令牌自回归图像生成
  • 2020/12 JLPT听力原文 问题二 2番
  • HackMyVM-Uvalde
  • 高等数学 8.4 空间直线及其方程
  • macOS 中查看当前生效 shell 及配置文件的方法
  • 微服务的编程测评系统12-xxl-job-历史竞赛-竞赛报名
  • 腾讯混元大模型:实现3D打印产品生成的自动化平台
  • python---异常处理
  • 微软Wasm学习-创建一个最简单的c#WebAssembly测试工程
  • ISIS区域内、区域间计算
  • 机器学习——CountVectorizer将文本集合转换为 基于词频的特征矩阵
  • Boost搜索引擎项目(详细思路版)
  • 【3D重建技术】如何基于遥感图像和DEM等数据进行城市级高精度三维重建?
  • 扫地机器人(2025蓝桥杯省A组 H题)
  • AI重构文化基因:从“工具革命”到“生态觉醒”的裂变之路
  • 线性代数之两个宇宙文明关于距离的对话
  • 完整的VOC格式数据增强脚本
  • 狗品种识别数据集:1k+图像,6个类别,yolo标注完整
  • .net印刷线路板进销存PCB材料ERP财务软件库存贸易生产企业管理系统
  • 曲面/线 拟合gnuplot
  • 第四章:大模型(LLM)】06.langchain原理-(5)LangChain Prompt 用法
  • 第七十五章:AI的“思维操控师”:Prompt变动对潜在空间(Latent Space)的影响可视化——看懂AI的“微言大义”!