当前位置: 首页 > ai >正文

Day37 MQTT协议 多客户端服务器模型

day37 MQTT协议 多客户端服务器模型

服务器/多客户端模型

在服务器开发中,处理多个客户端连接是常见需求。根据不同的应用场景和资源限制,有多种实现方式:

多客户端处理模型

  1. 多路IO多客户端模型:使用selectepoll等IO多路复用技术,单线程处理多个客户端连接
  2. 并发多客户端模型:使用fork创建子进程或pthread创建子线程处理每个客户端
  3. 循环服务器模型:简单的while(1)循环中依次处理accept()recv()send(),但只能处理单个客户端

线程模型实现 (tcp_thread)

服务器端代码 (ser.c)
#include <netinet/in.h>   // 提供Internet地址族相关定义,如sockaddr_in结构体
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <pthread.h>      // 提供线程操作相关函数和数据类型
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数,如内存分配、程序退出等
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数和数据类型
#include <sys/types.h>    // 提供基本系统数据类型
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数,如close、read、write等
#include <semaphore.h>    // 提供信号量相关函数和数据类型// 定义一个信号量,用于同步主线程和子线程对连接套接字的处理
sem_t sem_conn;// 线程处理函数:用于处理客户端连接的子线程
void *th(void *arg)
{// 将传递过来的连接套接字描述符转换为int类型int conn = *(int *)arg;// 发送信号量,通知主线程可以继续接受新的连接sem_post(&sem_conn);// 分离当前线程,使其在结束时自动释放资源,无需主线程调用pthread_joinpthread_detach(pthread_self());time_t tm;  // 用于存储时间的变量// 循环处理客户端请求while (1){char buf[1024] = {0};  // 缓冲区,用于接收和发送数据// 从客户端接收数据,存入buf缓冲区int ret = recv(conn, buf, sizeof(buf), 0);// 如果接收失败或客户端关闭连接(ret <= 0)if (ret <= 0){close(conn);  // 关闭连接套接字break;        // 退出循环,结束线程}// 获取当前系统时间time(&tm);// 将客户端发送的内容与当前时间拼接在一起sprintf(buf, "%s %s", buf, ctime(&tm));// 将拼接后的内容发送回客户端send(conn, buf, strlen(buf), 0);}return NULL;
}// 定义一个类型别名SA,代表struct sockaddr*,简化代码书写
typedef struct sockaddr *(SA);int main(int argc, char **argv)
{// 创建监听套接字// AF_INET:使用IPv4地址族// SOCK_STREAM:使用面向连接的TCP协议// 0:自动选择合适的协议(此处为TCP)int listfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == listfd)  // 如果套接字创建失败{perror("scoket error\n");  // 打印错误信息return 1;                  // 异常退出程序}// 定义服务器和客户端的地址结构体struct sockaddr_in ser, cli;// 初始化地址结构体为0bzero(&ser, sizeof(ser));bzero(&cli, sizeof(cli));// 设置服务器地址信息ser.sin_family = AF_INET;                // 使用IPv4地址族ser.sin_port = htons(50000);             // 设置端口号为50000,htons用于主机字节序转网络字节序ser.sin_addr.s_addr = INADDR_ANY;        // 绑定到所有可用的网络接口(所有IP地址)// 将监听套接字绑定到指定的地址和端口int ret = bind(listfd, (SA)&ser, sizeof(ser));if (-1 == ret)  // 如果绑定失败{perror("bind");  // 打印错误信息return 1;        // 异常退出程序}// 开始监听套接字,等待客户端连接// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)listen(listfd, 3);socklen_t len = sizeof(cli);  // 用于存储客户端地址结构体的长度// 初始化信号量,第二个参数0表示线程间共享,第三个参数0表示初始值sem_init(&sem_conn, 0, 0);// 循环接受客户端连接while (1){// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信// listfd:监听套接字// (SA)&cli:用于存储客户端地址信息// &len:用于存储客户端地址结构体的长度int conn = accept(listfd, (SA)&cli, &len);if (-1 == conn)  // 如果接受连接失败{perror("accept");  // 打印错误信息close(conn);       // 关闭连接套接字continue;          // 继续接受下一个连接}// 创建子线程,用于处理当前客户端的请求pthread_t tid;// 第一个参数:线程ID// 第二个参数:线程属性,NULL表示使用默认属性// 第三个参数:线程处理函数// 第四个参数:传递给线程处理函数的参数(连接套接字描述符)pthread_create(&tid, NULL, th, &conn);// 等待信号量,确保子线程已经获取了连接套接字描述符sem_wait(&sem_conn);}// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)close(listfd);// 销毁信号量sem_destroy(&sem_conn);return 0;
}
客户端代码 (cli.c)
#include <netinet/in.h>   // 提供Internet地址族相关定义
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数
#include <sys/types.h>    // 提供基本系统数据类型
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数(如sleep)// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);int main(int argc, char **argv)
{// 创建TCP套接字// AF_INET: 使用IPv4地址族// SOCK_STREAM: 使用面向连接的TCP协议// 0: 自动选择合适的协议(此处为TCP)int conn = socket(AF_INET, SOCK_STREAM, 0);if (-1 == conn)  // 检查套接字创建是否失败{perror("socket");  // 输出错误信息return 1;          // 异常退出}// 定义服务器地址结构体struct sockaddr_in ser;// 初始化地址结构体为0bzero(&ser, sizeof(ser));// 设置服务器地址信息ser.sin_family = AF_INET;                // 使用IPv4地址族ser.sin_port = htons(50000);             // 服务器端口号(50000),转换为网络字节序ser.sin_addr.s_addr = INADDR_ANY;        // 连接到本地所有可用接口(实际应指定服务器IP)// 连接到服务器int ret = connect(conn, (SA)&ser, sizeof(ser));if (-1 == ret)  // 检查连接是否失败{perror("connect error\n");  // 输出错误信息return 1;                   // 异常退出}// 循环发送10次数据int i = 10;while (i){char buf[1024] = "hello,this is tcp test";  // 要发送的消息// 发送数据到服务器send(conn, buf, strlen(buf), 0);// 清空缓冲区,准备接收数据bzero(buf, sizeof(buf));// 接收服务器返回的数据recv(conn, buf, sizeof(buf), 0);// 打印服务器返回的内容printf("from ser:%s\n", buf);// 休眠1秒sleep(1);i--;  // 减少循环计数}// 关闭连接套接字close(conn);return 0;
}

