当前位置: 首页 > news >正文

手写Muduo网络库核心代码2--Poller、EPollPoller详细讲解

Poller抽象层代码

Muduo 网络库中的 Poller 抽象层是其事件驱动模型的核心组件之一,负责统一封装不同 I/O 复用机制(如 epoll、poll),实现事件监听与分发。

Poller 抽象层的作用

  • 统一 I/O 复用接口
    • Poller 作为抽象基类,定义了 poll()、updateChannel()、removeChannel() 等纯虚函数,强制派生类(如 EPollPoller)实现这些接口。这使得上层模块(如 EventLoop)无需关心底层具体使用 epoll 还是 poll,只需通过基类指针操作,实现了 “多路分发器” 的功能 
  • 解耦事件循环与 I/O 机制
    • EventLoop 依赖 Poller 监听文件描述符(fd)事件,但通过抽象层隔离了具体 I/O 复用机制的实现细节。例如,EventLoop::loop() 调用 Poller::poll() 等待事件,而无需知晓底层是 epoll_wait 还是 poll 调用 


头文件

#pragma once
#include"noncopyable.h"
#include"Timestamp.h"
#include<vector>
#include<unordered_map>
class Channel;
class Eventloop;class Poller:noncopyable
{public:using ChannelList=std::vector<Channel*>;Poller(Eventloop *loop);virtual ~Poller()=default;//给所有I/O复用保留统一的接口virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels)=0;virtual void updateChannel(Channel *channel)=0;virtual void removeChannel(Channel *channnel)=0; //判断参数channel是否在当前Poller当中bool hasChannel(Channel *channel) const;//事件循环可以通过当前接口获取默认的I/O复用的具体实现static Poller* newDefaultPoller(Eventloop *loop);protected://这里map的key就是sockfd value就是sockfd所属的通道类型channelusing ChannelMap=std::unordered_map<int,Channel*>;ChannelMap Channels_;private:Eventloop *ownerLoop_;//定义poller所属的事件循环EventLoop
};

1、存放有事件发生的 Channel

using ChannelList = std::vector<Channel*>;

poll 函数会把有事件发生的 Channel 指针填充到这个列表中。

Poller能监听到哪些channel发生事件了,获取活跃事件,然后遍历处理,然后上报给Eventloop,然后通知channel处理相应的事件

2、构造函数

Poller(EventLoop *loop);

初始化必须知道所属 EventLoop,
这是在建立 Poller→EventLoop 的反向指针
由 EventLoop 在初始化时创建(EventLoop 持有 Poller 实例)

3、事件轮循

virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels) = 0;

定义了一个跨平台 I/O 多路复用的抽象接口,派生类必须重写用它来实现 epoll、poll 等系统调用,上层EventLoop 通过它等待事件并分发处理,返回事件发生的精确时间戳

4、更新/添加通道监控

virtual void updateChannel(Channel *channel) = 0;

当 Channel 的事件关注集变化时(如新增写监听)

调用底层 epoll_ctl/poll 更新监听状态

5、移除通道监控

virtual void removeChannel(Channel *channel) = 0; 

当 Channel 销毁时(如连接关闭)

  • 清理 Poller 内部资源
  • 由 Channel::remove() 调用
  • 清除 channels_ 映射关系
  • 调用底层 epoll_ctl/poll 移除监听

6、判断参数channel是否在当前Poller当中

 bool hasChannel(Channel *channel) const;

防止重复注册同一个 Channel 到 Poller 中,避免重复添加导致的崩溃或未定义行为。

7、默认I/O的复用的具体实现

通过该接口,可以获取到想要的具体的IO复用实例,是Epoll、Poll、还是select

static Poller* newDefaultPoller(EventLoop *loop);

这里又学到一招,这个接口的实现要放在一个单独的源文件(DefaultPoller.cc)中,因为这里需要 new 出来一个EpollPoller实例,如果放在(Poller.cc)中实现,就会出现基类包含子类头文件的情况。(Poller是基类,EPollPoller是子类)。

