当前位置: 首页 > news >正文

【RocketMQ Broker 相关源码】- 清除不活跃的 broker

文章目录

  • 1. 前言
  • 2. 扫描不活跃的 broker
  • 3. closeChannel 关闭连接通道
  • 4. onChannelDestroy 删除 broker 信息
  • 5. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer

在上一篇文章中,我们详细阐述了 Broker 信息注册至 NameServer 的具体流程。在整个系统的运行过程中,Broker 与 NameServer 之间通过定时的心跳机制来维持连接状态,确保系统的稳定运行。

具体来说,Broker 会按照预设的时间间隔,每 30 秒向 NameServer 发送一次心跳信息,以此表明自身处于正常运行状态。NameServer 则会启动一个定时任务,每隔 10 秒对记录在 brokerLiveTable 中的所有 Broker 心跳信息进行扫描。通过检查每个 Broker 最近一次发送心跳的时间,NameServer 能够判断出哪些 Broker 已经处于不活跃状态。

一旦发现有不活跃的 Broker,NameServer 会及时关闭与这些 Broker 之间的连接通道。这样做的目的是为了释放系统资源,避免因无效连接占用过多的系统资源,从而保障整个系统的高效运行。

通过这种定时的心跳上报和扫描机制,Broker 和 NameServer 能够实时监控彼此的状态,确保系统的稳定性和可靠性。
在这里插入图片描述


2. 扫描不活跃的 broker

在这里插入图片描述
这个方法是 NamesrvController#initialize 方法启动的,按道理来说是放在 RocketMQ NameServer 系列才对,但是这里面涉及到 broker 的检测,所以还是放到了 broker 系列。

/*** 扫描不活跃的 broker*/
public void scanNotActiveBroker() {// 遍历 brokerLiveTable 集合Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();// 获取这个 broker 最后一次上报心跳信息的时间long last = next.getValue().getLastUpdateTimestamp();// 如果 2 小时内都没有上报过心跳信息了if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {// 关闭连接通道RemotingUtil.closeChannel(next.getValue().getChannel());// 删除这个 broker 的心跳信息it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);// 删掉 broker 的信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}
}

scanNotActiveBroker 方法中会遍历 brokerLiveTable 集合,判断如果这个 broker 2 小时内都没有上报过心跳信息了,就认为这个 broker 连接过期或者 broker 出问题了,这种情况下关闭和这个 broker 的连接通道,同时删掉 broker 上报过的心跳信息。


3. closeChannel 关闭连接通道

public static void closeChannel(Channel channel) {final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);channel.close().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,future.isSuccess());}});
}

关闭连接通道就是直接调用通道的 close 方法去关闭,这里面就是 Netty 的源码了。


4. onChannelDestroy 删除 broker 信息

这个方法就是对 brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable、topicQueueTable 进行维护,由于涉及到源码比较多,下面也是拆开来分析。

  • brokerLiveTable: broker 地址 -> broker 活跃信息,里面维持了已建立连接的 broker 的连接通道。
  • filterServerTable: broker 地址 -> 这个 broker 的过滤服务器地址,过滤服务器在当前这个版本已经看不到了,所以可以忽略。
  • brokerAddrTable: brokerName -> broker 信息的集合(注意主从集群的 brokerName 是一样的,这样才能够存储一个集群下面的所有 broker 地址)。
  • topicQueueTable: topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是一个 List。
  • clusterAddrTable: 集群名称 -> 集群下面的所有 broker 主从集群的 broker 名称(主从 broker 的 brokerName 一般是一样的)
/*** 要销毁的连接的地址* @param remoteAddr* @param channel*/
public void onChannelDestroy(String remoteAddr, Channel channel) {...
}

首先这个方法中传入了要销毁的 broker 的通道和远程地址,所以一开始的逻辑就是需要在 brokerLiveTable 中找到这个通道对应的 brokerName,这样能判断 broker 有没有上报过心跳给 NameServer。

