当前位置: 首页 > backend >正文

6、RocketMQ消息积压问题如何解决

消息积压是消息中间件中常见的问题,主要由消费速度跟不上生产速度导致。以下是几种解决方案:

1、增加消费者线程数量


这是最直接的方法,通过增加消费者线程数来提高消费能力。
示例代码:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupName");  
consumer.setNamesrvAddr("nameserver1:9876;nameserver2:9876");  
consumer.setConsumeThreadMin(20);  
consumer.setConsumeThreadMax(64);  // 设置每个消费者实例消费的最大线程数  
consumer.setConsumeThreadMax(30);  consumer.subscribe("TopicTest", "*");  
consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  // 消费逻辑  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  
});  consumer.start();

2、消息业务异步处理


示例代码:

consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  // 批量处理消息  executorService.submit(() -> {  for (MessageExt msg : msgs) {  // 开启后台线程异步处理每条消息  processMessageAsync(msg);  }  });  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  
});

3、调整消费者的消费模式


将顺序消费改为并行消费,提高消费效率。
示例代码:

// 将MessageListenerOrderly改为MessageListenerConcurrently  
consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  // 并行消费逻辑  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  
});

4、使用消息过滤


通过消息过滤,只消费重要的消息,降低消费压力。
示例代码:

consumer.subscribe("TopicTest", "tag1 || tag2 || tag3");

5、临时扩容


在消息积压严重时,可以临时启动额外的消费者实例来快速消费积压的消息。

(快速消费逻辑,可以只做简单处理或者将消息转储到其他系统(比如redis),再启动后台线程处理redis里的消息  )
示例代码:

public class TemporaryConsumer {  public static void main(String[] args) throws Exception {  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TemporaryConsumerGroup");  consumer.setNamesrvAddr("nameserver:9876");  consumer.subscribe("TopicTest", "*");  consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  // 快速消费逻辑,可以只做简单处理或者将消息转储到其他系统(比如redis),再启动后台线程处理redis里的消息  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  });  consumer.start();  }  
}

6、调整生产者发送策略


如果可能,可以调整生产者的发送策略,如降低发送频率或者实现背压机制。
示例代码:


DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");  
producer.setNamesrvAddr("nameserver:9876");  
producer.start();  // 设置发送消息的超时时间,如果超时,说明可能存在积压,可以降低发送频率  
producer.setSendMsgTimeout(3000);  // 设置异步发送失败重试次数  
producer.setRetryTimesWhenSendAsyncFailed(0);  // 示例:根据积压情况调整发送频率  
while (true) {  if (checkMessageAccumulation(groupName)) {  Thread.sleep(1000); // 降低发送频率  }  Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());  producer.send(msg);  
}public Boolean checkMessageAccumulation(String groupName) throws Exception { private static final long ACCUMULATION_THRESHOLD = 10000; // 消息积压阈值 DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt();  defaultMQAdminExt.setInstanceName("CheckAccumulationInstance");  try {  defaultMQAdminExt.start();  ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats(groupName);  //computeTotalDiff方法可以计算当前积压的消息总数long accumulateCount = consumeStats.computeTotalDiff();  System.out.println("Total accumulated messages: " + accumulateCount);  // 根据业务需要设置积压数量阈值  if (accumulateCount > ACCUMULATION_THRESHOLD) {  System.out.println("发生消息积压");  return true;}  } finally {  defaultMQAdminExt.shutdown();  }  return false;
}  

代码说明:
DefaultMQAdminExt 是 RocketMQ 提供的一个管理类,属于 RocketMQ 的管理工具模块。它继承自 MQAdmin,是用于扩展和增强 RocketMQ 管理能力的一个类。这个类提供了一系列的管理 API,用于管理和监控 RocketMQ 集群,包括消息查询、消费进度管理、主题管理、Broker 状态监控等功能。

总结:


这些方法可以单独使用,也可以组合使用,具体取决于您的业务场景和系统架构。在实施这些解决方案时,请注意监控系统性能,确保不会因为过度优化而导致其他问题。同时,也要考虑长期的解决方案,如优化系统架构、升级硬件等,以从根本上提高系统的消息处理能力。

http://www.xdnf.cn/news/18691.html

相关文章:

  • Python爬虫实战:Selenium模拟操作爬取马蜂窝旅游攻略
  • 数据挖掘 6.1 其他降维方法(不是很重要)
  • redis----list详解
  • 深度学习入门第一课——神经网络实现手写数字识别
  • 读《精益数据分析》:A/B测试与多变量测试
  • 【栈 - LeetCode】739.每日温度
  • [Java恶补day51] 46. 全排列
  • 无人机芯片休眠模式解析
  • 关于传统的JavaWeb(Servlet+Mybatis)项目部署Tomcat后的跨域问题解决方案
  • 日语学习-日语知识点小记-构建基础-JLPT-N3阶段(19):文法复习+单词第7回1
  • 基于知识图谱的装备健康智能维护系统KGPHMAgent
  • C++ #pragma
  • 少儿舞蹈小程序需求规格说明书
  • 【Hot100】二分查找
  • Fluent Bit系列:字符集转码测试(上)
  • 使用 Prometheus 监控服务器节点:Node Exporter 详解与配置
  • 实时监测蒸汽疏水阀的工作状态的物联网实时监控平台技术解析
  • 容器学习day02
  • 基于 OpenCV 与 Mediapipe 的二头肌弯举追踪器构建指南:从环境搭建到实时计数的完整实现
  • 力扣498 对角线遍历
  • 4G模块 EC200通过MQTT协议连接到阿里云
  • (LeetCode 每日一题) 498. 对角线遍历 (矩阵、模拟)
  • 撤回git 提交
  • 【龙泽科技】汽车车身测量与校正仿真教学软件【赛欧+SHARK】
  • 什么是共模抑制比?
  • 三坐标如何实现测量稳定性的提升
  • RustFS在金融行业的具体落地案例中,是如何平衡性能与合规性要求的?
  • WRC2025 | 澳鹏亮相2025世界机器人大会,以数据之力赋能具身智能新纪元
  • 大数据毕业设计选题推荐-基于大数据的餐饮服务许可证数据可视化分析系统-Spark-Hadoop-Bigdata
  • LevelDB SSTable模块