当前位置: 首页 > news >正文

并发编程——17 CPU缓存架构详解高性能内存队列Disruptor实战

1 CPU缓存架构详解

1.1 CPU高速缓存概念

  • CPU高速缓存,是介于CPU与主内存之间,容量小但速度很快的存储器;

    • 分为一级(L1)、二级(L2)、部分高端CPU有三级(L3)缓存,且每一级缓存存储的数据都是下一级缓存的一部分;
    • 技术难度和成本随缓存级别升高而降低,所以容量相对递增;

    在这里插入图片描述

  • 由于CPU速度远高于主内存,若CPU直接从主内存存取数据需等待。而高速缓存中保存着CPU刚用过或循环使用的部分数据,当CPU再次使用这些数据时,可直接从缓存调用,减少等待时间,提升系统效率;

  • 通过下图可知,不同级别的缓存、不同访问方式(如全随机访问、页随机访问、顺序访问)的时钟周期延迟不同。L1缓存的各种访问延迟都很低(如顺序访问仅4个时钟周期),主内存访问延迟最高(达167个时钟周期),这也体现出缓存越靠近CPU,访问速度越快;

    在这里插入图片描述

  • CPU访问存储设备时,存取数据或指令往往聚集在连续区域,包含时间局部性(Temporal Locality)和空间局部性(Spatial Locality);

    • 时间局部性指若一个信息正在被访问,近期很可能再次被访问,像循环、递归、方法的反复调用;
    • 空间局部性指若一个存储器位置被引用,未来其附近位置也会被引用,比如顺序执行的代码、连续创建的对象或数组等。

1.2 CPU 多核缓存架构

  • 现代CPU为提升执行效率、减少与内存交互,集成多级缓存架构,常见为三级缓存结构(L1、L2、L3),还有主内存RAM,且每个CPU核心(如CPU Core1、CPU Core2)都配备CPU寄存器和专属的CPU高速缓存;

    • CPU寄存器在CPU内部,读写速度极快(比缓存和主存还快),但容量极小(仅几十个或几百个字节),只能存少量常用数据,用于加速数据访问和处理;
    • L1、L2、L3缓存速度依次降低,但容量依次增大;主内存速度最慢、容量最大;

    在这里插入图片描述

  • 数据读取流程

    • CPU读取地址数据时,先查L1 Cache,找到就直接读取;
    • 没找到就发请求给L2 Cache,L2没找到再发请求给L3 Cache;
    • L3也没找到,就从主内存读取数据并存储到CPU缓存中;
  • 数据写入流程

    • CPU写入地址数据时,会依据缓存一致性协议,将数据写入L1 Cache,同时可能写入L2 Cache、L3 Cache以及主内存;
    • 写入过程可能只写L1,也可能需写L2、L3和主内存,还会运用缓存行失效、写回等技术提高效率;
  • Linux下查看 Cache Line 大小:

    • 方案一:通过 /sys 文件系统。进入 cpu0 缓存目录(cd /sys/devices/system/cpu/cpu0/cache/index0),查看 coherency_line_size 文件,输出 64 表示 Cache Line 为 64 字节;

      在这里插入图片描述

    • 方案二:通过 proc 文件系统。执行 cat /proc/cpuinfo,查看 cache_alignment 字段,输出 64 也表示 Cache Line 为 64 字节;

      在这里插入图片描述

1.3 CPU多核缓存架构的缓存一致性问题

  • 思考:这种缓存架构在多线程访问的时候存在什么问题?

  • 场景1:

    在这里插入图片描述

  • 场景2:

    在这里插入图片描述

  • 在CPU多核缓存架构里,每个处理器都有单独的缓存。对于共享数据,可能存在多个副本,一个在主内存中,还有一个在请求该数据的每个处理器的本地缓存中。当其中一个数据副本发生更改时,其他副本必须反映出这一更改,也就是说,CPU多核缓存架构需要保证缓存一致性,否则多线程访问共享数据时会出现数据不一致的情况,进而影响程序的正确执行。

