当前位置: 首页 > news >正文

C++ asio网络编程(4)异步读写操作及注意事项

文章目录

  • 前言
  • 一、介绍Session
  • 二、异步写操作(async_wirte_some)(不会一次性发完)
    • (1)介绍MsgNode
    • (2)为Session添加异步写操作和负责发送写数据的节点
    • (3)什么是bind绑定
    • (4)发送过程流程图
    • (5)async_wirte_some缺陷
    • 6.通过队列实现异步的发送信息的方式(企业常用)
      • 1.流程图
      • 2.Session
      • 3.MsgNode
      • 4.WriteToSocket
      • 5.WriteCallBack
  • 三、利用send+队列来发送数据,能一次性发完
    • 1.代码
    • 2.关键点
  • 四、异步读操作(async_read_some)(不会一次性读完)(公司常用)
  • 五、异步读操作(async_receive)(会一次性读完)
  • 总结


前言

今天学习boost asio的异步读写操作及注意事项,为保证知识便于吸收,今天仅介绍api使用的代码片段,下一节再编写完整的客户端和服务器程序
下面都是异步的操作


一、介绍Session

这个Session类表示服务器处理客户端连接的管理类,session类定义了一个socket成员变量,负责处理对端的连接读写,封装了Connect函数

//头文件
#include<memory>
#include<boost/asio.hpp>
#include<iostream>
using namespace boost;
using namespace std;class Session
{
public:Session(std::shared_ptr<asio::ip::tcp::socket> socket);void Connect(const asio::ip::tcp::endpoint& ep);
private:std::shared_ptr<asio::ip::tcp::socket> _socket;
};
#include"Session.h"
//初始化列表
Session::Session(std::shared_ptr<asio::ip::tcp::socket> socket) :_socket(socket)
{
}//实现连接
void Session::Connect(const asio::ip::tcp::endpoint& ep)
{_socket->connect(ep);
}

这个Session大家可以理解为当有一个客户端发来连接请求的时候,服务端就创建一个Session,Session里面创建一个socket来服务这个客户端

二、异步写操作(async_wirte_some)(不会一次性发完)

(1)介绍MsgNode

在写操作前,我们先封装一个Node结构,用来管理要发送和接收的数据,该结构包含数据域首地址,数据的总长度,以及已经处理的长度(已读的长度或者已写的长度)

class MsgNode
{
public://发送信息的构造函数  发送信息肯定知道要发什么  有多长//所以参数是字符串的首地址  和发送总长度MsgNode(const char* msg, int total_len) :_total_len(total_len),_cur_len(0){_msg = new char[total_len];//手动开辟memcpy(_msg, msg, total_len);//防止空间污染 //将msg内存中total_len个字节复制到_msg中}//接收信息的构造函数MsgNode(int total_len) :_total_len(total_len), _cur_len(0){_msg = new char[total_len];}~MsgNode(){delete []_msg;//既然手动开辟了那么就要手动清除}private:int _total_len;//要发送的字符串总长度char* _msg; //表示一个指向字符的指针int _cur_len;//存储此时的数据字符串 初始为0
};

此处我们定义的最大接收长度为1024字节,但在实际开发中,我们并不会定义一个最大长度,这里我们只是做一个演示

(2)为Session添加异步写操作和负责发送写数据的节点

