springboot java开发的rocketmq 事务消息保证
RocketMQ 的事务消息是其最具特色的功能之一,用于解决分布式场景下业务本地事务与消息发送的原子性问题(即确保本地事务执行与消息发送要么同时成功,要么同时失败)。其核心设计思路是 两阶段提交(2PC)
+ 事务状态回查
。
关键角色与机制
- 1.
半消息(Half Message / Prepared Message)
- •
本质:发送到 Broker 但对 Consumer 不可见的消息(存储在
RMQ_SYS_TRANS_HALF_TOPIC
)。 - •
作用:先确认消息能否成功存储,再执行本地事务,避免因消息发送失败导致无效事务操作
- •
2.
事务监听器(TransactionListener
)
生产者需实现此接口,包含两个方法:
public interface TransactionListener {// 执行本地事务(返回COMMIT_MESSAGE/ROLLBACK_MESSAGE/UNKNOW)LocalTransactionState executeLocalTransaction(Message msg, Object arg);// Broker回查事务状态时触发(解决超时问题)LocalTransactionState checkLocalTransaction(MessageExt msg);
}
- 3.
事务状态回查(Transaction Check)
- •
触发条件:当半消息长时间(默认1分钟)未收到
End Transaction
指令。 - •
回查策略:Broker 主动向生产者发起回查请求,询问事务最终状态(最多重试15次)。
- •
- 4.
事务结果提交(End Transaction)
生产者根据本地事务结果,主动通知 Broker 提交(
COMMIT_MESSAGE
)或回滚(ROLLBACK_MESSAGE
)事务。
实现步骤(代码示例)
1. 生产者:配置事务消息
public class TransactionProducer {public static void main(String[] args) throws Exception {// 创建事务消息生产者(指定生产者组名)TransactionMQProducer producer = new TransactionMQProducer("transaction_group");producer.setNamesrvAddr("localhost:9876");// 设置事务监听器(核心)producer.setTransactionListener(new TransactionListenerImpl());producer.start();// 构造消息Message msg = new Message("pay_topic", "订单支付".getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送事务消息(第二个参数可用于传递业务参数)TransactionSendResult result = producer.sendMessageInTransaction(msg, "订单ID:1001");System.out.println("事务发送结果:" + result.getSendStatus());}
}// 实现事务监听器
class TransactionListenerImpl implements TransactionListener {/*** 执行本地事务*/@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 1. 执行本地数据库事务(例如:订单状态更新)boolean success = doBusinessTransaction(arg.toString());// 2. 根据结果返回状态return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;} catch (Exception e) {// 异常返回未知状态(后续依赖回查)return LocalTransactionState.UNKNOW;}}/*** Broker事务回查*/@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 根据消息内容检查本地事务状态(如查询数据库)String orderId = parseOrderId(msg);boolean isSuccess = checkOrderStatus(orderId);return isSuccess ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}private boolean doBusinessTransaction(String orderId) { /* ... */ }private boolean checkOrderStatus(String orderId) { /* ... */ }
}
2. 消费者:正常消费消息
public class Consumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");consumer.setNamesrvAddr("localhost:9876");consumer.subscribe("pay_topic", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.println("收到事务消息: " + new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
事务消息的局限性与避坑指南
问题场景 | 解决方案 |
---|---|
消息重复消费 | 消费端必须实现幂等性处理(如唯一键校验、状态机) |
事务回查失败 | 确保 |
本地事务与消息发送顺序 | 先执行业务再返回 |
超时时间不足 | 调整 |
回查频率过高 | 优化本地事务性能,避免频繁返回 |