当前位置: 首页 > ops >正文

JUC LongAdder并发计数器设计

Striped64 

java.util.concurrent.atomic 包下的一个关键抽象类,像 LongAdderDoubleAdder 和 LongAccumulator 等都是基于它实现的。它的主要目标是解决在高并发场景下,单个 AtomicLong 或 AtomicDouble 因大量线程竞争同一个变量而导致的性能瓶颈。

其核心设计思想可以总结为 “空间换时间” 和 “热点分离”

核心设计详解

  1. 基础结构:base + Cell[] 数组

    • Striped64 内部维护了两个核心变量来存储值:
      • volatile long base: 一个基础值。在低并发、无竞争的情况下,所有更新操作都直接在 base 上进行 CAS (Compare-And-Swap)。这和 AtomicLong 的行为类似,开销很小。
      • volatile Cell[] cells: 一个 Cell 类型的数组。Cell 是 Striped64 的一个静态内部类,本质上是对 long 值的简单包装,并使用 @Contended 注解来避免“伪共享”(False Sharing),确保不同 Cell 对象不会位于同一个缓存行,从而减少缓存竞争。
  2. 动态的竞争处理机制

    • 无竞争时:线程会尝试直接通过 CAS 更新 base 字段。如果成功,操作就完成了,这是最快路径。
    • 首次出现竞争时:当一个线程尝试 CAS 更新 base 失败时,意味着出现了竞争。此时,Striped64 会进行懒加载,首次创建 cells 数组,初始大小为 2。
    • 竞争加剧时
      • 系统会为每个线程计算一个哈希值(通过 ThreadLocalRandom),并将线程映射到 cells 数组的一个特定槽位(Cell 对象)上。
      • 线程不再尝试更新 base,而是转而去更新自己对应的那个 Cell 的值。
      • 由于不同线程被分散到不同的 Cell 中进行操作,它们之间互相竞争的概率大大降低,从而提升了整体性能。
  3. Cell 数组的动态扩容

    • 如果多个线程被哈希到同一个 Cell 上,导致在这个 Cell 上也发生了竞争,Striped64 会尝试对 cells 数组进行扩容。
    • 扩容过程通过一个简单的自旋锁 (cellsBusy) 来保证线程安全。当一个线程获取到锁后,它会将 cells 数组的长度翻倍。
    • 扩容后,线程的哈希映射会重新计算,使得线程能更均匀地分布到新的、更大的数组中,进一步降低冲突。
    • 数组的扩容有上限,最大不会超过大于等于 CPU 核心数的最小 2 的幂次方,因为当线程数超过 CPU 核心数时,多余的线程也无法并行执行,过度扩容没有意义。
  4. 最终值的计算

    • 由于值被分散存储在 base 和 cells 数组中,当需要获取当前总和时(例如调用 LongAdder.sum()),Striped64 会遍历整个 cells 数组,累加所有 Cell 的 value 字段,最后再加上 base 的值,得到最终结果。
    • 需要注意的是,获取总和时并没有加锁,因此结果是一个“弱一致性”的值。如果在获取期间有其他线程正在修改值,这个总和可能不是最新的,但在很多统计类场景下,这种弱一致性是可以接受的。

总结

Striped64 的设计精髓在于,它巧妙地将对单个共享变量的集中竞争分散到了一个**Cell 数组**上。通过为每个线程或一小组线程分配独立的计数单元,它极大地减少了线程间的 CAS 冲突和缓存同步开销,从而在高并发环境下实现了非常高的吞吐量。这是一种典型的用空间(Cell 数组)换取时间(减少竞争等待)的并发编程模式。

longAccumulate 方法

首先,我们要明确 longAccumulate 在何时被调用。它并不是线程更新数据的第一选择。通常,线程会首先尝试以 CAS(Compare-And-Swap)方式直接更新 base 字段。只有当这个 CAS 操作失败时,才意味着出现了竞争,这时线程才会调用 longAccumulate 方法进入更复杂的处理流程。

longAccumulate 的核心使命是:在多线程竞争环境下,以高效、非阻塞的方式完成值的累加操作。 它通过空间换时间、分散热点的方式来实现这一目标。

final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended, int index)
  • x: 需要累加的值。
  • fn: 一个二元操作函数。
    • 在 LongAdder 中,这个参数为 null,方法内部会执行简单的加法 value + x
    • 在 LongAccumulator 中,这里会传入一个用户自定义的函数,例如 (left, right) -> left * right
  • wasUncontended: 一个布尔标记。如果为 false,表示在调用此方法前,对 base 的 CAS 操作已经失败了,说明已经存在竞争。
  • index: 当前线程的 "探针" 值(来自 ThreadLocalRandom),可以看作是线程的哈希码,用于定位其在 cells 数组中的槽位。