8、维护 fd → Channel 的映射

using ChannelMap = std::unordered_map<int, Channel*>;
ChannelMap Channels_;

这里map的key就是sockfd value就是sockfd所属的通道类型channel

快速通过 fd 查找 Channel,存储所有被管理的 Channel,避免重复管理同一个 fd.

9、poller所属的事件循环EventLoop

EventLoop *ownerLoop_;

反向指针 - 指向所属 EventLoop

源文件Poller.cc

#include"Poller.h"
#include"Channel.h"Poller::Poller(Eventloop *loop):ownerLoop_(loop){}bool Poller::hasChannel(Channel *channel) const
{auto it=Channels_.find(channel->fd());return it!=Channels_.end() && it->second==channel;
}

channel->fd():获取 Channel 底层的文件描述符
Channels_.find():在 unordered_map<int, Channel*> 中 O(1) 复杂度查找

源文件DefaultPoller.cc

//细节性设计,为了不让基类包含子类的头文件
#include"Poller.h"
#include"EPollPoller.h"
#include<stdlib.h>
Poller* Poller::newDefaultPoller(Eventloop *loop)
{if(::getenv("MUDUO_USE_POLL"))//获取环境变量,如果有则生成poll实例,(本代码没有实现){return nullptr;//生成poll的实例}else{return new EPollPoller(loop);//生成epoll的实例}
}

EPollPoller

接下来这个模块用来封装Linux 的 epoll I/O 多路复用机制

头文件

#pragma once
#include"Poller.h"
#include<vector>
#include<sys/epoll.h>
#include"Timestamp.h"class EPollPoller:public Poller{public:EPollPoller(Eventloop* loop);~EPollPoller() override;//重写基类Poller的方法Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;void updateChannel(Channel *channel) override;void removeChannel(Channel *channel) override;
private:static const int kInitEventListSize=16;//给定的vector的一个初始长度默认是16,可以面试证明自己看过源码,为啥是静态//填写活跃的连接void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;//更新channel通道void update(int operation ,Channel* channel);using EventList=std::vector<epoll_event>;int epollfd_;EventList events_;//这个将来是作为epoll_wait的第二个参数};

成员变量

static const int kInitEventListSize=16;//Muduo库默认给定的vector的一个初始长度默认是16
using EventList=std::vector<epoll_event>;    
int epollfd_;
EventList events_;//这个将来是作为epoll_wait的第二个参数

定义一个数组,用于存储epoll_wait返回的就绪事件表,使用vector动态扩展事件缓冲区(当就绪事件数 == 缓冲区大小时自动扩容)

成员函数

1、重写Poller的方法
Timestamp poll(int timeoutMs,ChannelList *activeChannels) override;
void updateChannel(Channel *channel) override;
void removeChannel(Channel *channel) override;

这是在Poller类中声明的纯虚函数,继承Poller之后对这些函数必须进行重写,

  • poll()函数:等待并获取当前有事件发生的 Channel 列表。调用 epoll_wait() 等待事件。将“就绪的事件”转换为 Channel* 指针,填充到 activeChannels 中。
  • updateChannel():将一个 Channel 注册或修改到 epoll 中(对应 epoll_ctl(ADD/MOD))。当Channel 的事件发生变化(如从不监听可读 → 监听可读),需要通知 EPollPoller。根据 Channel 的状态决定是 EPOLL_CTL_ADD 还是 EPOLL_CTL_MOD。
  • removeChannel():从 epoll 中移除一个 Channel(对应 epoll_ctl(DEL)),从channels_映射表中删除,并清理内部状态。当 Channel 对应的资源(如 socket)关闭时,必须从 epoll 中注销。防止后续 epoll_wait 返回无效的 Channel。
2、填写活跃的连接
 void fillActiveChannels(int numEvents,ChannelList* activeChannels) const;
  • 遍历就绪事件
  • 设置Channel的revents_
  • 填充到activeChannels列表
