当前位置: 首页 > news >正文

Java Fork/Join框架:三大核心组件深度解析

ForkJoinTask、ForkJoinWorkerThread 和 ForkJoinPool 构成了 Java 中 Fork/Join 框架的三个核心组件,它们之间形成了紧密的协作关系,共同提供了高效的并行计算能力。

三者关系概述

  1. ForkJoinPool:执行环境,管理工作线程和任务调度
  2. ForkJoinWorkerThread:执行单元,执行具体任务的工作线程
  3. ForkJoinTask:计算单元,表示可以拆分和合并的任务

ForkJoinPool

核心能力

  • 管理线程池和工作队列
  • 实现工作窃取算法
  • 提供任务调度和执行机制
  • 处理线程阻塞和补偿

关键源码解析

// 提交任务
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {return poolSubmit(true, Objects.requireNonNull(task));
}// 内部提交实现
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;// 判断当前线程是否为工作线程if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&(wt = (ForkJoinWorkerThread)t).pool == this) {internal = true;q = wt.workQueue;}else {internal = false;// 为外部提交找到或创建一个队列q = submissionQueue(ThreadLocalRandom.getProbe(), true);}q.push(task, signalIfEmpty ? this : null, internal);return task;
}

ForkJoinWorkerThread

核心能力

  • 绑定到特定的 ForkJoinPool
  • 维护自己的工作队列
  • 执行分配给它的任务
  • 支持任务窃取

关键属性

public class ForkJoinWorkerThread extends Thread {final ForkJoinPool pool;           // 所属的线程池final ForkJoinPool.WorkQueue workQueue; // 工作队列
}

工作线程执行循环

final void runWorker(WorkQueue w) {if (w != null) {int phase = w.phase, r = w.stackPred;int fifo = w.config & FIFO, nsteals = 0, src = -1;for (;;) {// 1. 检查是否应该终止if ((runState & STOP) != 0L || (qs = queues) == null)break;// 2. 随机扫描队列寻找任务scan: for (int l = n; l > 0; --l, i += step) {  // 扫描队列// 尝试窃取任务if (找到任务) {// 3. 记录窃取状态w.nsteals = ++nsteals;w.source = src = j;// 4. 执行任务w.topLevelExec(t, fifo);}}// 5. 如果没找到任务,尝试进入休眠状态if (!rescan) {if (((phase = deactivate(w, phase)) & IDLE) != 0)break;}}}
}

ForkJoinTask

核心能力

  • 表示可并行执行的任务
  • 支持任务分解(fork)和结果聚合(join)
  • 提供轻量级的执行框架
  • 支持任务状态管理

关键源码解析

// fork 操作 - 将任务提交到池中异步执行
public final ForkJoinTask<V> fork() {Thread t;ForkJoinWorkerThread wt;ForkJoinPool p;ForkJoinPool.WorkQueue q;boolean internal;// 判断当前线程是否为工作线程if (internal = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {q = (wt = (ForkJoinWorkerThread)t).workQueue;p = wt.pool;}elseq = (p = ForkJoinPool.common).externalSubmissionQueue(false);// 将任务推入队列q.push(this, p, internal);return this;
}// join 操作 - 等待任务完成并获取结果
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
}

关键流程解析

1. 任务提交和执行流程

  1. 任务提交

    // 用户代码
    ForkJoinPool pool = new ForkJoinPool();
    pool.submit(myTask);
    
  2. 任务放入队列

    • 如果是外部提交,任务被放入提交队列
    • 如果是内部提交(fork),任务被放入当前工作线程的队列
  3. 工作线程获取任务

    • 优先从自己的队列获取任务
    • 如果自己队列为空,随机窃取其他线程队列的任务
  4. 任务执行

    final void topLevelExec(ForkJoinTask<?> task, int fifo) {while (task != null) {task.doExec(); // 执行任务// 继续处理本地队列中的任务task = nextLocalTask(fifo);}
    }
    

2. 工作窃取算法

// 在 helpJoin 中的窃取逻辑
scan: for (int l = n; l > 0; --l, r += step) {// 随机选择队列尝试窃取任务if ((q = qs[j = r & SMASK & (n - 1)]) != null) {// 从队列尾部(base端)窃取任务if (窃取成功) {w.source = src = j;  // 记录窃取源t.doExec();          // 执行窃取的任务}}
}

3. join 操作的优化

// helpJoin 帮助执行被阻塞等待的任务
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {// 1. 尝试从队列中删除并执行任务if (w != null)w.tryRemoveAndExec(task, internal);// 2. 如果任务仍未完成,尝试执行任务链上的其他任务if (task != null && task.status >= 0 && internal && w != null) {// 3. 沿着偷取链寻找可能有助于完成目标任务的任务outer: for (boolean rescan = true;;) {// 扫描队列寻找相关任务// ...// 4. 如果找到符合条件的任务,执行它if (找到任务) {w.source = j;    // 记录来源t.doExec();      // 执行任务w.source = wsrc; // 恢复原来的来源}}}return status;
}

总结

  1. ForkJoinPool 维护整个执行环境:

    • 管理工作线程集合
    • 处理任务队列和调度
    • 实现工作窃取调度算法
    • 处理线程活动状态变化
  2. ForkJoinWorkerThread 执行具体工作:

    • 绑定到特定的 ForkJoinPool
    • 维护自己的工作队列
    • 执行和窃取任务
    • 在没有任务时进入等待状态
  3. ForkJoinTask 提供计算逻辑:

    • 实现可分解的并行任务
    • 支持 fork() 分解和 join() 合并操作
    • 维护任务状态和结果
    • 在适当情况下帮助执行其他任务

这种设计允许开发者专注于任务分解逻辑,而将线程管理和调度交给框架处理,实现了高效的并行计算模型。

ForkJoinWorkerThread 类深入分析

ForkJoinWorkerThread 是 Fork/Join 框架中负责执行任务的线程类,它继承自 Thread,专门用于在 ForkJoinPool 中执行 ForkJoinTask。下面对其关键方法和结构进行深入分析。

构造方法

ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,boolean useSystemClassLoader, boolean clearThreadLocals) {super(group, null, pool.nextWorkerThreadName(), 0L, !clearThreadLocals);UncaughtExceptionHandler handler = (this.pool = pool).ueh;this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config,clearThreadLocals);super.setDaemon(true);if (handler != null)super.setUncaughtExceptionHandler(handler);if (useSystemClassLoader && !clearThreadLocals)super.setContextClassLoader(ClassLoader.getSystemClassLoader());
}

