当前位置: 首页 > backend >正文

RabbitMq如何实现幂等性

目录

为什么会产生重复消息?

实现幂等性的常见方案

1. 业务逻辑天然幂等

2. 唯一键/版本号控制(最常用、最推荐)

3. 状态机控制(适用于有状态流转的业务)

4. 使用 Redis 等缓存中间件

总结与建议


在分布式系统中,网络抖动、客户端或服务端故障都可能导致消息重复传递。RabbitMQ 本身不提供幂等性保证,它提供的是消息投递的保证(如确认机制),但无法阻止重复消息的产生。

因此,消息幂等性必须由消费者来实现

为什么会产生重复消息?

  1. 生产者确认模式

    • 生产者开启了 publisher confirm 模式,但在消息发出后,网络抖动导致确认信号没有及时收到。生产者可能会认为消息发送失败而重试,导致Broker收到两条一样的消息。

  2. 消费者确认模式

    • 消费者处理完消息后,在发送 ack(确认)回Broker之前突然宕机或连接断开。Broker没有收到 ack,会认为该消息处理失败,从而将消息重新投递给另一个消费者(或者等待当前消费者重连后再次投递)。

实现幂等性的常见方案

幂等性的核心思想是:无论同一条消息被消费多少次,其结果都与消费一次相同。以下是几种主流的实现方案:

1. 业务逻辑天然幂等

首先检查你的业务操作本身是否就是幂等的。例如:

  • 查询操作select * from table where id=1,执行多次结果都一样。

  • 更新操作update table set status = 'completed' where id=1,执行多次后状态依然是 completed

  • 删除操作delete from table where id=1,执行多次后结果都是数据被删除。

如果业务逻辑本身是幂等的,那么就无需额外处理。

2. 唯一键/版本号控制(最常用、最推荐)

这是最通用和有效的方法。核心原理是:在数据库中利用唯一约束来防止重复数据

实现步骤:

  1. 在消息体中携带一个全局唯一的ID(例如 message_id),这个ID可以是业务主键,也可以是雪花算法等生成的分布式ID。这个ID需要唯一标识一条消息或一个业务请求

  2. 消费者在处理消息前,先拿这个 message_id 去一张“去重表”中查询。

    • 如果不存在,则进行业务处理,处理成功后将该 message_id 作为唯一键插入到“去重表”中。

    • 如果已存在,则说明该消息已经被成功处理过,直接丢弃或确认消息即可。

举例:订单支付消息
假设消息体为:{ "order_id": 20240907001, "amount": 100.00 }

