Day37 MQTT协议 多客户端服务器模型
day37 MQTT协议 多客户端服务器模型
服务器/多客户端模型
在服务器开发中,处理多个客户端连接是常见需求。根据不同的应用场景和资源限制,有多种实现方式:
多客户端处理模型
- 多路IO多客户端模型:使用
select
或epoll
等IO多路复用技术,单线程处理多个客户端连接 - 并发多客户端模型:使用
fork
创建子进程或pthread
创建子线程处理每个客户端 - 循环服务器模型:简单的
while(1)
循环中依次处理accept()
、recv()
、send()
,但只能处理单个客户端
线程模型实现 (tcp_thread)
服务器端代码 (ser.c)
#include <netinet/in.h> // 提供Internet地址族相关定义,如sockaddr_in结构体
#include <netinet/ip.h> // 提供IP协议相关定义
#include <pthread.h> // 提供线程操作相关函数和数据类型
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数,如内存分配、程序退出等
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数和数据类型
#include <sys/types.h> // 提供基本系统数据类型
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数,如close、read、write等
#include <semaphore.h> // 提供信号量相关函数和数据类型// 定义一个信号量,用于同步主线程和子线程对连接套接字的处理
sem_t sem_conn;// 线程处理函数:用于处理客户端连接的子线程
void *th(void *arg)
{// 将传递过来的连接套接字描述符转换为int类型int conn = *(int *)arg;// 发送信号量,通知主线程可以继续接受新的连接sem_post(&sem_conn);// 分离当前线程,使其在结束时自动释放资源,无需主线程调用pthread_joinpthread_detach(pthread_self());time_t tm; // 用于存储时间的变量// 循环处理客户端请求while (1){char buf[1024] = {0}; // 缓冲区,用于接收和发送数据// 从客户端接收数据,存入buf缓冲区int ret = recv(conn, buf, sizeof(buf), 0);// 如果接收失败或客户端关闭连接(ret <= 0)if (ret <= 0){close(conn); // 关闭连接套接字break; // 退出循环,结束线程}// 获取当前系统时间time(&tm);// 将客户端发送的内容与当前时间拼接在一起sprintf(buf, "%s %s", buf, ctime(&tm));// 将拼接后的内容发送回客户端send(conn, buf, strlen(buf), 0);}return NULL;
}// 定义一个类型别名SA,代表struct sockaddr*,简化代码书写
typedef struct sockaddr *(SA);int main(int argc, char **argv)
{// 创建监听套接字// AF_INET:使用IPv4地址族// SOCK_STREAM:使用面向连接的TCP协议// 0:自动选择合适的协议(此处为TCP)int listfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == listfd) // 如果套接字创建失败{perror("scoket error\n"); // 打印错误信息return 1; // 异常退出程序}// 定义服务器和客户端的地址结构体struct sockaddr_in ser, cli;// 初始化地址结构体为0bzero(&ser, sizeof(ser));bzero(&cli, sizeof(cli));// 设置服务器地址信息ser.sin_family = AF_INET; // 使用IPv4地址族ser.sin_port = htons(50000); // 设置端口号为50000,htons用于主机字节序转网络字节序ser.sin_addr.s_addr = INADDR_ANY; // 绑定到所有可用的网络接口(所有IP地址)// 将监听套接字绑定到指定的地址和端口int ret = bind(listfd, (SA)&ser, sizeof(ser));if (-1 == ret) // 如果绑定失败{perror("bind"); // 打印错误信息return 1; // 异常退出程序}// 开始监听套接字,等待客户端连接// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)listen(listfd, 3);socklen_t len = sizeof(cli); // 用于存储客户端地址结构体的长度// 初始化信号量,第二个参数0表示线程间共享,第三个参数0表示初始值sem_init(&sem_conn, 0, 0);// 循环接受客户端连接while (1){// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信// listfd:监听套接字// (SA)&cli:用于存储客户端地址信息// &len:用于存储客户端地址结构体的长度int conn = accept(listfd, (SA)&cli, &len);if (-1 == conn) // 如果接受连接失败{perror("accept"); // 打印错误信息close(conn); // 关闭连接套接字continue; // 继续接受下一个连接}// 创建子线程,用于处理当前客户端的请求pthread_t tid;// 第一个参数:线程ID// 第二个参数:线程属性,NULL表示使用默认属性// 第三个参数:线程处理函数// 第四个参数:传递给线程处理函数的参数(连接套接字描述符)pthread_create(&tid, NULL, th, &conn);// 等待信号量,确保子线程已经获取了连接套接字描述符sem_wait(&sem_conn);}// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)close(listfd);// 销毁信号量sem_destroy(&sem_conn);return 0;
}
客户端代码 (cli.c)
#include <netinet/in.h> // 提供Internet地址族相关定义
#include <netinet/ip.h> // 提供IP协议相关定义
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数
#include <sys/types.h> // 提供基本系统数据类型
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数(如sleep)// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);int main(int argc, char **argv)
{// 创建TCP套接字// AF_INET: 使用IPv4地址族// SOCK_STREAM: 使用面向连接的TCP协议// 0: 自动选择合适的协议(此处为TCP)int conn = socket(AF_INET, SOCK_STREAM, 0);if (-1 == conn) // 检查套接字创建是否失败{perror("socket"); // 输出错误信息return 1; // 异常退出}// 定义服务器地址结构体struct sockaddr_in ser;// 初始化地址结构体为0bzero(&ser, sizeof(ser));// 设置服务器地址信息ser.sin_family = AF_INET; // 使用IPv4地址族ser.sin_port = htons(50000); // 服务器端口号(50000),转换为网络字节序ser.sin_addr.s_addr = INADDR_ANY; // 连接到本地所有可用接口(实际应指定服务器IP)// 连接到服务器int ret = connect(conn, (SA)&ser, sizeof(ser));if (-1 == ret) // 检查连接是否失败{perror("connect error\n"); // 输出错误信息return 1; // 异常退出}// 循环发送10次数据int i = 10;while (i){char buf[1024] = "hello,this is tcp test"; // 要发送的消息// 发送数据到服务器send(conn, buf, strlen(buf), 0);// 清空缓冲区,准备接收数据bzero(buf, sizeof(buf));// 接收服务器返回的数据recv(conn, buf, sizeof(buf), 0);// 打印服务器返回的内容printf("from ser:%s\n", buf);// 休眠1秒sleep(1);i--; // 减少循环计数}// 关闭连接套接字close(conn);return 0;
}
理想运行结果:
- 服务器启动后监听50000端口
- 客户端连接成功后,每秒发送一条"hello,this is tcp test"消息
- 服务器接收消息后添加当前时间戳返回给客户端
- 客户端输出类似:
from ser:hello,this is tcp test Thu Jun 20 14:30:45 2023
进程模型实现 (tcp_fork)
#include <netinet/in.h> // 提供Internet地址族相关定义
#include <netinet/ip.h> // 提供IP协议相关定义
#include <signal.h> // 提供信号处理相关函数
#include <stdio.h> // 提供输入输出函数
#include <stdlib.h> // 提供标准库函数
#include <string.h> // 提供字符串操作函数
#include <sys/socket.h> // 提供套接字相关函数
#include <sys/types.h> // 提供基本系统数据类型
#include <sys/wait.h> // 提供进程等待相关函数
#include <time.h> // 提供时间相关函数
#include <unistd.h> // 提供Unix标准函数// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);// 信号处理函数:用于处理子进程退出信号,回收僵尸进程
void myhandle(int num)
{// 等待子进程结束,回收其资源,防止僵尸进程wait(NULL);
}int main(int argc, char **argv)
{// 注册SIGCHLD信号的处理函数为myhandle// 当子进程退出时会产生SIGCHLD信号,触发该函数回收资源signal(SIGCHLD, myhandle);// 创建监听套接字// AF_INET: 使用IPv4地址族// SOCK_STREAM: 使用面向连接的TCP协议// 0: 自动选择合适的协议(此处为TCP)int listfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == listfd) // 检查套接字创建是否失败{perror("scoket error\n"); // 输出错误信息return 1; // 异常退出}// 定义服务器和客户端的地址结构体struct sockaddr_in ser, cli;// 初始化地址结构体为0bzero(&ser, sizeof(ser));bzero(&cli, sizeof(cli));// 设置服务器地址信息ser.sin_family = AF_INET; // 使用IPv4地址族ser.sin_port = htons(50000); // 服务器端口号(50000),转换为网络字节序ser.sin_addr.s_addr = INADDR_ANY; // 绑定到所有可用的网络接口(所有IP地址)// 将监听套接字绑定到指定的地址和端口int ret = bind(listfd, (SA)&ser, sizeof(ser));if (-1 == ret) // 检查绑定是否失败{perror("bind"); // 输出错误信息return 1; // 异常退出}// 开始监听套接字,等待客户端连接// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)listen(listfd, 3);socklen_t len = sizeof(cli); // 用于存储客户端地址结构体的长度time_t tm; // 用于存储时间的变量// 循环接受客户端连接while (1){// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信// listfd:监听套接字// (SA)&cli:用于存储客户端地址信息// &len:用于存储客户端地址结构体的长度int conn = accept(listfd, (SA)&cli, &len);if (-1 == conn) // 检查接受连接是否失败{perror("accept"); // 输出错误信息close(conn); // 关闭连接套接字continue; // 继续接受下一个连接}// 创建子进程,用于处理当前客户端的请求pid_t pid = fork();if (pid > 0) // 父进程分支{// 父进程不需要连接套接字,关闭它close(conn);// wait(); // 注释掉的等待方式,改用信号处理}else if (0 == pid) // 子进程分支{// 子进程不需要监听套接字,关闭它close(listfd);// 循环处理客户端请求while (1){char buf[1024] = {0}; // 缓冲区,用于接收和发送数据// 从客户端接收数据int ret = recv(conn, buf, sizeof(buf), 0);// 如果接收失败或客户端关闭连接(ret <= 0)if (ret <= 0){break; // 退出循环}// 获取当前系统时间time(&tm);// 将客户端发送的内容与当前时间拼接sprintf(buf, "%s %s", buf, ctime(&tm));// 将拼接后的内容发送回客户端send(conn, buf, strlen(buf), 0);}// 处理完毕,退出子进程exit(1);}else // fork失败分支{perror("fork"); // 输出错误信息continue; // 继续接受下一个连接}}// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)close(listfd);return 0;
}
理想运行结果:
- 服务器启动后监听50000端口
- 每个客户端连接都会创建一个子进程处理
- 信号处理函数自动回收僵尸进程
- 客户端连接后,服务器会回复带时间戳的消息
- 多个客户端可以同时连接并获得服务
MQTT协议详解
MQTT概述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,构建于TCP/IP协议之上。该协议由IBM在1999年发布,具有轻量、简单、开放和易于实现的特点,在物联网、小型设备、移动应用等领域应用广泛。
核心概念
-
发布/订阅模式:
- 客户端分为发布者和订阅者
- 发布者将消息发布到特定主题(Topic)
- 订阅者通过订阅感兴趣的主题来接收消息
- 这种模式实现了应用程序之间的解耦
- 多个订阅者可以同时接收来自同一主题的消息
-
服务质量(QoS):
- QoS 0:最多分发一次,消息可能丢失或重复,适用于对消息可靠性要求不高的场景,如环境传感器数据采集
- QoS 1:至少分发一次,确保消息到达,但可能出现重复,常用于设备控制指令传输
- QoS 2:仅分发一次,保证消息只到达一次,适用于对消息可靠性要求极高的场景,如金融交易数据传输
-
主题(Topic)与主题过滤器(Topic Filter):
- 主题是消息的分类标识,是UTF-8编码字符串
- 不能超过65535字节,层级数量无限制
- 区分大小写
- 主题过滤器用于订阅时筛选感兴趣的主题,可包含通配符
+
:单层通配符,只能用于单个主题层级匹配#
:多层通配符,用于匹配主题中任意层级,但必须位于主题过滤器的最后
-
会话(Session):
- 客户端与服务器建立连接后形成一个会话
- 会话存在于网络连接期间,也可能跨越多个连续的网络连接
- 会话用于存储客户端和服务器之间的交互状态,如客户端的订阅信息、未确认的消息等
-
遗嘱(Last Will and Testament):
- 客户端可设置遗嘱消息,当客户端异常断开连接时,服务器会发布该遗嘱消息
- 需在Connect时由客户端指定相关设置项
- 包括Will Flag(开启或关闭遗嘱功能)、Will QoS(遗嘱消息的服务质量等级)、Will Retain(遗嘱是否保留)、Will Topic(遗嘱话题)和Will Payload(遗嘱消息内容)
协议格式
MQTT数据包由三部分构成:
1. 固定头(Fixed Header)
- 存在于所有MQTT数据包中
- 包含数据包类型(如CONNECT、PUBLISH等)
- 包含标识位(如DUP、QoS、RETAIN)
- 剩余长度(表示可变头和消息体的总大小)
2. 可变头(Variable Header)
- 部分MQTT数据包包含可变头
- 内容因数据包类型而异
- 一些数据包(如PUBLISH (QoS > 0)、PUBACK等)的可变头中包含2字节的数据包标识字段
3. 消息体(Payload)
- 部分MQTT数据包包含消息体
- 不同类型的消息体内容不同
- CONNECT消息体:包含客户端的ClientID、订阅的Topic、Message以及用户名和密码
- SUBSCRIBE消息体:是一系列要订阅的主题以及QoS
- SUBACK消息体:是服务器对SUBSCRIBE申请的主题及QoS的确认和回复
- UNSUBSCRIBE消息体:是要取消订阅的主题
服务器与客户端工作流程
连接建立阶段
-
客户端发起连接:
- 客户端通过TCP/IP建立底层连接
- 发送
CONNECT
报文(首个报文) - 报文中含协议名、协议级别、连接标志、保持连接时长等
- 有效载荷包含客户端标识符(ClientId)、用户名/密码等
-
服务器确认连接:
- 服务器验证
CONNECT
报文合法性 - 发送
CONNACK
报文响应 - 报文中含"当前会话标志"(
Session Present
)和"连接返回码" - 连接返回码0表示连接成功,非0表示拒绝原因
- 服务器验证
消息交互阶段(发布-订阅核心流程)
-
客户端订阅主题:
- 客户端发送
SUBSCRIBE
报文 - 有效载荷含"主题过滤器+请求QoS"
- 服务器发送
SUBACK
报文确认,有效载荷含对应主题的"授权QoS"
- 客户端发送
-
客户端发布消息:
- 发布端客户端发送
PUBLISH
报文 - 可变报头含"主题名"和"报文标识符"(仅QoS>0时需含)
- 服务器接收后,根据主题名匹配所有订阅该主题的客户端
- 按订阅的授权QoS,向每个匹配客户端转发
PUBLISH
报文
- 发布端客户端发送
-
客户端取消订阅:
- 客户端发送
UNSUBSCRIBE
报文 - 有效载荷含待取消的主题过滤器
- 服务器发送
UNSUBACK
报文确认
- 客户端发送
连接维护与断开
-
心跳保活:
- 客户端需确保控制报文发送间隔不超过"保持连接时长"
- 无报文可发时需发送
PINGREQ
报文 - 服务器收到后必须回复
PINGRESP
报文 - 若1.5倍保持连接时长内无客户端报文,服务器需断开连接
-
正常断开:
- 客户端需发送
DISCONNECT
报文,之后关闭TCP连接 - 服务器收到
DISCONNECT
后,需丢弃该客户端的遗嘱消息(若存在)
- 客户端需发送
发布-订阅机制核心规则
-
主题与主题过滤器:
- 主题名是消息标识(如
/sensor/temp
,UTF-8编码,无通配符) - 主题过滤器是订阅时的匹配规则(支持
+
单层通配符,如/sensor/+
;#
多层通配符,如/sensor/#
) - 服务器按"逐层级匹配"规则,将
PUBLISH
报文转发给所有主题过滤器匹配的订阅客户端
- 主题名是消息标识(如
-
保留消息(Retain):
- 客户端发布
PUBLISH
时若设RETAIN=1
,服务器需存储该消息 - 新订阅匹配主题的客户端会立即收到该保留消息
- 若发布
RETAIN=1
且有效载荷为空的消息,服务器会删除对应主题的保留消息
- 客户端发布
-
遗嘱消息(Will):
- 客户端
CONNECT
时设Will Flag=1
,需指定"遗嘱主题"、“遗嘱消息”、“遗嘱QoS”、“遗嘱Retain” - 若客户端异常断开(如网络故障、超时),服务器会自动将遗嘱消息发布到遗嘱主题
- 若客户端正常发送
DISCONNECT
,服务器需丢弃遗嘱消息
- 客户端
QoS(服务质量)等级与流程
-
QoS 0(最多一次):
- 流程:发布端发送
PUBLISH
(QoS=0,无报文标识符,DUP=0
),接收端无需回复 - 消息可能丢失(如网络中断),不重发
- 适用场景:环境传感器数据(如实时温度,丢失一次可容忍)
- 流程:发布端发送
-
QoS 1(至少一次):
- 流程:
- 发布端发送
PUBLISH
(QoS=1,含报文标识符,DUP=0
),并存储消息 - 接收端接收后,发送
PUBACK
报文(含相同标识符),并将消息交给应用 - 发布端收到
PUBACK
后,删除存储的消息 - 若未收到
PUBACK
,发布端需重发PUBLISH
(DUP=1
),接收端可能收到重复消息
- 发布端发送
- 适用场景:设备控制指令(如开灯,需确保到达,重复可通过应用层去重)
- 流程:
-
QoS 2(仅一次):
- 流程(四次握手):
- 发布端发送
PUBLISH
(QoS=2,含标识符,DUP=0
),存储消息 - 接收端接收后,发送
PUBREC
报文(含相同标识符),存储标识符,不交给应用 - 发布端收到
PUBREC
后,删除PUBLISH
消息,发送PUBREL
报文(含相同标识符) - 接收端收到
PUBREL
后,将消息交给应用,发送PUBCOMP
报文(含相同标识符) - 发布端收到
PUBCOMP
后,删除PUBREL
相关状态 - 各环节未收到确认均需重发,确保消息仅到达一次
- 发布端发送
- 适用场景:计费数据、交易指令(不允许丢失或重复)
- 流程(四次握手):
应用场景
-
物联网场景:
- 智能家居(智能家电、传感器通信)
- 工业物联网(工厂设备数据交互)
- 农业监测(土壤湿度等传感器数据传输)
- 设备通过MQTT实现数据上报与远程控制
-
传感器数据传输:
- 环境监测(气象站、水质传感器)
- 农业监测等场景
- 传感器借MQTT将采集数据发送至服务器,供相关人员订阅获取实时数据
-
设备监控管理:
- 服务器集群(CPU、内存等状态监控)
- 移动设备(基站、智能电表状态上报)
- 运维/维护人员订阅主题掌握设备状态
-
消息推送:
- 轻量级即时通讯软件
- 移动应用(新闻、社交类)用MQTT推送消息
- 用户订阅主题接收实时信息
Wireshark抓包分析
环境准备
-
安装Wireshark:
- 从Wireshark官网(https://www.wireshark.org/download.html)下载并安装
- 安装过程中选择合适的网络适配器相关选项
-
配置MQTT相关参数:
- 打开Wireshark,进入"编辑"菜单选择"首选项"
- 在"协议"中找到"mqtt",设置对应参数(一般选择3.1.1版本和1883端口)
抓包操作
-
选择网络接口:
- 选择合适的网络接口(本地运行选回环地址,局域网环境选对应接口)
-
设置捕获过滤器(可选):
- 如只想捕获MQTT数据包,可设置"tcp.port == 1883"
-
开始捕获:
- 点击"开始捕获"按钮或选择"捕获">“开始捕获”
抓包后的操作
-
停止捕获:
- 点击"停止捕获"按钮
-
保存与导出数据包:
- 选择"文件">“保存”,通常保存为.pcapng格式
-
利用显示过滤器筛选分析:
- 过滤所有MQTT连接:输入"mqtt"
- 过滤特定QoS等级的消息:“mqtt.qos == 1”
- 过滤特定主题的消息:“mqtt.topic == ‘test/topic’”
MQTT实战应用
库的移植
MQTT库移植步骤:
-
OpenSSL编译安装:
tar -xvf openssl-1.0.0s.tar.gz cd openssl-1.0.0s ./config enable-shared -fPIC # 必须加入-fPIC选项 make sudo make install
-
Paho MQTT C库编译:
unzip paho.mqtt.c-master.zip cd paho.mqtt.c-master
- 修改Makefile:
- 122行:
CC ?= gcc
- 133行:添加
CFLAGS += -I /usr/local/ssl/include LDFLAGS += -L /usr/local/ssl/lib
- 192行:确保路径正确
CCFLAGS_SO += -Wno-deprecated-declarations -DOSX -I /usr/local/ssl/include LDFLAGS_CS += -Wl,-install_name,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -L /usr/local/ssl/lib
- 122行:
- 编译安装:
make sudo make install
- 修改Makefile:
云平台设备配置
OneNET云平台配置
-
注册账号并登录控制台
-
产品开发 -> 产品品类 -> 设备接入 -> MQTT协议
-
创建产品:
- 产品ID:Qon3io17BJ
- 设备名称:test1
- 设备密钥:c2Q2OVJKcW5KNDBQdmFLcm1OZEFmZU56cUJhSkhjd2o=
-
生成连接参数:
products/{产品id}/devices/{设备名字}
- 示例:
products/Qon3io17BJ/devices/test1
- 签名参数:
version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D
-
添加物模型:
- 产品开发 -> 详情 -> 设置物模型
- 添加自定义功能点(如温度)
-
设备管理:
- 设备管理 -> 详情 -> 属性 -> 实时刷新
MQTT Demo程序详解
头文件 (head.h)
#ifndef HEAD_H
#define HEAD_H#include <MQTTAsync.h>
#include <MQTTClient.h>#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>// OneNET MQTT服务器地址
#define NEW_ADDRESS "tcp://183.230.40.96:1883"
// 设备名称
#define DEV_NAME "test1"
// 客户端ID(与设备名称相同)
#define CLIENTID DEV_NAME
// 产品ID
#define PRODUCT_ID "Qon3io17BJ"
// 连接密码(包含签名信息)
#define PASSWD "version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D"
// 服务质量等级
#define QOS 0
// 等待消息完成的超时时间(毫秒)
#define TIMEOUT 10000L#endif // HEAD_H
主程序 (main.c)
//https://eclipse.dev/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#a9a0518d9ca924d12c1329dbe3de5f2b6
#include <stdio.h>
#include "head.h"// 存储订阅和发布的主题
static char topic[2][200] = {0};
// MQTT客户端实例
static MQTTClient client;
// 消息ID计数器
static int id = 10000;
// 用于存储已确认送达的消息令牌
volatile static MQTTClient_deliveryToken deliveredtoken;// 构建主题名称
void pack_topic(char * dev_name, char * pro_id)
{// 订阅主题格式:$sys/{产品ID}/{设备名称}/thing/property/post/replysprintf(topic[0], "$sys/%s/%s/thing/property/post/reply", pro_id, dev_name);// 发布主题格式:$sys/{产品ID}/{设备名称}/thing/property/postsprintf(topic[1], "$sys/%s/%s/thing/property/post", pro_id, dev_name);
}// 发送成功后的回调函数
void delivered(void *context, MQTTClient_deliveryToken dt)
{printf("Message with token value %d delivery confirmed\n", dt);deliveredtoken = dt;
}// 接收到消息的回调函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{int i;char* payloadptr;printf("Message arrived\n");printf(" topic: %s\n", topicName);printf(" message: ");// 打印消息内容payloadptr = (char*)message->payload;for(i=0; i<message->payloadlen; i++){putchar(*payloadptr++);}putchar('\n');// 释放消息和主题内存MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}// 掉线后的回调函数
void connlost(void *context, char *cause)
{printf("\nConnection lost\n");printf(" cause: %s\n", cause);
}// MQTT客户端初始化
int mqtt_init()
{// 构建主题pack_topic(DEV_NAME, PRODUCT_ID);// 创建MQTT客户端int rc = MQTTClient_create(&client, NEW_ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);if(MQTTCLIENT_SUCCESS != rc){printf("create mqtt client failure...\n");exit(1);}// 初始化连接选项MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;conn_opts.keepAliveInterval = 20; // 保持连接间隔(秒)conn_opts.cleansession = 1; // 清理会话conn_opts.username = PRODUCT_ID; // 用户名(产品ID)conn_opts.password = PASSWD; // 密码(包含签名信息)// 设置回调函数rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);if(MQTTCLIENT_SUCCESS != rc){printf("Failed to set callbacks, return code %d\n", rc);exit(EXIT_FAILURE);}// 连接到MQTT服务器if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(EXIT_FAILURE);}// 订阅主题(当前被注释,可根据需要启用)
#if 0MQTTClient_subscribe(client, topic[0], QOS);printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", topic[0], CLIENTID, QOS);
#endif return rc;
}// 发送MQTT消息
int mqtt_send(char * key, int value)
{MQTTClient_deliveryToken deliveryToken;MQTTClient_message test2_pubmsg = MQTTClient_message_initializer;// 需要发送的正文char message[1024] = {0};// 设置消息属性test2_pubmsg.qos = QOS;test2_pubmsg.retained = 0;test2_pubmsg.payload = message;// 构建JSON格式的消息sprintf(message,"{\"id\":\"%d\",\"version\":\"1.0\",\"params\":{\"%s\":{\"value\":%d}}}",id++, key, value);test2_pubmsg.payloadlen = strlen(message);printf("%s\n",message);// 发布消息int rc = MQTTClient_publishMessage(client, topic[1], &test2_pubmsg, &deliveryToken);if(MQTTCLIENT_SUCCESS != rc){printf("client to publish failure.. %lu\n", pthread_self());exit(1);}// 等待消息确认printf("Waiting for up to %d seconds for publication on topic %s for client with ClientID: %s\n",(int)(TIMEOUT/1000), topic[0], CLIENTID);MQTTClient_waitForCompletion(client, deliveryToken, TIMEOUT);sleep(1);return rc;
}// 释放MQTT资源
void mqtt_deinit()
{// 断开连接MQTTClient_disconnect(client, 10000);// 销毁客户端MQTTClient_destroy(&client);
}// 主函数
int main(void)
{// 初始化MQTT客户端mqtt_init();// 持续发送随机温度数据while(1){int value = rand() % 100 + 1;mqtt_send("tmp", value);}// 释放资源(实际不会执行到此处)mqtt_deinit();return 0;
}
理想运行结果:
{"id":"10000","version":"1.0","params":{"tmp":{"value":42}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrivedtopic: $sys/Qon3io17BJ/test1/thing/property/post/replymessage: {"id":"10000","code":200,"msg":"success"}{"id":"10001","version":"1.0","params":{"tmp":{"value":75}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrivedtopic: $sys/Qon3io17BJ/test1/thing/property/post/replymessage: {"id":"10001","code":200,"msg":"success"}...
程序将不断生成随机温度值(1-100之间),以JSON格式发送到OneNET平台,平台会返回确认消息。在OneNET控制台的设备属性中,可以看到实时更新的温度数据。
wireshark抓包
MQTTtest1
Qon3io17BJyversion=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D