当前位置: 首页 > ds >正文

Disruptor核心接口EventHandler解析

EventHandlerBase 是什么?

EventHandlerBase 是 Disruptor 内部使用的一个基础接口,它整合了事件处理所需的核心生命周期方法。它并不是直接给最终用户使用的,用户通常会实现 EventHandler 或 LifecycleAware 接口,而这些接口最终都与 EventHandlerBase 相关联。

从代码中可以看到,EventHandlerBase 继承了 EventHandlerIdentity

// ... existing code ...
@FunctionalInterface
interface EventHandlerBase<T> extends EventHandlerIdentity
{
// ... existing code ...
}

而 EventHandlerIdentity 是一个空接口:

package com.lmax.disruptor;public interface EventHandlerIdentity
{
}

EventHandlerIdentity 的作用是作为一个标记接口(Marker Interface)。它本身不提供任何方法,但它可以用来在类型系统中标识一组相关的类或接口。在这里,它将所有与事件处理器相关的接口(如 EventHandlerLifecycleAware 等)归为一类。

EventHandlerBase 的语义和用法

EventHandlerBase 的核心语义是为 BatchEventProcessor 提供一个统一的、包含完整生命周期回调的事件处理器契约

BatchEventProcessor 是 Disruptor 中负责驱动事件处理循环的核心组件。它需要知道何时启动、何时关闭、如何处理事件、如何处理超时等。EventHandlerBase 正好提供了所有这些需要的回调方法。

我们来逐一分析它的方法:

  1. void onEvent(T event, long sequence, boolean endOfBatch) throws Throwable;

    • 语义: 这是最核心的事件处理方法。当 BatchEventProcessor 从 Ring Buffer 中获取到一个事件时,就会调用这个方法。
    • 用法: 业务逻辑就写在这里。
    • event: 事件本身。
    • sequence: 事件的序号。
    • endOfBatch: 一个非常重要的标志,告诉你这是否是当前批次中的最后一个事件。你可以利用这个标志来执行一些批处理操作,比如批量写入数据库、批量发送网络包等,从而提高 I/O 效率。
  2. default void onStart()

    • 语义: 在事件处理线程启动后,但在处理任何事件之前,该方法会被调用一次。
    • 用法: 适合在这里执行一些初始化操作,比如建立数据库连接、打开文件、初始化资源等。
  3. default void onShutdown()

    • 语义: 在事件处理线程即将关闭前,该方法会被调用一次。此时,事件处理已经停止,不会再有新的事件被处理。
    • 用法: 适合在这里执行清理工作,比如关闭数据库连接、释放资源等,确保程序优雅地退出。
  4. default void onBatchStart(long batchSize, long queueDepth)

    • 语义: 在 BatchEventProcessor 即将处理一个新批次的事件之前调用。
    • 用法: 让你有机会在批处理开始前了解批次的大小 (batchSize) 和当前队列的深度 (queueDepth)。这可以用于一些监控或动态调整批处理策略的场景。
  5. default void onTimeout(long sequence) throws Exception

    • 语义: 当 BatchEventProcessor 使用的 WaitStrategy(等待策略)在等待新事件时发生超时,会调用此方法。
    • 用法: 你可以在这里实现超时逻辑,比如记录一条警告日志,或者触发一个健康检查,通知系统可能存在性能问题或生产者停止工作了。

与 EventHandler 和 LifecycleAware 的关系

你可能会问,我平时用的都是 EventHandler<T> 和 LifecycleAware,它们和 EventHandlerBase 是什么关系?

  • EventHandler<T>: 这是你最常实现的接口,它只包含 onEvent 方法。
    public interface EventHandler<T> extends EventHandlerBase<T>
    {// 它继承了 EventHandlerBase// 它的存在主要是为了简化用户的实现,因为大多数时候你只关心 onEvent
    }
    
  • LifecycleAware: 这个接口包含了 onStart 和 onShutdown 方法。
    public interface LifecycleAware
    {void onStart();void onShutdown();
    }
    

BatchEventProcessor 在内部会检查你的处理器对象:

  • 如果你的处理器只实现了 EventHandler,那么 BatchEventProcessor 只会调用 onEvent
  • 如果你的处理器同时实现了 EventHandler 和 LifecycleAware,那么 BatchEventProcessor 就会在适当的时候调用 onEventonStart 和 onShutdown

EventHandlerBase 实际上是 BatchEventProcessor 内部对这些不同接口进行统一处理的适配层。它将所有可能的回调都定义好,使得 BatchEventProcessor 的代码可以更简洁,无需在内部用大量的 instanceof 来判断该调用哪个方法。

