当前位置: 首页 > ds >正文

【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer

文章目录

  • 1. 前言
  • 2. registerBrokerAll 注册 broker 信息到 NameServer
    • 2.1 needRegister 是否需要注册
    • 2.2 needRegister
    • 2.3 DefaultRequestProcessor#queryBrokerTopicConfig 查询 broker 的配置信息
  • 3. doRegisterBrokerAll 向 NameServer 注册 broker 信息
    • 3.1 registerBrokerAll 注册 broker 的信息到 namrsrv
    • 3.2 TopicConfigSerializeWrapper
    • 3.3 registerBroker
  • 4. DefaultRequestProcessor#registerBrokerWithFilterServer
    • 4.1 registerBroker 注册 broker 信息
    • 4.2 createAndUpdateQueueData
  • 5. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ Broker 相关源码】- broker 启动源码(1)
  • 【RocketMQ Broker 相关源码】- broker 启动源码(2)

在前面 broker 启动时会判断如果没有开启 Dleger 高可用,就会将 broker 信息注册到所有 NameServer。
在这里插入图片描述
在这里插入图片描述
同时在 broker 启动的时候也启动了一个定时任务,初始化之后 10s 开始执行第一次, 之后每隔 30s 执行一次,前两篇文章我们只是提了一嘴,这篇文章我们就来看下 broker 信息是如何注册到
NameServer 的,注册的核心逻辑在方法 registerBrokerAll 中。
在这里插入图片描述c

2. registerBrokerAll 注册 broker 信息到 NameServer

/*** 注册 broker 里面的 topic 配置信息到 nameserver,也就是向 nameserver 发送 broker 的信息* @param checkOrderConfig  是否检测顺序 topic* @param oneway            是否是单向发送,意思是 broker 只管往 namesrv 发送数据,有没有回复,namesrv 有没有收到不管* @param forceRegister     是否强制注册到 nameserver 中*/
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway, boolean forceRegister) {// TopicConfigSerializeWrapper 是对 topicConfigTable 和 dataVersion 进行封装TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 如果 broker 权限不支持读或者写if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();// 遍历所有 Topic 配置for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) {// 重新创建 TopicConfig,其实就是重新配置了 perm 权限参数,让这个参数和 broker 的一样// 如果说当前这个 topic 在这个 broker 上面不支持读写,那么说生产者或者消费者是不允许往这个 topic 读写的// 这种情况下就修改下 topic 的配置,就是为了和 broker 同步TopicConfig tmp =new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),this.brokerConfig.getBrokerPermission());// 重新设置到 topicConfigTable 中topicConfigTable.put(topicConfig.getTopicName(), tmp);}// 再重新设置下 topicConfigWrapper 的 topicConfigTable 属性topicConfigWrapper.setTopicConfigTable(topicConfigTable);}// 如果需要注册到 nameserverif (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId(),this.brokerConfig.getRegisterBrokerTimeoutMills())) {// 向 nameserver 上报 broker 信息doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);}
}

首先 broker 是存储了 topic 的配置信息的,因为消费者要拉取信息已经生产者要写入信息都需要指定 topic,但是 broker 有可能是不支持读写的,所以这种情况下就算 topic 配置支持读写,但是 broker 不支持,那生产者也不可能往这个 broker 写入消息,消费者也不会拉取消息。

所以这个方法中做了一个判断,就是如果发现这个 broker 权限不支持读或者写,这种情况下重新创建 TopicConfig,其实就是重新配置了 perm 权限参数,让这个参数和 broker 的一样,如果说当前这个 topic 在这个 broker 上面不支持读写,那么说生产者或者消费者是不允许往这个 topic 读写的,这种情况下就修改下 topic 的配置,就是为了和 broker 同步。

这个 topic 信息最终是要存储到 ${user.home}/store/config/topics.json 下面的。
在这里插入图片描述
那下面回到源码继续往下看,如果 forceRegister = true 或者 needRegister 返回了 true,就向 NameServer 注册 broker 信息,forceRegister 在初始化的时候第一次添加是 true,而后面的定时任务可以根据 brokerConfig 里面的 forceRegister 来配置,也就是可以在 broker.conf 里面配置 forceRegister,当然了,默认就是 true。


2.1 needRegister 是否需要注册

/*** broker 是否需要向 nameserver 中注册** @param clusterName   集群名* @param brokerAddr    broker 地址* @param brokerName    broker 名字* @param brokerId      brokerId, 标记是主从* @param timeoutMills  超时时间* @return broker 是否需要向 nameserver 中注册*/
private boolean needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final int timeoutMills) {// 构建 topicConfigWrapper 值,里面包括 topicConfigTable(topic 配置信息) 和 dataVersion(版本号)TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();// 是否需要向 nameserver 注册List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigWrapper, timeoutMills);boolean needRegister = false;for (Boolean changed : changeList) {if (changed) {// 只要有一个 topic 配置发生了修改,直接返回,表示需要注册needRegister = true;break;}}return needRegister;
}

这个方法首先构建 topicConfigWrapper 值,里面包括 topicConfigTable(topic 配置信息) 和 dataVersion(版本号),topic 配置存储的时候会有一个版本号的东西,这个版本号创建出来的时候是当前时间,所以如果发生修改就只需要判断下这个版本号有没有变更就行了。