理想运行结果

  • 服务器启动后监听50000端口
  • 客户端连接成功后,每秒发送一条"hello,this is tcp test"消息
  • 服务器接收消息后添加当前时间戳返回给客户端
  • 客户端输出类似:from ser:hello,this is tcp test Thu Jun 20 14:30:45 2023

进程模型实现 (tcp_fork)

#include <netinet/in.h>   // 提供Internet地址族相关定义
#include <netinet/ip.h>   // 提供IP协议相关定义
#include <signal.h>       // 提供信号处理相关函数
#include <stdio.h>        // 提供输入输出函数
#include <stdlib.h>       // 提供标准库函数
#include <string.h>       // 提供字符串操作函数
#include <sys/socket.h>   // 提供套接字相关函数
#include <sys/types.h>    // 提供基本系统数据类型
#include <sys/wait.h>     // 提供进程等待相关函数
#include <time.h>         // 提供时间相关函数
#include <unistd.h>       // 提供Unix标准函数// 定义结构体指针别名,简化代码书写
typedef struct sockaddr *(SA);// 信号处理函数:用于处理子进程退出信号,回收僵尸进程
void myhandle(int num)
{// 等待子进程结束,回收其资源,防止僵尸进程wait(NULL);
}int main(int argc, char **argv)
{// 注册SIGCHLD信号的处理函数为myhandle// 当子进程退出时会产生SIGCHLD信号,触发该函数回收资源signal(SIGCHLD, myhandle);// 创建监听套接字// AF_INET: 使用IPv4地址族// SOCK_STREAM: 使用面向连接的TCP协议// 0: 自动选择合适的协议(此处为TCP)int listfd = socket(AF_INET, SOCK_STREAM, 0);if (-1 == listfd)  // 检查套接字创建是否失败{perror("scoket error\n");  // 输出错误信息return 1;                  // 异常退出}// 定义服务器和客户端的地址结构体struct sockaddr_in ser, cli;// 初始化地址结构体为0bzero(&ser, sizeof(ser));bzero(&cli, sizeof(cli));// 设置服务器地址信息ser.sin_family = AF_INET;                // 使用IPv4地址族ser.sin_port = htons(50000);             // 服务器端口号(50000),转换为网络字节序ser.sin_addr.s_addr = INADDR_ANY;        // 绑定到所有可用的网络接口(所有IP地址)// 将监听套接字绑定到指定的地址和端口int ret = bind(listfd, (SA)&ser, sizeof(ser));if (-1 == ret)  // 检查绑定是否失败{perror("bind");  // 输出错误信息return 1;        // 异常退出}// 开始监听套接字,等待客户端连接// 第二个参数3表示等待连接队列的最大长度(三次握手未完成的连接)listen(listfd, 3);socklen_t len = sizeof(cli);  // 用于存储客户端地址结构体的长度time_t tm;  // 用于存储时间的变量// 循环接受客户端连接while (1){// 接受客户端连接,返回一个新的连接套接字用于与该客户端通信// listfd:监听套接字// (SA)&cli:用于存储客户端地址信息// &len:用于存储客户端地址结构体的长度int conn = accept(listfd, (SA)&cli, &len);if (-1 == conn)  // 检查接受连接是否失败{perror("accept");  // 输出错误信息close(conn);       // 关闭连接套接字continue;          // 继续接受下一个连接}// 创建子进程,用于处理当前客户端的请求pid_t pid = fork();if (pid > 0)  // 父进程分支{// 父进程不需要连接套接字,关闭它close(conn);// wait();  // 注释掉的等待方式,改用信号处理}else if (0 == pid)  // 子进程分支{// 子进程不需要监听套接字,关闭它close(listfd);// 循环处理客户端请求while (1){char buf[1024] = {0};  // 缓冲区,用于接收和发送数据// 从客户端接收数据int ret = recv(conn, buf, sizeof(buf), 0);// 如果接收失败或客户端关闭连接(ret <= 0)if (ret <= 0){break;  // 退出循环}// 获取当前系统时间time(&tm);// 将客户端发送的内容与当前时间拼接sprintf(buf, "%s %s", buf, ctime(&tm));// 将拼接后的内容发送回客户端send(conn, buf, strlen(buf), 0);}// 处理完毕,退出子进程exit(1);}else  // fork失败分支{perror("fork");  // 输出错误信息continue;        // 继续接受下一个连接}}// 关闭监听套接字(实际中由于上面是无限循环,这里的代码不会执行)close(listfd);return 0;
}