总结

  • 核心语义EventHandlerBase 是 Disruptor 内部对事件处理器生命周期回调的一个统一抽象,它定义了事件处理的完整契约,供 BatchEventProcessor 使用。
  • 用法: 你通常不会直接实现 EventHandlerBase。你会根据需要实现 EventHandler<T>(用于事件处理逻辑)和 LifecycleAware(用于生命周期管理)。Disruptor 内部会自动将你的实现适配到 EventHandlerBase 的契约上。
  • EventHandlerIdentity: 这是一个标记接口,用于将所有事件处理器相关的接口归为一类,增强了类型系统的表达能力。

EventHandler 是什么?

EventHandler<T> 是 Disruptor 中用于处理事件的回调接口。它是你作为 Disruptor 使用者,实现业务逻辑的地方。当你希望在事件发布到 Ring Buffer 后对其进行处理时,你就需要创建一个或多个实现该接口的类。

从代码中可以看到,EventHandler<T> 继承了我们之前讨论过的 EventHandlerBase<T>

// ... existing code ...
public interface EventHandler<T> extends EventHandlerBase<T>
{
// ... existing code ...
}

这意味着 EventHandler 天然地继承了 EventHandlerBase 中定义的所有生命周期方法(onEventonStartonShutdown 等)。

但它又对 onEvent 方法的异常声明做了更具体的约束(从 throws Throwable 变成了 throws Exception),并增加了一个 setSequenceCallback 方法。

EventHandler 的语义和用法

EventHandler 的核心语义是:“定义一个消费者(Consumer)的事件处理逻辑”。每一个 EventHandler 的实例,通常都会被一个独立的 BatchEventProcessor 所驱动,运行在自己的线程中,构成消费者流水线的一部分。

1. void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;

