MQTT报文的数据结构
看完ioLibrary库中的MQTT驱动后,还是整理一下,加深理解。
1、报文组成
报文通常由3个部分组成:固定报头,可变报头和有效载荷。有的报文没有有效载荷,还有的没有可变报头和有效载荷,报文类型不同,报文的组成也不同。
2、报文类型定义
#define MQTT_FAILURE 0 /*不能识别的报文*/
#define MQTT_CONNECT 1 /*表示“连接报文”,客户端请求连接服务端*/
#define MQTT_CONNACK 2 /*表示“连接应答报文”,服务端告诉客户端,“连接报文”被确认了*/
#define MQTT_PUBLISH 3 /*表示“发布消息报文”,客户端发送消息给服务器,或者是服务器发送消息给客户端*/
#define MQTT_PUBACK 4 /*表示“发布确认报文”,它是对QoS等级1的“发布消息报文”的响应*/
#define MQTT_PUBREC 5 /*表示“发布收到报文”,PUBREC报文是对QoS等级2的“发布消息报文”的响应*/
#define MQTT_PUBREL 6 /*表示“发布释放报文”,PUBREL报文是对QoS等级2的“发布收到报文”的响应*/
#define MQTT_PUBCOM 7 /*表示“发布完成报文”,PUBCOM报文是对QoS等级2的“发布释放报文”的响应*/
#define MQTT_SUBSCRIBE 8 /*表示“订阅主题报文”,“订阅主题报文”的固定报头第1个字节的bit3:0是保留位,必须分别设置为0,0,1,0*/
#define MQTT_SUBACK 9 /*表示“订阅确认报文”,客户端向服务器订阅消息后,服务器要发送这个应答,表示客户端订阅消息成功*/
#define MQTT_UNSUBSCRIBE 10 /*表示“取消订阅报文”,客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题*/
#define MQTT_UNSUBACK 11 /*表示“取消订阅确认报文”,服务端发送“UNSUBACK报文”给客户端用于确认收到“UNSUBSCRIBE报文”*/
#define MQTT_PINGREQ 12 /*表示“心跳请求报文”,用于在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端,它还活着*/
#define MQTT_PINGRESP 13 /*表示“心跳响应报文”,服务端发送PINGRESP报文响应客户端的PINGREQ报文。表示服务端还活着*/
#define MQTT_DISCONNECT 14 /*表示“断开连接报文”,DISCONNECT报文是客户端发给服务端的最后一个控制报文。表示客户端正常断开连接。*/
2、固定报头的第1个字节
#define MQTT_CONNECT_FixedHeaderByte1 0x10 //“CONNECT报文的固定报头”字节1
#define MQTT_CONNACK_FixedHeaderByte1 0x20 //“CONNACK报文的固定报头”字节1
#define MQTT_PUBLISH_FixedHeaderByte1 0x30
//“PUBLISH报文的固定报头”字节1,只能确定高4位,低4位可变
#define MQTT_PUBACK_FixedHeaderByte1 0x40 //“PUBACK报文的固定报头”字节1
#define MQTT_PUBREC_FixedHeaderByte1 0x50 //“PUBREC报文的固定报头”字节1
#define MQTT_PUBREL_FixedHeaderByte1 0x62 //“PUBREL报文的固定报头”字节1
#define MQTT_PUBCOM_FixedHeaderByte1 0x70 //“PUBCOM报文的固定报头”字节1
#define MQTT_SUBSCRIBE_FixedHeaderByte1 0x82 //“SUBSCRIBE报文的固定报头”字节1
#define MQTT_SUBACK_FixedHeaderByte1 0x90 //“SUBACK报文的固定报头”字节1
#define MQTT_UNSUBSCRIBE_FixedHeaderByte1 0xA2
//“UNSUBSCRIBE报文的固定报头”字节1
#define MQTT_UNSUBACK_FixedHeaderByte1 0xB0 //“UNSUBACK报文的固定报头”字节1
#define MQTT_PINGREQ_FixedHeaderByte1 0xC0 //“PINGREQ报文的固定报头”字节1
#define MQTT_PINGRESP_FixedHeaderByte1 0xD0 //“PINGRESP报文的固定报头”字节1
#define MQTT_DISCONNECT_FixedHeaderByte1 0xE0 //“DISCONNECT报文的固定报头”字节1
3、固定报头中的剩余长度
#define MQTT_CONNACK_RemanentLength 2 //“CONNACK报文的固定报头度”中的剩余长值
#define MQTT_PUBACK_RemanentLength 2 //“PUBACK报文的固定报头度”中的剩余长值
#define MQTT_PUBREC_RemanentLength 2 //“PUBREC报文的固定报头度”中的剩余长值
#define MQTT_PUBREL_RemanentLength 2 //“PUBREL报文的固定报头度”中的剩余长值
#define MQTT_PUBCOM_RemanentLength 2 //“PUBCOM报文的固定报头度”中的剩余长值
#define MQTT_SUBACK_RemanentLength 3
//在订阅单一主题时,“SUBACK报文的固定报头度”中的剩余长值为3,每增加一个订阅主题,其值加1
#define MQTT_UNSUBACK_RemanentLength 2
//“UNSUBACK报文的固定报头度”中的剩余长值
#define MQTT_PINGREQ_RemanentLength 0 //“PINGREQ报文的固定报头度”中的剩余长值
#define MQTT_PINGRESP_RemanentLength 0 //“PINGRESP报文的固定报头度”中的剩余长值
#define MQTT_DISCONNECT_RemanentLength 0
//“DISCONNECT报文的固定报头度”中的剩余长值
4、PUBLISH报文的服务质量等级
#define QOS0 0
#define QOS1 1
#define QOS2 2
5、CONNECT报文的结构
#define KeepAliveTime_Second 120 //定义“保持活着的间隔时间”为120秒
#define MQTTVersion 4 //MQTT的版本: 3 = 3.1; 4 = 3.1.1
typedef union //“CONNECT报文的可变报头”中的连接标志
{
uint8_t byte;
struct
{
unsigned int : 1; //bit0为保留位
unsigned int cleansession : 1;
/*bit1=1表示“清理会话(CleanSession)标志”置1,客户端和服务端必须丢
弃之前的任何会话,并开始一个新的会话。cleansession flag */
unsigned int will : 1;
/*bit2=0表示如果“遗嘱标志”被设置为0,则“遗嘱保留(Will Retain)标志”也必须设置为0;
bit2=1表示如果遗嘱标志被设置为1,连接标志中的“遗嘱QoS”和“遗嘱保留(Will Retain)”
字段会被服务端用到,同时有效载荷中必须包含“Will Topic”和“Will Message”字段。will flag */
unsigned int willQoS : 2;
/*bit4:3=00表示“遗嘱QoS0”;
bit4:3=01表示“遗嘱QoS1”;
bit4:3=10表示“遗嘱QoS2”;
will QoS value */
unsigned int willRetain : 1;
/*bit5=1表示“遗嘱保留(Will Retain)标志”置1,则服务端必须将
“遗嘱消息”当作“保留消息”发布;will retain setting */
unsigned int password : 1;
/*bit6=1表示“密码标志”置1,则 “有效载荷”中必须包含“密码
字段”;3.1 password */
unsigned int username : 1;
/*bit7=1表示“用户名标志”置1,则“有效载荷”中必须包含“用户名字段”;
3.1 user name */
} bits;
}ConnectFlags_def;//“CONNECT报文的可变报头”中的连接标志
///////CONNECT报文的结构定义///////
#define RemanentLength_BufferSize1 4 //剩余长度要可变报头和有效载荷计算
//CONNECT报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[RemanentLength_BufferSize1];
//从固定报头的字节2开始为剩余长度,最多有4个字节,它是编码后的值
}CONNECTMessage_FixedHeader_Def;
//CONNECT报文的“可变报头”,固定为10个字节
typedef struct
{
uint16_t ProtocolNameLength; //协议名长度为4
uint8_t ProtocolNameBuffer[4];//协议名为"MQTT"
uint8_t ProtocolLevel;
//协议级别
//MQTT版本为3.1.1时,“协议级别”为4;
//MQTT版本为3.1时,“协议级别”为3;
ConnectFlags_def ConnectFlags;//可变报头中的“连接标志”
uint16_t KeepAliveTime; //可变报头中的“保持连接时间”
}CONNECTMessage_VariableHeader_Def;
#define ClientID_BufferSize1 17
#define Will_TopicNameBufferSize1 4
#define Will_MessageBufferSize1 4
#define Username_BufferSize1 17
#define Password_BufferSize1 17
//CONNECT报文的“有效载荷”
typedef struct
{
uint8_t ClientID_Buffer[ClientID_BufferSize1];
//客户端标识符,如:"04661219C1676702"
struct
{
uint16_t TopicNameLength;//“遗嘱主题”的长度
uint8_t TopicNameBuffer[Will_TopicNameBufferSize1];
//遗嘱主题
uint16_t MessageLength;//“遗嘱消息”的长度
uint8_t MessageBuffer[Will_MessageBufferSize1];
//遗嘱消息
}Will;
struct
{
uint16_t Length;//“登录服务器的用户名”的长度
uint8_t Buffer[Username_BufferSize1];
//登录服务器的用户名
}Username;
struct
{
uint16_t Length;//“登录服务器的用户密码”的长度
uint8_t Buffer[Password_BufferSize1];
//登录服务器的用户密码
}Password;
}CONNECTMessage_Payload_Def;
///////CONNACK报文的结构定义///////
//CONNACK报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//CONNACK报文的剩余长度值为0x02
}CONNACKMessage_FixedHeader_Def;
//CONNACK报文的“可变报头”,固定为2个字节
typedef struct
{
uint8_t Connect_onfirmation;
//“CONNACK报文的可变报头”的第1个字节“连接确认”
// 0x80,表示建立“连接确认标志”
uint8_t Connect_Return_Code;
//“CONNACK报文的可变报头”的第2个字节为“连接返回码”
// 0x00,连接已被服务端接受;
// 0x01,服务端不支持客户端请求的MQTT协议级别;
// 0x02,客户端标识符是正确的UTF-8编码,但服务端不允许使用;
// 0x03,网络连接已建立,但MQTT服务不可用;
// 0x04,用户名或密码的数据格式无效;
}CONNACKMessage_VariableHeader_Def;
6、SUBSCRIBE报文的结构
///////SUBSCRIBE报文的结构定义///////
#define RemanentLength_BufferSize3 4
//SUBSCRIBE报文的“固定报头”
typedef struct
{
uint8_t FirstByte;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[RemanentLength_BufferSize3];
//从固定报头的字节2开始为剩余长度,最多有4个字节,它是编码后的值
}SUBSCRIBEMessage_FixedHeader_Def;
//SUBSCRIBE报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}SUBSCRIBEMessage_VariableHeader_Def;
#define TopicName_Size3 17
//SUBSCRIBE报文的“有效载荷”
typedef struct
{
uint16_t Length_of_TopicName;
uint8_t TopicName[TopicName_Size3];
//订阅主题名,如:"04661219C1676702/TimeData"
}SUBSCRIBEMessage_Payload_Def;
///////SUBACK报文的结构定义///////
//SUBACK报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//订阅单一主题的SUBACK报文的剩余长度值为0x03
}SUBACKMessage_FixedHeader_Def;
//SUBACK报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t tmpIdentifier;//报文标识符,最小为1,最大为65535;
}SUBACKMessage_VariableHeader_Def;
//SUBACK报文的“有效载荷”,固定为1个字节
typedef struct
{
uint8_t requestedQos;//最高位bit7=1,则表示错误
}SUBACKMessage_Payload_Def;
7、PUBLISH报文的结构及其相关报文的结构
////PUBLISH报文的“固定报头”的第1个字节的位定义////
typedef union
{
unsigned char Value;
struct
{
unsigned int retain : 1;
///RETAIN占用bit0, PUBLISH报文的保留标志;
// 1)、RETAIN=0,服务端不存储消息,也不移除或替换任何现存的保留消息。
// 2)、RETAIN=1且QoS=0,服务端会存储消息,并丢弃之前为那个保留主题的所有消息。
// RETAIN=1且QoS>0,服务端存储消息,但不会丢弃之前为那个保留主题的所有消息。
// 如果出现“新的订阅这个主题的订阅者”时,服务器会将“保留的主题消息”发送给“新订阅者”。
// 3)、RETAIN=1,且消息为0字节,则服务端不能存储这个消息。
unsigned int qos : 2;
//QoS占bit2:1,PUBLISH报文的服务质量等级
//如果服务端或客户端收到QoS所有位都为1的PUBLISH报文,它必须关闭网络连接
unsigned int dup : 1;
//DUP占用bit3,表示重发标志;如果DUP标志被设置为0,表示这是客户端或服务端第一次请求发送这个PUBLISH报文。
//如果QoS =1 /QoS =2,且DUP标志被设置为1,则表示这可能是一个早前报文请求的重发。
//注意:对于QoS =0的消息,DUP标志必须设置为0。
unsigned int type : 4;
//占bit7:4,MQTT头字节高4位,表示MQTT控制报文的类型
} bits;
}PUBLISHMessage_FixedHeaderByte1_Def;
///////PUBLISH报文的结构定义///////
#define RemanentLength_BufferSize2 4
//PUBLISH报文的“固定报头”
typedef struct
{
PUBLISHMessage_FixedHeaderByte1_Def FirstByte;
//固定报头的字节1,“bit7:4”表示“报文的类型”
uint8_t RemanentLength_Buffer[RemanentLength_BufferSize2];
//从固定报头的字节2开始为剩余长度,最多有4个字节,它是编码后的值
}PUBLISHMessage_FixedHeader_Def;
#define MQTT_QOS1_QOS2 //当QOS=1或QOS=2时,才会使用可变报头中的“报文标识符”
#define TopicName_BufferSize2 4
//PUBLISH报文的“可变报头”
typedef struct
{
uint16_t TopicNameLength; //可变报头中的“主题名长度”
uint8_t TopicNameBuffer[TopicName_BufferSize2];
//可变报头中的“主题名”
#ifdef MQTT_QOS1_QOS2
uint16_t PacketIdentifier;
//当QOS=1或QOS=2时,才会使用可变报头中的“报文标识符”
#endif
}PUBLISHMessage_VariableHeader_Def;
#define MessageBuffer_Size2 17
//PUBLISH报文的“有效载荷”
typedef struct
{
uint8_t MessageBuffer[MessageBuffer_Size2];
//消息内容
}PUBLISHMessage_Payload_Def;
///////PUBACK报文的结构定义///////
//PUBACK报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为0000B
uint8_t RemanentLength_Buffer[1];
//PUBACK报文的剩余长度值为0x02
//“PUBACK报文”只有“可变报头”,没有“有效载荷”
}PUBACKMessage_FixedHeader_Def;
//PUBACK报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}PUBACKMessage_VariableHeader_Def;
///////MQTT客户端收到PUBREC报文的结构定义///////
//PUBREC报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为0010B
uint8_t RemanentLength_Buffer[1];
//PUBREC报文的剩余长度值为0x02
//“PUBREC报文”只有“可变报头”,没有“有效载荷”
}PUBRECMessage_FixedHeader_Def;
//PUBREC报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}PUBRECMessage_VariableHeader_Def;
///////MQTT客户端发送PUBREL报文的结构定义///////
//PUBREL报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为0010B
uint8_t RemanentLength_Buffer[1];
//PUBREL报文的剩余长度值为0x02
//“PUBREL报文”只有“可变报头”,没有“有效载荷”
}PUBRELMessage_FixedHeader_Def;
//PUBREL报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}PUBRELMessage_VariableHeader_Def;
///////PUBCOM报文的结构定义///////
//PUBCOM报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为0000B
uint8_t RemanentLength_Buffer[1];
//PUBCOM报文的剩余长度值为2
//“PUBCOM报文”只有“可变报头”,没有“有效载荷”
}PUBCOMMessage_FixedHeader_Def;
//PUBCOM报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}PUBCOMMessage_VariableHeader_Def;
8、UNSUBSCRIBE报文的结构
///////UNSUBSCRIBE报文的结构定义///////
//UNSUBSCRIBE报文的“固定报头”
typedef struct
{
uint8_t FirstByte;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[4];
//从固定报头的字节2开始为剩余长度,最多有4个字节,它是编码后的值
}UNSUBSCRIBEMessage_FixedHeader_Def;
//UNSUBSCRIBE报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t PacketIdentifier; //可变报头中的“报文标识符”
}UNSUBSCRIBEMessage_VariableHeader_Def;
#define TopicName_Size4 17
//UNSUBSCRIBE报文的“有效载荷”
typedef struct
{
uint16_t Length_of_TopicName;
uint8_t TopicName[TopicName_Size4];
//注销的消息主题,如:"04661219C1676702/TimeData"
}UNSUBSCRIBEMessage_Payload_Def;
///////UNSUBACK报文的结构定义///////
//UNSUBACK报文只有“固定报头和可变报头”,没有“有效载荷”
//UNSUBACK报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//UNSUBACK报文的剩余长度值为2
}UNSUBACKMessage_FixedHeader_Def;
//UNSUBACK报文的“可变报头”,固定为2个字节
typedef struct
{
uint16_t tmpIdentifier;//报文标识符,最小为1,最大为65535;
}UNSUBACKMessage_VariableHeader_Def;
9、PINGREQ报文的结构
///////PINGREQ报文的结构定义///////
//PINGREQ报文只有“固定报头”,没有“可变报头和有效载荷”
typedef struct
{
uint8_t FirstByte;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//PINGREQ报文的剩余长度值为0
}PINGREQMessage_FixedHeader_Def;
///////PINGRESP报文的结构定义///////
//PINGRESP报文只有“固定报头”,没有“可变报头和有效载荷”
//PINGRESP报文的“固定报头”
typedef struct
{
uint8_t byte1;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//PINGRESP报文的剩余长度值为0
}PINGRESPMessage_FixedHeader_Def;
10、DISCONNECT报文的结构
///////DISCONNECT报文的结构定义///////
//DISCONNECT报文是“客户端”发给“服务端”的最后一个控制报文,因此,服务端不会有任何响应数据;
//DISCONNECT报文只有“固定报头”,没有“可变报头和有效载荷”
typedef struct
{
uint8_t FirstByte;
//固定报头的字节1,“bit7:4”表示“报文的类型”,“bit3:0”为保留位
uint8_t RemanentLength_Buffer[1];
//DISCONNECT报文的剩余长度值为0
}DISCONNECTMessage_FixedHeader_Def;
11、生成报文和接收报文的相关结构(自定义)
MQTT_Package_Def MQTT_Package;
#define MQTT_Buffer_SIZE (1024 * 16)
typedef struct
{
uint8_t BufferError;
uint8_t Buffer[MQTT_Buffer_SIZE]; //这是“接收缓冲区”,也可以用作“发送缓冲区”
uint8_t NumberOf_RemanentLengthBytes;//“剩余长度字段”的字节数
uint16_t RemanentLengthValue;//“剩余长度值”
uint16_t Index1; //“报文”在Buffer[]中的结束索引值
uint8_t receivedPacketType; //接收到的报文类型
}MQTT_Package_Def;
12、接收单一报文的程序
//函数功能:打印一条报文;
void Print_a_Package(unsigned char* buf,uint16_t len)
{uint16_t i;uint8_t temp;printf("\r\nLen=%u\r\n",len); //将"\r\nW5500:"发送到调试串口,由PC显示;for(i=0;i<len;i++){temp=0;if( ( (buf[i]==0x0D)||(buf[i]==0x0A) ) ){printf("%c",buf[i]);temp=1;}if(temp==0){if( ( (buf[i]>0x20)&&(buf[i]<='~') ) ) printf("%c",buf[i]);else{printf(" 0x%02X",buf[i]);printf(" ");}}}printf("\r\n");//将"\r\n"发送到调试串口,由PC显示;
}//函数功能:MQTT_Package结构变量初始化
void MQTT_Package_Struct_Init(void)
{MQTT_Package.BufferError=0;MQTT_Package.Index1=0;MQTT_Package.NumberOf_RemanentLengthBytes=0;//“剩余长度字段”的字节数为0MQTT_Package.RemanentLengthValue=0;//“剩余长度值”为0
// MQTT_Package.ReceivedPUBLISH_Cnt=0;
}//函数功能:
//将buf[]的前length个字节通过"SOCKET端口sn"发送出去
//返回值为1表示发送完成
//buf[]是待发送数据
//length为发送数据长度
uint8_t MQTT_SendPacket(uint8_t sn,uint8_t buf[],uint16_t length)
{int rc;uint16_t sent;uint16_t cnt;cnt=50;//设置在50ms内发送完成sent = 0;while (sent < length && cnt>0){rc=send(sn, &buf[sent], length);//将“&buf[sent]”为首地址存储区的前len个字节通过"SOCKET端口sn"发送出去//rc=0表示忙//如果rc=-7,则表示“socket状态无效”//如果rc=-13,则表示“超时”if (rc >= 0){sent += rc;delay_us(1);//如果网速为10MB/秒,则接收一个字节的时间为0.1uscnt--;}else break;//出现错误}if (sent == length) rc=1;//发送完成else rc = 0;//发送失败return rc;
}//函数功能:将d的值写入MQTT_Package.Buffer[MQTT_Package.Index1]中
void Wtite_One_Byte_To_MQTT_Package_Buffer(uint8_t d)
{if(MQTT_Package.Index1<MQTT_Buffer_SIZE){MQTT_Package.Buffer[MQTT_Package.Index1]=d;MQTT_Package.Index1++;MQTT_Package.BufferError=0;}else MQTT_Package.BufferError=1;
}//函数功能:将d的值写入MQTT_Package.Buffer[MQTT_Package.Index1]中
void Wtite_Two_Bytes_To_MQTT_Package_Buffer(uint16_t d)
{if( MQTT_Package.Index1 < MQTT_Buffer_SIZE){MQTT_Package.Buffer[MQTT_Package.Index1]=(uint8_t)(d>>8);MQTT_Package.Index1++;MQTT_Package.BufferError=0;}else MQTT_Package.BufferError=1;if( MQTT_Package.Index1 < MQTT_Buffer_SIZE){MQTT_Package.Buffer[MQTT_Package.Index1]=(uint8_t)d;MQTT_Package.Index1++;MQTT_Package.BufferError=0;}else MQTT_Package.BufferError=1;
}//函数功能:将length编码,保存到buf[]中,返回值为buf[]中的有效字节数
//调用举例:SendRemanentLength_Encoded(length,RemanentLengthBuffer)
uint8_t RemanentLength_Encoded(uint16_t length,uint8_t *buf)
{uint16_t ret;uint8_t cnt;uint8_t tmp;////对“剩余长度”进行编码,得到剩余长度的字节数////ret=length;cnt = 0;//借用cnt来计算数组下标do{tmp = ret % 128;//对128求余数,保存到tmpret /= 128;//对128求商,并将商值保存到ret中if (ret > 0) tmp |= 0x80;//对128求商,若商值大于0,将余数的最高位置1;buf[cnt] = tmp;//将余数保存到buf[]cnt++;}while (ret > 0);return(cnt);
}//函数功能:对buf[]中前length个字节解码,生成“剩余长度值”
//buf[]保存的是“编码后的剩余长度”
//buf[]中前length个字节参与解码
//返回值为解码后的剩余长度
uint16_t RemanentLength_Decode(uint8_t *buf,uint8_t length)
{uint8_t i,tmp;uint32_t tp;uint32_t multiplier;//最大值可以到达0x200000uint16_t retValue;//对“接收到的剩余长度”进行解码retValue=0;multiplier = 1;for(i=0;i<length;i++){tmp=buf[i];tp=tmp & 127;//(tmp&127)是余数值tp=tp* multiplier;//剩余长度是1个字节时multiplier=1//剩余长度是2字节时multiplier=128//剩余长度是3字节时multiplier=128*128//剩余长度是4字节时multiplier=128*128*128=0x200000retValue = retValue + tp;multiplier *= 128;}return(retValue);
}//函数功能:读报文的长度
uint16_t Read_Package_Length(void)
{uint16_t k;k=MQTT_Package.Index1;return (k);
}//函数功能:接收一个报文,并返回报文的类型
//MQTT_Package.receivedPacketType=0,表示没有读到报文
void MQTT_Receive_a_Package(uint8_t sn)
{uint8_t tmpPacketType;//用来保存“报文类型”uint8_t ch,loop,retValue;int numberOfBytes;uint16_t cnt;//读“固定报头第1个字节”的最大时间uint8_t tmp;uint32_t multiplier;//最大值可以到达0x200000uint32_t tp;MQTT_Package.receivedPacketType=MQTT_FAILURE;
///////读“固定报头的第1个字节”开始///////cnt=1000;loop=1;retValue=0;MQTT_Package.Index1=0;tmpPacketType=MQTT_FAILURE;//假定没有读到“报文类型”MQTT_Package.RemanentLengthValue=0;//“剩余长度值”为0MQTT_Package.NumberOf_RemanentLengthBytes=0;//“剩余长度字段”的字节数为0while ( loop ){delay_us(1);//如果网速为10MB/秒,则接收一个字节的时间为0.1usch=getSn_SR(sn);//获取SOCKET端口sn的状态寄存器if(ch == SOCK_ESTABLISHED) ch=0x01;else{ch=0;MQTT_Connect_Flag=0;MQTT_Init_Steps=0;}numberOfBytes=getSn_RX_RSR(sn);//在“TCP模式”中,读“Socket n的接收缓冲区的接收量大小寄存器”,直到W5500接收完成if(numberOfBytes>0) ch=(uint8_t)(ch<<1);if( ch==0x02 ){numberOfBytes=recv(sn, &tmp, 1);//读"SOCKET端口sn"的数据,长度为1个字节,保存到tmp//numberOfBytes>0表示“接收到的数据长度”,numberOfBytes=0表示忙,numberOfBytes=-7表示无效的“Socket状态”//适用于“TCP模式”//如果“Socket n的接收缓冲区”配置的最大字节数比1大,则从“Socket n的接收缓冲区”读取1个字节保存到buf[]中;//如果“Socket n的接收缓冲区”配置的最大字节数比1小,则从“Socket n的接收缓冲区”读取Sn_RXBUF_numberOfBytes个KB保存到buf[]中;//执行“RECV命令”,直到接收完成if ( numberOfBytes == 1)//读到数据{tmpPacketType=(uint8_t)( tmp&0xF0 );//“固定报头”第1个字节的高4位值,即报文类型if(tmp==MQTT_CONNECT_FixedHeaderByte1)retValue=1;//“CONNECT报文的固定报头”字节1if(tmp==MQTT_CONNACK_FixedHeaderByte1)retValue=1;//“CONNACK报文的固定报头”字节1if( tmpPacketType == MQTT_PUBLISH_FixedHeaderByte1 )retValue=1;//“PUBLISH报文的固定报头”字节1if(tmp == MQTT_PUBACK_FixedHeaderByte1)retValue=1;//“PUBACK报文的固定报头”字节1if(tmp == MQTT_PUBREC_FixedHeaderByte1)retValue=1;//“PUBREC报文的固定报头”字节1if(tmp == MQTT_PUBREL_FixedHeaderByte1)retValue=1;//“PUBREL报文的固定报头”字节1if(tmp == MQTT_PUBCOM_FixedHeaderByte1)retValue=1;//“PUBCOM报文的固定报头”字节1if(tmp == MQTT_SUBSCRIBE_FixedHeaderByte1)retValue=1;//“SUBSCRIBE报文的固定报头”字节1if(tmp == MQTT_SUBACK_FixedHeaderByte1)retValue=1;//“SUBACK报文的固定报头”字节1if(tmp == MQTT_UNSUBSCRIBE_FixedHeaderByte1)retValue=1;//“UNSUBSCRIBE报文的固定报头”字节1if(tmp == MQTT_UNSUBACK_FixedHeaderByte1)retValue=1;//“UNSUBACK报文的固定报头”字节1if(tmp == MQTT_PINGREQ_FixedHeaderByte1)retValue=1;//“PINGREQ报文的固定报头”字节1if(tmp == MQTT_PINGRESP_FixedHeaderByte1)retValue=1;//“PINGRESP报文的固定报头”字节1if(tmp == MQTT_DISCONNECT_FixedHeaderByte1)retValue=1;//“DISCONNECT报文的固定报头”字节1if(retValue==1)//读到正确的“固定报头”字节1{Wtite_One_Byte_To_MQTT_Package_Buffer(tmp);//保存固定报头cnt=1000;loop=0;//退出while}else ch=0;}else//没有读到数据,或者是读错误{loop=0;//没有读到数据,退出while}}else//无数据,或是错误{loop=0;//没有读到数据,退出while}cnt--;if(cnt==0)//超时{loop=0;//没有读到数据,退出while}}if(retValue==1)//读到“固定报头的第1个字节”{cnt=1000;//设置“读剩余长度的最大时间”为1000usloop=1;multiplier = 1;//为“剩余长度解码”做准备MQTT_Package.RemanentLengthValue=0;//为“剩余长度解码”做准备}
///////读“固定报头的第1个字节”结束//////////////读“固定报头中的剩余长度字段”开始///////retValue=0;while (loop){delay_us(1);//如果网速为10MB/秒,则接收一个字节的时间为0.1usif ( MQTT_Package.NumberOf_RemanentLengthBytes <= 4)//剩余长度字段不超过4个字节{tmp=getSn_SR(sn);//获取SOCKET端口sn的状态寄存器if(tmp == SOCK_ESTABLISHED) ch=0x01;else{ch=0;MQTT_Connect_Flag=0;MQTT_Init_Steps=0;}numberOfBytes=getSn_RX_RSR(sn);//在“TCP模式”中,读“Socket n的接收缓冲区的接收量大小寄存器”,直到W5500接收完成if(numberOfBytes>0) ch=(uint8_t)(ch<<1);if( ch==0x02 ){numberOfBytes=recv(sn, &tmp, 1);//读"SOCKET端口sn"的数据,长度为len个字节,保存到i//numberOfBytes>0表示“接收到的数据长度”,numberOfBytes=0表示忙,numberOfBytes=-7表示无效的“Socket状态”//适用于“TCP模式”//如果“Socket n的接收缓冲区”配置的最大字节数比1大,则从“Socket n的接收缓冲区”读取1个字节保存到tmp中;//如果“Socket n的接收缓冲区”配置的最大字节数比1小,则从“Socket n的接收缓冲区”读取Sn_RXBUF_numberOfBytes个KB保存到tmp中;//执行“RECV命令”,直到接收完成if ( numberOfBytes == 1)//读到一个字节数据{Wtite_One_Byte_To_MQTT_Package_Buffer(tmp);//保存读到的“剩余长度字段”tp=tmp & 127;//(tmp&127)是余数值,剩余长度解码tp=tp* multiplier;//剩余长度是1个字节时multiplier=1//剩余长度是2字节时multiplier=128//剩余长度是3字节时multiplier=128*128//剩余长度是4字节时multiplier=128*128*128=0x200000MQTT_Package.RemanentLengthValue = MQTT_Package.RemanentLengthValue + tp;multiplier =multiplier * 128;//为“剩余长度解码”做准备if((tmp & 128)==0x00)//“剩余长度”读完成{retValue=1;//读数据正确loop=0;//退出while}MQTT_Package.NumberOf_RemanentLengthBytes++;cnt=1000;}else//没有读到数据,或者是读错误{loop=0;//没有读到数据,退出while}}else//无数据,或是错误{loop=0;//没有读到数据,退出while}}else//“剩余长度”读完成,因为“剩余长度字段”最多4个字节{retValue=1;//读数据正确loop=0;//没有读到数据,退出while}cnt--;if(cnt==0)//超时{loop=0;//没有读到数据,退出while}}if(retValue==1)//分析“剩余长度”是否正确{tmpPacketType=(uint8_t)(tmpPacketType>>4);if(tmpPacketType==MQTT_CONNECT)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_CONNACK && MQTT_Package.RemanentLengthValue==MQTT_CONNACK_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PUBLISH){retValue=(uint8_t)(retValue<<1);}if(tmpPacketType==MQTT_PUBACK && MQTT_Package.RemanentLengthValue==MQTT_PUBACK_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PUBREC && MQTT_Package.RemanentLengthValue==MQTT_PUBREC_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PUBREL && MQTT_Package.RemanentLengthValue==MQTT_PUBREL_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PUBCOM && MQTT_Package.RemanentLengthValue==MQTT_PUBCOM_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_SUBSCRIBE)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_SUBACK && MQTT_Package.RemanentLengthValue==MQTT_SUBACK_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_UNSUBSCRIBE)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_UNSUBACK && MQTT_Package.RemanentLengthValue==MQTT_UNSUBACK_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PINGREQ && MQTT_Package.RemanentLengthValue==MQTT_PINGREQ_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_PINGRESP && MQTT_Package.RemanentLengthValue==MQTT_PINGRESP_RemanentLength)retValue=(uint8_t)(retValue<<1);if(tmpPacketType==MQTT_DISCONNECT && MQTT_Package.RemanentLengthValue==MQTT_DISCONNECT_RemanentLength)retValue=(uint8_t)(retValue<<1);}if(retValue==0x02)//“剩余长度”正确{if(MQTT_Package.RemanentLengthValue>0)//说明有“可变报头”和“有效载荷”{cnt=1000;//设置读“可变报头和有效载荷”的最大时间为1000ustp=0;loop=1;}else MQTT_Package.receivedPacketType=tmpPacketType;}else//“剩余长度”不正确{tmpPacketType=MQTT_FAILURE;}
///////读“固定报头中的剩余长度字段”开始//////////////读“可变报头和有效载荷”开始///////retValue=0;while (loop){delay_us(1);//如果网速为10MB/秒,则接收一个字节的时间为0.1usif ( tp < MQTT_Package.RemanentLengthValue){tmp=getSn_SR(sn);//获取SOCKET端口sn的状态寄存器if(tmp == SOCK_ESTABLISHED) ch=0x01;else{ch=0;MQTT_Connect_Flag=0;MQTT_Init_Steps=0;}numberOfBytes=getSn_RX_RSR(sn);//在“TCP模式”中,读“Socket n的接收缓冲区的接收量大小寄存器”,直到W5500接收完成if(numberOfBytes>0) ch=(uint8_t)(ch<<1);if( ch==0x02 ){numberOfBytes=recv(sn, &tmp, 1);//读"SOCKET端口sn"的数据,长度为len个字节,保存到i//numberOfBytes>0表示“接收到的数据长度”,numberOfBytes=0表示忙,numberOfBytes=-7表示无效的“Socket状态”//适用于“TCP模式”//如果“Socket n的接收缓冲区”配置的最大字节数比1大,则从“Socket n的接收缓冲区”读取1个字节保存到tmp中;//如果“Socket n的接收缓冲区”配置的最大字节数比1小,则从“Socket n的接收缓冲区”读取Sn_RXBUF_numberOfBytes个KB保存到tmp中;//执行“RECV命令”,直到接收完成if ( numberOfBytes == 1)//读到一个字节数据{Wtite_One_Byte_To_MQTT_Package_Buffer(tmp);//保存“可变报头和有效载荷”tp++;//计数器加1;cnt=1000;}else//没有读到数据,或者是读错误{loop=0;//没有读到数据,退出while}}else//无数据,或是错误{loop=0;//没有读到数据,退出while}}else//“可变报头和有效载荷”读完成{MQTT_Package.receivedPacketType=tmpPacketType;retValue=1;//读数据正确loop=0;//没有读到数据,退出while}cnt--;if(cnt==0)//超时{loop=0;//没有读到数据,退出while}}
///////读“可变报头和有效载荷”结束///////if(MQTT_Package.BufferError==0){if(MQTT_Package.receivedPacketType>MQTT_FAILURE)//打印接收到的报文{cnt=Read_Package_Length();//读报文的长度Print_a_Package(&MQTT_Package.Buffer[0],cnt);printf("MQTT_Package.Index1=0x%04X\r\n",MQTT_Package.Index1);}}else MQTT_Package.receivedPacketType=MQTT_FAILURE;
}
接收订阅消息,略。
13、代码效果
服务器显示效果: