当前位置: 首页 > web >正文

【RocketMQ Broker 相关源码】- 清除不活跃的生产者、消费者、过滤服务器连接

文章目录

  • 1. 前言
  • 2. 客户端心跳检测服务
  • 3. scanExceptionChannel 扫描不活跃的连接
    • 3.1 ProducerManager#scanNotActiveChannel
    • 3.2 ConsumerManager#scanNotActiveChannel
    • 3.3 FilterServerManager#scanNotActiveChannel
  • 4. 小结


本文章基于 RocketMQ 4.9.3

1. 前言

  • 【RocketMQ】- 源码系列目录
  • 【RocketMQ Broker 相关源码】- 清除不活跃的 broker

上一篇文章我们探究了 NameServer 是如何清除不活跃的 broker,而这篇文章我们也来看下 broker 是如何清除不活跃的生产者、消费者、过滤服务器连接。


2. 客户端心跳检测服务

在 broker#start 方法启动 broker 的时候会启动客户端心跳检测定时任务,这个任务会在启动 10s 之后执行,每隔 10s 执行一次。在这里插入图片描述
在 start 方法中会调用 scanExceptionChannel 去扫描过期的连接,这个连接里面包括生产者连接、消费者连接和过滤服务器连接,之前 broker 启动的文章我们也分析过了,这个版本下过滤服务器应该是不再用了,而且在 5.0.0 之后这个过滤服务也会从 RocketMQ 中删掉,推荐用 TAG 过滤和 SQL92 过滤。

/*** 启动 10s 之后执行,每隔 10s 执行一次*/
public void start() {this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// 扫描过期的连接ClientHousekeepingService.this.scanExceptionChannel();} catch (Throwable e) {log.error("Error occurred when scan not active client channels.", e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
}/*** 扫描出不活跃的连接*/
private void scanExceptionChannel() {// 生产者连接this.brokerController.getProducerManager().scanNotActiveChannel();// 消费者连接this.brokerController.getConsumerManager().scanNotActiveChannel();// 过滤服务器连接this.brokerController.getFilterServerManager().scanNotActiveChannel();
}

3. scanExceptionChannel 扫描不活跃的连接

3.1 ProducerManager#scanNotActiveChannel

这个方法就是扫描生产者的非活跃连接,生产者和消费者启动的时候会通过定时任务每隔 30s 上报一次心跳信息。而由于生产者和消费者都有组的概念,所以 broker 存储连接通道会使用 Map 存储,key 就是生产者组,value 就是这个消费者组下面的所有生产者。

scanNotActiveChannel 的逻辑就是扫描所有的生产者组下面的生产者,然后判断如果当前距离上一次上报心跳信息已经超过了 120s,说明是一个不活跃的连接,这种情况下会关闭这个连接的通道,然后将这个连接从 groupChannelTable 集合中删掉。 lastUpdateTimestamp 记录的就是上一次上报心跳的事件。

