当前位置: 首页 > java >正文

RabbitMQ消息堆积问题排查:concurrentConsumers 配置的坑与解决方案

一、问题背景

在分布式系统中,消息队列常用于异步处理和解耦。某业务场景下,用户完成支付后,支付结果通过MQ通知业务系统更新状态。但用户反馈支付成功后,需要等待好几分钟才能在业务系统中看到状态更新,体验较差。

二、问题现象

业务流程如下:

  1. 用户支付成功
  2. 支付渠道将支付结果发送到MQ队列
  3. 业务服务消费MQ中的支付结果
  4. 消费成功后更新业务单据状态
  5. 向相关系统发送支付结果通知

监控发现,从消息生产到消费完成,平均耗时约2分钟,无法满足业务实时性要求。

三、问题排查

1. 队列堆积检查

使用RabbitMQ管理命令检查队列状态:
rabbitmqctl list_queues name messages messages_unacknowledged

监控数据显示:

16:54:00 queue_async_notify      2
....
16:59:30 queue_async_notify      0

明显看到消息在队列中堆积,直到16:58:30才被消费完。

2. 消费线程分析

检查业务日志发现,所有消息都由同一个线程处理:

3. 代码配置检查

查看RabbitMQ配置类发现关键问题:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory, MessageConverter msgConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(msgConverter);factory.setMaxConcurrentConsumers(10); // 只设置了最大并发数return factory;
}

4. 根本原因分析

• 业务处理平均耗时20秒,属于IO密集型操作

• 配置中只设置了maxConcurrentConsumers,未设置concurrentConsumers

• 由于消息量不大,未触发动态扩容机制

• 实际并发消费者数保持默认值1,导致消息串行处理

• 单线程处理速度跟不上消息生产速度,造成堆积

四、解决方案

方案一:固定并发消费者数(推荐)

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 设置固定并发消费者数
factory.setConcurrentConsumers(5);
return factory;

}

方案二:动态扩容配置

@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

// 动态扩容配置
factory.setConcurrentConsumers(1);           // 初始并发数
factory.setMaxConcurrentConsumers(5);        // 最大并发数
factory.setPrefetchCount(1);                // 预取消息数
factory.setConsecutiveActiveTrigger(3);      // 连续活跃触发扩容
factory.setConsecutiveIdleTrigger(3);       // 连续空闲触发缩容
factory.setReceiveTimeout(1000L);            // 接收超时时间
return factory;

}

五、优化效果

配置调整后:
• 消息处理耗时从5分钟降低到20秒内

• 系统吞吐量提升5倍

• 用户感知的支付状态更新延迟大幅减少

六、经验总结

1. 理解配置参数:

• concurrentConsumers:初始并发消费者数

• maxConcurrentConsumers:最大并发消费者数

• 两者需要配合使用,只设置最大值不会生效

2. 根据业务特点配置:

• IO密集型操作:适当增加并发数

• CPU密集型操作:谨慎增加并发数

• 消息量稳定:使用固定并发数

• 消息量波动大:使用动态扩容

通过这次排查,我们认识到正确配置RabbitMQ消费者并发数的重要性,以及如何根据业务特性进行针对性优化。

http://www.xdnf.cn/news/19747.html

相关文章:

  • js设计模式-职责链模式
  • More Effective C++ 条款24:理解虚拟函数、多继承、虚继承和RTTI的成本
  • VMWare ubuntu24.04安装(安装ubuntu安装)
  • 复杂PDF文档如何高精度解析
  • css3元素倒影效果属性:box-reflect
  • IsaacLab训练机器人
  • uni-app 实现做练习题(每一题从后端接口请求切换动画记录错题)
  • 国内免费低代码软件精选:四款工具助你快速开启数字化转型之路
  • 力扣72:编辑距离
  • windows docker(二) 启动存在的容器
  • 5招教你看透PHP开发框架的生态系统够不够“牛”?
  • 推荐一个论文阅读工具ivySCI
  • latex怎么写脚注:标共一声明,标通讯作者
  • 使用 Avidemux 去除视频的重复帧
  • 从实操到原理:一文搞懂 Docker、Tomcat 与 k8s 的关系(附踩坑指南 + 段子解疑)
  • 血缘元数据采集开放标准:OpenLineage Guides 在 Spark 中使用 OpenLineage
  • SpringBoot3中使用Caffeine缓存组件
  • 模版进阶及分离编译问题
  • ansible判断
  • 科学研究系统性思维的方法体系:数据分析模板
  • C语言:归并排序和计数排序
  • OCR识别在媒资管理系统的应用场景剖析与选择
  • 基于ZooKeeper实现分布式锁(Spring Boot接入)及与Kafka实现的对比分析
  • Pod自动重启问题排查:JDK 17 EA版本G1GC Bug导致的应用崩溃
  • Element Plus 表格表单校验功能详解
  • 【Web前端】JS+DOM来实现乌龟追兔子小游戏
  • 轻型载货汽车变速器设计cad+设计说明书
  • 【序列晋升】25 Spring Cloud Open Service Broker 如何为云原生「服务市集」架桥铺路?
  • 分布式光纤传感选型 3 问:你的场景该选 DTS、DAS 还是 BOTDA?
  • 2017考研数学(二)真题