【RocketMQ Broker 相关源码】- 清除不活跃的 broker
文章目录
- 1. 前言
- 2. 扫描不活跃的 broker
- 3. closeChannel 关闭连接通道
- 4. onChannelDestroy 删除 broker 信息
- 5. 小结
本文章基于 RocketMQ 4.9.3
1. 前言
- 【RocketMQ】- 源码系列目录
- 【RocketMQ Broker 相关源码】-注册 broker 信息到所有的 NameServer
在上一篇文章中,我们详细阐述了 Broker 信息注册至 NameServer 的具体流程。在整个系统的运行过程中,Broker 与 NameServer 之间通过定时的心跳机制来维持连接状态,确保系统的稳定运行。
具体来说,Broker 会按照预设的时间间隔,每 30 秒向 NameServer 发送一次心跳信息,以此表明自身处于正常运行状态。NameServer 则会启动一个定时任务,每隔 10 秒对记录在 brokerLiveTable 中的所有 Broker 心跳信息进行扫描。通过检查每个 Broker 最近一次发送心跳的时间,NameServer 能够判断出哪些 Broker 已经处于不活跃状态。
一旦发现有不活跃的 Broker,NameServer 会及时关闭与这些 Broker 之间的连接通道。这样做的目的是为了释放系统资源,避免因无效连接占用过多的系统资源,从而保障整个系统的高效运行。
通过这种定时的心跳上报和扫描机制,Broker 和 NameServer 能够实时监控彼此的状态,确保系统的稳定性和可靠性。
2. 扫描不活跃的 broker
这个方法是 NamesrvController#initialize 方法启动的,按道理来说是放在 RocketMQ NameServer 系列才对,但是这里面涉及到 broker 的检测,所以还是放到了 broker 系列。
/*** 扫描不活跃的 broker*/
public void scanNotActiveBroker() {// 遍历 brokerLiveTable 集合Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();while (it.hasNext()) {Entry<String, BrokerLiveInfo> next = it.next();// 获取这个 broker 最后一次上报心跳信息的时间long last = next.getValue().getLastUpdateTimestamp();// 如果 2 小时内都没有上报过心跳信息了if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {// 关闭连接通道RemotingUtil.closeChannel(next.getValue().getChannel());// 删除这个 broker 的心跳信息it.remove();log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);// 删掉 broker 的信息this.onChannelDestroy(next.getKey(), next.getValue().getChannel());}}
}
scanNotActiveBroker 方法中会遍历 brokerLiveTable 集合,判断如果这个 broker 2 小时内都没有上报过心跳信息了,就认为这个 broker 连接过期或者 broker 出问题了,这种情况下关闭和这个 broker 的连接通道,同时删掉 broker 上报过的心跳信息。
3. closeChannel 关闭连接通道
public static void closeChannel(Channel channel) {final String addrRemote = RemotingHelper.parseChannelRemoteAddr(channel);channel.close().addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {log.info("closeChannel: close the connection to remote address[{}] result: {}", addrRemote,future.isSuccess());}});
}
关闭连接通道就是直接调用通道的 close 方法去关闭,这里面就是 Netty 的源码了。
4. onChannelDestroy 删除 broker 信息
这个方法就是对 brokerLiveTable、filterServerTable、brokerAddrTable、clusterAddrTable、topicQueueTable 进行维护,由于涉及到源码比较多,下面也是拆开来分析。
- brokerLiveTable: broker 地址 -> broker 活跃信息,里面维持了已建立连接的 broker 的连接通道。
- filterServerTable: broker 地址 -> 这个 broker 的过滤服务器地址,过滤服务器在当前这个版本已经看不到了,所以可以忽略。
- brokerAddrTable: brokerName -> broker 信息的集合(注意主从集群的 brokerName 是一样的,这样才能够存储一个集群下面的所有 broker 地址)。
- topicQueueTable: topic -> 队列信息,因为一个 topic 可以存储到多个 broker 下面,所以是一个 List。
- clusterAddrTable: 集群名称 -> 集群下面的所有 broker 主从集群的 broker 名称(主从 broker 的 brokerName 一般是一样的)
/*** 要销毁的连接的地址* @param remoteAddr* @param channel*/
public void onChannelDestroy(String remoteAddr, Channel channel) {...
}
首先这个方法中传入了要销毁的 broker 的通道和远程地址,所以一开始的逻辑就是需要在 brokerLiveTable 中找到这个通道对应的 brokerName,这样能判断 broker 有没有上报过心跳给 NameServer。
if (channel != null) {try {try {// 加读锁this.lock.readLock().lockInterruptibly();Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =this.brokerLiveTable.entrySet().iterator();while (itBrokerLiveTable.hasNext()) {Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();if (entry.getValue().getChannel() == channel) {// 是否找到对应的 broker 连接通道brokerAddrFound = entry.getKey();break;}}} finally {// 解除读锁this.lock.readLock().unlock();}} catch (Exception e) {log.error("onChannelDestroy Exception", e);}
}
查找的方法很简单,就是遍历这个集合里面的所有 value,然后一个一个比较,注意这里就是 == 去比较的,但是我说实话上层方法 scanNotActiveBroker
都已经扫描出 brokerLiveTable 集合里面的这个 channel 过期了才调用这个方法去删除 broker 信息,为什么不直接把 key 和 value 都传到 onChannelDestroy 去删除呢?
不过我在写的时候看了下这个方法的调用,结果发现这个方法之前好像还讲过。【RocketMQ NameServer】- NettyEventExecutor 处理 Netty 事件,就是 Netty 服务端通过 NettyEventExecutor 去监听 NettyEvent 事件,然后调用 BrokerHousekeepingServer 去处理空闲的连接,但是这篇文章只是把源码贴出来,所以这里还是再详细重新介绍一遍。
好了,回到源码,继续看,如果没找到就将 brokerAddrFound 赋值为 remoteAddr。
// 如果没找到就赋值为 remoteAddr
if (null == brokerAddrFound) {brokerAddrFound = remoteAddr;
} else {log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
}
接着继续判断,如果找到了地址,也就是上面的 brokerAddrFound 不为空,说明需要将 broker 信息从这几个集合中删掉,这种情况下先维护 brokerLiveTable
和 filterServerTable
,维护逻辑很简单,直接删掉就行。
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {try {try {// 加写锁this.lock.writeLock().lockInterruptibly();// 从 brokerLiveTable 集合删掉这个 brokerthis.brokerLiveTable.remove(brokerAddrFound);this.filterServerTable.remove(brokerAddrFound);}...}
}
然后就是维护 brokerAddrTable,那我们都知道 broker 集群下面会有多个 broker 节点,那这里要销毁的可能只是一个从节点或者主节点,所以我们需要先找到要删除的这个 broker 的 brokerName,因为这个 onChannelDestroy 传入的只是地址,并没有传入 brokerName,包括 brokerLiveTable 集合都不存储 brokerName 的。
String brokerNameFound = null;
boolean removeBrokerName = false;
// 然后再遍历 brokerAddrTable, 也要维护这个 broker 集群集合的地址消息
Iterator<Entry<String, BrokerData>> itBrokerAddrTable =this.brokerAddrTable.entrySet().iterator();
/*** 1. 维护 brokerAddrTable*/
while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {// 获取这个集群下面的 broker 信息BrokerData brokerData = itBrokerAddrTable.next().getValue();// 遍历这个集群下面的主从节点Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();while (it.hasNext()) {Entry<Long, String> entry = it.next();Long brokerId = entry.getKey();String brokerAddr = entry.getValue();// 如果找到了要删除的if (brokerAddr.equals(brokerAddrFound)) {// 记录下要删除的 brokerNamebrokerNameFound = brokerData.getBrokerName();// 删掉it.remove();log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",brokerId, brokerAddr);break;}}if (brokerData.getBrokerAddrs().isEmpty()) {// 如果说删除了这个 broker 信息之后这个集群下面的 broker 地址为空了removeBrokerName = true;// 那么就将这个 brokerName -> brokerData 的映射也删掉, 表明此时这个 broker 集群已经没有节点了itBrokerAddrTable.remove();log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",brokerData.getBrokerName());}
}
可以看到上面的 brokerNameFound 就是这个地址对应的 brokerName,找到之后就将 brokerAddr 从 brokerAddrs 集合中删掉,可以看下面图。
删掉之后,如果说 brokerAddrs 这个集合为空了,那么说明这个 brokerName 集群下面的主从节点都挂了,这种情况下就需要将这个 brokerName 从 brokerAddrTable 集合中删掉,到这里就维护好了 brokerAddrTable。
接着往下看,维护好了 brokerAddrTable 后下一个要维护的就是 clusterAddrTable,之所以先维护 brokerAddrTable 是因为需要判断 brokerName 有没有删掉,clusterAddrTable 存储的是 clusterName 和 brokerName 的关系,所以要放到后面去维护,维护的逻辑也很简单,就是将 brokerName 从集合中删掉,删掉之后看下这个 cluster 集群里面还有没有其他 broker 集群,如果没有了,那也将这个 cluster 集群删掉。
/*** 2. 维护 clusterAddrTable*/
if (brokerNameFound != null && removeBrokerName) {// 然后如果说删除了一个 brokerName, 也要维护 clusterAddrTable 集合, 这个集合记录了 clusterName -> Set(brokerName) 的集合,// 这里要说下 clusterAddrTable 这个集合, broker 配置文件可以指定 brokerClusterName 为集群名称, 同时这个集群里面可以包含多个 broker// 主从集群, 这些主从 broker 集群的 brokerName 是一样的, 依靠 brokerId 区分主节点还是从节点, 所以这个 table 的 value 是 Set 集合Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();while (it.hasNext()) {Entry<String, Set<String>> entry = it.next();// 集群名称String clusterName = entry.getKey();// 集群下面的 broker 主从集群的 brokerName 集合Set<String> brokerNames = entry.getValue();// 从集合中删掉boolean removed = brokerNames.remove(brokerNameFound);if (removed) {// 如果删掉了就打印日志log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",brokerNameFound, clusterName);// 如果删掉这个 brokerName 之后, 集群下面没有 broker 集群了, 就删掉这个 clusterif (brokerNames.isEmpty()) {log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",clusterName);it.remove();}break;}}
}
最后再维护 topicQueueTable,一个 topic 可以分布在多个 broker 集群上,所以如果删除了 brokerName 这个集群,那么就需要将 topic 在这个 broker 上面的配置信息删掉,注意 topicQueueTable 的 value 是一个 List< QueueData> 集合,所以需要遍历这个集合的 value,找到对应的 brokerName,然后删掉,逻辑也不复杂。
/*** 3. 维护 topicQueueTable*/
if (removeBrokerName) {// 遍历 topicQueueTable 下面的 topic 信息Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =this.topicQueueTable.entrySet().iterator();while (itTopicQueueTable.hasNext()) {Entry<String, List<QueueData>> entry = itTopicQueueTable.next();// 遍历所有 topicString topic = entry.getKey();// 遍历 topic 的队列配置信息List<QueueData> queueDataList = entry.getValue();Iterator<QueueData> itQueueData = queueDataList.iterator();while (itQueueData.hasNext()) {// 因为 topic 下面的队列可以分配到不同 broker 上面, 所以需要把分配到这个 brokerName 集群上面的队列也// 移除掉QueueData queueData = itQueueData.next();if (queueData.getBrokerName().equals(brokerNameFound)) {// 从集合中移除itQueueData.remove();log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",topic, queueData);}}if (queueDataList.isEmpty()) {// 如果移除之后已经没有队列了, 那么这个 topic 队列配置信息也要删掉了itTopicQueueTable.remove();log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",topic);}}
}
5. 小结
好了,这篇文章介绍了 NameSever 是如何扫描出长期没有上报心跳的 broker 同时进行清除的,下一篇文章我们就来看下 broker 又是怎么清除过期的 producer 和 consumer。
如有错误,欢迎指出!!!!