【鸿蒙开发】Hi3861学习笔记- MQTT通信
00. 目录
文章目录
- 00. 目录
- 01. MQTT概述
- 02. MQTT服务器
- 03. MQTT客户端流程
- 04. MQTT第三方库介绍
- 05. MQTT相关API
- 5.1 MQTTSerialize_connect
- 5.2 MQTTPacket_read
- 5.3 MQTTDeserialize_connack
- 5.4 MQTTSerialize_subscribe
- 5.5 MQTTDeserialize_suback
- 5.6 MQTTDeserialize_publish
- 06. 硬件设计
- 07. 软件设计
- 08. 实验现象
- 09. 附录
01. MQTT概述
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
MQTT协议身份和消息格式
有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
MQTT传输的消息分为:主题(Topic)和负载(payload)两部分。Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)。payload,负载,也就是指消息的内容。
MQTT协议中的订阅、主题、会话
订阅(Subscription):订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联,一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器会话(Session),每个客户端与服务器建立连接后就是一个会话。
主题名(Topic Name):就是连接到一个应用程序消息的一个标签。该标签与服务器的订阅的相匹配,服务器会将消息发送给订阅所匹配标签的每个客户端。
MQTT 协议的特点
MQTT 是一种轻量、简单、开放,低开销、低带宽占用的即时通讯协议。使用场景有:机器与机器(M2M)、物联网(IoT)。该协议主要针对嵌入式设备,这些设备一般工作于TCP/IP网络。MQTT会构建底层网络传输,它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。对传输消息有三种服务质量(QoS):
QoS 0:最多一次,这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1。
QoS 1:最少一次(承诺消息将至少传送一次给订阅者),这一级别会确保消息到达,但消息可能会重复。即:>=1。
QoS 2:刚好一次(承诺消息仅传送到目的地一次),确保消息只有一次到达。即:=1。
02. MQTT服务器
为了测试方便,选择国内免费 MQTT 服务器,服务器地址:broker.emqx.io端口:1883,如下:
03. MQTT客户端流程
编写MQTT客户端代码流程如下:
● 初始化 MQTT 客户端,并设置网络参数、连接参数等。
● 连接到 MQTT 服务器(Broker)。
● 订阅感兴趣的主题。
● 发送消息到指定的主题。
● 处理接收到的消息。
04. MQTT第三方库介绍
使用 MQTT 采用第三方开源库,其位置如下:
cJSON 文件夹:主要存放 mqtt 与云端数据格式相关处理文件;
paho.mqtt.embedded-c 文件夹:主要存放 mqtt 协议相关文件;
05. MQTT相关API
src\third_party\paho.mqtt.embedded-c\MQTTPacket\src\MQTTConnect.h
5.1 MQTTSerialize_connect
MQTTSerialize_connect 函数是 MQTT 协议栈(如 Mosquitto, EclipsePaho MQTT, 或其他 MQTT 客户端库)中的一个关键函数,它用于生成一个 MQTT CONNECT 控制包,这个包是客户端在尝试与 MQTT 服务器(也称为代理或broker)建立连接时发送的第一个包。
/*** Serializes the connect options into the buffer.* @param buf the buffer into which the packet will be serialized* @param len the length in bytes of the supplied buffer* @param options the options to be used to build the connect packet* @return serialized length, or error if 0*/
int MQTTSerialize_connect(unsigned char* buf, int buflen, MQTTPacket_connectData* options)
功能:生成一个 MQTT CONNECT 控制包参数:buf:指向用于存储序列化后的 MQTT CONNECT 消息的缓冲区的指针。buflen:缓冲区的长度,即 buf 可以存储的最大字节数。这个长度必须足够大,以容纳根据 options 生成的 MQTT CONNECT 消息。options:指向包含 MQTT 连接选项的结构体的指针。这个结构体通常包含客户端 ID、用户名、密码、保持连接时间、清理会话标志、遗嘱消息等信息。返回值:如果成功,函数返回一个正整数,表示序列化到缓冲区中的 MQTT CONNECT消息的长度(字节数)。如果失败(例如,由于缓冲区太小无法容纳整个消息),函数返回一个负数,表示错误代码。
一旦 MQTTSerialize_connect 函数成功执行,你就可以使用另一个函数(如网络发送函数)将 buf 缓冲区中的 MQTT CONNECT 消息发送到 MQTT 服务器了。
MQTTPacket_connectData类型
typedef struct
{/** The eyecatcher for this structure. must be MQTC. */char struct_id[4];/** The version number of this structure. Must be 0 */int struct_version;/** Version of MQTT to be used. 3 = 3.1 4 = 3.1.1*/unsigned char MQTTVersion;MQTTString clientID;unsigned short keepAliveInterval;unsigned char cleansession;unsigned char willFlag;MQTTPacket_willOptions will;MQTTString username;MQTTString password;
} MQTTPacket_connectData;
5.2 MQTTPacket_read
从某种数据源(如网络套接字)中读取 MQTT 数据包到指定的缓冲区中。
/*** Helper function to read packet data from some source into a buffer* @param buf the buffer into which the packet will be serialized* @param buflen the length in bytes of the supplied buffer* @param getfn pointer to a function which will read any number of bytes from the needed source* @return integer MQTT packet type, or -1 on error* @note the whole message must fit into the caller's buffer*/
int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
功能:从某种数据源(如网络套接字)中读取 MQTT 数据包到指定的缓冲区中。
参数:buf:指向用于存储从数据源读取的 MQTT 数据包的缓冲区的指针。buflen:缓冲区的长度,即 buf 可以存储的最大字节数。getfn:一个指向函数的指针,该函数负责从数据源(如网络套接字)读取数据到提供的缓冲区中。这个读取函数应该有两个参数:一个是指向缓冲区的指针,另一个是请求读取的字节数;它应该返回实际读取的字节数,或者在发生错误时返回某个特定的错误码。返回值:整数 MQTT 数据包类型,或错误时为-1。
5.3 MQTTDeserialize_connack
MQTTDeserialize_connack 函数是 MQTT C 客户端库中用于反序列化(即解析)从 MQTT 服务器接收到的 CONNACK(连接确认)消息的函数。这个函数通常会将原始的数据包内容(即 buf 指向的字节序列)解析为 MQTT 协议定义的CONNACK 消息的各个字段,并将这些字段的值存储在提供的参数中。
/*** Deserializes the supplied (wire) buffer into connack data - return code* @param sessionPresent the session present flag returned (only for MQTT 3.1.1)* @param connack_rc returned integer value of the connack return code* @param buf the raw buffer data, of the correct length determined by the remaining length field* @param len the length in bytes of the data in the supplied buffer* @return error code. 1 is success, 0 is failure*/
int MQTTDeserialize_connack(unsigned char* sessionPresent, unsigned char* connack_rc, unsigned char* buf, int buflen)
功能:MQTT C 客户端库中用于反序列化(即解析)从 MQTT 服务器接收到的 CONNACK(连接确认)消息
参数:sessionPresent:指向一个变量的指针,该变量将存储会话是否存在的标志。在 MQTT 协议中,CONNACK 消息的第一个字节(除了控制包
类型之外)的最低有效位(LSB)用于指示会话是否存在(如果设置为 1,则表示会话存在;如果设置为 0,则表示会话不存在)。connack_rc:指向一个变量的指针,该变量将存储连接确认的返回码(也称为连接响应码)。返回码用于指示连接请求是否被接受,以及
拒绝的原因(如果连接被拒绝)。buf:指向包含原始 CONNACK 消息数据包的缓冲区的指针。buflen:缓冲区的长度,即 buf 中数据的字节数。
返回值:1 代表成功,0 代表失败;
5.4 MQTTSerialize_subscribe
MQTTSerialize_subscribe 函数是 MQTT C 客户端库中用于序列化(即构建)SUBSCRIBE 消息的函数。SUBSCRIBE 消息是 MQTT 客户端发送给 MQTT 服务器以请求订阅一个或多个主题的消息。这个函数根据提供的参数,将 SUBSCRIBE消息的各个字段编码成 MQTT 协议定义的格式,并存储在提供的缓冲区中。
/*** Serializes the supplied subscribe data into the supplied buffer, ready for sending* @param buf the buffer into which the packet will be serialized* @param buflen the length in bytes of the supplied bufferr* @param dup integer - the MQTT dup flag* @param packetid integer - the MQTT packet identifier* @param count - number of members in the topicFilters and reqQos arrays* @param topicFilters - array of topic filter names* @param requestedQoSs - array of requested QoS* @return the length of the serialized data. <= 0 indicates error*/
int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count, MQTTString topicFilters[], int requestedQoSs[])
功能:MQTT C 客户端库中用于序列化(即构建)SUBSCRIBE 消息参数:buf:指向一个足够大的缓冲区,用于存储序列化后的SUBSCRIBE 消息。buflen:缓冲区的长度,即 buf 中可以存储的字节数。dup:SUBSCRIBE 消息的控制包头中的 DUP 标志。对于SUBSCRIBE 消息,此标志应始终为 0,因为 SUBSCRIBE 消息不是重复发送的 QoS1 或 QoS 2 消息的一部分。packetid:SUBSCRIBE 消息的唯一包标识符。MQTT 客户端使用此标识符来匹配 SUBSCRIBE 消息的 SUBACK 响应。count:要订阅的主题过滤器的数量。topicFilters:一个 MQTTString 类型的数组,包含要订阅的主题过滤器。MQTTString 通常是一个结构体,包含指向主题名称的指针和主题名称的长度。requestedQoSs:一个整数数组,与 topicFilters 数组中的每个主题过滤器相对应,指定了每个主题的请求服务质量(QoS)级别。返回值:返回序列化数据的长度。<=0 表示错误;
5.5 MQTTDeserialize_suback
MQTTDeserialize_suback 函数是 MQTT C 客户端库中用于反序列化(即解析)SUBACK 消息的函数。SUBACK 消息是 MQTT 服务器发送给客户端的,以响应客户端的 SUBSCRIBE 消息,并告知每个订阅请求的授予服务质量(QoS)级别。
/*** Deserializes the supplied (wire) buffer into suback data* @param packetid returned integer - the MQTT packet identifier* @param maxcount - the maximum number of members allowed in the grantedQoSs array* @param count returned integer - number of members in the grantedQoSs array* @param grantedQoSs returned array of integers - the granted qualities of service* @param buf the raw buffer data, of the correct length determined by the remaining length field* @param buflen the length in bytes of the data in the supplied buffer* @return error code. 1 is success, 0 is failure*/
int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen)
功能:MQTT C 客户端库中用于反序列化(即解析)SUBACK 消息
参数:packetid:指向一个变量的指针,该变量将存储从 SUBACK消息中解析出的包标识符。这个包标识符应该与客户端发送的 SUBSCRIBE 消息
中的包标识符相匹配。maxcount:grantedQoSs数组可以存储的最大元素数量。这个值应该足够大,以容纳客户端在 SUBSCRIBE 消息中请求的所有主题的 QoS 授予信息。count:指向一个变量的指针,该变量将存储从 SUBACK 消息中解析出的授予 QoS 级别的数量。这个数量应该与客户端在 SUBSCRIBE 消息中请求的主题数量相匹配。grantedQoSs:一个整数数组,用于存储从 SUBACK 消息中解析出的每个主题的授予 QoS 级别。buf:指向包含 SUBACK 消息二进制数据的缓冲区的指针。buflen:缓冲区的长度,即 buf 中包含的字节数。返回值:返回错误代码。1 代表成功,0 代表失败
5.6 MQTTDeserialize_publish
MQTTDeserialize_publish 函数是 MQTT C 客户端库中用于反序列化(即解析)PUBLISH 消息的函数。该函数从给定的二进制数据缓冲区中解析出 PUBLISH消息的各个字段,包括控制包头的标志、主题名称、有效负载等,并将解析出的信息存储在提供的变量和指针中。
/*** Deserializes the supplied (wire) buffer into publish data* @param dup returned integer - the MQTT dup flag* @param qos returned integer - the MQTT QoS value* @param retained returned integer - the MQTT retained flag* @param packetid returned integer - the MQTT packet identifier* @param topicName returned MQTTString - the MQTT topic in the publish* @param payload returned byte buffer - the MQTT publish payload* @param payloadlen returned integer - the length of the MQTT payload* @param buf the raw buffer data, of the correct length determined by the remaining length field* @param buflen the length in bytes of the data in the supplied buffer* @return error code. 1 is success*/
int MQTTDeserialize_publish(unsigned char* dup, int* qos, unsigned char* retained, unsigned short* packetid, MQTTString* topicName, unsigned char** payload, int* payloadlen, unsigned char* buf, int buflen)
功能:MQTT C 客户端库中用于反序列化(即解析)PUBLISH 消息
参数:dup:指向一个变量的指针,该变量将存储从 PUBLISH消息中解析出的 DUP 标志。DUP 标志用于 QoS > 0 的消息,表示这是一个重复的
消息。qos:指向一个变量的指针,该变量将存储从 PUBLISH 消息中解析出的 QoS(服务质量)级别。retained:指向一个变量的指针,该变量将存储从 PUBLISH消息中解析出的 RETAIN 标志。RETAIN 标志表示服务器应该保留该消息,以便在后续的新订阅者订阅该主题时发送给它们。packetid:指向一个变量的指针,该变量将存储从 QoS >0 的 PUBLISH 消息中解析出的包标识符。对于 QoS 0 的消息,此字段通常不会被设置。topicName:指向一个 MQTTString 结构体的指针,该结构体将用于存储从 PUBLISH 消息中解析出的主题名称。请注意,这个结构体可能需要在调用函数之前被分配和初始化,以便函数能够填充它。payload:指向一个指针的指针,该指针将指向从 PUBLISH消息中解析出的有效负载数据。请注意,这个指针可能会指向 buf 缓冲区内部
的位置,或者在某些实现中,可能会分配新的内存来存储有效负载数据。payloadlen:指向一个变量的指针,该变量将存储从 PUBLISH 消息中解析出的有效负载数据的长度。buf:指向包含 PUBLISH 消息二进制数据的缓冲区的指针。buflen:缓冲区的长度,即 buf 中包含的字节数。返回值:返回错误代码。1 是成功
06. 硬件设计
由于 Hi3861 内置 WIFI 功能,所以直接在开发板上使用即可,无需额外连接。
07. 软件设计
gn文件
# Copyright (c) 2024 www.prechin.cn
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License. static_library("template") {sources = ["template.c","//vendor/sziit/hi3861/common/bsp/src/bsp_wifi.c", "//vendor/sziit/hi3861/common/bsp/src/bsp_mqtt.c", "//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectClient.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTConnectServer.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTDeserializePublish.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTFormat.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTPacket.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSerializePublish.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeClient.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTSubscribeServer.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeServer.c","//third_party/paho.mqtt.embedded-c/MQTTPacket/src/MQTTUnsubscribeClient.c",]include_dirs = ["//utils/native/lite/include","//kernel/liteos_m/kal/cmsis","//base/iot_hardware/peripheral/interfaces/kits","//vendor/sziit/hi3861/common/bsp/include","//foundation/communication/wifi_lite/interfaces/wifiservice","//third_party/paho.mqtt.embedded-c/MQTTPacket/src","//third_party/cJSON",]
}
bsp_mqtt.h
#ifndef BSP_MQTT_H
#define BSP_MQTT_H#include "cmsis_os2.h"
#include "hi_io.h"
#include "hi_gpio.h"//函数声明
int MQTTClient_connectServer(const char *ip_addr, int ip_port);
int MQTTClient_unConnectServer(void);
int MQTTClient_subscribe(char *subTopic);
int MQTTClient_init(char *clientID, char *userName, char *password);
int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen);
int MQTTClient_sub(void);
extern int8_t(*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);#endif
bsp_mqtt.c
#include <stdio.h>#include <unistd.h>#include <string.h>#include <stdlib.h>#include "lwip/netifapi.h"#include "lwip/sockets.h"#include "wifi_device.h"#include "ohos_init.h"#include "MQTTPacket.h"#include "bsp_mqtt.h"#define MQTT_BUFF_MAX_SIZE 512int g_tcp_socket_fd = 0; // 网络套接字unsigned char mqttBuff[MQTT_BUFF_MAX_SIZE] = {0};// 发送网络数据static int transport_sendPacketBuffer(unsigned char *buf, int buflen){int rc = send(g_tcp_socket_fd, buf, buflen, 0);return (rc <= 0) ? 0 : 1;}// 接收网络数据static int transport_getdata(unsigned char *buf, int count){int rc = recv(g_tcp_socket_fd, buf, count, 0);return rc;}// 连接服务器int MQTTClient_connectServer(const char *ip_addr, int ip_port){if (ip_addr == NULL) {return -1;}int res = 0; // 函数返回值struct sockaddr_in tcpServerConfig; // tcp服务器信息// 创建TCP套接字g_tcp_socket_fd = socket(AF_INET, SOCK_STREAM, 0);if (g_tcp_socket_fd < 0) {printf("Failed to create Socket\r\n");}// 连接TCP服务器tcpServerConfig.sin_family = AF_INET; // IPV4tcpServerConfig.sin_port = htons(ip_port); // 填写服务器的IP端口号tcpServerConfig.sin_addr.s_addr = inet_addr(ip_addr); // 填写服务器的IP地址res = connect(g_tcp_socket_fd, (struct sockaddr *)&tcpServerConfig, sizeof(tcpServerConfig)); // 连接服务器if (res == -1) {printf("Failed to connect to the server\r\n");return -1;}printf("Connection to server successful\r\n");return 0;}// 断开TCP服务器 0:成功, -1:失败int MQTTClient_unConnectServer(void){int ret = 0;printf("Server shut down successfully\r\n");ret = close(g_tcp_socket_fd);g_tcp_socket_fd = 0;return ret;}// mqtt客户端 订阅主题int MQTTClient_subscribe(char *subTopic){if (subTopic == NULL) {printf("Incorrect parameters\r\n");return -1;}int len = 0, res = 0;int subcount = 0, granted_qos = 0, req_qos = 0;unsigned short submsgid = 0;MQTTString topicString = MQTTString_initializer;/* subscribe */topicString.cstring = subTopic;len = MQTTSerialize_subscribe(mqttBuff, sizeof(mqttBuff), 0, 1, 1, &topicString, &req_qos);if (len <= 0) {printf("MQTTSerialize_subscribe Error %d\r\n", len);return -1;}res = transport_sendPacketBuffer(mqttBuff, len);if (res != 1) {printf("transport_sendPacketBuffer Error %d\r\n", res);return -1;}sleep(1);memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));/* wait for suback */if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != SUBACK) {printf("MQTTPacket_read Error\r\n");return -1;}if (MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, mqttBuff, sizeof(mqttBuff)) != 1) {printf("MQTTDeserialize_suback Error\r\n");return -1;}printf("MQTT subscribed to topics successfully\r\n");return 0;}// 保持在线时长 60s#define MQTT_KEEP_ALIVE 60#define MQTT_DELAY_TIME 3// mqtt客户端 初始化int MQTTClient_init(char *clientID, char *userName, char *password){if (clientID == NULL || userName == NULL || password == NULL) {printf("Incorrect parameters\r\n");return -1;}int res = 0, len = 0, i = 0;int mqtt_read_len = 10;unsigned char sessionPresent = 0, connack_rc = 0;MQTTPacket_connectData mqttData = MQTTPacket_connectData_initializer;// 初始化MQTT客户端mqttData.clientID.cstring = clientID;mqttData.username.cstring = userName;mqttData.password.cstring = password;mqttData.cleansession = true; // 是否初始化的时候,清除上一次的对话mqttData.keepAliveInterval = MQTT_KEEP_ALIVE;// 组MQTT消息包len = MQTTSerialize_connect(mqttBuff, sizeof(mqttBuff), &mqttData);if (len <= 0) {printf("MQTTSerialize_connect Error %d\r\n", res);return -1;}res = transport_sendPacketBuffer(mqttBuff, len);if (res != 1) {printf("transport_sendPacketBuffer Error %d\r\n", res);return -1;}sleep(MQTT_DELAY_TIME);/* 打印发送出去的数据帧,调试用 */printf("MQTT_sendPacket: \r\n");for (i = 0; i < len; i++) {printf("%x ", mqttBuff[i]);}printf("\r\n");memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));/* wait for connack */if (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) != CONNACK) {printf("MQTTPacket_read != CONNACK\r\n");}printf("MQTT_recvPacket: \r\n");/* 打印服务器返回的消息,调试用 */for (i = 0; i < mqtt_read_len; i++) {printf("%x ", mqttBuff[i]);}printf("\r\n");if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqttBuff, sizeof(mqttBuff)) != 1 || connack_rc != 0) {printf("Unable to connect, return code %d\r\n", connack_rc);memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));return -1;} else {printf("MQTT initialized successfully\r\n");}memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));return 0;}#define MQTT_PUB_DATA_TIME (100 * 1000)int MQTTClient_pub(char *pub_Topic, unsigned char *payloadData, int payloadLen){if (payloadData == NULL) {printf("Incorrect parameters\r\n");return -1;}printf("pubTopic: %s\n", pub_Topic);printf("pubData: %s\n", payloadData);int ret = 0, len = 0;unsigned short retry_count = 5; // 重发次数unsigned char sendBuff[MQTT_BUFF_MAX_SIZE] = {0};MQTTString topicString = MQTTString_initializer;topicString.cstring = pub_Topic;len = MQTTSerialize_publish(sendBuff, sizeof(sendBuff), 0, 0, 0, 0, topicString,payloadData,payloadLen);while (--retry_count > 0) {ret = transport_sendPacketBuffer(sendBuff, len);if (ret == 1) {break;}printf("Send MQTT_Data Fail\r\n");usleep(MQTT_PUB_DATA_TIME);}if (!retry_count && ret != 1) {printf("transport_sendPacketBuffer Error %d\r\n", ret);return -1;}// printf("send==>%s", payloadData);return 0;}unsigned char mqtt_topic[200];int8_t (*p_MQTTClient_sub_callback)(unsigned char *topic, unsigned char *payload);int MQTTClient_sub(void){int qos, payloadlen_in;unsigned char dup, retained;unsigned short msgid;unsigned char *payload_in;MQTTString receivedTopic;memset_s(mqttBuff, sizeof(mqttBuff), 0, sizeof(mqttBuff));// $oc/devices/63ad5a6cc4efcc747bd75973_lamp/sys/commands/request_id=42c20ffb-0885-4f6e-97b5-45d8f613efafif (MQTTPacket_read(mqttBuff, sizeof(mqttBuff), transport_getdata) == PUBLISH) {MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,&payload_in, &payloadlen_in, mqttBuff, sizeof(mqttBuff));printf("data: %s\n", receivedTopic.lenstring.data);printf("length: %d\n", strlen(receivedTopic.lenstring.data) - payloadlen_in);printf("payload_length: %d\n", payloadlen_in);memcpy_s(mqtt_topic, sizeof(mqtt_topic),receivedTopic.lenstring.data, strlen(receivedTopic.lenstring.data) - payloadlen_in);printf("topic: %s\n", mqtt_topic);printf("payload: %s\n", payload_in);p_MQTTClient_sub_callback(mqtt_topic, payload_in);}}
template.c
#include <stdio.h>
#include <unistd.h>#include "ohos_init.h"
#include "cmsis_os2.h"#include "bsp_wifi.h"
#include "bsp_mqtt.h"
#include "lwip/sockets.h"#define TASK_STACK_SIZE 1024#define SERVER_IP_ADDR "54.244.173.190" //broker.emqx.io
#define SERVER_IP_PORT 1883
#define MQTT_TOPIC_SUB "subTopic"
#define MQTT_TOPIC_PUB "pubTopic"
#define TASK_INIT_TIME 2 // s
#define MQTT_RECV_TASK_TIME (200 * 1000) // usstatic const char *data = "hello shenzhen";//任务1ID
osThreadId_t task1_id;
osThreadId_t task2_id;int8_t mqtt_sub_payload_callback(unsigned char *topic, unsigned char *payload)
{printf("[info] topic:[%s] recv<== %s\r\n", topic, payload);
}void mqtt_recv_task(void)
{while (1) {MQTTClient_sub();usleep(MQTT_RECV_TASK_TIME);}
}//线程回调入口函数
void task1 (void *argument)
{int sockfd = -1;int clientid = -1;int ret = -1;int len = -1;char buf[128];struct sockaddr_in addr;struct sockaddr_in from;//连接到WIFIWiFi_connectHotspots("IOT", "iot12345678");// 连接MQTT服务器if (MQTTClient_connectServer(SERVER_IP_ADDR, SERVER_IP_PORT) != 0) {printf("[error] MQTTClient_connectServer\r\n");} else {printf("[success] MQTTClient_connectServer\r\n");}sleep(TASK_INIT_TIME);// 初始化MQTT客户端if (MQTTClient_init("mqtt_client_123", "username", "password") != 0) {printf("[error] MQTTClient_init\r\n");} else {printf("[success] MQTTClient_init\r\n");}sleep(TASK_INIT_TIME);// 订阅Topicif (MQTTClient_subscribe(MQTT_TOPIC_SUB) != 0) {printf("[error] MQTTClient_subscribe\r\n");} else {printf("[success] MQTTClient_subscribe\r\n");}sleep(TASK_INIT_TIME); osThreadAttr_t options;options.name = "mqtt_recv_task";options.attr_bits = 0;options.cb_mem = NULL;options.cb_size = 0;options.stack_mem = NULL;options.stack_size = 1024*5;options.priority = osPriorityNormal;task2_id = osThreadNew((osThreadFunc_t)mqtt_recv_task, NULL, &options);if (task2_id != NULL) {printf("ID = %d, Create mqtt_recv_task_id is OK!\r\n", task2_id);}while (1) {MQTTClient_pub(MQTT_TOPIC_PUB, "hello world!!!", strlen("hello world!!!"));sleep(TASK_INIT_TIME);}}/*** @description: 初始化并创建任务* @param {*}* @return {*}*/
static void template_demo(void)
{ osThreadAttr_t attr;attr.name = "task1"; //任务名称attr.attr_bits = osThreadDetached; //分离状态attr.cb_mem = NULL;attr.cb_size = 0;attr.stack_mem = NULL;attr.stack_size = TASK_STACK_SIZE * 15;attr.priority = osPriorityNormal;p_MQTTClient_sub_callback = &mqtt_sub_payload_callback;//创建任务1task1_id = osThreadNew(task1, NULL, &attr);if (NULL != task1_id){printf("任务1创建OK task1_id = %d\n", task1_id);}}
SYS_RUN(template_demo);
08. 实验现象
实验现象:连接路由器热点,打开通信猫 MQTT 调试客户端和串口调试助手,连接好服务器后设置正确的订阅和发布主题即可收到消息,同时串口助手也能收到发布的消息。