当前位置: 首页 > news >正文

用 epoll 实现的 Reactor 模式详解(含代码逐块讲解)

在高并发网络编程中,如何高效地处理大量连接是一个核心问题。传统的阻塞 I/O 或者“一连接一线程”的模型,在线程数和上下文切换上会造成巨大的开销。为了解决这些问题,Reactor 模式应运而生,它结合了 I/O 复用技术(如 epoll),成为构建高性能服务器的常见架构。

本文将从 Reactor 原理 入手,结合一段基于 epoll 的 C 语言实现代码,详细讲解 Reactor 是如何工作的

一、Reactor 模式原理

Reactor 模式可以理解为一个 事件驱动模型,它的核心思想是:

  • 将 I/O 事件(如连接请求、数据可读、可写)交给内核去监听;

  • 一旦有事件发生,内核会通知 Reactor;

  • Reactor 再分发事件给对应的处理函数(回调)。

通俗点说:

  • Reactor 就像“事件总线”,它负责把 事件(事件分发)事件处理(回调函数) 解耦。

  • 程序员只需要关注“当某个事件发生时我要干什么”。

对比:

  • 阻塞 I/O:线程被阻塞在 accept/recv 上,浪费资源。

  • 多线程:一个连接一个线程,线程切换开销大。

  • Reactor + epoll:少量线程监听所有事件,事件就绪时再分发处理。

二、代码实现思路

代码整体分为以下几个模块:

  1. 连接对象结构体(conn_list):保存 fd、读写缓冲区、回调函数。

  2. 事件注册函数(set_event/event_register):负责向 epoll 注册、修改事件。

  3. accept 回调(accept_cb):处理新客户端的连接。

  4. recv 回调(recv_cb):处理客户端发来的数据。

  5. send 回调(send_cb):向客户端发送数据。

  6. 主循环(main + epoll_wait):Reactor 的核心,监听并分发事件。

三、代码分块详解

1) 连接上下文结构体 struct conn

struct conn{int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;
};
  • 每个连接(index 用 fd)维护一个结构:读取缓冲区 rbuffer + 长度 rlength、写缓冲区 wbuffer + 长度 wlength

  • r_action.recv_callback:当该 fd 可读时调用的函数(对监听 socket 这位置复用为 accept_cb)。

  • send_callback:当该 fd 可写时调用的函数(send_cb)。

  • 这样的设计方便事件分派:在 epoll_wait 中直接用 conn_list[fd].r_action.recv_callback(connfd); 调用对应 handler。

2) init_server(port) —— socket 初始化

int init_server(unsigned short port){int sockfd = socket(AF_INET, SOCK_STREAM, 0);// bind、listenreturn sockfd;
}

标准服务器初始化:socketbindlisten

3) set_event(fd, event, flag) —— 封装 epoll_ctl

int set_event(int fd, int event, int flag){if(flag){ // addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);}else{ // modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}
  • flag != 0EPOLL_CTL_ADD(新增);flag == 0EPOLL_CTL_MOD(修改)。

  • ev.data.fd = fd:回传 fd,主循环用来定位 conn_list

4) event_register(fd, event) —— 初始化 conn_list 并 ADD

int event_register(int fd, int event){
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;set_event(fd, event, 1);
}
  • 初始化一个连接对象(清空读写缓冲区);

  • 设置回调函数;

  • 注册到 epoll。

5) accept_cb(fd) —— 新连接处理

int accept_cb(int fd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
printf("accept finished: %d\n", clientfd);event_register(clientfd, EPOLLIN);
return 0;
}
  • 当监听 socket 可读时,说明有新连接到来;

  • 调用 accept 获取新客户端 fd;

  • 使用 event_register 注册该客户端,关注 EPOLLIN(读事件)。

6) recv_cb(fd) —— 读回调(核心逻辑)

