当前位置: 首页 > ds >正文

【RocketMQ 高可用】- 从节点同步偏移量源码解析

文章目录

  • 1. 前言
  • 2. handleSlaveSynchronize
  • 3. syncAll 同步主节点偏移量
    • 3.1 syncTopicConfig 同步主节点的 topi 配置
    • 3.2 syncConsumerOffset 同步主节点的 ConsumerOffset 配置
    • 3.3 syncDelayOffset 同步延时队列的消费点位
    • 3.4 syncSubscriptionGroupConfig 同步主节点的订阅消息
  • 4. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

前置文章:

  • 【RocketMQ 高可用】- 主从同步的前置(自己实现的简单通信流程)
  • 【RocketMQ 高可用】- 主从同步(一主一从测试)
  • 【RocketMQ 高可用】- 详解主从同步刷盘请求处理服务 GroupTransferService
  • 【RocketMQ 高可用】- 主从同步核心源码 HAService 和 HAConnection(1)
  • 【RocketMQ 高可用】- 主从同步核心源码 HAService 和 HAConnection(2)

上面几篇文章我们解析了主从同步的源码和流程结构,而最后还有一项就是偏移量的同步,因为上面的流程只是将数据和同步偏移量传输过来,实际上主节点消费点位、topic 配置、订阅消息等这些都没有给从节点传输过去,这样如果消费者来消费从节点还是不知道应该怎么过滤消息…,所以 RocketMQ 通过一个定时任务来将这些相关的信息都给从节点传输过去。


2. handleSlaveSynchronize

这个方法就是从节点专门用于同步配置的方法,当 BrokerController 启动的时候会调用 handleSlaveSynchronize 方法,所以是 broker 启动就开始同步的。

