RocketMQ 的事务消息是如何实现的
为什么要用事务消息?
想象一个场景:你在电商平台下单支付,需要同时完成两个操作:
- 操作1:扣减库存(数据库事务)
- 操作2:通知物流系统发货(发消息)
如果两个操作不同步完成,可能出现:
- 库存扣了但物流没收到通知(用户付了钱但没发货)
- 物流收到通知但库存没扣(超卖)
- 事务消息就是为了解决这种跨服务的“要么全成功,要么全失败”问题。
2. 核心流程(两阶段提交)
阶段一:发半消息(试探)
1、生产者发送一条半消息到RocketMQ(Broker)。
- 这条消息会被临时存到特殊主题RMQ_SYS_TRANS_HALF_TOPIC,消费者看不到它。
- 相当于对Broker说:“我先发个草稿,你别告诉别人,等我确认后再公开。”
2、Broker回复ACK:“草稿收到了,你继续。”
3、生产者收到ACK后,执行本地事务(比如扣减库存)。
- 如果本地事务成功:记录“可以公开消息”。
- 如果失败:记录“消息作废”。
阶段二:确认提交或回滚
4、生产者根据本地事务结果,告诉Broker:
- Commit:“草稿可以公开了!” → Broker将消息转移到真实Topic,消费者可见。
- Rollback:“草稿作废!” → Broker删除半消息。
5、容错机制(回查):
- 如果生产者宕机,没来得及告诉Broker结果怎么办?
- Broker会主动回查:每隔1分钟扫描半消息,找到“未确认”的消息,问生产者:“这消息到底要不要公开?”
- 生产者需实现checkLocalTransaction方法,检查本地事务状态并回复Broker。