当前位置: 首页 > ai >正文

Java ArraysParallelSortHelpers 并行排序

Java 另外两个排序分析见:

深入浅出 Arrays.sort(DualPivotQuicksort):如何结合快排、归并、堆排序和插入排序

TimSort:论Java Arrays.sort的稳定性-CSDN博客

ArraysParallelSortHelpers

这是一个非常有意思的类,是 Java 并行排序功能的核心实现之一。首先,从类的声明来看:

/*package*/ class ArraysParallelSortHelpers {
//...
}
  • /*package*/: 这个注释表明该类是包级私有的(package-private)。这意味着它不是 Java 公开的 API,只能被 java.util 包内的其他类访问。它的主要使用者就是我们熟知的 java.util.Arrays 类,为 Arrays.parallelSort(...) 方法提供底层的并行排序实现。这种设计把复杂的实现细节隐藏起来,只对外暴露简洁的接口。
  • 它是一个“数组并行排序辅助工具”类。它本身不直接提供给用户使用,而是作为 Arrays.parallelSort 的一个内部实现细节。

该类的注释中清晰地说明了其核心算法:

Sorter classes based mainly on CilkSort... Basic algorithm: if array size is small, just use a sequential sort (via Arrays.sort) Otherwise: 1. Break array in half. 2. For each half, a. break the half in half (i.e., quarters), b. sort the quarters c. merge them together 3. merge together the two halves.

这是一种典型的 分治(Divide and Conquer) 算法,其思想主要借鉴了 CilkSort。

  1. 递归分解: 如果数组不够大(小于一个预设的粒度 gran),就没必要并行化,直接使用高效的串行排序算法(如 TimSort)来处理。
  2. 四分法: 如果数组足够大,就将其分解为四个部分(quarters)。这么做不仅仅是为了分解任务,更重要的一个原因是,通过两两合并,可以保证最终的排序结果恰好落在原始数组中,而不是辅助的工作空间数组里。
  3. 并行处理: 算法使用 Java 的 Fork/Join 框架(具体来说是 CountedCompleter)来并行地对这四个子数组进行排序。
  4. 归并: 当子数组排序完成后,再两两进行归并(Merge),最终合并成一个完全有序的数组。

内部类的结构与职责

ArraysParallelSortHelpers 内部定义了一系列静态内部类,每个类都有明确的职责。

FJObject 类

这是一个容器类,包含了对泛型 Object 数组进行排序和归并的所有逻辑。

Sorter<T> 类

这是执行排序任务的核心。

// ... existing code ...public final void compute() {CountedCompleter<?> s = this;Comparator<? super T> c = this.comparator;T[] a = this.a, w = this.w; // localize all paramsint b = this.base, n = this.size, wb = this.wbase, g = this.gran;while (n > g) {int h = n >>> 1, q = h >>> 1, u = h + q; // quartilesRelay fc = new Relay(new Merger<>(s, w, a, wb, h,wb+h, n-h, b, g, c));Relay rc = new Relay(new Merger<>(fc, a, w, b+h, q,b+u, n-u, wb+h, g, c));new Sorter<>(rc, a, w, b+u, n-u, wb+u, g, c).fork();new Sorter<>(rc, a, w, b+h, q, wb+h, g, c).fork();Relay bc = new Relay(new Merger<>(fc, a, w, b, q,b+q, h-q, wb, g, c));new Sorter<>(bc, a, w, b+q, h-q, wb+q, g, c).fork();s = new EmptyCompleter(bc);n = q;}TimSort.sort(a, b, b + n, c, w, wb, n);s.tryComplete();}
// ... existing code ...
  • compute() 方法: 这是任务的执行体。
  • while (n > g): 这是一个循环,但它模拟了递归的尾调用优化。只要当前任务处理的数组大小 n 还大于设定的粒度 g,它就会继续分解。
  • fork(): 它创建了3个新的 Sorter 子任务(分别处理第2、3、4个四分之一数组),并通过 fork() 方法将它们提交到 Fork/Join 线程池中异步执行。
  • Merger 和 Relay: 它同时创建了多个 Merger(归并)任务,并用 Relay(中继)任务将它们串联起来,形成一个依赖关系:当两个子排序和它们的归并完成后,再触发上一层的归并。
  • TimSort.sort(...): 当数组块大小 n 不再大于粒度 g 时,循环结束,直接调用 TimSort 对这个小数组块进行串行排序。TimSort 是一种非常高效且稳定的排序算法。