理想运行结果

  • 服务器启动后监听50000端口
  • 每个客户端连接都会创建一个子进程处理
  • 信号处理函数自动回收僵尸进程
  • 客户端连接后,服务器会回复带时间戳的消息
  • 多个客户端可以同时连接并获得服务

MQTT协议详解

MQTT概述

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,构建于TCP/IP协议之上。该协议由IBM在1999年发布,具有轻量、简单、开放和易于实现的特点,在物联网、小型设备、移动应用等领域应用广泛。

核心概念

  1. 发布/订阅模式

    • 客户端分为发布者和订阅者
    • 发布者将消息发布到特定主题(Topic)
    • 订阅者通过订阅感兴趣的主题来接收消息
    • 这种模式实现了应用程序之间的解耦
    • 多个订阅者可以同时接收来自同一主题的消息
  2. 服务质量(QoS)

    • QoS 0:最多分发一次,消息可能丢失或重复,适用于对消息可靠性要求不高的场景,如环境传感器数据采集
    • QoS 1:至少分发一次,确保消息到达,但可能出现重复,常用于设备控制指令传输
    • QoS 2:仅分发一次,保证消息只到达一次,适用于对消息可靠性要求极高的场景,如金融交易数据传输
  3. 主题(Topic)与主题过滤器(Topic Filter)

    • 主题是消息的分类标识,是UTF-8编码字符串
    • 不能超过65535字节,层级数量无限制
    • 区分大小写
    • 主题过滤器用于订阅时筛选感兴趣的主题,可包含通配符
      • +:单层通配符,只能用于单个主题层级匹配
      • #:多层通配符,用于匹配主题中任意层级,但必须位于主题过滤器的最后
  4. 会话(Session)

    • 客户端与服务器建立连接后形成一个会话
    • 会话存在于网络连接期间,也可能跨越多个连续的网络连接
    • 会话用于存储客户端和服务器之间的交互状态,如客户端的订阅信息、未确认的消息等
  5. 遗嘱(Last Will and Testament)

    • 客户端可设置遗嘱消息,当客户端异常断开连接时,服务器会发布该遗嘱消息
    • 需在Connect时由客户端指定相关设置项
    • 包括Will Flag(开启或关闭遗嘱功能)、Will QoS(遗嘱消息的服务质量等级)、Will Retain(遗嘱是否保留)、Will Topic(遗嘱话题)和Will Payload(遗嘱消息内容)