3、更新channel通道
void update(int operation ,Channel *channel);

执行具体的epoll_ctl操作

源文件

#include"EPollPoller.h"
#include"Logger.h"
#include<unistd.h>
#include<string.h>
#include"Channel.h"
//#include<errno.h>
//表示channel是否添加到poller,与channel的成员index_相关,它的初始值就是-1,表示还没有添加到里面
//用来区分,如果再次添加channel时候发现是kAdded表示已经添加到里面了,那这次操作就是更新channel中的事件
const int kNew=-1;//一个channel还没有添加到poller里边
const int kAdded=1;//一个channel已经添加到poller里面吗
const int kDeleted=2;//删除EPollPoller::EPollPoller(Eventloop *loop):Poller(loop),epollfd_(::epoll_create1(EPOLL_CLOEXEC))//EPOLL_CLOEXEC:exec 时自动关闭文件描述符,安全防护,events_(kInitEventListSize)
{if(epollfd_<0){LOG_FATAL("epoll_create error:%d \n",errno);}
}
EPollPoller::~EPollPoller()
{::close(epollfd_);
}
//返回具体发生事件的时间点,主要调用的是epoll_wait
Timestamp EPollPoller::poll(int timeoutMs,ChannelList *activeChannels)
{//poll这里短时间是接受大量的高并发请求的,如果在这里使用LOG_INFO则每次调用都会影响它的性能//实际上这里应该用LOG_DEBUG,只要我们不定义debug,他就不用调用进而不会影响它的性能
//记录当前系统中“有多少个连接正在被监听”。LOG_INFO("func=%s => fd total count:%lu \n",__FUNCTION__,Channels_.size());//第二个参数是存放发生事件的数组地址,但是为了方便扩容,//我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);int saveErrno=errno;Timestamp now(Timestamp::now());if(numEvents>0){LOG_INFO("%d events happend \n",numEvents);fillActiveChannels(numEvents,activeChannels);//扩容if(numEvents==events_.size()){events_.resize(events_.size()*2);}}else if (numEvents==0){LOG_INFO("%s timeout! \n",__FUNCTION__);}else//外部中断,还要接着执行业务逻辑{if(saveErrno != EINTR){errno=saveErrno;//不太懂LOG_ERROR("EPollPoller::poll() err!");}}return now;
}
void EPollPoller::updateChannel(Channel *channel) 
{const int index=channel->index();LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此时channel已经在poller上注册过了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}
//从poller中删除channel
void EPollPoller::removeChannel(Channel *channel) 
{int fd=channel->fd();Channels_.erase(fd);LOG_INFO("func=%s => fd=%d\n",__FUNCTION__,fd);int index=channel->index();if(index==kAdded){update(EPOLL_CTL_DEL,channel);}channel->set_index(kNew);
}//填写活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了}
}
//更新channel通道,做的是事件的增删改
void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}
1、Channel 状态常量定义
const int kNew = -1;     // Channel 未添加到 Poller
const int kAdded = 1;    // Channel 已添加到 Poller
const int kDeleted = 2;  // Channel 已从 Poller 移除
//kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销

定义 Channel 在 Poller 中的生命周期状态。确保操作的正确顺序(不能重复添加已添加的 Channel)。避免不必要的系统调用(如对已删除 Channel 再次删除)。

2、poll方法重写

等待 I/O 事件发生,并将“活跃的 Channel”填充到 activeChannels 中

Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) {LOG_INFO("func=%s => fd total count:%lu \n", __FUNCTION__, Channels_.size());int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs);int saveErrno = errno; // 保存系统错误码Timestamp now(Timestamp::now());if(numEvents > 0) {LOG_INFO("%d events happend \n", numEvents);fillActiveChannels(numEvents, activeChannels);// 动态扩容if(numEvents == events_.size()) {events_.resize(events_.size() * 2);}}else if (numEvents == 0) {LOG_INFO("%s timeout! \n", __FUNCTION__);}else { // 错误处理if(saveErrno != EINTR) { // 非中断错误errno = saveErrno;LOG_ERROR("EPollPoller::poll() err!");}}return now;
}