broker 也不会说每一次都向 NameServer 注册,如果说 NameServer 上面保存的这个 broker 上面存储的 topic 信息没有发生变化,就不需要注册,说明 topic 配置没有发生变更。要注意一下这里返回的是一个 List 集合,因为 broker 会向 NameServer 集群进行注册。

然后再调用 needRegister 去判断是否需要注册。


2.2 needRegister

/*** 是否应该注册 broker 中的 topic 信息到 nameserver* @param clusterName* @param brokerAddr* @param brokerName* @param brokerId* @param topicConfigWrapper* @param timeoutMills* @return*/
public List<Boolean> needRegister(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final TopicConfigSerializeWrapper topicConfigWrapper,final int timeoutMills) {// 请求返回结果保存final List<Boolean> changedList = new CopyOnWriteArrayList<>();// 获取所有的 nameserver 的地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());for (final String namesrvAddr : nameServerAddressList) {// 遍历brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {// 构建查询 DataVersion 的请求QueryDataVersionRequestHeader requestHeader = new QueryDataVersionRequestHeader();// 在请求头里面设置 broker 的信息requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);// 构建请求对象,请求 CODE 是 QUERY_DATA_VERSIONRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);// 在请求体中设置版本信息request.setBody(topicConfigWrapper.getDataVersion().encode());// 发送同步请求RemotingCommand response = remotingClient.invokeSync(namesrvAddr, request, timeoutMills);DataVersion nameServerDataVersion = null;Boolean changed = false;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 获取返回结果QueryDataVersionResponseHeader queryDataVersionResponseHeader =(QueryDataVersionResponseHeader) response.decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);// broker 中存储的当前 topic 的配置和 namesrv 中存储的是否不同changed = queryDataVersionResponseHeader.getChanged();byte[] body = response.getBody();if (body != null) {// body 是版本信息nameServerDataVersion = DataVersion.decode(body, DataVersion.class);if (!topicConfigWrapper.getDataVersion().equals(nameServerDataVersion)) {// 如果这两个不相同,那么就说明变化了(是为了避免在请求的期间 broker 又发生了修改,所以重新判断一遍吗)changed = true;}}if (changed == null || changed) {// 如果发生了修改,设置标记到 changedList 中changedList.add(Boolean.TRUE);}}default:break;}log.warn("Query data version from name server {} OK,changed {}, broker {},name server {}", namesrvAddr, changed, topicConfigWrapper.getDataVersion(), nameServerDataVersion == null ? "" : nameServerDataVersion);} catch (Exception e) {changedList.add(Boolean.TRUE);log.error("Query data version from name server {}  Exception, {}", namesrvAddr, e);} finally {countDownLatch.countDown();}}});}try {// 阻塞等待所有 nameserver 都处理完countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {log.error("query dataversion from nameserver countDownLatch await Exception", e);}}return changedList;
}

首先就是通过 getNameServerAddressList 来获取 NameServer 地址,这里的 NameServer 是在上两篇文章创建 broker 的时候设立进去的。
在这里插入图片描述
接着构建查询 DataVersion 的请求,不过大家可能会好奇这里为什么没有设置 topic 信息,是这样的,broker 里面有一个 brokerLiveTable,这个集合用于保存 broker 最近一次请求的时间。

// broker 地址 -> broker 活跃信息,里面维持了已建立连接的 broker 连接通道
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

比如说上面我发起 QUERY_DATA_VERSION 请求,这个 brokerLiveTable 里面如果存在这个 brokerAddr key 的话,BrokerLiveInfo 里面的 lastUpdateTimestamp 就会更新成当前时间,这个 lastUpdateTimestamp 是用来扫描不活跃的 broker 的,这个变量跟这篇文章没啥关系。有关系的是里面的 DataVersion,这个值记录了版本号,唯一的更新就是在注册 broker 信息的时候会去初始化更新,所以这个 QUERY_DATA_VERSION 请求,实际上是将 broker 中的 DataVersion 返回来,然后判断 NameServer 上记录的 broker 版本和 broker 本地记录的版本是否发生了变化,要注意的是这里没有传入 topic,那就意味者只要 broker 本地的 topic 信息发生了变化,都会重新注册,不是只针对某一个 topic。

可以看到的是其实上面的返回结果有一个 changed 字段,这个也是表示版本是否发生了变化,是在 NameServer 判断好了返回给 broker 的,但是为什么还需要再通过 topicConfigWrapper.getDataVersion().equals(nameServerDataVersion) 判断一遍呢?

估计是为了避免在请求的期间 broker 又发生了修改,所以重新判断一遍,就避免请求的时候 broker 存储的 topic 信息没有发生变化,但是返回结果的时间段里面 topic 配置发生了修改。

看到这里你可能又有疑问,这个 DataVersion 字段为啥就设置成了 topic 配置呢?其实可以看下下面图,下面就是 NameServer 中存储的 broker 的配置信息。

在这里插入图片描述
当 broker 注册的时候就是注册信息到这几个 table 中,最后一个过滤服务器地址现在也没看到有相关的请求 Code 了,官方文档也找不到介绍,直接忽略。剩余四个,broker 地址、集群地址这种一般就是配置文件就设置好了,启动之后就不会变的,那除非就是这个 broker 出故障了太久没更新 lastUpdateTimestamp,导致扫描出非活跃被干掉,这种情况下就会重新注册,一般都是 topic 变更了,所以把这个 DataVersion 设置成 topic 版本也算合适。


