当前位置: 首页 > ai >正文

RabbitMQ消息的重复消费问题如何解决?

在这里插入图片描述

在RabbitMQ中,消息重复消费是一个常见问题,它通常发生在消费者处理消息时出现网络波动、节点故障或消费者自身处理逻辑异常,ACK 失败等情况,都会导致RabbitMQ 不能够正确感知消息已被成功处理,从而重新投递消息。以下是几种常见的解决RabbitMQ消息重复消费问题的方法:

消息幂等性处理(业务上)

  • 幂等性是指对同一操作的多次执行所产生的影响与一次执行的影响相同。在消息处理场景中,意味着无论消息被消费多少次,对业务的最终影响是一致的。
  • 消费者在业务逻辑中通过记录已处理消息的标识来保证幂等性。比如维护一个内存中的Set集合,每次处理消息前,先检查消息的唯一标识是否已在集合中。如果已存在,则说明该消息已被处理过,直接返回;如果不存在,则处理消息并将标识添加到集合中。
public class MessageProcessor {// 已消费的消息private static final Set<String> processedMessages = new HashSet<>();public void processMessage(String messageId, String messageContent) {if (processedMessages.contains(messageId)) {// 消息已处理过,直接返回return;}// 处理消息System.out.println("处理消息:" + messageContent);// 将消息标识添加到已处理集合processedMessages.add(messageId);}
}
  • 利用缓存(如Redis)来记录已处理消息的标识。每次消费消息时,先查询缓存中是否已存在该消息标识。如果存在,说明消息已被处理过,直接丢弃;如果不存在,则处理消息,并将消息标识存入缓存。缓存可以设置过期时间,以避免缓存数据无限增长。
  • 或者建立消息去重表,将已经处理过的消息的唯一键记录在数据库,每次去数据库查询是否处理过此消息。

使用 RabbitMQ 的确认机制

  • RabbitMQ提供了两种确认机制,分别是自动确认(autoAck=true)和手动确认(autoAck=false)。自动确认模式下,RabbitMQ在消息发送给消费者后,会立即认为消息已被成功消费,这种模式可能导致消息重复消费。所以需要进行手动确认,消费者处理完消息后,需要显式地调用basicAck方法通知RabbitMQ消息已被成功处理。如果消费者在处理消息过程中出现异常或未发送basicAck,RabbitMQ会认为消息未被成功消费,从而重新投递消息。
public class ManualAckConsumer {private static final String QUEUE_NAME = "normal_ack_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 设置为手动确认模式boolean autoAck = false;channel.basicConsume(QUEUE_NAME, autoAck, "normal-ack-consumer",false, false, null, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF - 8");System.out.println(" [x] 收到消息: '" + message + "'");try {// 模拟消息处理Thread.sleep(1000);// 手动确认消息channel.basicAck(envelope.getDeliveryTag(), false);System.out.println(" [x] 消息确认成功");} catch (InterruptedException e) {e.printStackTrace();}}});}
}

使用 RabbitMQ 的 Message Deduplication 插件

在消息属性中增加唯一 ID,Message Deduplication 插件基于生产者发送消息时携带的唯一 ID,在 RabbitMQ 服务器端对消息进行去重处理。它会维护一个去重表(通常存储在内存或磁盘,取决于配置),记录已经处理过的消息 ID。当新消息到达时,插件会检查去重表,若发现消息 ID 已存在,则丢弃该消息;否则,将消息 ID 记录到去重表并正常处理消息。

http://www.xdnf.cn/news/4885.html

相关文章:

  • jenkins 启动报错
  • 从粗放管控到数字治能——安科瑞智能监测系统助力污水厂能耗下降15%+
  • 如何通过C# 获取Excel单元格的数据类型
  • YOLO算法的基本介绍
  • 【react组件】矩形框选小组件,鼠标左键选中 div,键盘 ESC 清空
  • 【Axios】解决Axios下载二进制文件返回空对象的问题
  • 高性能Python Web 框架--FastAPI 学习「基础 → 进阶 → 生产级」
  • [Linux网络_70] ARP协议 | RARP | DNS | ICMP协议
  • 无人机电池储存与操作指南
  • 垃圾分类宣教小程序源码介绍
  • Java——包装类
  • (三)毛子整洁架构(Infrastructure层/DapperHelper/乐观锁)
  • vue内写websocket实时订阅
  • 【分享】KK/BD/XL等六大不限速下载
  • Spring Boot中的拦截器!
  • [计算机科学#12]:高级编程语言基本元素,迅速上手编程
  • 制造单元智能化改造与集成技术平台成套实训设备
  • 数据分析怎么做?高效的数据分析方法有哪些?
  • VB.NET Line Input有办法让输入的字符不显示在控制台上吗
  • Flutter 3.29.3 花屏问题记录
  • IBM BAW(原BPM升级版)使用教程第六讲
  • 一、每日Github软件分享----QuickGo外链直达工具​
  • 力扣刷题(第二十一天)
  • 涨薪技术|0到1学会性能测试第56课- 堆与栈、GC回收机制
  • 如何使用测试软件 Jmeter
  • 检查当前 Docker 使用的 默认运行时(default runtime)方法
  • mysql主从同步
  • Docker组件详解:核心技术与架构分析
  • 三维底座+智能应用,重构城市治理未来
  • WorkManager与Kotlin后台任务调度指南