C++ asio网络编程(7)增加发送队列实现全双工通信
提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档
文章目录
- 前言
- 一、数据节点设计
- 二、封装发送接口
- 介绍锁mutex和加锁工具lock_guard
- 回调函数的实现
- 为什么在回调函数中也要加锁
- 修改读回调
- 总结
前言
前文介绍了通过智能指针实现伪闭包的方式延长了session的生命周期,而实际使用的服务器并不是应答式,而是全双工通信方式,服务器一直监听写事件,接收对端数据,可随时发送数据给对端,今天介绍如何封装异步的发送接口,因为多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的,这一点我们已经在前文异步发送函数介绍的时候提到了
提示:以下是本篇文章正文内容,下面案例可供参考
一、数据节点设计
我们设计一个数据节点MsgNode用来存储数据
class MsgNode
{friend class CSession;
public:MsgNode(char* msg, int max_len){_data = new char[max_len];memcpy(_data, msg, max_len);}~MsgNode(){delete[]_data;}private:int _cur_len;int _max_len;char* _data;};
_cur_len表示数据当前已处理的长度(已经发送的数据或者已经接收的数据长度),因为一个数据包存在未发送完或者未接收完的情况。
_max_len表示数据的总长度。
_data表示数据域,已接收或者已发送的数据都放在此空间内
二、封装发送接口
首先在CSession类里新增一个队列存储要发送的数据,因为我们不能保证每次调用发送接口的时候上一次数据已经发送完,就要把要发送的数据放入队列中,通过回调函数不断地发送。而且我们不能保证发送的接口和回调函数的接口在一个线程,所以要增加一个锁保证发送队列安全性。
同时我们新增一个发送接口Send
void CSession::Send(char* msg, int max_length)
{bool pending = false;//上次发送数据没有残留std::lock_guard<std::mutex>lock(_send_lock);if (_send_que.size() > 0){pending = true;}_send_que.push(make_shared<MsgNode>(msg, max_length));if (pending){return;}boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),std::bind(&CSession::handle_write, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));
}
发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。
这里我们先介绍一下锁和加锁
介绍锁mutex和加锁工具lock_guard
- 什么是 std::mutex?
std::mutex 是 C++ 标准库中的互斥量(mutex),用于保护多线程环境下的共享资源,防止两个或多个线程同时访问同一资源引发数据竞争。
● “mutex” 即 mutual exclusion(互斥)的缩写。
● 当一个线程锁住 mutex 时,其他线程必须等待该线程释放(unlock)后,才能继续访问资源。
● 常用于临界区保护,比如:修改共享变量、队列、容器等。
使用方式(std::mutex)
#include <iostream>
#include <thread>
#include <mutex>std::mutex mtx; // 声明一个全局互斥锁int counter = 0;void increase() {mtx.lock(); // 显式加锁++counter; // 临界区:访问共享资源mtx.unlock(); // 显式解锁
}
⚠️ 注意:
如果忘记 unlock(),或者异常导致提前退出,程序会出现死锁问题。因此更推荐使用 std::lock_guard 自动管理锁
- 什么是 std::lock_guard?
std::lock_guard 是 C++11 引入的RAII 风格的自动加锁工具,用于自动管理 mutex 的加锁和解锁,防止因忘记手动 unlock 而导致的死锁问题。
使用方式:
void increase_safe() {std::lock_guard<std::mutex> lock(mtx); // 自动加锁和解锁++counter;
} // 离开作用域时自动调用 mtx.unlock()
优点:
● 自动管理锁的生命周期:进入作用域时加锁,退出作用域时自动释放锁。
● 简洁安全,不容易因异常或遗漏导致锁未释放。
● 适合短时间的临界区控制
回调函数的实现
void CSession::HandleWrite(const boost::system::error_code& ec, size_t bytes_transferred,shared_ptr<CSession>_self_shared)
{ if (ec)//0正确 1错误{cout << "write error" << endl;_server->ClearSession(_uuid);}else{std::lock_guard<std::mutex>lock(_send_lock);//发完了 就清除掉原先的//memset(_data, 0, sizeof(_data));_send_que.pop();if (!_send_que.empty()){auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1,std::placeholders::_2, _self_shared));}}
}
这里为什么在回调函数中也要加锁呢,不是Send中已经加锁了吗???
为什么在回调函数中也要加锁
📌 问题:
明明在 Send() 中已经对发送队列加锁了,为什么回调函数 HandleWrite() 还要再次加锁?
我这里虽然在Send中进行加锁,防止了在后面回调还没走完的时候又有人进来,
我回调函数中加锁,是因为不止是Send函数中会进入这个回调,说不定其他函数也会绑定这个回调,但我这个回调中还有东西没走完,我也要防止其他地方进入来干扰我
!!我在 Send() 中加锁,是为了防止 _send_que 正在处理(比如还没写完数据),有人又来 push 数据。
!!回调函数 HandleWrite() 中也要加锁,是因为它操作的 _send_que 也有可能被其他地方绑定回调触发,我不能保证这个函数不会被多个线程同时调用。
📌 举个具体的例子看冲突:
假设你没在 HandleWrite() 中加锁,会发生什么:
- 主线程调用 Send(),加锁了,往 _send_que 里 push 一个消息。
- 这时 Boost.Asio 在另一个线程异步调用了 HandleWrite(),它没加锁,pop() 了一个消息。
- 这两个操作几乎同时进行时,就会出现“竞态条件(Race Condition)”,可能:
○ pop 出了空数据;
○ push 时还没完成,pop 就动手了;
○ 程序崩溃或数据错乱
修改读回调
因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件
//读的回调函数
void CSession::HandleRead(const boost::system::error_code& ec, size_t bytes_transferred, shared_ptr<CSession>_self_shared)
{if (ec)//0正确 1错误{cout << "read error" << endl;_server->ClearSession(_uuid);}else{cout << "server receivr data is "<<_data << endl;Send(_data,bytes_transferred);//发送数据返回memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data,max_length), std::bind(&CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, _self_shared));}
}
总结
该服务器虽然实现了全双工通信,但是仍存在缺陷,比如粘包问题未处理,下一版本实现粘包处理