2.3 DefaultRequestProcessor#queryBrokerTopicConfig 查询 broker 的配置信息

那接下来我们就来下看下 QUERY_DATA_VERSION 这个 code 的处理逻辑,这个 code 的处理器是 DefaultRequestProcessor。
在这里插入图片描述

/*** 查询 broker 的 topic 配置* @param ctx* @param request* @return* @throws RemotingCommandException*/
public RemotingCommand queryBrokerTopicConfig(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {// 请求结果final RemotingCommand response = RemotingCommand.createResponseCommand(QueryDataVersionResponseHeader.class);final QueryDataVersionResponseHeader responseHeader = (QueryDataVersionResponseHeader) response.readCustomHeader();// 请求头final QueryDataVersionRequestHeader requestHeader =(QueryDataVersionRequestHeader) request.decodeCommandCustomHeader(QueryDataVersionRequestHeader.class);// 从请求体中获取出版本DataVersion dataVersion = DataVersion.decode(request.getBody(), DataVersion.class);// 将版本和 broker 地址传进去,判断现在 nameserver 存储的 broker 配置和 broker 上报过来的是否不同Boolean changed = this.namesrvController.getRouteInfoManager().isBrokerTopicConfigChanged(requestHeader.getBrokerAddr(), dataVersion);if (!changed) {// 如果没发生变化,那么更新下 BrokerLiveInfo 里面的更新时间this.namesrvController.getRouteInfoManager().updateBrokerInfoUpdateTimestamp(requestHeader.getBrokerAddr());}// 接着获取出 nameSever 里面存储的当前 broker 的 DataVersionDataVersion nameSeverDataVersion = this.namesrvController.getRouteInfoManager().queryBrokerTopicConfig(requestHeader.getBrokerAddr());// 设置返回结果返回给 brokerresponse.setCode(ResponseCode.SUCCESS);response.setRemark(null);if (nameSeverDataVersion != null) {// 设置 body 值response.setBody(nameSeverDataVersion.encode());}// 是否发生了修改responseHeader.setChanged(changed);return response;
}

这个方法会从请求体中获取出从请求体中获取出版本,然后通过 isBrokerTopicConfigChanged 判断判断 broker 的 topic 配置是否发生变化了,也就是判断 broker 的 topic 配置信息和 NameServer 中存储的是否不同,如果不同就返回 true,当然了如果找不到也会继续上报。

/*** 从 brokerLiveTable 中获取 broker 的活跃信息* @param brokerAddr* @return*/
public DataVersion queryBrokerTopicConfig(final String brokerAddr) {BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);if (prev != null) {// 返回数据版本return prev.getDataVersion();}return null;
}

而如果说版本没有发生变化,那么通过 updateBrokerInfoUpdateTimestamp 更新下 BrokerLiveInfo 里面的更新时间 lastUpdateTimestamp。这个变量是用来判断 broker 是不是长时间没有请求 NameServer 从而判断 broker 是否过期了,所以需要每次查询都更新这个变量,防止这个 broker 的信息被当成不活跃从而被删掉,简单来说就是更新 broker 最近一次请求的时间。那如果 DataVersion 发生变化呢?这种情况下会重新新建一个 BrokerLiveInfo 添加到 brokerLiveTable 中,这种情况下会将原来的覆盖,也相当于更新 lastUpdateTimestamp 了。

/*** 更新 Broker 配置版本信息, 在 queryBrokerTopicConfig 中当 broker 的 DataVersion 没有发生变化时修改下这个变量为当前时间,* 这个变量是用来判断 broker  是不是长时间没有请求 NameServer 从而判断 broker 是否过期了, 所以需要每次查询都更新这个变量, 防止这个 broker 的信息被当成不活跃从而被删掉,* 简单来说就是更新 broker 最近一次请求的时间, 那如果 DataVersion 发生变化呢?这种情况下会重新新建一个 BrokerLiveInfo 添加到 brokerLiveTable* 中, 这种情况下会将原来的覆盖, 也相当于更新 lastUpdateTimestamp 了* @param brokerAddr*/
public void updateBrokerInfoUpdateTimestamp(final String brokerAddr) {BrokerLiveInfo prev = this.brokerLiveTable.get(brokerAddr);if (prev != null) {prev.setLastUpdateTimestamp(System.currentTimeMillis());}
}

3. doRegisterBrokerAll 向 NameServer 注册 broker 信息

上面判断如果 broker 版本不同或者 NameServer 找不到这个 broker 的信息,就会调用 doRegisterBrokerAll 来注册 broker 信息到所有 NameServer。

/*** 向 namesrv 上报 broker 信息* @param checkOrderConfig      是否检查顺序 topic 的配置信息* @param oneway                是否是单向发送消息,也就是只发送信息给 namesrv,不等待返回结果* @param topicConfigWrapper    topic 信息的包装类,里面封装了当前 broker 里面存储的 topic 配置和版本信息*/
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,TopicConfigSerializeWrapper topicConfigWrapper) {// 执行注册的逻辑List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(this.brokerConfig.getBrokerClusterName(),   // broker 集群名称this.getBrokerAddr(),                       // broker 地址this.brokerConfig.getBrokerName(),          // broker 名字this.brokerConfig.getBrokerId(),            // brokerIDthis.getHAServerAddr(),                     // broker 的高可用地址,用于主从同步,brokerIP2:10912topicConfigWrapper,                         // topic 配置的包装类,封装了 topicConfigTable 和 DataVersionthis.filterServerManager.buildNewFilterServerList(),    // broker 的过滤服务器的地址oneway,                                     // 是否是单向发送this.brokerConfig.getRegisterBrokerTimeoutMills(),  // 注册的超时时间this.brokerConfig.isCompressedRegister());  // 注册的消息是否被压缩过了// 遍历所有注册的结果if (registerBrokerResultList.size() > 0) {RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);if (registerBrokerResult != null) {// updateMasterHAServerAddrPeriodically 用来配置是否在从节点 broker 启动的时候永久设置高可用地址,也是从节点用// 来主从同步的,如果没用配置,就默认每一次注册消息到 namesrv 都会更新当前从节点 broker 的高可用地址if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());}// 当 broker 注册到 namesrv 上面之后,会更新 slaveSynchronize 里面的 Master 地址// slaveSynchronize 是从节点用的,会定时从主节点拉取 topic 配置、消费偏移点位 ...this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());// 是否检查顺序 topic 的配置信息if (checkOrderConfig) {this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());}}}
}