协议格式

MQTT数据包由三部分构成:

1. 固定头(Fixed Header)
  • 存在于所有MQTT数据包中
  • 包含数据包类型(如CONNECT、PUBLISH等)
  • 包含标识位(如DUP、QoS、RETAIN)
  • 剩余长度(表示可变头和消息体的总大小)
2. 可变头(Variable Header)
  • 部分MQTT数据包包含可变头
  • 内容因数据包类型而异
  • 一些数据包(如PUBLISH (QoS > 0)、PUBACK等)的可变头中包含2字节的数据包标识字段
3. 消息体(Payload)
  • 部分MQTT数据包包含消息体
  • 不同类型的消息体内容不同
    • CONNECT消息体:包含客户端的ClientID、订阅的Topic、Message以及用户名和密码
    • SUBSCRIBE消息体:是一系列要订阅的主题以及QoS
    • SUBACK消息体:是服务器对SUBSCRIBE申请的主题及QoS的确认和回复
    • UNSUBSCRIBE消息体:是要取消订阅的主题

服务器与客户端工作流程

连接建立阶段
  1. 客户端发起连接

    • 客户端通过TCP/IP建立底层连接
    • 发送CONNECT报文(首个报文)
    • 报文中含协议名、协议级别、连接标志、保持连接时长等
    • 有效载荷包含客户端标识符(ClientId)、用户名/密码等
  2. 服务器确认连接

    • 服务器验证CONNECT报文合法性
    • 发送CONNACK报文响应
    • 报文中含"当前会话标志"(Session Present)和"连接返回码"
    • 连接返回码0表示连接成功,非0表示拒绝原因
消息交互阶段(发布-订阅核心流程)
  1. 客户端订阅主题

    • 客户端发送SUBSCRIBE报文
    • 有效载荷含"主题过滤器+请求QoS"
    • 服务器发送SUBACK报文确认,有效载荷含对应主题的"授权QoS"
  2. 客户端发布消息

    • 发布端客户端发送PUBLISH报文
    • 可变报头含"主题名"和"报文标识符"(仅QoS>0时需含)
    • 服务器接收后,根据主题名匹配所有订阅该主题的客户端
    • 按订阅的授权QoS,向每个匹配客户端转发PUBLISH报文
  3. 客户端取消订阅

    • 客户端发送UNSUBSCRIBE报文
    • 有效载荷含待取消的主题过滤器
    • 服务器发送UNSUBACK报文确认
