当前位置: 首页 > news >正文

分布式MQTT客户端看门狗机制设计与实现

2. 看门狗调度机制

  • 背景与问题

    面临的挑战

    在传统的微服务集群部署中,每个服务实例都可能需要连接MQTT服务器处理设备消息。这会带来几个问题:

    • 消息重复处理:多个节点同时订阅同一个Topic,导致同一条消息被处理多次
    • 资源浪费:每个节点都维护MQTT连接,占用不必要的网络和内存资源
    • 状态不一致:多个节点并发处理设备指令,可能导致设备状态混乱

    业务需求

    对于设备管理服务,我们需要确保:

    • 每条MQTT消息只被处理一次
    • 服务具备高可用性,单节点故障不影响消息处理
    • 系统能够自动进行故障恢复

    解决方案设计

    核心思想

    通过分布式锁 + 看门狗的机制,确保在任意时刻只有一个节点负责MQTT连接和消息处理,同时保证服务的高可用性。

  • @Component
    public class MqttClientStart implements ApplicationRunner, DisposableBean {private static final String MQTT_LOCK_KEY =        "Service:Mqtt:Consumers:Client:Watchdog:Lock";private static final Long LOCK_TIMEOUT = 120L;  // 锁超时时间private static final int LOCK_RENEW_INTERVAL = 100;  // 续期间隔private final String nodeId = RequestUtils.getHostname();  // 节点唯一标识private final AtomicBoolean watchdogRunning = new AtomicBoolean(false);private final AtomicBoolean mqttInitialized = new AtomicBoolean(false);
    }

    设计要点

  • 使用主机名作为节点唯一标识
  • 锁超时时间120秒,续期间隔100秒,避免网络抖动导致的锁丢失
  • 通过AtomicBoolean确保状态的线程安全
    private void startWatchdog() {watchdogExecutor = Executors.newSingleThreadScheduledExecutor(r -> {Thread thread = new Thread(r, "mqtt-watchdog-" + nodeId);thread.setDaemon(true);// 关键:设置未捕获异常处理器thread.setUncaughtExceptionHandler((t, ex) -> {log.error("Uncaught exception in watchdog thread {}: {}", t.getName(), ex.getMessage(), ex);handleWatchdogFailure(ex);});return thread;});watchdogExecutor.scheduleAtFixedRate(this::watchdogTask, 1, LOCK_RENEW_INTERVAL, TimeUnit.SECONDS);
    }

设计亮点

  • 单线程调度器避免并发问题
  • 守护线程确保不阻塞应用关闭
  • 完善的异常处理机制

3. 核心业务逻辑

 

private void watchdogTask() {try {boolean hasLock = myRedisLock.tryReentrantLock(MQTT_LOCK_KEY, nodeId, LOCK_TIMEOUT);if (hasLock) {// 获得锁且未初始化 -> 初始化MQTT客户端if (!mqttInitialized.get()) {log.info("Node {} acquired lock. Initializing MQTT client...", nodeId);initializeMqttClient();}} else {// 失去锁且已初始化 -> 关闭MQTT客户端if (mqttInitialized.get()) {log.info("Node {} lost lock. Shutting down MQTT client...", nodeId);shutdownMqttClient();}}} catch (Exception e) {log.error("Error in MQTT watchdog task:", e);if (mqttInitialized.get()) {shutdownMqttClient();  // 异常时确保资源清理}}
}

核心逻辑

  • 持有锁 + 未初始化 → 启动MQTT客户端
  • 失去锁 + 已初始化 → 关闭MQTT客户端
  • 异常情况下确保资源清理

4. 故障恢复机制

private void handleWatchdogFailure(Throwable ex) {watchdogRunning.set(false);// 异步延迟重启CompletableFuture.runAsync(() -> {try {Thread.sleep(5000);  // 延迟5秒重启if (watchdogExecutor != null) {watchdogExecutor.shutdown();}startWatchdog();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});
}

容错设计

  • 异常发生时自动重启看门狗
  • 延迟重启避免频繁失败
  • 异步处理不阻塞当前线程

运行流程

正常运行流程

  1. 应用启动:各节点启动看门狗线程
  2. 锁竞争:各节点尝试获取Redis分布式锁
  3. 角色确定:获得锁的节点成为Active,其他为Standby
  4. MQTT管理:Active节点初始化MQTT客户端,开始处理消息
  5. 锁续期:Active节点定期续期锁,Standby节点继续尝试获取锁

故障切换流程

  1. 故障检测:Active节点故障,停止锁续期
  2. 锁释放:Redis锁超时自动释放(120秒后)
  3. 角色切换:Standby节点获得锁,升级为Active
  4. 服务恢复:新Active节点初始化MQTT客户端,恢复消息处理

优势与权衡

主要优势

高可用性

  • 单节点故障时自动切换,服务不中断
  • 故障恢复时间可控(最多120秒)

数据一致性

  • 确保消息唯一性处理
  • 避免重复操作和状态冲突

运维友好

  • 自动故障检测和恢复
  • 完善的日志记录便于问题排查

设计权衡

性能方面

  • 牺牲了并发处理能力
  • MQTT处理能力无法水平扩展

资源利用

  • 其他节点的MQTT处理资源闲置
  • 可能造成负载不均

    适用场景

    这种设计适合以下场景:

    • 对消息处理一致性要求较高
    • MQTT消息量不大,单节点可以处理
    • 更重视可用性而非性能
http://www.xdnf.cn/news/998785.html

相关文章:

  • ShardingSphere解析:分布式数据库中间件的分片设计与事务管理实践
  • react实现axios 的简单封装
  • 单链表经典算法
  • 【鸿蒙开发】组件动态创建
  • Linux检验库是否安装成功
  • 多线程(4)
  • 【大模型】实践之1:macOS一键部署本地大模型
  • std::make_shared简化智能指针 `std::shared_ptr` 的创建过程,并提高性能(减少内存分配次数,提高缓存命中率)
  • Tomcat 和 Spring MVC
  • SQL进阶之旅 Day 29:NoSQL结合使用策略
  • docker-自动启动java 包
  • 使用VSCode开发FastAPI指南
  • Python 实现 Web 请求与响应
  • VSCode - Trae 插件关闭弹出框代码补全
  • 【C++学习笔记】 std::atomic 拷贝构造错误解析
  • docker-compose容器单机编排
  • el-select+el-tree实现树形下拉选择
  • tabs页签嵌套表格,切换表格保存数据不变并回勾
  • CSS 外边距合并(Margin Collapsing)问题研究
  • Karate 与Playwright的比较和融合
  • spring boot项目整合mybatis实现多数据源的配置
  • RAG Food Project
  • GAN+ECA注意力机制实现图像超分辨率重建
  • ESP32-C3FH4X—低功耗、高集成度的 MCU 系统级芯片 (SoC)
  • 基于数据库实现配置管理和定时任务启停
  • 强化学习:策略梯度概念
  • word用endnote插入国标参考文献
  • 在 Flutter 项目中iOS 的 App 图标和 App 名称 的设置
  • 探索 Excel-to-JSON:高效数据转换的利器
  • Linux Alias 魔法:命令行效率提升秘籍