RabbitMq如何实现幂等性
目录
为什么会产生重复消息?
实现幂等性的常见方案
1. 业务逻辑天然幂等
2. 唯一键/版本号控制(最常用、最推荐)
3. 状态机控制(适用于有状态流转的业务)
4. 使用 Redis 等缓存中间件
总结与建议
在分布式系统中,网络抖动、客户端或服务端故障都可能导致消息重复传递。RabbitMQ 本身不提供幂等性保证,它提供的是消息投递的保证(如确认机制),但无法阻止重复消息的产生。
因此,消息幂等性必须由消费者来实现。
为什么会产生重复消息?
-
生产者确认模式:
-
生产者开启了
publisher confirm
模式,但在消息发出后,网络抖动导致确认信号没有及时收到。生产者可能会认为消息发送失败而重试,导致Broker收到两条一样的消息。
-
-
消费者确认模式:
-
消费者处理完消息后,在发送
ack
(确认)回Broker之前突然宕机或连接断开。Broker没有收到ack
,会认为该消息处理失败,从而将消息重新投递给另一个消费者(或者等待当前消费者重连后再次投递)。
-
实现幂等性的常见方案
幂等性的核心思想是:无论同一条消息被消费多少次,其结果都与消费一次相同。以下是几种主流的实现方案:
1. 业务逻辑天然幂等
首先检查你的业务操作本身是否就是幂等的。例如:
-
查询操作:
select * from table where id=1
,执行多次结果都一样。 -
更新操作:
update table set status = 'completed' where id=1
,执行多次后状态依然是completed
。 -
删除操作:
delete from table where id=1
,执行多次后结果都是数据被删除。
如果业务逻辑本身是幂等的,那么就无需额外处理。
2. 唯一键/版本号控制(最常用、最推荐)
这是最通用和有效的方法。核心原理是:在数据库中利用唯一约束来防止重复数据。
实现步骤:
-
在消息体中携带一个全局唯一的ID(例如
message_id
),这个ID可以是业务主键,也可以是雪花算法等生成的分布式ID。这个ID需要唯一标识一条消息或一个业务请求。 -
消费者在处理消息前,先拿这个
message_id
去一张“去重表”中查询。-
如果不存在,则进行业务处理,处理成功后将该
message_id
作为唯一键插入到“去重表”中。 -
如果已存在,则说明该消息已经被成功处理过,直接丢弃或确认消息即可。
-
举例:订单支付消息
假设消息体为:{ "order_id": 20240907001, "amount": 100.00 }
-- 创建去重表
CREATE TABLE message_id_empower (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE, -- 唯一约束,确保不会重复插入create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);-- 消费者伪代码
public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 尝试插入去重表try {int count = executeSql("INSERT INTO message_id_empower (message_id) VALUES (?)", orderId);if (count > 0) {// 插入成功,说明是第一次处理processPayment(orderId); // 真正的业务处理:更新订单状态为已支付channel.basicAck(deliveryTag); // 确认消息} else {// 插入失败(由于唯一约束冲突),说明是重复消息channel.basicAck(deliveryTag); // 直接确认,不再处理log.warn("Duplicate message received, orderId: {}", orderId);}} catch (DuplicateKeyException e) {// 捕获唯一键冲突异常,同样视为重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}
优点:
-
简单可靠,通用性强。
-
基于数据库,实现方便。
缺点:
-
需要引入额外的数据库表和写操作,有性能开销。
-
去重表需要根据业务周期定期清理旧数据。
3. 状态机控制(适用于有状态流转的业务)
很多业务数据本身就有明确的状态流转(如订单状态:待支付
-> 已支付
-> 已发货
)。可以通过判断当前状态来决定是否处理消息。
举例:同样的订单支付消息
public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 先从数据库查询当前订单状态Order order = orderDao.findById(orderId);if (order == null) {// 订单不存在,可能是脏数据,记录日志并确认消息channel.basicAck(deliveryTag);return;}if (OrderStatus.PAID.equals(order.getStatus())) {// 状态已是“已支付”,说明是重复消息,直接确认channel.basicAck(deliveryTag);log.warn("Order already paid, orderId: {}", orderId);return;}if (!OrderStatus.PENDING.equals(order.getStatus())) {// 状态不是“待支付”,说明订单无法支付(可能已取消),记录日志并确认channel.basicAck(deliveryTag);log.error("Order status is invalid for payment, orderId: {}, status: {}", orderId, order.getStatus());return;}// 2. 状态是“待支付”,正常处理业务processPayment(orderId);channel.basicAck(deliveryTag);
}
优点:
-
无需创建额外的去重表,利用业务数据本身实现。
-
逻辑符合业务语义。
缺点:
-
只适用于有状态流转的业务模型。
-
需要先进行一次数据库查询。
4. 使用 Redis 等缓存中间件
原理与“唯一键控制”类似,利用 Redis 的 SET key value NX
(如果key不存在则设置)命令来实现分布式锁或去重标记。
public void consume(Message message) {String orderId = message.getBody().getString("order_id");String redisKey = "order_paid:" + orderId;// 尝试设置一个过期时间为一天的键,如果设置成功返回true,否则返回falseBoolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", Duration.ofDays(1));if (Boolean.TRUE.equals(success)) {// 设置成功,说明是第一次处理processPayment(orderId);channel.basicAck(deliveryTag);} else {// 设置失败,键已存在,说明是重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}
优点:
-
性能极高,速度远快于数据库。
缺点:
-
可靠性不如数据库,存在Redis服务宕机或数据丢失的风险(需要持久化配置)。
-
需要合理设置键的过期时间。
总结与建议
方案 | 适用场景 | 优点 | 缺点 |
---|---|---|---|
天然幂等 | 查询、特定更新/删除 | 无需任何额外工作 | 适用范围有限 |
唯一键控制 | 几乎所有场景 | 通用、可靠 | 需要数据库支持,有性能开销 |
状态机控制 | 订单等有状态流转的业务 | 利用现有业务表,无需额外表 | 需要先查询,只适用于特定业务 |
Redis 缓存 | 对性能要求极高的场景 | 性能极佳 | 可靠性稍弱,需要维护Redis |
最佳实践建议:
-
首选方案:对于大部分业务系统,“唯一键/版本号控制” 是最稳健、最通用的选择。结合数据库的唯一约束,可以万无一失。
-
组合使用:可以将多种方案结合。例如,先用 Redis 做快速去重过滤大部分请求,同时用数据库做最终兜底。
-
消息设计:务必在消息体内携带一个全局唯一的业务ID(如
order_id
,message_id
),这是实现幂等的基础。 -
先查后改:在处理任何消息时,养成“先查询当前状态,再决定是否处理”的习惯。
记住,RabbitMQ 提供了“至少一次”的消息投递保证,而要达成“恰好一次”的语义,必须依靠消费者端的幂等性处理来实现。