Java Fork/Join框架:三大核心组件深度解析
ForkJoinTask、ForkJoinWorkerThread 和 ForkJoinPool 构成了 Java 中 Fork/Join 框架的三个核心组件,它们之间形成了紧密的协作关系,共同提供了高效的并行计算能力。
三者关系概述
- ForkJoinPool:执行环境,管理工作线程和任务调度
- ForkJoinWorkerThread:执行单元,执行具体任务的工作线程
- ForkJoinTask:计算单元,表示可以拆分和合并的任务
ForkJoinPool
核心能力:
- 管理线程池和工作队列
- 实现工作窃取算法
- 提供任务调度和执行机制
- 处理线程阻塞和补偿
关键源码解析:
// 提交任务
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {return poolSubmit(true, Objects.requireNonNull(task));
}// 内部提交实现
private <T> ForkJoinTask<T> poolSubmit(boolean signalIfEmpty, ForkJoinTask<T> task) {Thread t; ForkJoinWorkerThread wt; WorkQueue q; boolean internal;// 判断当前线程是否为工作线程if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&(wt = (ForkJoinWorkerThread)t).pool == this) {internal = true;q = wt.workQueue;}else {internal = false;// 为外部提交找到或创建一个队列q = submissionQueue(ThreadLocalRandom.getProbe(), true);}q.push(task, signalIfEmpty ? this : null, internal);return task;
}
ForkJoinWorkerThread
核心能力:
- 绑定到特定的 ForkJoinPool
- 维护自己的工作队列
- 执行分配给它的任务
- 支持任务窃取
关键属性:
public class ForkJoinWorkerThread extends Thread {final ForkJoinPool pool; // 所属的线程池final ForkJoinPool.WorkQueue workQueue; // 工作队列
}
工作线程执行循环:
final void runWorker(WorkQueue w) {if (w != null) {int phase = w.phase, r = w.stackPred;int fifo = w.config & FIFO, nsteals = 0, src = -1;for (;;) {// 1. 检查是否应该终止if ((runState & STOP) != 0L || (qs = queues) == null)break;// 2. 随机扫描队列寻找任务scan: for (int l = n; l > 0; --l, i += step) { // 扫描队列// 尝试窃取任务if (找到任务) {// 3. 记录窃取状态w.nsteals = ++nsteals;w.source = src = j;// 4. 执行任务w.topLevelExec(t, fifo);}}// 5. 如果没找到任务,尝试进入休眠状态if (!rescan) {if (((phase = deactivate(w, phase)) & IDLE) != 0)break;}}}
}
ForkJoinTask
核心能力:
- 表示可并行执行的任务
- 支持任务分解(fork)和结果聚合(join)
- 提供轻量级的执行框架
- 支持任务状态管理
关键源码解析:
// fork 操作 - 将任务提交到池中异步执行
public final ForkJoinTask<V> fork() {Thread t;ForkJoinWorkerThread wt;ForkJoinPool p;ForkJoinPool.WorkQueue q;boolean internal;// 判断当前线程是否为工作线程if (internal = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {q = (wt = (ForkJoinWorkerThread)t).workQueue;p = wt.pool;}elseq = (p = ForkJoinPool.common).externalSubmissionQueue(false);// 将任务推入队列q.push(this, p, internal);return this;
}// join 操作 - 等待任务完成并获取结果
public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();
}
关键流程解析
1. 任务提交和执行流程
-
任务提交:
// 用户代码 ForkJoinPool pool = new ForkJoinPool(); pool.submit(myTask);
-
任务放入队列:
- 如果是外部提交,任务被放入提交队列
- 如果是内部提交(fork),任务被放入当前工作线程的队列
-
工作线程获取任务:
- 优先从自己的队列获取任务
- 如果自己队列为空,随机窃取其他线程队列的任务
-
任务执行:
final void topLevelExec(ForkJoinTask<?> task, int fifo) {while (task != null) {task.doExec(); // 执行任务// 继续处理本地队列中的任务task = nextLocalTask(fifo);} }
2. 工作窃取算法
// 在 helpJoin 中的窃取逻辑
scan: for (int l = n; l > 0; --l, r += step) {// 随机选择队列尝试窃取任务if ((q = qs[j = r & SMASK & (n - 1)]) != null) {// 从队列尾部(base端)窃取任务if (窃取成功) {w.source = src = j; // 记录窃取源t.doExec(); // 执行窃取的任务}}
}
3. join 操作的优化
// helpJoin 帮助执行被阻塞等待的任务
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {// 1. 尝试从队列中删除并执行任务if (w != null)w.tryRemoveAndExec(task, internal);// 2. 如果任务仍未完成,尝试执行任务链上的其他任务if (task != null && task.status >= 0 && internal && w != null) {// 3. 沿着偷取链寻找可能有助于完成目标任务的任务outer: for (boolean rescan = true;;) {// 扫描队列寻找相关任务// ...// 4. 如果找到符合条件的任务,执行它if (找到任务) {w.source = j; // 记录来源t.doExec(); // 执行任务w.source = wsrc; // 恢复原来的来源}}}return status;
}
总结
-
ForkJoinPool 维护整个执行环境:
- 管理工作线程集合
- 处理任务队列和调度
- 实现工作窃取调度算法
- 处理线程活动状态变化
-
ForkJoinWorkerThread 执行具体工作:
- 绑定到特定的 ForkJoinPool
- 维护自己的工作队列
- 执行和窃取任务
- 在没有任务时进入等待状态
-
ForkJoinTask 提供计算逻辑:
- 实现可分解的并行任务
- 支持 fork() 分解和 join() 合并操作
- 维护任务状态和结果
- 在适当情况下帮助执行其他任务
这种设计允许开发者专注于任务分解逻辑,而将线程管理和调度交给框架处理,实现了高效的并行计算模型。
ForkJoinWorkerThread 类深入分析
ForkJoinWorkerThread
是 Fork/Join 框架中负责执行任务的线程类,它继承自 Thread
,专门用于在 ForkJoinPool
中执行 ForkJoinTask
。下面对其关键方法和结构进行深入分析。
构造方法
ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,boolean useSystemClassLoader, boolean clearThreadLocals) {super(group, null, pool.nextWorkerThreadName(), 0L, !clearThreadLocals);UncaughtExceptionHandler handler = (this.pool = pool).ueh;this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config,clearThreadLocals);super.setDaemon(true);if (handler != null)super.setUncaughtExceptionHandler(handler);if (useSystemClassLoader && !clearThreadLocals)super.setContextClassLoader(ClassLoader.getSystemClassLoader());
}
构造过程:
- 调用父类
Thread
构造函数,设置线程组和名称 - 保存对
ForkJoinPool
的引用 - 创建自己的
WorkQueue
实例 - 设置为守护线程
- 设置异常处理器和类加载器
protected ForkJoinWorkerThread(ThreadGroup group, ForkJoinPool pool,boolean preserveThreadLocals) {this(group, pool, false, !preserveThreadLocals);
}protected ForkJoinWorkerThread(ForkJoinPool pool) {this(null, pool, false, false);
}
提供给子类使用的构造方法,简化了参数设置。
核心方法: run()
public void run() {Throwable exception = null;ForkJoinPool p = pool;ForkJoinPool.WorkQueue w = workQueue;if (p != null && w != null) { // 避免初始化失败try {p.registerWorker(w); // 1. 注册工作线程onStart(); // 2. 调用初始化钩子p.runWorker(w); // 3. 执行主循环} catch (Throwable ex) {exception = ex;} finally {try {onTermination(exception); // 4. 调用终止钩子} catch (Throwable ex) {if (exception == null)exception = ex;} finally {p.deregisterWorker(this, exception); // 5. 取消注册}}}
}
run()
方法是线程的执行入口,流程包括:
- 注册工作线程: 调用
pool.registerWorker(w)
将自己注册到池中 - 初始化: 调用
onStart()
钩子方法 - 执行主循环: 调用
pool.runWorker(w)
进入主工作循环 - 清理: 调用
onTermination(exception)
钩子方法 - 取消注册: 调用
pool.deregisterWorker(this, exception)
从池中移除
其中最关键的是第3步 p.runWorker(w)
- 这是工作线程的主循环,负责执行和窃取任务。
生命周期钩子方法
protected void onStart() {// 默认为空,子类可重写
}protected void onTermination(Throwable exception) {// 默认为空,子类可重写
}
这两个钩子方法允许子类在线程生命周期的关键点执行自定义逻辑:
onStart()
: 线程启动后,执行任务前onTermination()
: 线程终止前,可获取异常信息
ThreadLocal 处理
final void resetThreadLocals() {if (U.getReference(this, THREADLOCALS) != null)U.putReference(this, THREADLOCALS, null);if (U.getReference(this, INHERITABLETHREADLOCALS) != null)U.putReference(this, INHERITABLETHREADLOCALS, null);onThreadLocalReset();
}void onThreadLocalReset() {// 默认为空,子类可重写
}
这些方法用于清理 ThreadLocal 变量:
resetThreadLocals()
: 使用 Unsafe 直接清除 ThreadLocal 引用onThreadLocalReset()
: 钩子方法,子类可在 ThreadLocal 清理后执行操作
工作队列管理方法
public int getPoolIndex() {return workQueue.getPoolIndex();
}public int getQueuedTaskCount() {return workQueue.queueSize();
}
这些方法提供队列状态信息:
getPoolIndex()
: 获取线程在池中的索引getQueuedTaskCount()
: 获取队列中待处理任务数量
静态辅助方法
static boolean hasKnownQueuedWork() {ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue q, sq;ForkJoinPool p; ForkJoinPool.WorkQueue[] qs; int i;Thread c = JLA.currentCarrierThread();return ((c instanceof ForkJoinWorkerThread) &&(p = (wt = (ForkJoinWorkerThread)c).pool) != null &&(q = wt.workQueue) != null &&(i = q.source) >= 0 && // 检查本地和当前源队列(((qs = p.queues) != null && qs.length > i &&(sq = qs[i]) != null && sq.top - sq.base > 0) ||q.top - q.base > 0));
}
此方法检查当前线程是否有已知的等待任务,用于任务执行过程中的启发式决策。
InnocuousForkJoinWorkerThread 内部类
static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread {private static final ThreadGroup innocuousThreadGroup = createGroup();private boolean resetCCL;InnocuousForkJoinWorkerThread(ForkJoinPool pool) {super(innocuousThreadGroup, pool, true, true);}// 其他重写方法...
}
这是一个特殊的工作线程实现:
- 使用一个独立的线程组
- 使用系统类加载器
- 会清理 ThreadLocal 变量
- 不允许设置异常处理器
执行流程详解
让我们深入分析一个 ForkJoinWorkerThread
的完整执行流程:
-
创建阶段:
// 在 ForkJoinPool 中创建新的工作线程 ForkJoinWorkerThread wt = factory.newThread(this); // factory是线程工厂
-
初始化阶段:
// 在构造函数中 this.workQueue = new ForkJoinPool.WorkQueue(this, 0, (int)pool.config, clearThreadLocals); super.setDaemon(true); // 设置线程名称、异常处理器等
-
启动阶段:
wt.start(); // 在 ForkJoinPool.createWorker() 中调用
-
运行阶段:
// 执行 run() 方法 p.registerWorker(w); // 向池注册线程 onStart(); // 钩子方法 p.runWorker(w); // 主循环 - 在池中执行
-
工作循环内部 (在
ForkJoinPool.runWorker
中):// 简化的逻辑 while (条件满足) {if (有本地任务) {执行本地任务} else {尝试窃取其他线程队列中的任务如果找不到任务,可能进入休眠状态} }
-
终止阶段:
onTermination(exception); // 钩子方法 p.deregisterWorker(this, exception); // 从池中注销
与 ForkJoinPool 和 ForkJoinTask 的交互
-
与 ForkJoinPool 的交互:
- 通过
pool
字段持有对池的引用 - 调用
pool.registerWorker()
注册自身 - 调用
pool.runWorker()
委托主循环逻辑 - 调用
pool.deregisterWorker()
注销自身
- 通过
-
与 ForkJoinTask 的交互:
- 通过
workQueue
管理和执行任务 ForkJoinTask
可以通过workQueue.push()
提交新任务- 任务的执行在
pool.runWorker()
中由ForkJoinTask.doExec()
完成
- 通过
总结
ForkJoinWorkerThread
是连接 ForkJoinPool
和 ForkJoinTask
的纽带:
- 它持有
ForkJoinPool
引用和自己的工作队列 - 它独立于普通线程运行,专注于执行 Fork/Join 任务
- 它支持工作窃取算法,可以执行自己队列中的任务,也可以窃取其他线程队列中的任务
- 它提供了生命周期钩子方法,便于子类扩展
- 它优化了 ThreadLocal 处理,提高了任务执行的隔离性和效率
核心工作流程是: 初始化 → 注册 → 执行主循环(处理任务) → 清理 → 注销。
ForkJoinPool.WorkQueue 实现解析
WorkQueue
是 ForkJoinPool 的核心内部类,负责管理任务的存储、调度和窃取。它是 Fork/Join 框架实现工作窃取算法的关键组件。
WorkQueue 在 Fork/Join 框架中扮演两个重要角色:
- 工作线程队列 - 当 owner 不为 null 时
- 外部提交队列 - 当 owner 为 null 时
主要属性结构
final ForkJoinWorkerThread owner; // 队列所属线程,如果为 null 则为共享模式
ForkJoinTask<?>[] array; // 任务数组,大小为 2 的幂
int base; // 下一个被偷取任务的索引
final int config; // 静态配置位(如 FIFO 模式标志)
int top; // 下一个 push 位置的索引
volatile int phase; // 同步状态和工作池索引
int stackPred; // 池栈(ctl)前驱链接
volatile int source; // 偷取来源队列 ID
int nsteals; // 偷取任务计数
volatile int parking; // 非零表示线程正在等待工作
WorkQueue 的属性使用了 @Contended
注解,这是为了避免伪共享(false sharing)问题【CPU缓存行可能存储多个变量,导致多个变量缓存同时失效】,提高性能。
队列实现的数据结构
WorkQueue 内部使用了一个循环数组来存储任务:
-
双端队列设计:
- 工作线程从
top
端(数组尾部)添加和获取任务(LIFO 模式),也可以配置成 FIFO 模式 - 窃取操作从
base
端(数组头部)获取任务(总是 FIFO)
- 工作线程从
-
存储结构:
- 使用
ForkJoinTask<?>[] array
作为任务存储 - 索引通过
& (array.length - 1)
实现循环引用
- 使用
核心方法解析
1. 任务入队 - push 方法
final void push(ForkJoinTask<?> task, ForkJoinPool pool, boolean internal) {int s = top, b = base, m, cap, room; ForkJoinTask<?>[] a;if ((a = array) != null && (cap = a.length) > 0 && task != null) {// 计算索引位置,使用位运算确保循环int pk = task.noUserHelp() + 1;if ((room = (m = cap - 1) - (s - b)) >= 0) {// 更新 top 指针top = s + 1;long pos = slotOffset(m & s);if (!internal)// 如果是外部提交,使用普通引用操作U.putReference(a, pos, task);else// 如果是内部提交,使用原子操作确保可见性U.getAndSetReference(a, pos, task);// 数组满了需要扩容if (room == 0)growArray(a, cap, s);}if (!internal)unlockPhase(); // 如果是外部提交,解锁队列// 如果队列可能看起来是空的,通知工作池if ((room == 0 || a[m & (s - pk)] == null) && pool != null)pool.signalWork(); }
}
该方法将任务添加到队列顶部,如果队列满则扩容,并在必要时通知线程池有新任务。
noUserHelp()
方法在ForkJoinTask
类中定义:
final int noUserHelp() {return (U.getInt(this, STATUS) & NO_USER_HELP) >>> NUH_BIT;
}
这个方法检查任务的status
字段中是否设置了NO_USER_HELP
标志位。NO_USER_HELP
表示该任务不应该被外部用户线程帮助执行。
从代码中可以看出,ForkJoinTask
定义了这些常量:
static final int NUH_BIT = 24;
static final int NO_USER_HELP = 1 << NUH_BIT;
pk 值在大多数情况下等于 1(普通任务)。特定情况下(如 InterruptibleTask),可能为 2。
a[m & (s - pk)] == null 判断的含义
s 是当前的 top 索引(即新任务要放入的位置)
pk 通常为 1(普通任务)
s - pk 计算的是将要插入任务的前一个槽位
a[m & (s - pk)] 检查该槽位是否为空
if (room == 0 || a[m & (s - pk)] == null) && pool != null)pool.signalWork();
这段逻辑的具体含义:
-
从空变为非空的检测:如果队列是空的,然后添加了第一个任务,那么
a[m & (s - pk)]
将会是null
(因为前一个槽位即为空)。在这种情况下需要发信号通知工作者线程来处理任务。 -
队列满的情况:如果
room == 0
(队列已满需要扩容),也需要发送信号确保有工作线程能及时处理任务。
InterruptibleTask (pk = 2
): 会检查 a[m & (s - 2)]
是否为空。这个位置更"远",更可能为空,使得 signalWork() 更容易被触发
这种设计是故意的,因为:
1. InterruptibleTask 需要特殊处理
InterruptibleTask 被设计为不允许用户线程帮助执行(通过设置 NO_USER_HELP 标志)。因此,系统需要确保有足够的工作线程来处理这类任务。
2. 降低检测门槛提高响应性
通过检查更远位置是否为空,系统更容易触发 signalWork() 调用,从而更积极地唤醒或创建工作线程。这种设计是有意为之的性能优化,确保 InterruptibleTask 能被及时处理。
3. 避免干扰用户线程执行
在 awaitDone
方法中可以看到,系统会阻止非工作线程(用户线程)帮助执行标记了 NO_USER_HELP 的任务:
!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss : p.helpJoin(this, q, internal)
这种设计确保了 InterruptibleTask 只能被工作线程处理,绝不会被用户线程执行。
pk
值不同是故意的,用于对特殊任务进行差异化处理- InterruptibleTask 获得了更低的"信号触发门槛",确保它们能被及时处理
- 通过适当提高 signalWork() 触发频率,系统确保有足够的工作线程来执行不允许用户线程帮助的 InterruptibleTask
内部与外部调用的场景区分
-
内部调用(internal=true):
- 当前线程是ForkJoinWorkerThread,属于池中的工作线程
- 调用来自工作线程自己的
fork()
等方法
-
外部调用(internal=false):
- 调用来自外部提交(如
execute()
、submit()
等方法) - 调用线程不是该队列的所有者
- 调用来自外部提交(如
内存写入的区别
if (!internal)U.putReference(a, pos, task); // inside lock
elseU.getAndSetReference(a, pos, task); // fully fenced
这两种写入方式有重要区别:
- 外部调用时(
!internal
)使用putReference
:- 采用普通写入
- 依赖于外部的锁机制来保证可见性
- 代码注释
// inside lock
表明在这种情况下,整个操作应该已经在锁内
- 内部调用时使用
getAndSetReference
:- 这是个原子操作,提供了完全内存屏障(fully fenced)
- 确保写入对其他线程立即可见
- 不需要外部锁的保护
检查代码确认外部调用是否一定有锁保护:
- 在外部调用场景中,队列通过
phase
字段实现锁保护:
// 在ForkJoinPool.submissionQueue方法中
if (reuse == 0 || !q.tryLockPhase()) { // 尝试锁定队列// 移动索引...
}
- 在
push
方法的末尾,外部调用会解锁:
if (!internal)unlockPhase(); // 如果是外部提交,解锁队列
tryLockPhase
和unlockPhase
方法实现了队列的锁定机制:
// 在WorkQueue类中
final boolean tryLockPhase() { // seqlock acquireint p;return (((p = phase) & IDLE) != 0 &&U.compareAndSetInt(this, PHASE, p, p + IDLE));
}final void unlockPhase() {U.getAndAddInt(this, PHASE, IDLE);
}
结论
-
noUserHelp()
:标识任务是否允许外部用户线程帮助执行,主要用于InterruptibleTask
类任务。 -
pk = task.noUserHelp() + 1
:用于计算前一个槽位偏移,来确定是否需要发出工作信号。 -
内外部调用的区别:
- 外部调用总是在锁的保护下执行,使用普通引用写入
- 内部调用无需锁保护,使用原子操作和完全内存屏障确保可见性
-
锁保护机制:任何外部提交(通过
submit
、execute
等)都会通过tryLockPhase
和unlockPhase
进行锁定和解锁,确保队列操作的线程安全。
2. 本地任务获取 - nextLocalTask 方法
private ForkJoinTask<?> nextLocalTask(int fifo) {ForkJoinTask<?> t = null;ForkJoinTask<?>[] a = array;int b = base, p = top, cap;if (p - b > 0 && a != null && (cap = a.length) > 0) {for (int m = cap - 1, s, nb;;) {if (fifo == 0 || (nb = b + 1) == p) {// LIFO 模式或只有一个任务:从 top 端获取if ((t = (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(m & (s = p - 1)), null)) != null)updateTop(s); // 更新 top 指针break;}// FIFO 模式:从 base 端获取if ((t = (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(m & b), null)) != null) {updateBase(nb); // 更新 base 指针break;}// 自旋等待直到 base 值稳定while (b == (b = U.getIntAcquire(this, BASE)))Thread.onSpinWait();if (p - b <= 0)break;}}return t;
}
根据配置的模式(FIFO 或 LIFO)获取任务,工作线程一般使用 LIFO 模式提高局部性。
b == (b = U.getIntAcquire(this, BASE))
判断解析
这个判断是在处理并发竞争场景下的自旋等待机制,目的是等待 base
值稳定:
为什么会不等?
-
并发修改情况:在多线程环境中,当前线程在读取
base
值后,其他线程可能已经修改了base
(通过其他线程成功的 poll 操作) -
代码执行过程:
b == (b = U.getIntAcquire(this, BASE))
- 左侧的
b
是之前读取的base
值 - 右侧的
(b = U.getIntAcquire(this, BASE))
是读取当前最新的base
值并赋给b
- 如果两次读取的值不同,表示
base
已被其他线程更改
- 左侧的
-
不等的情况:当其他线程成功窃取了队列中的任务并增加了
base
值,此判断将返回false
,线程会脱离自旋状态
这是一种自旋优化机制,目的是:
- 避免在
base
不稳定时进行不必要的操作 - 等待直到
base
稳定(停止变化) - 减少内存流量,通过
Thread.onSpinWait()
提高自旋效率
p-b<=0
判断解析
虽然 WorkQueue 使用的是循环数组,但队列的逻辑仍然是线性的:
-
队列状态表示:
p
(即top
): 下一个 push 位置的索引b
(即base
): 下一个要被窃取任务的索引
-
空队列条件:
- 当
p == b
时,表示队列完全空 - 当
p < b
可能出现在并发操作中(临时状态),但逻辑上也应被视为"空"
- 当
-
循环数组与逻辑索引:
- 循环数组是物理存储方式,通过
& (length-1)
转换逻辑索引到物理位置 - 但
base
和top
作为逻辑索引持续增长,不会循环重置 - 因此
p-b
始终反映队列中待处理元素的逻辑数量
- 循环数组是物理存储方式,通过
3. 任务窃取 - poll 方法
final ForkJoinTask<?> poll() {for (int pb = -1, b; ; pb = b) {ForkJoinTask<?> t; int cap, nb; long k; ForkJoinTask<?>[] a;if ((a = array) == null || (cap = a.length) <= 0)break;t = (ForkJoinTask<?>)U.getReferenceAcquire(a, k = slotOffset((cap - 1) & (b = base)));Object u = U.getReference(a, slotOffset((cap - 1) & (nb = b + 1)));if (base != b) // 检查并发修改;else if (t == null) {if (u == null && top - b <= 0)break; // 队列为空if (pb == b)Thread.onSpinWait(); // 自旋等待状态变化}else if (U.compareAndSetReference(a, k, t, null)) {updateBase(nb); // CAS 成功,更新 basereturn t;}}return null;
}
这是工作窃取的核心实现,其他线程通过这个方法从队列底部窃取任务。
4. 任务执行 - topLevelExec 方法
final void topLevelExec(ForkJoinTask<?> task, int fifo) {while (task != null) {task.doExec(); // 执行当前任务task = nextLocalTask(fifo); // 尝试获取下一个本地任务}
}
执行给定任务,然后继续执行队列中的其他任务,直到队列为空。
5. 辅助方法 - tryRemoveAndExec 方法
这个方法的主要作用是:从当前工作队列中寻找指定任务,如果找到则移除并执行它。它是 ForkJoin 框架中任务 join 操作的核心实现部分。
final void tryRemoveAndExec(ForkJoinTask<?> task, boolean internal) {ForkJoinTask<?>[] a = array;int b = base, p = top, s = p - 1, d = p - b, cap;if (a != null && (cap = a.length) > 0) {// 从队列顶部向下扫描寻找指定任务for (int m = cap - 1, i = s; d > 0; --i, --d) {long k; boolean taken;ForkJoinTask<?> t = (ForkJoinTask<?>)U.getReference(a, k = slotOffset(i & m));if (t == null)break; // 遇到空槽位,停止扫描if (t == task) { // 找到目标任务if (!internal && !tryLockPhase())break; // 如果是外部调用,尝试锁定队列// 尝试从队列中移除任务if (taken = (top == p && U.compareAndSetReference(a, k, task, null))) {// 根据任务位置调整队列索引if (i == s) // 最顶上的元素updateTop(s);else if (i == base) // 最底下的元素updateBase(i + 1);else { // 中间元素,与顶部元素交换位置后移除顶部U.putReferenceVolatile(a, k, (ForkJoinTask<?>)U.getAndSetReference(a, slotOffset(s & m), null));updateTop(s);}}if (!internal)unlockPhase();if (taken)task.doExec(); // 执行任务break;}}}
}
核心步骤:
- 从队列顶部开始向下扫描
- 寻找与参数 task 相等的任务
- 找到后,将其从队列中移除
- 执行该任务
这个方法主要在以下场景中被调用:
-
任务的 join 操作中:当一个任务调用
join()
方法等待另一个任务完成时,如果被等待的任务恰好在当前线程的队列中,则可以直接执行它而不是等待 -
ForkJoinPool.helpJoin 方法中:
final int helpJoin(ForkJoinTask<?> task, WorkQueue w, boolean internal) {if (w != null)w.tryRemoveAndExec(task, internal); // 首先尝试在自己的队列中找任务// ...其余代码... }
这个方法是 ForkJoin 框架中 "work-stealing" 算法的关键部分,有两个重要作用:
-
避免阻塞:如果任务 A 等待任务 B,而任务 B 恰好在当前线程的队列中等待执行,那么直接执行 B 比让 A 阻塞更有效率
-
防止死锁:如果没有这种机制,可能会出现线程互相等待对方队列中的任务,导致死锁
实际应用示例
假设有以下代码:
ForkJoinTask<Integer> task1 = new RecursiveTask<Integer>() { ... };
ForkJoinTask<Integer> task2 = new RecursiveTask<Integer>() { ... };
task1.fork();
task2.fork();
task1.join(); // 这里会尝试使用 tryRemoveAndExec
当执行到 task1.join()
时:
- 检查 task1 是否已完成,如果未完成
- 调用
tryRemoveAndExec
尝试从当前线程的队列中找到并执行 task1 - 如果找到并执行了,则 join 操作立即返回而不会阻塞
这体现了 ForkJoin 框架的核心设计理念:线程尽可能地处理自己的任务,避免不必要的阻塞和线程切换。
6. 辅助完成 - helpComplete 方法
helpComplete 的执行流程
- 检查传入任务和当前工作队列的状态
- 尝试从队列顶部获取一个任务
- 检查这个任务是否为 CountedCompleter 类型
- 通过 completer 引用向上追溯,检查它是否与目标任务相关联
- 如果相关联,从队列中移除并执行该任务
- 根据 limit 参数决定是否继续处理更多任务
与 tryRemoveAndExec 的关键区别
-
任务类型不同:
tryRemoveAndExec
: 适用于任何类型的ForkJoinTask
,主要用于join()
操作helpComplete
: 专门针对CountedCompleter
类型的任务
-
查找逻辑不同:
tryRemoveAndExec
: 从队列顶部向下扫描,查找特定的任务实例helpComplete
: 寻找任务依赖链上的 CountedCompleter 任务(通过 completer 引用链)
-
目标不同:
tryRemoveAndExec
: 只寻找并执行一个特定的任务helpComplete
: 可以帮助执行多个子任务(由limit参数控制)
-
适用场景:
tryRemoveAndExec
: join操作中,避免线程阻塞helpComplete
: 帮助完成CountedCompleter的子任务,支持可数完成模型
总结
tryRemoveAndExec
是寻找特定任务实例的针对性方法,主要用于 join 操作helpComplete
是针对 CountedCompleter 任务的专用辅助方法,帮助完成任务树- CountedCompleter 提供了一套基于计数的任务完成机制,特别适合多层树形任务结构
final int helpComplete(ForkJoinTask<?> task, boolean internal, int limit) {int status = 0;if (task != null) {outer: for (;;) {// 查找与根任务相关的待完成任务ForkJoinTask<?>[] a; boolean taken; Object o;int stat, p, s, cap;if ((stat = task.status) < 0) {status = stat;break;}if ((a = array) == null || (cap = a.length) <= 0)break;// 获取顶部任务long k = slotOffset((cap - 1) & (s = (p = top) - 1));if (!((o = U.getReference(a, k)) instanceof CountedCompleter))break;CountedCompleter<?> t = (CountedCompleter<?>)o, f = t;// 检查任务关系for (int steps = cap;;) {if (f == task)break;if ((f = f.completer) == null || --steps == 0)break outer;}// 移除并执行任务if (!internal && !tryLockPhase())break;if (taken = (top == p &&U.compareAndSetReference(a, k, t, null)))updateTop(s);if (!internal)unlockPhase();if (!taken)break;t.doExec();if (limit != 0 && --limit == 0)break;}}return status;
}
WorkQueue 的关键特点
1. 工作窃取算法实现
WorkQueue 支持经典的工作窃取(work-stealing)算法:
- 本地操作:线程操作自己队列的
top
端(LIFO 模式),提高缓存局部性 - 窃取操作:其他线程从队列的
base
端窃取任务(FIFO 模式) - 无锁设计:通过原子操作(CAS)实现高效并发
2. 分离的提交队列和工作队列
ForkJoinPool 将队列分为两类:
- 工作队列(odd 索引):有 owner 线程的队列,用于线程执行和窃取任务
- 提交队列(even 索引):无 owner 的队列,用于外部提交任务
3. 高效内存管理
- 动态扩容:队列满时会自动扩容
- 避免伪共享:通过
@Contended
注解分离频繁访问的字段 - Unsafe 操作:使用
Unsafe
类的 CAS 操作实现高效并发访问
4. 协同处理机制
- 源追踪: 通过
source
字段记录窃取源,帮助实现 join 和 helpComplete - 计数: 通过
nsteals
记录窃取次数,用于负载统计和调优
总结
WorkQueue 是 ForkJoinPool 的核心组件,通过精心设计的双端队列实现高效的工作窃取算法。其关键特点包括:
- 双端队列设计,支持 LIFO/FIFO 两种工作模式
- 无锁并发访问,使用 CAS 操作确保线程安全
- 高效的任务窃取和协助机制
- 内存优化设计,避免伪共享问题
这种设计使 ForkJoinPool 能高效地执行细粒度的并行任务,特别适合递归分治算法。
CountedCompleter 介绍
CountedCompleter
是 ForkJoinTask 的一个特殊子类,设计用于可以追踪子任务数量的分治算法。
核心特性
- 内部计数器机制:每个CountedCompleter维护一个等待完成的子任务计数器
- 自动触发完成:当计数器归零时,自动触发完成逻辑
- 完成通知链:通过completer引用形成父子任务链,子任务完成会通知父任务
public abstract class CountedCompleter<T> extends ForkJoinTask<T> {// 指向父任务的引用final CountedCompleter<?> completer;// 待完成的子任务数量volatile int pending;// 在所有子任务完成后执行public void onCompletion(CountedCompleter<?> caller) { }// 减少待完成任务计数,如果归零则触发完成逻辑public final void tryComplete() {CountedCompleter<?> a = this, s = a;for (int c;;) {if ((c = a.pending) == 0) {a.onCompletion(s);if ((a = (s = a).completer) == null) {s.quietlyComplete();return;}}else if (U.compareAndSetInt(a, PENDING, c, c - 1))return;}}// 其他方法...
}
适用场景
- 树形依赖计算:子任务可以进一步分解,形成多层任务树
- 任务间有依赖关系:某任务需要等待其他多个任务完成后才能继续
- 并行聚合处理:多个任务并行处理,最后汇总结果
用法示例
public class SumTask extends CountedCompleter<Long> {final long[] array;final int lo, hi;long sum = 0;SumTask(CountedCompleter<?> parent, long[] array, int lo, int hi) {super(parent);this.array = array; this.lo = lo; this.hi = hi;}@Overridepublic void compute() {if (hi - lo <= 100) { // 小任务直接计算for (int i = lo; i < hi; ++i)sum += array[i];} else { // 大任务分解int mid = (lo + hi) >>> 1;// 设置等待2个子任务setPendingCount(2);// 创建子任务new SumTask(this, array, lo, mid).fork();new SumTask(this, array, mid, hi).fork();// 等待子任务完成tryComplete();}}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (caller != this) {SumTask child = (SumTask)caller;this.sum += child.sum;}}@Overridepublic Long getRawResult() {return sum;}
}
CountedCompleter 与 RecursiveTask 的差异与作用
CountedCompleter 和 RecursiveTask 是 ForkJoinTask 的两个不同子类,它们在并行计算模型上有显著区别,适用于不同场景。
1. 任务完成机制
RecursiveTask:
- 使用传统的"等待-获取结果"模式
- 通过
fork()
和join()
显式控制任务依赖 - 父任务必须等待子任务完成才能继续执行
CountedCompleter:
- 使用"计数完成"模式
- 通过内部计数器跟踪依赖的子任务数量
- 当所有子任务完成时自动触发回调(无需显式join)
2. 代码结构对比
RecursiveTask:
class SumTask extends RecursiveTask<Long> {final long[] array;final int lo, hi;@Overrideprotected Long compute() {if (hi - lo <= 100) { // 直接计算long sum = 0;for (int i = lo; i < hi; ++i) sum += array[i];return sum;}int mid = (lo + hi) >>> 1;SumTask left = new SumTask(array, lo, mid);SumTask right = new SumTask(array, mid, hi);left.fork();long rightResult = right.compute(); // 直接计算右侧long leftResult = left.join(); // 等待左侧完成return leftResult + rightResult; // 合并结果}
}
CountedCompleter:
class SumTask extends CountedCompleter<Long> {long sum = 0;final long[] array;final int lo, hi;SumTask sibling;@Overridepublic void compute() {if (hi - lo <= 100) {for (int i = lo; i < hi; ++i) sum += array[i];tryComplete(); // 标记当前任务完成} else {int mid = (lo + hi) >>> 1;setPendingCount(2); // 设置等待2个子任务new SumTask(this, array, lo, mid).fork();new SumTask(this, array, mid, hi).fork();}}@Overridepublic void onCompletion(CountedCompleter<?> caller) {if (caller != this) {SumTask child = (SumTask)caller;this.sum += child.sum; // 汇总子任务结果}}
}
3. 任务协调方式
RecursiveTask:
- 使用阻塞式 join() 方法等待任务完成
- 自顶向下的操作流程(先分解,后合并)
- 需要主动收集子任务结果
CountedCompleter:
- 使用计数器和回调机制处理任务完成
- 自底向上的通知机制(子任务完成通知父任务)
- 通过
onCompletion()
自动处理结果合并
使用场景对比
RecursiveTask 适用于:
- 简单的分治算法,如归并排序、求和等
- 任务结构比较规则、子任务数量固定的场景
- 层次化数据处理,需要从子任务收集结果的场景
CountedCompleter 适用于:
- 任务树结构复杂,子任务数量动态变化的场景
- 需要避免过多阻塞的高性能场景
- 异步任务处理,不需立即获取结果的场景
- 复杂图算法、广度优先搜索等多依赖任务
CountedCompleter 和 RecursiveTask 在 ForkJoin 框架中针对不同计算模型提供了不同的并行处理方案:
- RecursiveTask 提供了简单直观的分治模型,适合传统的分而治之算法
- CountedCompleter 提供了更灵活的计数完成模型,强调非阻塞操作和自动完成通知
为什么有 helpComplete 方法
-
CountedCompleter 的特殊性质
在
ForkJoinTask.awaitDone()
方法中,有一个专门的分支处理:// ForkJoinTask.java ((this instanceof CountedCompleter) ?p.helpComplete(this, q, internal) :!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :p.helpJoin(this, q, internal))
系统区分了
CountedCompleter
和普通ForkJoinTask
,为前者提供了特别的帮助机制。 -
依赖传播的高效处理
CountedCompleter 设计了一个完成通知链(通过
completer
引用),这使得在任务等待时,可以专门寻找和当前任务有关联的其他 CountedCompleter 任务进行处理,而不是随机处理队列中的任务。
虽然 CountedCompleter 不一定是最常用的任务类型,但它解决了一类非常重要的并行计算场景:具有复杂依赖关系的任务树。在大规模数据处理、图算法等领域,这种场景很常见。
-
性能考量
在
ForkJoinPool
中的实现显示了 helpComplete 的主要逻辑:// ForkJoinPool.java - helpComplete 方法 t = (ForkJoinTask<?>)U.getReferenceAcquire(a, k); if (t instanceof CountedCompleter) {CountedCompleter<?> f = (CountedCompleter<?>)t;for (int steps = cap; steps > 0; --steps) {if (f == task) {eligible = true;break;}if ((f = f.completer) == null)break;} }
这段代码专门搜索与目标任务相关的 CountedCompleter,通过 completer 引用链向上追溯,这种针对性搜索比随机任务处理更有效。
-
是否经常使用 CountedCompleter?
虽然 RecursiveTask 可能更常见,但 CountedCompleter 在一些关键场景中非常重要:
- CompletableFuture 内部实现就使用了类似 CountedCompleter 的模式( CompletableFuture 有 Completion 类)
- 大规模并行数据处理框架
- 具有复杂依赖关系的算法
为什么没有其他类似的帮助方法
-
其他任务类型不需要特殊帮助
普通的 ForkJoinTask 和 RecursiveTask 依靠传统的 fork-join 模式,通过
helpJoin
方法就能高效处理。而 CountedCompleter 的依赖传播模型需要特殊的帮助机制。 -
架构设计决策
从代码中可以看出,ForkJoinPool 确实提供了不同的工作窃取策略:
// ForkJoinTask.java - awaitDone 方法 ((this instanceof CountedCompleter) ?p.helpComplete(this, q, internal) : // 针对 CountedCompleter!internal && ((ss = status) & NO_USER_HELP) != 0 ? ss :p.helpJoin(this, q, internal)) // 针对其他任务类型
这显示了框架设计者对不同任务类型做了专门优化,而不是使用单一的通用方法。
-
从 CompletableFuture 实现看重要性
CompletableFuture 的实现与 CountedCompleter 有相似之处,其内部类
Completion
继承自 ForkJoinTask:// CompletableFuture.java abstract static class Completion extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {volatile Completion next;// ... }
这说明这种基于依赖传播的模型在异步编程中非常重要,值得专门优化。
helpComplete
方法的存在反映了 Java 并发库设计者对特定并行计算模式的深入理解和优化。虽然 CountedCompleter 可能不是最常用的任务类型,但它解决了一类重要且复杂的并行计算问题,因此值得专门设计一个高效的帮助机制。
此外,从 CompletableFuture 的实现来看,基于依赖传播的编程模型在 Java 的异步编程中占有重要地位,这也证明了对 CountedCompleter 特别优化的合理性。
ForkJoinPool 类详解
详细的分析见另一篇文章:深入解析ForkJoinPool核心原理-CSDN博客
ForkJoinTask深度解析
见:ForkJoinTask深度解析:Java并行计算利器-CSDN博客