202531 | RocketMQ 消息过滤 + 消息重试机制 + 死信消息 + 重复消费问题
🚀 RocketMQ 中的 Topic、Tag 和 Key
📌 一句话解释:
- Topic:消息的“频道”或主题,用于区分业务大类
- Tag:Topic 下的子分类,用于精细过滤和路由
- Key:消息的唯一标识符,便于查询和追踪
🧠 概念关系图(Mermaid 流程图)
📦 实际业务场景:电商订单系统
你开发了一个电商平台,每当用户下单、取消订单或完成支付时,你都要把这些信息发给 RocketMQ。
场景 | Topic | Tag | Key |
---|---|---|---|
用户创建订单 | OrderTopic | CreateOrder | ORDER_20250412123456 |
用户取消订单 | OrderTopic | CancelOrder | ORDER_20250412123456 |
用户完成支付 | OrderTopic | PaySuccess | ORDER_20250412123456 |
✅ 表格总结:三者作用对比
概念 | 是否必填 | 作用 | 举例 |
---|---|---|---|
Topic | ✅ 是 | 消息的一级分类(业务主题) | OrderTopic |
Tag | ✅ 是 | Topic 下的逻辑子类(操作类型) | CreateOrder |
Key | ❌ 可选 | 消息的唯一标识(方便追踪) | ORDER_20250412123456 |
💻 Java 代码示例
👨💻 消息生产者发送消息(带 Topic、Tag、Key)
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class OrderProducer {public static void main(String[] args) throws Exception {// 创建生产者实例DefaultMQProducer producer = new DefaultMQProducer("order_producer_group");producer.setNamesrvAddr("localhost:9876");producer.start();// 构建消息String topic = "OrderTopic";String tag = "CreateOrder";String key = "ORDER_20250412123456";String body = "{\"orderId\": \"20250412123456\", \"userId\": \"U123\", \"amount\": 6999}";Message msg = new Message(topic, tag, key, body.getBytes());// 发送消息SendResult result = producer.send(msg);System.out.println("发送结果: " + result);producer.shutdown();}
}
👨💻 消费者只消费 CreateOrder 的消息
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class OrderConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_consumer_group");consumer.setNamesrvAddr("localhost:9876");// 订阅 Topic,并只接收 Tag 为 CreateOrder 的消息consumer.subscribe("OrderTopic", "CreateOrder");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {System.out.printf("接收到消息: %s%n", new String(msg.getBody()));}return org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
RocketMQ 消息重试机制总结
RocketMQ 提供了可靠的消息重试机制,用于保障 消息在发送与消费过程中的稳定性和可靠性。重试机制主要分为:
- 生产者重试机制(Producer Retry)
- 消费者重试机制(Consumer Retry)
生产者重试机制(Producer Retry)
✅ 场景说明
当生产者将消息发送到 Broker 时,如果由于网络异常、Broker 宕机或存储失败导致消息发送失败,RocketMQ 会自动进行重试,以提高消息发送成功率。
🛠️ 机制要点
配置项 | 默认值 | 说明 |
---|---|---|
retryTimesWhenSendFailed | 2 | 同步发送失败时最多重试 2 次(加上初始发送,总共尝试 3 次) |
retryTimesWhenSendAsyncFailed | 2 | 异步发送失败时最多重试 2 次 |
retryAnotherBrokerWhenNotStoreOK | true | 当前 Broker 写入失败时是否自动尝试其他 Broker |
🔁 工作流程图(Mermaid)
💡 示例配置(Java)
producer.setRetryTimesWhenSendFailed(3);
producer.setRetryAnotherBrokerWhenNotStoreOK(true);
🚨 注意:生产者重试只保证“尽最大努力发送”,并不能 100% 保证投递成功,业务端应做好补偿措施(如日志记录、异步告警)。
消费者重试机制(Consumer Retry)
✅ 场景说明
当消费者接收到消息后,如果消费逻辑执行失败(如抛出异常、处理超时等),RocketMQ 会自动触发重试机制,确保消息不会因临时错误而丢失。
🛠️ 机制要点
项目项 | 默认值 | 说明 |
---|---|---|
最大重试次数 | 16 次 | 包括第一次消费;超过此次数后消息将进入死信队列(DLQ) |
重试间隔 | 指数退避 | 每次失败后重试间隔递增(RocketMQ 内部控制) |
死信队列 DLQ | 自动投递 | Topic 格式为 %DLQ%消费组名 ,需手动消费/补偿处理 |
💡 示例消费逻辑(Java)
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {try {// 模拟业务逻辑处理process(msg);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 处理失败 → 返回重试信号return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
📦 死信队列说明
-
消息超过 16 次消费失败后,将进入 死信队列(DLQ)
-
DLQ 的 Topic 命名规则为:
%DLQ%消费组名
-
可通过另一个消费者对 DLQ 进行补偿消费处理:
consumer.subscribe("%DLQ%order-consumer-group", "*");
对比总结表
对比项 | 生产者重试 | 消费者重试 |
---|---|---|
重试触发条件 | 消息发送失败 | 消费逻辑失败 |
默认重试次数 | 2 次(共尝试 3 次) | 16 次(包含首次消费) |
重试方式 | 同一或其他 Broker 重试 | 延迟队列定时重试 |
可配置性 | 支持设置重试次数及跨 Broker 策略 | 默认不可配置次数(源码内写死) |
最终处理方式 | 超过重试次数 → 抛出异常 | 超过重试次数 → 投递到死信队列(DLQ) |
补偿处理建议 | 记录失败消息进行人工补偿 | 监听 DLQ 进行二次消费或通知处理 |
🎯 小结口诀
“发送失败可换 Broker,消费失败慢慢重;重不过就进 DLQ,人工处理要补空。”
好的!下面是图文并茂的完整介绍,带你全面理解 RocketMQ 中的死信消息(DLQ, Dead Letter Queue)。内容包括概念、触发条件、工作机制、可视化图表、示例代码及处理建议。
好的!以下是经过修改和优化后的 RocketMQ 死信消息(DLQ)详解,包括概念、触发条件、工作机制、可视化图表、处理建议等。
🚨 RocketMQ 死信消息(DLQ)详解
🧠 什么是死信消息?
在 RocketMQ 中,死信消息(Dead Letter Message) 是指由于消费失败多次而被标记为无法处理的消息。为了避免影响正常消费,RocketMQ 会将这些消息投递到一个特殊的 Topic —— 死信队列(DLQ),供后续补偿、人工处理或报警使用。
🔁 死信消息的产生条件
条件 | 描述 |
---|---|
消费失败 | 消费者处理消息时发生异常,如抛出异常或返回 RECONSUME_LATER |
超过重试次数限制 | 默认最多尝试 16 次消费(包括首次),超过则投递到 DLQ |
无法正常消费 | 为防止无限重试占用系统资源,RocketMQ 将其作为死信处理 |
🧭 死信消息工作机制流程图
扩展说明(代码实现):
// 生产者发送消息示例
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "DLQ_TEST".getBytes());
// 设置最大重试次数(默认16次)
msg.setReconsumeTimes(3);
producer.send(msg);// 消费者重试逻辑(自动触发)
public class ConsumerExample {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");consumer.subscribe("TopicTest", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 模拟消费失败(第4次会进入死信队列)if(msgs.get(0).getReconsumeTimes() < 3) {return ConsumeConcurrentlyStatus.RECONSUME_LATER;}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}
注意事项:
- 重试间隔时间公式:
延迟级别 = 重试次数 + 3
(对应Broker配置的messageDelayLevel
) - 可通过
CONSUMER.getDefaultMQPushConsumerImpl().getConsumeMessageService()
监控重试状态 - 死信队列消息需人工处理,不会自动清除
🗂️ 死信队列的 Topic 命名规则
死信消息被保存到专用的 Topic 中,命名规则为:
%DLQ%<ConsumerGroup>
例如,如果你的消费者组为 order-consumer-group
,死信队列 Topic 为:
%DLQ%order-consumer-group
- 每个消费组都会有自己的死信队列
- 消息会被隔离存放,避免对正常消息消费产生影响
🔍 如何消费死信消息?
你可以通过一个新的消费者监听死信队列,对死信消息进行补偿、记录日志、报警等处理:
consumer.subscribe("%DLQ%order-consumer-group", "*");consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {for (MessageExt msg : msgs) {// 补偿逻辑:例如记录日志、人工审核、重新投递到业务 TopichandleDeadLetter(msg);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
🧰 死信消息的典型处理方式
处理方式 | 描述 |
---|---|
手动重投 | 人工识别后,将死信消息重新发送到原始业务 Topic 进行消费 |
日志报警 | 将死信消息记录到日志系统,并触发报警,提醒开发或运维人员处理 |
自动补偿 | 编写自动补偿消费者,定期处理死信消息(如重入 Topic 或写入数据库) |
后台管理界面重试 | 配合 RocketMQ 控制台,可以选中死信消息后点击“重新发送” |
💡 示例:补偿逻辑封装
private void handleDeadLetter(MessageExt msg) {// 1. 解析消息体String msgBody = new String(msg.getBody());// 2. 写入日志系统log.warn("DeadLetter 消息:{}", msgBody);// 3. 判断是否可自动补偿if (canRetry(msg)) {// 自动补偿:重新投递消息Message newMsg = new Message("originalTopic", msg.getTags(), msg.getKeys(), msg.getBody());producer.send(newMsg);} else {// 如果不可重试,记录告警alert(msg); // 通知告警系统}
}
📊 死信机制配置项一览
参数 | 默认值 | 说明 |
---|---|---|
最大重试次数 | 16 次 | 超过此次数消息进入死信队列,RocketMQ 写死不支持配置 |
死信队列格式 | %DLQ%消费组名 | 死信 Topic 的命名规则 |
手动消费 DLQ | ✅ 支持 | 需注册消费者监听 %DLQ%xxx Topic |
✅ 总结
要点 | 描述 |
---|---|
产生原因 | 消息消费失败超过重试次数 |
默认重试上限 | 16 次 |
死信 Topic 命名 | %DLQ%<ConsumerGroup> |
后续处理方式 | 日志记录、人工审核、重新投递、告警或自动补偿 |
推荐最佳实践 | 配置监控 + 死信消费者补偿 + 可视化平台人工操作支持 |
🧭 可视化小口诀
消费失败不慌张,RocketMQ 先重试;
十六次都不行,DLQ 报警来补偿。
死信不是终点站,补偿处理才关键!
在 RocketMQ 中,消息重复消费 是一个常见且需要处理的重要问题。消息可能会被重复消费,这通常会对系统的一致性和性能产生影响。理解和解决消息重复消费问题,是保证消息队列系统可靠性和数据一致性的关键。
RocketMQ中消息重复消费问题
🚨 消息重复消费的原因
1. 网络延迟与重试
消息消费过程可能由于 网络抖动、连接断开、消费者崩溃 等问题导致消费失败。此时,RocketMQ 会 自动重试 该消息消费。如果消费者没有收到消费成功的确认(ACK),它会认为消费失败并进行重试。
2. 消费者宕机
如果消费者在消费过程中崩溃或者被杀死,RocketMQ 会根据其配置的 消费进度(offset) 从该消费者未成功消费的消息开始重新投递,导致相同消息被重新消费。
3. 消息投递确认问题
消费者需要确认(ACK)消息的成功消费。如果消费者没有正确地返回确认(如处理超时),RocketMQ 也可能会将这条消息重新投递给消费者。
4. Broker 负载均衡调整
当 RocketMQ Broker 负载均衡 调整时,消费者可能会从新的 Broker 获取消息,这也有可能导致某些消息被多次消费。
🧠 如何避免消息重复消费?
虽然 RocketMQ 的设计尽量避免重复消费,但它并不是天然的 “Exactly Once”(精确一次)消息队列系统。下面是一些减少或避免消息重复消费的方法:
1. 幂等性设计
通过确保消息处理操作是 幂等 的,可以避免消息重复消费时产生副作用。幂等操作意味着无论操作执行多少次,结果都是一致的。
常见幂等性实现方式:
- 使用唯一的 消息ID 或 消息Key 作为数据库操作的标识,确保相同消息不会多次更新数据库。
- 在数据库中维护一个记录消费状态的表,确保同一条消息不会被重复消费。
示例:
// 在数据库中使用消息的唯一ID作为约束
String messageId = msg.getKeys(); // 获取消息的唯一标识
if (isMessageAlreadyProcessed(messageId)) {return; // 如果消息已经处理过,则跳过
}
2. 消息去重
可以在消费者端实现消息去重机制,通过维护消息的 唯一标识 来检查该消息是否已经被消费。
常见去重方式:
- 使用 缓存(如 Redis) 来记录已处理的消息ID。
- 使用 数据库表 来记录处理过的消息ID。
示例:
String msgId = msg.getKeys();
if (cache.contains(msgId)) {return; // 消息已处理,直接跳过
}
cache.put(msgId, true); // 将消息ID放入缓存
3. 调整消息消费确认机制
确保 消费成功确认(ACK) 返回的时机合适,并且消费者不会在处理中途丢失确认。
- 消费成功:消费者成功消费消息后,返回
ConsumeConcurrentlyStatus.CONSUME_SUCCESS
。 - 消费失败:消费者可以返回
ConsumeConcurrentlyStatus.RECONSUME_LATER
,进行重试。
同时,可以通过 消息重试次数限制 来控制消息重试的次数,避免无限重试。
4. 设置适当的重试策略
调整 RocketMQ 消费者的 重试次数和重试间隔,根据业务需求选择合理的策略。例如,如果系统允许偶尔重复消费,可以适当增加重试次数并降低重试的频率。
consumer.setMaxReconsumeTimes(5); // 最大重试次数
consumer.setMessageModel(MessageModel.CLUSTERING); // 消息模型(集群消费)
⚙️ 消费者去重与幂等性的实现例子
1. 幂等性检查
在处理业务操作时,基于消息的 唯一标识 来确保消费的幂等性。
public boolean processMessage(Message msg) {String msgId = msg.getKeys(); // 获取消息的唯一标识if (messageAlreadyProcessed(msgId)) {return true; // 已经处理过该消息,避免重复消费}// 处理消息业务逻辑processBusinessLogic(msg);// 标记为已处理markMessageAsProcessed(msgId);return true;
}
2. 消息去重
可以通过 Redis 等缓存存储已消费的消息 ID,并定期清理过期记录。
public boolean isMessageAlreadyProcessed(String msgId) {// 使用 Redis 来存储已消费的消息IDreturn redisCache.containsKey(msgId);
}public void markMessageAsProcessed(String msgId) {// 将消息ID存储到 Redis,设置过期时间(例如24小时)redisCache.put(msgId, true, 24, TimeUnit.HOURS);
}
✅ 总结
解决方案 | 描述 |
---|---|
幂等性设计 | 通过保证操作是幂等的,避免重复消费导致的副作用。 |
消息去重 | 使用缓存或数据库记录已消费的消息,避免重复消费。 |
消费确认机制 | 确保消费者正确返回消息消费成功的确认,避免重复消费。 |
重试策略 | 合理配置消费重试次数和间隔时间,避免因过多重试导致的重复消费。 |