这是你必须实现的核心方法。

  • 语义: 当 BatchEventProcessor 从 Ring Buffer 中拉取到一个可处理的事件时,就会调用此方法,将事件传递给你。

  • 用法:

    • 核心业务逻辑: 在这个方法里,你可以对事件数据进行任意处理,例如:
      • 将事件数据持久化到数据库。
      • 通过网络发送事件数据。
      • 更新应用内存中的状态(例如聚合计算)。
      • 将数据转换成另一种形式,然后发布到另一个 Ring Buffer 中。
    • 利用 endOfBatch 进行批处理: 这是一个非常重要的性能优化点。如果你的操作涉及 I/O(如数据库、磁盘、网络),频繁的单次操作会非常慢。你可以利用 endOfBatch 标志来聚合一个批次的事件,然后进行一次性的批量 I/O 操作。

    批处理示例:

    public class BatchingDBEventHandler implements EventHandler<MyEvent>, LifecycleAware {private final List<MyEvent> batch = new ArrayList<>();private final int batchSize;public BatchingDBEventHandler(int batchSize) {this.batchSize = batchSize;}@Overridepublic void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {// 只是将事件暂存到 List 中batch.add(cloneEvent(event)); // 注意:需要克隆事件,因为 RingBuffer 会复用事件对象// 如果达到批次大小,或者这是批次的最后一个事件,就执行批量写入if (batch.size() >= batchSize || endOfBatch) {flushBatchToDB();}}private void flushBatchToDB() {if (!batch.isEmpty()) {System.out.printf("向数据库批量写入 %d 个事件...%n", batch.size());// ... 在这里执行真正的 JDBC 批量写入逻辑 ...batch.clear(); // 清空批次,为下一批做准备}}// 在关闭时,确保最后一批未满的数据也被处理@Overridepublic void onShutdown() {flushBatchToDB();}@Overridepublic void onStart() {// 初始化数据库连接等}
    }
    
2. default void setSequenceCallback(Sequence sequenceCallback)

这是一个高级用法,大多数情况下你不需要关心它。

  • 语义: 它允许 BatchEventProcessor 将自己的 Sequence 对象(用于追踪处理进度)以回调的方式注入到 EventHandler 中。

  • 用法: 主要用于异步处理手动控制序列更新的场景。在标准的 onEvent 模型中,当 onEvent 方法返回时,BatchEventProcessor 会自动更新其 Sequence,表示这个事件已经处理完毕。但在某些特殊情况下,onEvent 方法可能只是启动了一个异步操作(比如向一个 Actor 或另一个线程提交了一个任务),方法本身很快就返回了,但事件的“真正”处理尚未完成。

    在这种场景下,你可以保存这个 sequenceCallback。当异步操作最终完成时,你再手动调用 sequenceCallback.set(sequence) 来通知 Disruptor:“嘿,那个序号为 sequence 的事件,我现在才算真正处理完了!”。这可以防止后续的消费者过早地处理这个事件,确保了正确的处理顺序。

    这是一个非常高级且复杂的用法,只有在你确切地知道为什么要延迟序列更新时才应该使用它。

如何在 Disruptor 中使用 EventHandler

使用 EventHandler 是构建 Disruptor 应用的标准流程:

// 1. 创建 Disruptor 实例
Disruptor<MyEvent> disruptor = new Disruptor<>(MyEvent::new, bufferSize, threadFactory);// 2. 创建你的 EventHandler 实例
EventHandler<MyEvent> handler1 = new MyLoggingEventHandler();
EventHandler<MyEvent> handler2 = new MyReplicationEventHandler();
EventHandler<MyEvent> handler3 = new MyBusinessLogicEventHandler();// 3. 将 EventHandler 连接到 Disruptor,构建消费者依赖关系图 (DSL)
//    - handler1 和 handler2 并行处理
//    - handler3 必须在 handler1 和 handler2 都处理完之后才能开始
disruptor.handleEventsWith(handler1, handler2).then(handler3);// 4. 启动 Disruptor (这将为每个 EventHandler 启动一个线程)
RingBuffer<MyEvent> ringBuffer = disruptor.start();// 5. 发布事件
// ... publisher logic ...

总结

  • 核心语义EventHandler 是 Disruptor 中事件消费者的核心逻辑单元。它是你实现具体业务处理的地方。
  • 主要用法: 实现 onEvent 方法来处理单个事件,并利用 endOfBatch 标志进行性能优化(特别是 I/O 密集型操作)。
  • 生命周期: 如果需要初始化或清理资源,可以让你的 EventHandler 同时实现 LifecycleAware 接口。
  • 高级用法setSequenceCallback 提供了手动控制消费进度的能力,用于复杂的异步处理场景。
  • 组合: 通过 Disruptor 的 DSL (Domain Specific Language),你可以将多个 EventHandler 组合起来,构建出强大的并行或串行处理流水线。

RewindableEventHandler 是什么?

RewindableEventHandler<T> 是 EventHandler 的一个特殊变体。它赋予了事件处理器一种 “回滚重试” 的能力。

通常情况下,当一个 EventHandler 在处理事件时抛出异常,BatchEventProcessor 会捕获这个异常,然后根据你配置的 ExceptionHandler 来决定是记录日志、停止消费者,还是执行其他操作。但无论如何,这个事件(以及它所在的批次)通常就被跳过了。

而 RewindableEventHandler 改变了这一行为。通过它,你可以告诉 BatchEventProcessor:“我处理当前这个事件失败了,但这可能是一个暂时性的问题。请不要跳过这个批次,而是 回滚(rewind) 整个批次,稍后我会重新尝试处理它们。”

从代码中可以看到,它也继承了 EventHandlerBase,并且其 onEvent 方法的签名明确声明了可以抛出 RewindableException

// ... existing code ...
public interface RewindableEventHandler<T> extends EventHandlerBase<T>
{
// ... existing code ...@Overridevoid onEvent(T event, long sequence, boolean endOfBatch) throws RewindableException, Exception;
}

语义和用法

RewindableEventHandler 的核心语义是:为那些可能遇到暂时性、可恢复错误的消费者,提供一种优雅的、不丢失数据的重试机制。

这个机制的关键在于 RewindableException 这个特殊的异常。

// ... existing code ...
public class RewindableException extends Throwable
{public RewindableException(final Throwable cause){super("REWINDING BATCH", cause);}
}

当你的 onEvent 方法中抛出 RewindableException 时,BatchEventProcessor 会捕获它,并执行以下操作:

  1. 停止处理当前批次:它不会继续处理批次中剩余的事件。
  2. 执行回滚策略:它会调用一个 BatchRewindStrategy 来决定如何回滚。默认的回滚策略是 WaitThenRewindStrategy,它会等待一小段时间,然后让 BatchEventProcessor 重新从这个失败批次的起始位置开始处理。
  3. 重新处理整个批次BatchEventProcessor 的序列号不会前进,它会重新从引发异常的那个事件开始,再次调用你的 onEvent 方法,相当于给了你一次重试的机会。
典型用例

想象一个向远程服务写入数据的场景:

  • 网络抖动:你尝试将事件数据通过 RPC 发送给另一个服务,但网络瞬间抖动导致连接超时。这是一个典型的暂时性错误,很可能几毫秒后重试就会成功。
  • 数据库死锁:你尝试将事件数据写入数据库,但遇到了一个数据库死锁(deadlock)。数据库通常会自动回滚其中一个事务,如果你立即重试,很可能就能成功写入。
  • 资源暂时不可用:你依赖的某个外部资源(如一个文件、一个消息队列)暂时被锁定或不可用。

在这些情况下,如果直接抛出普通异常并跳过事件,就会导致数据丢失或不一致。而使用 RewindableEventHandler 就是完美的解决方案。

使用示例
import com.lmax.disruptor.BatchRewindStrategy;
import com.lmax.disruptor.RewindableEventHandler;
import com.lmax.disruptor.RewindableException;
import com.lmax.disruptor.WaitThenRewindStrategy;// 1. 实现 RewindableEventHandler
public class RemoteServiceEventHandler implements RewindableEventHandler<MyEvent> {private final RemoteServiceClient client = new RemoteServiceClient();@Overridepublic void onEvent(MyEvent event, long sequence, boolean endOfBatch) throws Exception {try {// 尝试调用远程服务client.send(event);} catch (TemporaryNetworkException e) {// 捕获到可重试的暂时性异常System.err.printf("网络暂时故障 (seq=%d),准备回滚批次...%n", sequence);// 包装成 RewindableException 抛出throw new RewindableException(e);} catch (Exception e) {// 对于不可恢复的严重错误,直接抛出普通异常System.err.printf("发生严重错误 (seq=%d),将交由 ExceptionHandler 处理。%n", sequence);throw e;}}
}// 2. 在配置 Disruptor 时,为 BatchEventProcessor 提供一个 BatchRewindStrategy
public void setupDisruptor() {Disruptor<MyEvent> disruptor = ...;RewindableEventHandler<MyEvent> handler = new RemoteServiceEventHandler();// 创建一个回滚策略,例如:等待100毫秒后重试BatchRewindStrategy rewindStrategy = new WaitThenRewindStrategy(100, TimeUnit.MILLISECONDS);// 创建 BatchEventProcessor 时传入该策略BatchEventProcessor<MyEvent> processor = new BatchEventProcessor<>(disruptor.getRingBuffer(),disruptor.getRingBuffer().newBarrier(),handler,rewindStrategy // 在这里设置回滚策略);// 将处理器的序列号添加到gating中disruptor.addGatingSequences(processor.getSequence());// 启动处理器new Thread(processor).start();
}

总结

  • 核心语义: 提供了在事件处理中遇到可恢复的、暂时性错误时,回滚并重试整个批次的能力。
  • 触发机制: 在 onEvent 方法中捕获暂时性错误,并抛出 RewindableException
  • 执行机制BatchEventProcessor 捕获 RewindableException 后,会根据配置的 BatchRewindStrategy 来执行回滚和等待,然后重新处理整个失败的批次。
  • 适用场景: 非常适合处理与外部系统交互时可能发生的暂时性故障,如网络抖动、数据库死锁、资源临时锁定等,能有效防止数据丢失,增强系统的健壮性。
  • 与普通 EventHandler 的区别EventHandler 抛出异常通常意味着“处理失败,请跳过”,而 RewindableEventHandler 抛出 RewindableException 意味着“处理暂时失败,请稍后重试”。

http://www.xdnf.cn/news/18472.html

相关文章:

  • 【Techlog】01入门-井筒数据整合软件的基本认识
  • C5.6:双电源发射极偏置、特殊类偏置、PNP型偏置电路
  • ODPS 十五周年实录 | 为 AI 而生的数据平台
  • 机器学习(Machine Learning, ML)
  • 项目1其二(验证码、jwt)
  • 《算法导论》第 33 章 - 计算几何学
  • 关于uniappx注意点1 - 鸿蒙app
  • 3ds Max 流体模拟终极指南:从创建到渲染,打造真实液体效果
  • 模拟tomcat接收GET、POST请求
  • 元宇宙的硬件设备:从 VR 头显到脑机接口
  • 【数据库】Oracle学习笔记整理之六:ORACLE体系结构 - 重做日志文件与归档日志文件(Redo Log Files Archive Logs)
  • Navicat Premium v17.2.3
  • Advanced Math Math Analysis |01 Limits, Continuous
  • 力扣hot100:最大子数组和的两种高效方法:前缀和与Kadane算法(53)
  • 学习设计模式《二十三》——桥接模式
  • uniapp:h5链接拉起支付宝支付
  • 有向图(Directed Graph)和有向无环图(Directed Acyclic Graph,DAG)代码实践
  • pcl求平面点云的边界凸包点
  • 池化技术分析
  • GISBox工具:FBX到3DTiles文件转换指南
  • Eclipse 里Mybatis的xml的头部报错
  • 仿真驱动的AI自动驾驶汽车安全设计与测试
  • JVM基础知识总结
  • 深入解析FTP核心术语03
  • PWA》》以京东为例安装到PC端
  • 从二进制固件到人类意识:AI小智开发全记录与技术沉思—— 一个创客的硬件实践与认知边界探索
  • 数据预处理
  • 怎么确定mongodb是不是链接上了?
  • 电脑驱动免费更新? 这款驱动管理工具:一键扫更新,还能备份恢复,小白也会用~
  • 嵌入式音频开发(3)- AudioService核心功能