当前位置: 首页 > ds >正文

springboot java开发的rocketmq 事务消息保证

RocketMQ 的事务消息是其最具特色的功能之一,用于解决分布式场景下业务本地事务与消息发送的原子性问题​(即确保本地事务执行与消息发送要么同时成功,要么同时失败)。其核心设计思路是 ​两阶段提交(2PC)+ 事务状态回查

关键角色与机制

  1. 1.

    半消息(Half Message / Prepared Message)​

    • 本质:发送到 Broker 但对 Consumer 不可见的消息(存储在 RMQ_SYS_TRANS_HALF_TOPIC)。

    • 作用:先确认消息能否成功存储,再执行本地事务,避免因消息发送失败导致无效事务操作

2.

事务监听器(TransactionListener)​

生产者需实现此接口,包含两个方法:

public interface TransactionListener {// 执行本地事务(返回COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW)LocalTransactionState executeLocalTransaction(Message msg, Object arg);// Broker回查事务状态时触发(解决超时问题)LocalTransactionState checkLocalTransaction(MessageExt msg);
}
  1. 3.

    事务状态回查(Transaction Check)​

    • 触发条件​:当半消息长时间(默认1分钟)未收到 End Transaction指令。

    • 回查策略​:Broker 主动向生产者发起回查请求,询问事务最终状态(最多重试15次)。

  2. 4.

    事务结果提交(End Transaction)​

    生产者根据本地事务结果,主动通知 Broker 提交(COMMIT_MESSAGE)或回滚(ROLLBACK_MESSAGE)事务。

实现步骤(代码示例)

1. 生产者:配置事务消息
public class TransactionProducer {public static void main(String[] args) throws Exception {// 创建事务消息生产者(指定生产者组名)TransactionMQProducer producer = new TransactionMQProducer("transaction_group");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器(核心)producer.setTransactionListener(new TransactionListenerImpl());producer.start();// 构造消息Message msg = new Message("pay_topic", "订单支付".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息(第二个参数可用于传递业务参数)TransactionSendResult result = producer.sendMessageInTransaction(msg, "订单ID:1001");System.out.println("事务发送结果:" + result.getSendStatus());}
}// 实现事务监听器
class TransactionListenerImpl implements TransactionListener {/*** 执行本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 1. 执行本地数据库事务(例如:订单状态更新)boolean success = doBusinessTransaction(arg.toString());// 2. 根据结果返回状态return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;} catch (Exception e) {// 异常返回未知状态(后续依赖回查)return LocalTransactionState.UNKNOW;}}/*** Broker事务回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 根据消息内容检查本地事务状态(如查询数据库)String orderId = parseOrderId(msg);boolean isSuccess = checkOrderStatus(orderId);return isSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}private boolean doBusinessTransaction(String orderId) { /* ... */ }private boolean checkOrderStatus(String orderId) { /* ... */ }
}
2. 消费者:正常消费消息
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("pay_topic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("收到事务消息: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

事务消息的局限性与避坑指南

问题场景

解决方案

消息重复消费

消费端必须实现幂等性处理​(如唯一键校验、状态机)

事务回查失败

确保 checkLocalTransaction逻辑可靠(重试+兜底策略),避免消息长期阻塞

本地事务与消息发送顺序

先执行业务再返回 COMMIT,避免业务未完成但消息已投递

超时时间不足

调整 broker.conf中的 transactionTimeout参数(默认60秒)

回查频率过高

优化本地事务性能,避免频繁返回 UNKNOW

http://www.xdnf.cn/news/19295.html

相关文章:

  • SyncBack 安全备份: 加密文件名及文件内容, 防止黑客及未授权的访问
  • Ansible Playbook 实践
  • CPP学习之map和set
  • 99.数据大小端模式
  • KLARI-CORD5硬件应用:基于CAN总线的多通道电气测量与数据记录实战
  • Spring Boot自动装配机制的原理
  • SOME/IP-SD中”服务器服务组播端点”、“客户端服务组播端点”与“IPv4组播选项的区分
  • 面向企业级产品开发的自动化脚本实战
  • Java 获取淘宝关键词搜索(item_search)API 接口实战指南
  • 抖音电商首创最严珠宝玉石质检体系,推动行业规范与消费扩容
  • 拼多多商品信息批量获取及开放API接口调用指南
  • 使用Python脚本执行Git命令
  • vben admin5组件文档(豆包版)---VbenTree
  • 【C++】C++入门——(上)
  • 用docker实现Redis主从配置
  • Android14 init.qcom.usb.rc详解
  • 2025年渗透测试面试题总结-38(题目+回答)
  • WebRTC音频QoS方法五(音频变速算法之Expand算法实现)
  • 订餐后台管理系统 -day03 登录模块
  • Electron 项目来实现文件下载和上传功能(AI)
  • 前端网页源码模板 静态HTML源码网站
  • 【C++八股文】计算机网络篇
  • 企业级-搭建CICD(持续集成持续交付)实验手册
  • Web开发工具一套式部署Maven/Nvm/Mysql/Redis
  • 【问题】Windows的dockerdesktop/wsl虚拟化支持问题总结
  • 2025年OE SCI2区TOP,势场蚁群算法+无人水面艇路径规划,深度解析+性能实测
  • RustDesk(跨平台远程桌面软件) v1.4.1 中文绿色版
  • 根据并发和响应延迟,实现语音识别接口自动切换需求
  • 「日拱一码」058 机器学习——监督学习
  • CesiumJS 封装 - 初始化与配置