Merger<T> 类

这是执行归并任务的核心,负责将两个已排序的子数组合并成一个有序的大数组。

// ... existing code ...public final void compute() {
// ...for (int lh, rh;;) {  // split larger, find point in smallerif (ln >= rn) {if (ln <= g)break;rh = rn;T split = a[(lh = ln >>> 1) + lb];for (int lo = 0; lo < rh; ) {
// ... (binary search)}}
// ...Merger<T> m = new Merger<>(this, a, w, lb + lh, ln - lh,rb + rh, rn - rh,k + lh + rh, g, c);rn = rh;ln = lh;addToPendingCount(1);m.fork();}// ... sequential merge logic ...tryComplete();}
// ... existing code ...
  • 并行归并: 如果待归并的两个子数组(左ln和右rn)很大,它也会尝试并行化。策略是:将较大的那个子数组一分为二,然后用二分查找较小的子数组中找到一个合适的分割点。这样就可以把一次大的归并任务分解成两个可以并行执行的小的归并任务。
  • 串行归并: 当子数组小到一定程度(小于等于粒度 g),就执行经典的串行归并算法,依次比较两个数组的元素,将较小的放入工作空间数组中。

Relay 和 EmptyCompleter 类

这两个是轻量级的 CountedCompleter,它们是实现复杂任务依赖关系的关键。

  • Relay (中继): 它的 onCompletion 方法会触发另一个任务的 compute 方法。它像一个“扳机”,用于在一个或多个前置任务完成后,启动后续任务。
  • EmptyCompleter (空任务): 它的 compute 方法是空的,它本身不做任何事,主要用作任务图中的一个汇合点或占位符,以简化依赖逻辑。
/*** A placeholder task for Sorters, used for the lowest* quartile task, that does not need to maintain array state.*/static final class EmptyCompleter extends CountedCompleter<Void> {@java.io.Serialstatic final long serialVersionUID = 2446542900576103244L;EmptyCompleter(CountedCompleter<?> p) { super(p); }public final void compute() { }}/*** A trigger for secondary merge of two merges*/static final class Relay extends CountedCompleter<Void> {@java.io.Serialstatic final long serialVersionUID = 2446542900576103244L;final CountedCompleter<?> task;Relay(CountedCompleter<?> task) {super(null, 1);this.task = task;}public final void compute() { }public final void onCompletion(CountedCompleter<?> t) {task.compute();}}

Merger

Merger<T> 是并行排序算法中“归并”这一步骤的核心实现。当 Sorter 任务将大数组分解成多个小块并分别排好序之后,就需要 Merger 任务来将这些有序的小块两两合并,最终形成一个完全有序的大数组。

// ... existing code ...static final class Merger<T> extends CountedCompleter<Void> {@java.io.Serialstatic final long serialVersionUID = 2446542900576103244L;// main and workspace arrays@SuppressWarnings("serial") // Not statically typed as Serializablefinal T[] a;@SuppressWarnings("serial") // Not statically typed as Serializablefinal T[] w;final int lbase, lsize, rbase, rsize, wbase, gran;@SuppressWarnings("serial") // Not statically typed as SerializableComparator<? super T> comparator;
// ... existing code ...
  • extends CountedCompleter<Void>:  CountedCompleter 是一种特殊的 ForkJoinTask,它维护一个“待完成”计数器。当一个任务及其所有子任务都完成后,它可以自动触发其父任务的完成,非常适合用于构建复杂的任务依赖图,这正是并行归并排序所需要的。

Merger 类的字段定义了它需要操作的所有数据:

  • final T[] a;源数组。这个数组包含了两个已经排好序的、需要被合并的子数组。
  • final T[] w;工作空间数组(或称目标数组)。合并后的结果将被写入这个数组。
  • final int lbase, lsize: 左边子数组在源数组 a 中的起始位置(base)和长度(size)。
  • final int rbase, rsize: 右边子数组在源数组 a 中的起始位置和长度。
  • final int wbase: 合并结果在工作空间数组 w 中的起始写入位置。
  • final int gran粒度(Granularity)。这是一个阈值,当要处理的数组块大小小于或等于这个值时,就不再进行并行分解,而是直接使用串行算法。这是为了避免任务分解的开销超过并行带来的收益。
  • Comparator<? super T> comparator: 用于比较数组中两个元素大小的比较器。

compute()

