MQTT 协议详解:物联网通信的利器
在当今物联网(IoT)迅猛发展的背景下,设备之间的高效、可靠通信变得尤为重要。MQTT(Message Queuing Telemetry Transport)作为一种轻量级的消息传输协议,因其低带宽占用和高可靠性,成为物联网领域中广受欢迎的通信工具。
官网:http://mqtt.org
1. 什么是 MQTT?
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模型的轻量级通信协议,专为资源受限的设备和不可靠网络环境设计。它最初由 Andy Stanford-Clark 和 Arlen Nipper 在 1999 年开发,如今已成为物联网通信的标准协议之一。
MQTT 的核心思想是通过一个中间代理(Broker)来实现消息的分发,从而解耦发送者与接收者。
客户端 | 服务端 |
一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以: | 也称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以: |
- (1)发布其他客户端可能会订阅的信息; | - (1)接受来自客户的网络连接; |
- (2)订阅其它客户端发布的消息; | - (2)接受客户发布的应用信息; |
- (3)退订或删除应用程序的消息; | - (3)处理来自客户端的订阅和退订请求; |
- (4)断开与服务器连接。 | - (4)向订阅的客户转发应用程序消息。 |
2. 设计原则
- 精简,不添加可有可无的功能;
- 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port),不必同时运行;
- 允许用户动态创建主题(不需要预先创建主题),零运维成本;
- 把传输量降到最低以提高传输效率;
- 把低带宽、高延迟、不稳定的网络等因素考虑在内;
- 支持连续的会话保持和控制(心跳);
- 理解客户端计算能力可能很低;
- 提供服务质量( quality of service level:QoS)管理;
- 不强求传输数据的类型与格式,保持灵活性(指的是应用层业务数据)。
3. MQTT 核心概念
3.1 服务端(Broker)
消息的中转站,负责接收来自客户端的消息,并将消息转发给订阅了相关主题的客户端。
3.2 客户端(Client)
客户端可以是发布者(Publisher)或订阅者(Subscriber),也可以两者兼有。每个客户端都可以连接到 Broker 并进行消息的发布或订阅。
3.3 主题(Topic)
类似于邮件地址的主题字符串,用于标识消息的分类。例如:
- sensor/temperature
- home/livingroom/light
3.4 Topic 通配符匹配规则
3.4.1 层级分隔符:/
/ 用来分隔主题树的每一层,并给主题空间提供分等级的结构。当两个通配符在一个主题中出现的时候,主题层次分隔符的使用是很重要的。
示例:
love/you/with/all/my/heart
3.4.2 多层通配符:#
多层通配符有可以标识大于等于0的层次。因此,love/# 也可以匹配到单独的 love,此时#代表0层。
多层通配符一定要是主题树的最后一个字符。比如说,love/# 是有效的,但是 love/#/with 是无效的。
love/you/# 可匹配如下内容(包括单不限于):love/you
love/you/with
love/you/with/all
love/you/with/all/my/heart
love/you/with/all/my/hearts
3.4.3 单层通配符:+
只匹配主题的一层
1. love/you/+ :匹配/love/you/with和/love/you/and,但是不匹配love/you/with/all/my/heart。
2. 单层通配符只匹配1层,love/+不匹配love。
3. 单层通配符可以被用于主题树的任意层级,连带多层通配符。它必须被用在主题层级分隔符/的右边,除非它是指定自己。因此,+和love/+都是有效的,但是love+无效。单层通配符可以用在主题树的末端,也可以用在中间。比如说,love/+和love/+/with都是有效。
注意事项:
- 主题层次分隔符被用来在主题中引入层次。多层的通配符和单层通配符可以被使用,但他们不能被使用来做发布者的消息。
- Topic命名尽量见名知意,符合规范,主题名字是大小写敏感的。比如说,love和LOVE是两个不同的主题。
- 以/开头会产生一个不同的主题。比如说,/love与love不同。/love匹配"+/+"和/+,但不匹配+。
- 不要在任何主题中包含null(Unicode \x0000)字符。
- 在主题树中,长度被限制于64k内但是在这以内没有限制层级的数目。
- 可以有任意数目的根节点: 也就是说,可以有任意数目的主题树。
3.5 $SYS 主题
以 $SYS/
开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 $SYS/
主题标准,但大多数 MQTT 服务器都遵循该标准建议。
例如,EMQX 服务器支持通过以下主题获取集群状态。
主题 | 说明 |
---|---|
$SYS/brokers | EMQX 集群节点列表 |
$SYS/brokers/emqx@127.0.0.1/version | EMQX 版本 |
$SYS/brokers/emqx@127.0.0.1/uptime | EMQX 运行时间 |
$SYS/brokers/emqx@127.0.0.1/datetime | EMQX 系统时间 |
$SYS/brokers/emqx@127.0.0.1/sysdescr | EMQX 系统信息 |
EMQX 还支持客户端上下线事件、收发流量、消息收发、系统监控等丰富的系统主题,用户可通过订阅 $SYS/#
主题获取所有系统主题消息。详细请见:EMQX 系统主题文档。
3.6 QoS(服务质量等级)
MQTT 协议中规定了消息服务质量(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性,QoS 的设计是 MQTT 协议里的重点。
MQTT 设计了 3 个 QoS 等级。
3.6.1 QoS 0(最多一次)
消息发布完全依赖底层TCP/IP网络。会发生消息丢失。 一个消息不会被接收端应答,也不会被发送者存储并再发送。这个也被叫做“即发即弃”
3.6.2 QoS 1(至少一次)
确保消息到达,但消息重复可能会发生。发送者将会存储发送的信息直到发送者收到一次来自接收者的PUBACK格式的应答。
3.6.3 QoS 2(恰好一次)
确保消息到达一次。如果接收端接收到了一个QoS为2的PUBLISH消息,它将相应地处理PUBLISH消息,并通过PUBREC消息向发送方确认。
其中:
- PUBLISH:发布消息
- PUBREC:发布收到
- PUBREL:发布释放
- PUBCOMP:发布完成
3.6.4 QoS 匹配规则总结
不同情况下,客户端收到的消息 QoS 可参考下表:
发布消息的 QoS | 主题订阅的 QoS | 接收消息的 QoS |
0 | 0 | 0 |
0 | 1 | 0 |
0 | 2 | 0 |
1 | 0 | 0 |
1 | 1 | 1 |
1 | 2 | 1 |
2 | 0 | 0 |
2 | 1 | 1 |
2 | 2 | 2 |
MQTT 中的 QoS 是一种灵活的机制,它允许发布者和订阅者独立设置期望的服务质量等级,而最终的消息传输将基于两者之间的最小值进行处理。这种设计既保障了通信的可靠性,也避免了因能力不匹配导致的资源浪费。
3.7 MQTT V3.1.1 协议报文
3.7.1 报文结构
- 固定报头(Fixed header)
- 可变报头(Variable header)
- 报文有效载荷(Payload)
3.7.2 固定报头
+----------+-----+-----+-----+-----+-----+-----+-----+-----+
| Bit | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
+----------+-----+-----+-----+-----+-----+-----+-----+-----+
| byte1 | MQTT Packet type | Flags |
+----------+-----------------------+-----------------------+
| byte2... | Remaining Length |
+----------+-----------------------------------------------+
3.7.3 报文类型
类型名称 | 类型值 | 报文说明 |
---|---|---|
CONNECT | 1 | 发起连接 |
CONNACK | 2 | 连接回执 |
PUBLISH | 3 | 发布消息 |
PUBACK | 4 | 发布回执 |
PUBREC | 5 | QoS2 消息回执 |
PUBREL | 6 | QoS2 消息释放 |
PUBCOMP | 7 | QoS2 消息完成 |
SUBSCRIBE | 8 | 订阅主题 |
SUBACK | 9 | 订阅回执 |
UNSUBSCRIBE | 10 | 取消订阅 |
UNSUBACK | 11 | 取消订阅回执 |
PINGREQ | 12 | PING 请求 |
PINGRESP | 13 | PING 响应 |
DISCONNECT | 14 | 断开连接 |
其中:
- PUBLISH 发布消息:PUBLISH 报文承载客户端与服务器间双向的发布消息。 PUBACK 报文用于接收端确认 QoS1 报文,PUBREC/PUBREL/PUBCOMP 报文用于 QoS2 消息流程。
- PINGREQ/PINGRESP 心跳:客户端在无报文发送时,按保活周期(KeepAlive)定时向服务端发送 PINGREQ 心跳报文,服务端响应 PINGRESP 报文。PINGREQ/PINGRESP 报文均 2 个字节。
3.8 MQTT WebSocket 连接
MQTT 协议除支持 TCP 传输层外,还支持 WebSocket 作为传输层。通过 WebSocket 浏览器可以直连 MQTT 消息服务器,发布订阅模式与其他 MQTT 客户端通信。
MQTT 协议的 WebSocket 连接,必须采用 binary 模式,并携带子协议 Header:
Sec-WebSocket-Protocol: mqttv3.1 或 mqttv3.1.1
4. 物联网级消息中间件EMQ
4.1 EMQX 简介
EMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。
EMQ官网:EMQ: 连接物理世界与人工智能
EMQX 文档:产品概览 | EMQX 1.0 文档
为什么选择EMQ X ?从支持 MQTT5.0、稳定性、扩展性、集群能力等方面考虑,EMQX 的表现应该是最好的。
4.2 EMQX的特点
- EMQ X 目前为开源社区中最流行的 MQTT 消息中间件;
- EMQ X 是开源社区中第一个支持 5.0协议规范的消息服务器,并且完全兼容 MQTT V3.1 和 V3.1.1 协议。
- 除了 MQTT 协议之外,EMQ X 还支持MQTT-SN、CoAP、 LwM2M、LoRaWAN 和 WebSocket 等物联网协议
- 单机支持百万连接,集群支持千万级连接;毫秒级消息转发。
- 易于安装和使用;
- 中国本地的技术支持服务;
- 扩展模块和插件,EMQ X 提供了灵活的扩展机制,支持企业的一些定制场景;
- 桥接
- 共享订阅
4.3 环境搭建与配置
以 Docker 运行单个 EMQX 节点为例:
1. 拉取emqx镜像
docker pull emqx/emqx:v4.1.0
2. 创建emqx容器
docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
其中,端口说明:
端口 | 协议 | 描述 |
---|---|---|
1883 | TCP | MQTT over TCP 监听器端口,主要用于未加密的 MQTT 连接。 |
8883 | TCP | MQTT over SSL/TLS 监听器端口,用于加密的 MQTT 连接。 |
8083 | TCP | MQTT over WebSocket 监听器端口,使 MQTT 能通过 WebSocket 进行通信。 |
8084 | TCP | MQTT over WSS (WebSocket over SSL) 监听器端口,提供加密的 WebSocket 连接。 |
18083 | HTTP | EMQX Dashboard 和 REST API 端口,用于管理控制台和 API 接口。 |
4370 | TCP | Erlang 分布式传输端口,根据节点名称不同实际端口可能是 BasePort (4370) + Offset 。 |
5370 | TCP | 集群 RPC 端口(在 Docker 环境下为 5369),根据节点名称不同实际端口可能是 BasePort (5370) + Offset 。 |
Docker 部署注意事项:
1. 如果需要持久化 Docker 容器中生成的数据 ,请将以下目录挂载到容器外部,这样即使容器被删除数据也不会丢失:
/opt/emqx/data /opt/emqx/log
启动容器并挂载目录:
docker run -tid --name emqx \-p 1883:1883 -p 8083:8083 \-p 8084:8084 -p 8883:8883 \-p 18083:18083 \-v $PWD/data:/opt/emqx/data \-v $PWD/log:/opt/emqx/log \emqx/emqx:v4.1.0
EMQX 历史版本:Directory listing for EMQX: / | EMQ
单节点部署:安装部署和迁移 | EMQX文档
集群部署:分布式集群介绍 | EMQX文档
安全认证:安全指南 | EMQX文档
EMQX命令行:命令行 | EMQX文档
配置文件:配置文件简介 | EMQX文档
4.4 EMQX 目录结构
目录 | 描述 | 压缩包解压安装(同 Docker) | 二进制包安装 |
---|---|---|---|
etc | 静态配置文件 | ./etc | /etc/emqx |
data | 数据和配置文件 | ./data | /var/lib/emqx |
log | 日志文件 | ./log | /var/log/emqx |
releases | 启动相关的脚本 | ./releases | /usr/lib/emqx/releases |
bin | 可执行文件 | ./bin | /usr/lib/emqx/bin |
lib | Erlang 代码 | ./lib | /usr/lib/emqx/lib |
erts-* | Erlang 虚拟机文件 | ./erts-* | /usr/lib/emqx/erts-* |
plugins | 插件 | ./plugins | /usr/lib/emqx/plugins |
以上目录中,用户经常接触与使用的是 bin、etc、data、log 目录。
4.5 EMQ Dashboard
EMQ X 提供了 Dashboard 以方便用户管理设备与监控相关指标。通过 Dashboard可以查看服务器基本信息、负载情况和统计数据,可以查看某个客户端的连接状态等信息甚至断开其连接,也可以动态加载和卸载指定插件。
访问地址:http://localhost:18083 来查看 Dashboard,默认用户名是 admin,密码是 public。
EMQX Dashboard 详细介绍:EMQX Dashboard | EMQX文档
EMQX Dashboard 之 MQTT 配置:配置和管理会话持久化 | EMQX文档
导航项目 | 说明 |
Monitor | 提供了服务端与客户端监控信息的展示页面 |
RULE ENGINE | 提供了规则引擎的可视化操作页面 |
MANAGEMENT | 提供了扩展插件与应用的管理页面 |
TOOLS | 提供了 Websocket 客户端工具以及 HTTP API 速查页面 |
ADMIN | 提供了 Dashboard 用户管理和显示设置等页面 |
4.6 客户端调试工具MQTTX
MQTT X 是 EMQ 开源的一款优雅的跨平台 MQTT 5.0 桌面客户端,它支持 macOS, Linux, Windows。
MQTT X 的 UI 采用了聊天界面形式,简化了页面操作逻辑,用户可以快速创建连接,允许保存多个客户端,方便用户快速测试 MQTT/MQTTS 连接,及 MQTT 消息的订阅和发布。
下载地址:MQTTX:全功能 MQTT 客户端工具
4.7 基础使用教程
1. 点击 + ,再点击 New Connetion
2. 填写信息并点击右上角的 Connect ,如果无其他需求,仅填写第一页内容即可,其他均为拓展项。
3. 点击 Connect 即可正常使用
界面说明:
4. 发送消息:
5. 订阅消息:
关于切换中文:
4.8 共享订阅
对应文档:共享订阅 | EMQX文档
EMQX 实现了 MQTT 的共享订阅功能。共享订阅是一种订阅模式,用于在多个订阅者之间实现负载均衡。客户端可以分为多个订阅组,消息仍然会被转发到所有订阅组,但每个订阅组内只有一个客户端接收消息。您可以为一组订阅者的原始主题添加前缀以启用共享订阅。EMQX 支持两种格式的共享订阅前缀,分别为带群组的共享订阅(前缀为 $share/<group-name>/
)和不带群组的共享订阅(前缀为 $queue/
)。两种共享订阅格式示例如下:
前缀格式 | 示例 | 前缀 | 真实主题名 |
---|---|---|---|
带群组格式 | $share/abc/t/1 | $share/abc/ | t/1 |
不带群组格式 | $queue/t/1 | $queue/ | t/1 |
需要在EMQDashboard开启该选项:(默认开启)
4.7.1 不带群组的共享订阅
格式:$queue/{TopicName}
EMQX 的共享订阅支持和策略配置:/etc/emqx.conf
# 均衡策略
## Dispatch strategy for shared subscription
##
## Value: Enum
## - random
## - round_robin
## - sticky
## - hash
broker.shared_subscription_strategy=random
均衡策略 | 描述 |
random | 在所有订阅者中随机选择 |
round_robin | 按照订阅顺序轮询 |
sticky | 一直发往上次选取的订阅者 |
hash | 按照发布者 ClientID 的哈希值 |
调用示例:
4.7.2 带群组的共享订阅
格式:$share/<group-name>/{TopicName}
调用示例:
4.8 保留消息(Retained Message)
Broker 可以保留某个主题的最后一条消息,当新客户端订阅该主题时,会立即收到这条保留消息。
需要在EMQDashboard开启该选项:(默认开启)
配置示例:
发送消息时勾选 Retain 选项。
4.9 遗嘱消息(Last Will and Testament, LWT)
客户端可以在连接时设置一个“遗嘱”,如果客户端异常断开连接,Broker 将自动发布该遗嘱消息。
配置示例:
1. 在连接配置时,将页面下拉,在 Last Will and Testament 部分,填写遗嘱消息的配置。
- 遗嘱消息主题:输入
offline
。 - 遗嘱消息 QoS:保持默认值
0
。 - 遗嘱消息保留标志:默认禁用。如果启用,遗嘱消息也将是一个保留消息。
- 遗嘱消息:输入
I'm offline
。 - 遗嘱消息延迟时间:设置为
5
秒。
4.10 排他订阅
排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。
要进行排它订阅,您需要为主题名称添加前缀,如以下表格中的示例:
示例 | 前缀 | 真实主题名 |
---|---|---|
$exclusive/t/1 | $exclusive/ | t/1 |
当某个客户端 A 订阅 $exclusive/t/1
后,其他客户端再订阅 $exclusive/t/1
时都会失败,直到 A 取消了对 $exclusive/t/1
的订阅为止。
注意:
排它订阅必须使用
$exclusive/
前缀,在上面的示例中,其他客户端依然可以通过t/1
成功进行订阅。
4.10.1 订阅失败错误码
错误码 | 原因 |
---|---|
0x8F | 使用了 $exclusive/ ,但并未开启排它订阅 |
0x97 | 已经有客户端订阅了该主题 |
4.10.2 配置排它订阅
1. 开启排他订阅(默认不支持)
方式一:通过配置文件配置排它订阅
mqtt.exclusive_subscription.enable = true
方式二:通过界面配置
2. 填写 $exclusive/ + 真实主题名
4.11 延迟发布
EMQ X 的延迟发布功能可以实现按照用户配置的时间间隔延迟发布 PUBLISH 报文的功能。模块开启emqx_mod_delayed
通过 Dashboard 配置延迟发布:(默认关闭)
延迟发布主题的具体格式如下:
$delayed/{DelayInterval}/{TopicName}
其中:
- $delayed: 使用 $delayed 作为主题前缀的消息都将被视为需要延迟发布的消息。
- {DelayInterval}: 指定该 MQTT 消息延迟发布的时间间隔,单位是秒,允许的最大间隔是 4294967 秒。
- {TopicName}: MQTT 消息的主题名称。
发送示例:
5. Eclipse Paho
5.1 Eclipse Paho是什么
Eclipse paho 是 EMQX 官方推荐的实现了 mqtt 协议 Java 客户端。
其关系类似于 Mysql 于 JDBC ,我们的项目代码要连接数据库需要用到 JDBC ,而我们的项目需要连接 EMQX 需要用到 Eclipse Paho ,并且它提供了基础的消息收发。
5.2 Eclipse Paho技术调研
全语言编程:MQTT 客户端编程 | EMQX文档
Java 编程:如何在 Java 中使用 Paho MQTT 客户端 | EMQ
1. 添加依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
2. 发布消息到EMQ
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.GetMapping;@GetMapping("/publish")
public void publish() throws MqttException {MqttClientPersistence persistence = new MemoryPersistence();//内存持久化MqttClient client = new MqttClient("tcp://localhost:1883", "random123", persistence);//连接选项中定义用户名密码和其它配置MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录options.setAutomaticReconnect(true);//是否自动重连options.setConnectionTimeout(30);//连接超时时间 秒options.setKeepAliveInterval(10);//连接保持检查周期 秒options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本client.connect(options);//连接client.publish("topic", "发送内容".getBytes(), 2, false);}
3. 订阅消息
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.GetMapping;@GetMapping("/subscribe")public void subscribe() throws MqttException {MqttClientPersistence persistence = new MemoryPersistence();//内存持久化MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "random456", persistence);//连接选项中定义用户名密码和其它配置MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录options.setAutomaticReconnect(true);//是否自动重连options.setConnectionTimeout(30);//连接超时时间 秒options.setKeepAliveInterval(10);//连接保持检查周期 秒options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本// 获取全部主题的消息client.setCallback(new MqttCallbackExtended() {@Overridepublic void connectionLost(Throwable throwable) {System.out.println("连接丢失!");}@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {System.out.println("接收到消息 topic:" + topic + " id:" + mqttMessage.getId() + " message:" + mqttMessage.toString());}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {}@Overridepublic void connectComplete(boolean b, String s) {System.out.println("连接成功!");}});client.connect(options);//连接client.subscribe("test"); //订阅主题// 方式二:只获取当前主题的消息
// client.subscribe("test", new IMqttMessageListener() {
// @Override
// public void messageArrived(String topic, MqttMessage message) throws Exception {
// System.out.println("接收到消息 topic:" + topic + " id:" + message.getId() + " message:" + message.toString());
// }
// });}
注意:
1. 同一个 Topic 重复订阅会覆盖。
2. clientId 保证唯一,否则容易将其他相同 clientId 的连接顶掉。