if (channel != null) {try {try {// 加读锁this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {// 是否找到对应的 broker 连接通道brokerAddrFound = entry.getKey();break;}}} finally {// 解除读锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}
}

查找的方法很简单,就是遍历这个集合里面的所有 value,然后一个一个比较,注意这里就是 == 去比较的,但是我说实话上层方法 scanNotActiveBroker 都已经扫描出 brokerLiveTable 集合里面的这个 channel 过期了才调用这个方法去删除 broker 信息,为什么不直接把 key 和 value 都传到 onChannelDestroy 去删除呢?

不过我在写的时候看了下这个方法的调用,结果发现这个方法之前好像还讲过。【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件,就是 Netty 服务端通过 NettyEventExecutor 去监听 NettyEvent 事件,然后调用 BrokerHousekeepingServer 去处理空闲的连接,但是这篇文章只是把源码贴出来,所以这里还是再详细重新介绍一遍。

在这里插入图片描述
好了,回到源码,继续看,如果没找到就将 brokerAddrFound 赋值为 remoteAddr。

// 如果没找到就赋值为 remoteAddr
if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;
} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}

接着继续判断,如果找到了地址,也就是上面的 brokerAddrFound 不为空,说明需要将 broker 信息从这几个集合中删掉,这种情况下先维护 brokerLiveTablefilterServerTable,维护逻辑很简单,直接删掉就行。

if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {// 加写锁this.lock.writeLock().lockInterruptibly();// 从 brokerLiveTable 集合删掉这个 brokerthis.brokerLiveTable.remove(brokerAddrFound);this.filterServerTable.remove(brokerAddrFound);}...}
}

然后就是维护 brokerAddrTable,那我们都知道 broker 集群下面会有多个 broker 节点,那这里要销毁的可能只是一个从节点或者主节点,所以我们需要先找到要删除的这个 broker 的 brokerName,因为这个 onChannelDestroy 传入的只是地址,并没有传入 brokerName,包括 brokerLiveTable 集合都不存储 brokerName 的。

String brokerNameFound = null;
boolean removeBrokerName = false;
// 然后再遍历 brokerAddrTable, 也要维护这个 broker 集群集合的地址消息
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
/*** 1. 维护 brokerAddrTable*/
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {// 获取这个集群下面的 broker 信息BrokerData brokerData = itBrokerAddrTable.next().getValue();// 遍历这个集群下面的主从节点Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();// 如果找到了要删除的if (brokerAddr.equals(brokerAddrFound)) {// 记录下要删除的 brokerNamebrokerNameFound = brokerData.getBrokerName();// 删掉it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}if (brokerData.getBrokerAddrs().isEmpty()) {// 如果说删除了这个 broker 信息之后这个集群下面的 broker 地址为空了removeBrokerName = true;// 那么就将这个 brokerName -> brokerData 的映射也删掉, 表明此时这个 broker 集群已经没有节点了itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}
}

可以看到上面的 brokerNameFound 就是这个地址对应的 brokerName,找到之后就将 brokerAddr 从 brokerAddrs 集合中删掉,可以看下面图。
在这里插入图片描述
删掉之后,如果说 brokerAddrs 这个集合为空了,那么说明这个 brokerName 集群下面的主从节点都挂了,这种情况下就需要将这个 brokerName 从 brokerAddrTable 集合中删掉,到这里就维护好了 brokerAddrTable。

接着往下看,维护好了 brokerAddrTable 后下一个要维护的就是 clusterAddrTable,之所以先维护 brokerAddrTable 是因为需要判断 brokerName 有没有删掉,clusterAddrTable 存储的是 clusterName 和 brokerName 的关系,所以要放到后面去维护,维护的逻辑也很简单,就是将 brokerName 从集合中删掉,删掉之后看下这个 cluster 集群里面还有没有其他 broker 集群,如果没有了,那也将这个 cluster 集群删掉。