这个方法调用了 registerBrokerAll 来注册,如果注册成功,就更新主从同步的高可用地址,同时设置主节点地址到 slaveSynchronize 中,然后更新顺序 topic 配置,顺序 topic 这个以后再说,前两个高可用的在我的【RocketMQ 高可用】系列的几篇文章中也有详细解释为什么要这么设置,这些属性有什么用,大家感兴趣可以去翻翻前面的文章。


3.1 registerBrokerAll 注册 broker 的信息到 namrsrv

/*** 注册 broker 的信息到 namrsrv* @param clusterName* @param brokerAddr* @param brokerName* @param brokerId* @param haServerAddr* @param topicConfigWrapper* @param filterServerList* @param oneway* @param timeoutMills* @param compressed* @return*/
public List<RegisterBrokerResult> registerBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,final List<String> filterServerList,final boolean oneway,final int timeoutMills,final boolean compressed) {final List<RegisterBrokerResult> registerBrokerResultList = new CopyOnWriteArrayList<>();// 获取所有的 NameServer 地址List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null && nameServerAddressList.size() > 0) {// 封装 broker 注册的数据包final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();// broker 地址requestHeader.setBrokerAddr(brokerAddr);// brokerIDrequestHeader.setBrokerId(brokerId);// broker 名字requestHeader.setBrokerName(brokerName);// 集群名称requestHeader.setClusterName(clusterName);// 当前 broker 的高可用地址,用于主从同步的requestHeader.setHaServerAddr(haServerAddr);// 消息体是否被压缩过requestHeader.setCompressed(compressed);// 设置消息体RegisterBrokerBody requestBody = new RegisterBrokerBody();// topic 配置requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);// 类过滤服务的信息requestBody.setFilterServerList(filterServerList);// 将消息体进行编码,同时根据 compressed 来判断需不需要压缩消息体final byte[] body = requestBody.encode(compressed);final int bodyCrc32 = UtilAll.crc32(body);// 设置 CRC32 校验码requestHeader.setBodyCrc32(bodyCrc32);// 同步发送,等向所有的 NameServer 都发送完心跳才会继续往下走final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());// 遍历所有的 NameServerfor (final String namesrvAddr : nameServerAddressList) {// 线程池处理,提高性能(核心线程 4,最大线程 10,阻塞队列 32)brokerOuterExecutor.execute(new Runnable() {@Overridepublic void run() {try {// 核心逻辑,发起注册请求RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);if (result != null) {// 将返回结果添加到 registerBrokerResultList 中registerBrokerResultList.add(result);}log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);} catch (Exception e) {log.warn("registerBroker Exception, {}", namesrvAddr, e);} finally {// 执行结束就减少计数countDownLatch.countDown();}}});}try {// 阻塞执行 timeoutMills 时间,默认 6s,也就是向所有 NameServer 注册的时间不超过 6scountDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}}// 返回注册结果return registerBrokerResultList;
}

这个方法会遍历所有 NameServer 地址,然后通过 registerBroker 发起注册请求,同时将返回结果添加到 registerBrokerResultList 中,注意 broker 信息设置到了请求头,而 body 信息是核心的 topic 配置信息,我们来看下这些配置是什么。


3.2 TopicConfigSerializeWrapper

public class TopicConfigSerializeWrapper extends RemotingSerializable {private ConcurrentMap<String, TopicConfig> topicConfigTable =new ConcurrentHashMap<String, TopicConfig>();private DataVersion dataVersion = new DataVersion();public ConcurrentMap<String, TopicConfig> getTopicConfigTable() {return topicConfigTable;}public void setTopicConfigTable(ConcurrentMap<String, TopicConfig> topicConfigTable) {this.topicConfigTable = topicConfigTable;}public DataVersion getDataVersion() {return dataVersion;}public void setDataVersion(DataVersion dataVersion) {this.dataVersion = dataVersion;}
}

TopicConfigSerializeWrapper 是 broker 要上报到 NameServer 的配置信息,这里面包括 topicConfigTable,这个集合存储了 topic -> topic 配置信息的映射,而 dataVersion 是这个 topic 配置的版本,当 topic 信息发生变化,就会更新 dataVersion。

