【C/C++】高性能网络编程之Reactor模型
文章目录
- Reactor模型
- 1 核心思想
- 2 组成组件
- 3 工作流程图
- 4 常见的三种 Reactor 模型
- 4.1 单 Reactor 单线程
- 4.2 单 Reactor 多线程
- 4.3 多 Reactor 多线程(主从 Reactor)
- 5 和 Proactor 区别
- 6 入门教程c++
- 目标
- 设计
- 实现代码
- 7 总结
Reactor模型
Reactor 模型是一种事件驱动的并发模型,广泛应用于高性能网络服务器开发中,比如 Nginx、Redis、Muduo、libevent 等,属于 同步非阻塞 IO(Reactor 与 Proactor 是两大典型 IO 模型)。
1 核心思想
Reactor 模型通过注册事件和对应的处理器(handler),当某个事件发生时,由事件分发器(Demultiplexer)将其分发给相应的处理器处理。其核心流程为:
事件发生 → 事件多路复用器检测到事件 → 分发事件给处理器 → 处理器处理事件
2 组成组件
组件 | 说明 |
---|---|
Reactor(反应器) | 核心调度模块,负责监听 IO 事件并分发到相应处理器 |
Demultiplexer(多路分发器) | 常用 select / poll / epoll ,用于等待事件 |
Handler(事件处理器) | 处理具体的读/写/连接等事件逻辑 |
Acceptor(连接处理器) | 处理新连接接入事件 |
Channel(通道) | 封装描述符及事件类型、回调函数 |
3 工作流程图
+-------------------------+
| Reactor |
+-------------------------+|v
+-------------------------+
| IO 多路复用器(epoll) |
+-------------------------+|事件发生(如连接/读/写)|v
+-------------------------+
| 分发给对应的 Handler |
+-------------------------+|v
+-------------------------+
| 执行读/写/业务处理逻辑 |
+-------------------------+
4 常见的三种 Reactor 模型
4.1 单 Reactor 单线程
- 所有事件都由一个线程完成监听和处理。
- 简单但性能有限,适用于低并发场景(如 Redis)。
Reactor 线程: 监听事件 → 处理事件(accept/read/write/业务处理)
- 优点:结构简单,适合逻辑简单、IO 密集场景。
- 缺点:处理慢或阻塞将影响其他事件。
4.2 单 Reactor 多线程
- 主线程负责事件分发,多个工作线程负责业务处理。
- 比如:主线程只处理 accept,交由线程池处理 read/write。
Reactor 线程: accept
Worker 线程池: read/write/业务处理
- 优点:适合并发较高,业务处理耗时场景。
- 缺点:需要加锁、线程同步,逻辑复杂。
4.3 多 Reactor 多线程(主从 Reactor)
- 主 Reactor:只负责 accept,新连接后分发给从 Reactor。
- 从 Reactor:每个从线程监听部分连接,处理其 IO 事件。
主 Reactor: accept
从 Reactor: read/write
工作线程池: 业务逻辑处理
- Nginx、Muduo 等采用此结构。
- 优点:高性能、高并发、分工明确。
- 缺点:结构复杂,实现难度大。
5 和 Proactor 区别
特性 | Reactor | Proactor |
---|---|---|
IO 模式 | 同步非阻塞 | 异步 IO |
事件处理 | 应用负责读/写处理 | 内核完成 IO,应用只处理结果 |
使用平台 | POSIX/Linux 常见 | Windows 支持良好 |
示例 | Nginx、Muduo、libevent、Netty 等 | Windows IOCP、Boost.Asio(部分模式) |
6 入门教程c++
目标
- epoll 多路复用;
- 非阻塞套接字;
- 事件注册与回调;
- 支持新连接接入(Acceptor)和客户端数据读取(Handler);
- 单 Reactor + 单线程模型
设计
组件 | 职责 |
---|---|
Reactor | 主循环,监听事件并分发 |
Channel | 封装 fd + 事件类型 + 回调函数 |
Acceptor | 监听 socket,接收新连接 |
Handler | 处理连接上的 IO(如读/写) |
EventLoop | 封装 epoll,对应主线程或每个工作线程的事件循环 |
实现代码
reactor-demo/
├── main.cpp
├── EventLoop.h / .cpp
├── Channel.h / .cpp
├── Acceptor.h / .cpp
├── ConnectionHandler.h / .cpp
└── CMakeLists.txt
EventLoop.h
#pragma once
#include <vector>
#include <functional>class Channel;class EventLoop {
public:EventLoop();~EventLoop();void loop();void updateChannel(Channel* channel);private:int epoll_fd_;static const int MAX_EVENTS = 1024;
};
EventLoop.cpp
#include "EventLoop.h"
#include "Channel.h"
#include <sys/epoll.h>
#include <unistd.h>
#include <cstring>
#include <iostream>EventLoop::EventLoop() {epoll_fd_ = epoll_create1(0);
}EventLoop::~EventLoop() {close(epoll_fd_);
}void EventLoop::updateChannel(Channel* channel) {epoll_event ev;ev.events = channel->events();ev.data.ptr = channel;if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, channel->fd(), &ev) == -1) {std::cerr << "epoll_ctl failed\n";}
}void EventLoop::loop() {epoll_event events[MAX_EVENTS];while (true) {int n = epoll_wait(epoll_fd_, events, MAX_EVENTS, -1);for (int i = 0; i < n; ++i) {Channel* ch = static_cast<Channel*>(events[i].data.ptr);ch->handleEvent();}}
}
Channel.h
#pragma once
#include <functional>class Channel {
public:using Callback = std::function<void()>;Channel(int fd);void setReadCallback(Callback cb);void handleEvent();int fd() const;int events() const;private:int fd_;int events_;Callback read_callback_;
};
Channel.cpp
#include "Channel.h"
#include <sys/epoll.h>Channel::Channel(int fd): fd_(fd), events_(EPOLLIN) {}void Channel::setReadCallback(Callback cb) {read_callback_ = std::move(cb);
}void Channel::handleEvent() {if (read_callback_) read_callback_();
}int Channel::fd() const { return fd_; }
int Channel::events() const { return events_; }
Acceptor.h
#pragma once
#include "Channel.h"class EventLoop;class Acceptor {
public:Acceptor(EventLoop* loop, int port, std::function<void(int)> new_conn_cb);void listen();private:int listen_fd_;Channel channel_;std::function<void(int)> new_conn_cb_;
};
Acceptor.cpp
#include "Acceptor.h"
#include "EventLoop.h"
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <iostream>static int setNonBlocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}Acceptor::Acceptor(EventLoop* loop, int port, std::function<void(int)> cb): listen_fd_(socket(AF_INET, SOCK_STREAM, 0)),channel_(listen_fd_),new_conn_cb_(cb) {sockaddr_in addr {};addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);bind(listen_fd_, (sockaddr*)&addr, sizeof(addr));setNonBlocking(listen_fd_);::listen(listen_fd_, SOMAXCONN);channel_.setReadCallback([this]() {sockaddr_in cli_addr;socklen_t len = sizeof(cli_addr);int conn_fd = accept(listen_fd_, (sockaddr*)&cli_addr, &len);if (conn_fd >= 0) {setNonBlocking(conn_fd);new_conn_cb_(conn_fd);}});loop->updateChannel(&channel_);
}
ConnectionHandler.h
#pragma once
#include "Channel.h"
#include <memory>class EventLoop;class ConnectionHandler {
public:ConnectionHandler(EventLoop* loop, int fd);private:int conn_fd_;Channel channel_;
};
ConnectionHandler.cpp
#include "ConnectionHandler.h"
#include "EventLoop.h"
#include <unistd.h>
#include <iostream>ConnectionHandler::ConnectionHandler(EventLoop* loop, int fd): conn_fd_(fd), channel_(fd) {channel_.setReadCallback([this]() {char buf[1024] = {0};ssize_t n = read(conn_fd_, buf, sizeof(buf));if (n > 0) {std::cout << "Recv: " << buf;write(conn_fd_, buf, n); // echo} else if (n == 0) {close(conn_fd_);}});loop->updateChannel(&channel_);
}
main.cpp
#include "EventLoop.h"
#include "Acceptor.h"
#include "ConnectionHandler.h"int main() {EventLoop loop;Acceptor acceptor(&loop, 12345, [&](int conn_fd) {new ConnectionHandler(&loop, conn_fd);});loop.loop();return 0;
}
CMakeLists.txt
cmake_minimum_required(VERSION 3.10)
project(ReactorDemo)set(CMAKE_CXX_STANDARD 17)add_executable(ReactorDemomain.cppEventLoop.cppChannel.cppAcceptor.cppConnectionHandler.cpp
)
编译并运行:
mkdir build && cd build
cmake ..
make
./ReactorDemo
telnet
测试:
telnet 127.0.0.1 12345
输入数据,能看到回显,说明 Reactor 模型生效。
7 总结
- Reactor 模型是现代高性能服务器开发的核心框架。
- 利用 IO 多路复用机制,有效提升单线程处理多个客户端连接的能力。
- 推荐结合 epoll、线程池、状态机 进一步构建高性能系统。