连接维护与断开
  1. 心跳保活

    • 客户端需确保控制报文发送间隔不超过"保持连接时长"
    • 无报文可发时需发送PINGREQ报文
    • 服务器收到后必须回复PINGRESP报文
    • 若1.5倍保持连接时长内无客户端报文,服务器需断开连接
  2. 正常断开

    • 客户端需发送DISCONNECT报文,之后关闭TCP连接
    • 服务器收到DISCONNECT后,需丢弃该客户端的遗嘱消息(若存在)

发布-订阅机制核心规则

  1. 主题与主题过滤器

    • 主题名是消息标识(如/sensor/temp,UTF-8编码,无通配符)
    • 主题过滤器是订阅时的匹配规则(支持+单层通配符,如/sensor/+#多层通配符,如/sensor/#
    • 服务器按"逐层级匹配"规则,将PUBLISH报文转发给所有主题过滤器匹配的订阅客户端
  2. 保留消息(Retain)

    • 客户端发布PUBLISH时若设RETAIN=1,服务器需存储该消息
    • 新订阅匹配主题的客户端会立即收到该保留消息
    • 若发布RETAIN=1且有效载荷为空的消息,服务器会删除对应主题的保留消息
  3. 遗嘱消息(Will)

    • 客户端CONNECT时设Will Flag=1,需指定"遗嘱主题"、“遗嘱消息”、“遗嘱QoS”、“遗嘱Retain”
    • 若客户端异常断开(如网络故障、超时),服务器会自动将遗嘱消息发布到遗嘱主题
    • 若客户端正常发送DISCONNECT,服务器需丢弃遗嘱消息

QoS(服务质量)等级与流程

  1. QoS 0(最多一次)

    • 流程:发布端发送PUBLISH(QoS=0,无报文标识符,DUP=0),接收端无需回复
    • 消息可能丢失(如网络中断),不重发
    • 适用场景:环境传感器数据(如实时温度,丢失一次可容忍)
  2. QoS 1(至少一次)

    • 流程:
      1. 发布端发送PUBLISH(QoS=1,含报文标识符,DUP=0),并存储消息
      2. 接收端接收后,发送PUBACK报文(含相同标识符),并将消息交给应用
      3. 发布端收到PUBACK后,删除存储的消息
      4. 若未收到PUBACK,发布端需重发PUBLISHDUP=1),接收端可能收到重复消息
    • 适用场景:设备控制指令(如开灯,需确保到达,重复可通过应用层去重)
  3. QoS 2(仅一次)

    • 流程(四次握手):
      1. 发布端发送PUBLISH(QoS=2,含标识符,DUP=0),存储消息
      2. 接收端接收后,发送PUBREC报文(含相同标识符),存储标识符,不交给应用
      3. 发布端收到PUBREC后,删除PUBLISH消息,发送PUBREL报文(含相同标识符)
      4. 接收端收到PUBREL后,将消息交给应用,发送PUBCOMP报文(含相同标识符)
      5. 发布端收到PUBCOMP后,删除PUBREL相关状态
      6. 各环节未收到确认均需重发,确保消息仅到达一次
    • 适用场景:计费数据、交易指令(不允许丢失或重复)

应用场景

  1. 物联网场景

    • 智能家居(智能家电、传感器通信)
    • 工业物联网(工厂设备数据交互)
    • 农业监测(土壤湿度等传感器数据传输)
    • 设备通过MQTT实现数据上报与远程控制
  2. 传感器数据传输

    • 环境监测(气象站、水质传感器)
    • 农业监测等场景
    • 传感器借MQTT将采集数据发送至服务器,供相关人员订阅获取实时数据
  3. 设备监控管理

    • 服务器集群(CPU、内存等状态监控)
    • 移动设备(基站、智能电表状态上报)
    • 运维/维护人员订阅主题掌握设备状态
  4. 消息推送

    • 轻量级即时通讯软件
    • 移动应用(新闻、社交类)用MQTT推送消息
    • 用户订阅主题接收实时信息

