当前位置: 首页 > news >正文

MQTT之“SUBSCRIBE报文和SUBACK报文”

SUBSCRIBE报文用来订阅1个主题,也可以一次性订阅多个主题,SUBACK报文用作订阅确认。

1SUBSCRIBE报文

1.1SUBSCRIBE报文之固定报头

由上表可知,“SUBSCRIBE报文的固定报头”的第1个字节的高4位为报文类型,低4位为0010B,从第2个字节开始就是“剩余长度”,可见这个剩余长度的字节数是可变的。注意:剩余长度的字节数最大为4,最少为1

剩余长度(Remaining Length)是“可变报头和有效载荷”的总长度。

1.2SUBSCRIBE报文之可变报头

由上表可知,SUBSCRIBE报文的可变报头只有两个字节,它表示报文标识符,最小值为1,最大值为65535

1.3SUBSCRIBE报文之有效载荷

​​​​​​​

由上表可知,SUBSCRIBE报文的有效载荷,每个“订阅主题名”的长度占两个字节,“服务质量要求”紧跟在每个“订阅主题名”的后面,它告诉“接收方”在发布“该订阅消息”时所使用的“服务质量要求”。上表只表示一个“订阅主题名”,如果一次性订阅多个“主题”,就会有多个“订阅主题名”的长度,主题名和“服务质量要求”。

1.4、生成SUBSCRIBE报文

/*计算“可变报头”和“有效载荷”的字节总数,即订阅主题固定报头中的“剩余长度”值,并返回该值

count表示订阅主题的数量,通常为1,即1次订阅一个主题

topicFilters[]为订阅主题,比如接收消息主题为“04661219C1676702/TimeData

*/

int MQTTSerialize_subscribeLength(int count, MQTTString topicFilters[])

{

         int i;

         int len = 2; /*“可变报头”中的报文标识符占2个字节,packetid */

         int ret;

         for (i = 0; i < count; ++i)//count表示订阅主题的数量,通常为1

         {

                   ret=MQTTstrlen(topicFilters[i]);//求“单一主题”的字节数,保存到ret

                   len += 2 + ret + 1;

/*“有效载荷”中的订阅主题的长度占2个字节,ret为“单一主题”的字节数

“有效载荷”中的“服务质量要求”占1个字节*/

         }

         return len;

}

/*

生成“SUBSCRIBE报文”,保存到buf[]中,并返回该报文的字节总数

返回值值为“SUBSCRIBE报文”的总字节数

buf[]用来保存报文,buflenbuf[]的最大长度

dup表示重发标志;如果DUP标志被设置为0,表示这是客户端或服务端第一次请求发送这个“SUBSCRIBE报文”。

packetid表示报文标识符

count表示订阅主题的数量,通常为1,即1次订阅一个主题

topicFilters[0]指向第1个订阅主题,topicFilters[1]指向第2个订阅主题;

requestedQoSs[0]指向第1个订阅主题所使用的服务质量要求,requestedQoSs[1]指向第2个订阅主题所使用的服务质量要求

*/

int MQTTSerialize_subscribe(unsigned char* buf, int buflen, unsigned char dup, unsigned short packetid, int count,

                   MQTTString topicFilters[], int requestedQoSs[])

{

         unsigned char *ptr = buf;

         MQTTHeader header = {0};

         int rem_len = 0;

         int rc = 0;

         int i = 0;

         int ret;

         rem_len = MQTTSerialize_subscribeLength(count, topicFilters);

         /*计算“可变报头”和“有效载荷”的字节总数,即订阅主题固定报头中的“剩余长度”值,并返回该值*/

         ret=MQTTPacket_len( rem_len );

         //计算对“剩余长度”rem_len进行试编码,然后计算编码后的字节数,并返回该值

         if ( ret > buflen)//查看所给的buf[]的长度是否够用

         {

                   rc = MQTTPACKET_BUFFER_TOO_SHORT;

                   goto exit;

         }

/*“固定报头”第1个字节的高4位,它表示“控制报文的类型”

SUBSCRIBE控制报文的固定报头第1个字节的bit3:0是保留位,必须分别设置为0,0,1,0*/

         header.byte = 0;

         header.bits.type = SUBSCRIBE;//bit7:4表示“控制报文的类型”

         header.bits.dup = dup;//占用bit3,必须设置为0

  header.bits.qos = 1;

/*SUBSCRIBE控制报文之固定报头的保留位,占用bit2:1,设置为01B*/

         writeChar(&ptr, header.byte);

/*添加“SUBSCRIBE报文”之固定报头的第1个字节*/

         ptr += MQTTPacket_encode(ptr, rem_len);

/*对“剩余长度rem_len”进行编码,然后保存到ptr[],返回值为rem_len编码后保存到ptr[]中的字节数;

添加“SUBSCRIBE报文”之固定报头的剩余长度,write remaining length */;

         writeInt(&ptr, packetid);

//添加“SUBSCRIBE报文”之可变报头中的“报文标识符”

        

//添加“有效载荷”

         for (i = 0; i < count; ++i)//count表示订阅主题的数量,通常为1

         {

                   writeMQTTString(&ptr, topicFilters[i]);

//添加“SUBSCRIBE报文”之有效载荷中的“订阅主题”

                   writeChar(&ptr, requestedQoSs[i]);

//添加“SUBSCRIBE报文”之有效载荷中的“服务质量要求”

         }

         rc = ptr - buf;

exit:

         return rc;

}

