手写Muduo网络库核心代码2--Poller、EPollPoller详细讲解
Poller抽象层代码
Muduo 网络库中的 Poller 抽象层是其事件驱动模型的核心组件之一,负责统一封装不同 I/O 复用机制(如 epoll、poll),实现事件监听与分发。
Poller 抽象层的作用
- 统一 I/O 复用接口
- Poller 作为抽象基类,定义了 poll()、updateChannel()、removeChannel() 等纯虚函数,强制派生类(如 EPollPoller)实现这些接口。这使得上层模块(如 EventLoop)无需关心底层具体使用 epoll 还是 poll,只需通过基类指针操作,实现了 “多路分发器” 的功能
- 解耦事件循环与 I/O 机制
- EventLoop 依赖 Poller 监听文件描述符(fd)事件,但通过抽象层隔离了具体 I/O 复用机制的实现细节。例如,EventLoop::loop() 调用 Poller::poll() 等待事件,而无需知晓底层是 epoll_wait 还是 poll 调用
头文件
#pragma once
#include"noncopyable.h"
#include"Timestamp.h"
#include<vector>
#include<unordered_map>
class Channel;
class Eventloop;class Poller:noncopyable
{public:using ChannelList=std::vector<Channel*>;Poller(Eventloop *loop);virtual ~Poller()=default;//给所有I/O复用保留统一的接口virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels)=0;virtual void updateChannel(Channel *channel)=0;virtual void removeChannel(Channel *channnel)=0; //判断参数channel是否在当前Poller当中bool hasChannel(Channel *channel) const;//事件循环可以通过当前接口获取默认的I/O复用的具体实现static Poller* newDefaultPoller(Eventloop *loop);protected://这里map的key就是sockfd value就是sockfd所属的通道类型channelusing ChannelMap=std::unordered_map<int,Channel*>;ChannelMap Channels_;private:Eventloop *ownerLoop_;//定义poller所属的事件循环EventLoop
};
1、存放有事件发生的 Channel
using ChannelList = std::vector<Channel*>;
poll 函数会把有事件发生的 Channel 指针填充到这个列表中。
Poller能监听到哪些channel发生事件了,获取活跃事件,然后遍历处理,然后上报给Eventloop,然后通知channel处理相应的事件
2、构造函数
Poller(EventLoop *loop);
初始化必须知道所属 EventLoop,
这是在建立 Poller→EventLoop 的反向指针
由 EventLoop 在初始化时创建(EventLoop 持有 Poller 实例)
3、事件轮循
virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;
定义了一个跨平台 I/O 多路复用的抽象接口,派生类必须重写用它来实现 epoll、poll 等系统调用,上层EventLoop 通过它等待事件并分发处理,返回事件发生的精确时间戳
4、更新/添加通道监控
virtual void updateChannel(Channel *channel) = 0;
当 Channel 的事件关注集变化时(如新增写监听)
调用底层 epoll_ctl/poll 更新监听状态
5、移除通道监控
virtual void removeChannel(Channel *channel) = 0;
当 Channel 销毁时(如连接关闭)
- 清理 Poller 内部资源
- 由 Channel::remove() 调用
- 清除 channels_ 映射关系
- 调用底层 epoll_ctl/poll 移除监听
6、判断参数channel是否在当前Poller当中
bool hasChannel(Channel *channel) const;
防止重复注册同一个 Channel 到 Poller 中,避免重复添加导致的崩溃或未定义行为。
7、默认I/O的复用的具体实现
通过该接口,可以获取到想要的具体的IO复用实例,是Epoll、Poll、还是select
static Poller* newDefaultPoller(EventLoop *loop);
这里又学到一招,这个接口的实现要放在一个单独的源文件(DefaultPoller.cc)中,因为这里需要 new 出来一个EpollPoller实例,如果放在(Poller.cc)中实现,就会出现基类包含子类头文件的情况。(Poller是基类,EPollPoller是子类)。
8、维护 fd → Channel 的映射
using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap Channels_;
这里map的key就是sockfd value就是sockfd所属的通道类型channel
快速通过 fd 查找 Channel,存储所有被管理的 Channel,避免重复管理同一个 fd.
9、poller所属的事件循环EventLoop
EventLoop *ownerLoop_;
反向指针 - 指向所属 EventLoop
源文件Poller.cc
#include"Poller.h"
#include"Channel.h"Poller::Poller(Eventloop *loop):ownerLoop_(loop){}bool Poller::hasChannel(Channel *channel) const
{auto it=Channels_.find(channel->fd());return it!=Channels_.end() && it->second==channel;
}
channel->fd():获取 Channel 底层的文件描述符
Channels_.find():在 unordered_map<int, Channel*> 中 O(1) 复杂度查找
源文件DefaultPoller.cc
//细节性设计,为了不让基类包含子类的头文件
#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>
Poller* Poller::newDefaultPoller(Eventloop *loop)
{if(::getenv("MUDUO_USE_POLL"))//获取环境变量,如果有则生成poll实例,(本代码没有实现){return nullptr;//生成poll的实例}else{return new EPollPoller(loop);//生成epoll的实例}
}
EPollPoller
接下来这个模块用来封装Linux 的 epoll I/O 多路复用机制
头文件
#pragma once
#include"Poller.h"
#include<vector>
#include<sys/epoll.h>
#include"Timestamp.h"class EPollPoller:public Poller{public:EPollPoller(Eventloop* loop);~EPollPoller() override;//重写基类Poller的方法Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;void updateChannel(Channel *channel) override;void removeChannel(Channel *channel) override;
private:static const int kInitEventListSize=16;//给定的vector的一个初始长度默认是16,可以面试证明自己看过源码,为啥是静态//填写活跃的连接void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;//更新channel通道void update(int operation ,Channel* channel);using EventList=std::vector<epoll_event>;int epollfd_;EventList events_;//这个将来是作为epoll_wait的第二个参数};
成员变量
static const int kInitEventListSize=16;//Muduo库默认给定的vector的一个初始长度默认是16
using EventList=std::vector<epoll_event>;
int epollfd_;
EventList events_;//这个将来是作为epoll_wait的第二个参数
定义一个数组,用于存储epoll_wait返回的就绪事件表,使用vector动态扩展事件缓冲区(当就绪事件数 == 缓冲区大小时自动扩容)
成员函数
1、重写Poller的方法
Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;
这是在Poller类中声明的纯虚函数,继承Poller之后对这些函数必须进行重写,
- poll()函数:等待并获取当前有事件发生的 Channel 列表。调用 epoll_wait() 等待事件。将“就绪的事件”转换为 Channel* 指针,填充到 activeChannels 中。
- updateChannel():将一个 Channel 注册或修改到 epoll 中(对应 epoll_ctl(ADD/MOD))。当Channel 的事件发生变化(如从不监听可读 → 监听可读),需要通知 EPollPoller。根据 Channel 的状态决定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。
- removeChannel():从 epoll 中移除一个 Channel(对应 epoll_ctl(DEL)),从channels_映射表中删除,并清理内部状态。当 Channel 对应的资源(如 socket)关闭时,必须从 epoll 中注销。防止后续 epoll_wait 返回无效的 Channel。
2、填写活跃的连接
void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
- 遍历就绪事件
- 设置Channel的revents_
- 填充到activeChannels列表
3、更新channel通道
void update(int operation ,Channel *channel);
执行具体的epoll_ctl操作
源文件
#include"EPollPoller.h"
#include"Logger.h"
#include<unistd.h>
#include<string.h>
#include"Channel.h"
//#include<errno.h>
//表示channel是否添加到poller,与channel的成员index_相关,它的初始值就是-1,表示还没有添加到里面
//用来区分,如果再次添加channel时候发现是kAdded表示已经添加到里面了,那这次操作就是更新channel中的事件
const int kNew=-1;//一个channel还没有添加到poller里边
const int kAdded=1;//一个channel已经添加到poller里面吗
const int kDeleted=2;//删除EPollPoller::EPollPoller(Eventloop *loop):Poller(loop),epollfd_(::epoll_create1(EPOLL_CLOEXEC))//EPOLL_CLOEXEC:exec 时自动关闭文件描述符,安全防护,events_(kInitEventListSize)
{if(epollfd_<0){LOG_FATAL("epoll_create error:%d \n",errno);}
}
EPollPoller::~EPollPoller()
{::close(epollfd_);
}
//返回具体发生事件的时间点,主要调用的是epoll_wait
Timestamp EPollPoller::poll(int timeoutMs,ChannelList *activeChannels)
{//poll这里短时间是接受大量的高并发请求的,如果在这里使用LOG_INFO则每次调用都会影响它的性能//实际上这里应该用LOG_DEBUG,只要我们不定义debug,他就不用调用进而不会影响它的性能
//记录当前系统中“有多少个连接正在被监听”。LOG_INFO("func=%s => fd total count:%lu \n",__FUNCTION__,Channels_.size());//第二个参数是存放发生事件的数组地址,但是为了方便扩容,//我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);int saveErrno=errno;Timestamp now(Timestamp::now());if(numEvents>0){LOG_INFO("%d events happend \n",numEvents);fillActiveChannels(numEvents,activeChannels);//扩容if(numEvents==events_.size()){events_.resize(events_.size()*2);}}else if (numEvents==0){LOG_INFO("%s timeout! \n",__FUNCTION__);}else//外部中断,还要接着执行业务逻辑{if(saveErrno != EINTR){errno=saveErrno;//不太懂LOG_ERROR("EPollPoller::poll() err!");}}return now;
}
void EPollPoller::updateChannel(Channel *channel)
{const int index=channel->index();LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此时channel已经在poller上注册过了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}
//从poller中删除channel
void EPollPoller::removeChannel(Channel *channel)
{int fd=channel->fd();Channels_.erase(fd);LOG_INFO("func=%s => fd=%d\n",__FUNCTION__,fd);int index=channel->index();if(index==kAdded){update(EPOLL_CTL_DEL,channel);}channel->set_index(kNew);
}//填写活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了}
}
//更新channel通道,做的是事件的增删改
void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}
1、Channel 状态常量定义
const int kNew = -1; // Channel 未添加到 Poller
const int kAdded = 1; // Channel 已添加到 Poller
const int kDeleted = 2; // Channel 已从 Poller 移除
//kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销
定义 Channel 在 Poller 中的生命周期状态。确保操作的正确顺序(不能重复添加已添加的 Channel)。避免不必要的系统调用(如对已删除 Channel 再次删除)。
2、poll方法重写
等待 I/O 事件发生,并将“活跃的 Channel”填充到 activeChannels 中
Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, Channels_.size());int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);int saveErrno = errno; // 保存系统错误码Timestamp now(Timestamp::now());if(numEvents > 0) {LOG_INFO("%d events happend \n", numEvents);fillActiveChannels(numEvents, activeChannels);// 动态扩容if(numEvents == events_.size()) {events_.resize(events_.size() * 2);}}else if (numEvents == 0) {LOG_INFO("%s timeout! \n", __FUNCTION__);}else { // 错误处理if(saveErrno != EINTR) { // 非中断错误errno = saveErrno;LOG_ERROR("EPollPoller::poll() err!");}}return now;
}
2.1
int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);
这里的第二个参数:
- events_.begin() → 返回一个 iterator,指向第一个元素。
- *events_.begin() → 解引用 iterator,得到第一个 epoll_event 对象。
- &*events_.begin() → 取第一个对象的地址,即 struct epoll_event*。
第三个参数
- 我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换
2.2
int saveErrno=errno;
epoll_wait 返回 -1 时,错误原因在 errno 中。但后续代码可能调用其他函数(如 LOG_INFO),会覆盖 errno。所以必须立即保存,否则错误信息丢失。
这是 Linux 系统编程的 黄金法则:一旦系统调用失败,立刻保存 errno。
2.3
else//外部中断,还要接着执行业务逻辑{if(saveErrno != EINTR){errno=saveErrno;//虽然日志不需要了,但有些设计会保留 errno 给上层处理。LOG_ERROR("EPollPoller::poll() err!");}}
EINTR:系统调用被信号中断(如 SIGCHLD、SIGTERM)。这是正常情况,不应视为错误,循环会继续。如果是其他错误(如 EBADF、ENOMEM)才是真正的异常。
3、updateChannel重写
状态驱动 —— 通过 channel->index() 的状态决定行为,即根据 Channel 的当前状态,决定是 epoll_ctl(ADD)、MOD 还是 DEL。
void EPollPoller::updateChannel(Channel *channel)
{const int index=channel->index();//获取状态,是未注册到Poller、已注册还是已删除LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此时channel已经在poller上注册过了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}
如果是kNew(新连接,首次注册)或者是 kDeleted(之前被移除,现在重新启用:比如连接复用、延迟删除后恢复)。无论之前是否注册过,只要不是 kAdded,就当作“新来”的处理。
仅在 kNew 时加入 channels_ 映射表
channels_[fd] = channel:建立 fd → Channel 的映射,用于 epoll_wait 返回后查找 Channel。
而遵循 kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销。
所以不需要将channel再次添加到Channels_中了。
对于已经注册在Poller上的channel,如果他没有感兴趣的事,就从 epoll 实例中删除。
3、update()
封装 epoll_ctl 系统调用,用于向 epoll 实例添加、修改或删除一个 fd 的事件监听。
void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;//关键:存储 Channel* 用于事件分发if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}
epoll_event 是 epoll 的核心数据结构,用于描述一个 fd 的监听事件和用户数据。
event.data.ptr=channel::最关键字段!绑定用户数据,后面用于 fillActiveChannels() 中直接取出 Channel*
4、removeChannel重写
void EPollPoller::removeChannel(Channel *channel) {int fd = channel->fd();Channels_.erase(fd); // 从映射表移除LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);int index = channel->index();if(index == kAdded) {update(EPOLL_CTL_DEL, channel); // 从epoll移除}channel->set_index(kNew); // 重置状态
}
5、填充活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了}
}
numEvents:来自 epoll_wait 的返回值,表示有多少个 fd 就绪。遍历 events_[i] 数组中的每一个就绪事件。
接着从 epoll_event.data.ptr 取出 Channel*,上面update()中设置过
event.data.ptr = channel;
epoll 本身不关心用户数据,只返回 fd 和 events。但我们需要知道“哪个 Channel 对应这个 fd”。
通过 data.ptr 直接绑定 Channel*,实现了 O(1) 的快速查找,避免了 channels_[fd] 的哈希查找。
利用了空间换时间。
events_ 是 EPollPoller 的成员变量:std::vector<epoll_event> events_;接着设置实际发生的事件。
接着将活跃的channel添加到活跃列表activeChannels中,activeChannels 是一个输出参数,类型为 std::vector<Channel*>*。这个列表会被返回给 EventLoop,后续遍历并调用 channel->handleEvent()。
感谢阅读!