compute() 方法是 ForkJoinTask 的执行入口,Merger 的所有魔法都在这里发生。它可以分为两个主要部分:并行分解串行归并

// ... existing code ...public final void compute() {Comparator<? super T> c = this.comparator;T[] a = this.a, w = this.w; // localize all paramsint lb = this.lbase, ln = this.lsize, rb = this.rbase,rn = this.rsize, k = this.wbase, g = this.gran;if (a == null || w == null || lb < 0 || rb < 0 || k < 0 ||c == null)throw new IllegalStateException(); // hoist checks// --- 1. 并行分解部分 ---for (int lh, rh;;) {  // split larger, find point in smallerif (ln >= rn) { // 如果左边数组更大或相等if (ln <= g)break; // 小于粒度,退出分解rh = rn;T split = a[(lh = ln >>> 1) + lb]; // 取左边数组的中间元素作为分割点// 在右边数组中二分查找分割点的位置for (int lo = 0; lo < rh; ) {int rm = (lo + rh) >>> 1;if (c.compare(split, a[rm + rb]) <= 0)rh = rm;elselo = rm + 1;}}else { // 如果右边数组更大if (rn <= g)break; // 小于粒度,退出分解lh = ln;T split = a[(rh = rn >>> 1) + rb]; // 取右边数组的中间元素作为分割点// 在左边数组中二分查找分割点的位置for (int lo = 0; lo < lh; ) {int lm = (lo + lh) >>> 1;if (c.compare(split, a[lm + lb]) <= 0)lh = lm;elselo = lm + 1;}}// 创建新任务处理后半部分,并提交到线程池Merger<T> m = new Merger<>(this, a, w, lb + lh, ln - lh,rb + rh, rn - rh,k + lh + rh, g, c);rn = rh;ln = lh;addToPendingCount(1); // 增加一个待完成的子任务计数m.fork(); // 异步执行子任务}// --- 2. 串行归并部分 ---int lf = lb + ln, rf = rb + rn; // index boundswhile (lb < lf && rb < rf) {T t, al, ar;if (c.compare((al = a[lb]), (ar = a[rb])) <= 0) { // 保证排序稳定性lb++; t = al;}else {rb++; t = ar;}w[k++] = t;}// 复制剩余的元素if (rb < rf)System.arraycopy(a, rb, w, k, rf - rb);else if (lb < lf)System.arraycopy(a, lb, w, k, lf - lb);tryComplete(); // 尝试完成当前任务}
// ... existing code ...
并行分解逻辑
  • 目标: 如果要合并的两个子数组都很大,那么合并操作本身也可以并行化。
  • 策略:
    1. 找出两个子数组中较大的一个。
    2. 取这个较大数组的中间元素作为 split 值。
    3. 在那个较小的数组中使用二分查找,找到 split 值应该插入的位置。
    4. 这样,两个子数组都被这个 split 值有效地分成了两部分:小于 split 的部分和大于等于 split 的部分。
    5. 创建一个新的 Merger 任务 m 去合并两个“大于等于”的部分,并通过 m.fork() 把它交给 Fork/Join 线程池去并行执行。
    6. 当前任务则收缩自己的处理范围,去处理两个“小于”的部分,然后循环这个过程。
  • addToPendingCount(1): 这是 CountedCompleter 的关键。每次 fork 一个子任务,就调用这个方法告诉框架:“我有一个子任务正在执行,请在我这个子任务完成之后再认为我完成了”。
串行归并逻辑
  • 当数组块被分解到足够小(小于等于 gran)时,并行分解的循环就会退出。
  • 接下来就是经典的双指针(two-finger)归并算法
    1. 两个指针分别指向左右两个子数组的开头。
    2. 比较两个指针指向的元素,将较小的那个复制到工作空间数组 w 中,并将对应的指针后移。
    3. c.compare(...) <= 0 这个条件非常重要,当元素相等时,优先取左边数组的元素,这保证了整个并行排序算法的稳定性(Stable Sort)
    4. 循环结束后,某一个子数组可能还有剩余元素,使用 System.arraycopy 将所有剩余元素一次性复制到 w 的末尾。
任务完成
  • tryComplete(): 在串行归并完成后,当前任务调用 tryComplete()
    • 如果它之前 fork 出的子任务还没有完成,tryComplete() 不会做任何事,当前线程可能会去帮助其他任务(work-stealing)。
    • 当所有子任务都完成了(它们会调用自己的 tryComplete() 并逐级通知父任务),当前任务的待完成计数会减到0,此时 tryComplete() 就会成功,并接着去通知它的父任务。