Wireshark抓包分析

环境准备
  1. 安装Wireshark

    • 从Wireshark官网(https://www.wireshark.org/download.html)下载并安装
    • 安装过程中选择合适的网络适配器相关选项
  2. 配置MQTT相关参数

    • 打开Wireshark,进入"编辑"菜单选择"首选项"
    • 在"协议"中找到"mqtt",设置对应参数(一般选择3.1.1版本和1883端口)
抓包操作
  1. 选择网络接口

    • 选择合适的网络接口(本地运行选回环地址,局域网环境选对应接口)
  2. 设置捕获过滤器(可选)

    • 如只想捕获MQTT数据包,可设置"tcp.port == 1883"
  3. 开始捕获

    • 点击"开始捕获"按钮或选择"捕获">“开始捕获”
抓包后的操作
  1. 停止捕获

    • 点击"停止捕获"按钮
  2. 保存与导出数据包

    • 选择"文件">“保存”,通常保存为.pcapng格式
  3. 利用显示过滤器筛选分析

    • 过滤所有MQTT连接:输入"mqtt"
    • 过滤特定QoS等级的消息:“mqtt.qos == 1”
    • 过滤特定主题的消息:“mqtt.topic == ‘test/topic’”

MQTT实战应用

库的移植

MQTT库移植步骤:

  1. OpenSSL编译安装

    tar -xvf openssl-1.0.0s.tar.gz
    cd openssl-1.0.0s
    ./config enable-shared -fPIC  # 必须加入-fPIC选项
    make
    sudo make install
    
  2. Paho MQTT C库编译

    unzip paho.mqtt.c-master.zip
    cd paho.mqtt.c-master
    
    • 修改Makefile:
      • 122行:CC ?= gcc
      • 133行:添加
        CFLAGS += -I /usr/local/ssl/include
        LDFLAGS += -L /usr/local/ssl/lib
        
      • 192行:确保路径正确
        CCFLAGS_SO += -Wno-deprecated-declarations -DOSX -I /usr/local/ssl/include
        LDFLAGS_CS += -Wl,-install_name,lib$(MQTTLIB_CS).so.${MAJOR_VERSION} -L /usr/local/ssl/lib
        
    • 编译安装:
      make
      sudo make install
      

云平台设备配置

OneNET云平台配置
  1. 注册账号并登录控制台

  2. 产品开发 -> 产品品类 -> 设备接入 -> MQTT协议

  3. 创建产品

    • 产品ID:Qon3io17BJ
    • 设备名称:test1
    • 设备密钥:c2Q2OVJKcW5KNDBQdmFLcm1OZEFmZU56cUJhSkhjd2o=
  4. 生成连接参数

    • products/{产品id}/devices/{设备名字}
    • 示例:products/Qon3io17BJ/devices/test1
    • 签名参数:version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D
  5. 添加物模型

    • 产品开发 -> 详情 -> 设置物模型
    • 添加自定义功能点(如温度)
  6. 设备管理

    • 设备管理 -> 详情 -> 属性 -> 实时刷新

MQTT Demo程序详解

头文件 (head.h)
#ifndef HEAD_H
#define HEAD_H#include <MQTTAsync.h>
#include <MQTTClient.h>#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <string.h>// OneNET MQTT服务器地址
#define NEW_ADDRESS     "tcp://183.230.40.96:1883"
// 设备名称
#define DEV_NAME        "test1"
// 客户端ID(与设备名称相同)
#define CLIENTID        DEV_NAME
// 产品ID
#define PRODUCT_ID      "Qon3io17BJ"
// 连接密码(包含签名信息)
#define PASSWD          "version=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D"
// 服务质量等级
#define QOS             0
// 等待消息完成的超时时间(毫秒)
#define TIMEOUT         10000L#endif // HEAD_H
主程序 (main.c)
//https://eclipse.dev/paho/files/mqttdoc/MQTTClient/html/_m_q_t_t_client_8h.html#a9a0518d9ca924d12c1329dbe3de5f2b6
#include <stdio.h>
#include "head.h"// 存储订阅和发布的主题
static char topic[2][200] = {0};
// MQTT客户端实例
static MQTTClient client;
// 消息ID计数器
static int id = 10000;
// 用于存储已确认送达的消息令牌
volatile static MQTTClient_deliveryToken deliveredtoken;// 构建主题名称
void pack_topic(char * dev_name, char * pro_id)
{// 订阅主题格式:$sys/{产品ID}/{设备名称}/thing/property/post/replysprintf(topic[0], "$sys/%s/%s/thing/property/post/reply", pro_id, dev_name);// 发布主题格式:$sys/{产品ID}/{设备名称}/thing/property/postsprintf(topic[1], "$sys/%s/%s/thing/property/post", pro_id, dev_name);
}// 发送成功后的回调函数
void delivered(void *context, MQTTClient_deliveryToken dt)
{printf("Message with token value %d delivery confirmed\n", dt);deliveredtoken = dt;
}// 接收到消息的回调函数
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{int i;char* payloadptr;printf("Message arrived\n");printf("     topic: %s\n", topicName);printf("   message: ");// 打印消息内容payloadptr = (char*)message->payload;for(i=0; i<message->payloadlen; i++){putchar(*payloadptr++);}putchar('\n');// 释放消息和主题内存MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}// 掉线后的回调函数
void connlost(void *context, char *cause)
{printf("\nConnection lost\n");printf("     cause: %s\n", cause);
}// MQTT客户端初始化
int mqtt_init()
{// 构建主题pack_topic(DEV_NAME, PRODUCT_ID);// 创建MQTT客户端int rc = MQTTClient_create(&client, NEW_ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);if(MQTTCLIENT_SUCCESS != rc){printf("create mqtt client failure...\n");exit(1);}// 初始化连接选项MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;conn_opts.keepAliveInterval = 20;  // 保持连接间隔(秒)conn_opts.cleansession = 1;         // 清理会话conn_opts.username = PRODUCT_ID;    // 用户名(产品ID)conn_opts.password = PASSWD;        // 密码(包含签名信息)// 设置回调函数rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);if(MQTTCLIENT_SUCCESS != rc){printf("Failed to set callbacks, return code %d\n", rc);exit(EXIT_FAILURE);}// 连接到MQTT服务器if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(EXIT_FAILURE);}// 订阅主题(当前被注释,可根据需要启用)
#if 0MQTTClient_subscribe(client, topic[0], QOS);printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", topic[0], CLIENTID, QOS);
#endif return rc;
}// 发送MQTT消息
int mqtt_send(char * key, int value)
{MQTTClient_deliveryToken deliveryToken;MQTTClient_message test2_pubmsg = MQTTClient_message_initializer;// 需要发送的正文char message[1024] = {0};// 设置消息属性test2_pubmsg.qos = QOS;test2_pubmsg.retained = 0;test2_pubmsg.payload = message;// 构建JSON格式的消息sprintf(message,"{\"id\":\"%d\",\"version\":\"1.0\",\"params\":{\"%s\":{\"value\":%d}}}",id++, key, value);test2_pubmsg.payloadlen = strlen(message);printf("%s\n",message);// 发布消息int rc = MQTTClient_publishMessage(client, topic[1], &test2_pubmsg, &deliveryToken);if(MQTTCLIENT_SUCCESS != rc){printf("client to publish failure.. %lu\n", pthread_self());exit(1);}// 等待消息确认printf("Waiting for up to %d seconds for publication on topic %s for client with ClientID: %s\n",(int)(TIMEOUT/1000), topic[0], CLIENTID);MQTTClient_waitForCompletion(client, deliveryToken, TIMEOUT);sleep(1);return rc;
}// 释放MQTT资源
void mqtt_deinit()
{// 断开连接MQTTClient_disconnect(client, 10000);// 销毁客户端MQTTClient_destroy(&client);
}// 主函数
int main(void)
{// 初始化MQTT客户端mqtt_init();// 持续发送随机温度数据while(1){int value = rand() % 100 + 1;mqtt_send("tmp", value);}// 释放资源(实际不会执行到此处)mqtt_deinit();return 0;
}

理想运行结果

{"id":"10000","version":"1.0","params":{"tmp":{"value":42}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrivedtopic: $sys/Qon3io17BJ/test1/thing/property/post/replymessage: {"id":"10000","code":200,"msg":"success"}{"id":"10001","version":"1.0","params":{"tmp":{"value":75}}}
Waiting for up to 10 seconds for publication on topic $sys/Qon3io17BJ/test1/thing/property/post/reply for client with ClientID: test1
Message arrivedtopic: $sys/Qon3io17BJ/test1/thing/property/post/replymessage: {"id":"10001","code":200,"msg":"success"}...

程序将不断生成随机温度值(1-100之间),以JSON格式发送到OneNET平台,平台会返回确认消息。在OneNET控制台的设备属性中,可以看到实时更新的温度数据。

wireshark抓包
MQTTtest1
Qon3io17BJyversion=2018-10-31&res=products%2FQon3io17BJ%2Fdevices%2Ftest1&et=1837255523&method=md5&sign=vTKE9XEYychiMZcr34TjuQ%3D%3D
http://www.xdnf.cn/news/20201.html

相关文章:

  • 手写MyBatis第53弹: @Intercepts与@Signature注解的工作原理
  • 工业洗地机和商用洗地机的区别是什么?
  • 【基础-单选】关于bundleName,下列说法正确的是?
  • 波特率vs比特率
  • rh134第三章复习总结
  • 贪心算法应用:保险理赔调度问题详解
  • Java中的死锁
  • 使用 MongoDB.Driver 在 C# .NETCore 中实现 Mongo DB 过滤器
  • [数据结构] ArrayList(顺序表)与LinkedList(链表)
  • 万代《宝可梦》主题新品扭蛋公开!史上最大尺寸
  • 机器人控制器开发(传感器层——奥比大白相机适配)
  • 【FastDDS】Layer Transport ( 05-Shared Memory Transport)
  • 天气预报云服务器部署实战
  • 在Java AI项目中实现Function Call功能
  • 计算机毕设大数据方向:基于Spark+Hadoop的餐饮外卖平台数据分析系统【源码+文档+调试】
  • 通过Idea 阿里插件快速部署java jar包
  • 实用向:Linux Shell 脚本实现 CPU / 内存 / 硬盘 + IO / 网络多指标告警(支持 163/QQ/139 邮箱)
  • python调用mysql
  • PDF文件基础-计算机字体
  • 【Luogu_P8118】 「RdOI R3.5」Mystery【Slope Trick】【DP】
  • 深度学习基础概念回顾(Pytorch架构)
  • 【Java实战㉗】Java日志框架实战:Logback与Log4j2的深度探索
  • 大型Go项目中搭建CI/CD流水线
  • 竞价代运营:百度竞价账户托管优化
  • VeeValidate v4 终极指南:精通 Vue 3 组合式 API 表单验证
  • Web Worker 从原理到实战 —— 把耗时工作搬到后台线程,避免页面卡顿
  • 计算机视觉(九):图像轮廓
  • 破局功能割裂、成本高昂、协同低效,遨游天通卫星电话实现一机多能
  • Adobe Illustrator(Ai) 2022矢量设计软件的安装教程与下载地址
  • 【Python自动化】 21.3 Pandas Series 核心数据结构完全指南