构造过程:

  1. 调用父类 Thread 构造函数,设置线程组和名称
  2. 保存对 ForkJoinPool 的引用
  3. 创建自己的 WorkQueue 实例
  4. 设置为守护线程
  5. 设置异常处理器和类加载器

protected ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,boolean preserveThreadLocals) {this(group, pool, false, !preserveThreadLocals);
}protected ForkJoinWorkerThread(ForkJoinPool pool) {this(null, pool, false, false);
}

提供给子类使用的构造方法,简化了参数设置。

核心方法: run()

public void run() {Throwable exception = null;ForkJoinPool p = pool;ForkJoinPool.WorkQueue w = workQueue;if (p != null && w != null) {   // 避免初始化失败try {p.registerWorker(w);    // 1. 注册工作线程onStart();              // 2. 调用初始化钩子p.runWorker(w);         // 3. 执行主循环} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception);  // 4. 调用终止钩子} catch (Throwable ex) {if (exception == null)exception = ex;} finally {p.deregisterWorker(this, exception);  // 5. 取消注册}}}
}

run() 方法是线程的执行入口,流程包括:

  1. 注册工作线程: 调用 pool.registerWorker(w) 将自己注册到池中
  2. 初始化: 调用 onStart() 钩子方法
  3. 执行主循环: 调用 pool.runWorker(w) 进入主工作循环
  4. 清理: 调用 onTermination(exception) 钩子方法
  5. 取消注册: 调用 pool.deregisterWorker(this, exception) 从池中移除

其中最关键的是第3步 p.runWorker(w) - 这是工作线程的主循环,负责执行和窃取任务。

生命周期钩子方法

protected void onStart() {// 默认为空,子类可重写
}protected void onTermination(Throwable exception) {// 默认为空,子类可重写
}

这两个钩子方法允许子类在线程生命周期的关键点执行自定义逻辑:

  • onStart(): 线程启动后,执行任务前
  • onTermination(): 线程终止前,可获取异常信息

ThreadLocal 处理

final void resetThreadLocals() {if (U.getReference(this, THREADLOCALS) != null)U.putReference(this, THREADLOCALS, null);if (U.getReference(this, INHERITABLETHREADLOCALS) != null)U.putReference(this, INHERITABLETHREADLOCALS, null);onThreadLocalReset();
}void onThreadLocalReset() {// 默认为空,子类可重写
}

这些方法用于清理 ThreadLocal 变量:

  1. resetThreadLocals(): 使用 Unsafe 直接清除 ThreadLocal 引用
  2. onThreadLocalReset(): 钩子方法,子类可在 ThreadLocal 清理后执行操作

工作队列管理方法

public int getPoolIndex() {return workQueue.getPoolIndex();
}public int getQueuedTaskCount() {return workQueue.queueSize();
}

这些方法提供队列状态信息:

  • getPoolIndex(): 获取线程在池中的索引
  • getQueuedTaskCount(): 获取队列中待处理任务数量

静态辅助方法

static boolean hasKnownQueuedWork() {ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue q, sq;ForkJoinPool p; ForkJoinPool.WorkQueue[] qs; int i;Thread c = JLA.currentCarrierThread();return ((c instanceof ForkJoinWorkerThread) &&(p = (wt = (ForkJoinWorkerThread)c).pool) != null &&(q = wt.workQueue) != null &&(i = q.source) >= 0 && // 检查本地和当前源队列(((qs = p.queues) != null && qs.length > i &&(sq = qs[i]) != null && sq.top - sq.base > 0) ||q.top - q.base > 0));
}

此方法检查当前线程是否有已知的等待任务,用于任务执行过程中的启发式决策。

InnocuousForkJoinWorkerThread 内部类

static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {private static final ThreadGroup innocuousThreadGroup = createGroup();private boolean resetCCL;InnocuousForkJoinWorkerThread(ForkJoinPool pool) {super(innocuousThreadGroup, pool, true, true);}// 其他重写方法...
}

这是一个特殊的工作线程实现:

  1. 使用一个独立的线程组
  2. 使用系统类加载器
  3. 会清理 ThreadLocal 变量
  4. 不允许设置异常处理器

执行流程详解

让我们深入分析一个 ForkJoinWorkerThread 的完整执行流程:

  1. 创建阶段:

    // 在 ForkJoinPool 中创建新的工作线程
    ForkJoinWorkerThread wt = factory.newThread(this); // factory是线程工厂
    
  2. 初始化阶段:

    // 在构造函数中
    this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config, clearThreadLocals);
    super.setDaemon(true);
    // 设置线程名称、异常处理器等
    
  3. 启动阶段:

    wt.start(); // 在 ForkJoinPool.createWorker() 中调用
    
  4. 运行阶段:

    // 执行 run() 方法
    p.registerWorker(w);  // 向池注册线程
    onStart();            // 钩子方法
    p.runWorker(w);       // 主循环 - 在池中执行
    
  5. 工作循环内部 (在 ForkJoinPool.runWorker 中):

    // 简化的逻辑
    while (条件满足) {if (有本地任务) {执行本地任务} else {尝试窃取其他线程队列中的任务如果找不到任务,可能进入休眠状态}
    }
    
  6. 终止阶段:

    onTermination(exception);    // 钩子方法
    p.deregisterWorker(this, exception);  // 从池中注销
    

与 ForkJoinPool 和 ForkJoinTask 的交互

  1. 与 ForkJoinPool 的交互:

    • 通过 pool 字段持有对池的引用
    • 调用 pool.registerWorker() 注册自身
    • 调用 pool.runWorker() 委托主循环逻辑
    • 调用 pool.deregisterWorker() 注销自身
  2. 与 ForkJoinTask 的交互:

    • 通过 workQueue 管理和执行任务
    • ForkJoinTask 可以通过 workQueue.push() 提交新任务
    • 任务的执行在 pool.runWorker() 中由 ForkJoinTask.doExec() 完成

总结

ForkJoinWorkerThread 是连接 ForkJoinPool 和 ForkJoinTask 的纽带:

  1. 它持有 ForkJoinPool 引用和自己的工作队列
  2. 它独立于普通线程运行,专注于执行 Fork/Join 任务
  3. 它支持工作窃取算法,可以执行自己队列中的任务,也可以窃取其他线程队列中的任务
  4. 它提供了生命周期钩子方法,便于子类扩展
  5. 它优化了 ThreadLocal 处理,提高了任务执行的隔离性和效率

核心工作流程是: 初始化 → 注册 → 执行主循环(处理任务) → 清理 → 注销。

ForkJoinPool.WorkQueue 实现解析

WorkQueue 是 ForkJoinPool 的核心内部类,负责管理任务的存储、调度和窃取。它是 Fork/Join 框架实现工作窃取算法的关键组件。

WorkQueue 在 Fork/Join 框架中扮演两个重要角色:

  1. 工作线程队列 - 当 owner 不为 null 时
  2. 外部提交队列 - 当 owner 为 null 时

主要属性结构

final ForkJoinWorkerThread owner;   // 队列所属线程,如果为 null 则为共享模式
ForkJoinTask<?>[] array;            // 任务数组,大小为 2 的幂
int base;                           // 下一个被偷取任务的索引
final int config;                   // 静态配置位(如 FIFO 模式标志)
int top;                            // 下一个 push 位置的索引
volatile int phase;                 // 同步状态和工作池索引
int stackPred;                      // 池栈(ctl)前驱链接
volatile int source;                // 偷取来源队列 ID
int nsteals;                        // 偷取任务计数
volatile int parking;               // 非零表示线程正在等待工作

WorkQueue 的属性使用了 @Contended 注解,这是为了避免伪共享(false sharing)问题【CPU缓存行可能存储多个变量,导致多个变量缓存同时失效】,提高性能。

队列实现的数据结构

WorkQueue 内部使用了一个循环数组来存储任务:

  1. 双端队列设计

    • 工作线程从 top 端(数组尾部)添加和获取任务(LIFO 模式),也可以配置成 FIFO 模式
    • 窃取操作从 base 端(数组头部)获取任务(总是 FIFO)
  2. 存储结构

    • 使用 ForkJoinTask<?>[] array 作为任务存储
    • 索引通过 & (array.length - 1) 实现循环引用

核心方法解析

1. 任务入队 - push 方法

final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;if ((a = array) != null && (cap = a.length) > 0 && task != null) {// 计算索引位置,使用位运算确保循环int pk = task.noUserHelp() + 1;if ((room = (m = cap - 1) - (s - b)) >= 0) {// 更新 top 指针top = s + 1;long pos = slotOffset(m & s);if (!internal)// 如果是外部提交,使用普通引用操作U.putReference(a, pos, task);else// 如果是内部提交,使用原子操作确保可见性U.getAndSetReference(a, pos, task);// 数组满了需要扩容if (room == 0)growArray(a, cap, s);}if (!internal)unlockPhase();  // 如果是外部提交,解锁队列// 如果队列可能看起来是空的,通知工作池if ((room == 0 || a[m & (s - pk)] == null) && pool != null)pool.signalWork(); }
}

该方法将任务添加到队列顶部,如果队列满则扩容,并在必要时通知线程池有新任务。

noUserHelp()方法在ForkJoinTask类中定义:

final int noUserHelp() {return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;
}

这个方法检查任务的status字段中是否设置了NO_USER_HELP标志位。NO_USER_HELP表示该任务不应该被外部用户线程帮助执行。

从代码中可以看出,ForkJoinTask定义了这些常量:

static final int NUH_BIT        = 24;
static final int NO_USER_HELP   = 1 << NUH_BIT;

pk 值在大多数情况下等于 1(普通任务)。特定情况下(如 InterruptibleTask),可能为 2。

a[m & (s - pk)] == null 判断的含义

s 是当前的 top 索引(即新任务要放入的位置)
pk 通常为 1(普通任务)
s - pk 计算的是将要插入任务的前一个槽位
a[m & (s - pk)] 检查该槽位是否为空

if (room == 0 || a[m & (s - pk)] == null) && pool != null)pool.signalWork();

