设备服务管理上报方案
简介
本文介绍一个设备管理上报系统的实现方案。通过WebSocket连接,服务实例能够主动向设备管理服务注册并周期性上报状态信息。核心功能包括:1)启动时发送注册消息(包含设备ID、最大负载等);2)定时发送心跳包,附带当前实际连接数;3)具备断线重连机制(采用指数退避算法)和优雅停止功能。系统集成在服务主进程中,通过后台协程方式运行,在配置文件中可灵活启用或关闭该功能。
import asyncio
import websockets
import json
import randomasync def mock_device():# 读取配置信息with open('node_config.json', 'r') as f:config = json.load(f)device_id = config['device_id']max_number = config['max_number']address = config['client_connection_address']# async with websockets.connect('ws://***') as ws:async with websockets.connect('wss://***.com') as ws:# 注册设备await ws.send(json.dumps({'type': 'register','device_id': device_id,'max_number': max_number,'client_connection_address': address,'metadata': {'model': 'v1.0'}}))print(await ws.recv())# 心跳while True:await asyncio.sleep(random.randint(1, 5))await ws.send(json.dumps({'type': 'heartbeat','device_id': device_id,'current_number':0,}))asyncio.run(mock_device())
上面这个设备模拟器脚本,将这台设备注册到设备管理的WebSocket服务里,并持续上报心跳与负载信息,借鉴这个思路,将其运用到当前需要被监控的服务后端上。
服务主函数启动时就主动调用一个“设备管理上报”功能,定期把本实例的状态(如当前连接数)上报到设备管理服务,用于主动更新信息。
- 在 `app.py` 内新增了一个后台协程函数,启动时自动运行,职责是:
- 建立到配置的设备管理 WebSocket 的连接
- 发送注册消息 type=register(包含 device_id、max_number、client_connection_address、metadata)
- 周期性发送心跳 type=heartbeat,并把当前实际连接数 current_number 设置为 WebSocket 服务的活跃连接数
- 该任务具有断线重连(指数退避)和优雅停止(随主进程退出自动取消)的能力。
主要代码示例
async def device_management_reporter(ws_server: WebSocketServer, config: dict):"""在进程启动后,主动向“设备管理服务”上报注册与心跳信息。device_management:enabled: truews_url: ""device_id: "device_001"max_number: 1client_connection_address: ""heartbeat_interval: 5metadata:model: "v1.0"若未配置或未启用,将跳过。"""dm_cfg = (config or {}).get("device_management") or {}if not dm_cfg or not dm_cfg.get("enabled"):logger.bind(tag=TAG).info("设备管理上报未启用,跳过。")returnurl = dm_cfg.get("ws_url")device_id = dm_cfg.get("device_id")max_number = int(dm_cfg.get("max_number", 1))heartbeat_interval = int(dm_cfg.get("heartbeat_interval", 5))metadata = dm_cfg.get("metadata") or {"model": "v1.0"}# 计算默认的对外连接地址(如果未显式配置)server_cfg = (config or {}).get("server") or {}ws_port = int(server_cfg.get("port", 8000))default_client_addr = f"ws://{get_local_ip()}:{ws_port}/***"client_addr = dm_cfg.get("client_connection_address") or default_client_addrif not url or not device_id:logger.bind(tag=TAG).warning("设备管理上报未启动:缺少 ws_url 或 device_id 配置")returnbackoff = 2 # 指数退避初始值while True:try:logger.bind(tag=TAG).info("连接设备管理服务: {}", url)async with websockets.connect(url) as ws:# 发送注册register_payload = {"type": "register","device_id": device_id,"max_number": max_number,"client_connection_address": client_addr,"metadata": metadata,}await ws.send(json.dumps(register_payload))# 可选:等待一次响应try:ack = await asyncio.wait_for(ws.recv(), timeout=5)logger.bind(tag=TAG).info("设备管理注册响应: {}", ack)except asyncio.TimeoutError:logger.bind(tag=TAG).warning("设备管理注册未收到确认,继续心跳上报")backoff = 2 # 注册成功后重置退避# 周期性心跳while True:await asyncio.sleep(heartbeat_interval)current_number = len(getattr(ws_server, "active_connections", []))heartbeat_payload = {"type": "heartbeat","device_id": device_id,"current_number": current_number,}await ws.send(json.dumps(heartbeat_payload))except asyncio.CancelledError:# 被主进程取消,直接退出循环raiseexcept Exception as e:logger.bind(tag=TAG).warning("设备管理上报连接异常: {},将在 {} 秒后重试", e, backoff)await asyncio.sleep(min(backoff, 60))backoff = min(backoff * 2, 60)
backoff=2是指数退避算法的初始值,主要用于控制重连间隔时间,目的是避免频繁重连导致服务器压力过大。
退避算法具体作用:
1. 控制重连频率
当连接失败时,不会立即重试,而是等待 backoff
秒后再尝试:
await asyncio.sleep(backoff) # 等待一段时间再重试
2. 指数级增加等待时间
每次重连失败后,等待时间会指数级增长:
backoff *= 2 # 等待时间翻倍:2 → 4 → 8 → 16...
3. 避免服务器过载
防止大量客户端在服务器短暂故障时同时发起重连,造成"惊群效应"。
完整的工作流程:
backoff = 2 # 初始等待2秒
max_backoff = 60 # 最大等待60秒(通常还会设置上限)while True:try:# 尝试连接...async with websockets.connect(url) as ws:# 连接成功后的操作...backoff = 2 # 重要:成功后就重置等待时间# ...其他代码except Exception as e:logger.error("连接失败,{}秒后重试: {}", backoff, e)await asyncio.sleep(backoff)# 指数增加等待时间,但不超过最大值backoff = min(backoff * 2, max_backoff)
async def main():check_ffmpeg_installed()config = load_config()# 默认使用manager-api的secret作为auth_key# 如果secret为空,则生成随机密钥# auth_key用于jwt认证,比如视觉分析接口的jwt认证auth_key = config.get("manager-api", {}).get("secret", "")if not auth_key or len(auth_key) == 0 or "你" in auth_key:auth_key = str(uuid.uuid4().hex)config["server"]["auth_key"] = auth_key# 添加 stdin 监控任务stdin_task = asyncio.create_task(monitor_stdin())# 启动 WebSocket 服务器ws_server = WebSocketServer(config)ws_task = asyncio.create_task(ws_server.start())# 启动 Simple http 服务器ota_server = SimpleHttpServer(config)ota_task = asyncio.create_task(ota_server.start())# 启动设备管理上报任务(可选)dm_task = asyncio.create_task(device_management_reporter(ws_server, config))# 获取WebSocket配置,使用安全的默认值websocket_port = 8000server_config = config.get("server", {})if isinstance(server_config, dict):websocket_port = int(server_config.get("port", 8000))try:await wait_for_exit() # 阻塞直到收到退出信号except asyncio.CancelledError:print("任务被取消,清理资源中...")finally:# 取消所有任务(关键修复点)stdin_task.cancel()ws_task.cancel()if ota_task:ota_task.cancel()if dm_task:dm_task.cancel()# 等待任务终止(必须加超时)await asyncio.wait([stdin_task, ws_task, ota_task, dm_task]if ota_taskelse [stdin_task, ws_task, dm_task],timeout=3.0,return_when=asyncio.ALL_COMPLETED,)print("服务器已关闭,程序退出。")