当前位置: 首页 > ai >正文

RocketMQ 使用经验一二

RocketMQ 使用经验一二

    • 1. 横向扩容消费节点后,部分节点消费不到数据
    • 2. 消费线程数和预期不一致

1. 横向扩容消费节点后,部分节点消费不到数据

RocketMQ Topic队列数太少,当消费者个数>topic队列个数时,多余的消费者无法消费到消息。

2. 消费线程数和预期不一致

@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.custom.my.topic}", consumerGroup = "${rocketmq.custom.my.score.consumer}", selectorExpression = "${rocketmq.custom.my.tag}", consumeThreadMax = 64)
public class MyMqListener implements RocketMQListener<MessageExt> {...
}

org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer代码块显示,当consumeThreadMax<consumeThreadMin时,consumeThreadMin的值被被强制设置为consumeThreadMax,可以将核心线程数(消费线程数)控制在较低的水平。

RocketMQ的消费线程池使用ThreadPoolExecutor,核心线程数是consumeThreadMin,最大线程数是consumeThreadMax。不过,线程池的工作队列是LinkedBlockingQueue,这是一个无界队列。根据ThreadPoolExecutor的工作原理,当任务被提交时,线程池会先使用核心线程处理,如果核心线程都在忙,任务会被放入队列。只有当队列满了,才会创建新线程直到达到最大线程数。而由于队列是无界的,几乎不可能满,所以最大线程数设置无效,实际线程数不会超过核心线程数。

  • 解决办法

结合Spring BeanPostProcessor,通过配置项强制覆盖consumeThreadMin值。

@Slf4j
@Slf4j
@Configuration
public class RMQAutoConfiguration implements BeanPostProcessor {/*** 消费者线程最小值(核心线程数)*/@Value("${rocketmq.consumer.consume-thread-min:20}")private int consumeThreadMin;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DefaultRocketMQListenerContainer) {DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;DefaultMQPushConsumer consumer = container.getConsumer();if (consumeThreadMin <= consumer.getConsumeThreadMax()) {consumer.setConsumeThreadMin(consumeThreadMin);}}return bean;}}
http://www.xdnf.cn/news/7379.html

相关文章:

  • 全新30m高分辨率全球地形因子数据下载方法
  • 国产RFID手持终端的自主国产化数据安全优势
  • C# 匹配模式
  • 【算法专题十四】BFS解决FloodFill算法
  • 部署springBoot项目的脚本-windows
  • C++(25): 标准库 <deque>
  • 迅联文库开发日志(三)登陆注册
  • esp32课设记录(四)摩斯密码的实现 并用mqtt上传
  • Springboot 跨域拦截器配置说明
  • JavaScript 学习
  • DW_DMAC简介
  • 嵌入式学习笔记 D22:栈与队列
  • 编排优先——Go 语言开发 AI 智能体的设计与实现
  • 专为MoE设计的“超级工厂”,来了
  • 跨境业务服务器架构设计与CN2线路深度调优
  • Spring Boot 接口定义指南:构建高效的RESTful API
  • 【第二届帕鲁杯】第二届帕鲁杯畸行的爱完整wp
  • RSP-BSP-1
  • 生成式人工智能认证(GAI认证)在企业中的认可度怎样?
  • 基于 STM32 的自动温度巡检小车控制系统设计与实现
  • 第五天的尝试
  • 经典算法复习——快速模幂
  • 51单片机点亮一个LED介绍
  • C++ 函数对象、仿函数与 Lambda 表达式详解
  • 12.vue整合springboot首页显示数据库表-实现按钮:【添加修改删除查询】
  • 深入Java G1 GC调优:如何解决高延迟与吞吐量瓶颈
  • 嵌入式学习笔记 - STM32独立看门狗IWDG与窗口看门狗WWDG的区别
  • HTTPS实验室——TLS/TLCP一站式解决方案
  • C语言——深入理解指针(一)
  • rosbag使用记录