当前位置: 首页 > news >正文

C++ asio网络编程(7)增加发送队列实现全双工通信

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、数据节点设计
  • 二、封装发送接口
    • 介绍锁mutex和加锁工具lock_guard
    • 回调函数的实现
    • 为什么在回调函数中也要加锁
    • 修改读回调
  • 总结


前言

前文介绍了通过智能指针实现伪闭包的方式延长了session的生命周期,而实际使用的服务器并不是应答式,而是全双工通信方式服务器一直监听写事件,接收对端数据,可随时发送数据给对端,今天介绍如何封装异步的发送接口,因为多次发送时,异步的发送要保证回调触发后再次发送才能确保数据是有序的,这一点我们已经在前文异步发送函数介绍的时候提到了
在这里插入图片描述


提示:以下是本篇文章正文内容,下面案例可供参考

一、数据节点设计

我们设计一个数据节点MsgNode用来存储数据

class MsgNode
{friend class CSession;
public:MsgNode(char* msg, int max_len){_data = new char[max_len];memcpy(_data, msg, max_len);}~MsgNode(){delete[]_data;}private:int _cur_len;int _max_len;char* _data;};

_cur_len表示数据当前已处理的长度(已经发送的数据或者已经接收的数据长度),因为一个数据包存在未发送完或者未接收完的情况。
_max_len表示数据的总长度。
_data表示数据域,已接收或者已发送的数据都放在此空间内

二、封装发送接口

首先在CSession类里新增一个队列存储要发送的数据,因为我们不能保证每次调用发送接口的时候上一次数据已经发送完,就要把要发送的数据放入队列中,通过回调函数不断地发送。而且我们不能保证发送的接口和回调函数的接口在一个线程,所以要增加一个锁保证发送队列安全性
同时我们新增一个发送接口Send

void CSession::Send(char* msg, int max_length)
{bool pending = false;//上次发送数据没有残留std::lock_guard<std::mutex>lock(_send_lock);if (_send_que.size() > 0){pending = true;}_send_que.push(make_shared<MsgNode>(msg, max_length));if (pending){return;}boost::asio::async_write(_socket, boost::asio::buffer(msg, max_length),std::bind(&CSession::handle_write, this, std::placeholders::_1, std::placeholders::_2, shared_from_this()));
}

发送接口里判断发送队列是否为空,如果不为空说明有数据未发送完,需要将数据放入队列,然后返回。如果发送队列为空,则说明当前没有未发送完的数据,将要发送的数据放入队列并调用async_write函数发送数据。

这里我们先介绍一下锁和加锁

介绍锁mutex和加锁工具lock_guard

  1. 什么是 std::mutex?
    std::mutex 是 C++ 标准库中的互斥量(mutex),用于保护多线程环境下的共享资源,防止两个或多个线程同时访问同一资源引发数据竞争。
    ● “mutex” 即 mutual exclusion(互斥)的缩写。
    当一个线程锁住 mutex 时,其他线程必须等待该线程释放(unlock)后,才能继续访问资源
    ● 常用于临界区保护,比如:修改共享变量、队列、容器等。
    使用方式(std::mutex)
#include <iostream>
#include <thread>
#include <mutex>std::mutex mtx;  // 声明一个全局互斥锁int counter = 0;void increase() {mtx.lock();       // 显式加锁++counter;        // 临界区:访问共享资源mtx.unlock();     // 显式解锁
}

⚠️ 注意:
如果忘记 unlock(),或者异常导致提前退出,程序会出现死锁问题。因此更推荐使用 std::lock_guard 自动管理锁

  1. 什么是 std::lock_guard?
    std::lock_guard 是 C++11 引入的RAII 风格的自动加锁工具,用于自动管理 mutex 的加锁和解锁,防止因忘记手动 unlock 而导致的死锁问题。
    使用方式:
void increase_safe() {std::lock_guard<std::mutex> lock(mtx);  // 自动加锁和解锁++counter;
}  // 离开作用域时自动调用 mtx.unlock()

优点:
● 自动管理锁的生命周期:进入作用域时加锁,退出作用域时自动释放锁
● 简洁安全,不容易因异常或遗漏导致锁未释放。
● 适合短时间的临界区控制
在这里插入图片描述

回调函数的实现