这段逻辑的具体含义:

  1. 从空变为非空的检测:如果队列是空的,然后添加了第一个任务,那么 a[m & (s - pk)] 将会是 null(因为前一个槽位即为空)。在这种情况下需要发信号通知工作者线程来处理任务。

  2. 队列满的情况:如果 room == 0(队列已满需要扩容),也需要发送信号确保有工作线程能及时处理任务。

InterruptibleTask (pk = 2): 会检查 a[m & (s - 2)] 是否为空。这个位置更"远",更可能为空,使得 signalWork() 更容易被触发

这种设计是故意的,因为:

1. InterruptibleTask 需要特殊处理

InterruptibleTask 被设计为不允许用户线程帮助执行(通过设置 NO_USER_HELP 标志)。因此,系统需要确保有足够的工作线程来处理这类任务。

2. 降低检测门槛提高响应性

通过检查更远位置是否为空,系统更容易触发 signalWork() 调用,从而更积极地唤醒或创建工作线程。这种设计是有意为之的性能优化,确保 InterruptibleTask 能被及时处理。

3. 避免干扰用户线程执行

在 awaitDone 方法中可以看到,系统会阻止非工作线程(用户线程)帮助执行标记了 NO_USER_HELP 的任务:

!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss : p.helpJoin(this, q, internal)

