当前位置: 首页 > backend >正文

QT聊天项目DAY08

1.使用IOContextPool提高并发量

1.1 新建连接池类

#ifndef __ASIOIOSERVICEPOOL_H__
#define __ASIOIOSERVICEPOOL_H__#include <vector>
#include <boost/asio.hpp>
#include "Singletion.h"
class AsioIOServicePool : public Singletion<AsioIOServicePool>
{friend Singletion<AsioIOServicePool>;public:using IOService = boost::asio::io_context;/* 防止io_context.run() 提前退出 */using Work = boost::asio::io_context::work;using WorkPtr = std::unique_ptr<Work>;public:~AsioIOServicePool();AsioIOServicePool(const AsioIOServicePool&) = delete;AsioIOServicePool& operator=(const AsioIOServicePool&) = delete;public:IOService& GetIOService();void Stop();private:AsioIOServicePool(std::size_t size = 2);private:std::vector<IOService> m_ioServices;std::vector<WorkPtr> m_works;std::vector<std::thread> m_threads;std::size_t m_nextIOService;
};#endif // __ASIOIOSERVICEPOOL_H__

这里写着写着突然让我领会到了STL源码解析的重要性

在m_threads.emplace_back([this, i]()
            {
                m_ioServices[i].run();
            });

里可以采用thread([this,i](){})的方式写,但是会导致多复制一次,所以使用临时对象的方式来添加线程

#include "AsioIOServicePool.h"
#include <iostream>
using namespace std;AsioIOServicePool::AsioIOServicePool(std::size_t size):m_ioServices(size),m_works(size),m_nextIOService(0)
{for (size_t i = 0; i < size; ++i){/* 每个IOService对象要有一个Work对象,否则它在没有任务时会立即退出run() */ m_works[i] = unique_ptr<Work>(new Work(m_ioServices[i]));m_threads.emplace_back([this, i](){m_ioServices[i].run();});}
}AsioIOServicePool::~AsioIOServicePool()
{Stop();cout << "AsioIOServicePool::~AsioIOServicePool()" << endl;
}AsioIOServicePool::IOService& AsioIOServicePool::GetIOService()
{IOService& ioService = m_ioServices[m_nextIOService];m_nextIOService = (m_nextIOService + 1) % m_ioServices.size();return ioService;
}void AsioIOServicePool::Stop()
{/* 先停止已经绑定了读或写的监听事件的服务,然后重置Work对象 */for (auto& work : m_works){work->get_io_context().stop();work.reset();}/* 等待所有线程退出 */for (auto& thread : m_threads){if (thread.joinable()){thread.join();}}
}

io_context像一个任务调度中心,Wrok是一个"假任务",告诉调度中心:"你别关门,我一会还有任务要来"

1.2 关于线程的一些知识点

当添加线程时,这个线程就已经启动了

也就是已经开始监听了

1.3 什么是io_context事件循环?

关于epoll的本质可以看下面这篇文章,我这里不做阐述了

https://zhuanlan.zhihu.com/p/17856755436#:~:text=socket%E4%BA%8B%E4%BB%B6%E6%B7%BB%E5%8A%A0%E6%88%90%E5%8A%9F%E5%90%8E%EF%BC%8Cepoll%E6%89%8D%E8%83%BD%E7%9B%91%E5%90%ACsocket%E8%AF%BB%E5%86%99%E4%BA%8B%E4%BB%B6%E3%80%82%20%E5%A6%82%E6%9E%9Cepoll%E5%B0%B1%E7%BB%AA%E9%98%9F%E5%88%97%E6%9C%89%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%EF%BC%8C%E7%94%A8%E6%88%B7%E7%A8%8B%E5%BA%8F%E8%B0%83%E7%94%A8epoll_wait%E5%87%BD%E6%95%B0%E4%BC%9A%E6%88%90%E5%8A%9F%E8%8E%B7%E5%8F%96%E5%88%B0%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%E3%80%82,%E5%A6%82%E6%9E%9C%E6%B2%A1%E6%9C%89%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%EF%BC%8C%E5%88%99epoll%E7%BA%BF%E7%A8%8B%E9%99%B7%E5%85%A5%E4%BC%91%E7%9C%A0%E3%80%82%20%E5%BD%93socket%E6%8E%A5%E6%94%B6%E5%88%B0%E6%95%B0%E6%8D%AE%E5%90%8E%EF%BC%8C%E9%80%9A%E8%BF%87socket%E7%AD%89%E5%BE%85%E9%98%9F%E5%88%97%E5%8F%AF%E4%BB%A5%E5%94%A4%E9%86%92%E4%BC%91%E7%9C%A0%E7%9A%84epoll%E7%BA%BF%E7%A8%8B%EF%BC%8C%E5%B9%B6%E5%B0%86socket%E5%B0%81%E8%A3%85%E6%88%90epoll%E5%B0%B1%E7%BB%AA%E4%BA%8B%E4%BB%B6%E6%8F%92%E5%85%A5%E5%B0%B1%E7%BB%AA%E9%98%9F%E5%88%97%E3%80%82

