当前位置: 首页 > ops >正文

linux网络编程之单reactor模型(一)

        Reactor 是一种事件驱动的设计模式(Event-Driven Pattern),主要用于处理高并发 I/O,特别适合网络服务器场景。它通过一个多路复用机制监听多个事件源(如 socket 文件描述符),并在事件就绪时将事件分发给对应的处理器(回调函数)执行。

六、单reactor单线程模型

1、核心思想

        利用一个统一的事件分发中心(EventLoop),在单个线程中通过 I/O 多路复用机制(如 epoll)高效监听和响应多个并发连接的 I/O 事件,将事件检测与事件处理解耦,从而实现结构简单、性能良好的并发 I/O 服务器。

2、核心组件

类名主要职责
Channel封装一个 fd 的事件及回调,是 fd 与 epoll 的桥梁
EventLoop管理 epoll 及事件分发,是每个线程中的核心循环
TcpServer管理监听 socket 和连接接入逻辑,创建 TcpConn
TcpConn封装已建立的连接的读写处理、缓冲、生命周期等
1)Channel类-事件通道

Channel 是 一个 fd 与其事件处理逻辑之间的中介,负责:

  • 事件注册:设置监听哪些事件(如 EPOLLIN, EPOLLOUT

  • 事件触发:内核通知时,调用用户设置的回调函数

  • 与 EventLoop 协作:将事件注册/修改到 epoll

.h

/* 负责在事件分发系统中起到“事件通道”的作用,连接底层的I/O多路复用机制(如epoll、select、poll)和具体的事件处理逻辑 */
class EventLoop;class Channel {
public:Channel(EventLoop& loop, int fd);~Channel();// 设置 fd 对应的感兴趣事件(EPOLLIN/EPOLLOUT)void EnableReading();void EnableWriting();void DisableWriting();void DisableAll();// 用户提供的读写事件回调void SetReadCallback(std::function<void()> cb);void SetWriteCallback(std::function<void()> cb);// 实际处理事件触发,调用该函数判断是否调用 read_cb_ 或 write_cb_void HandleEvent(uint32_t events);// 获取 fdint GetFd() const;// 获取事件uint32_t GetEvents() const;// 将当前 Channel 注册到 epoll 或修改其状态。void Update();private:int fd_;                          // 监听的文件描述符bool added_ = false;              // 是否已添加到 epollEventLoop& loop_;                 // 所属的事件循环uint32_t events_;                 // 当前监听的事件类型(EPOLLIN/EPOLLOUT)std::function<void()> read_cb_;   // 读事件回调std::function<void()> write_cb_;  // 写事件回调
};
函数名功能调用时机
EnableReading()关注读事件并更新 epoll 状态监听 socket / 连接 socket 可读时
EnableWriting()关注写事件(注册 EPOLLOUT)Send() 数据写不完时注册
DisableWriting()注销写事件(避免忙等)写完所有 buffer 后
HandleEvent(events)根据 epoll 返回事件触发回调epoll_wait 返回后由 EventLoop 调用
Update()将本 Channel 注册或修改到 epoll每次 Enable/Disable 事件后必须调用

.cpp

Channel::Channel(EventLoop& loop, int fd): fd_(fd), loop_(loop), events_(0) {}Channel::~Channel() {DisableAll();
}void Channel::EnableReading() {events_ |= EPOLLIN;Update();
}void Channel::EnableWriting() {events_ |= EPOLLOUT;Update();
}void Channel::DisableWriting() {events_ &= ~EPOLLOUT;Update();
}void Channel::DisableAll() {loop_.DelEvent(fd_);events_ = 0;
}void Channel::SetReadCallback(std::function<void()> cb) {read_cb_ = std::move(cb);
}void Channel::SetWriteCallback(std::function<void()> cb) {write_cb_ = std::move(cb);
}void Channel::HandleEvent(uint32_t events) {if ((events & EPOLLIN) && read_cb_) read_cb_();if ((events & EPOLLOUT) && write_cb_) write_cb_();
}int Channel::GetFd() const {return fd_;
}uint32_t Channel::GetEvents() const {return events_;
}void Channel::Update() {if (!added_) {loop_.AddEvent(fd_, events_, this);added_ = true;} else {loop_.ModEvent(fd_, events_, this);}
}
2)EventLoop类-事件循环

EventLoop 是 Reactor 的核心调度器,负责:

  • 管理所有 Channel 的事件监听

  • 调用 epoll_wait 等待就绪事件

  • 分发事件并调用 Channel 的处理函数

.h