public class TopicConfig {private static final String SEPARATOR = " ";// 默认读队列数量public static int defaultReadQueueNums = 16;// 默认写队列数量public static int defaultWriteQueueNums = 16;// topic 名称private String topicName;// 读队列数量private int readQueueNums = defaultReadQueueNums;// 写队列数量private int writeQueueNums = defaultWriteQueueNums;// 权限是可读写private int perm = PermName.PERM_READ | PermName.PERM_WRITE;// topic 过滤类型是 SINGLE_TAGprivate TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;// topic 的同步配置,具体的配置值可以到 TopicSysFlag 类中去看,默认是 0private int topicSysFlag = 0;// 标识这个 topic 是不是一个顺序 topic// 在 RocketMQ 中,顺序消息分为全局顺序消息和分区顺序消息://      1. 全局顺序消息:一个 Topic 下的所有消息都按照严格的顺序进行生产和消费。要实现全局顺序消息,需要将该 Topic 的 order 属性设置为 true,//                     并且在生产者端确保所有消息发送到同一个队列,在消费者端确保同一个队列只被一个消费者消费//      2. 分区顺序消息:一个 Topic 下的每个消息队列内部的消息是有序的,但不同队列之间的消息顺序不保证。这种情况下,order 属性也可用于标记//                     Topic 支持顺序消息处理,但在使用时需要通过业务逻辑控制消息发送到特定的队列private boolean order = false;...
}
  • topicName: topic 名称。
  • readQueueNums: 读队列的数量。
  • writeQueueNums: 写队列数量。
  • topicFilterType : topic 过滤类型,默认是 SINGLE_TAG,单 TAG 过滤,但是这个变量没搞懂是用来干什么的。
  • topicSysFlag: topic 的同步配置,具体的配置值可以到 TopicSysFlag 类中去看,默认是 0,这个变量也没搞懂是用来干什么的。
  • order: 是不是一个顺序 topic,默认是 false。

3.3 registerBroker