本质上就是一个epoll()监听外加一个事件分发器,假设有三种事件类型,按照正常的linux开发来说应该是三种,分别是连接事件,连接中有数据到来事件以及连接的读写事件,epoll会循环的监听这三种事件,epool监听到这三种事件会将事件分发给处理该事件的对象,其实这就和io_context差不多了,本质上都是在监听事件有没有被触发,被触发了就执行对应的回调

int main(int argc, char *argv[]){structepoll_eventev, events[MAX_EVENTS];int sock_fd, ret = 0;int efd = epoll_create(10);                             //创建epoll实例ev.data.fd = sock_fd;ev.events = EPOLLIN;                                    //注册监听端口连接事件epoll_ctl(efd, EPOLL_CTL_ADD, sock_fd, &ev);while (1) {// 超时1000毫秒,获取就绪事件int nfds = epoll_wait(efd, events, MAX_EVENTS, 1000);if (nfds == -1) return-1;                           // 获取失败退出elseif (nfds == 0) continue;                        // 超时,继续下一轮事件获取// 轮询就绪事件数组for (int i = 0; i < nfds; i++) {    int fd = events[i].data.fd;// 新连接到来if (fd == sock_fd) {new_fd = accept(sock_fd, (struct sockaddr *)&peer, &addrlen);setnonblocking(new_fd);                     // 设置新套接字为非阻塞模式ev.data.fd = new_fd;ev.events = EPOLLIN|EPOLLET;                // 添加新套接字epoll_ctl(efd, EPOLL_CTL_ADD, new_fd, &ev); // 添加新的监听,等待该事件到来} else { // 已经分配的连接有数据到来if (events[i].events & EPOLLIN) {           // EPOLLIN事件recv(fd, recv_buf, len, 0);             // 读取数据/* 对该数据做对应的处理, 这里可以分配线程去单独的处理每一个事件 */ }}}  }return 0;
}

下面就是io_context事件循环绑定了监听端口是否有连接事件,当事件触发时会直接调用对应的回调函数,直接简化了上述的epoll操作,非常nice

所以创建了两个线程去不停的轮询事件循环,只需要在端口号到来时绑定事件,去监听事件中是否有数据到来即可。如何绑定事件的关键在于async的类型

调用了什么async_*函数,就绑定了对应类型的事件,如下

async_*的调用本身就是注册事件的过程,这些事件都会被挂载到io_context的事件队列中,等待epoll/select/kqueue检测和触发

将两个线程中的事件循环绑定到async_read()上实时的监听socket(连接)中是否有数据到来,就变成了主线程实时监听端口是否有客户端发来的新连接,子线程实时监听连接中客户端是否发来数据

iocontext一直都在轮询监听注册的事件,类似于epoll的边缘触发,也就是不会重复的监听注册的事件,当注册的事件被监听到并执行完回调后,需要在重新注册,一个iocontext可以被多个socket绑定,本质上socket就对应一个事件的监听(就像电话总站和下属的电话机一样,电话总站收到电话,查询是找其下属的电话机,就会把这个连接交给下属的电话机去处理事情,处理完之后,就会把这个电话机移除,后续电话总站也不会处理这个电话机相应的服务哦,也就是说呢,一个电话机总站可以处理多个下属电话机服务);一个事件循环可以处理好几个不同的连接,而不是一个线程只能处理一个连接

io_context 像电话总站(事件循环器)
socket 是电话机(单个事件源)
电话总站轮询注册的 socket,有事件了就派发到对应 handler,处理完自动“移除”。
一个 io_context 能服务多个 socket,多个连接并发处理,不是一个 socket 一个线程。

1.4 改为每次从连接池中获取连接

之前是每有一个连接到来就将套接字分配给新的连接,但是这个连接还是在主线程上运行的,可以打印一下线程ID

这里可以看到分配连接和执行连接逻辑处理都是在一个线程上运行,这样无法做到高并发的

