【计算机网络】非阻塞IO——poll实现多路转接
🔥个人主页🔥:孤寂大仙V
🌈收录专栏🌈:计算机网络
🌹往期回顾🌹:【计算机网络】非阻塞IO——select实现多路转接
🔖流水不争,争的是滔滔不息
一、poll实现多路转接
在网络编程或多路 IO 编程中,我们经常需要同时监听多个文件描述符(fd),比如多个客户端的 socket 连接。这时,poll 就登场了。
poll 是 Linux 提供的一种 IO 多路复用机制,用于监听多个 fd 上的读写等事件,一旦有就绪,立刻通知我们处理。
poll的参数
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
- 参数
struct pollfd fds[]
是关心的文件描述符数组,每一个元素代表一个要监听的 fd 及其感兴趣的事件和返回的事件。
struct pollfd {int fd; // 要监听的文件描述符short events; // 感兴趣的事件(由你设置)short revents; // 实际返回的事件(由内核设置)
};
- 参数
nfds_t nfds
这个是 fds[] 数组里有效元素的数量,简单说就是你监听几个文件描述符就写几。 - int timeout
单位是 毫秒(ms),表示阻塞多久。timeout > 0:等待指定毫秒数后返回、timeout == 0:立即返回(非阻塞)、timeout < 0:永远阻塞,直到有事件发生。
函数返回值
- 0:就绪的文件描述符数量
- 0:超时,没有任何事件发生
- <0:出错(比如信号中断)
常用事件(events/revents)取值
POLLIN // 有数据可读
POLLOUT // 可以写数据而不会阻塞
POLLERR // 错误(由 revents 设置)
POLLHUP // 对端关闭连接
POLLNVAL // 描述符非法
poll的缺点
- 每次调用都要传入整个 fd 列表
poll 的 pollfd 是个数组,内核不会保存状态,每次都得重新传。
如果你有上千个连接,那每次 poll() 都会把这上千个 fd 重新复制到内核 → 代价大。 - 线性扫描效率低
poll 返回的是“多少个 fd 就绪”,但你要线性扫描整个数组找出来。
比如你监听 1000 个连接,但只有 1 个可读,你得从 0 扫到 999 才找到它。 - fd 数量仍有限制
虽然 poll 相比 select 不再限制 1024 个,但:
它还是受限于内核最大文件描述符数量(ulimit -n,比如 65535)
每个 pollfd 占用空间较大,更不适合极大并发 - 无事件通知机制
poll 只能“等和扫”,没有像 epoll 的 EPOLLONESHOT、EPOLLET(边缘触发)那种高级机制。
没法在事件处理完后说“我暂时不关注这个 fd 了”。
二、poll实现非阻塞服务器
#pragma once
#include "Common.hpp"
#include "Log.hpp"
#include "Socket.hpp"
#include <sys/poll.h>using namespace std;
using namespace LogModule;
using namespace SocketModule;class PollServer
{const static int size = 4096;const static int defaultfd = -1;public:PollServer(int port): _isrunning(false), _listensockfd(make_unique<TcpSocket>()){_listensockfd->BuildTcpSocketServer(port); // 构造TCP服务器for (int i = 0; i < size; i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensockfd->FD();_fds[0].events = POLLIN;}void Start() // 服务器启动{int timeout = 1000; // 1000毫秒_isrunning = true;while (true){int n = poll(_fds, size, timeout); // 多路转接只关系读事件switch (n){case -1:LOG(LogLevel::ERROR) << "poll error"; // 异常break;case 0:LOG(LogLevel::WARNING) << "poll timeout"; // 超时default:LOG(LogLevel::INFO) << "事件就绪"; // 读事件就绪Dispatcher(); // 派发break;}}}void Dispatcher() // 事件派发{for (int i = 0; i < size; i++){if (_fds[i].fd == defaultfd) // 跳过continue;if (_fds[i].revents & POLLIN) // 是读就绪{if (_fds[i].fd == _listensockfd->FD()){// listen 套接字Accept();}else{// 普通 套接字Recv(i);}}}}void Accept(){InetAddr client;int sockfd = _listensockfd->AcceptOrDie(&client);LOG(LogLevel::DEBUG) << "accept a new client" << client.StringAddr();int pos = 0;for (; pos < size; pos++){if (_fds[pos].fd == defaultfd)break; // 数组中找到空位}if (pos == size){LOG(LogLevel::WARNING) << "poll server full";close(sockfd);}else{_fds[pos].fd = sockfd;_fds[pos].events = POLLIN;_fds[pos].revents = 0;}}void Recv(int pos) // 读数据{char buffer[1024];ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 收信息if (n > 0){buffer[n] = 0;cout << "client say@ " << buffer << endl;}else if (n == 0) // 客户端退出{LOG(LogLevel::INFO) << "client quit";_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;close(_fds[pos].fd);}else // 出现错误 异常{LOG(LogLevel::FATAL) << "recv error";_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;close(_fds[pos].fd);}}~PollServer(){}private:unique_ptr<Socket> _listensockfd;bool _isrunning;struct pollfd _fds[size];
};
构造
//私有成员变量
private:unique_ptr<Socket> _listensockfd;bool _isrunning;struct pollfd _fds[size];
//构造PollServer(int port): _isrunning(false), _listensockfd(make_unique<TcpSocket>()){_listensockfd->BuildTcpSocketServer(port); // 构造TCP服务器for (int i = 0; i < size; i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensockfd->FD();_fds[0].events = POLLIN;}
私有成员变量中,把关心文件描述符的数组开好,构造服务器的时候遍历这个数组,把要监听的文件描述符设置进去,把要关系的状态设置为关系读事件。
服务器启动
void Start() // 服务器启动{int timeout = 1000; // 1000毫秒_isrunning = true;while (true){int n = poll(_fds, size, timeout); // 多路转接只关系读事件switch (n){case -1:LOG(LogLevel::ERROR) << "poll error"; // 异常break;case 0:LOG(LogLevel::WARNING) << "poll timeout"; // 超时default:LOG(LogLevel::INFO) << "事件就绪"; // 读事件就绪Dispatcher(); // 派发break;}}}
用poll函数,进行多路转接,事件就绪就派发任务。
事件派发
void Dispatcher() // 事件派发{for (int i = 0; i < size; i++){if (_fds[i].fd == defaultfd) // 跳过continue;if (_fds[i].revents & POLLIN) // 是读就绪{if (_fds[i].fd == _listensockfd->FD()){// listen 套接字Accept();}else{// 普通 套接字Recv(i);}}}}
对文件描述符数组,进行遍历。如果不是合法位置(没放文件描述符)就跳过这个位置。如果是读就绪然后判断是监听套接字合适普通套接字。
收到客户端的连接accept
void Accept(){InetAddr client;int sockfd = _listensockfd->AcceptOrDie(&client);LOG(LogLevel::DEBUG) << "accept a new client" << client.StringAddr();int pos = 0;for (; pos < size; pos++){if (_fds[pos].fd == defaultfd)break; // 数组中找到空位}if (pos == size){LOG(LogLevel::WARNING) << "poll server full";close(sockfd);}else{_fds[pos].fd = sockfd;_fds[pos].events = POLLIN;_fds[pos].revents = 0;}}
主要是收到客户端的连接,创建了accept的套接字,要把这个套接字放到数组中。在文件描述符数组中找到空位,把这个acceptfd设置进去。
读数据
void Recv(int pos) // 读数据{char buffer[1024];ssize_t n = recv(_fds[pos].fd, buffer, sizeof(buffer) - 1, 0); // 收信息if (n > 0){buffer[n] = 0;cout << "client say@ " << buffer << endl;}else if (n == 0) // 客户端退出{LOG(LogLevel::INFO) << "client quit";_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;close(_fds[pos].fd);}else // 出现错误 异常{LOG(LogLevel::FATAL) << "recv error";_fds[pos].fd = defaultfd;_fds[pos].events = 0;_fds[pos].revents = 0;close(_fds[pos].fd);}
这时候,读数据已经是非阻塞的了,客户端退出和异常要把文件描述符数组的内容清空,然后关闭文件描述符。
源码:poll多路转接