/*** 发起同步注册请求,向 NameServer 注册当前 broker 的信息,包括 topic 信息,topic 的版本信息,brokerIP、集群名称、过滤服务地址 ...* @param namesrvAddr* @param oneway* @param timeoutMills* @param requestHeader* @param body* @return* @throws RemotingCommandException* @throws MQBrokerException* @throws RemotingConnectException* @throws RemotingSendRequestException* @throws RemotingTimeoutException* @throws InterruptedException*/
private RegisterBrokerResult registerBroker(final String namesrvAddr,final boolean oneway,final int timeoutMills,final RegisterBrokerRequestHeader requestHeader,final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,InterruptedException {// 构建远程调用请求对象,code 为 REGISTER_BROKERRemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);request.setBody(body);// 如果是单向请求,就是直接向 NameServer 发起异步请求之后直接返回了,如果是注册请求就不是单向请求了if (oneway) {try {this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);} catch (RemotingTooMuchRequestException e) {// Ignore}return null;}// 发起同步请求,同步阻塞等待结果,阻塞时间默认 6sRemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {// 注册的返回结果RegisterBrokerResponseHeader responseHeader =(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);RegisterBrokerResult result = new RegisterBrokerResult();// 设置主节点的地址和高可用地址进去result.setMasterAddr(responseHeader.getMasterAddr());result.setHaServerAddr(responseHeader.getHaServerAddr());if (response.getBody() != null) {result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));}return result;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark(), requestHeader == null ? null : requestHeader.getBrokerAddr());
}

这个方法就是发起同步注册请求,向 NameServer 注册当前 broker 的信息,可以看到请求 Code 是 REGISTER_BROKER。

下面判断如果是 oneway = true,就单向发送请求,这个 oneway 就是单向发送请求,不接收返回结果,但是 broker 启动和定时任务定时上报这个属性都是 false,所以是不会走 oneway 的。

下面就是发送同步请求了,同步阻塞等待结果,阻塞时间默认 6s,获取到返回结果后设置主节点地址和高可用地址,下面我们就看下 NameServer 如何处理这个请求的。


4. DefaultRequestProcessor#registerBrokerWithFilterServer

REGISTER_BROKER 请求的处理器也是 DefaultRequestProcessor,在这个处理器中会判断版本如果大于 3.0.11,就走 registerBrokerWithFilterServer 方法来注册。
在这里插入图片描述

/*** 处理 Broker 的注册请求* @param ctx* @param request* @return* @throws RemotingCommandException*/
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {// 创建返回结果final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);// 创建响应头final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();// 解码并获取请求头final RegisterBrokerRequestHeader requestHeader =(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);// 校验 CRC32 校验码是否匹配,来判断传输的过程中数据有没有被修改了if (!checksum(ctx, request, requestHeader)) {// 如果被修改了,就设置返回结果为 SYSTEM_ERRORresponse.setCode(ResponseCode.SYSTEM_ERROR);// CRC32 不匹配错误response.setRemark("crc32 not match");return response;}// broker 注册的请求体内容RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();if (request.getBody() != null) {try {// 解码,这里也会根据 compressed 来判断发送过来的数据是不是被压缩过的,从而走不同的解码逻辑registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());} catch (Exception e) {throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);}} else {// 如果请求体为空,就重置下配置里面的版本信息registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);}// broker 信息注册RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(requestHeader.getClusterName(),     // 请求消息的 broker 集群名称requestHeader.getBrokerAddr(),      // 请求消息的 broker 地址requestHeader.getBrokerName(),      // 请求消息的 broker 名称requestHeader.getBrokerId(),        // 请求消息的 brokerIDrequestHeader.getHaServerAddr(),    // 高可用服务器地址registerBrokerBody.getTopicConfigSerializeWrapper(),    // 获取请求消息的 topic 配置信息和版本信息registerBrokerBody.getFilterServerList(),   // 获取请求消息的过滤服务器列表ctx.channel()); // 连接通道// 设置高可用地址和主节点地址到返回结果中responseHeader.setHaServerAddr(result.getHaServerAddr());responseHeader.setMasterAddr(result.getMasterAddr());// 获取顺序消息的配置,根据命名空间获取 ORDER_TOPIC_CONFIGbyte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);response.setBody(jsonValue);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;
}

还是一样,先从请求头中解析出请求数据,然后校验 CRC32 校验码是否匹配,来判断传输的过程中数据有没有被修改了或者发生损坏,接着会根据 compressed 来判断发送过来的数据是不是被压缩过的,从而走不同的解码逻辑。

broker 编码发送有两种方式,一种是直接 JSON.toJSONString 转成 JSON 串,所以对应的就是 JSON.parseObject,另外一种就是使用 DeflaterOutputStream 这个 JDK 自带的压缩工具进行压缩,然后将消息以字节的方式设置进去,对应的在 broker 端需要使用 InflaterInputStream 来解码。

最后调用 RouteInfoManager#registerBroker 来注册 broker,终于到最后的核心逻辑了。


4.1 registerBroker 注册 broker 信息

这个方法信息量比较大,所以直接一行一行讲解,首先在注册 broker 信息之前先加一个写锁避免并发导致相互覆盖。

// 加锁
this.lock.writeLock().lockInterruptibly();

接下来更新下 clusterAddrTable 这个集合,这个集合是一个 String -> Set< String > 类型的 HashMap,存储了集群名称 -> 集群下面的所有 broker 主从集群的 broker 名称(主从 broker 的 brokerName 一般是一样的),broker 配置文件可以指定 brokerClusterName 为集群名称, 同时这个集群里面可以包含多个 broker 主从集群,这些主从 broker 集群的 brokerName 是一样的, 依靠 brokerId 区分主节点还是从节点, 所以这个 table 的 value 是 Set 集合。

private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;// 1.首先是根据集群名称获取这个集群下面的所有 broker 的 broker 名称
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {// 如果为空,就添加一个集合进去brokerNames = new HashSet<String>();this.clusterAddrTable.put(clusterName, brokerNames);
}
// 把新增的 brokerName 添加到集合中
brokerNames.add(brokerName);

接下来维护 brokerAddrTable 集合,brokerAddrTable 存储了 brokerName -> broker 信息的集合(注意主从集群的 brokerName 是一样的,这样才能够存储一个集群下面的所有 broker 地址)。

// brokerName -> broker 信息的集合(注意主从集群的 brokerName 是一样的,这样才能够存储一个集群下面的所有 broker 地址)
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

下面就是处理这个集合的逻辑。

// 看下是不是第一次注册
boolean registerFirst = false;// 2.处理 brokerAddrTable 集合
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {registerFirst = true;brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());this.brokerAddrTable.put(brokerName, brokerData);
}
// 获取这个集群下面的 brokerName 下面的 brokerId 到 brokerAddr 的映射
// 因为设计到主从集群架构,所以每一个节点都需要存储所在的集群下面的所有 broker 节点的地址信息,方便主从同步
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
// Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
// The same IP:PORT must only have one record in brokerAddrTable
// 那么如何将从节点转换成主节点呢?上面注释给了一个例子,比如原来有两个地址 <0, IP1:PORT>,<1, IP2:PORT>,
// 假设 0 节点挂了,这种情况下就会上报上 <0, IP2:PORT>,那么这种情况下需要怎么做呢?
//      1.移除 <1, IP2:PORT>
//      2.添加 <0, IP2:PORT>
// 这样 brokerAddrsMap 就会由 <0, IP1:PORT>,<1, IP2:PORT> 变成 <0, IP2:PORT>
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {Entry<Long, String> item = it.next();// 如果说集合里面原来已经有相同地址的 broker 节点,但是 ID 不同,这种情况下就是从节点变成主节点了if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {it.remove();}
}// 将 brokerId -> brokerAddr 映射添加到 brokerAddrs 中
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// 是否是第一次注册
registerFirst = registerFirst || (null == oldAddr);

可以看到上面首先就是判断如果原来 brokerAddrTable 集合中找不到这个集群下面的配置信息,就说明是首次注册,把 registerFirst 设置为 true,然后新建一个 BrokerData 添加到 brokerAddrTable 集合中。

那如果不是第一次添加,就需要更新里面的主从关系了,但是在接着讲代码之前,我们还是先来看下 BrokerData 里面是存储什么的。在这里插入图片描述
其实就是存储了这个 brokerName 下面的主从 broker 和集群信息。那既然 broker 有主从关系,所以每一次上报 broker 信息的时候就需要判断下这个 broker 的身份是否切换了,比如从节点 broker 是否变成主节点了,因此接下来就是维护 BrokerData#brokerAddrs。