现在修改为将连接的处理逻辑分配到一个单独的线程上去处理

/* 实时的监听该端口是否有客户端发来新的连接 */
void CServer::Start()
{auto& IOService = AsioIOServicePool::GetInstance()->GetIOService();HttpConnection* pConnection = new HttpConnection(IOService);								// 创建新的连接对象_acceptor.async_accept(pConnection->GetSocket(), [this, pConnection](boost::system::error_code ec){try {if (ec){Start();																	// 重新监听return;}pConnection->Start();															// 启动连接Start();																		// 继续监听cout << "CServer Thread ID: " << this_thread::get_id() << "\n";}catch (const std::exception& e) {std::cerr << "CServer::Exception: " << e.what() << "\n";}});
}

管理连接的修改,用事件循环来初始化空套接字,最后在端口中监听客户端连接到来时直接赋值给这个套接字

HttpConnection::HttpConnection(net::io_context& ioContext): _socket(ioContext)
{CheckDeadline();														// 绑定超时的回调
}tcp::socket& HttpConnection::GetSocket()
{return _socket;
}

编译,出现了模板重定义的问题,没有加上

#ifndef _SINGLETON_H_
#define _SINGLETON_H_

#endif

导致的

再编译,最后仍然没有变换线程,可以看出确实是创建出线程了,但是事件循环还是在主线程中运行的

也有可能是回调函数是在线程中进行处理的,更改打印线程id的位置为当事件发生的回调函数里

1.5 关于线程的一些看法

如上文一样,回调函数是放在线程中处理的,这样的操作非常类似于qt中的movetoThread

添加qt的类到普通的c++工程中,如果遇到了报错,需要导入qt环境

在解决方案下面将该工程转换成qt工程

Qt::DirectConnection

槽函数在发送信号的线程中被调用

Qt::QueuedConnection

槽函数在接收者的线程中被调用

Qt::AutoConnection

如果发送者和接收者在同一个线程中,使用DirectConnection

不在同一个线程中使用QueuedConnection

观察源码可以看到connect的最后一个参数默认为AutoConnection了

抽时间一定要好好看看STL源码解析

2. GRPC连接池

当服务器里分配线程去处理连接中的逻辑后,会向grpc服务器(java写的那段代码)发送请求,但是此时只有一个grpc通道,也就是说所有的线程在处理完客户端发来的数据后要共用一个通道去请求grpc服务器发送邮箱验证码。

这样做是不对的,应该线程去从grpc连接池中取出空闲的通道,这样就能够极大的利用线程的效率了

全局只有这里应用了grpc邮箱请求,所以最后的优化放在GetVerifyCode里,如下


 

2.1 锁

lock_guard锁(RAII锁)

std::mutex mtx;void safe_increment() {std::lock_guard<std::mutex> lock(mtx);  // 自动加锁// 临界区
} // 离开作用域时自动解锁

unique_lock锁

std::mutex mtx;
std::condition_variable cv;void wait_for_event() {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return ready; });  // 需要 unique_lock
}

// 假设 ready = false;
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return ready; });
// 等价于以下逻辑(伪代码)while (!ready) {lock.unlock();      // 释放锁block until notified; // 等待唤醒lock.lock();        // 唤醒后重新加锁
}
// 退出 while 时 lock 仍然是加锁状态

2.2 GRPC连接池代码

