当前位置: 首页 > news >正文

Kafka与RocketMQ在事务消息实现上的区别是什么?

一、Kafka事务消息核心实现(基于2.8+版本)

// KafkaProducer.java
public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record) {// 事务消息校验(第256行)if (transactionManager != null && transactionManager.isTransactional()) {// 事务ID绑定检查(需先初始化事务)transactionManager.maybeFailWithError();}// 消息实际发送(第412行)return appendTransactionalRecord(record);
}// TransactionManager.java(关键事务方法)
public synchronized void beginTransaction() {// 生成新事务ID(第134行)this.transactionalId = generateTransactionalId();// 与协调者建立连接(第152行)coordinator.ensureTransactionalIdReady();
}public void commitTransaction() {// 两阶段提交第一阶段:写入提交标记(第489行)coordinator.beginCommit();// 第二阶段:提交所有消息(第503行)coordinator.sendOffsetsToTransaction();
}

二、RocketMQ事务消息核心实现(基于4.9+版本)

// TransactionMQProducer.java
public TransactionSendResult sendMessageInTransaction(Message msg, Object arg) {// 1.发送半消息(第87行)msg.putUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");SendResult sendResult = this.send(msg);// 2.执行本地事务(第94行)LocalTransactionState state = transactionListener.executeLocalTransaction(msg, arg);// 3.提交事务状态(第101行)this.endTransaction(sendResult, state, null);
}// DefaultMQProducerImpl.java(事务回查机制)
private void checkTransactionState() {// Broker定时回查(第356行)for (MessageExt msg : halfMsgs) {// 查询本地事务状态(第372行)LocalTransactionState state = transactionListener.checkLocalTransaction(msg);// 根据状态提交/回滚(第379行)endTransaction(msg, state);}
}

三、核心差异对比

  1. 设计架构

    • Kafka:Exactly-Once语义,通过事务协调器实现 (其实本质上也是2PC,不过是通过一个特定的主题去做事务的处理
    • RocketMQ:采用二阶段提交+定时回查机制 (rocketmq事务消息会提供一个回查接口,目的是为了兜底,当你长时间未提交当前事务消息,通过回查机制让业务觉得该条消息是否提交
  2. 存储机制

// Kafka日志追加(第512行)
public void appendToTransactionLog() {// 使用__transaction_state特殊主题存储事务状态(需ISR确认)
}// RocketMQ事务存储(CommitLog.java第227行)
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {// 半消息存储到RMQ_SYS_TRANS_HALF_TOPIC主题if (msg.isTransactionPrepared()) {topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC;}
}
  1. 异常处理
// Kafka事务恢复(第672行)
void initializeTransactions() {// 通过事务ID恢复未完成事务coordinator.initializeTransactions();
}// RocketMQ事务补偿(第415行)
public void compensateDoTransaction() {// 超过checkTimeout未提交的消息自动回滚if (msg.getStoreTimestamp() + checkTimeout < now) {endTransaction(msg, LocalTransactionState.ROLLBACK_MESSAGE);}
}

四、适用场景对比

  1. Kafka:适合流处理场景的精确一次处理
  2. RocketMQ:更适合需要分布式事务支持的业务系统

注意:以上行号基于对应版本的源码,实际代码位置可能因版本更新发生变化。建议结合官方文档和源码注释进行验证。

http://www.xdnf.cn/news/305029.html

相关文章:

  • 扩增子分析|微生物生态网络稳定性评估之鲁棒性(Robustness)和易损性(Vulnerability)在R中实现
  • 鸿蒙系统被抹黑的深层解析:技术、商业与地缘政治的复杂博弈-优雅草卓伊凡
  • 用于备份的git版本管理指令
  • Github Action部署node项目
  • 如何打造系统级低延迟RTSP/RTMP播放引擎?
  • Leetcode Hot 100字母异位词分词
  • spring详解-循环依赖的解决
  • 第九章,链路聚合和VRRP
  • AI+浏览器自动化:Nanobrowser Chrome 扩展的使用「详细教程」
  • 【LLM】Open WebUI 使用指南:详细图文教程
  • Stream和Collections工具类
  • 多行文本省略
  • oceanbase不兼容SqlSugarCore的问题
  • 【KWDB创作者计划】_通过一篇文章了解什么是 KWDB(KaiwuDB)
  • JMeter_配置元件之随机变量(RandomVariable)介绍
  • 手撕算法(1)
  • 使用 Spring Boot 构建 REST API
  • SpringBoot教学管理平台源码设计开发
  • leetcode 24. 两两交换链表中的节点
  • 分库分表后复杂查询的应对之道:基于DTS实时性ES宽表构建技术实践
  • 简说Policy Gradient (1) —— 入门
  • [蓝桥杯 2025 省 B] 水质检测(暴力 )
  • python--------修改桌面文件内容
  • 第2章 神经网络的数学基础
  • 神经网络之激活函数:解锁非线性奥秘的关键
  • Linux开发工具【上】
  • 2025年LangChain(V0.3)开发与综合案例
  • 接口自动化工具如何选择?以及实战介绍
  • windows操作系统开机自启(自动启动) 运行窗口 shell:startup 指令调出开机自启文件夹
  • 驱动开发系列57 - Linux Graphics QXL显卡驱动代码分析(四)显示区域绘制