【Linux | 网络】多路转接IO之poll
- 一、poll函数
- 二、poll的优缺点
- 三、实现poll服务器(只关心读事件)
- 3.1 Log.hpp(日志)
- 3.2 Lockguard.hpp(自动管理锁)
- 3.3 Socket.hpp(封装套接字)
- 3.4 PollServer.hpp(服务端封装)
- 3.5 Main.cpp(服务端)
- 结尾
一、poll函数
#include <poll.h>int poll(struct pollfd *fds, nfds_t nfds, int timeout);
功能:poll 是 Linux 系统中的一种 I/O 多路复用机制,主要用于同时监控多个文件描述符的状态。
参数:
- fds:指向 struct pollfd 数组的指针,每个元素指定一个要监控的文件描述符及其关注的事件
- nfds:数组中元素的数量(即监控的文件描述符总数)
- timeout:超时时间(毫秒):
- -1:永久阻塞,直到有事件发生
- 0:立即返回(非阻塞模式)
- >0:指定超时时间,超时后返回
返回值:
- 正数:表示就绪的文件描述符总数
- 0:表示超时(无文件描述符就绪)
- -1:表示错误,并设置 errno
struct pollfd 结构
struct pollfd {int fd; // 文件描述符short events; // 关注的事件(输入掩码,如 POLLIN、POLLOUT)short revents; // 实际发生的事件(输出掩码,由内核填充)
};
poll 函数支持的标准事件类型,本质上是宏,只有一个比特位为1,通过与events和revents异或分为两种情况:
- 调用时:用户告诉内核,需要关注文件描述符中的events事件
- 返回时:内核告诉用户,用户关注的文件描述符,有revents中的事件准备就绪
事件 | 描述 | 是否可以作为输入 | 是否可以作为输出 |
---|---|---|---|
POLLIN | 有普通数据或优先数据可读 | 是 | 是 |
POLLRDNORM | 有普通数据可读 | 是 | 是 |
POLLRDBAND | 有优先级带数据可读 | 是 | 是 |
POLLPRI | 有高优先级带数据可读 | 是 | 是 |
POLLOUT | 有普通数据或优先数据可写 | 是 | 是 |
POLLWRNORM | 有普通数据可写 | 是 | 是 |
POLLWRBAND | 有优先级带数据可写 | 是 | 是 |
POLLRDHUP | TCP连接的对端关闭连接,或关闭了写操作 | 是 | 是 |
POLLHUP | 挂起 | 否 | 是 |
POLLERR | 错误 | 否 | 是 |
POLLNVAL | 文件描述符未打开 | 否 | 是 |
二、poll的优缺点
- 优点
- poll 只负责等待,可以等待多个文件描述符,在IO的时候效率会比较高
- 输入和输出参数进行分离,events和revents,不需要再对poll的参数进行频繁的重置了
- poll使用了动态数组,所以 poll 能够检测文件描述符的个数也是没有有限的
- 缺点
- 用户和内核之间,需要一直进行数据拷贝
- 在编写代码的时候,需要遍历动态数组,可能会影响select的效率
- poll 会让操作系统在底层遍历要关心的所有文件描述符,会导致效率降低
三、实现poll服务器(只关心读事件)
3.1 Log.hpp(日志)
#pragma once#include "LockGuard.hpp"
#include <iostream>
#include <string>
#include <stdarg.h>
#include <stdio.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#include <sys/types.h>
#include <fcntl.h>
#include <sys/stat.h>using namespace std;// 日志等级
enum
{Debug = 0, // 调试Info, // 正常Warning, // 警告Error, // 错误,但程序并未直接退出Fatal // 程序直接挂掉
};enum
{Screen = 10, // 打印到显示器上OneFile, // 打印到一个文件中ClassFile // 按照日志等级打印到不同的文件中
};string LevelToString(int level)
{switch (level){case Debug:return "Debug";case Info:return "Info";case Warning:return "Warning";case Error:return "Error";case Fatal:return "Fatal";default:return "Unknow";}
}const char *default_filename = "log.";
const int default_style = Screen;
const char *defaultdir = "log";class Log
{
public:Log(): style(default_style), filename(default_filename){// mkdir(defaultdir,0775);pthread_mutex_init(&_log_mutex, nullptr);}void SwitchStyle(int sty){style = sty;}void WriteLogToOneFile(const string &logname, const string &logmessage){int fd = open(logname.c_str(), O_CREAT | O_WRONLY | O_APPEND, 0666);if (fd == -1)return;{LockGuard lockguard(&_log_mutex);write(fd, logmessage.c_str(), logmessage.size());}close(fd);}void WriteLogToClassFile(const string &levelstr, const string &logmessage){mkdir(defaultdir, 0775);string name = defaultdir;name += "/";name += filename;name += levelstr;WriteLogToOneFile(name, logmessage);}void WriteLog(int level, const string &logmessage){switch (style){case Screen:{LockGuard lockguard(&_log_mutex);cout << logmessage;}break;case OneFile:WriteLogToClassFile("All", logmessage);break;case ClassFile:WriteLogToClassFile(LevelToString(level), logmessage);break;default:break;}}string GetTime(){time_t CurrentTime = time(nullptr);struct tm *curtime = localtime(&CurrentTime);char time[128];// localtime 的年是从1900开始的,所以要加1900, 月是从0开始的所以加1snprintf(time, sizeof(time), "%d-%d-%d %d:%d:%d",curtime->tm_year + 1900, curtime->tm_mon + 1, curtime->tm_mday,curtime->tm_hour, curtime->tm_min, curtime->tm_sec);return time;return "";}void LogMessage(int level, const char *format, ...){char left[1024];string Levelstr = LevelToString(level).c_str();string Timestr = GetTime().c_str();string Idstr = to_string(getpid());snprintf(left, sizeof(left), "[%s][%s][%s] ",Levelstr.c_str(), Timestr.c_str(), Idstr.c_str());va_list args;va_start(args, format);char right[1024];vsnprintf(right, sizeof(right), format, args);string logmessage = left;logmessage += right;WriteLog(level, logmessage);va_end(args);}~Log(){pthread_mutex_destroy(&_log_mutex);};private:int style;string filename;pthread_mutex_t _log_mutex;
};Log lg;class Conf
{
public:Conf(){lg.SwitchStyle(Screen);}~Conf(){}
};Conf conf;
3.2 Lockguard.hpp(自动管理锁)
#pragma once#include <iostream>class Mutex
{
public:Mutex(pthread_mutex_t* lock):pmutex(lock){}void Lock(){pthread_mutex_lock(pmutex);}void Unlock(){pthread_mutex_unlock(pmutex);}~Mutex(){}
public:pthread_mutex_t* pmutex;
};class LockGuard
{
public:LockGuard(pthread_mutex_t* lock):mutex(lock){mutex.Lock();}~LockGuard(){mutex.Unlock();}
public:Mutex mutex;
};
3.3 Socket.hpp(封装套接字)
#pragma once#include <iostream>
#include <string>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>#define CONV(addrptr) (struct sockaddr*)addrptrenum{Socket_err = 1,Bind_err,Listen_err
};const static int defalutsockfd = -1;
const int defalutbacklog = 5;class Socket
{
public:virtual ~Socket(){};virtual void CreateSocketOrDie() = 0;virtual void BindSocketOrDie(uint16_t port) = 0;virtual void ListenSocketOrDie(int backlog) = 0;virtual int AcceptConnection(std::string* ip , uint16_t* port) = 0;virtual bool ConnectServer(const std::string& serverip , uint16_t serverport) = 0;virtual int GetSockFd() = 0;virtual void SetSockFd(int sockfd) = 0;virtual void CloseSockFd() = 0;virtual bool Recv(std::string& buffer,int size) = 0;virtual void Send(const std::string& send_string) = 0;public:void BuildListenSocketMethod(uint16_t port,int backlog = defalutbacklog){CreateSocketOrDie();BindSocketOrDie(port);ListenSocketOrDie(backlog);}bool BuildConnectSocketMethod(const std::string& serverip , uint16_t serverport){CreateSocketOrDie();return ConnectServer(serverip,serverport);}void BuildNormalSocketMethod(int sockfd){SetSockFd(sockfd);}
};class TcpSocket : public Socket
{
public:TcpSocket(int sockfd = defalutsockfd):_sockfd(sockfd){}~TcpSocket(){};void CreateSocketOrDie() override{_sockfd = ::socket(AF_INET,SOCK_STREAM,0);if(_sockfd < 0) exit(Socket_err);}void BindSocketOrDie(uint16_t port) override{struct sockaddr_in addr;memset(&addr,0,sizeof(addr));addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);socklen_t len = sizeof(addr);int n = ::bind(_sockfd,CONV(&addr),len);if(n < 0) exit(Bind_err);}void ListenSocketOrDie(int backlog) override{int n = ::listen(_sockfd,backlog);if(n < 0) exit(Listen_err);}int AcceptConnection(std::string* clientip , uint16_t* clientport) override{struct sockaddr_in client;memset(&client,0,sizeof(client));socklen_t len = sizeof(client);int fd = ::accept(_sockfd,CONV(&client),&len);if(fd < 0) return -1;char buffer[64];inet_ntop(AF_INET,&client.sin_addr,buffer,len);*clientip = buffer;*clientport = ntohs(client.sin_port);return fd;} bool ConnectServer(const std::string& serverip , uint16_t serverport) override{struct sockaddr_in server;memset(&server,0,sizeof(server));server.sin_family = AF_INET;// server.sin_addr.s_addr = inet_addr(serverip.c_str());inet_pton(AF_INET,serverip.c_str(),&server.sin_addr);server.sin_port = htons(serverport);socklen_t len = sizeof(server);int n = connect(_sockfd,CONV(&server),len);if(n < 0) return false;else return true;}int GetSockFd() override{return _sockfd;}void SetSockFd(int sockfd) override{_sockfd = sockfd;}void CloseSockFd() override{if(_sockfd > defalutsockfd){close(_sockfd);}}bool Recv(std::string& buffer , int size)override{char inbuffer[size];int n = recv(_sockfd,inbuffer,sizeof(inbuffer)-1,0);if(n > 0){inbuffer[n] = 0;}else{return false;}buffer += inbuffer;return true;}void Send(const std::string& send_string){send(_sockfd,send_string.c_str(),send_string.size(),0);}private:int _sockfd;
};
3.4 PollServer.hpp(服务端封装)
#pragma once#include <iostream>
#include <string>
#include <poll.h>
#include "Socket.hpp"
#include "Log.hpp"
#include <memory>using namespace std;const static uint16_t defalutport = 8888;
const static int gbacklog = 8;
const static int num = 1024;class PollServer
{
private:void HandlerEvent(){for (int i = 0; i < _num; i++){// 是否监控if (_rfds[i].fd == -1)continue;// 是否就绪int fd = _rfds[i].fd;if (_rfds[i].revents & POLLIN){// 是新连接到来,还是新数据到来// 新连接到来if (fd == _listensock->GetSockFd()){lg.LogMessage(Info, "get a new link\n");string clientip;uint16_t cilentport;// 由于select已经检测到listensock已经就绪了,这里不会阻塞int sockfd = _listensock->AcceptConnection(&clientip, &cilentport);if (sockfd == -1){lg.LogMessage(Error, "accept error\n");continue;}lg.LogMessage(Info, "get a client , client info# %s %d , fd:%d\n", clientip.c_str(), cilentport, sockfd);// 这里已经获取连接成功,由于底层数据不一定就绪// 所以这里需要将新连接的文件描述符交给poll托管// 只需将文件描述符加入到_rfds即可int pos = 0;for (; pos < _num; pos++){if (_rfds[pos].fd == -1){_rfds[pos].fd = sockfd;_rfds[pos].events |= POLLIN;break;}}// 当存储上限时,可以选择扩容,由于poll并不是很重要,这里我为了方便就直接关闭文件描述符if(pos == _num){close(sockfd);lg.LogMessage(Warning, "server is full...!\n");}}else{ // 是新数据来了// 这里读是有问题的char buffer[1024];bool flag = recv(fd,buffer,1024,0);if(flag) // 读取成功{lg.LogMessage(Info,"client say# %s\n",buffer);}else // 读取失败{lg.LogMessage(Warning,"cilent quit !! close fd : %d\n",fd);close(fd);_rfds[i].fd = -1;_rfds[i].events = 0;_rfds[i].revents = 0;}}}}}public:PollServer(uint16_t port = defalutport): _port(port), _listensock(new TcpSocket()), _isrunning(false), _num(num),_rfds(new pollfd[_num]){}void Init(){_listensock->BuildListenSocketMethod(_port, gbacklog);for (int i = 0; i < _num; i++){_rfds[i].fd = -1;_rfds[i].events = 0;_rfds[i].revents = 0;}_rfds[0].fd = _listensock.get()->GetSockFd();_rfds[0].events |= POLLIN;}void Loop(){_isrunning = true;while (_isrunning){PrintDebug();int timeout = 1000;ssize_t n = poll(_rfds,_num,timeout);switch (n){case -1:{lg.LogMessage(Fatal, "select Error\n");break;}case 0:{lg.LogMessage(Info, "select timeout...");break;}default:{lg.LogMessage(Info, "select success , begin handler event\n");HandlerEvent();break;}}}_isrunning = false;}void Stop(){_isrunning = false;}// 查看当前哪些文件描述符需要被监控void PrintDebug(){std::cout << "current select rfds list is : ";for (int i = 0; i < _num; i++){if (_rfds[i].fd == -1)continue;elsestd::cout << _rfds[i].fd << " ";}std::cout << std::endl;}~PollServer() {delete[] _rfds;}private:unique_ptr<Socket> _listensock;uint16_t _port;bool _isrunning;int _num;struct pollfd* _rfds;
};
3.5 Main.cpp(服务端)
#include <iostream>
#include <memory>
#include "PollServer.hpp"using namespace std;// ./pollServer port
int main(int argc , char* argv[])
{if(argc != 2){cout << "Usage : " << argv[0] << " port" << endl;exit(0); }uint16_t localport = stoi(argv[1]);unique_ptr<PollServer> svr = make_unique<PollServer>(localport);svr->Init();svr->Loop();return 0;
}
结尾
如果有什么建议和疑问,或是有什么错误,大家可以在评论区中提出。
希望大家以后也能和我一起进步!!🌹🌹
如果这篇文章对你有用的话,希望大家给一个三连支持一下!!🌹🌹