当前位置: 首页 > ops >正文

RocketMQ如何处理消息堆积

mq出现消息堆积本质上是消费速度跟不上生产速度导致的结果。处理消息堆积需要一个系统性的方法,从快速定位问题根源到实施相应的解决方案。

紧急诊断与定位瓶颈

当监控系统告警出现消息堆积时,不要急于重启或扩容,先搞清楚问题出在哪里。

  1. 确认堆积情况

    • 通过 RocketMQ Console 或 Admin Tool 查看 堆积量(Behind)、堆积消息数(Diff)等指标。确认是单个 Topic 还是多个 Topic 出现问题,是全部 Consumer Group 还是特定 Group。

    • 命令:./mqadmin consumerProgress -n <nameserver_addr> -g <consumer_group>

  2. 分析消费端状态

    • 消费线程是否卡住?:检查消费者应用的日志是否有大量错误,如数据库连接超时、HTTP 调用失败、死锁或长时间的 GC 停顿。这是最常见的原因

    • 消费逻辑是否变慢?:是否引入了新的、耗时的业务逻辑?是否在处理某一种特定消息时效率极低?

    • 检查系统资源:CPU、内存、磁盘 I/O 或网络带宽是否已达瓶颈?特别是消费者应用所在的机器。

  3. 分析生产端状态

    • 是否突然有流量洪峰?:例如大促、定时任务集中触发等,导致生产者发送消息的速率远超平时。

    • 是否在“重放”消息?:是否将大量历史消息重新发送到队列中?

  4. 分析 Broker 状态

    • Broker 的 CPU、IO 负载是否正常?写入消息的性能是否下降?

  • 单个Topic堆积 -> 问题很大概率出在这个Topic相关的特定业务链路上(消费端或生产端)。

  • 多个Topic同时堆积 -> 问题很大概率出在公共底层资源或组件上。

因此从四大方面分别进行分析:发送端(流量激增,过多重复消息),队列端(单或多topic同时堆积),broker处理机端(cpu,io负载),消费端(消费报错,新增耗时业务,系统资源占用情况)。

实施应急处理方案(短期止血)

排查完问题之后就需要立即解决当前消息堆积的问题

  1. 扩容消费者(最直接有效)

    • 增加消费者实例数:这是应对流量洪峰最快捷的方式。通过增加 Pod(K8s)、容器或虚拟机来水平扩展消费者应用。

    • 增加单个消费者的并行度

      • 调整 consumeThreadMin 和 consumeThreadMax:增加消费者线程池的大小。

      • 调整 pullBatchSize:增加每次从 Broker 拉取的消息数量,减少网络交互次数。

  2. 优化消费逻辑

    • 简化或绕过:如果可能,临时将复杂的消费逻辑(如写数据库、调用外部API)替换为更简单的逻辑(如只解析消息并落盘到本地或更快的存储中),事后再进行补偿处理。

    • 避免阻塞:检查消费代码中是否有不必要的同步等待、锁竞争或慢速的 IO 操作,将其异步化或优化。

  3. 服务降级

    • 如果消息不是100%关键,可以考虑只处理核心业务消息,非核心消息先跳过或记录日志后稍后处理。

根本原因分析与长期优化(治本)

1. 批量消费示例

RocketMQ支持批量消费,你可以在消费者中一次获取多条消息进行处理。