这种设计确保了 InterruptibleTask 只能被工作线程处理,绝不会被用户线程执行。

  1. pk 值不同是故意的,用于对特殊任务进行差异化处理
  2. InterruptibleTask 获得了更低的"信号触发门槛",确保它们能被及时处理
  3. 通过适当提高 signalWork() 触发频率,系统确保有足够的工作线程来执行不允许用户线程帮助的 InterruptibleTask

内部与外部调用的场景区分

  1. 内部调用(internal=true)

    • 当前线程是ForkJoinWorkerThread,属于池中的工作线程
    • 调用来自工作线程自己的fork()等方法
  2. 外部调用(internal=false)

    • 调用来自外部提交(如execute()submit()等方法)
    • 调用线程不是该队列的所有者

内存写入的区别

if (!internal)U.putReference(a, pos, task);       // inside lock
elseU.getAndSetReference(a, pos, task); // fully fenced

这两种写入方式有重要区别:

  1. 外部调用时(!internal)使用putReference
    • 采用普通写入
    • 依赖于外部的锁机制来保证可见性
    • 代码注释// inside lock表明在这种情况下,整个操作应该已经在锁内
  2. 内部调用时使用getAndSetReference
    • 这是个原子操作,提供了完全内存屏障(fully fenced)
    • 确保写入对其他线程立即可见
    • 不需要外部锁的保护

检查代码确认外部调用是否一定有锁保护:

  1. 在外部调用场景中,队列通过phase字段实现锁保护:
// 在ForkJoinPool.submissionQueue方法中
if (reuse == 0 || !q.tryLockPhase()) {  // 尝试锁定队列// 移动索引...
}
  1. push方法的末尾,外部调用会解锁:
if (!internal)unlockPhase();  // 如果是外部提交,解锁队列

tryLockPhaseunlockPhase方法实现了队列的锁定机制:

// 在WorkQueue类中
final boolean tryLockPhase() {    // seqlock acquireint p;return (((p = phase) & IDLE) != 0 &&U.compareAndSetInt(this, PHASE, p, p + IDLE));
}final void unlockPhase() {U.getAndAddInt(this, PHASE, IDLE);
}

结论

  1. noUserHelp():标识任务是否允许外部用户线程帮助执行,主要用于InterruptibleTask类任务。

  2. pk = task.noUserHelp() + 1:用于计算前一个槽位偏移,来确定是否需要发出工作信号。

  3. 内外部调用的区别

    • 外部调用总是在锁的保护下执行,使用普通引用写入
    • 内部调用无需锁保护,使用原子操作和完全内存屏障确保可见性
  4. 锁保护机制:任何外部提交(通过submitexecute等)都会通过tryLockPhaseunlockPhase进行锁定和解锁,确保队列操作的线程安全。

2. 本地任务获取 - nextLocalTask 方法