class Session
{
public:Session(std::shared_ptr<asio::ip::tcp::socket> socket);void Connect(const asio::ip::tcp::endpoint& ep);void WriteCallBackErr(const boost::system::error_code& ec,std::size_t bytes_transferred, std::shared_ptr<MsgNode>);void WriteToSocketErr(const std::string buf);
private:std::shared_ptr<asio::ip::tcp::socket> _socket;std::shared_ptr<MsgNode> _send_node;
};
void Session::WriteToSocketErr(const std::string buf)
{_send_node = make_shared<MsgNode>(buf.c_str(), buf.length());this->_socket->async_write_some(asio::buffer(_send_node->_msg, _send_node->_total_len),//这里要求发送长度为_total_lem//把Session成员函数类型绑定为回调函数类型std::bind(&Session::WriteCallBackErr,this, std::placeholders::_1,std::placeholders::_2, _send_node));//中间两个是占位符
}
void Session::WriteCallBackErr(const boost::system::error_code& ec,std::size_t bytes_transferred, std::shared_ptr<MsgNode> msg_node)
{//_cur_len表示已经发送的长度if (bytes_transferred + msg_node->_cur_len < msg_node->_total_len) {_send_node->_cur_len += bytes_transferred;this->_socket->async_write_some(asio::buffer(_send_node->_msg+ _send_node->_cur_len,+  _send_node->_total_len - _send_node->_cur_len),std::bind(&Session::WriteCallBackErr,this, std::placeholders::_1,std::placeholders::_2, _send_node));}
}

(3)什么是bind绑定

std::bind,它是一个函数适配器,可以把成员函数或普通函数 “预绑定” 一部分参数,生成一个新的可调用对象(类似于回调函数)

意思就是比如我现在一个函数有4个参数,我可以提前绑定一些参数,下次用的时候我只需要绑定剩下的参数即可
✅ 场景介绍:
假设你有这样一个类成员函数:

class Session {
public:void WriteCallBackErr(int errCode, const std::string& errMsg, MsgNode* msg);
};

这个函数有三个参数,现在你想把它当成回调函数传出去,比如给某个异步网络库用,
但库要求的函数形式是

void (*)(int, string)  // 或 std::function<void(int, string)>

所以你需要“提前绑定”第三个参数 _send_node,**只暴露前两个参数,让这个函数变得“只需要传前两个参数”,**这时候就用到了 std::bind。

(4)发送过程流程图

在这里插入图片描述


(5)async_wirte_some缺陷

这个函数并不能投入实际应用,因为async_write_some回调函数返回已发送的字节数可能并不是全部长度。比如TCP发送缓存区总大小为8字节,但是有3字节未发送(上一次未发送完),这样剩余空间为5字节
在这里插入图片描述
此时我们调用async_write_some发送hello world!实际发送的长度就是为5,也就是只发送了hello,剩余world!通过我们的回调继续发送。
而实际开发的场景用户是不清楚底层tcp的多路复用调用情况的,用户想发送数据的时候就调用WriteToSocketErr,或者循环调用WriteToSocketErr,很可能在一次没发送完数据还未调用回调函数时再次调用WriteToSocketErr,因为boost::asio封装的时epoll和iocp等多路复用模型,当写事件就绪后就发数据,发送的数据按照async_write_some调用的顺序发送,所以回调函数内调用的async_write_some可能并没有被及时调用。
比如我们如下代码

//用户发送数据
WriteToSocketErr("Hello World!");
//用户无感知下层调用情况又一次发送了数据
WriteToSocketErr("Hello World!");

那么很可能第一次只发送了Hello,后面的数据没发完,第二次发送了Hello World!之后又发送了World!
所以对端收到的数据很可能是”HelloHello World! World!”
那怎么解决这个问题呢,我们可以通过队列保证应用层的发送顺序。我们在Session中定义一个发送队列,然后重新定义正确的异步发送函数和回调处理

6.通过队列实现异步的发送信息的方式(企业常用)

1.流程图

在这里插入图片描述

2.Session

class Session
{
public:Session(std::shared_ptr<asio::ip::tcp::socket> socket);void WriteCallBack(const system::error_code& ec, size_t bytes_transferred);void WriteToSocket(const string& buf);
private:queue<std::shared_ptr<MsgNode>> _send__queue;//消息队列std::shared_ptr<asio::ip::tcp::socket> _socket;bool _send_pending;//是否有未发完的数据
};

3.MsgNode

class MsgNode
{
public://发送信息的构造函数  发送信息肯定知道要发什么  有多长//所以参数是字符串的首地址  和发送总长度MsgNode(const char* msg, int total_len) :_total_len(total_len),_cur_len(0){_msg = new char[total_len];//手动开辟memcpy(_msg, msg, total_len);//防止空间污染 将msg内存中total_len个字节复制到_msg中}//接收信息的构造函数MsgNode(int total_len) :_total_len(total_len), _cur_len(0){_msg = new char[total_len];}~MsgNode(){delete []_msg;//手动清除}int _total_len;//要发送的字符串总长度char* _msg; //表示一个指向字符的指针int _cur_len;//当前发送完成的长度
};

4.WriteToSocket

void Session::WriteToSocket(const string& buf)
{_send__queue.emplace(new MsgNode(buf.c_str(), buf.length()));if (_send_pending)//如果上一条信息还没发完{return;}this->_socket->async_write_some(asio::buffer(buf),std::bind(&Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));_send_pending = true;
}

5.WriteCallBack

void Session::WriteCallBack(const system::error_code& ec, size_t bytes_transferred)
{if (ec.value() != 0)//说明发生错误{std::cout << "Error Code is " << ec.value() << " Message is " << ec.message();return;}//取出队头信息auto& send_date = _send__queue.front();send_date->_cur_len += bytes_transferred;//已经完成发送的数据长度if (send_date->_cur_len < send_date->_total_len){this->_socket->async_write_some(asio::buffer(send_date->_msg + send_date->_cur_len,send_date->_total_len - send_date->_cur_len),std::bind(Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}//发完_send__queue.pop();if (_send__queue.empty()){_send_pending = false;}if (!_send__queue.empty()){//取出队头信息auto& send_date = _send__queue.front();if (send_date->_cur_len < send_date->_total_len){this->_socket->async_write_some(asio::buffer(send_date->_msg + send_date->_cur_len, send_date->_total_len - send_date->_cur_len),std::bind(Session::WriteCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}}
}

三、利用send+队列来发送数据,能一次性发完

1.代码

流程和前面一样,只是这里调用send

void Session::WriteAllToSocket(const string& buf)
{_send_queue.emplace(new MsgNode(buf.c_str(), buf.length()));if (_send_pending){return;}this->_socket->async_send(asio::buffer(buf),std::bind(&Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2));_send_pending = true;
}
void Session::WriteAllCallBack(const system::error_code& ec, size_t bytes_transferred)
{if (ec.value() != 0)//说明发生错误{std::cout << "Error Code is " << ec.value() << " Message is " << ec.message();return;}_send_queue.pop();if (_send_queue.empty()){_send_pending = false;return;}if (!_send_queue.empty()){//取出队头信息auto& send_date = _send_queue.front();this->_socket->async_send(asio::buffer(send_date->_msg , send_date->_total_len ),std::bind(Session::WriteAllCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}
}

2.关键点

在这里插入图片描述
在这个地方我们直接pop!
async_wirte_some一开始获取的时候是因为第一次发送可能没有一次性发送完,所以通过send_date->_cur_len += bytes_transferred;获取偏移量继续发送第一条数据的剩下的信息,但async_send会一次性发送完,所以我就直接pop

四、异步读操作(async_read_some)(不会一次性读完)(公司常用)

void Session::ReadFromSocket()
{if (_recv_pending)//当前正在接收{return;}_recv_node = make_shared<MsgNode>(RECVSIZE);_socket->async_read_some(asio::buffer(_recv_node->_msg, _recv_node->_total_len),std::bind(Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));_recv_pending = true;
}void Session::ReadCallBack(const system::error_code& ec, size_t bytes_transferred)
{_recv_node->_cur_len += bytes_transferred;if (_recv_node->_cur_len < _recv_node->_total_len){_socket->async_read_some(asio::buffer(_recv_node->_msg+ _recv_node->_cur_len, _recv_node->_total_len- _recv_node->_cur_len),std::bind(Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));return;}_recv_pending = false;
}

这里其实和前面的写操作很像,都是读一部分,然后回调中判断是否读完,没有就继续回调

五、异步读操作(async_receive)(会一次性读完)

void Session::ReadAllFromSocket()
{if (_recv_pending)//当前正在接收{return;}_recv_node = make_shared<MsgNode>(RECVSIZE);_socket->async_receive(asio::buffer(_recv_node->_msg, _recv_node->_total_len),std::bind(Session::ReadCallBack, this, std::placeholders::_1, std::placeholders::_2));_recv_pending = true;
}void Session::ReadAllCallBack(const system::error_code& ec, size_t bytes_transferred)
{_recv_node->_cur_len += bytes_transferred;_recv_pending = false;
}

这里和写操作中的send一样可以一次性解决,所以回调函数中只需要改一下状态就行,因为只会调用一次回调函数

总结

这次介绍了异步通信两个操作

可以用
async_wirte_some
这个不一定一次性可以发送完全部信息,会多次调用回调函数
async_send(推荐使用)
这个内部就是包含了一个async_wirte_some,这个会一次性发送完所有数据

可以用
async_read_some(推荐使用)
这个不一定一次性可以读取完所有数据,会多次调用回调函数
async_receive
这个内部就是包含了一个async_read_some,这个会一次性发读完所有数据

读和写的时候就使用推荐的api就行,不用混用,比如读的时候用了async_send就不用async_wirte_some ,写也是类似,不要混用!!!!!!!!!!!!!!

http://www.xdnf.cn/news/394309.html

相关文章:

  • (十二)Java枚举类深度解析:从基础到高级应用
  • C++八股——函数对象
  • 工具篇-扣子空间MCP,一键做游戏,一键成曲
  • C/C++实践(五)C++内存管理:从基础到高阶的系统性实现指南
  • 《从零构建一个简易的IOC容器,理解Spring的核心思想》
  • 命令行解释器中shell、bash和zsh的区别
  • LangChain对话链:打造智能多轮对话机器人
  • C 语言报错 xxx incomplete type xxx
  • CTFd CSRF 校验模块解读
  • 表加字段如何不停机
  • NCCL N卡通信机制
  • 《Effective Python》第1章 Pythonic 思维详解——始终用括号包裹单元素元组
  • 用一张网记住局域网核心概念:从拓扑结构到传输介质的具象化理解
  • 懒人美食帮SpringBoot订餐系统开发实现
  • Linux网络编程day9 libevent库
  • 代码随想录算法训练营第60期第三十二天打卡
  • RAII是什么?
  • 大学之大:东京工业大学2025.5.11
  • 误差函数(Error Function)的推导与物理意义
  • 【电机控制器】PY32MD310K18U7TR——ADC、UART
  • AAAI-2025 | 电子科大类比推理助力精准识别!SPAR:基于自提示类比推理的无人机目标探测技术
  • Java 线程池原理
  • 解决stm32HAL库使用vscode打开,识别不到头文件及uint8_t等问题
  • LOJ 6346 线段树:关于时间 Solution
  • 假如你的项目是springboot+vue怎么解决跨域问题
  • Anaconda环境中conda与pip命令的区别
  • Java--图书管理系统(简易版)
  • 信息安全管理与评估索引
  • 02.three官方示例+编辑器+AI快速学习webgl_animation_skinning_blending
  • C++类和对象--初阶