2SUBACK报文

服务端发送“SUBACK报文”给客户端,用于确认它已收到“SUBSCRIBE报文”。

2.1SUBACK报文之固定报头

由上表可知,“SUBACK报文的固定报头”的第1个字节的高4位为报文类型,低4位为0000B,从第2个字节开始就是“剩余长度”,可见这个剩余长度的字节数是可变的。注意:剩余长度的字节数最大为4,最少为1

剩余长度(Remaining Length)是“可变报头和有效载荷”的总长度。

2.2SUBACK报文之可变报头

由上表可知,SUBACK文的可变报头只有两个字节,它表示报文标识符,最小值为1,最大值为65535

2.3SUBACK报文之有效载荷

由上表可知,SUBACK文的“有效载荷”只有一个字节,返回值为SUBSCRIBE报文”之有效载荷中的“服务质量要求”

注意:由于可变报头只有2个字节,如果只订阅单一主题时,则“SUBACK报文”的有效载荷只有1个字节,因此,在订阅单一主题时,SUBACK文的剩余长度值为3;若订阅2个主题,则SUBACK文的剩余长度值为4,以此类推

2.4SUBACK报文解析

//解码buf[]中的SUBACK的报文,“有效载荷的值”保存在grantedQoSs[][],执行成功,则返回1

//grantedQoSs[][]中的值和SUBSCRIBE报文中的QoS相等

//packetid返回的是报文标识符

//maxcount表示SUBACK报文的“有效载荷”的最大字节数。在“订阅多个主题”时,就有多个“有效载荷”

//*count表示SUBACK文的“有效载荷”的索引

//grantedQoSs[count]是要返回的“有效载荷的值”

//buf[]为接收缓冲区,buflenbuf[]的最大字节总数

int MQTTDeserialize_suback(unsigned short* packetid, int maxcount, int* count, int grantedQoSs[], unsigned char* buf, int buflen)

{

         MQTTHeader header = {0};

         unsigned char* curdata = buf;

         unsigned char* enddata = NULL;

         int rc = 0;

         int mylen;

         header.byte = readChar(&curdata);

         //读“SUBACK报文的固定报头”的第1个字节

         //相当于header.byte = *curdata;curdata++;

         if (header.bits.type != SUBACK) goto exit;//若报文类型错误,则退出

         rc = MQTTPacket_decodeBuf(curdata, &mylen);

//解码后的"剩余长度"保存到mylen,rc为解码前“剩余长度字段”所占的字节数

//在“SUBACK报文”中,若订阅单一主题时,则剩余长度值为3

         curdata =curdata + rc;//指向“SUBACK报文”的“可变报头”

         enddata = curdata + mylen;//enddata指向有效载荷的结束位置

         if (enddata - curdata < 2) goto exit;

         *packetid = readInt(&curdata);

         //读“SUBACK报文”的“可变报头”的报文标识符,占两个字节

         //curdata=curdata+2,此时指向“有效载荷”的起始位置

         *count = 0;

         while (curdata < enddata)//SUBACK文的“有效载荷”,只有1个字节

         {

                   if (*count > maxcount)

                   {

                            rc = -1;

                            goto exit;

                   }

                   grantedQoSs[(*count)] = readChar(&curdata);//返回读到“服务质量要求”

                   (*count)++;

         }

         rc = 1;

exit:

         return rc;

}

http://www.xdnf.cn/news/1202149.html

相关文章:

  • “太赫兹”
  • 【华为机试】210. 课程表 II
  • 自动化测试常用函数
  • XML Expat Parser:深入解析与高效应用
  • 【CDA干货】金融超市电商App经营数据分析案例
  • 写一个3D旋转的python程序
  • 字节跳动开源Coze,开启AI Agent开发新时代?
  • 【Linux篇章】穿越数据迷雾:HTTPS构筑网络安全的量子级护盾,重塑数字信任帝国!
  • 新能源行业B端极简设计:碳中和目标下的交互轻量化实践
  • 【数据架构09】人工智能及数据智能架构篇
  • 群晖Synology Drive:打造高效安全的私有云协作平台
  • 优测推出HarmonyOS全场景测试服务,解锁分布式场景应用卓越品质!
  • httpx 接口测试教程
  • HarmonyOS 6 云开发-用户头像上传云存储
  • 打通视频到AI的第一公里:轻量RTSP服务如何重塑边缘感知入口?
  • UniappDay04
  • Java 排序
  • Kafka——请求是怎么被处理的?
  • flutter使用firebase集成谷歌,苹果登录
  • Claude Launcher:支持Kimi K2的Claude Code可视化启动工具
  • 力扣988. 从叶结点开始的最小字符串
  • 负载均衡集群HAproxy
  • keepalived
  • Redis做混沌测试都需要测哪些场景?预期如何?
  • 安宝特案例丨AR+AI赋能轨道交通制造:破解人工装配难题的创新实践
  • [免费]【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts)【论文+源码+SQL脚本】
  • 【代码解读】通义万相最新视频生成模型 Wan 2.2 实现解析
  • ESP32学习-按键中断
  • 【重学数据结构】二叉搜索树 Binary Search Tree
  • 源代码管理工具有哪些?有哪些管理场景?