当前位置: 首页 > java >正文

202531 | RocketMQ 消息过滤 + 消息重试机制 + 死信消息 + 重复消费问题

🚀 RocketMQ 中的 Topic、Tag 和 Key

📌 一句话解释:

  • Topic:消息的“频道”或主题,用于区分业务大类
  • Tag:Topic 下的子分类,用于精细过滤和路由
  • Key:消息的唯一标识符,便于查询和追踪

🧠 概念关系图(Mermaid 流程图)

消息生产者 Producer
发送消息
Topic: OrderTopic
Tag: CreateOrder
Tag: CancelOrder
Tag: PaySuccess
Key: ORDER_20250412123456

📦 实际业务场景:电商订单系统

你开发了一个电商平台,每当用户下单、取消订单或完成支付时,你都要把这些信息发给 RocketMQ。

场景TopicTagKey
用户创建订单OrderTopicCreateOrderORDER_20250412123456
用户取消订单OrderTopicCancelOrderORDER_20250412123456
用户完成支付OrderTopicPaySuccessORDER_20250412123456

✅ 表格总结:三者作用对比

概念是否必填作用举例
Topic✅ 是消息的一级分类(业务主题)OrderTopic
Tag✅ 是Topic 下的逻辑子类(操作类型)CreateOrder
Key❌ 可选消息的唯一标识(方便追踪)ORDER_20250412123456

💻 Java 代码示例

👨‍💻 消息生产者发送消息(带 Topic、Tag、Key)

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class OrderProducer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();// 构建消息String topic = "OrderTopic";String tag = "CreateOrder";String key = "ORDER_20250412123456";String body = "{\"orderId\": \"20250412123456\", \"userId\": \"U123\", \"amount\": 6999}";Message msg = new Message(topic, tag, key, body.getBytes());// 发送消息SendResult result = producer.send(msg);System.out.println("发送结果: " + result);producer.shutdown();}
}

👨‍💻 消费者只消费 CreateOrder 的消息

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 订阅 Topic,并只接收 Tag 为 CreateOrder 的消息consumer.subscribe("OrderTopic", "CreateOrder");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("接收到消息: %s%n", new String(msg.getBody()));}return org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

RocketMQ 消息重试机制总结

RocketMQ 提供了可靠的消息重试机制,用于保障 消息在发送与消费过程中的稳定性和可靠性。重试机制主要分为:

  • 生产者重试机制(Producer Retry)
  • 消费者重试机制(Consumer Retry)

生产者重试机制(Producer Retry)

✅ 场景说明

当生产者将消息发送到 Broker 时,如果由于网络异常、Broker 宕机或存储失败导致消息发送失败,RocketMQ 会自动进行重试,以提高消息发送成功率。

🛠️ 机制要点

配置项默认值说明
retryTimesWhenSendFailed2同步发送失败时最多重试 2 次(加上初始发送,总共尝试 3 次)
retryTimesWhenSendAsyncFailed2异步发送失败时最多重试 2 次
retryAnotherBrokerWhenNotStoreOKtrue当前 Broker 写入失败时是否自动尝试其他 Broker

🔁 工作流程图(Mermaid)

Producer BrokerA BrokerB 发送消息 失败 重试发送消息 成功 or 再次失败 Producer BrokerA BrokerB

💡 示例配置(Java)

producer.setRetryTimesWhenSendFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);

🚨 注意:生产者重试只保证“尽最大努力发送”,并不能 100% 保证投递成功,业务端应做好补偿措施(如日志记录、异步告警)。


消费者重试机制(Consumer Retry)

✅ 场景说明

当消费者接收到消息后,如果消费逻辑执行失败(如抛出异常、处理超时等),RocketMQ 会自动触发重试机制,确保消息不会因临时错误而丢失。

🛠️ 机制要点

项目项默认值说明
最大重试次数16 次包括第一次消费;超过此次数后消息将进入死信队列(DLQ)
重试间隔指数退避每次失败后重试间隔递增(RocketMQ 内部控制)
死信队列 DLQ自动投递Topic 格式为 %DLQ%消费组名,需手动消费/补偿处理

💡 示例消费逻辑(Java)

consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {try {// 模拟业务逻辑处理process(msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 处理失败 → 返回重试信号return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

📦 死信队列说明

  • 消息超过 16 次消费失败后,将进入 死信队列(DLQ)

  • DLQ 的 Topic 命名规则为:

    %DLQ%消费组名
    
  • 可通过另一个消费者对 DLQ 进行补偿消费处理:

consumer.subscribe("%DLQ%order-consumer-group", "*");

对比总结表

对比项生产者重试消费者重试
重试触发条件消息发送失败消费逻辑失败
默认重试次数2 次(共尝试 3 次)16 次(包含首次消费)
重试方式同一或其他 Broker 重试延迟队列定时重试
可配置性支持设置重试次数及跨 Broker 策略默认不可配置次数(源码内写死)
最终处理方式超过重试次数 → 抛出异常超过重试次数 → 投递到死信队列(DLQ)
补偿处理建议记录失败消息进行人工补偿监听 DLQ 进行二次消费或通知处理

🎯 小结口诀

“发送失败可换 Broker,消费失败慢慢重;重不过就进 DLQ,人工处理要补空。”

好的!下面是图文并茂的完整介绍,带你全面理解 RocketMQ 中的死信消息(DLQ, Dead Letter Queue)。内容包括概念、触发条件、工作机制、可视化图表、示例代码及处理建议。

好的!以下是经过修改和优化后的 RocketMQ 死信消息(DLQ)详解,包括概念、触发条件、工作机制、可视化图表、处理建议等。


🚨 RocketMQ 死信消息(DLQ)详解


🧠 什么是死信消息?

在 RocketMQ 中,死信消息(Dead Letter Message) 是指由于消费失败多次而被标记为无法处理的消息。为了避免影响正常消费,RocketMQ 会将这些消息投递到一个特殊的 Topic —— 死信队列(DLQ),供后续补偿、人工处理或报警使用。


🔁 死信消息的产生条件

条件描述
消费失败消费者处理消息时发生异常,如抛出异常或返回 RECONSUME_LATER
超过重试次数限制默认最多尝试 16 次消费(包括首次),超过则投递到 DLQ
无法正常消费为防止无限重试占用系统资源,RocketMQ 将其作为死信处理

🧭 死信消息工作机制流程图

1. 发送消息
2. 正常消费
3. 延迟重试
4. 人工处理
生产者发送消息
Topic队列
消费成功?
流程结束
重试队列
%RETRY%+Group
达到最大
重试次数?
死信队列
%DLQ%+Group
重新投递/记录/报警

扩展说明(代码实现):

// 生产者发送消息示例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "DLQ_TEST".getBytes());
// 设置最大重试次数(默认16次)
msg.setReconsumeTimes(3); 
producer.send(msg);// 消费者重试逻辑(自动触发)
public class ConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 模拟消费失败(第4次会进入死信队列)if(msgs.get(0).getReconsumeTimes() < 3) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

注意事项:

  1. 重试间隔时间公式:延迟级别 = 重试次数 + 3(对应Broker配置的messageDelayLevel
  2. 可通过CONSUMER.getDefaultMQPushConsumerImpl().getConsumeMessageService()监控重试状态
  3. 死信队列消息需人工处理,不会自动清除

🗂️ 死信队列的 Topic 命名规则

死信消息被保存到专用的 Topic 中,命名规则为:

%DLQ%<ConsumerGroup>

例如,如果你的消费者组为 order-consumer-group,死信队列 Topic 为:

%DLQ%order-consumer-group
  • 每个消费组都会有自己的死信队列
  • 消息会被隔离存放,避免对正常消息消费产生影响

🔍 如何消费死信消息?

你可以通过一个新的消费者监听死信队列,对死信消息进行补偿、记录日志、报警等处理:

consumer.subscribe("%DLQ%order-consumer-group", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 补偿逻辑:例如记录日志、人工审核、重新投递到业务 TopichandleDeadLetter(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

🧰 死信消息的典型处理方式

处理方式描述
手动重投人工识别后,将死信消息重新发送到原始业务 Topic 进行消费
日志报警将死信消息记录到日志系统,并触发报警,提醒开发或运维人员处理
自动补偿编写自动补偿消费者,定期处理死信消息(如重入 Topic 或写入数据库)
后台管理界面重试配合 RocketMQ 控制台,可以选中死信消息后点击“重新发送”

💡 示例:补偿逻辑封装

private void handleDeadLetter(MessageExt msg) {// 1. 解析消息体String msgBody = new String(msg.getBody());// 2. 写入日志系统log.warn("DeadLetter 消息:{}", msgBody);// 3. 判断是否可自动补偿if (canRetry(msg)) {// 自动补偿:重新投递消息Message newMsg = new Message("originalTopic", msg.getTags(), msg.getKeys(), msg.getBody());producer.send(newMsg);} else {// 如果不可重试,记录告警alert(msg); // 通知告警系统}
}

📊 死信机制配置项一览

参数默认值说明
最大重试次数16超过此次数消息进入死信队列,RocketMQ 写死不支持配置
死信队列格式%DLQ%消费组名死信 Topic 的命名规则
手动消费 DLQ✅ 支持需注册消费者监听 %DLQ%xxx Topic

✅ 总结

要点描述
产生原因消息消费失败超过重试次数
默认重试上限16 次
死信 Topic 命名%DLQ%<ConsumerGroup>
后续处理方式日志记录、人工审核、重新投递、告警或自动补偿
推荐最佳实践配置监控 + 死信消费者补偿 + 可视化平台人工操作支持

🧭 可视化小口诀

消费失败不慌张,RocketMQ 先重试;
十六次都不行,DLQ 报警来补偿。
死信不是终点站,补偿处理才关键!


RocketMQ 中,消息重复消费 是一个常见且需要处理的重要问题。消息可能会被重复消费,这通常会对系统的一致性和性能产生影响。理解和解决消息重复消费问题,是保证消息队列系统可靠性和数据一致性的关键。

RocketMQ中消息重复消费问题

🚨 消息重复消费的原因

1. 网络延迟与重试

消息消费过程可能由于 网络抖动、连接断开、消费者崩溃 等问题导致消费失败。此时,RocketMQ 会 自动重试 该消息消费。如果消费者没有收到消费成功的确认(ACK),它会认为消费失败并进行重试。

2. 消费者宕机

如果消费者在消费过程中崩溃或者被杀死,RocketMQ 会根据其配置的 消费进度(offset) 从该消费者未成功消费的消息开始重新投递,导致相同消息被重新消费。

3. 消息投递确认问题

消费者需要确认(ACK)消息的成功消费。如果消费者没有正确地返回确认(如处理超时),RocketMQ 也可能会将这条消息重新投递给消费者。

4. Broker 负载均衡调整

RocketMQ Broker 负载均衡 调整时,消费者可能会从新的 Broker 获取消息,这也有可能导致某些消息被多次消费。


🧠 如何避免消息重复消费?

虽然 RocketMQ 的设计尽量避免重复消费,但它并不是天然的 “Exactly Once”(精确一次)消息队列系统。下面是一些减少或避免消息重复消费的方法:

1. 幂等性设计

通过确保消息处理操作是 幂等 的,可以避免消息重复消费时产生副作用。幂等操作意味着无论操作执行多少次,结果都是一致的。

常见幂等性实现方式

  • 使用唯一的 消息ID消息Key 作为数据库操作的标识,确保相同消息不会多次更新数据库。
  • 在数据库中维护一个记录消费状态的表,确保同一条消息不会被重复消费。

示例

// 在数据库中使用消息的唯一ID作为约束
String messageId = msg.getKeys();  // 获取消息的唯一标识
if (isMessageAlreadyProcessed(messageId)) {return;  // 如果消息已经处理过,则跳过
}

2. 消息去重

可以在消费者端实现消息去重机制,通过维护消息的 唯一标识 来检查该消息是否已经被消费。

常见去重方式

  • 使用 缓存(如 Redis) 来记录已处理的消息ID。
  • 使用 数据库表 来记录处理过的消息ID。

示例

String msgId = msg.getKeys();
if (cache.contains(msgId)) {return;  // 消息已处理,直接跳过
}
cache.put(msgId, true);  // 将消息ID放入缓存

3. 调整消息消费确认机制

确保 消费成功确认(ACK) 返回的时机合适,并且消费者不会在处理中途丢失确认。

  • 消费成功:消费者成功消费消息后,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
  • 消费失败:消费者可以返回 ConsumeConcurrentlyStatus.RECONSUME_LATER,进行重试。

同时,可以通过 消息重试次数限制 来控制消息重试的次数,避免无限重试。

4. 设置适当的重试策略

调整 RocketMQ 消费者的 重试次数和重试间隔,根据业务需求选择合理的策略。例如,如果系统允许偶尔重复消费,可以适当增加重试次数并降低重试的频率。

consumer.setMaxReconsumeTimes(5);  // 最大重试次数
consumer.setMessageModel(MessageModel.CLUSTERING);  // 消息模型(集群消费)

⚙️ 消费者去重与幂等性的实现例子

1. 幂等性检查

在处理业务操作时,基于消息的 唯一标识 来确保消费的幂等性。

public boolean processMessage(Message msg) {String msgId = msg.getKeys();  // 获取消息的唯一标识if (messageAlreadyProcessed(msgId)) {return true;  // 已经处理过该消息,避免重复消费}// 处理消息业务逻辑processBusinessLogic(msg);// 标记为已处理markMessageAsProcessed(msgId);return true;
}

2. 消息去重

可以通过 Redis 等缓存存储已消费的消息 ID,并定期清理过期记录。

public boolean isMessageAlreadyProcessed(String msgId) {// 使用 Redis 来存储已消费的消息IDreturn redisCache.containsKey(msgId);
}public void markMessageAsProcessed(String msgId) {// 将消息ID存储到 Redis,设置过期时间(例如24小时)redisCache.put(msgId, true, 24, TimeUnit.HOURS);
}

✅ 总结

解决方案描述
幂等性设计通过保证操作是幂等的,避免重复消费导致的副作用。
消息去重使用缓存或数据库记录已消费的消息,避免重复消费。
消费确认机制确保消费者正确返回消息消费成功的确认,避免重复消费。
重试策略合理配置消费重试次数和间隔时间,避免因过多重试导致的重复消费。
http://www.xdnf.cn/news/3939.html

相关文章:

  • zotero pdf中英翻译插件使用
  • epub格式转txt格式工具,txt批量转PDF
  • 设计模式(结构型)-组合模式
  • 【Java ee初阶】多线程(6)
  • item_get_app_pro - 获得淘宝app商品详情原数据操作流程
  • 使用 vllm 部署 Llama3-8b-Instruct
  • 【C++】grpc(一):安装
  • 【Python】Python好玩的第三方库之二维码生成,操作xlsx文件,以及音频控制器
  • 从零开始学Flink:开启实时计算的魔法之旅
  • CSS知识总结
  • Socket 编程 TCP
  • OpenGl实战笔记(1)基于qt5.15.2+mingw64+opengl绘制三角形
  • 解决因字段过长使MYSQL数据解析超时导致线上CPU告警问题
  • 技术犯规计入个人犯规吗·棒球1号位
  • [C语言]第一章-初识
  • 【Linux】深入理解Linux基础IO:从文件描述符到缓冲区设计
  • Java求职面试:Spring Boot与微服务的幽默探讨
  • 架构思维:构建高并发读服务_异构数据的同步一致性方案
  • C语言:文件操作
  • Cognito
  • Android基于绑定的控件用法
  • 文献分享:CH-CL配对和VL结构域的完整性影响IgG1分泌过程
  • XGBoost算法原理及Python实现
  • K230的ISP(图像信号处理器)通常支持多通道输出,常见配置为3个独立通道
  • CATIA高效工作指南——曲面设计篇(一)
  • 49. 字母异位词分组
  • 高等数学-第七版-下册 选做记录 习题10-2
  • 【C++11】其他一些新特性 | 右值引用 | 完美转发
  • Allegro23.1新功能之如何设置高压爬电间距规则操作指导
  • AtCoder Beginner Contest 404 C-G(无F)题解