/*** 2. 维护 clusterAddrTable*/
if (brokerNameFound != null && removeBrokerName) {// 然后如果说删除了一个 brokerName, 也要维护 clusterAddrTable 集合, 这个集合记录了 clusterName -> Set(brokerName) 的集合,// 这里要说下 clusterAddrTable 这个集合, broker 配置文件可以指定 brokerClusterName 为集群名称, 同时这个集群里面可以包含多个 broker// 主从集群, 这些主从 broker 集群的 brokerName 是一样的, 依靠 brokerId 区分主节点还是从节点, 所以这个 table 的 value 是 Set 集合Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();// 集群名称String clusterName = entry.getKey();// 集群下面的 broker 主从集群的 brokerName 集合Set<String> brokerNames = entry.getValue();// 从集合中删掉boolean removed = brokerNames.remove(brokerNameFound);if (removed) {// 如果删掉了就打印日志log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);// 如果删掉这个 brokerName 之后, 集群下面没有 broker 集群了, 就删掉这个 clusterif (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}
}

最后再维护 topicQueueTable,一个 topic 可以分布在多个 broker 集群上,所以如果删除了 brokerName 这个集群,那么就需要将 topic 在这个 broker 上面的配置信息删掉,注意 topicQueueTable 的 value 是一个 List< QueueData> 集合,所以需要遍历这个集合的 value,找到对应的 brokerName,然后删掉,逻辑也不复杂。

/*** 3. 维护 topicQueueTable*/
if (removeBrokerName) {// 遍历 topicQueueTable 下面的 topic 信息Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =this.topicQueueTable.entrySet().iterator();while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();// 遍历所有 topicString topic = entry.getKey();// 遍历 topic 的队列配置信息List<QueueData> queueDataList = entry.getValue();Iterator<QueueData> itQueueData = queueDataList.iterator();while (itQueueData.hasNext()) {// 因为 topic 下面的队列可以分配到不同 broker 上面, 所以需要把分配到这个 brokerName 集群上面的队列也// 移除掉QueueData queueData = itQueueData.next();if (queueData.getBrokerName().equals(brokerNameFound)) {// 从集合中移除itQueueData.remove();log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, queueData);}}if (queueDataList.isEmpty()) {// 如果移除之后已经没有队列了, 那么这个 topic 队列配置信息也要删掉了itTopicQueueTable.remove();log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);}}
}

5. 小结

好了,这篇文章介绍了 NameSever 是如何扫描出长期没有上报心跳的 broker 同时进行清除的,下一篇文章我们就来看下 broker 又是怎么清除过期的 producer 和 consumer。





如有错误,欢迎指出!!!!

http://www.xdnf.cn/news/504595.html

相关文章:

  • JavaScript【6】事件
  • windows 11安装Python3.9、mujoco200、mujoco_py2.0.2.8、metaworld
  • 51单片机仿真突然出问题
  • 如何在 Windows 11 或 10 的 CMD 中检查固件
  • 元件伏安特性及基尔霍夫定理的相量形式
  • 【as 在长难句中有哪几种翻译?】
  • 北京市工程技术人才职称评价基本标准条件解读
  • PLC和变频器之间如何接线
  • 2020CCPC河南省赛题解
  • V型不锈钢对夹球阀:高性价比工业控制解决方案-耀圣
  • 项目复习(2)
  • 黑客帝国电子表html
  • java中的包机制
  • 信任的进阶:LEI与vLEI协同推进跨境支付体系变革
  • (面试)View相关知识
  • 【强化学习】#5 时序差分学习
  • BBR 的 buffer 动力学观感
  • C++(19):内联(inline)函数
  • Python跳动的双爱心
  • JAVA GUI
  • 【深度学习-Day 12】从零认识神经网络:感知器原理、实现与局限性深度剖析
  • 令牌桶和漏桶算法使用场景解析
  • HDCleaner:深度清理与优化,提升系统性能
  • 六、磁盘划分与磁盘配额
  • Redis 发布订阅模式深度解析:原理、应用与实践
  • AI:人形机器人一定是人的形状吗?
  • 超长文本能取代RAG吗
  • 计算机视觉与深度学习 | Python实现EMD-SSA-VMD-LSTM时间序列预测(完整源码和数据)
  • 深入探讨 Java Switch Expressions
  • 期望是什么:(无数次的均值,结合概率)21/6=3.5