JUC LongAdder并发计数器设计
Striped64
java.util.concurrent.atomic
包下的一个关键抽象类,像 LongAdder
、DoubleAdder
和 LongAccumulator
等都是基于它实现的。它的主要目标是解决在高并发场景下,单个 AtomicLong
或 AtomicDouble
因大量线程竞争同一个变量而导致的性能瓶颈。
其核心设计思想可以总结为 “空间换时间” 和 “热点分离”。
核心设计详解
基础结构:
base
+Cell[]
数组Striped64
内部维护了两个核心变量来存储值:volatile long base
: 一个基础值。在低并发、无竞争的情况下,所有更新操作都直接在base
上进行 CAS (Compare-And-Swap)。这和AtomicLong
的行为类似,开销很小。volatile Cell[] cells
: 一个Cell
类型的数组。Cell
是Striped64
的一个静态内部类,本质上是对long
值的简单包装,并使用@Contended
注解来避免“伪共享”(False Sharing),确保不同Cell
对象不会位于同一个缓存行,从而减少缓存竞争。
动态的竞争处理机制
- 无竞争时:线程会尝试直接通过 CAS 更新
base
字段。如果成功,操作就完成了,这是最快路径。 - 首次出现竞争时:当一个线程尝试 CAS 更新
base
失败时,意味着出现了竞争。此时,Striped64
会进行懒加载,首次创建cells
数组,初始大小为 2。 - 竞争加剧时:
- 系统会为每个线程计算一个哈希值(通过
ThreadLocalRandom
),并将线程映射到cells
数组的一个特定槽位(Cell
对象)上。 - 线程不再尝试更新
base
,而是转而去更新自己对应的那个Cell
的值。 - 由于不同线程被分散到不同的
Cell
中进行操作,它们之间互相竞争的概率大大降低,从而提升了整体性能。
- 系统会为每个线程计算一个哈希值(通过
- 无竞争时:线程会尝试直接通过 CAS 更新
Cell 数组的动态扩容
- 如果多个线程被哈希到同一个
Cell
上,导致在这个Cell
上也发生了竞争,Striped64
会尝试对cells
数组进行扩容。 - 扩容过程通过一个简单的自旋锁 (
cellsBusy
) 来保证线程安全。当一个线程获取到锁后,它会将cells
数组的长度翻倍。 - 扩容后,线程的哈希映射会重新计算,使得线程能更均匀地分布到新的、更大的数组中,进一步降低冲突。
- 数组的扩容有上限,最大不会超过大于等于 CPU 核心数的最小 2 的幂次方,因为当线程数超过 CPU 核心数时,多余的线程也无法并行执行,过度扩容没有意义。
- 如果多个线程被哈希到同一个
最终值的计算
- 由于值被分散存储在
base
和cells
数组中,当需要获取当前总和时(例如调用LongAdder.sum()
),Striped64
会遍历整个cells
数组,累加所有Cell
的value
字段,最后再加上base
的值,得到最终结果。 - 需要注意的是,获取总和时并没有加锁,因此结果是一个“弱一致性”的值。如果在获取期间有其他线程正在修改值,这个总和可能不是最新的,但在很多统计类场景下,这种弱一致性是可以接受的。
- 由于值被分散存储在
总结
Striped64
的设计精髓在于,它巧妙地将对单个共享变量的集中竞争,分散到了一个**Cell
数组**上。通过为每个线程或一小组线程分配独立的计数单元,它极大地减少了线程间的 CAS 冲突和缓存同步开销,从而在高并发环境下实现了非常高的吞吐量。这是一种典型的用空间(Cell
数组)换取时间(减少竞争等待)的并发编程模式。
longAccumulate
方法
首先,我们要明确 longAccumulate
在何时被调用。它并不是线程更新数据的第一选择。通常,线程会首先尝试以 CAS(Compare-And-Swap)方式直接更新 base
字段。只有当这个 CAS 操作失败时,才意味着出现了竞争,这时线程才会调用 longAccumulate
方法进入更复杂的处理流程。
longAccumulate
的核心使命是:在多线程竞争环境下,以高效、非阻塞的方式完成值的累加操作。 它通过空间换时间、分散热点的方式来实现这一目标。
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended, int index)
x
: 需要累加的值。fn
: 一个二元操作函数。- 在
LongAdder
中,这个参数为null
,方法内部会执行简单的加法value + x
。 - 在
LongAccumulator
中,这里会传入一个用户自定义的函数,例如(left, right) -> left * right
。
- 在
wasUncontended
: 一个布尔标记。如果为false
,表示在调用此方法前,对base
的 CAS 操作已经失败了,说明已经存在竞争。index
: 当前线程的 "探针" 值(来自ThreadLocalRandom
),可以看作是线程的哈希码,用于定位其在cells
数组中的槽位。
核心逻辑:一个无限重试的乐观循环 for (;;)
整个方法的主体是一个死循环,它不断地尝试各种策略来完成累加,直到成功为止。这种乐观的重试模式避免了使用传统锁带来的线程阻塞和上下文切换开销。
第一步:探针值初始化 (Probe Initialization)
// ...
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended, int index) {if (index == 0) {ThreadLocalRandom.current(); // force initializationindex = getProbe();wasUncontended = true;}
// ...
- 场景:
index
为 0 是一个特殊状态,表示当前线程的ThreadLocalRandom
探针值还未初始化。 - 问题: 如果不处理,所有新线程的探针值都将是 0,它们会集中竞争
cells
数组的第 0 个槽位,这会形成新的性能瓶颈。 - 解决方案:
ThreadLocalRandom.current()
: 这行代码会强制初始化当前线程的ThreadLocalRandom
实例,从而生成一个非零的随机探针值。index = getProbe()
: 获取这个新生成的、非零的探针值。wasUncontended = true
: 因为我们现在有了一个全新的、大概率不会冲突的探针值,所以可以乐观地认为接下来的尝试不会遇到竞争。
第二步:主循环 - 处理三种主要情况
循环体内部分为三个大的分支,对应着 cells
数组的三种状态:已初始化、未初始化、以及所有尝试失败后的最终回退。
情况 A: cells
数组已存在 ((cs = cells) != null && (n = cs.length) > 0
)
这是最常见的分支。当竞争发生后,cells
数组已经被创建。线程会根据自己的探针值(index
)定位到一个槽位。
目标槽位为空,尝试创建新
Cell
// ... if ((c = cs[(n - 1) & index]) == null) {if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {try { // Recheck under lock// ...if (rs[j = (m - 1) & index] == null) {rs[j] = r;break;}} finally {cellsBusy = 0;}// ...}}// ... } // ...
- 意图: 当前线程哈希到的槽位是空的,需要为它创建一个新的
Cell
并放入。 - 动作:
- 乐观尝试: 首先检查
cellsBusy
锁是否空闲(cellsBusy == 0
),如果空闲,就“乐观地”创建一个Cell
对象。 - 获取锁: 尝试通过 CAS 获取
cellsBusy
自旋锁。 - 双重检查 (Double-Checking): 这是关键!在持有锁之后,必须重新检查所有条件:
cells
数组是否被其他线程改变了?目标槽位是否仍然是null
?这是为了防止在当前线程等待并获取锁的期间,有其他线程已经扩容了数组或在同一个槽位创建了Cell
。 - 成功: 如果所有检查都通过,就把新创建的
Cell
放入槽位,然后break
退出整个longAccumulate
方法,任务完成。 - 失败: 如果获取锁失败或双重检查失败,说明有其他线程在操作,那么就放弃本次创建,进入下一次大循环重试。
- 乐观尝试: 首先检查
- 意图: 当前线程哈希到的槽位是空的,需要为它创建一个新的
目标槽位存在,直接尝试 CAS 更新
// ... else if (c.cas(v = c.value,(fn == null) ? v + x : fn.applyAsLong(v, x)))break; // ...
- 意图: 槽位里已经有
Cell
了,直接尝试更新它的value
。 - 动作: 对
Cell
的value
字段执行 CAS 操作。如果成功,说明没有其他线程同时更新这个Cell
,任务完成,break
退出。
- 意图: 槽位里已经有
CAS 更新
Cell
失败 -> 发生哈希冲突 (Collision)// ... else if (n >= NCPU || cells != cs)collide = false; // At max size or stale else if (!collide)collide = true; else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == cs) // Expand table unless stalecells = Arrays.copyOf(cs, n << 1);} finally {cellsBusy = 0;}// ...continue; // Retry with expanded table } index = advanceProbe(index); // ...
- 意图: CAS 失败意味着至少有两个线程在竞争同一个
Cell
。这是“哈希冲突”,需要解决。 - 解决策略:
- 策略一:扩容数组 (Expand Table)
else if (!collide) collide = true;
: 这是一个精妙的设计。第一次在Cell
上 CAS 失败时,先不急于扩容,而是将collide
标记设为true
。这相当于“再给你一次机会”。else if (cellsBusy == 0 && casCellsBusy())
: 如果collide
已经是true
(意味着在同一个Cell
上连续两次 CAS 失败),就认为冲突是真实的,值得去扩容。此时,线程会尝试获取cellsBusy
锁。- 获取锁后: 同样进行双重检查,确保数组没被修改过,然后将其大小翻倍
Arrays.copyOf(cs, n << 1)
。扩容后,线程的哈希值会映射到更广的范围,从而降低冲突概率。
- 策略二:重新哈希 (Rehash)
index = advanceProbe(index);
: 如果扩容的条件不满足(例如数组已达到 CPU 核心数的最大上限,或没抢到锁),线程不会原地等待。它会调用advanceProbe
来改变自己的探针值。- 这相当于“此路不通,换条路走”。在下一次循环中,线程会映射到数组的不同槽位,从而动态地避开当前的竞争热点。
- 策略一:扩容数组 (Expand Table)
- 意图: CAS 失败意味着至少有两个线程在竞争同一个
情况 B: cells
数组未初始化 (else if (cellsBusy == 0 && cells == cs && casCellsBusy())
)
else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {try { // Initialize tableif (cells == cs) {Cell[] rs = new Cell[2];rs[index & 1] = new Cell(x);cells = rs;break;}} finally {cellsBusy = 0;}}
- 意图:
cells
数组还是null
,当前线程是第一个(或之一)尝试初始化它的线程。 - 动作:
- 尝试获取
cellsBusy
锁。 - 获取锁后: 再次进行双重检查,确保在等待锁的过程中没有其他线程已经完成了初始化。
- 如果检查通过,就创建一个大小为 2 的新
Cell
数组,并将当前值x
放入其中一个槽位,完成初始化,然后break
退出。
- 尝试获取
情况 C: 最终回退 (Fallback to base
)
// ...// Fall back on using baseelse if (casBase(v = base,(fn == null) ? v + x : fn.applyAsLong(v, x)))break;
// ...
- 意图: 如果上述所有尝试都暂时失败了(比如,初始化
cells
时没抢到锁,扩容时也没抢到锁),线程不能无限自旋等待,必须做点什么来保证任务能向前推进。 - 动作: 回到最初的起点,再次尝试对
base
字段进行 CAS 操作。虽然base
是竞争点,但这是保证系统“活性”的最后手段。如果这次 CAS 成功了,就break
退出。如果还是失败,也没关系,整个for(;;)
循环会从头再来一遍,重新评估cells
数组的最新状态。
总结
longAccumulate
的设计精髓在于其多层次、动态适应的竞争处理机制:
- 低竞争: 线程首先尝试 CAS
base
字段(在add
方法中,未进入longAccumulate
)。这是最快路径。 - 中等竞争: CAS
base
失败,进入longAccumulate
。- 如果
cells
未初始化,加锁初始化一个大小为 2 的数组。 - 如果
cells
已初始化,线程通过哈希找到自己的Cell
,并尝试 CAS 更新它。
- 如果
- 高竞争: CAS
Cell
失败,表明多个线程在竞争同一个Cell
。- 扩容: 如果连续失败,并且数组未达到容量上限,则加锁将
cells
数组扩容一倍,以分散竞争。 - Rehash: 如果不能扩容(数组已满或锁被占用),则改变自身哈希值,在下一次循环中尝试其他
Cell
。
- 扩容: 如果连续失败,并且数组未达到容量上限,则加锁将
- 极端情况/回退: 在尝试加锁(用于初始化或扩容)失败时,不原地等待,而是回退去尝试 CAS
base
,确保总有线程在推进工作,避免活锁。
这种分层、乐观、动态调整的策略,使得 Striped64
及其子类能够在不同并发强度下都表现出极高的吞吐量,是 java.util.concurrent
包中无锁并发设计的典范之作。
LongAdder
LongAdder
是 Java 8 引入的一个并发包下的类,它的核心目标是解决高并发环境下AtomicLong
的性能瓶颈,专门用于“高并发统计汇总”的场景。
AtomicLong
的问题在于,当大量线程同时尝试更新同一个原子变量时,只有一个线程的CAS(Compare-And-Swap)操作会成功,其他线程都会失败并进入自旋重试。这在高并发下会导致激烈的CPU资源竞争,性能急剧下降。
LongAdder
的设计哲学是**“空间换时间”和“热点分离”**。它没有将所有更新压力都集中在一个变量上,而是将计数分散到一个Cell
数组中,不同的线程可以更新数组中不同的元素,从而大大减少了线程间的直接竞争。
源码分析与语义解读
LongAdder
的实现主要依赖于其父类 Striped64
。我们来看一下它的关键实现:
1. 内部结构:base
+ Cell[]
// ... existing code ...
/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization. Updated via CAS.*/
transient volatile long base;/*** Table of cells. When non-null, size is a power of two.*/
transient volatile Cell[] cells;
// ... existing code ...
base
: 这是一个基础计数值。在没有竞争或竞争很低的情况下,系统会优先尝试通过CAS直接更新base
值。这样做的好处是,在低并发时,它的性能和AtomicLong
几乎一样,避免了维护Cell
数组的额外开销。cells
: 这是一个Cell
类型的数组,Cell
是Striped64
的一个静态内部类,本质上是对long
值的一个包装,并且使用了@Contended
注解来防止“伪共享”(False Sharing),确保不同Cell
位于不同的缓存行,避免缓存行失效带来的性能问题。这个数组是懒加载的,只有在base
上出现竞争时才会被创建。
2. 核心方法:add(long x)
这是LongAdder
最核心的方法,我们来分解它的执行逻辑:
// ... existing code ...
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;// 1. 优先更新 baseif ((cs = cells) != null || !casBase(b = base, b + x)) {int index = getProbe(); // 获取线程的哈希值boolean uncontended = true;// 2. 如果 cells 数组不存在,或当前线程对应的 cell 为空,或更新 cell 失败if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[index & m]) == null ||!(uncontended = c.cas(v = c.value, v + x)))// 3. 进入复杂的 longAccumulate 方法处理longAccumulate(x, null, uncontended, index);}
}
// ... existing code ...
执行流程解读:
- 尝试更新
base
:代码首先检查cells
数组是否为null
。如果为null
(意味着还没有出现过竞争),它会直接尝试CAS更新base
字段 (!casBase(b = base, b + x)
)。如果CAS成功,add
方法直接返回,这是最快路径。 - 竞争出现,转战
cells
数组:如果base
的CAS更新失败,或者cells
数组已经存在,说明存在并发竞争。这时,代码会:- 通过
getProbe()
为当前线程计算一个哈希值。 - 使用哈希值定位到
cells
数组中的一个Cell
(c = cs[index & m]
)。 - 尝试CAS更新这个
Cell
的值 (c.cas(...)
)。
- 通过
- 处理复杂情况 (
longAccumulate
):如果上述步骤依然失败(比如cells
数组还没初始化、当前线程对应的Cell
是null
、或者对Cell
的CAS更新也失败了),就会调用Striped64
中的longAccumulate
方法。这个方法是LongAdder
的“后备保障”,它负责处理所有复杂情况,包括:- 初始化
cells
数组:如果数组为null
,它会加锁(通过cellsBusy
自旋锁)并创建数组。 - 创建
Cell
:如果线程对应的Cell
槽位是空的,它会创建一个新的Cell
对象并放入。 - 扩容
cells
数组:如果多个线程哈希到同一个Cell
并持续产生竞争,它会扩容cells
数组(通常是翻倍),以分散竞争。扩容过程也是通过cellsBusy
锁来保证线程安全。 - 重试:在完成上述操作后,会再次尝试更新
Cell
或base
。
- 初始化
3. 获取总和:sum()
// ... existing code ...
public long sum() {Cell[] cs = cells;long sum = base;if (cs != null) {for (Cell c : cs)if (c != null)sum += c.value;}return sum;
}
// ... existing code ...
sum()
方法的逻辑很简单:将base
的值与cells
数组中所有Cell
的值相加,得到最终结果。
重要语义:sum()
返回的值不是一个原子快照。在调用sum()
的过程中,其他线程可能仍在修改base
或cells
中的值。因此,返回的sum
是一个“弱一致性”的结果,它可能不包含那些正在进行的更新。这对于统计场景来说通常是可以接受的,因为我们关心的是一个近似准确的总量,而不是一个精确到某个时间点的瞬时值。
用法与适用场景
如何使用?
LongAdder
的用法非常直观,它提供了和AtomicLong
类似的方法:
new LongAdder()
: 创建一个初始值为0的计数器。add(long x)
: 增加指定的值。increment()
: 加1,等价于add(1)
。decrement()
: 减1,等价于add(-1)
。sum()
: 获取当前总和。reset()
: 重置为0。sumThenReset()
: 获取当前总和,然后重置为0。
示例:统计网站API调用次数
// 创建一个全局的计数器
public final LongAdder apiCallCounter = new LongAdder();// 在处理API请求的方法中
public void handleApiRequest() {// ... 处理业务逻辑 ...apiCallCounter.increment(); // 每次调用,计数器加1
}// 在后台监控或报表任务中获取总数
public void generateReport() {long totalCalls = apiCallCounter.sum();System.out.println("Total API calls so far: " + totalCalls);
}
示例:实现高性能的并发频率统计
LongAdder
与ConcurrentHashMap
是绝配,可以构建一个高性能的并发计数器集合。
ConcurrentHashMap<String, LongAdder> wordCounts = new ConcurrentHashMap<>();// 多线程处理文本
public void processText(String text) {for (String word : text.split("\\s+")) {// computeIfAbsent 保证了原子性地创建 LongAdder// increment() 充分利用了 LongAdder 的高并发性能wordCounts.computeIfAbsent(word, k -> new LongAdder()).increment();}
}
在这个例子中,如果多个线程同时处理包含相同单词的文本,它们会对同一个LongAdder
实例进行increment
。LongAdder
内部的分散竞争机制会确保这里的性能远高于使用AtomicLong
。
适用场景
- 高并发统计:如网站PV/UV、API调用次数、日志数量等统计。
- 性能监控:用于收集系统或应用内部的性能指标。
- 数据汇总:在并行计算中,各个线程可以更新自己的
LongAdder
,最后将结果汇总。
不适用场景
- 需要精确原子快照的场景:如果你需要一个在某个时间点绝对准确的值,并基于这个值做进一步的原子操作(例如,
if (counter.get() > 10) { counter.compareAndSet(...) }
),那么LongAdder
不适合你,因为它无法提供get()
和compareAndSet
这样的原子组合操作。在这种情况下,你应该继续使用AtomicLong
。
总结
LongAdder
是一个为高并发环境优化的“增强版”AtomicLong
。它通过将热点数据分散到base
和Cell
数组中,以空间换时间,极大地提升了多线程下的写入(add/increment)性能。它的核心代价是sum()
方法返回的是一个非原子快照的近似值,并且占用了更多的内存。
因此,选择LongAdder
还是AtomicLong
的关键在于你的业务场景:
- 追求高写入吞吐量,用于统计汇总,能容忍弱一致性的读取 ->
LongAdder
- 需要精确的原子操作(如
compareAndSet
),或并发度不高 ->AtomicLong