当前位置: 首页 > java >正文

深入剖析 RocketMQ 中的 DefaultMQPushConsumerImpl:消息推送消费的核心实现

前言

在 Apache RocketMQ 的消息消费体系中,RocketMQ 提供了DefaultMQPushConsumer(推送消费)和DefaultMQPullConsumer(拉取消费)两种主要消费方式。DefaultMQPushConsumer与DefaultMQPullConsumer在消息获取方式,消息处理逻辑,负载均衡与消费管理都有着不同的处理逻辑,这篇博文主要进行分析DefaultMQPushConsumer。

一、DefaultMQPushConsumerImpl 的主要属性

/*** consumer重平衡的组件*/private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);/*** 过滤消息的钩子*/private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();/*** 消费者启动的时间戳*/private final long consumerStartTimestamp = System.currentTimeMillis();/*** 消费消息的钩子list*/private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();/*** rpc的钩子*/private final RPCHook rpcHook;/*** 服务状态*/private volatile ServiceState serviceState = ServiceState.CREATE_JUST;/*** 网络客户端通信*/private MQClientInstance mQClientFactory;/*** 消息拉取的Api*/private PullAPIWrapper pullAPIWrapper;/*** 是否暂停拉取消息的标识*/private volatile boolean pause = false;/*** 是否顺序消费的标识*/private boolean consumeOrderly = false;/*** 消息处理监听器*/private MessageListener messageListenerInner;/*** 消费偏移量的存储组件*/private OffsetStore offsetStore;/*** 消费消息的服务*/private ConsumeMessageService consumeMessageService;/*** 队列流量控制次数*/private long queueFlowControlTimes = 0;/*** 队列最大跨度流量控制次数*/private long queueMaxSpanFlowControlTimes = 0;

其中比较重要的属性位:

  • pullAPIWrapper:负责与Broker进行消息拉取的交互,封装了底层的网络请求与响应处理。

  • rebalanceImpl:实现消费者的负载均衡逻辑,动态分配消息队列给消费者实例

  • offsetStore:负责消费者消费进度(偏移量)的存储与管理,确保消息不重复消费、不丢失

  • consumeMessageService:管理消息消费的线程池,执行具体的消息消费任务

二、DefaultMQPushConsumerImpl 启动方法

//启动方法public synchronized void start() throws MQClientException {switch (this.serviceState) {case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());this.serviceState = ServiceState.START_FAILED;// 检查配置合法性this.checkConfig();// 复制订阅信息this.copySubscription();if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {this.defaultMQPushConsumer.changeInstanceNameToPID();}// 获取或创建MQClientInstancethis.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPushConsumer.getOffsetStore() != null) {this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {switch (this.defaultMQPushConsumer.getMessageModel()) {case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}//从磁盘加载数据this.offsetStore.load();if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}this.consumeMessageService.start();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}this.updateTopicSubscribeInfoWhenSubscriptionChanged();this.mQClientFactory.checkClientInBroker();//向broker发送心跳this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();//立即进行rebalancethis.mQClientFactory.rebalanceImmediately();}

从DefaultMQPushConsumerImpl的start方法中可以看出来主要作用为:

1. 配置校验与状态管理

  • 配置合法性检查:验证消费者组名、NameServer 地址、订阅关系等核心配置是否正确,确保后续操作的基础条件满足。

  • 状态机控制:通过状态枚举(ServiceState)确保消费者只能从初始状态(CREATE_JUST)启动,防止重复启动或非法状态转换。

2. 核心组件初始化

  • 复制订阅信息:将用户配置的主题(Topic)和过滤表达式(Tag)复制到内部数据结构,用于后续消息过滤。

  • 获取或创建 MQClientInstance:通过单例模式获取或创建与 RocketMQ 集群通信的客户端实例,该实例负责管理网络连接、心跳机制和请求路由。

  • 注册消费者:将当前消费者注册到 MQClientInstance 的消费者注册表中,便于统一管理和协调。

3. 启动核心服务

  • 负载均衡服务:初始化并启动RebalanceImpl,定期(默认 20 秒)执行负载均衡算法,动态分配消息队列给消费者实例,确保集群内负载均匀。

  • 消息拉取服务:启动PullAPIWrapper,初始化拉取线程池,为后续从 Broker 拉取消息做准备。

  • 消息消费服务:根据消费模式(并发 / 顺序)启动对应的ConsumeMessageService,初始化消费线程池,处理拉取到的消息。

