分布式MQTT客户端看门狗机制设计与实现
2. 看门狗调度机制
-
背景与问题
面临的挑战
在传统的微服务集群部署中,每个服务实例都可能需要连接MQTT服务器处理设备消息。这会带来几个问题:
- 消息重复处理:多个节点同时订阅同一个Topic,导致同一条消息被处理多次
- 资源浪费:每个节点都维护MQTT连接,占用不必要的网络和内存资源
- 状态不一致:多个节点并发处理设备指令,可能导致设备状态混乱
业务需求
对于设备管理服务,我们需要确保:
- 每条MQTT消息只被处理一次
- 服务具备高可用性,单节点故障不影响消息处理
- 系统能够自动进行故障恢复
解决方案设计
核心思想
通过分布式锁 + 看门狗的机制,确保在任意时刻只有一个节点负责MQTT连接和消息处理,同时保证服务的高可用性。
-
@Component public class MqttClientStart implements ApplicationRunner, DisposableBean {private static final String MQTT_LOCK_KEY = "Service:Mqtt:Consumers:Client:Watchdog:Lock";private static final Long LOCK_TIMEOUT = 120L; // 锁超时时间private static final int LOCK_RENEW_INTERVAL = 100; // 续期间隔private final String nodeId = RequestUtils.getHostname(); // 节点唯一标识private final AtomicBoolean watchdogRunning = new AtomicBoolean(false);private final AtomicBoolean mqttInitialized = new AtomicBoolean(false); }
设计要点:
- 使用主机名作为节点唯一标识
- 锁超时时间120秒,续期间隔100秒,避免网络抖动导致的锁丢失
- 通过AtomicBoolean确保状态的线程安全
private void startWatchdog() {watchdogExecutor = Executors.newSingleThreadScheduledExecutor(r -> {Thread thread = new Thread(r, "mqtt-watchdog-" + nodeId);thread.setDaemon(true);// 关键:设置未捕获异常处理器thread.setUncaughtExceptionHandler((t, ex) -> {log.error("Uncaught exception in watchdog thread {}: {}", t.getName(), ex.getMessage(), ex);handleWatchdogFailure(ex);});return thread;});watchdogExecutor.scheduleAtFixedRate(this::watchdogTask, 1, LOCK_RENEW_INTERVAL, TimeUnit.SECONDS); }
设计亮点:
- 单线程调度器避免并发问题
- 守护线程确保不阻塞应用关闭
- 完善的异常处理机制
3. 核心业务逻辑
private void watchdogTask() {try {boolean hasLock = myRedisLock.tryReentrantLock(MQTT_LOCK_KEY, nodeId, LOCK_TIMEOUT);if (hasLock) {// 获得锁且未初始化 -> 初始化MQTT客户端if (!mqttInitialized.get()) {log.info("Node {} acquired lock. Initializing MQTT client...", nodeId);initializeMqttClient();}} else {// 失去锁且已初始化 -> 关闭MQTT客户端if (mqttInitialized.get()) {log.info("Node {} lost lock. Shutting down MQTT client...", nodeId);shutdownMqttClient();}}} catch (Exception e) {log.error("Error in MQTT watchdog task:", e);if (mqttInitialized.get()) {shutdownMqttClient(); // 异常时确保资源清理}}
}
核心逻辑:
- 持有锁 + 未初始化 → 启动MQTT客户端
- 失去锁 + 已初始化 → 关闭MQTT客户端
- 异常情况下确保资源清理
4. 故障恢复机制
private void handleWatchdogFailure(Throwable ex) {watchdogRunning.set(false);// 异步延迟重启CompletableFuture.runAsync(() -> {try {Thread.sleep(5000); // 延迟5秒重启if (watchdogExecutor != null) {watchdogExecutor.shutdown();}startWatchdog();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});
}
容错设计:
- 异常发生时自动重启看门狗
- 延迟重启避免频繁失败
- 异步处理不阻塞当前线程
运行流程
正常运行流程
- 应用启动:各节点启动看门狗线程
- 锁竞争:各节点尝试获取Redis分布式锁
- 角色确定:获得锁的节点成为Active,其他为Standby
- MQTT管理:Active节点初始化MQTT客户端,开始处理消息
- 锁续期:Active节点定期续期锁,Standby节点继续尝试获取锁
故障切换流程
- 故障检测:Active节点故障,停止锁续期
- 锁释放:Redis锁超时自动释放(120秒后)
- 角色切换:Standby节点获得锁,升级为Active
- 服务恢复:新Active节点初始化MQTT客户端,恢复消息处理
优势与权衡
主要优势
高可用性
- 单节点故障时自动切换,服务不中断
- 故障恢复时间可控(最多120秒)
数据一致性
- 确保消息唯一性处理
- 避免重复操作和状态冲突
运维友好
- 自动故障检测和恢复
- 完善的日志记录便于问题排查
设计权衡
性能方面
- 牺牲了并发处理能力
- MQTT处理能力无法水平扩展
资源利用
- 其他节点的MQTT处理资源闲置
- 可能造成负载不均
适用场景
这种设计适合以下场景:
- 对消息处理一致性要求较高
- MQTT消息量不大,单节点可以处理
- 更重视可用性而非性能