int recv_cb(int fd){int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if(count == 0){ // 客户端断开close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;// echo: 拷贝到 wbufferconn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);set_event(fd, EPOLLOUT, 0); // 切换为可写监听return count;
}
  • 从客户端读数据,存到 rbuffer

  • 如果 count==0,说明客户端断开,关闭连接;

  • 否则将数据复制到写缓冲区;

  • 把收到的数据长度写入 rlength,并拷贝到 wbuffer,设置 wlength —— 因为后续 send_cb 会使用这些字段发送回去。

  • 最后通过 set_event(fd, EPOLLOUT, 0) 修改该 fd 在 epoll 中关注为可写事件,以便下一轮 epoll_wait 返回时触发写回调。

7) send_cb(fd) —— 写回调

int send_cb(int fd){int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLIN, 0);return count;
}
  • wbufferwlength 字节发送出去(注意:send 可能返回已发送字节数 < wlength,需要处理短写并保存未发送的数据)。

  • 发送完成后修改监听为 EPOLLIN,回到读等待。

8) main 主循环:注册监听 socket 并 dispatch

int sockfd = init_server(port);
epfd = epoll_create(1);// 初始化监听 socket 的 conn_list,绑定 accept_cb
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);while(1){struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);for(i=0;i<nready;i++){int connfd = events[i].data.fd;if(events[i].events & EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events & EPOLLOUT){conn_list[connfd].send_callback(connfd);}}
}
  • epoll_wait 阻塞等待事件发生;

  • 遍历所有就绪的 fd,根据事件类型(EPOLLIN/EPOLLOUT)调用回调函数;

0voice · GitHub

http://www.xdnf.cn/news/1462681.html

相关文章:

  • Vue3源码reactivity响应式篇之EffectScope
  • Android 应用进程启动
  • 趣味学RUST基础篇(构建一个命令行程序2重构)
  • 基于FPGA实现数字QAM调制系统
  • AiPPT生成的PPT内容质量怎么样?会不会出现逻辑混乱或数据错误?
  • 一键生成PPT的AI工具排名:2025年能读懂你思路的AI演示工具
  • 深度学习——迁移学习
  • 鸿蒙:获取UIContext实例的方法
  • Spring Boot+Nacos+MySQL微服务问题排查指南
  • 国产化PDF处理控件Spire.PDF教程:如何在 Java 中通过模板生成 PDF
  • 抓虫:sw架构防火墙服务启动失败 Unable to initialize Netlink socket: 不支持的协议
  • 还有人没搞懂住宅代理IP的属性优势吗?
  • java解析网络大端、小端解析方法
  • 信息安全基础知识
  • 云原生部署_Docker入门
  • 将 Android 设备的所有系统日志(包括内核日志、系统服务日志等)完整拷贝到 Windows 本地
  • android View详解—动画
  • Kali搭建sqli-labs靶场
  • modbus_tcp和modbus_rtu对比移植AT-socket,modbus_tcp杂记
  • 《sklearn机器学习——聚类性能指数》同质性,完整性和 V-measure
  • 从 Prompt 到 Context:LLM OS 时代的核心工程范式演进
  • [特殊字符] AI时代依然不可或缺:精通后端开发的10个GitHub宝藏仓库
  • Xilinx系列FPGA实现DP1.4视频收发,支持4K60帧分辨率,提供2套工程源码和技术支持
  • 【Arxiv 2025 预发行论文】重磅突破!STAR-DSSA 模块横空出世:显著性+拓扑双重加持,小目标、大场景统统拿下!
  • K8S的Pod为什么可以解析访问集群之外的域名地址
  • LeetCode刷题-top100( 矩阵置零)
  • android 四大组件—BroadcastReceiver
  • 《深入理解双向链表:增删改查及销毁操作》
  • 贪吃蛇鱼小游戏抖音快手微信小程序看广告流量主开源
  • 架构性能优化三板斧:从10秒响应到毫秒级的演进之路