class StubPool
{
public:StubPool(size_t poolSize): _bStop(false), _poolSize(poolSize){for (size_t i = 0; i < poolSize; i++){int Port = ServerStatic::ParseConfig("./Config/config.json", "VerifyServer", "Port");string Ip = "localhost:" + to_string(Port);shared_ptr<Channel> channel = grpc::CreateChannel(Ip,grpc::InsecureChannelCredentials());							// 创建GRPC通道_stubQueue.push(VerifyService::NewStub(channel));					// 存入stub队列}}~StubPool(){lock_guard<mutex> lock(_mutex);											// 加锁_bStop = true;_cv.notify_all();														// 通知所有线程退出for (size_t i = 0; i < _stubQueue.size(); i++){_stubQueue.pop();													// 弹出所有stub}}/* 获取一个stub */unique_ptr<VerifyService::Stub> GetStub(){unique_lock<mutex> lock(_mutex);										// 加锁_cv.wait(lock, [this]() {return !_stubQueue.empty(); });						// 阻塞等待,队列非空时才继续往下执行if (_bStop)																// 线程池停止return nullptr;unique_ptr<VerifyService::Stub> stub = move(_stubQueue.front());		// 获取stub_stubQueue.pop();														// 弹出stubreturn stub;}/* 归还一个stub */void ReturnStub(unique_ptr<VerifyService::Stub> stub){lock_guard<mutex> lock(_mutex);											// 加锁if (_bStop)																// 线程池停止return;_stubQueue.push(move(stub));											// 存入stub队列_cv.notify_one();														// 通知一个线程}private:atomic<bool> _bStop;														// 该线程池是否停止size_t _poolSize;															// 线程池大小queue<unique_ptr<VerifyService::Stub>> _stubQueue;							// 存放stub的队列mutex _mutex;																// 互斥锁condition_variable _cv;														// 条件变量
};

2.3 修改VerifyGrpcClient请求服务器逻辑

*.h

class VerifyGrpcClient : public Singletion<VerifyGrpcClient>
{friend class Singletion<VerifyGrpcClient>;
public:~VerifyGrpcClient();/* 向GRPC服务器请求验证码 */GetVerifyRsponse GetVerifyCode(string email);private:VerifyGrpcClient();StubPool* _stubPool = nullptr;												// 存放stub的线程池
};

*.cpp

#include "VerifyGrpcClient.h"VerifyGrpcClient::VerifyGrpcClient()
{_stubPool = new StubPool(5);													// 创建连接池
}VerifyGrpcClient::~VerifyGrpcClient()
{}GetVerifyRsponse VerifyGrpcClient::GetVerifyCode(string email)
{ClientContext context;															// GRPC上下文GetVerifyRsponse response;														// 响应对象GetVerifyRequest request;														// 请求对象request.set_email(email);														unique_ptr<VerifyService::Stub> stub_ = _stubPool->GetStub();					// 从连接池中获取一个连接(unique_ptr的移动拷贝构造)Status status = stub_->GetVerifyCode(&context, request, &response);				// 发起GRPC请求if (status.ok()){return response;}else{response.set_error(ErrorCodes::RPC_FAILED);return response;}_stubPool->ReturnStub(move(stub_));												// 归还连接到连接池,由于stub_是左值,此时需要move一下变成右值
}

2.4 编译

没有问题

测试一下代码是否生效

正常生效,没有任何问题

http://www.xdnf.cn/news/4039.html

相关文章:

  • C 语言逻辑运算符:组合判断,构建更复杂的条件
  • Cisco Packet Tracer 选项卡的使用
  • Python中的客户端和服务端交互的基本内容
  • vue实现AI问答Markdown打字机效果
  • 【C/C++】函数模板
  • Auto.js 脚本:清理手机数据但保留账号
  • 第R8周:RNN实现阿尔兹海默病诊断(pytorch)
  • 基于EFISH-SCB-RK3576工控机/SAIL-RK3576核心板的网络安全防火墙技术方案‌(国产化替代J1900的全栈技术解析)
  • Python生活手册-正则表达式:从快递单到咖啡订单的文本魔法
  • Level DB --- MergingIterator
  • Compose 中使用 WebView
  • 基于YOLOv的目标检测训练数据构建方法研究—图像采集、标注、划分与增强一体化流程设计
  • Softmax回归与单层感知机对比
  • 【platform push 提示 Invalid source ref: HEAD】
  • 双目视觉的核心目标
  • NGINX 的 ngx_http_auth_jwt_module模块
  • 模块方法模式(Module Method Pattern)
  • JavaScript 实现输入框的撤销功能
  • 算力经济模型推演:从中心化到去中心化算力市场的转变(区块链+智能合约的算力交易原型设计)
  • Python项目源码57:数据格式转换工具1.0(csv+json+excel+sqlite3)
  • C++ 类与对象(下)—— 进阶特性与底层机制解析(构造函数初始化,类型转换,static成员,友元,内部类,匿名对象)
  • 基于 HTML 和 CSS 实现的 3D 翻转卡片效果
  • WebRTC 服务器之SRS服务器概述和环境搭建
  • 【算法笔记】动态规划基础(二):背包dp
  • TopK题-快速选择方法
  • 数据结构实验8.1:图的基本操作
  • 联邦学习的深度解析,有望打破数据孤岛
  • 005-nlohmann/json 基础方法-C++开源库108杰
  • Sim Studio 是一个开源的代理工作流程构建器。Sim Studio 的界面是一种轻量级、直观的方式,可快速构建和部署LLMs与您最喜欢的工具连接
  • 网络安全自动化:找准边界才能筑牢安全防线