public class BatchMessageListener implements MessageListenerConcurrently {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {try {// 批量处理消息for (MessageExt msg : msgs) {String body = new String(msg.getBody(), StandardCharsets.UTF_8);// 处理每条消息,这里可以是你的业务逻辑processMessage(body);}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} catch (Exception e) {// 处理失败,稍后重试return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}private void processMessage(String message) {// 你的业务逻辑System.out.println("Processing: " + message);}
}

在初始化消费者时,你需要设置批量消费的大小(通过设置Consumer的pullBatchSize属性,但注意,实际拉取的消息数还会受到Topic、队列等因素的影响)。另外,也可以通过在监听器中处理多条消息(如上所示)来实现批量处理。

2. 异步处理示例

将耗时的业务操作异步化,避免阻塞消费线程。

public class AsyncMessageListener implements MessageListenerConcurrently {// 创建一个独立的线程池用于异步处理private final ExecutorService asyncProcessingPool = Executors.newFixedThreadPool(10);@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 将每条消息提交给线程池异步处理for (MessageExt msg : msgs) {asyncProcessingPool.submit(() -> {try {String body = new String(msg.getBody(), StandardCharsets.UTF_8);// 耗时操作,如调用第三方API、复杂计算等timeConsumingProcess(body);} catch (Exception e) {// 处理异常,可以根据业务需要记录日志并决定是否重试// 注意:异步处理中如果失败,消息已经被确认消费了,所以需要额外的重试机制(如将失败消息再发送到另一个队列)}});}// 注意:这里立即返回成功,消息会被确认消费成功。如果异步处理失败,消息不会重试。// 因此,这种模式适用于对消息丢失不敏感的场景,或者有其他补偿机制。return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}private void timeConsumingProcess(String message) {// 模拟耗时操作try {Thread.sleep(1000);} catch (InterruptedException e) {Thread.currentThread().interrupt();}System.out.println("Processed: " + message);}
}

注意:异步处理的风险在于,如果异步任务失败,消息已经被确认为消费成功,因此不会重试。所以这种方法通常需要配合其他机制(如死信队列、人工干预)使用,或者用于允许少量丢失的场景。

3. 优化数据库操作示例

优化数据库操作,例如使用批量插入、索引等。

假设我们消费消息后需要将数据插入数据库,我们可以使用批量插入来优化。

4. 保证幂等性示例

以消费端幂等为例,使用Redis来记录已经处理过的消息(假设每条消息有唯一ID)。

public class IdempotentMessageListener implements MessageListenerConcurrently {private final RedisTemplate<String, String> redisTemplate;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {String messageId = msg.getMsgId(); // 注意:生产者发送重复消息时MsgId不同,所以要用业务唯一IDString body = new String(msg.getBody(), StandardCharsets.UTF_8);MyData data = parseMessage(body);// 使用业务唯一ID来判断重复,比如订单号String businessId = data.getOrderId();// 尝试在Redis中设置key,如果已存在则设置失败Boolean success = redisTemplate.opsForValue().setIfAbsent(businessId, "processed", 10, TimeUnit.MINUTES);if (Boolean.TRUE.equals(success)) {// 第一次处理,执行业务逻辑processData(data);} else {// 已经处理过,直接跳过System.out.println("Duplicate message, skipped: " + businessId);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}private void processData(MyData data) {// 处理业务}
}

注意:上述示例中使用消息的业务唯一ID(如订单ID)作为Redis的key,并设置过期时间(例如10分钟),这样在一定时间后自动清除,避免Redis无限增长。

如果使用数据库实现幂等,可以通过唯一键约束来避免重复插入。

这些示例提供了实现这些优化策略的基本思路,实际应用中需要根据具体业务场景进行调整。

5. 合理设计消息与 Topic

  • 消息过滤:使用 Tag 或 SQL92 属性过滤器,让消费者只订阅它真正需要的消息,避免处理无关数据。

  • 拆分 Topic:如果一个大 Topic 中包含多种业务类型的消息,且消费速度不一致,可以考虑将其拆分成多个 Topic,由不同的 Consumer Group 消费,避免慢业务阻塞快业务。

6. 监控与告警常态化

  • 建立完善的监控:不仅监控堆积量,还要监控消费 TPS、生产 TPS、消费耗时、成功/失败率等。

  • 设置合理告警阈值:在堆积量达到一个风险水平(如积压超过 10W)时就提前告警,而不是等到系统快崩溃了再告警。

7.预留资源缓冲

  • 预估大促或活动流量,提前对消费者应用进行扩容。

  • 保证系统有一定的资源冗余,以应对突发流量。

http://www.xdnf.cn/news/20208.html

相关文章:

  • 云某惠旧案再审可能性与商业创新实践:积分运营的边界与实体商家机遇
  • 【设计模式】 工厂方法模式
  • 【YOLOv11】2.安装Anaconda3
  • 机器人控制器开发(定位算法——map、odom、baselink关联与差异)
  • JavaScript的库简介
  • 离散数学学习指导与习题解析
  • react生命周期,详细版本
  • 运筹学——求解线性规划的单纯形法
  • solidity的高阶语法2
  • AI工程师对于AI的突发奇想
  • Docker Desktop 安装 Linux(告别传统的虚拟机VMware)
  • Date、BigDecimal类型值转换
  • 残差去噪扩散模型
  • 字节跳动OmniHuman-1.5发布:单图+音频秒变超真实视频,AI数字人技术再升级
  • HOT100--Day13--104. 二叉树的最大深度,226. 翻转二叉树,101. 对称二叉树
  • Docker入门到精通:从零基础到生产部署
  • 如何在路由器上配置DHCP服务器?
  • 本体论中的公理与规则——从经典逻辑到神经符号融合的演进
  • Hive on Tez/Spark 执行引擎对比与优化
  • AI浪潮下,人类创造力的“危”与“机”
  • 2026届大数据毕业设计选题推荐-基于大数据旅游数据分析与推荐系统 爬虫数据可视化分析
  • JAVA基本文件操作
  • 【74页PPT】MES简介(附下载方式)
  • TensorFlow 面试题及详细答案 120道(101-110)-- 底层原理与扩展
  • C++笔记之软件设计原则总结
  • Lua > Mac Mini M4安装openresty
  • 基于Transformer 实现车辆检测与车牌识别(一)
  • disable CASCADE主键失败 ORA-2297 And ORA-2433
  • MCAP :机器人数据容器的全面实践指南
  • 区块链是什么