2.1 

int numEvents=::epoll_wait(epollfd_,&*events_.begin(),static_cast<int>(events_.size()),timeoutMs);

这里的第二个参数:

  1. events_.begin() → 返回一个 iterator,指向第一个元素。
  2. *events_.begin() → 解引用 iterator,得到第一个 epoll_event 对象。
  3. &*events_.begin() → 取第一个对象的地址,即 struct epoll_event*。

第三个参数

  1. 我们用的是vector.events_.size()返回的是size_t类型,但是这里参数要求int,所以使用转换

2.2

int saveErrno=errno;

epoll_wait 返回 -1 时,错误原因在 errno 中。但后续代码可能调用其他函数(如 LOG_INFO),会覆盖 errno。所以必须立即保存,否则错误信息丢失。

这是 Linux 系统编程的 黄金法则:一旦系统调用失败,立刻保存 errno。

2.3

else//外部中断,还要接着执行业务逻辑{if(saveErrno != EINTR){errno=saveErrno;//虽然日志不需要了,但有些设计会保留 errno 给上层处理。LOG_ERROR("EPollPoller::poll() err!");}}

EINTR:系统调用被信号中断(如 SIGCHLD、SIGTERM)。这是正常情况,不应视为错误,循环会继续。如果是其他错误(如 EBADF、ENOMEM)才是真正的异常。

3、updateChannel重写

状态驱动 —— 通过 channel->index() 的状态决定行为,即根据 Channel 的当前状态,决定是 epoll_ctl(ADD)、MOD 还是 DEL。

void EPollPoller::updateChannel(Channel *channel) 
{const int index=channel->index();//获取状态,是未注册到Poller、已注册还是已删除LOG_INFO("func=%s => fd=%d events=%d index=%d \n",__FUNCTION__,channel->fd(),channel->events(),index);if(index==kNew || index==kDeleted){if(index==kNew){int fd=channel->fd();Channels_[fd]=channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD,channel);}else//此时channel已经在poller上注册过了{int fd=channel->fd();if(channel->isNoneEvent()){update(EPOLL_CTL_DEL,channel);}else{update(EPOLL_CTL_MOD,channel);}}
}

如果是kNew(新连接,首次注册)或者是 kDeleted(之前被移除,现在重新启用:比如连接复用、延迟删除后恢复)。无论之前是否注册过,只要不是 kAdded,就当作“新来”的处理。

仅在 kNew 时加入 channels_ 映射表

channels_[fd] = channel:建立 fd → Channel 的映射,用于 epoll_wait 返回后查找 Channel。

而遵循 kDeleted 状态允许 Channel 暂时移除但保留在映射表中,避免频繁添加/删除的开销。

所以不需要将channel再次添加到Channels_中了。

对于已经注册在Poller上的channel,如果他没有感兴趣的事,就从 epoll 实例中删除。

3、update()

封装 epoll_ctl 系统调用,用于向 epoll 实例添加、修改或删除一个 fd 的事件监听。

void EPollPoller::update(int operation ,Channel* channel)
{epoll_event event;memset(&event,0,sizeof(event));int fd=channel->fd();event.events=channel->events();event.data.fd=fd;event.data.ptr=channel;//关键:存储 Channel* 用于事件分发if(::epoll_ctl(epollfd_,operation,fd,&event)<0){if(operation==EPOLL_CTL_DEL){LOG_ERROR("epoll_ctl del error:%d\n",errno);}else{LOG_FATAL("epoll_ctl add/mod error%d \n",errno);}}
}

epoll_event 是 epoll 的核心数据结构,用于描述一个 fd 的监听事件和用户数据。

event.data.ptr=channel::最关键字段绑定用户数据,后面用于 fillActiveChannels() 中直接取出 Channel*

