用 epoll 实现的 Reactor 模式详解(含代码逐块讲解)
在高并发网络编程中,如何高效地处理大量连接是一个核心问题。传统的阻塞 I/O 或者“一连接一线程”的模型,在线程数和上下文切换上会造成巨大的开销。为了解决这些问题,Reactor 模式应运而生,它结合了 I/O 复用技术(如
epoll
),成为构建高性能服务器的常见架构。本文将从 Reactor 原理 入手,结合一段基于
epoll
的 C 语言实现代码,详细讲解 Reactor 是如何工作的。
一、Reactor 模式原理
Reactor 模式可以理解为一个 事件驱动模型,它的核心思想是:
-
将 I/O 事件(如连接请求、数据可读、可写)交给内核去监听;
-
一旦有事件发生,内核会通知 Reactor;
-
Reactor 再分发事件给对应的处理函数(回调)。
通俗点说:
-
Reactor 就像“事件总线”,它负责把 事件(事件分发) 和 事件处理(回调函数) 解耦。
-
程序员只需要关注“当某个事件发生时我要干什么”。
对比:
-
阻塞 I/O:线程被阻塞在
accept/recv
上,浪费资源。 -
多线程:一个连接一个线程,线程切换开销大。
-
Reactor + epoll:少量线程监听所有事件,事件就绪时再分发处理。
二、代码实现思路
代码整体分为以下几个模块:
-
连接对象结构体(conn_list):保存 fd、读写缓冲区、回调函数。
-
事件注册函数(set_event/event_register):负责向 epoll 注册、修改事件。
-
accept 回调(accept_cb):处理新客户端的连接。
-
recv 回调(recv_cb):处理客户端发来的数据。
-
send 回调(send_cb):向客户端发送数据。
-
主循环(main + epoll_wait):Reactor 的核心,监听并分发事件。
三、代码分块详解
1) 连接上下文结构体 struct conn
struct conn{int fd;char rbuffer[BUFFER_LENGTH];int rlength;char wbuffer[BUFFER_LENGTH];int wlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;} r_action;
};
-
每个连接(index 用 fd)维护一个结构:读取缓冲区
rbuffer
+ 长度rlength
、写缓冲区wbuffer
+ 长度wlength
。 -
r_action.recv_callback
:当该 fd 可读时调用的函数(对监听 socket 这位置复用为 accept_cb)。 -
send_callback
:当该 fd 可写时调用的函数(send_cb)。 -
这样的设计方便事件分派:在
epoll_wait
中直接用conn_list[fd].r_action.recv_callback(connfd);
调用对应 handler。
2) init_server(port)
—— socket 初始化
int init_server(unsigned short port){int sockfd = socket(AF_INET, SOCK_STREAM, 0);// bind、listenreturn sockfd;
}
标准服务器初始化:socket
→ bind
→ listen
。
3) set_event(fd, event, flag)
—— 封装 epoll_ctl
int set_event(int fd, int event, int flag){if(flag){ // addstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);}else{ // modstruct epoll_event ev;ev.events = event;ev.data.fd = fd;epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);}
}
-
flag != 0
→EPOLL_CTL_ADD
(新增);flag == 0
→EPOLL_CTL_MOD
(修改)。 -
ev.data.fd = fd
:回传 fd,主循环用来定位conn_list
。
4) event_register(fd, event)
—— 初始化 conn_list 并 ADD
int event_register(int fd, int event){
conn_list[fd].fd = fd;
conn_list[fd].r_action.recv_callback = recv_cb;
conn_list[fd].send_callback = send_cb;memset(conn_list[fd].rbuffer, 0, BUFFER_LENGTH);
conn_list[fd].rlength = 0;memset(conn_list[fd].wbuffer, 0, BUFFER_LENGTH);
conn_list[fd].wlength = 0;set_event(fd, event, 1);
}
-
初始化一个连接对象(清空读写缓冲区);
-
设置回调函数;
-
注册到 epoll。
5) accept_cb(fd)
—— 新连接处理
int accept_cb(int fd){
struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);int clientfd = accept(fd, (struct sockaddr*)&clientaddr, &len);
printf("accept finished: %d\n", clientfd);event_register(clientfd, EPOLLIN);
return 0;
}
-
当监听 socket 可读时,说明有新连接到来;
-
调用
accept
获取新客户端 fd; -
使用
event_register
注册该客户端,关注EPOLLIN
(读事件)。
6) recv_cb(fd)
—— 读回调(核心逻辑)
int recv_cb(int fd){int count = recv(fd, conn_list[fd].rbuffer, BUFFER_LENGTH, 0);if(count == 0){ // 客户端断开close(fd);epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);return 0;}conn_list[fd].rlength = count;// echo: 拷贝到 wbufferconn_list[fd].wlength = conn_list[fd].rlength;memcpy(conn_list[fd].wbuffer, conn_list[fd].rbuffer, conn_list[fd].wlength);set_event(fd, EPOLLOUT, 0); // 切换为可写监听return count;
}
-
从客户端读数据,存到
rbuffer
; -
如果
count==0
,说明客户端断开,关闭连接; -
否则将数据复制到写缓冲区;
-
把收到的数据长度写入
rlength
,并拷贝到wbuffer
,设置wlength
—— 因为后续send_cb
会使用这些字段发送回去。 -
最后通过
set_event(fd, EPOLLOUT, 0)
修改该 fd 在 epoll 中关注为可写事件,以便下一轮epoll_wait
返回时触发写回调。
7) send_cb(fd)
—— 写回调
int send_cb(int fd){int count = send(fd, conn_list[fd].wbuffer, conn_list[fd].wlength, 0);set_event(fd, EPOLLIN, 0);return count;
}
-
把
wbuffer
的wlength
字节发送出去(注意:send
可能返回已发送字节数< wlength
,需要处理短写并保存未发送的数据)。 -
发送完成后修改监听为
EPOLLIN
,回到读等待。
8) main
主循环:注册监听 socket 并 dispatch
int sockfd = init_server(port);
epfd = epoll_create(1);// 初始化监听 socket 的 conn_list,绑定 accept_cb
conn_list[sockfd].fd = sockfd;
conn_list[sockfd].r_action.recv_callback = accept_cb;
set_event(sockfd, EPOLLIN, 1);while(1){struct epoll_event events[1024] = {0};int nready = epoll_wait(epfd, events, 1024, -1);for(i=0;i<nready;i++){int connfd = events[i].data.fd;if(events[i].events & EPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].events & EPOLLOUT){conn_list[connfd].send_callback(connfd);}}
}
-
epoll_wait
阻塞等待事件发生; -
遍历所有就绪的 fd,根据事件类型(EPOLLIN/EPOLLOUT)调用回调函数;
0voice · GitHub