【物联网】MQTT(Message Queuing Telemetry Transport)是什么?
MQTT 是什么
一句话:MQTT(Message Queuing Telemetry Transport)是一种轻量级发布/订阅消息协议,跑在 TCP 长连接 上,专为物联网这种“带宽小、设备多、实时性要求高”的场景设计。
5 个核心点
-
发布/订阅(Pub/Sub):客户端把消息发布到某个 Topic;订阅该 Topic 的客户端就能收到。
-
Broker:消息中转站(服务器)。所有客户端都只连 Broker,不直接互连。
-
Topic:字符串路径(比如
iot/sensors/data
),可以分层;支持通配符#
、+
。 -
QoS:三种可靠等级
- QoS 0:至多一次(不重试,最快)
- QoS 1:至少一次(会重发,可能重复)
- QoS 2:正好一次(最稳但最慢)
-
低开销:头部很小、保持长连接、消息体自定义(文本/JSON/二进制都行)。
在你的项目里的应用(Node→Edge→Cloud)
你的架构:
Arduino (BLE Peripheral)└── BLE Notify/Write↓↑
Raspberry Pi (BLE Central + MQTT Client)└── TCP 长连↓↑
AWS/EC2 Mosquitto (MQTT Broker)
- 上行(设备 → 云)
Arduino 通过 BLE Notify 推送 JSON 给 RPi → RPi 把 JSON publish 到
iot/sensors/data
→ Broker 转发给所有订阅者(例如 AWS 测试面板、你的监控脚本)。 - 下行(云 → 设备)
云端在iot/commands/arduino
发布命令 → RPi 作为订阅者 on_message 收到 → 用 BLE Write 写给 Arduino → Arduino 执行(点亮板载 LED)。
重点:Arduino 不直接说 MQTT,由 RPi 充当“协议桥”(BLE↔MQTT)。这叫 Edge Bridge/Gateway。
在你代码中的体现(逐行映射)
以你用的“桥接脚本”为例(edge_ble_mqtt_bridge.py
的结构):
# 1) 定义 Topic —— “地址体系”
TOPIC_SENSOR = "iot/sensors/data" # 上行:发布传感器数据用
TOPIC_CMD = "iot/commands/arduino" # 下行:订阅命令用
# 2) 建立 MQTT 客户端并连接 Broker(EC2 或校园 Broker)
mqtt_client = mqtt.Client()
mqtt_client.connect(MQTT_HOST, MQTT_PORT, 60) # 60s keepalive,底层是 TCP 长连接
mqtt_client.loop_start() # 后台线程处理网络读写
# 3) 订阅“下行命令”主题,并设置回调
mqtt_client.subscribe(TOPIC_CMD)def on_mqtt_message(client, userdata, msg):payload = msg.payload.decode("utf-8")# ……把命令通过 BLE 写入 Arduinoawait client_ble.write_gatt_char(CMD_UUID, payload.encode("utf-8"))mqtt_client.on_message = on_mqtt_message
# 4) 从 BLE 收到“上行数据”回调时,发布到 MQTT
def on_ble_notify(_sender, data: bytearray):s = data.decode("utf-8") # Arduino 发来的 JSON(温湿度)mqtt_client.publish(TOPIC_SENSOR, s) # 发布到上行 Topic,云端即可收到
对应理解
-
connect()
:和 Broker 建立 TCP 长连接(默认端口 1883;TLS 是 8883)。 -
loop_start()
:起一个后台线程处理心跳、重连、收发,主线程可以继续 BLE 事件循环。 -
subscribe()
:声明“我要听这个频道”。 -
on_message
:只要有人往那个频道发消息,你就能在这里“收到并处理”。 -
publish(topic, payload)
:往某个频道广播消息。payload
你用的是 JSON 文本(通用、易读)。 -
你用的 Topic 分层很清晰:
iot/sensors/data
→ 一般是上行(设备状态/测量值)iot/commands/arduino
→ 一般是下行(控制命令)
如何理解:和 HTTP 的区别
- HTTP:一次请求、一次响应、短连接为主;客户端要“主动拉”。
- MQTT:长连接,Broker 中转,订阅后被动收到(发布者/订阅者互不感知对方存在),非常适合实时推送与群发。
- 你的项目里:RPi 订阅命令,不用轮询;云端一发,RPi 就立刻收到并写给 Arduino。
进阶(你可以加的 3 个小增强)
-
QoS:
- 上行发布可用
client.publish(topic, payload, qos=1)
,防丢但可能重复(可在 JSON 里带ts
去重)。
- 上行发布可用
-
保留消息(Retain):
client.publish(topic, payload, retain=True)
,Broker 会“记住最后一条”,新订阅者刚连上就能拿到“最近状态”(比如最近一条温湿度)。
-
“遗嘱”(LWT):
client.will_set("iot/status/arduino", "offline", retain=True)
,当 RPi 异常掉线时 Broker 自动发布此消息;配合online
/offline
做设备在线指示。
(课堂演示可以说:我们当前用 1883 匿名便于教学;生产应使用 TLS 8883 + 证书/鉴权,AWS IoT Core 就是这么做的。)
生活中的体现(你能举的例子)
- 智能家居:温湿度、门磁、灯开关都用 MQTT;手机 App 或 Home Assistant 订阅
home/+/state
实时更新界面,发布home/livingroom/light/cmd
控灯。 - 共享单车/物流车队:车载终端定时发布
fleet/<bike_id>/gps
,调度平台订阅;平台下发fleet/<bike_id>/lock
控制电机锁。 - 工业设备:PLC 把产线状态发布到
factory/line1/status
,告警系统/看板订阅;运维系统发布factory/line1/cmd
下发复位。 - 电梯/冷链监控:传感器持续上报温度/振动,异常时手机订阅者即时收到;MQTT 在 2G/3G/弱网下也很稳。
- 校园实验/竞赛:多组设备统一连到学校的 Broker;老师只要订阅特定 Topic 就能看到每组实验数据。
“我们把 MQTT 用作 设备与云之间的消息总线。RPi 作为客户端连到 Broker,把从 BLE 收到的 JSON 发布到
iot/sensors/data
,这就是上行;同时 订阅iot/commands/arduino
,云端一发布命令,它就 write_gatt_char 写回 Arduino,控制 LED,这就是下行。MQTT 采用 发布/订阅 + 长连接 的方式,开销小、实时性好,比 HTTP 轮询更适合物联网。”