总结

Merger<T> 是一个高度优化的、可并行的归并任务。它巧妙地利用了 Fork/Join 框架和 CountedCompleter 的特性,实现了递归式的任务分解。当问题规模较大时,它通过“取大数组中点、在小数组中二分”的策略来创建并行的子任务;当问题规模较小时,它会切换到高效的串行归并算法。这种混合模式确保了 Arrays.parallelSort 在处理大规模数据时的卓越性能。

Sorter

Sorter<T> 是整个并行排序算法的“分治”策略中的“分”和“治”的执行者。它负责将一个大的排序任务递归地分解为更小的排序任务,直到任务规模小到可以直接使用串行排序算法为止。

// ... existing code .../** Object + Comparator support class */static final class FJObject {static final class Sorter<T> extends CountedCompleter<Void> {
// ... existing code ...
  • extends CountedCompleter<Void>CountedCompleter 是一种特殊的 ForkJoinTask,它能自动处理复杂的任务依赖关系。当一个任务所依赖的所有子任务都完成后,它会自动触发父任务的完成流程,这对于构建归并排序中“先排序、后合并”的依赖链条至关重要。

Sorter 的字段定义了它执行任务所需的所有上下文信息:

  • final T[] a;主数组。在排序的初始阶段,这是待排序的原始数组。在递归的中间步骤中,它和 w 的角色可能会互换。
  • final T[] w;工作空间数组 (workspace)。这是归并操作时用于存放临时结果的辅助数组。
  • final int base, size: 定义了当前 Sorter 任务需要处理的数组切片在 a 数组中的起始位置和长度。
  • final int wbase: 对应的工作空间切片的起始位置。
  • final int gran粒度 (granularity)。这是一个非常重要的阈值。当 size 小于或等于 gran 时,并行分解停止,转而使用更高效的串行排序。
  • Comparator<? super T> comparator: 元素比较器。

compute() 方法:核心算法实现

compute() 方法是 Sorter 任务的执行体,它完美地诠释了基于 CilkSort 的分治思想。

// ... existing code ...public final void compute() {CountedCompleter<?> s = this;Comparator<? super T> c = this.comparator;T[] a = this.a, w = this.w; // localize all paramsint b = this.base, n = this.size, wb = this.wbase, g = this.gran;while (n > g) {int h = n >>> 1, q = h >>> 1, u = h + q; // quartilesRelay fc = new Relay(new Merger<>(s, w, a, wb, h,wb+h, n-h, b, g, c));Relay rc = new Relay(new Merger<>(fc, a, w, b+h, q,b+u, n-u, wb+h, g, c));new Sorter<>(rc, a, w, b+u, n-u, wb+u, g, c).fork();new Sorter<>(rc, a, w, b+h, q, wb+h, g, c).fork();Relay bc = new Relay(new Merger<>(fc, a, w, b, q,b+q, h-q, wb, g, c));new Sorter<>(bc, a, w, b+q, h-q, wb+q, g, c).fork();s = new EmptyCompleter(bc);n = q;}TimSort.sort(a, b, b + n, c, w, wb, n);s.tryComplete();}
// ... existing code ...
递归分解 (The while loop)
  • 代码使用 while (n > g) 循环来模拟递归。这是一种尾递归优化的形式,避免了因递归过深导致的 StackOverflowError
  • 只要当前任务处理的数组大小 n 还大于粒度 g,循环就会继续,将当前任务进一步分解。
四分法 (Quartiles)
  • int h = n >>> 1, q = h >>> 1, u = h + q; 这一行将当前数组段在逻辑上分为四个四分之一(quarter)的部分。
    • hn / 2 (一半)
    • qn / 4 (四分之一)
    • un * 3 / 4 (四分之三)
  • 为什么要分成四份而不是两份?注释中提到,这样做可以保证经过两轮归并后,最终的排序结果恰好落在原始数组中,而不是工作空间数组里,避免了最后一次从工作空间到原始数组的拷贝。

传统的归并排序通常是这样工作的:

分解: 将 Main 数组递归地分成两半 (H1, H2),直到子数组足够小。

排序: 对子数组进行排序。

归并: 将两个已排序的子数组(例如,都在 Main 数组里)进行归并。
归并的结果必须写入到一个不同的地方,也就是 Workspace 数组。
当 H1 和 H2 合并完成后,整个排好序的结果就位于 Workspace 数组中。