#define MAX_EVENTS 1024/* 负责事件的等待、分发和调度执行*/
class EventLoop
{
public:EventLoop() ;~EventLoop();void AddEvent(int fd, uint32_t events, void *ptr);void ModEvent(int fd, uint32_t events, void *ptr);void DelEvent(int fd);void Run();void stop();
private:int epfd_;bool running_;
};
函数名功能说明
Run()启动事件循环持续调用 epoll_wait 并触发回调
stop()停止事件循环设置 running_ 为 false
AddEvent(fd, events, ptr)添加一个 fd 到 epollfd 封装在 Channel 内,由 TcpConn 创建
ModEvent(fd, events, ptr)修改 fd 的监听事件典型于 Send() 或关闭写事件
DelEvent(fd)从 epoll 中删除 fd连接关闭、Channel 销毁时

.cpp

EventLoop::EventLoop() : epfd_(::epoll_create1(0)), running_(true)
{if (epfd_ == -1){std::cerr << "epoll_create error: " << errno << std::endl;exit(EXIT_FAILURE);}
}EventLoop::~EventLoop()
{close(epfd_);
}void EventLoop::AddEvent(int fd, uint32_t events, void *ptr)
{epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1){std::cerr << "epoll_ctl add error: " << errno << std::endl;}
}void EventLoop::ModEvent(int fd, uint32_t events, void *ptr)
{epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev) == -1){std::cerr << "epoll_ctl mod error: " << errno << std::endl;}
}void EventLoop::DelEvent(int fd)
{if (::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1){std::cerr << "epoll_ctl del error: " << errno << std::endl;}
}void EventLoop::Run()
{epoll_event events[MAX_EVENTS];while (running_){// 超时参数传入TimerInstance()->WaitTime(),功能是确保 epoll_wait 最迟要在定时任务到期时返回,否则任务会延迟处理。int nfds = ::epoll_wait(epfd_, events, MAX_EVENTS, TimerInstance()->WaitTime());if (nfds == -1){if (errno == EINTR) // EINTR(系统中断)时忽略重试,其他错误打印后直接返回。continue;std::cerr << "epoll_wait error: " << errno << std::endl;return;}// 遍历就绪事件数组for (int i = 0; i < nfds; ++i){// 获取就绪事件存储在 data.ptr 的 Channel*Channel* ch = static_cast<Channel*>(events[i].data.ptr);// 触发读/写等回调ch->HandleEvent(events[i].events);}// 处理定时器任务TimerInstance()->HandleTimeout();}
}void EventLoop::stop() {running_ = false;
}
3)TcpServer-连接管理

TcpServer 是服务端框架的顶层组织者,负责:

  • 创建监听 socket:创建 socket、bind、listen

  • 创建Accept Channel:负责接收新连接事件并注册到 EventLoop

  • 创建新连接回调:接收到新连接后,通过回调交由用户处理

.h

class TcpConn;
class EventLoop;class TcpServer {
public:// 新连接回调,供用户处理新连接事件。using NewConnCallback = std::function<void(std::shared_ptr<TcpConn>)>;TcpServer(EventLoop& loop);~TcpServer();// 启动 TCP 服务监听,注册新连接的回调void Start(uint16_t port, NewConnCallback cb);private:// 新连接建立时的处理逻辑,作为回调函数使用void HandleAccept();private:EventLoop& loop_;int listen_fd_;std::shared_ptr<Channel> accept_channel_; // 监听 fd 的事件通道,负责读事件注册(新连接到来)NewConnCallback new_conn_cb_;             // 外部注入的新连接回调函数
};
函数名功能说明
Start(port, cb)初始化 socket,设置 Channel 和回调注册 EPOLLIN 用于接收连接
HandleAccept()有新连接到来时被触发accept() 然后调用回调构建 TcpConn
new_conn_cb_用户设置的处理逻辑通常设置为 lambda 创建 TcpConn 实例

.cpp

TcpServer::TcpServer(EventLoop& loop): loop_(loop), listen_fd_(-1) {}TcpServer::~TcpServer() {if (listen_fd_ != -1) {accept_channel_->DisableAll();close(listen_fd_);}
}void TcpServer::Start(uint16_t port, NewConnCallback cb) {new_conn_cb_ = std::move(cb); // 保存回调函数// 创建 IPv4 TCP 套接字,非阻塞模式listen_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (listen_fd_ == -1) {std::cerr << "socket error: " << errno << std::endl;return;}// 配置 SO_REUSEADDR(端口立即重用,避免“Address already in use”) 和 SO_REUSEPORT(多进程监听同一端口,可用于多核负载均衡)int opt = 1;setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));// 绑定 socket 到指定端口地址sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);if (bind(listen_fd_, (sockaddr*)&addr, sizeof(addr)) == -1) {std::cerr << "bind error: " << errno << std::endl;close(listen_fd_);return;}// 开始监听,监听队列长度为 SOMAXCONN(4096)if (listen(listen_fd_, SOMAXCONN) == -1) {std::cerr << "listen error: " << errno << std::endl;close(listen_fd_);return;}// 创建 Channel 对象,监听 listen_fd_ 上的事件,会注册到 epoll 中accept_channel_ = std::make_shared<Channel>(loop_, listen_fd_);// 设置读事件回调函数:即新连接到来时应调用的逻辑函数accept_channel_->SetReadCallback([this]() { HandleAccept(); });// 启动监听accept_channel_->EnableReading();std::cout << "Server listening on port " << port << std::endl;
}void TcpServer::HandleAccept() {sockaddr_in client_addr{};socklen_t len = sizeof(client_addr);// 接受新连接,返回新的客户端 fd,设置非阻塞int conn_fd = accept4(listen_fd_, (sockaddr*)&client_addr, &len, SOCK_NONBLOCK);if (conn_fd == -1) return;// 创建新的连接对象(TcpConn)管理该客户端 fdauto conn = std::make_shared<TcpConn>(conn_fd, loop_);// 调用用户逻辑处理这个连接if (new_conn_cb_) new_conn_cb_(conn);
}
4)TcpConn-与客户端通信