-- 创建去重表
CREATE TABLE message_id_empower (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE, -- 唯一约束,确保不会重复插入create_time DATETIME DEFAULT CURRENT_TIMESTAMP
);-- 消费者伪代码
public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 尝试插入去重表try {int count = executeSql("INSERT INTO message_id_empower (message_id) VALUES (?)", orderId);if (count > 0) {// 插入成功,说明是第一次处理processPayment(orderId); // 真正的业务处理:更新订单状态为已支付channel.basicAck(deliveryTag); // 确认消息} else {// 插入失败(由于唯一约束冲突),说明是重复消息channel.basicAck(deliveryTag); // 直接确认,不再处理log.warn("Duplicate message received, orderId: {}", orderId);}} catch (DuplicateKeyException e) {// 捕获唯一键冲突异常,同样视为重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}

优点

  • 简单可靠,通用性强。

  • 基于数据库,实现方便。

缺点

  • 需要引入额外的数据库表和写操作,有性能开销。

  • 去重表需要根据业务周期定期清理旧数据。

3. 状态机控制(适用于有状态流转的业务)

很多业务数据本身就有明确的状态流转(如订单状态:待支付 -> 已支付 -> 已发货)。可以通过判断当前状态来决定是否处理消息。

举例:同样的订单支付消息

public void consume(Message message) {String orderId = message.getBody().getString("order_id");// 1. 先从数据库查询当前订单状态Order order = orderDao.findById(orderId);if (order == null) {// 订单不存在,可能是脏数据,记录日志并确认消息channel.basicAck(deliveryTag);return;}if (OrderStatus.PAID.equals(order.getStatus())) {// 状态已是“已支付”,说明是重复消息,直接确认channel.basicAck(deliveryTag);log.warn("Order already paid, orderId: {}", orderId);return;}if (!OrderStatus.PENDING.equals(order.getStatus())) {// 状态不是“待支付”,说明订单无法支付(可能已取消),记录日志并确认channel.basicAck(deliveryTag);log.error("Order status is invalid for payment, orderId: {}, status: {}", orderId, order.getStatus());return;}// 2. 状态是“待支付”,正常处理业务processPayment(orderId);channel.basicAck(deliveryTag);
}

优点

  • 无需创建额外的去重表,利用业务数据本身实现。

  • 逻辑符合业务语义。

缺点

  • 只适用于有状态流转的业务模型。

  • 需要先进行一次数据库查询。

4. 使用 Redis 等缓存中间件

原理与“唯一键控制”类似,利用 Redis 的 SET key value NX(如果key不存在则设置)命令来实现分布式锁或去重标记。

public void consume(Message message) {String orderId = message.getBody().getString("order_id");String redisKey = "order_paid:" + orderId;// 尝试设置一个过期时间为一天的键,如果设置成功返回true,否则返回falseBoolean success = redisTemplate.opsForValue().setIfAbsent(redisKey, "1", Duration.ofDays(1));if (Boolean.TRUE.equals(success)) {// 设置成功,说明是第一次处理processPayment(orderId);channel.basicAck(deliveryTag);} else {// 设置失败,键已存在,说明是重复消息channel.basicAck(deliveryTag);log.warn("Duplicate message received, orderId: {}", orderId);}
}

优点

  • 性能极高,速度远快于数据库。

缺点

  • 可靠性不如数据库,存在Redis服务宕机或数据丢失的风险(需要持久化配置)。

  • 需要合理设置键的过期时间。

总结与建议

方案适用场景优点缺点
天然幂等查询、特定更新/删除无需任何额外工作适用范围有限
唯一键控制几乎所有场景通用、可靠需要数据库支持,有性能开销
状态机控制订单等有状态流转的业务利用现有业务表,无需额外表需要先查询,只适用于特定业务
Redis 缓存对性能要求极高的场景性能极佳可靠性稍弱,需要维护Redis

最佳实践建议:

  1. 首选方案:对于大部分业务系统,“唯一键/版本号控制” 是最稳健、最通用的选择。结合数据库的唯一约束,可以万无一失。

  2. 组合使用:可以将多种方案结合。例如,先用 Redis 做快速去重过滤大部分请求,同时用数据库做最终兜底。

  3. 消息设计务必在消息体内携带一个全局唯一的业务ID(如 order_idmessage_id),这是实现幂等的基础。

  4. 先查后改:在处理任何消息时,养成“先查询当前状态,再决定是否处理”的习惯。

记住,RabbitMQ 提供了“至少一次”的消息投递保证,而要达成“恰好一次”的语义,必须依靠消费者端的幂等性处理来实现。

http://www.xdnf.cn/news/19942.html

相关文章:

  • 力扣字符串刷题-六道题记录-1
  • ECMAScript (5)ES6前端开发核心:国际化与格式化、内存管理与性能
  • Lucene 8.7.0 版本的索引文件格式
  • uniapp vue页面传参到webview.nvue页面的html或者另一vue中
  • 架构-亿级流量性能调优实践
  • 【ICCV 2025 顶会论文】,新突破!卷积化自注意力 ConvAttn 模块,即插即用,显著降低计算量和内存开销。
  • 阿里云轻量应用服务器部署-WooCommerce
  • 剧本杀APP系统开发:引领娱乐行业新潮流的科技力量
  • 【RNN-LSTM-GRU】第三篇 LSTM门控机制详解:告别梯度消失,让神经网络拥有长期记忆
  • 【已更新文章+代码】2025数学建模国赛A题思路代码文章高教社杯全国大学生数学建模-烟幕干扰弹的投放策略
  • 达梦数据库-字典缓冲区 (二)-v2
  • void*指针类型转换笔记
  • C++ const以及相关关键字
  • Ubuntu 25.04搭建hadoop3.4.1集群详细教程
  • Access开发导出PDF的N种姿势,你get了吗?
  • 开源本地LLM推理引擎(Cortex AI)
  • OpenTenBase vs MySQL vs Oracle,企业级应用数据库实盘对比分析
  • 使用国外网络的核心问题有哪些?
  • 基于 epoll 的高并发服务器原理与实现(对比 select 和 poll)
  • 十七、单线程 Web 服务器
  • (自用)PowerShell常用命令自查文档
  • AI重构出海营销:HeadAI如何用“滴滴模式”破解红人营销效率困局?
  • Flink 网络消息队列 PrioritizedDeque
  • C52单片机独立按键模块,中断系统,定时器计数器以及蜂鸣器
  • OpenLayers常用控件 -- 章节三:鼠标位置坐标显示控件教程
  • 多线程入门到精通系列: 从操作系统到 Java 线程模型
  • 快鹭云业财一体化系统技术解析:低代码+AI如何破解数据孤岛难题
  • 飞算JavaAI开发在线图书借阅平台全记录:从0到1的实践指南
  • 【C++】详解形参和实参:别再傻傻分不清
  • Android adb shell命令分析应用内存占用