/*** 扫描不活跃的生产者连接*/
public void scanNotActiveChannel() {for (final Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> entry : this.groupChannelTable.entrySet()) {// 生产者组final String group = entry.getKey();// 生产者组下面的生产者连接final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = entry.getValue();// 遍历所有连接Iterator<Entry<Channel, ClientChannelInfo>> it = chlMap.entrySet().iterator();while (it.hasNext()) {Entry<Channel, ClientChannelInfo> item = it.next();// final Integer id = item.getKey();final ClientChannelInfo info = item.getValue();// 如果当前距离上一次上报心跳事件已经超过了 120s, 说明是一个不活跃的连接, 生产者和消费者会每隔 30s 上报一次心跳信息long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();if (diff > CHANNEL_EXPIRED_TIMEOUT) {it.remove();clientChannelTable.remove(info.getClientId());log.warn("SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);// 关闭连接RemotingUtil.closeChannel(info.getChannel());}}}
}

3.2 ConsumerManager#scanNotActiveChannel

/*** 扫描出非活跃的消费者连接*/
public void scanNotActiveChannel() {// 遍历所有消费者组Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConsumerGroupInfo> next = it.next();// 消费者组String group = next.getKey();// 消费者组下面的消费者连接集合ConsumerGroupInfo consumerGroupInfo = next.getValue();ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable =consumerGroupInfo.getChannelInfoTable();Iterator<Entry<Channel, ClientChannelInfo>> itChannel = channelInfoTable.entrySet().iterator();// 遍历所有消费者连接while (itChannel.hasNext()) {// 获取连接的信息Entry<Channel, ClientChannelInfo> nextChannel = itChannel.next();ClientChannelInfo clientChannelInfo = nextChannel.getValue();// 如果距离上一次上报心跳已经超过 120s, 说明连接不活跃long diff = System.currentTimeMillis() - clientChannelInfo.getLastUpdateTimestamp();if (diff > CHANNEL_EXPIRED_TIMEOUT) {log.warn("SCAN: remove expired channel from ConsumerManager consumerTable. channel={}, consumerGroup={}",RemotingHelper.parseChannelRemoteAddr(clientChannelInfo.getChannel()), group);// 关闭连接, 将这个连接从消费者组连接集合中删掉RemotingUtil.closeChannel(clientChannelInfo.getChannel());itChannel.remove();}}// 如果这个消费者组连接都删掉了, 就把这个消费者组也删掉if (channelInfoTable.isEmpty()) {log.warn("SCAN: remove expired channel from ConsumerManager consumerTable, all clear, consumerGroup={}",group);it.remove();}}
}

这个方法跟上面生产者的差不多的,都是扫描消费者连接,如果这个消费者距离上一次上报心跳已经超过 120s,说明连接不活跃,将这个连接删掉,消费者也是一样 30s 上报一次心跳。


3.3 FilterServerManager#scanNotActiveChannel

/*** 扫描不活跃的过滤服务连接*/
public void scanNotActiveChannel() {// 遍历所有过滤服务连接Iterator<Entry<Channel, FilterServerInfo>> it = this.filterServerTable.entrySet().iterator();while (it.hasNext()) {Entry<Channel, FilterServerInfo> next = it.next();// 上一次上报的心跳事件long timestamp = next.getValue().getLastUpdateTimestamp();Channel channel = next.getKey();// 如果超过 30s 没有上报信息if ((System.currentTimeMillis() - timestamp) > FILTER_SERVER_MAX_IDLE_TIME_MILLS) {// 删除并关闭连接log.info("The Filter Server<{}> expired, remove it", next.getKey());it.remove();RemotingUtil.closeChannel(channel);}}
}

这里逻辑跟上面的差不多,但是因为过滤服务器这个版本不再用了,所以直接看注释就行。


4. 小结

好了,这篇文章我们就探究了 broker 是如何扫描出不活跃的的生产者、消费者和过滤服务器,生产者和消费者在启动的时候都会每隔 30s 上报一次心跳到 broker,所以 broker 这里的判断是如果生产者和消费者 120s 内没有再次上报心跳,就认为这个连接是不活跃的,过期的,可能是生产者或者消费者遇到了什么问题,这种情况下 broker 就会断开这个连接,等待后续服务恢复之后再次和 broker 建连。





如有错误,欢迎指出!!!!

http://www.xdnf.cn/news/7051.html

相关文章:

  • 重排序模型解读 mxbai-rerank-base-v2 强大的重排序模型
  • Vue+Vite学习笔记
  • DeepSeek 大模型部署全指南:常见问题、优化策略与实战解决方案
  • 布隆过滤器介绍及其在大数据场景的应用
  • java.lang.ArithmeticException
  • Vivado2024.2+Modelsim仿真环境搭建大全(保姆式说明)
  • Three.js 中调试 Raycaster 的方法
  • MySQL8新特性底层原理
  • 淘宝商品主图标题api接口(附API接口文档)
  • Linux - 2.系统命令
  • MySQL 高级查询:JOIN、子查询、窗口函数
  • 搭建一个WordPress网站需要多少成本
  • QML元素 - ZoomBlur
  • 内核链表常用接口的一些理解
  • 2025/517学习
  • No More Adam: 新型优化器SGD_SaI
  • MySQL Workbench 工具导出与导入数据库:实用指南
  • 文件共享ftb
  • 多平台屏幕江湖生存指南
  • MongoDB聚合查询:从入门到精通
  • 现代健康生活养生指南
  • nodejs 文件的复制
  • 【人工智能】微调的艺术:将大模型塑造成你的专属智能助手
  • 大模型技术演进与应用场景深度解析
  • Type-C连接器:数字时代接口革命的终极答案
  • C语言中字符串函数的详细讲解
  • 2025年数字孪生技术最新应用案例:跨领域实践与技术趋势
  • OpenAI新发布Codex的全面解析
  • C语言输入函数对比解析
  • GPIO点亮LED