看上面的代码,注释也举了个例子来说明,比如原来有两个地址 <0, IP1:PORT>,<1, IP2:PORT>,假设 0 节点挂了,这种情况下就会上报上 <0, IP2:PORT>,那么这种情况下需要怎么做呢?

  • 1.移除 <1, IP2:PORT>
  • 2.添加 <0, IP2:PORT>

这样 brokerAddrsMap 就会由 <0, IP1:PORT>,<1, IP2:PORT> 变成 <0, IP2:PORT>,也就是原来的主节点从 IP1:PORT 变成了 IP2:PORT,所以你可以看到判断逻辑里面会判断如果说集合里面原来已经有相同地址的 broker 节点,但是 ID 不同,这种情况下就是从节点变成主节点了,这种情况下将原来的删掉,也就是删掉原来的 <1, IP2:PORT>。最后通过 brokerData.getBrokerAddrs().put(brokerId, brokerAddr) 统一进行添加。

好了继续往下看,第三部分就是维护 topicQueueTable 信息,还是老规律,先来看下这个集合存储的什么东西。

// topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是一个 List
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;public class QueueData implements Comparable<QueueData> {// broker 名字private String brokerName;// 读队列的数量,即该队列下有多少个读队列,这个值在创建队列时由用户指定,通常是4private int readQueueNums;// 写队列的数量,即该队列下有多少个写队列,这个值在创建队列时由用户指定,通常是4private int writeQueueNums;// 权限,{@link PermName} 里面就配置了相关的权限,比如 6 是读写权限,4 是只读,也就是只能消费,2 是只写,也就是只能发送消息// 1 表示继承 topic 的配置private int perm;// topic 的同步配置,具体的配置值可以到 TopicSysFlag 类中去看,默认是 0private int topicSysFlag;...}

这个集合存储的是 topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是一个 List,下面回到 registerBroker 中继续往下看,看看是怎么处理这个集合的。

// 3.创建或者更新 topicQueueTable 中的队列信息,如果是从节点旧没用这个,因为从节点是可以通过定时任务去同步主节点的 topic 配置的
if (null != topicConfigWrapper&& MixAll.MASTER_ID == brokerId) {// 判断下 broker 上报过来的 topic 配置和 NameServer 存储的对应的配置是否不同// 当然了如果是第一次注册,那肯定也要更新 NameServer 的 topicConfigTableif (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())|| registerFirst) {// 如果 broker 是第一次注册或者 topicConfig 发生了变化,就需要更新 topicQueueTable 中的 QueueData// 注意这里 topicConfigWrapper 是 broker 上报过来的,意思是根据 broker 的 TopicConfig 去更新 NameServer 的 topicQueueTableConcurrentMap<String, TopicConfig> tcTable =topicConfigWrapper.getTopicConfigTable();if (tcTable != null) {for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {// 创建或者更新 topicQueueTable 中的队列信息this.createAndUpdateQueueData(brokerName, entry.getValue());}}}
}

isBrokerTopicConfigChanged 这个方法上面几个小结也讲过了,就是判断 brokerLiveTable 这个集合里面的 broker 版本是否和上报过来的版本不同,如果不同才去更新,否则是不会更新的,更新的逻辑在 createAndUpdateQueueData 中,那前面我们也说过了,其实 brokerLiveTable 集合里面存储的版本就是 topic 的配置版本,更新的方法就等会到 4.2 小节再讲,现在先不管。

// 4.处理 broker 状态集合,直接用 put 方法覆盖原来的 BrokerLiveInfo,lastUpdateTimestamp 会被更新成最新的时间戳
//   所以到这里就可以总结一些心跳的更新逻辑了,第一次上报会更新 broker 的 BrokerLiveInfo 状态信息,而下一次 broker
//   上报之前会先判断下需不需要重新注册,这里所说的注册其实就是上报 broker 的信息到 NameServer,如果不需要也会更新 lastUpdateTimestamp
//   相当于 lastUpdateTimestamp 就是记录的 broker 上一次请求的时间
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,new BrokerLiveInfo(System.currentTimeMillis(),topicConfigWrapper.getDataVersion(),channel,haServerAddr));
if (null == prevBrokerLiveInfo) {log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}

可以看到,只要通过 registerBroker 集合添加的 BrokerLiveInfo 都是新建的,直接把原来的 brokerLiveTable 给覆盖掉。
在这里插入图片描述

