RocketMQ 使用经验一二
RocketMQ 使用经验一二
- 1. 横向扩容消费节点后,部分节点消费不到数据
- 2. 消费线程数和预期不一致
1. 横向扩容消费节点后,部分节点消费不到数据
RocketMQ Topic队列数太少,当消费者个数>topic队列个数时,多余的消费者无法消费到消息。
2. 消费线程数和预期不一致
@Slf4j
@Component
@RocketMQMessageListener(topic = "${rocketmq.custom.my.topic}", consumerGroup = "${rocketmq.custom.my.score.consumer}", selectorExpression = "${rocketmq.custom.my.tag}", consumeThreadMax = 64)
public class MyMqListener implements RocketMQListener<MessageExt> {...
}
org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer
代码块显示,当consumeThreadMax
<consumeThreadMin
时,consumeThreadMin
的值被被强制设置为consumeThreadMax
,可以将核心线程数(消费线程数)控制在较低的水平。
RocketMQ的消费线程池使用ThreadPoolExecutor,核心线程数是consumeThreadMin,最大线程数是consumeThreadMax。不过,线程池的工作队列是LinkedBlockingQueue,这是一个无界队列。根据ThreadPoolExecutor的工作原理,当任务被提交时,线程池会先使用核心线程处理,如果核心线程都在忙,任务会被放入队列。只有当队列满了,才会创建新线程直到达到最大线程数。而由于队列是无界的,几乎不可能满,所以最大线程数设置无效,实际线程数不会超过核心线程数。
- 解决办法
结合Spring
BeanPostProcessor
,通过配置项强制覆盖consumeThreadMin
值。
@Slf4j
@Slf4j
@Configuration
public class RMQAutoConfiguration implements BeanPostProcessor {/*** 消费者线程最小值(核心线程数)*/@Value("${rocketmq.consumer.consume-thread-min:20}")private int consumeThreadMin;@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {if (bean instanceof DefaultRocketMQListenerContainer) {DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean;DefaultMQPushConsumer consumer = container.getConsumer();if (consumeThreadMin <= consumer.getConsumeThreadMax()) {consumer.setConsumeThreadMin(consumeThreadMin);}}return bean;}}