1.4 CPU多核缓存架构的缓存一致性问题解决

  • 《64-ia-32-architectures-software-developer-vol-3a-part-1-manual.pdf》中有如下描述:

    在这里插入图片描述

  • 32位的IA - 32处理器支持对系统内存位置进行锁定的原子操作,这些操作通常用于管理共享数据结构(如信号量、段描述符等)。在这些结构中,两个或多个处理器可能同时试图修改相同的字段或标志。处理器使用三种相互依赖的机制来执行锁定的原子操作:

    • 有保证的原子操作
      • 处理器提供一些特殊的指令或者机制,可以保证在多个处理器同时执行原子操作时,它们不会相互干扰,从而保证原子性;
      • 这些指令或者机制的实现通常需要硬件支持。例如x86架构中提供了一系列的原子操作指令,如XADD、XCHG、CMPXCHG等,可以保证在多个处理器同时执行这些指令时,它们不会相互干扰,从而保证原子性;
    • 总线锁定(使用LOCK#信号和LOCK指令前缀)
      • 总线锁定是一种用于确保原子操作的机制,通常会在LOCK指令前缀和一些特殊指令中使用;
      • 在执行LOCK指令前缀时,处理器会将LOCK#信号拉低,这个信号会通知其他处理器当前总线上的数据已经被锁定,从而确保原子性;
    • 缓存一致性协议(确保原子操作在缓存数据结构执行,即缓存锁定)
      • 出现在Pentium 4、Intel Xeon和P6系列处理器中,通过处理器间通信,保证一个处理器修改数据后,其他处理器缓存中的该数据更新或失效,实现缓存与主存数据一致;
      • 缓存锁定基于此协议,利用它确保多个处理器同时修改同一缓存行数据时只有一个处理器获锁,保证原子性,也需硬件支持,不同处理器架构实现有差异;
  • 缓存一致性协议不能使用的特殊情况

    • 操作数据特性限制:操作数据不能被缓存到处理器内部,或操作的数据跨多个缓存行时,处理器会调用总线锁定。不可缓存的设备内存(如显存、网卡缓存等)不受该协议管辖,无法被缓存到处理器缓存行;
    • 处理器支持限制:有些早期处理器(如早期Pentium系列)不支持缓存锁定,只能用总线锁定实现原子操作;现代处理器通常支持缓存锁定,为获更好性能,应尽量用缓存锁定。

1.5 缓存一致性协议实现原理

1.5.1 总线窥探

  • 总线窥探(Bus snooping)是缓存中一致性控制器(snoopy cache)监视或窥探总线事务的一种方案,用于分布式共享内存系统维护缓存一致性,含一致性控制器(snooper)的缓存叫snoopy缓存,该方案由Ravishankar和Goodman于1983年提出;
  • 计算机中处理器与内存间数据通过总线传递,这一系列步骤称为总线事务(Bus Transaction);
  • 工作原理
    • 当特定数据被多个缓存共享,处理器修改共享数据后,更改需传播(数据变更的通知可以通过总线窥探来完成)到其他有该数据副本的缓存,防止违反缓存一致性;
    • 所有窥探者监视总线上的每一个事务,若有一个修改共享缓存块的事务出现在总线上,所有的窥探者都会检查自身缓存是否有共享块的相同副本,若有则执行刷新缓存块或使缓存块失效等操作以确保缓存一致性,这写操作及缓存块状态的改变取决于缓存一致性协议;
  • 根据管理写操作的本地副本的方式,有两种窥探协议:
    • 写失效(Write - invalidate)
      • 当一个处理器写入一个共享缓存块时,其他缓存中的共享副本通过总线窥探失效;
      • 这种方法确保处理器只能读写一个数据的一个副本,其他缓存中的所有其他副本都无效;
      • 这是最常用的窥探协议,MSI、MESI、MOSI、MOESI和MESIF协议都属于此类;
    • 写更新(Write - update)
      • 当一个处理器写入一个共享缓存块时,其他缓存的所有共享副本通过总线窥探更新;
      • 这个方法会将写数据广播到总线上的所有缓存中,相比于上面的 Write - invalidate 协议,会引发更大总线流量,所以该协议不常见;
      • Dragon和firefly协议属于此类。

1.5.2 缓存一致性协议

  • 缓存一致性协议在多处理器系统中应用于高速缓存一致性。为了保持一致性,人们设计了多种协议,如MSI、MESI(又名Illinois)、MOSI、MOESI、MERSI、MESIF、write - once、Synapse、Berkeley、Firefly和Dragon协议等;
  • MESI协议
    • MESI协议是一个基于写失效的缓存一致性协议,是支持回写(write-back)缓存的最常用协议。也称作伊利诺伊协议(Illinois protocol,因为是在伊利诺伊大学厄巴纳-香槟分校被发明的);
    • 缓存行有 4 种不同的状态
      • 已修改(Modified,M):缓存行是“脏的”,与主存值不同。若别的CPU内核要读主存这块数据,该缓存行必须写回主存,状态变为共享(S);
      • 独占(Exclusive,E):缓存行只在当前缓存中,且“干净”(缓存数据同主存数据)。别的缓存读取它时,状态变为共享;当前写数据时,变为已修改状态;
      • 共享(Shared,S):缓存行存在于其他缓存中且未修改,可在任意时刻抛弃;
      • 无效(Invalid,I):缓存行无效;
    • 工作机制
      • MESI 协议用于确保多个处理器之间共享的内存数据的一致性。当一个处理器需要访问某个内存数据时,它首先会检查自己的缓存中是否有该数据的副本。如果缓存中没有该数据的副本,则会发出一个缓存不命中(miss)请求,从主内存中获取该数据的副本,并将该数据的副本存储到自己的缓存中;
      • 当一个处理器发出一个缓存不命中请求时,如果该数据的副本已经存在于另一个处理器或核心的缓存中(即处于共享状态),则该处理器可以从另一个处理器的缓存中复制该数据的副本。这个过程称为缓存到缓存复制(cache-to-cache transfer);
      • 缓存到缓存复制可以减少对主内存的访问,从而提高系统的性能。但是,需要确保数据的一致性,否则会出现数据错误或不一致的情况。因此,在进行缓存到缓存复制时,需要使用MESI协议中的其他状态转换来确保数据的一致性。例如,如果两个缓存都处于修改状态,那么必须先将其中一个缓存的数据写回到主内存,然后才能进行缓存到缓存复制。

1.6 伪共享

1.6.1 简介

  • 伪共享(False Sharing):指的是多个线程虽然操作的是不同变量,但由于这些变量位于同一个**缓存行(Cache Line)**中,导致一个线程修改其中一个变量时,会使得其他线程的缓存行失效,从而引发频繁的缓存同步和主存访问,造成性能下降;

  • ArrayBlockingQueue 是 Java 中的一个有界阻塞队列,它包含三个关键成员变量:

    变量名含义
    takeIndex下一个要取出元素的位置索引(消费者使用)
    putIndex下一个可插入元素的位置索引(生产者使用)
    count队列中当前元素的数量
    • 这三个变量通常会被分配在内存中相邻的位置,容易被放入**同一个缓存行(Cache Line)**中,但是之间修改没有太多的关联。所以每次修改,都会使之前缓存的数据失效,从而不能完全达到共享的效果;

      在这里插入图片描述

      • 两个 CPU 核心分别运行生产者线程(Producer Thread)和消费者线程(Consumer Thread),它们都访问 ArrayBlockingQueue 对象,该对象的三个字段 putIndex, count, takeIndex 被映射到一条缓存行中;
      • 当生产者线程修改putIndexcount,这条缓存行就“失效”;
      • 而此时消费者线程想要修改takeIndexcount,就必须重新加载,这就会导致延迟增加。

1.6.2 避免伪共享的方案

  • 方案 1:缓存行填充

    • 原理:通过填充无用字段,让需要并发访问的变量独占一个 Cache Line;

    • 例:

      class Pointer {volatile long x;//避免伪共享: 缓存行填充long p1, p2, p3, p4, p5, p6, p7;volatile long y;
      }
      
    • 代码中,Pointer 类的 x 和 y 是需要并发访问的 volatile 变量(多线程可见);

    • 中间插入 long 类型的p1p2p3p4p5p6p7这些填充字段,目的是让 x 所在的 Cache Line 和 y 所在的 Cache Line 完全分离(因为 Cache Line 是 64 字节,存储x + 填充字段 + y至少需要两个 Cache Line),避免多线程操作 x 和 y 时互相干扰缓存;

      一个 long 占 8 个字节;

  • 方案 2:使用 @sun.misc.Contended 注解(Java 8+)

    • 原理:通过 JVM 注解和参数,自动实现让某个变量独占 Cache Line;

    • 例:

      public class FalseSharingTest {public static void main(String[] args) throws InterruptedException {testPointer(new Pointer());}private static void testPointer(Pointer pointer) throws InterruptedException {long start = System.currentTimeMillis();Thread t1 = new Thread(() -> {for (int i = 0; i < 100000000; i++) {pointer.x++;  // 线程1对x变量进行1亿次自增操作}});Thread t2 = new Thread(() -> {for (int i = 0; i < 100000000; i++) {pointer.y++;  // 线程2对y变量进行1亿次自增操作}});t1.start();t2.start();t1.join();  // 等待线程1执行完成t2.join();  // 等待线程2执行完成System.out.println(pointer.x+","+pointer.y);  // 输出x和y的最终值System.out.println(System.currentTimeMillis() - start);  // 输出两个线程执行的总耗时}
      }class Pointer {// 这两个volatile变量可能会位于同一个CPU缓存行中,导致伪共享问题// 当不同CPU核心上的线程分别修改x和y时,由于缓存一致性协议(如MESI)// 一个核心修改x会使另一个核心中包含y的缓存行失效,反之亦然// 这会导致大量的缓存行同步操作,降低性能// @Contendedvolatile long x;  // 声明为volatile确保可见性,但不保证原子性volatile long y;  // 声明为volatile确保可见性,但不保证原子性// 解决方案1:使用@Contended注解(需要JVM参数-XX:-RestrictContended解除对该注解的限制)// 解决方案2:添加缓存行填充(padding),如注释掉的7个long变量// 典型缓存行大小为64字节,一个long占8字节,所以需要7个填充变量// long p1, p2, p3, p4, p5, p6, p7;
      }
      
  • 方案 3:使用线程本地内存(如 ThreadLocal)

    • 原理:让每个线程操作自己独有的变量副本,从根源上避免多线程共享同一变量导致的缓存竞争;
    • ThreadLocal 可以为每个线程创建独立的变量副本,线程间互不干扰,自然不会出现多个线程操作同一变量的不同部分、却因 Cache Line 共享导致伪共享的问题。

2 高性能内存队列Disruptor详解

2.1 JUC 包下阻塞队列的缺陷

  • 锁机制影响:JUC 包下的队列大多用 ReentrantLock 锁保证线程安全。在稳定性要求高的系统里,为避免生产者速度过快致内存溢出,只能选有界队列,但加锁会严重影响性能,线程因竞争不到锁被挂起再唤醒,开销大且有死锁隐患;
  • 伪共享问题:有界队列常用数组实现,这会引发伪共享(false sharing)问题,影响性能。

2.2 Disruptor 介绍

  • Disruptor 是英国外汇交易公司 LMAX 开发的高性能队列,初衷是解决内存队列延迟问题(在性能测试中发现延迟与 I/O 操作数量级相当)。基于它开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后受业界关注,2011 年被 Martin Fowler 撰长文介绍,还获 Oracle 官方 Duke 大奖。目前 Apache Storm、Camel、Log4j2 等知名项目都应用它来获取高性能;
  • Disruptor 是一个有界队列,可用于生产者-消费者模型;
  • GitHub:GitHub - LMAX-Exchange/disruptor: High Performance Inter-Thread Messaging Library。

2.3 Disruptor 的高性能设计方案

  • Disruptor 实现高性能的设计方案有以下几种:

    • 环形数组结构:采用数组而非链表,一方面能避免垃圾回收带来的性能损耗;另一方面,数组符合处理器的缓存机制(空间局部性原理),有利于提升数据访问效率;
    • 元素位置定位:数组长度设为 (2^n),借助位运算来加快元素定位速度。下标采用递增形式,且 indexlong 类型,数值极大,即便以每秒 100 万次操作(100 万 QPS)的处理速度,也需要约 30 万年才会用完,不用担心下标溢出问题;
    • 无锁设计:每个生产者或消费者线程,会先申请数组中可操作元素的位置,申请到后直接在该位置进行写入或读取数据操作。整个过程通过原子变量的 CAS 操作来保证线程安全,避免了加锁带来的性能开销;
    • 利用缓存行填充解决伪共享问题:通过合理填充缓存行,减少不同线程操作的数据在缓存行上的冲突,从而提升性能;
    • 基于事件驱动的生产者 - 消费者模型(观察者模式):消费者持续关注队列中是否有新消息,一旦有新消息产生,消费者线程就会立刻进行消费,能快速响应数据的产生与消费,提高处理效率。
  • RingBuffer 数据结构

    • RingBuffer 是 Disruptor 用作队列的数据结构,是一个可自定义大小的环形数组。除了数组本身,还有一个 sequence(序列号),用于指向下一个可用元素,供生产者和消费者使用;

      在这里插入图片描述

    • 要确定元素在环形数组中的位置(索引 index),有两种计算方式:

      • 取模运算:index = sequence % entries.length,其中 entries.length 是数组长度;
      • 位运算(更高效):index = sequence & (entries.length - 1)。但使用位运算的前提是数组长度必须为 (2^n)(2 的 n 次幂),这样能大幅提升计算效率;
    • Disruptor 要求数组长度设为 (2^n),这是为了能高效地通过位运算计算索引,保证存、取数组元素的时间复杂度为 (O(1))(常数时间),非常高效;

    • 当环形数组所有位置都被填满后,若再放入新元素,会覆盖 0 号位置的旧数据;

    • 思考:覆盖数据是否会导致数据丢失呢?

      • 在 Disruptor 的生产者-消费者模型中,需要通过合理的机制(比如消费者的消费速度要跟上生产者的生产速度,或者利用序号等机制确保数据被正确消费)来避免因覆盖导致的关键数据丢失;
      • 如果消费者消费不及时,后续生产者继续生产,就可能覆盖未被消费的数据,从而造成数据丢失,所以 Disruptor 的使用需要结合合适的生产-消费协调策略;
  • Disruptor 针对不同的 CPU 资源情况、延迟和吞吐量要求等,提供了多种等待策略:

    名称措施适用场景
    BlockingWaitStrategy加锁CPU 资源紧缺,吞吐量和延迟并不重要的场景
    BusySpinWaitStrategy自旋通过不断重试,减少切换线程导致的系统调用,降低延迟。推荐在线程绑定到固定 CPU 的场景下使用
    PhasedBackoffWaitStrategy自旋 + yield + 自定义策略CPU 资源紧缺,吞吐量和延迟并不重要的场景
    SleepingWaitStrategy自旋 + yield + sleep性能和 CPU 资源之间有很好的折中。但延迟不均匀
    TimeoutBlockingWaitStrategy加锁,有超时限制CPU 资源紧缺,吞吐量和延迟并不重要的场景
    YieldingWaitStrategy自旋 + yield + 自旋性能和 CPU 资源之间有很好的折中。且延迟比较均匀
  • Disruptor 在日志框架中的应用:

    • Log4j 2 相较于 Log4j 1,在多线程并发场景下性能更优,这一特性源于 Log4j 2 的异步模式采用了 Disruptor 来处理日志。并且在 Log4j 2 的配置文件中,可配置等待策略(WaitStrategy),默认是 Timeout 策略;

      在这里插入图片描述

    • 上图展示了同步(Sync)、使用 ArrayBlockingQueue 队列的异步(Async Appender)以及采用 Disruptor 的全异步(loggers all async)三种日志记录方式的吞吐量(单位:msg/sec,数值越高越好);

      • 单线程情况下,loggers all async(蓝色)和Async Appender(红色)吞吐量相差不大;

      • 当线程数达到 64 时,loggers all async(蓝色)的吞吐量比Async Appender(红色)提升了 12 倍,更是同步模式(Sync)的 68 倍。

2.4 Disruptor 实战

2.4.1 依赖准备及构造器介绍

  • 引入依赖:

    <!-- disruptor -->
    <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version>
    </dependency>
    
  • Disruptor 构造器:

    public Disruptor(final EventFactory<T> eventFactory, // 创建事件(任务)的工厂类final int ringBufferSize, // 容器的长度final ThreadFactory threadFactory, // 用于创建执行任务的线程final ProducerType producerType, // 生产者类型:单生产者、多生产者final WaitStrategy waitStrategy // 等待策略
    )
    
  • 使用流程:

    • 构建消息载体(事件)
    • 构建生产者
    • 构建消费者
    • 生产消息,消费消息的测试

2.4.2 单生产者单消费者模式

  • 创建 Event(消息载体/事件)和 EventFactory(事件工厂)

    // 订单事件数据类,该类会被放入环形队列中作为消息内容
    @Data
    public class OrderEvent {private long value;     // 订单数值private String name;    // 订单名称
    }// 事件工厂类,用于创建OrderEvent对象实例
    // Disruptor预分配内存,通过工厂模式创建和复用对象,减少GC开销
    public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {return new OrderEvent(); // 创建新的OrderEvent对象}
    }
    
  • 创建消息(事件)生产者

    // 事件生产者类,负责向Disruptor RingBuffer中发布事件
    public class OrderEventProducer {// RingBuffer数据结构private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}// 生产数据方法public void onData(long value, String name) {// 从RingBuffer获取下一个可用的序列号(槽位),这一步可能会阻塞,取决于等待策略long sequence = ringBuffer.next();try {// 根据序列号获取预分配的事件对象OrderEvent orderEvent = ringBuffer.get(sequence);// 设置事件数据(注意:这里是修改已存在的对象,不是创建新对象)orderEvent.setValue(value);orderEvent.setName(name);} catch (Exception e) {e.printStackTrace();} finally {System.out.println("生产者发送数据value:" + value + ",name:" + name);// 发布事件,使消费者可见,只有发布后消费者才能处理该事件ringBuffer.publish(sequence);}}
    }
    
  • 创建消费者:

    // 事件处理器(消费者),实现EventHandler接口处理事件
    public class OrderEventHandler implements EventHandler<OrderEvent> {@Override// 事件回调方法:当有新事件时自动调用// event: 事件对象, sequence: 序列号, endOfBatch: 是否批处理的最后一条public void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者获取数据value:" + event.getValue() + ",name:" + event.getName());}
    }
    
  • 测试:

    // Disruptor主程序演示类
    public class DisruptorDemo {public static void main(String[] args) throws Exception {// 1. 创建Disruptor实例Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),     // 事件工厂,用于创建事件对象1024 * 1024,                 // RingBuffer大小,必须是2的幂次方Executors.defaultThreadFactory(), // 线程工厂,用于创建消费者线程ProducerType.SINGLE,         // 生产者类型:单生产者(多生产者可选MULTI)new YieldingWaitStrategy()   // 等待策略:高性能场景下的策略);// 2. 设置事件处理器(消费者)disruptor.handleEventsWith(new OrderEventHandler());// 3. 启动Disruptor(启动消费者线程)disruptor.start();// 4. 获取RingBuffer引用RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();// 5. 创建生产者OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 6. 生产并发送100条消息for (int i = 0; i < 100; i++) {eventProducer.onData(i, "Fox" + i);}// 7. 优雅关闭Disruptor(等待所有事件处理完成)disruptor.shutdown();}
    }
    

2.4.3 单生产者多消费者模式

  • 如果有多个消费者,那么在handleEventsWith方法中传入多个消费者即可;

    // 设置多消费者,消息会被重复消费
    disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());
    
  • 上面传入的两个消费者会重复消费每一条消息。如果想实现在有多个消费者的情况下,一条消息只会被一个消费者消费,那么需要调用handleEventsWithWorkerPool方法;

    // 设置多消费者,一条消息只会被一个消费者消费
    // 注意:消费者要实现 WorkHandler 接口(见下一段代码)
    disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());
    
    public class OrderEventHandler implements EventHandler<OrderEvent>, WorkHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent event, long sequence, boolean endOfBatch) throws Exception {// TODO 消费逻辑System.out.println("消费者" + Thread.currentThread().getName()+ "获取数据value:" + event.getValue() + ",name:" + event.getName());}@Overridepublic void onEvent(OrderEvent event) throws Exception {// TODO 消费逻辑System.out.println("消费者" + Thread.currentThread().getName()+ "获取数据value:" + event.getValue() + ",name:" + event.getName());}
    }
    

2.4.4 多生产者多消费者模式

public class DisruptorDemo2 {public static void main(String[] args) throws Exception {Disruptor<OrderEvent> disruptor = new Disruptor<>(new OrderEventFactory(),1024 * 1024,Executors.defaultThreadFactory(),ProducerType.MULTI,          // 生产者类型:多生产者(支持多个生产者线程并发写入)new YieldingWaitStrategy());// 消费者配置方式一:单一消费者模式// disruptor.handleEventsWith(new OrderEventHandler());// 消费者配置方式二:并行消费者模式(广播模式)// 两个消费者都会接收到所有消息,实现消息的重复消费// disruptor.handleEventsWith(new OrderEventHandler(), new OrderEventHandler());// 消费者配置方式三:工作组模式(竞争消费者模式)// 多个消费者共享消息负载,每条消息只会被一个消费者处理// 适合需要负载均衡的场景,提高整体吞吐量disruptor.handleEventsWithWorkerPool(new OrderEventHandler(), new OrderEventHandler());// 启动Disruptor(启动消费者线程池)disruptor.start();// 获取RingBuffer引用,生产者通过它发布事件RingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();// 创建第一个生产者线程new Thread(() -> {OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 生产100条消息,前缀为"Fox"for (int i = 0; i < 100; i++) {eventProducer.onData(i, "Fox" + i);}}, "producer1").start();  // 线程命名为producer1// 创建第二个生产者线程new Thread(() -> {OrderEventProducer eventProducer = new OrderEventProducer(ringBuffer);// 生产100条消息,前缀为"monkey"for (int i = 0; i < 100; i++) {eventProducer.onData(i, "monkey" + i);}}, "producer2").start();  // 线程命名为producer2// 注意:在实际生产环境中需要适当的关闭机制// 此处注释掉shutdown,因为生产者线程是异步的,立即关闭可能导致消息未处理完成// disruptor.shutdown();}
}
http://www.xdnf.cn/news/1457821.html

相关文章:

  • ResNet(残差网络)-彻底改变深度神经网络的训练方式
  • linux——自定义协议
  • 多Agent协作案例:用AutoGen实现“写代码+测Bug”的自动开发流程
  • 秒店功能更新:多维度优化升级,助力商家经营
  • 当 LLM 遇上真实世界:MCP-Universe 如何撕开大模型 “工具能力” 的伪装?
  • 记录相机触发相关
  • 机器学习入门,第一个MCP示例
  • (D题|矿井突水水流漫延模型与逃生方案)2025年高教杯全国大学生数学建模国赛解题思路|完整代码论文集合
  • 生成式引擎优化(GEO):数字营销新标配,企业如何抢占AI搜索流量高地?
  • Trae + MCP : 一键生成专业封面的高阶玩法——自定义插件、微服务编排与性能调优
  • 设计模式六大原则2-里氏替换原则
  • Linux —— 环境变量
  • mysql中find_in_set()函数的使用, ancestors字段,树形查询
  • AI视频画质提升效果实用指南:提升清晰度的完整路径
  • [论文阅读] 软件工程 | REST API模糊测试的“标准化革命”——WFC与WFD如何破解行业三大痛点
  • 【论文阅读】-《Besting the Black-Box: Barrier Zones for Adversarial Example Defense》
  • AutoLayout与Masonry:简化iOS布局
  • (E题|AI 辅助智能体测)2025年高教杯全国大学生数学建模国赛解题思路|完整代码论文集合
  • 解密llama.cpp:Prompt Processing如何实现高效推理?
  • Nginx 实战系列(一)—— Web 核心概念、HTTP/HTTPS协议 与 Nginx 安装
  • Scikit-learn Python机器学习 - 特征预处理 - 归一化 (Normalization):MinMaxScaler
  • 孩子学手机里的坏毛病,怎样限制他打开某些APP?
  • Flutter 3.35.2 以上版本中 数字转字符串的方法指南
  • 机器学习基础-day05-深度学习框架PyTorch的tensor及PyTorch进行线性回归
  • 猫头虎AI 荐研|腾讯开源长篇叙事音频生成模型 AudioStory:统一模型,让 AI 会讲故事
  • 数据结构 之 【哈希的相关概念】
  • npm/pnpm软链接的优点和使用场景
  • 2025精选榜:4款好用的企业即时通讯软件推荐!安全有保障
  • 【Proteus仿真】AT89C51单片机中断系列仿真——INT0中断控制LED小灯/INT0和INT1中断控制数码管
  • 小白也能看懂,HTTP中的文件上传与下载到底发生了什么?