private ForkJoinTask<?> nextLocalTask(int fifo) {ForkJoinTask<?> t = null;ForkJoinTask<?>[] a = array;int b = base, p = top, cap;if (p - b > 0 && a != null && (cap = a.length) > 0) {for (int m = cap - 1, s, nb;;) {if (fifo == 0 || (nb = b + 1) == p) {// LIFO 模式或只有一个任务:从 top 端获取if ((t = (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(m & (s = p - 1)), null)) != null)updateTop(s);  // 更新 top 指针break;}// FIFO 模式:从 base 端获取if ((t = (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(m & b), null)) != null) {updateBase(nb);  // 更新 base 指针break;}// 自旋等待直到 base 值稳定while (b == (b = U.getIntAcquire(this, BASE)))Thread.onSpinWait();if (p - b <= 0)break;}}return t;
}

根据配置的模式(FIFO 或 LIFO)获取任务,工作线程一般使用 LIFO 模式提高局部性。

b == (b = U.getIntAcquire(this, BASE)) 判断解析

这个判断是在处理并发竞争场景下的自旋等待机制,目的是等待 base 值稳定:

为什么会不等?

  1. 并发修改情况:在多线程环境中,当前线程在读取 base 值后,其他线程可能已经修改了 base(通过其他线程成功的 poll 操作)

  2. 代码执行过程

    b == (b = U.getIntAcquire(this, BASE))
    
    • 左侧的 b 是之前读取的 base 值
    • 右侧的 (b = U.getIntAcquire(this, BASE)) 是读取当前最新的 base 值并赋给 b
    • 如果两次读取的值不同,表示 base 已被其他线程更改
  3. 不等的情况:当其他线程成功窃取了队列中的任务并增加了 base 值,此判断将返回 false,线程会脱离自旋状态

这是一种自旋优化机制,目的是:

  • 避免在 base 不稳定时进行不必要的操作
  • 等待直到 base 稳定(停止变化)
  • 减少内存流量,通过 Thread.onSpinWait() 提高自旋效率

p-b<=0 判断解析

虽然 WorkQueue 使用的是循环数组,但队列的逻辑仍然是线性的:

  1. 队列状态表示

    • p (即 top): 下一个 push 位置的索引
    • b (即 base): 下一个要被窃取任务的索引
  2. 空队列条件

    • 当 p == b 时,表示队列完全空
    • 当 p < b 可能出现在并发操作中(临时状态),但逻辑上也应被视为"空"
  3. 循环数组与逻辑索引

    • 循环数组是物理存储方式,通过 & (length-1) 转换逻辑索引到物理位置
    • 但 base 和 top 作为逻辑索引持续增长,不会循环重置
    • 因此 p-b 始终反映队列中待处理元素的逻辑数量

3. 任务窃取 - poll 方法

final ForkJoinTask<?> poll() {for (int pb = -1, b; ; pb = b) {ForkJoinTask<?> t; int cap, nb; long k; ForkJoinTask<?>[] a;if ((a = array) == null || (cap = a.length) <= 0)break;t = (ForkJoinTask<?>)U.getReferenceAcquire(a, k = slotOffset((cap - 1) & (b = base)));Object u = U.getReference(a, slotOffset((cap - 1) & (nb = b + 1)));if (base != b)  // 检查并发修改;else if (t == null) {if (u == null && top - b <= 0)break;  // 队列为空if (pb == b)Thread.onSpinWait();  // 自旋等待状态变化}else if (U.compareAndSetReference(a, k, t, null)) {updateBase(nb);  // CAS 成功,更新 basereturn t;}}return null;
}

这是工作窃取的核心实现,其他线程通过这个方法从队列底部窃取任务。

4. 任务执行 - topLevelExec 方法

final void topLevelExec(ForkJoinTask<?> task, int fifo) {while (task != null) {task.doExec();  // 执行当前任务task = nextLocalTask(fifo);  // 尝试获取下一个本地任务}
}

执行给定任务,然后继续执行队列中的其他任务,直到队列为空。

5. 辅助方法 - tryRemoveAndExec 方法

这个方法的主要作用是:从当前工作队列中寻找指定任务,如果找到则移除并执行它。它是 ForkJoin 框架中任务 join 操作的核心实现部分。

final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {ForkJoinTask<?>[] a = array;int b = base, p = top, s = p - 1, d = p - b, cap;if (a != null && (cap = a.length) > 0) {// 从队列顶部向下扫描寻找指定任务for (int m = cap - 1, i = s; d > 0; --i, --d) {long k; boolean taken;ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(a, k = slotOffset(i & m));if (t == null)break;  // 遇到空槽位,停止扫描if (t == task) {  // 找到目标任务if (!internal && !tryLockPhase())break;  // 如果是外部调用,尝试锁定队列// 尝试从队列中移除任务if (taken = (top == p && U.compareAndSetReference(a, k, task, null))) {// 根据任务位置调整队列索引if (i == s)  // 最顶上的元素updateTop(s);else if (i == base)  // 最底下的元素updateBase(i + 1);else {  // 中间元素,与顶部元素交换位置后移除顶部U.putReferenceVolatile(a, k, (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(s & m), null));updateTop(s);}}if (!internal)unlockPhase();if (taken)task.doExec();  // 执行任务break;}}}
}

核心步骤:

  1. 从队列顶部开始向下扫描
  2. 寻找与参数 task 相等的任务
  3. 找到后,将其从队列中移除
  4. 执行该任务

这个方法主要在以下场景中被调用:

  1. 任务的 join 操作中:当一个任务调用 join() 方法等待另一个任务完成时,如果被等待的任务恰好在当前线程的队列中,则可以直接执行它而不是等待

  2. ForkJoinPool.helpJoin 方法中

    final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {if (w != null)w.tryRemoveAndExec(task, internal); // 首先尝试在自己的队列中找任务// ...其余代码...
    }
    

这个方法是 ForkJoin 框架中 "work-stealing" 算法的关键部分,有两个重要作用:

  1. 避免阻塞:如果任务 A 等待任务 B,而任务 B 恰好在当前线程的队列中等待执行,那么直接执行 B 比让 A 阻塞更有效率

  2. 防止死锁:如果没有这种机制,可能会出现线程互相等待对方队列中的任务,导致死锁

实际应用示例

假设有以下代码:

ForkJoinTask<Integer> task1 = new RecursiveTask<Integer>() { ... };
ForkJoinTask<Integer> task2 = new RecursiveTask<Integer>() { ... };
task1.fork();
task2.fork();
task1.join(); // 这里会尝试使用 tryRemoveAndExec

当执行到 task1.join() 时:

  1. 检查 task1 是否已完成,如果未完成
  2. 调用 tryRemoveAndExec 尝试从当前线程的队列中找到并执行 task1
  3. 如果找到并执行了,则 join 操作立即返回而不会阻塞

这体现了 ForkJoin 框架的核心设计理念:线程尽可能地处理自己的任务,避免不必要的阻塞和线程切换。

6. 辅助完成 - helpComplete 方法

helpComplete 的执行流程

  1. 检查传入任务和当前工作队列的状态
  2. 尝试从队列顶部获取一个任务
  3. 检查这个任务是否为 CountedCompleter 类型
  4. 通过 completer 引用向上追溯,检查它是否与目标任务相关联
  5. 如果相关联,从队列中移除并执行该任务
  6. 根据 limit 参数决定是否继续处理更多任务

与 tryRemoveAndExec 的关键区别

  1. 任务类型不同

    • tryRemoveAndExec: 适用于任何类型的 ForkJoinTask,主要用于join()操作
    • helpComplete: 专门针对 CountedCompleter 类型的任务
  2. 查找逻辑不同

    • tryRemoveAndExec: 从队列顶部向下扫描,查找特定的任务实例
    • helpComplete: 寻找任务依赖链上的 CountedCompleter 任务(通过 completer 引用链)
  3. 目标不同

    • tryRemoveAndExec: 只寻找并执行一个特定的任务
    • helpComplete: 可以帮助执行多个子任务(由limit参数控制)
  4. 适用场景

    • tryRemoveAndExec: join操作中,避免线程阻塞
    • helpComplete: 帮助完成CountedCompleter的子任务,支持可数完成模型

总结

  • tryRemoveAndExec 是寻找特定任务实例的针对性方法,主要用于 join 操作
  • helpComplete 是针对 CountedCompleter 任务的专用辅助方法,帮助完成任务树
  • CountedCompleter 提供了一套基于计数的任务完成机制,特别适合多层树形任务结构
final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {int status = 0;if (task != null) {outer: for (;;) {// 查找与根任务相关的待完成任务ForkJoinTask<?>[] a; boolean taken; Object o;int stat, p, s, cap;if ((stat = task.status) < 0) {status = stat;break;}if ((a = array) == null || (cap = a.length) <= 0)break;// 获取顶部任务long k = slotOffset((cap - 1) & (s = (p = top) - 1));if (!((o = U.getReference(a, k)) instanceof CountedCompleter))break;CountedCompleter<?> t = (CountedCompleter<?>)o, f = t;// 检查任务关系for (int steps = cap;;) {if (f == task)break;if ((f = f.completer) == null || --steps == 0)break outer;}// 移除并执行任务if (!internal && !tryLockPhase())break;if (taken = (top == p &&U.compareAndSetReference(a, k, t, null)))updateTop(s);if (!internal)unlockPhase();if (!taken)break;t.doExec();if (limit != 0 && --limit == 0)break;}}return status;
}

      WorkQueue 的关键特点

      1. 工作窃取算法实现

      WorkQueue 支持经典的工作窃取(work-stealing)算法:

      • 本地操作:线程操作自己队列的 top 端(LIFO 模式),提高缓存局部性
      • 窃取操作:其他线程从队列的 base 端窃取任务(FIFO 模式)
      • 无锁设计:通过原子操作(CAS)实现高效并发

      2. 分离的提交队列和工作队列

      ForkJoinPool 将队列分为两类:

      • 工作队列(odd 索引):有 owner 线程的队列,用于线程执行和窃取任务
      • 提交队列(even 索引):无 owner 的队列,用于外部提交任务

      3. 高效内存管理

      • 动态扩容:队列满时会自动扩容
      • 避免伪共享:通过 @Contended 注解分离频繁访问的字段
      • Unsafe 操作:使用 Unsafe 类的 CAS 操作实现高效并发访问

      4. 协同处理机制

      • 源追踪: 通过 source 字段记录窃取源,帮助实现 join 和 helpComplete
      • 计数: 通过 nsteals 记录窃取次数,用于负载统计和调优

      总结

      WorkQueue 是 ForkJoinPool 的核心组件,通过精心设计的双端队列实现高效的工作窃取算法。其关键特点包括:

      1. 双端队列设计,支持 LIFO/FIFO 两种工作模式
      2. 无锁并发访问,使用 CAS 操作确保线程安全
      3. 高效的任务窃取和协助机制
      4. 内存优化设计,避免伪共享问题

      这种设计使 ForkJoinPool 能高效地执行细粒度的并行任务,特别适合递归分治算法。

      CountedCompleter 介绍

      CountedCompleter 是 ForkJoinTask 的一个特殊子类,设计用于可以追踪子任务数量的分治算法。

      核心特性

      1. 内部计数器机制:每个CountedCompleter维护一个等待完成的子任务计数器
      2. 自动触发完成:当计数器归零时,自动触发完成逻辑
      3. 完成通知链:通过completer引用形成父子任务链,子任务完成会通知父任务
      public abstract class CountedCompleter<T> extends ForkJoinTask<T> {// 指向父任务的引用final CountedCompleter<?> completer;// 待完成的子任务数量volatile int pending;// 在所有子任务完成后执行public void onCompletion(CountedCompleter<?> caller) { }// 减少待完成任务计数,如果归零则触发完成逻辑public final void tryComplete() {CountedCompleter<?> a = this, s = a;for (int c;;) {if ((c = a.pending) == 0) {a.onCompletion(s);if ((a = (s = a).completer) == null) {s.quietlyComplete();return;}}else if (U.compareAndSetInt(a, PENDING, c, c - 1))return;}}// 其他方法...
      }
      

      适用场景

      1. 树形依赖计算:子任务可以进一步分解,形成多层任务树
      2. 任务间有依赖关系:某任务需要等待其他多个任务完成后才能继续
      3. 并行聚合处理:多个任务并行处理,最后汇总结果

      用法示例

      public class SumTask extends CountedCompleter<Long> {final long[] array;final int lo, hi;long sum = 0;SumTask(CountedCompleter<?> parent, long[] array, int lo, int hi) {super(parent);this.array = array; this.lo = lo; this.hi = hi;}@Overridepublic void compute() {if (hi - lo <= 100) { // 小任务直接计算for (int i = lo; i < hi; ++i)sum += array[i];} else { // 大任务分解int mid = (lo + hi) >>> 1;// 设置等待2个子任务setPendingCount(2);// 创建子任务new SumTask(this, array, lo, mid).fork();new SumTask(this, array, mid, hi).fork();// 等待子任务完成tryComplete();}}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (caller != this) {SumTask child = (SumTask)caller;this.sum += child.sum;}}@Overridepublic Long getRawResult() {return sum;}
      }
      

      CountedCompleter 与 RecursiveTask 的差异与作用

      CountedCompleter 和 RecursiveTask 是 ForkJoinTask 的两个不同子类,它们在并行计算模型上有显著区别,适用于不同场景。

      1. 任务完成机制

      RecursiveTask:

      • 使用传统的"等待-获取结果"模式
      • 通过 fork() 和 join() 显式控制任务依赖
      • 父任务必须等待子任务完成才能继续执行

      CountedCompleter:

      • 使用"计数完成"模式
      • 通过内部计数器跟踪依赖的子任务数量
      • 当所有子任务完成时自动触发回调(无需显式join)

      2. 代码结构对比

      RecursiveTask:

      class SumTask extends RecursiveTask<Long> {final long[] array;final int lo, hi;@Overrideprotected Long compute() {if (hi - lo <= 100) { // 直接计算long sum = 0;for (int i = lo; i < hi; ++i) sum += array[i];return sum;}int mid = (lo + hi) >>> 1;SumTask left = new SumTask(array, lo, mid);SumTask right = new SumTask(array, mid, hi);left.fork();long rightResult = right.compute(); // 直接计算右侧long leftResult = left.join();      // 等待左侧完成return leftResult + rightResult;    // 合并结果}
      }
      

      CountedCompleter:

      class SumTask extends CountedCompleter<Long> {long sum = 0;final long[] array;final int lo, hi;SumTask sibling;@Overridepublic void compute() {if (hi - lo <= 100) {for (int i = lo; i < hi; ++i) sum += array[i];tryComplete(); // 标记当前任务完成} else {int mid = (lo + hi) >>> 1;setPendingCount(2); // 设置等待2个子任务new SumTask(this, array, lo, mid).fork();new SumTask(this, array, mid, hi).fork();}}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (caller != this) {SumTask child = (SumTask)caller;this.sum += child.sum; // 汇总子任务结果}}
      }
      

      3. 任务协调方式

      RecursiveTask:

      • 使用阻塞式 join() 方法等待任务完成
      • 自顶向下的操作流程(先分解,后合并)
      • 需要主动收集子任务结果

      CountedCompleter:

      • 使用计数器和回调机制处理任务完成
      • 自底向上的通知机制(子任务完成通知父任务)
      • 通过 onCompletion() 自动处理结果合并

      使用场景对比

      RecursiveTask 适用于:

      • 简单的分治算法,如归并排序、求和等
      • 任务结构比较规则、子任务数量固定的场景
      • 层次化数据处理,需要从子任务收集结果的场景

      CountedCompleter 适用于:

      • 任务树结构复杂,子任务数量动态变化的场景
      • 需要避免过多阻塞的高性能场景
      • 异步任务处理,不需立即获取结果的场景
      • 复杂图算法、广度优先搜索等多依赖任务

      CountedCompleter 和 RecursiveTask 在 ForkJoin 框架中针对不同计算模型提供了不同的并行处理方案:

      • RecursiveTask 提供了简单直观的分治模型,适合传统的分而治之算法
      • CountedCompleter 提供了更灵活的计数完成模型,强调非阻塞操作和自动完成通知

      为什么有 helpComplete 方法

      1. CountedCompleter 的特殊性质

        在 ForkJoinTask.awaitDone() 方法中,有一个专门的分支处理:

        // ForkJoinTask.java
        ((this instanceof CountedCompleter) ?p.helpComplete(this, q, internal) :!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :p.helpJoin(this, q, internal))
        

        系统区分了 CountedCompleter 和普通 ForkJoinTask,为前者提供了特别的帮助机制。

      2. 依赖传播的高效处理

        CountedCompleter 设计了一个完成通知链(通过 completer 引用),这使得在任务等待时,可以专门寻找和当前任务有关联的其他 CountedCompleter 任务进行处理,而不是随机处理队列中的任务。

      虽然 CountedCompleter 不一定是最常用的任务类型,但它解决了一类非常重要的并行计算场景:具有复杂依赖关系的任务树。在大规模数据处理、图算法等领域,这种场景很常见。

      1. 性能考量

        在 ForkJoinPool 中的实现显示了 helpComplete 的主要逻辑:

        // ForkJoinPool.java - helpComplete 方法
        t = (ForkJoinTask<?>)U.getReferenceAcquire(a, k);
        if (t instanceof CountedCompleter) {CountedCompleter<?> f = (CountedCompleter<?>)t;for (int steps = cap; steps > 0; --steps) {if (f == task) {eligible = true;break;}if ((f = f.completer) == null)break;}
        }
        

        这段代码专门搜索与目标任务相关的 CountedCompleter,通过 completer 引用链向上追溯,这种针对性搜索比随机任务处理更有效。

      2. 是否经常使用 CountedCompleter?

        虽然 RecursiveTask 可能更常见,但 CountedCompleter 在一些关键场景中非常重要:

        • CompletableFuture 内部实现就使用了类似 CountedCompleter 的模式( CompletableFuture 有 Completion 类)
        • 大规模并行数据处理框架
        • 具有复杂依赖关系的算法

      为什么没有其他类似的帮助方法

      1. 其他任务类型不需要特殊帮助

        普通的 ForkJoinTask 和 RecursiveTask 依靠传统的 fork-join 模式,通过 helpJoin 方法就能高效处理。而 CountedCompleter 的依赖传播模型需要特殊的帮助机制。

      2. 架构设计决策

        从代码中可以看出,ForkJoinPool 确实提供了不同的工作窃取策略:

        // ForkJoinTask.java - awaitDone 方法
        ((this instanceof CountedCompleter) ?p.helpComplete(this, q, internal) :  // 针对 CountedCompleter!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :p.helpJoin(this, q, internal))       // 针对其他任务类型
        

        这显示了框架设计者对不同任务类型做了专门优化,而不是使用单一的通用方法。

      3. 从 CompletableFuture 实现看重要性

        CompletableFuture 的实现与 CountedCompleter 有相似之处,其内部类 Completion 继承自 ForkJoinTask:

        // CompletableFuture.java
        abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;// ...
        }
        

        这说明这种基于依赖传播的模型在异步编程中非常重要,值得专门优化。

      helpComplete 方法的存在反映了 Java 并发库设计者对特定并行计算模式的深入理解和优化。虽然 CountedCompleter 可能不是最常用的任务类型,但它解决了一类重要且复杂的并行计算问题,因此值得专门设计一个高效的帮助机制。

      此外,从 CompletableFuture 的实现来看,基于依赖传播的编程模型在 Java 的异步编程中占有重要地位,这也证明了对 CountedCompleter 特别优化的合理性。

      ForkJoinPool 类详解

      详细的分析见另一篇文章:深入解析ForkJoinPool核心原理-CSDN博客

      ForkJoinTask深度解析

      见:ForkJoinTask深度解析:Java并行计算利器-CSDN博客

      http://www.xdnf.cn/news/875737.html

      相关文章:

    • 功率估计和功率降低方法指南(1~2)
    • 2025年6月4日收获
    • 如何进行股票回测?
    • 第三方检测:软件适配测试报告
    • SAFe/LeSS/DAD等框架的核心适用场景如何选择?
    • Paraformer分角色语音识别-中文-通用 FunASR
    • SEO长尾关键词布局优化法
    • 二维码生成器
    • 宝马集团推进数字化转型:强化生产物流与财务流程,全面引入SAP现代架构
    • expect程序交互学习
    • 电子电路:共集电极放大器原理与作用解析
    • GO语言----基础类型取别名
    • PhpStorm设置中文
    • 数据库MySQL基础(3)
    • OpenAI API 流式传输
    • NX963NX970美光固态闪存NX978NX983
    • 基于单片机的FFT的频谱分析仪设计
    • Linux 系统 rsyslog 配置
    • 1.1随机试验与随机事件
    • Java 2D 图形变换方法
    • Linux 云服务器部署 Flask 项目(含后台运行与 systemd 开机自启)
    • 在java中不同数据类型的运算与内存占用分析
    • WordToCard,一键将Markdown内容转换为精美知识卡片(使用Qwen3)
    • image: ragsaas/backend:latest 背后的 来源机制 和 可能的来源地
    • Elasticsearch中的映射(Mapping)是什么?
    • 一文读懂开源AI框架REINVENT 4
    • EtherCAT AOE
    • Linux防火墙实战演练
    • Vue Router 导航方法完全指南
    • Openwrt 嵌入式Linux发行版