/*** 从节点专门用于同步主节点的各个配置* @param role 角色*/
private void handleSlaveSynchronize(BrokerRole role) {if (role == BrokerRole.SLAVE) {// 如果是从节点才进行同步if (null != slaveSyncFuture) {// 这里取消原来的定时任务,但是不中断原来定时任务的执行slaveSyncFuture.cancel(false);}// 将 slaveSynchronize 对象中的主节点地址设置为 null,后面 broker 上传心跳会重新设置主节点的// 猜测有可能是避免主节点一开始没有数据就不断去同步,浪费资源,不一定对this.slaveSynchronize.setMasterAddr(null);// 定时任务slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 初始化之后延时 3s 执行,之后按固定速率的模式去执行,固定速率设置成 10sBrokerController.this.slaveSynchronize.syncAll();}catch (Throwable e) {log.error("ScheduledTask SlaveSynchronize syncAll error.", e);}}}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);} else {//handle the slave synchroniseif (null != slaveSyncFuture) {slaveSyncFuture.cancel(false);}this.slaveSynchronize.setMasterAddr(null);}
}

上面就是 handleSlaveSynchronize 的全部流程,里面会去判断如果是从节点才进行同步,同时取消原来的定时任务,但是不中断原来定时任务的执行。

下面将 slaveSynchronize 对象中的主节点地址设置为 null,后面 broker 上传心跳会重新设置主节点的,相当于延迟执行了。

最后延迟 3s 执行,每隔 10s 执行一次。


3. syncAll 同步主节点偏移量

/*** 同步主节点的所有偏移量*/
public void syncAll() {// 同步主节点的 topi 配置this.syncTopicConfig();// 同步主节点的 ConsumerOffset 配置this.syncConsumerOffset();// 设置延时队列的消费点位this.syncDelayOffset();// 同步主节点的订阅消息this.syncSubscriptionGroupConfig();
}

上面就是同步的逻辑,可以看到里面分成了四部分,专门负责同步 topic 配置ConsumerOffset 消费配置delayOffset 延时对列消费点位订阅信息


3.1 syncTopicConfig 同步主节点的 topi 配置

/*** 同步 topic 的配置*/
private void syncTopicConfig() {// 主节点地址String masterAddrBak = this.masterAddr;if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {// 如果当前的 broker 地址不是主节点地址,那 broker 地址怎么算出来的呢?就是通过 brokerIP1 + : + listenPorttry {// 向主节点发送请求获取 topic 配置TopicConfigSerializeWrapper topicWrapper =this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);// 这里的意思是如果当前的 topic 配置和主节点发送过来的版本不一样,那就说明需要重新更新当前从节点的 topic 配置信息// 因为更新 topic 配置需要持久化,所以版本一样的情况下就没必要重复持久化了,还是很耗费性能的if (!this.brokerController.getTopicConfigManager().getDataVersion().equals(topicWrapper.getDataVersion())) {// 设置当前 slave 节点的配置版本为最新的this.brokerController.getTopicConfigManager().getDataVersion().assignNewOne(topicWrapper.getDataVersion());// 清空旧数据this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();// 设置新数据this.brokerController.getTopicConfigManager().getTopicConfigTable().putAll(topicWrapper.getTopicConfigTable());// 持久化 topic 配置到文件中this.brokerController.getTopicConfigManager().persist();log.info("Update slave topic config from master, {}", masterAddrBak);}} catch (Exception e) {log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);}}
}

可以看到这个方法中会判断如果主节点地址不为空,就向主节点发起同步请求,如果是主节点 masterAddr 就是空,也就不会同步了,只有从节点会同步。

getAllTopicConfig 就是向主节点发送请求获取 topic 配置,主节点同步配置过来会携带一个版本号,从节点可以根据这个版本号判断 topic 配置是否发生了变化,如果发生了变化就需要更新从节点的 topic 配置,因为更新 topic 配置需要持久化,所以版本一样的情况下就没必要重复持久化了,还是很耗费性能的。

设置 topic 配置的时候会把配置设置到 topicConfigTable 中,然后调用 persist 持久化,持久化的文件就是:${ROCKETMQ_HOME} / store / config / topics.json
在这里插入图片描述

那么是如何发送获取 topic 配置的呢?看下面的 getAllTopicConfig 方法。

public TopicConfigSerializeWrapper getAllTopicConfig(final String addr) throws RemotingConnectException, RemotingSendRequestException,RemotingTimeoutException, InterruptedException, MQBrokerException {// 构建获取 topic 配置的请求,请求码是 GET_ALL_TOPIC_CONFIGRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);// 通过从节点的 VIP 通道向主节点发送请求,VIP 通道跟普通通道没什么区别,但是 VIP 通道是平时很少用的,所以性能会很高// 不过其实 VIP 通道就是 broker 监听的端口 - 2,比如当前 broker 监听的端口是 10911,那么 VIP 通道就是 broker地址 + 10909RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 返回结果return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class);}default:break;}// 抛出异常throw new MQBrokerException(response.getCode(), response.getRemark(), addr);}

这里获取配置是通过 VIP 通道去发送的,,VIP 通道跟普通通道没什么区别,但是 VIP 通道是平时很少用的,所以性能会很高,其实 VIP 通道就是 broker 监听的端口 - 2,比如当前 broker 监听的端口是 10911,那么 VIP 通道就是 broker地址 + 10909。

发送请求的时候会构建请求码为 GET_ALL_TOPIC_CONFIG 的请求,broker 会在 processRequest 中通过 getAllTopicConfig 方法处理获取 topic 配置的请求。
在这里插入图片描述
下面看下具体的 getAllTopicConfig 方法。

/*** 获取所有 topic 配置,返回当前 broker 的 topic 配置和版本信息* @param ctx* @param request* @return*/
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class);// final GetAllTopicConfigResponseHeader responseHeader =// (GetAllTopicConfigResponseHeader) response.readCustomHeader();// 获取编码过后的 topic 配置和版本信息String content = this.brokerController.getTopicConfigManager().encode();if (content != null && content.length() > 0) {try {// 设置编码为 "UTF-8"response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {log.error("", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}} else {log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress());response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("No topic in this broker");return response;}// 设置返回结果状态码response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}

至于具体的 topic 信息就是上面的编码方法,在里面会把 topicConfigTable 和 dataVersion 设置成 json 格式再发送回给从节点,这个流程不复杂,直接看代码就行了。

/*** 将 topicConfigTable 和 dataVersion 编码,dataVersion 主要是判断 topic 有没有更新的,判断逻辑就是根据* dataVersion 里面的更新时间和 counter 修改次数来判断* @param prettyFormat* @return*/
public String encode(final boolean prettyFormat) {TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();topicConfigSerializeWrapper.setTopicConfigTable(this.topicConfigTable);topicConfigSerializeWrapper.setDataVersion(this.dataVersion);return topicConfigSerializeWrapper.toJson(prettyFormat);
}

3.2 syncConsumerOffset 同步主节点的 ConsumerOffset 配置

/*** 同步主节点的消费偏移量*/
private void syncConsumerOffset() {// 主节点地址String masterAddrBak = this.masterAddr;// 如果当前的 broker 地址不是主节点地址,那 broker 地址怎么算出来的呢?就是通过 brokerIP1 + : + listenPortif (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {try {// 向主节点发送请求获取消费偏移量ConsumerOffsetSerializeWrapper offsetWrapper =this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);// 将获取到的消费偏移量添加到 Slave 节点的 offsetTablethis.brokerController.getConsumerOffsetManager().getOffsetTable().putAll(offsetWrapper.getOffsetTable());// 持久化到文件this.brokerController.getConsumerOffsetManager().persist();log.info("Update slave consumer offset from master, {}", masterAddrBak);} catch (Exception e) {log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);}}
}

这个方法就是同步主节点的消费偏移量,获取到的 ConsumeOffset 会持久化到 ${ROCKETMQ_HOME} / store / config / consumerOffset.json
在这里插入图片描述
获取 ConsumerOffset 的请求码是 GET_ALL_CONSUMER_OFFSET,同样的在 broker 也是通过 getAllConsumerOffset 来获取,其实就是对 consumerOffsetManager 编码求 json 串返回。

private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 将 ConsumerOffsetManager 进行编码String content = this.brokerController.getConsumerOffsetManager().encode();if (content != null && content.length() > 0) {try {response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {log.error("get all consumer offset from master error.", e);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("UnsupportedEncodingException " + e);return response;}} else {log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress());response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("No consumer offset in this broker");return response;}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}