核心逻辑:一个无限重试的乐观循环 for (;;)

整个方法的主体是一个死循环,它不断地尝试各种策略来完成累加,直到成功为止。这种乐观的重试模式避免了使用传统锁带来的线程阻塞和上下文切换开销。

第一步:探针值初始化 (Probe Initialization)

// ...
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended, int index) {if (index == 0) {ThreadLocalRandom.current(); // force initializationindex = getProbe();wasUncontended = true;}
// ...
  • 场景index 为 0 是一个特殊状态,表示当前线程的 ThreadLocalRandom 探针值还未初始化。
  • 问题: 如果不处理,所有新线程的探针值都将是 0,它们会集中竞争 cells 数组的第 0 个槽位,这会形成新的性能瓶颈。
  • 解决方案:
    1. ThreadLocalRandom.current(): 这行代码会强制初始化当前线程的 ThreadLocalRandom 实例,从而生成一个非零的随机探针值。
    2. index = getProbe(): 获取这个新生成的、非零的探针值。
    3. wasUncontended = true: 因为我们现在有了一个全新的、大概率不会冲突的探针值,所以可以乐观地认为接下来的尝试不会遇到竞争。

第二步:主循环 - 处理三种主要情况

循环体内部分为三个大的分支,对应着 cells 数组的三种状态:已初始化未初始化、以及所有尝试失败后的最终回退

情况 A: cells 数组已存在 ((cs = cells) != null && (n = cs.length) > 0)

这是最常见的分支。当竞争发生后,cells 数组已经被创建。线程会根据自己的探针值(index)定位到一个槽位。

  1. 目标槽位为空,尝试创建新 Cell

    // ...
    if ((c = cs[(n - 1) & index]) == null) {if (cellsBusy == 0) {       // Try to attach new CellCell r = new Cell(x);   // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {try {               // Recheck under lock// ...if (rs[j = (m - 1) & index] == null) {rs[j] = r;break;}} finally {cellsBusy = 0;}// ...}}// ...
    }
    // ...
    
    • 意图: 当前线程哈希到的槽位是空的,需要为它创建一个新的 Cell 并放入。
    • 动作:
      • 乐观尝试: 首先检查 cellsBusy 锁是否空闲(cellsBusy == 0),如果空闲,就“乐观地”创建一个 Cell 对象。
      • 获取锁: 尝试通过 CAS 获取 cellsBusy 自旋锁。
      • 双重检查 (Double-Checking): 这是关键!在持有锁之后,必须重新检查所有条件:cells 数组是否被其他线程改变了?目标槽位是否仍然是 null?这是为了防止在当前线程等待并获取锁的期间,有其他线程已经扩容了数组或在同一个槽位创建了 Cell
      • 成功: 如果所有检查都通过,就把新创建的 Cell 放入槽位,然后 break 退出整个 longAccumulate 方法,任务完成。
      • 失败: 如果获取锁失败或双重检查失败,说明有其他线程在操作,那么就放弃本次创建,进入下一次大循环重试。
  2. 目标槽位存在,直接尝试 CAS 更新

    // ...
    else if (c.cas(v = c.value,(fn == null) ? v + x : fn.applyAsLong(v, x)))break;
    // ...
    
    • 意图: 槽位里已经有 Cell 了,直接尝试更新它的 value
    • 动作: 对 Cell 的 value 字段执行 CAS 操作。如果成功,说明没有其他线程同时更新这个 Cell,任务完成,break 退出。
  3. CAS 更新 Cell 失败 -> 发生哈希冲突 (Collision)

    // ...
    else if (n >= NCPU || cells != cs)collide = false;            // At max size or stale
    else if (!collide)collide = true;
    else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == cs)        // Expand table unless stalecells = Arrays.copyOf(cs, n << 1);} finally {cellsBusy = 0;}// ...continue;                   // Retry with expanded table
    }
    index = advanceProbe(index);
    // ...
    
    • 意图: CAS 失败意味着至少有两个线程在竞争同一个 Cell。这是“哈希冲突”,需要解决。
    • 解决策略:
      • 策略一:扩容数组 (Expand Table)
        • else if (!collide) collide = true;: 这是一个精妙的设计。第一次在 Cell 上 CAS 失败时,先不急于扩容,而是将 collide 标记设为 true。这相当于“再给你一次机会”。
        • else if (cellsBusy == 0 && casCellsBusy()): 如果 collide 已经是 true(意味着在同一个 Cell 上连续两次 CAS 失败),就认为冲突是真实的,值得去扩容。此时,线程会尝试获取 cellsBusy 锁。
        • 获取锁后: 同样进行双重检查,确保数组没被修改过,然后将其大小翻倍 Arrays.copyOf(cs, n << 1)。扩容后,线程的哈希值会映射到更广的范围,从而降低冲突概率。
      • 策略二:重新哈希 (Rehash)
        • index = advanceProbe(index);: 如果扩容的条件不满足(例如数组已达到 CPU 核心数的最大上限,或没抢到锁),线程不会原地等待。它会调用 advanceProbe 来改变自己的探针值。
        • 这相当于“此路不通,换条路走”。在下一次循环中,线程会映射到数组的不同槽位,从而动态地避开当前的竞争热点。
情况 B: cells 数组未初始化 (else if (cellsBusy == 0 && cells == cs && casCellsBusy()))
 else if (cellsBusy == 0 && cells == cs && casCellsBusy()) {try {                           // Initialize tableif (cells == cs) {Cell[] rs = new Cell[2];rs[index & 1] = new Cell(x);cells = rs;break;}} finally {cellsBusy = 0;}}
  • 意图cells 数组还是 null,当前线程是第一个(或之一)尝试初始化它的线程。
  • 动作:
    • 尝试获取 cellsBusy 锁。
    • 获取锁后: 再次进行双重检查,确保在等待锁的过程中没有其他线程已经完成了初始化。
    • 如果检查通过,就创建一个大小为 2 的新 Cell 数组,并将当前值 x 放入其中一个槽位,完成初始化,然后 break 退出。
情况 C: 最终回退 (Fallback to base)
// ...// Fall back on using baseelse if (casBase(v = base,(fn == null) ? v + x : fn.applyAsLong(v, x)))break;
// ...
  • 意图: 如果上述所有尝试都暂时失败了(比如,初始化 cells 时没抢到锁,扩容时也没抢到锁),线程不能无限自旋等待,必须做点什么来保证任务能向前推进。
  • 动作: 回到最初的起点,再次尝试对 base 字段进行 CAS 操作。虽然 base 是竞争点,但这是保证系统“活性”的最后手段。如果这次 CAS 成功了,就 break 退出。如果还是失败,也没关系,整个 for(;;) 循环会从头再来一遍,重新评估 cells 数组的最新状态。

总结

longAccumulate 的设计精髓在于其多层次、动态适应的竞争处理机制

  1. 低竞争: 线程首先尝试 CAS base 字段(在 add 方法中,未进入 longAccumulate)。这是最快路径。
  2. 中等竞争: CAS base 失败,进入 longAccumulate
    • 如果 cells 未初始化,加锁初始化一个大小为 2 的数组。
    • 如果 cells 已初始化,线程通过哈希找到自己的 Cell,并尝试 CAS 更新它。
  3. 高竞争: CAS Cell 失败,表明多个线程在竞争同一个 Cell
    • 扩容: 如果连续失败,并且数组未达到容量上限,则加锁将 cells 数组扩容一倍,以分散竞争。
    • Rehash: 如果不能扩容(数组已满或锁被占用),则改变自身哈希值,在下一次循环中尝试其他 Cell
  4. 极端情况/回退: 在尝试加锁(用于初始化或扩容)失败时,不原地等待,而是回退去尝试 CAS base,确保总有线程在推进工作,避免活锁。

这种分层、乐观、动态调整的策略,使得 Striped64 及其子类能够在不同并发强度下都表现出极高的吞吐量,是 java.util.concurrent 包中无锁并发设计的典范之作。

LongAdder

LongAdder 是 Java 8 引入的一个并发包下的类,它的核心目标是解决高并发环境下AtomicLong的性能瓶颈,专门用于“高并发统计汇总”的场景。

AtomicLong 的问题在于,当大量线程同时尝试更新同一个原子变量时,只有一个线程的CAS(Compare-And-Swap)操作会成功,其他线程都会失败并进入自旋重试。这在高并发下会导致激烈的CPU资源竞争,性能急剧下降。

LongAdder 的设计哲学是**“空间换时间”“热点分离”**。它没有将所有更新压力都集中在一个变量上,而是将计数分散到一个Cell数组中,不同的线程可以更新数组中不同的元素,从而大大减少了线程间的直接竞争。

源码分析与语义解读

LongAdder 的实现主要依赖于其父类 Striped64。我们来看一下它的关键实现:

1. 内部结构:base + Cell[]
// ... existing code ...
/*** Base value, used mainly when there is no contention, but also as* a fallback during table initialization. Updated via CAS.*/
transient volatile long base;/*** Table of cells. When non-null, size is a power of two.*/
transient volatile Cell[] cells;
// ... existing code ...
  • base: 这是一个基础计数值。在没有竞争或竞争很低的情况下,系统会优先尝试通过CAS直接更新base值。这样做的好处是,在低并发时,它的性能和AtomicLong几乎一样,避免了维护Cell数组的额外开销。
  • cells: 这是一个Cell类型的数组,CellStriped64的一个静态内部类,本质上是对long值的一个包装,并且使用了@Contended注解来防止“伪共享”(False Sharing),确保不同Cell位于不同的缓存行,避免缓存行失效带来的性能问题。这个数组是懒加载的,只有在base上出现竞争时才会被创建。
2. 核心方法:add(long x)

这是LongAdder最核心的方法,我们来分解它的执行逻辑:

// ... existing code ...
public void add(long x) {Cell[] cs; long b, v; int m; Cell c;// 1. 优先更新 baseif ((cs = cells) != null || !casBase(b = base, b + x)) {int index = getProbe(); // 获取线程的哈希值boolean uncontended = true;// 2. 如果 cells 数组不存在,或当前线程对应的 cell 为空,或更新 cell 失败if (cs == null || (m = cs.length - 1) < 0 ||(c = cs[index & m]) == null ||!(uncontended = c.cas(v = c.value, v + x)))// 3. 进入复杂的 longAccumulate 方法处理longAccumulate(x, null, uncontended, index);}
}
// ... existing code ...

执行流程解读:

  1. 尝试更新 base:代码首先检查cells数组是否为null。如果为null(意味着还没有出现过竞争),它会直接尝试CAS更新base字段 (!casBase(b = base, b + x))。如果CAS成功,add方法直接返回,这是最快路径。
  2. 竞争出现,转战 cells 数组:如果base的CAS更新失败,或者cells数组已经存在,说明存在并发竞争。这时,代码会:
    • 通过getProbe()为当前线程计算一个哈希值。
    • 使用哈希值定位到cells数组中的一个Cell (c = cs[index & m])。
    • 尝试CAS更新这个Cell的值 (c.cas(...))。
  3. 处理复杂情况 (longAccumulate):如果上述步骤依然失败(比如cells数组还没初始化、当前线程对应的Cellnull、或者对Cell的CAS更新也失败了),就会调用Striped64中的longAccumulate方法。这个方法是LongAdder的“后备保障”,它负责处理所有复杂情况,包括:
    • 初始化cells数组:如果数组为null,它会加锁(通过cellsBusy自旋锁)并创建数组。
    • 创建Cell:如果线程对应的Cell槽位是空的,它会创建一个新的Cell对象并放入。
    • 扩容cells数组:如果多个线程哈希到同一个Cell并持续产生竞争,它会扩容cells数组(通常是翻倍),以分散竞争。扩容过程也是通过cellsBusy锁来保证线程安全。
    • 重试:在完成上述操作后,会再次尝试更新Cellbase
3. 获取总和:sum()
// ... existing code ...
public long sum() {Cell[] cs = cells;long sum = base;if (cs != null) {for (Cell c : cs)if (c != null)sum += c.value;}return sum;
}
// ... existing code ...

sum()方法的逻辑很简单:将base的值与cells数组中所有Cell的值相加,得到最终结果。

重要语义sum()返回的值不是一个原子快照。在调用sum()的过程中,其他线程可能仍在修改basecells中的值。因此,返回的sum是一个“弱一致性”的结果,它可能不包含那些正在进行的更新。这对于统计场景来说通常是可以接受的,因为我们关心的是一个近似准确的总量,而不是一个精确到某个时间点的瞬时值。

用法与适用场景

如何使用?

LongAdder的用法非常直观,它提供了和AtomicLong类似的方法:

  • new LongAdder(): 创建一个初始值为0的计数器。
  • add(long x): 增加指定的值。
  • increment(): 加1,等价于add(1)
  • decrement(): 减1,等价于add(-1)
  • sum(): 获取当前总和。
  • reset(): 重置为0。
  • sumThenReset(): 获取当前总和,然后重置为0。

示例:统计网站API调用次数

// 创建一个全局的计数器
public final LongAdder apiCallCounter = new LongAdder();// 在处理API请求的方法中
public void handleApiRequest() {// ... 处理业务逻辑 ...apiCallCounter.increment(); // 每次调用,计数器加1
}// 在后台监控或报表任务中获取总数
public void generateReport() {long totalCalls = apiCallCounter.sum();System.out.println("Total API calls so far: " + totalCalls);
}

示例:实现高性能的并发频率统计

LongAdderConcurrentHashMap是绝配,可以构建一个高性能的并发计数器集合。

ConcurrentHashMap<String, LongAdder> wordCounts = new ConcurrentHashMap<>();// 多线程处理文本
public void processText(String text) {for (String word : text.split("\\s+")) {// computeIfAbsent 保证了原子性地创建 LongAdder// increment() 充分利用了 LongAdder 的高并发性能wordCounts.computeIfAbsent(word, k -> new LongAdder()).increment();}
}

在这个例子中,如果多个线程同时处理包含相同单词的文本,它们会对同一个LongAdder实例进行incrementLongAdder内部的分散竞争机制会确保这里的性能远高于使用AtomicLong

适用场景
  1. 高并发统计:如网站PV/UV、API调用次数、日志数量等统计。
  2. 性能监控:用于收集系统或应用内部的性能指标。
  3. 数据汇总:在并行计算中,各个线程可以更新自己的LongAdder,最后将结果汇总。
不适用场景
  • 需要精确原子快照的场景:如果你需要一个在某个时间点绝对准确的值,并基于这个值做进一步的原子操作(例如,if (counter.get() > 10) { counter.compareAndSet(...) }),那么LongAdder不适合你,因为它无法提供get()compareAndSet这样的原子组合操作。在这种情况下,你应该继续使用AtomicLong

总结

LongAdder 是一个为高并发环境优化的“增强版”AtomicLong。它通过将热点数据分散到baseCell数组中,以空间换时间,极大地提升了多线程下的写入(add/increment)性能。它的核心代价是sum()方法返回的是一个非原子快照的近似值,并且占用了更多的内存。

因此,选择LongAdder还是AtomicLong的关键在于你的业务场景:

  • 追求高写入吞吐量,用于统计汇总,能容忍弱一致性的读取 -> LongAdder
  • 需要精确的原子操作(如compareAndSet),或并发度不高 -> AtomicLong

http://www.xdnf.cn/news/17795.html

相关文章:

  • 优先级反转问题
  • 基于阿里云音频识别模型的网页语音识别系统实现
  • Flink中基于时间的合流--双流联结(join)
  • 【Doris】-工具SQLConverter
  • Stagehand深度解析:从开源自动化工具到企业级RPA平台的演进之路
  • VisualStudio2022调试Unity C#代码步骤
  • 第2篇_Go语言基础语法_变量常量与数据类型
  • Android项目中Ktor的引入与使用实践
  • 在 Linux 服务器搭建Coturn即ICE/TURN/STUN实现P2P(点对点)直连
  • 图论Day3学习心得
  • 无脑整合springboot2.7+nacos2.2.3+dubbo3.2.9实现远程调用及配置中心
  • 计算机网络 THU 考研专栏简介
  • L2 级别自动驾驶 硬件架构设计
  • LeetCode 922.按奇偶排序数组2
  • ElasticSearch不同环境同步索引数据
  • Spring Ai 如何配置以及如何搭建
  • Jmeter自定义脚本
  • 零基础学会制作 基于STM32单片机智能加湿系统/加湿监测/蓝牙系统/监测水量
  • 探索无人机图传技术:创新视野与无限可能
  • 在 macOS 上顺利安装 lapsolver
  • OpenCV Python——VSCode编写第一个OpenCV-Python程序 ,图像读取及翻转cv2.flip(上下、左右、上下左右一起翻转)
  • 死锁总结及解决方案
  • 关于截屏时实现游戏暂停以及本地和上线不同步问题
  • 用GPT解释“GPT-5”是什么,有什么优势
  • python-pycharm切换python各种版本的环境与安装python各种版本的环境(pypi轮子下载)
  • Flink Stream API 源码走读 - map 和 flatMap
  • KNN(k近邻算法)
  • Chrome插件开发实战:从架构到发布全流程
  • 准备用Qt6 重写音视频会议系统服务端
  • 开源 Arkts 鸿蒙应用 开发(十五)自定义绘图控件--仪表盘