TcpConn 封装了每个 TCP 连接的生命周期、读写事件、缓冲区管理等。

.h

/**声明 TcpConn 类,同时继承 enable_shared_from_this,方便在回调中安全获取 shared_ptr<TcpConn> 自己的智能指针。*/
class TcpConn : public std::enable_shared_from_this<TcpConn> {
public:// 声明回调函数using ReadCallback = std::function<void()>;using CloseCallback = std::function<void()>;TcpConn(int fd, EventLoop& loop);~TcpConn();// 设置回调函数void SetReadCallback(ReadCallback cb);void SetCloseCallback(CloseCallback cb);// 异步发送数据int Send(const char* data, size_t size);// 获取当前接收缓冲区内的全部数据std::string GetAllData();private:// 内部事件处理函数:读取、写入、关闭连接void HandleRead();void HandleWrite();void Close();private:int fd_;bool closed_;EventLoop& loop_;MessageBuffer input_buffer_;       // 读缓冲区std::string output_buffer_{};        // 写缓冲区std::shared_ptr<Channel> channel_; // 封装 fd 的 epoll 管理类ReadCallback read_cb_;             // 外部注入的回调函数CloseCallback close_cb_;
};
函数名功能调用说明
HandleRead()可读事件触发时从 fd 读数据到 input_buffer_然后触发 read_cb_
HandleWrite()可写事件触发时将 output_buffer_ 中数据写出写完后注销 EPOLLOUT
Send(data)异步发送数据若 fd 可写则立即写,否则加入 buffer
GetAllData()获取 input_buffer_ 所有数据可在 onMessage 中使用
Close()主动关闭连接注销事件、关闭 fd、触发 close_cb_

.cpp