3.3 syncDelayOffset 同步延时队列的消费点位

/*** 同步延时队列的消费进度*/
private void syncDelayOffset() {// 主节点地址String masterAddrBak = this.masterAddr;// 如果当前的 broker 地址不是主节点地址,那 broker 地址怎么算出来的呢?就是通过 brokerIP1 + : + listenPortif (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {try {// 获取延时队列的消费进度集合String delayOffset =this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);if (delayOffset != null) {// 要写入的文件名String fileName =StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController.getMessageStoreConfig().getStorePathRootDir());try {// 进行持久化MixAll.string2File(delayOffset, fileName);} catch (IOException e) {log.error("Persist file Exception, {}", fileName, e);}}log.info("Update slave delay offset from master, {}", masterAddrBak);} catch (Exception e) {log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);}}
}

获取到的 DelayOffset 会持久化到 ${ROCKETMQ_HOME} / store / config / delayOffset.json。延时 offset 是延时消息的消费点位,最终会调用到 ScheduleMessageService#encode 方法。

@Override
public String encode(final boolean prettyFormat) {DelayOffsetSerializeWrapper delayOffsetSerializeWrapper = new DelayOffsetSerializeWrapper();delayOffsetSerializeWrapper.setOffsetTable(this.offsetTable);return delayOffsetSerializeWrapper.toJson(prettyFormat);
}

其他逻辑跟上面几个方法基本是一样的,所以就不多详细解释了。


3.4 syncSubscriptionGroupConfig 同步主节点的订阅消息

private void syncSubscriptionGroupConfig() {// 主节点地址String masterAddrBak = this.masterAddr;// 如果当前的 broker 地址不是主节点地址,那 broker 地址怎么算出来的呢?就是通过 brokerIP1 + : + listenPortif (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {try {// 获取主节点消费者组的订阅信息SubscriptionGroupWrapper subscriptionWrapper =this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak);// 如果说消费者组的订阅信息版本有变动if (!this.brokerController.getSubscriptionGroupManager().getDataVersion().equals(subscriptionWrapper.getDataVersion())) {// 获取 Slave 节点的订阅信息SubscriptionGroupManager subscriptionGroupManager =this.brokerController.getSubscriptionGroupManager();// 更新版本subscriptionGroupManager.getDataVersion().assignNewOne(subscriptionWrapper.getDataVersion());// 清除当前 Slave 节点的订阅信息subscriptionGroupManager.getSubscriptionGroupTable().clear();// 将新的订阅消息设置到当前节点中subscriptionGroupManager.getSubscriptionGroupTable().putAll(subscriptionWrapper.getSubscriptionGroupTable());// 持久化subscriptionGroupManager.persist();log.info("Update slave Subscription Group from master, {}", masterAddrBak);}} catch (Exception e) {log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);}}
}

获取到的 subscriptionGroupTable 会持久化到 ${ROCKETMQ_HOME} / store / config / subscriptionGroup.json

里面的逻辑也和上面的流程差不多,也不多详细介绍了。


4. 小结

好了,这篇文章就讲到这了,RocketMQ 中从节点就是通过这种方式来同步主节点的 topic 等相关配置,从而当消费者从从节点读取消息的时候能够正确过滤执行。





如有错误,欢迎指出!!!

http://www.xdnf.cn/news/4253.html

相关文章:

  • 计算机体系结构 第九章 (附带移数网络直径证明和取值情况)
  • 刷leetcodehot100返航版--哈希表5/5、5/6
  • Java抽象类与接口详解
  • 【项目】基于ArkTS的网吧会员应用开发(1)
  • 访问计划(C++)
  • BC9 printf的返回值
  • 学习路线(工业自动化软件架构)
  • Imagine Explainers:AI × 可视化 × 趣味讲解,让复杂变简单
  • 1. 设计哲学与核心价值
  • C/C++滑动窗口算法深度解析与实战指南
  • 2025年第十六届蓝桥杯省赛JavaB组真题
  • 【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer
  • gcc/g++用法摘记
  • torch.nn.Sequential() and torch.nn.ModuleList()
  • 用输入输出变量根据超稳定性理论设计模型参考自适应系统
  • 迭代器模式
  • map和set的设计以及红黑树的设计
  • 英伟达语音识别模型论文速读:Fast Conformer
  • 学习黑客Nmap 实战
  • Java学习手册:Spring 多数据源配置与管理
  • 信息系统项目管理工程师备考计算类真题讲解十二
  • 破局者手册 Ⅰ:测试开发核心基础,解锁未来测试密钥!
  • 【NLP】27. 语言模型训练以及模型选择:从预训练到下游任务
  • RAG知识库只是表面简单!
  • Kubernetes排错(七)-节点排错
  • 除了java.nio.file.StandardCopyOption,还有哪些类可以实现文件的复制和移动?
  • C++动态库和静态库的生成和使用
  • linux crash工具详解
  • android-ndk开发(1): 搭建环境
  • 星途-(4)