最后一步 (性能瓶颈):

此时,Main 数组里还是旧的、未排序的数据。
为了让调用者得到排好序的 Main 数组,我们必须执行一次额外的、完整的数组复制:System.arraycopy(Workspace, 0, Main, 0, Main.length)。

任务创建与依赖编排

这是最精妙的部分,它创建了一系列 Sorter 和 Merger 任务,并使用 Relay 和 EmptyCompleter 将它们像电路一样连接起来,形成一个精确的依赖图:

  1. 创建排序任务:

    • new Sorter<>(...).fork(); 创建了3个新的 Sorter 任务,分别对应第2、3、4个四分之一的数组段。fork() 将它们提交到 Fork/Join 线程池中异步执行。
    • 第1个四分之一的数组段由当前线程继续在 while 循环中处理(通过 n = q; 缩小范围)。
  2. 创建合并任务与依赖关系:

    • Merger 任务负责合并两个已排序的子数组。
    • Relay 任务是一个“触发器”。它的 onCompletion 方法会调用其内部 task 的 compute 方法。
    • 让我们来梳理这个依赖链:
      • Sorter(Q3) 和 Sorter(Q4) 完成后 -> 触发 rc (Relay) -> 触发 Merger(Q3, Q4)
      • Sorter(Q1) 和 Sorter(Q2) 完成后 -> 触发 bc (Relay) -> 触发 Merger(Q1, Q2)
      • Merger(Q1, Q2) 和 Merger(Q3, Q4) 完成后 -> 触发 fc (Relay) -> 触发最终的 Merger( (Q1,Q2), (Q3,Q4) )
      • 最终的 Merger 完成后,整个 Sorter 任务(s)才算完成。
排序基准情况 (Base Case)
  • 当 while 循环结束时(即 n <= g),意味着数组块已经足够小,无需再进行并行分解。
  • 此时,TimSort.sort(a, b, b + n, c, w, wb, n); 被调用。
  • TimSort 是 Java 中 Arrays.sort() 使用的高效、稳定的串行排序算法。这里使用的是一个内部版本,它可以接收一个预先分配好的工作空间数组 w,从而避免了在排序过程中重复分配内存。

总结

Sorter<T> 是一个高度优化的、实现了分治思想的并行排序任务。它通过四分法递归地分解问题,利用 CountedCompleter 的特性构建了一个复杂的、无锁的任务依赖图,以确保排序和归并能以正确的顺序执行。当问题规模缩小到一定程度时,它会无缝切换到高效的串行排序算法 TimSort,从而在各种规模的数组上都能表现出卓越的性能。

http://www.xdnf.cn/news/18018.html

相关文章:

  • PyTorch 面试题及详细答案120题(01-05)-- 基础概念与安装
  • 深度学习-计算机视觉-数据增广/图像增广
  • AMBA-AXI and ACE协议详解(三)
  • TDengine IDMP 运维指南(1. 部署规划)
  • 基于飞算JavaAI的可视化数据分析集成系统项目实践:从需求到落地的全流程解析
  • 学习游戏制作记录(玩家掉落系统,删除物品功能和独特物品)8.17
  • Vue深入组件:Props 详解2
  • LINUX学习笔记
  • [RCTF2015]EasySQL
  • 11.苹果ios逆向-FridaHook-ios中的算法-CC_SHA1(sha1算法)
  • maxwell安装部署
  • 裸机框架:按键模组
  • PCA 实现多向量压缩:首个主成分的深层意义
  • 网络通信的基本概念与设备
  • 链路聚合与软件网桥
  • Android面试指南(二)
  • 记SpringBoot3.x + Thymeleaf 项目实现(MVC架构模式)
  • 校园综合数据分析可视化大屏 -Vue纯前端静态页面项目
  • Ugit使用记录
  • Git 入门指南:核心概念与常用命令全解析
  • Docker-14.项目部署-DockerCompose
  • 【Jenkins】02 - 自动化部署配置
  • 【Linux系列】如何在 Linux 服务器上快速获取公网
  • PAT 1068 Find More Coins
  • 补充:用信号量实现前驱关系
  • 【架构师干货】数据库管理系统
  • JavaWeb前端(HTML,CSS具体案例)
  • 火狐(Mozilla Firefox)浏览器离线安装包下载
  • 【JavaEE】多线程 -- 单例模式
  • 2025:AI狂飙下的焦虑与追问