4、removeChannel重写
void EPollPoller::removeChannel(Channel *channel) {int fd = channel->fd();Channels_.erase(fd); // 从映射表移除LOG_INFO("func=%s => fd=%d\n", __FUNCTION__, fd);int index = channel->index();if(index == kAdded) {update(EPOLL_CTL_DEL, channel); // 从epoll移除}channel->set_index(kNew); // 重置状态
}
5、填充活跃的连接
void EPollPoller::fillActiveChannels(int numEvents,ChannelList* activeChannels) const
{for(int i=0;i< numEvents;++i){Channel* channel=static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannels->push_back(channel);//Eventloop就拿到了poller给它返回的所有发生事件的channel列表了}
}

numEvents:来自 epoll_wait 的返回值,表示有多少个 fd 就绪。遍历 events_[i] 数组中的每一个就绪事件。

接着从 epoll_event.data.ptr 取出 Channel*,上面update()中设置过

event.data.ptr = channel;

epoll 本身不关心用户数据,只返回 fd 和 events。但我们需要知道“哪个 Channel 对应这个 fd”。
通过 data.ptr 直接绑定 Channel*,实现了 O(1) 的快速查找,避免了 channels_[fd] 的哈希查找。

利用了空间换时间。

events_ 是 EPollPoller 的成员变量:std::vector<epoll_event> events_;接着设置实际发生的事件。

接着将活跃的channel添加到活跃列表activeChannels中,activeChannels 是一个输出参数,类型为 std::vector<Channel*>*。这个列表会被返回给 EventLoop,后续遍历并调用 channel->handleEvent()。


感谢阅读!

http://www.xdnf.cn/news/1428751.html

相关文章:

  • 百度智能云,除了AI还有啥?
  • 线程特定存储
  • Go语言开发合并文件小工具
  • go命令行工具:如何在现有的工程里加入使用cobra
  • 苹果手机文本转音频,自行制作背诵素材
  • Redis 持久化机制详解
  • 《WINDOWS 环境下32位汇编语言程序设计》第10章 内存管理和文件操作(2)
  • 文华财经wh6波段多空指标-变色K做多做空信号,抄底逃顶主图幅图
  • Docker启动两个Redis镜像并配置一主一从
  • Linux内核O(1)调度算法
  • 汽车制造工厂如何应用力控SCADA实现全方位智能监控与诊断
  • 从“成本中心”到“生产力引擎”:MCP如何将AI从“建议者”变为“执行者”
  • 2025年新版C语言 模电数电及51单片机Proteus嵌入式开发入门实战系统学习,一整套全齐了再也不用东拼西凑
  • 久等啦!Tigshop O2O多门店JAVA/PHP版本即将上线!
  • 通义万相Wan2.2-S2V-14B:AI视频生成的革命性突破与实践指南
  • c++ 类和对象(上)
  • 与后端对话:在React中优雅地请求API数据 (Fetch/Axios)
  • token存储方案
  • iOS XML 处理利器:CNXMLParser 与 CNXMLDocument 深度解析
  • 从零开始的python学习——函数(2)
  • 漫画短剧小程序系统开发:从0到1的核心架构与思路
  • 今天我们开始学习shell编程语言
  • @ZooKeeper 详细介绍部署与使用详细指南
  • 【JavaScript】前端两种路由模式,Hash路由,History 路由
  • 通过 FinalShell 访问服务器并运行 GUI 程序,提示 “Cannot connect to X server“ 的解决方法
  • NV115NV119美光固态闪存NV129NV112
  • 【53页PPT】华为制造行业数字化转型工业互联网智能制造解决方案(附下载方式)
  • Spring MVC BOOT 中体现的设计模式
  • Python 环境配置初学者指南:从安装到 Pycharm 项目配置
  • OpenHarmony HVB安全启动一键启停全栈实践:从U-Boot签名到fastboot解锁的闭环避坑指南