void CSession::HandleWrite(const boost::system::error_code& ec, size_t bytes_transferred,shared_ptr<CSession>_self_shared)
{ if (ec)//0正确  1错误{cout << "write error" << endl;_server->ClearSession(_uuid);}else{std::lock_guard<std::mutex>lock(_send_lock);//发完了 就清除掉原先的//memset(_data, 0, sizeof(_data));_send_que.pop();if (!_send_que.empty()){auto& msgnode = _send_que.front();boost::asio::async_write(_socket, boost::asio::buffer(msgnode->_data, msgnode->_max_len),std::bind(&CSession::HandleWrite, this, std::placeholders::_1,std::placeholders::_2, _self_shared));}}
}

这里为什么在回调函数中也要加锁呢,不是Send中已经加锁了吗???

为什么在回调函数中也要加锁

📌 问题:
明明在 Send() 中已经对发送队列加锁了,为什么回调函数 HandleWrite() 还要再次加锁?

我这里虽然在Send中进行加锁,防止了在后面回调还没走完的时候又有人进来,
我回调函数中加锁,是因为不止是Send函数中会进入这个回调,说不定其他函数也会绑定这个回调,但我这个回调中还有东西没走完,我也要防止其他地方进入来干扰我

!!我在 Send() 中加锁,是为了防止 _send_que 正在处理(比如还没写完数据),有人又来 push 数据。

!!回调函数 HandleWrite() 中也要加锁,是因为它操作的 _send_que 也有可能被其他地方绑定回调触发,我不能保证这个函数不会被多个线程同时调用。

📌 举个具体的例子看冲突:
假设你没在 HandleWrite() 中加锁,会发生什么:

  1. 主线程调用 Send(),加锁了,往 _send_que 里 push 一个消息。
  2. 这时 Boost.Asio 在另一个线程异步调用了 HandleWrite(),它没加锁,pop() 了一个消息。
  3. 这两个操作几乎同时进行时,就会出现“竞态条件(Race Condition)”,可能:
    ○ pop 出了空数据;
    ○ push 时还没完成,pop 就动手了;
    ○ 程序崩溃或数据错乱

修改读回调

因为我们要一直监听对端发送的数据,所以要在每次收到数据后继续绑定监听事件

//读的回调函数
void CSession::HandleRead(const boost::system::error_code& ec, size_t bytes_transferred, shared_ptr<CSession>_self_shared)
{if (ec)//0正确  1错误{cout << "read error" << endl;_server->ClearSession(_uuid);}else{cout << "server receivr data is "<<_data << endl;Send(_data,bytes_transferred);//发送数据返回memset(_data, 0, max_length);_socket.async_read_some(boost::asio::buffer(_data,max_length), std::bind(&CSession::HandleRead, this,std::placeholders::_1, std::placeholders::_2, _self_shared));}
}

总结

在这里插入图片描述

该服务器虽然实现了全双工通信,但是仍存在缺陷,比如粘包问题未处理,下一版本实现粘包处理

http://www.xdnf.cn/news/488035.html

相关文章:

  • Maven Deploy的依赖与引用方的依赖不同
  • 信奥赛-刷题笔记-队列篇-T4-P7912小熊的果篮
  • MySQL 数据库优化:InnoDB 存储引擎深度解析:架构、调优与最佳实践
  • 记录一个为打印高清而做投喂图像增强的例子
  • docker compose 启动指定的 service
  • MongoTemplate 基础使用帮助手册
  • 12条热门照片提示
  • XS9922C芯片:多能一体的视频处理强者,可p2p替代TP9930和TP9932,开启智能视觉新征程
  • Flask框架深度解析:蓝图、上下文机制与Jinja2模板引擎实战
  • ssh 配置了.ssh/authorized_keys 依旧需要密码的问题
  • 如何同时管理不同平台的多个账号?
  • 【第七节】ESP32-S3 霍尔传感器应用实战:磁场检测与蜂鸣器控制
  • 小学数学题批量生成及检查工具
  • PT2062单触控单输出LED调光IC
  • python报错:应为类型Union[str,int],但实际为None问题原因及解决方案
  • HGDB索引膨胀的检查与处理思路
  • 哈希表实现(1):
  • 【言语】刷题5(填空)
  • 2025-05-15 代码人生 - 精选文章周刊
  • Microsoft Azure 服务4月更新告示
  • 简化WPF开发:CommunityToolkit属性绑定与命令声明实战
  • 服务器死机了需要检查哪些问题
  • pojo层、dao层、service层、controller层的作用
  • C++(15):默认值(default)
  • 等离子模块【杀菌消毒】
  • 大群将至:通付盾推出多智能体协同平台Legion
  • mysql隔离级别的学习分享
  • python可视化:北方省市人口流动与春运数据综合分析5
  • Java项目使用Tomcat启动后JS文件中的中文乱码问题
  • CRM客户管理系统有什么缺点?