4. 状态更新与资源准备

  • 更新服务状态:将消费者状态从START_FAILED切换为RUNNING,标志启动成功。

  • 订阅信息同步:向 NameServer 同步订阅信息,确保 Broker 知晓消费者的订阅关系。

三、消息拉取PullAPIWrapper

消息拉取由pullAPIWrapper主导。pullKernelImpl方法构建请求并发送到Broker,Broker在无消息时挂起请求,超时后返回结果。消费者接收到响应后,解析PullResult。

主要属性

 /*** 网路通信组件*/private final MQClientInstance mQClientFactory;/*** 消费组*/private final String consumerGroup;/*** 单元*/private final boolean unitMode;/*** 消息队列和broker机器的映射关系*/private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =new ConcurrentHashMap<MessageQueue, AtomicLong>(32);/*** 是否连接broker*/private volatile boolean connectBrokerByUser = false;private volatile long defaultBrokerId = MixAll.MASTER_ID;private Random random = new Random(System.currentTimeMillis());/*** 过滤消息的钩子*/private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

拉取方法

 /*** 拉取消息的核心方法* @param mq 从哪个MessageQueue中拉取消息* @param subExpression 子表达式* @param expressionType 表达式类型* @param subVersion 子版本号* @param offset 拉取偏移量* @param maxNums 拉取的最大数量* @param sysFlag 系统标志* @param commitOffset 我们已经拉取处理完毕的数据偏移量 做一个提交* @param brokerSuspendMaxTimeMillis broker挂起最大的时间戳* @param timeoutMillis 拉取消息的超时时间* @param communicationMode 通信模式* @param pullCallback 回调的接口* @return* @throws MQClientException* @throws RemotingException* @throws MQBrokerException* @throws InterruptedException*/public PullResult pullKernelImpl(final MessageQueue mq,final String subExpression,final String expressionType,final long subVersion,final long offset,final int maxNums,final int sysFlag,final long commitOffset,final long brokerSuspendMaxTimeMillis,final long timeoutMillis,final CommunicationMode communicationMode,final PullCallback pullCallback) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {//根据messageQueue获取broker地址 可以针对针对这个MessageQueue重新计算FindBrokerResult findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);//未找到broker 从nameServer中进行更新数据 然后进行再次寻找if (null == findBrokerResult) {this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());findBrokerResult =this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),this.recalculatePullFromWhichNode(mq), false);}if (findBrokerResult != null) {{// check versionif (!ExpressionType.isTagType(expressionType)&& findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {throw new MQClientException("The broker[" + mq.getBrokerName() + ", "+ findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);}}int sysFlagInner = sysFlag;if (findBrokerResult.isSlave()) {sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);}//构建拉取消息的请求头PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();requestHeader.setConsumerGroup(this.consumerGroup);requestHeader.setTopic(mq.getTopic());requestHeader.setQueueId(mq.getQueueId());requestHeader.setQueueOffset(offset);requestHeader.setMaxMsgNums(maxNums);requestHeader.setSysFlag(sysFlagInner);requestHeader.setCommitOffset(commitOffset);requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);requestHeader.setSubscription(subExpression);requestHeader.setSubVersion(subVersion);requestHeader.setExpressionType(expressionType);requestHeader.setBname(mq.getBrokerName());String brokerAddr = findBrokerResult.getBrokerAddr();if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {brokerAddr = computePullFromWhichFilterServer(mq.getTopic(), brokerAddr);}//拉取消息PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(brokerAddr,requestHeader,timeoutMillis,communicationMode,pullCallback);return pullResult;}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

四、RebalanceImpl负载均衡

RebalanceImpl#rebalanceByTopic 是 RocketMQ 实现消费者负载均衡的核心方法,其主要作用是根据当前集群中消费者实例和消息队列的状态,动态分配每个消费者应负责消费的队列。这个过程确保了消息消费的均匀性和高可用性,避免了部分消费者过载而其他消费者空闲的情况。

 private void rebalanceByTopic(final String topic, final boolean isOrder) {switch (messageModel) {case BROADCASTING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);if (mqSet != null) {boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);if (changed) {this.messageQueueChanged(topic, mqSet, mqSet);log.info("messageQueueChanged {} {} {} {}",consumerGroup,topic,mqSet,mqSet);}} else {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}break;}case CLUSTERING: {//订阅的topic 有哪些queuesSet<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);//查询consumer分组中有哪些consumer实例List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}if (null == cidAll) {log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);}if (mqSet != null && cidAll != null) {List<MessageQueue> mqAll = new ArrayList<MessageQueue>();mqAll.addAll(mqSet);Collections.sort(mqAll);Collections.sort(cidAll);//获取配置的负载均衡策略(默认:平均分配)AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;List<MessageQueue> allocateResult = null;try {//执行负载均衡算法,计算当前消费者应分配的队列allocateResult = strategy.allocate(this.consumerGroup,this.mQClientFactory.getClientId(),mqAll,cidAll);} catch (Throwable e) {log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),e);return;}Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();if (allocateResult != null) {allocateResultSet.addAll(allocateResult);}//更新consumer要进行处理的消息队列boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);if (changed) {log.info("rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),allocateResultSet.size(), allocateResultSet);this.messageQueueChanged(topic, mqSet, allocateResultSet);}}break;}default:break;}}

 核心功能概述

rebalanceByTopic 方法会针对指定主题(Topic)执行以下操作:

  • 获取当前消费组内所有活跃的消费者实例列表

  • 获取该主题下的所有消息队列(MessageQueue)

  • 根据负载均衡策略(如平均分配、轮询等)计算每个消费者应分配的队列

  • 更新本地队列分配信息,并触发消息拉取或停止不必要的拉取任务

负载均衡会在以下情况触发

  • 消费者启动时

  • 消费者实例数量变化(新实例加入或旧实例退出)

  • 主题的队列数量变化(如 Broker 动态调整队列数)

  • 定时任务(默认每 20 秒执行一次)

五、总结

通过对DefaultMQPushConsumerImpl了解,我们清晰地看到了 RocketMQ 消息推送消费的完整实现逻辑。从类结构设计到各个核心流程的源码细节,每一部分都紧密协作,共同保障了消息消费的高效性和可靠性。理解这些源码,开发者能够更好地进行性能优化、问题排查和功能扩展,让 RocketMQ 在实际项目中发挥更大价值。

http://www.xdnf.cn/news/12664.html

相关文章:

  • Docker基础(二)
  • TTL简述
  • Unity基础-欧拉角和四元数
  • 【Elasticsearch】映射:Join 类型、Flattened 类型、多表关联设计
  • 基于springboot的藏文古籍系统
  • Nature子刊:16S宏基因组+代谢组学联动,借助MicrobiomeGS2建模揭示IBD代谢治疗新靶点
  • Java高级 | 【实验六】Springboot文件上传和下载
  • Python 中的MVC与MVP 框架与示例
  • LVGL对显示接口的要求
  • 闲庭信步使用SV搭建图像测试平台:第一课——图片的读写
  • 【商城saas和商城源码的区别】
  • 【Zephyr 系列 13】BLE Mesh 入门实战:构建基础节点通信与中继组播系统
  • 类型别名与类型自动推导
  • Redis数据持久化之RDB快照
  • 【走好求职第一步】求职OMG——见面课测验4
  • SAP学习笔记 - 开发27 - 前端Fiori开发 Routing and Navigation(路由和导航)
  • 算术图片验证码(四则运算)+selenium
  • 【大模型】大模型RAG(Retrieval-Augmented Generation)面试题合集
  • 欢乐熊大话蓝牙知识16:蓝牙是怎么找设备的?扫描与广播的“对话内幕”
  • Shell编程精髓:表达式与数组实战指南
  • DbServer链接KingBase8(人大)数据库
  • Android座舱系统Agent改造方案
  • day 47
  • 微前端架构下的B端页面设计:模块化与跨团队协作的终极方案
  • Python爬虫-爬取各省份各年份高考分数线数据,进行数据分析
  • 国产pcie switch,支持PCIE 3.0/4.0/5.0,支持昇腾310/910 GPU,支持龙芯、海光、飞腾
  • 小白成长之路-Linux Shell脚本练习
  • 2025年- H77-Lc185--45.跳跃游戏II(贪心)--Java版
  • Xilinx IP 解析之 Block Memory Generator v8.4 ——01-手册重点解读(仅 Native R
  • 前端开发面试题总结-JavaScript篇(二)