TcpConn::TcpConn(int fd, EventLoop& loop): fd_(fd), closed_(false), loop_(loop) {// // 设置 socket 为非阻塞模式// int flags = fcntl(fd, F_GETFL, 0);// fcntl(fd, F_SETFL, flags | O_NONBLOCK);// 创建 Channel 并设置读写回调,注册 EPOLLIN 监听可读事件。channel_ = std::make_shared<Channel>(loop_, fd);channel_->SetReadCallback([this]() { HandleRead(); });channel_->SetWriteCallback([this]() { HandleWrite(); });channel_->EnableReading();
}TcpConn::~TcpConn() {Close();
}void TcpConn::SetReadCallback(ReadCallback cb) {read_cb_ = std::move(cb);
}void TcpConn::SetCloseCallback(CloseCallback cb) {close_cb_ = std::move(cb);
}int TcpConn::Send(const char* data, size_t size) {// 若连接已关闭或无数据,直接返回。if (closed_ || data == nullptr || size == 0) return -1;// 如果写缓冲区中已有未发送的数据,追加数据并监听写事件。if (!output_buffer_.empty()) {output_buffer_.append(data, size);channel_->EnableWriting();return size;}// 直接发送(零拷贝),MSG_NOSIGNAL:防止对方关闭连接时触发 SIGPIPE 信号(使得进程崩溃)。int n = send(fd_, data, size, MSG_NOSIGNAL);if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // 非阻塞 socket 下,内核发送缓冲区可能暂时满// 将数据缓存到用户态 output_buffer_, 监听写事件output_buffer_.append(data, size);channel_->EnableWriting();} else if (n > 0 && n < static_cast<int>(size)) {   // 有部分数据被写入 socket(如发送了 n 字节,小于总长度 size)// 剩下的数据未能写完,需要缓存到 output_buffer_,并监听写事件output_buffer_.append(data + n, size - n);channel_->EnableWriting();} else if (n < 0) {Close();}return n;
}std::string TcpConn::GetAllData() {auto data = input_buffer_.GetAllData();if (data.first != nullptr) {// 获取读缓冲区全部有效数据std::string result(reinterpret_cast<char*>(data.first), data.second);// 标记已读的数据input_buffer_.ReadCompleted(data.second);return result;}return "";
}void TcpConn::HandleRead() {int err = 0;// 调用 MessageBuffer::Recv() 使用 readv() 读取数据,读取数据到缓冲区中。int n = input_buffer_.Recv(fd_, &err);if (n > 0 && read_cb_) {read_cb_(); // 触发读取回调逻辑} else if (n == 0 || (n < 0 && err != EAGAIN && err != EWOULDBLOCK)) { // 连接关闭或错误则关闭连接。Close();}
}void TcpConn::HandleWrite() {// 如果写缓冲区为空,则取消监听写事件。if (output_buffer_.empty()) {channel_->DisableWriting();return;}// 缓冲区不为空,调用 send() 发送数据,发送成功则删除发送缓冲区中的数据。int n = send(fd_, output_buffer_.data(), output_buffer_.size(), MSG_NOSIGNAL);if (n > 0) {output_buffer_.erase(0, n);if (output_buffer_.empty()) {channel_->DisableWriting();}} else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { // 非阻塞错误以外的写失败则关闭连接Close();}
}void TcpConn::Close() {if (closed_) return;closed_ = true;channel_->DisableAll();close(fd_);if (close_cb_) close_cb_();
}

3、完整调用流程

启动阶段:

  1. TcpServer::Start(port, cb)

    • 创建 listen_fd,设为非阻塞

    • 创建 accept_channel_ 封装 listen_fd

    • 设置其 read_cbTcpServer::HandleAccept

    • 将其注册到 EventLoop 中(AddEvent()


有连接到来:

  1. 内核通知 listen_fd 可读 → Channel::HandleEvent()read_cb_TcpServer::HandleAccept()

    • 调用 accept(),获得 conn_fd

    • 设置为非阻塞

    • 构建 TcpConn 对象,封装连接的生命周期

    • 创建该连接的 Channel,并注册其 read_cb_TcpConn::HandleRead


数据收发流程:

  1. conn_fd 可读 → epoll 通知 → EventLoop 触发 TcpConn::HandleRead()

    • 从 socket 读入数据到 input_buffer_

    • 调用 read_cb_ 让上层应用逻辑处理数据

  2. TcpConn::Send()

    • 若当前 fd 可写,则直接发送

    • 否则写入 output_buffer_,并注册 EPOLLOUT 事件

    • channel_->SetWriteCallback(...) 设定写回调

  3. conn_fd 可写 → TcpConn::HandleWrite()

    • output_buffer_ 中写出数据

    • 若 buffer 为空,注销写事件,调用 writeCompleteCallback

http://www.xdnf.cn/news/15121.html

相关文章:

  • Python 数据建模与分析项目实战预备 Day 2 - 数据构建与字段解析(模拟简历结构化数据)
  • 【前端】【组件库开发】【原理】【无框架开发】现代网页弹窗开发指南:从基础到优化
  • GNhao,获取跨境手机SIM卡跨境通信新选择!
  • 手机恢复出厂设置怎么找回数据?Aiseesoft FoneLab for Android数据恢复工具分享
  • Java中的泛型继承
  • 深度学习篇---昇腾NPUCANN 工具包
  • 《Java EE与中间件》实验三 基于Spring Boot框架的购物车
  • BLOB 数据的插入与读取详解
  • Linux驱动学习day22(interrupt子系统)
  • [python]在drf中使用drf_spectacular
  • 卢比危机下的金融破局:科伦坡交易所技术升级作战图
  • SpringBoot JWT
  • NFS文件存储及论坛项目搭建(php)
  • Web攻防-SSTI服务端模版注入利用分类语言引擎数据渲染项目工具挖掘思路
  • MCU芯片内部的ECC安全机制
  • OpenCV图像基本操作:读取、显示与保存
  • 《数据库》MySQL备份回复
  • AI加持的开源知识库新秀:PandaWiki,如何用它打造智能化文档系统?
  • 新作品:吃啥好呢 - 个性化美食推荐
  • [面试] 手写题-爬楼梯,斐波那契数列
  • 利用Claude code,只用文字版系统设计大纲,就能轻松实现系统~
  • Kafka——应该选择哪种Kafka?
  • 京东携手HarmonyOS SDK首发家电AR高精摆放功能
  • 【深度学习新浪潮】图像生成有哪些最新进展?
  • 光电耦合器在电冰箱开关电源的应用
  • pandas销售数据分析
  • Cesium实战:交互式多边形绘制与编辑功能完全指南(最终修复版)
  • 前端面试专栏-算法篇:23. 图结构与遍历算法
  • Java(7.11 设计模式学习)
  • python的社区残障人士服务系统