// 5.更新过滤服务器地址集合
if (filterServerList != null) {if (filterServerList.isEmpty()) {// 如果上报过来的是空,就把原来的删掉this.filterServerTable.remove(brokerAddr);} else {// 否则就直接设置到 filterServerTable 里面,会覆盖原来的this.filterServerTable.put(brokerAddr, filterServerList);}
}

接着往看,更新过滤服务器地址集合,早期 broker 启动的时候会顺便启动过滤器服务,专门用于消息过滤,消费者可以通过 subscribe 方法去设定自己设定的消息过滤代码,过滤类路径,但是官方注释也说了这个方法在 5.0.0 之后就不用了,因为过滤服务器也被删掉了,推荐使用 TAG过滤或者 SQL92过滤。

/**
* This method will be removed in the version 5.0.0,because filterServer was removed,and method <code>subscribe(final String topic, final MessageSelector messageSelector)</code>
* is recommended.
*
* Subscribe some topic
*
* @param fullClassName full class name,must extend org.apache.rocketmq.common.filter. MessageFilter
* @param filterClassSource class source code,used UTF-8 file encoding,must be responsible for your code safety
*/
@Deprecated
void subscribe(final String topic, final String fullClassName,final String filterClassSource) throws MQClientException;

而在消费者注册过滤类的方法中给出的请求 code 是 REGISTER_MESSAGE_FILTER_CLASS,但是这个 code 在 broker 里面并没有找到处理的逻辑。
在这里插入图片描述
所以应该是不会用了。

好了,继续回到源码,下面会对从节点的返回结果设置 HaServerAddr、MasterAddr,因为从节点需要依靠这两个地址去跟主节点通信从而主从同步,当然 HaServerAddr 其实也是 broker 传过来的,这里大家可以去看我之前写过的【RocketMQ 高可用】系列的文章。

// 6.对从节点的返回结果设置 HaServerAddr、MasterAddr
if (MixAll.MASTER_ID != brokerId) {// 获取 brokerId=0 的 broker 地址String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);if (masterAddr != null) {BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);if (brokerLiveInfo != null) {// 这里的 HaServerAddr 实际上就是 Master 配置的 brokerIP2:10912result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());// 设置主节点地址result.setMasterAddr(masterAddr);}}
}

最后处理完这四个集合,解除写锁,返回结果。


4.2 createAndUpdateQueueData

这个方法用于根据 broker 上报过来的 topicConfig 更新 NameServer 的 topicQueueTable。

/*** 根据 broker 上报过来的 topicConfig 更新 NameServer 的 topicQueueTable* @param brokerName* @param topicConfig*/
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {// 创建队列信息 QueueDataQueueData queueData = new QueueData();queueData.setBrokerName(brokerName);queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());queueData.setReadQueueNums(topicConfig.getReadQueueNums());queueData.setPerm(topicConfig.getPerm());queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());// 根据 topic 找出队列信息,这些信息包括读写队列的数量、权限、队列所属的 broker...List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());if (null == queueDataList) {// 如果原来没有,那么就新增queueDataList = new LinkedList<QueueData>();queueDataList.add(queueData);this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);} else {// 如果原来有了,那么就删除旧的,新增新的boolean addNewOne = true;Iterator<QueueData> it = queueDataList.iterator();while (it.hasNext()) {// 遍历这个 topic 下面的所有队列信息QueueData qd = it.next();if (qd.getBrokerName().equals(brokerName)) {// 之所以要判断 brokerName,是因为一个 topic 的队列是可以存储到多个 broker 的,所以这里需要判断这个队列是不是// 当前上报信息的 broker 下面的if (qd.equals(queueData)) {// 如果原来的队列和新增的一样,那就没必要新增了addNewOne = false;} else {// 这里就是队列信息发生了修改,将原来的删掉log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,queueData);it.remove();}}}// 如果需要新增队列信息if (addNewOne) {// 添加到集合中queueDataList.add(queueData);}}
}

里面的逻辑不复杂,总之就是如果原本找不到就新增,如果已存在了就判断需不需要更新。


5. 小结

在这里插入图片描述
这篇文章我们就讲述了 broker 信息是如何注册到所有的 NameServer,同时 broker 注册的这些消息(集群、topic、过滤服务器)消息会存储到 NameServer 的哪些集合中,其实这里的注册也可以理解为上报心跳信息,因为会有一个定时任务定时触发去上报,那下一篇文章我们会讲一下那些长时间没有上报心跳的 broker 又是怎么处理的。





如有错误,欢迎指出!!!!

http://www.xdnf.cn/news/4240.html

相关文章:

  • gcc/g++用法摘记
  • torch.nn.Sequential() and torch.nn.ModuleList()
  • 用输入输出变量根据超稳定性理论设计模型参考自适应系统
  • 迭代器模式
  • map和set的设计以及红黑树的设计
  • 英伟达语音识别模型论文速读:Fast Conformer
  • 学习黑客Nmap 实战
  • Java学习手册:Spring 多数据源配置与管理
  • 信息系统项目管理工程师备考计算类真题讲解十二
  • 破局者手册 Ⅰ:测试开发核心基础,解锁未来测试密钥!
  • 【NLP】27. 语言模型训练以及模型选择:从预训练到下游任务
  • RAG知识库只是表面简单!
  • Kubernetes排错(七)-节点排错
  • 除了java.nio.file.StandardCopyOption,还有哪些类可以实现文件的复制和移动?
  • C++动态库和静态库的生成和使用
  • linux crash工具详解
  • android-ndk开发(1): 搭建环境
  • 星途-(4)
  • 关于Python:9. 深入理解Python运行机制
  • DeepSeek技术发展详细时间轴与技术核心解析
  • ARM子程序调用与返回
  • vscode运行python的快捷键
  • VirtualBox调整虚拟机内存和CPU
  • 信息系统项目管理师-软考高级(软考高项)​​​​​​​​​​​2025最新(八)
  • 智能体四项关键技术:MCP、A2A、ANP与函数调用的深度解析
  • 判断字符是否唯一 --- 位运算
  • 《冰雪三职业》:战士玩法攻略!
  • 精益数据分析(39/126):SaaS与移动应用商业模式的关键要点剖析
  • P6822 [PA 2012 Finals] Tax 题解
  • 【项目】基于ArkTS的网吧会员应用开发(2)