五种IO模型与阻塞IO
1.阻塞式
阻塞IO:在内核将数据准备就绪之前,系统调用会一直等待,套接字默认都是阻塞方式。
例子如钓鱼,张三哪一个鱼竿去钓鱼,会一直盯着水面看是否有鱼上钩,这个期间什么都不会做。
非阻塞式
非阻塞IO:内核没有将数据就绪好,系统调用仍然会直接返回,并返回错误码EWOULDBLOCK,表示当前操作在阻塞模式下执行,就会阻塞等待,因为不是阻塞就不会等待。
例子钓鱼,李四拿着鱼竿鱼钓鱼 ,把鱼漂扔进水里,就去干别的事情了,偶然注意鱼漂。
信号驱动式
信号驱动IO:内核将数据准备好,使用SIGIO信号通知上层进行IO操作。
例子钓鱼,王五拿着鱼竿去钓鱼,在鱼竿上吊着铃铛,铃铛响了就拉杆。
多路转接
IO多路转接:多路转接可以同时等待多个文件描述符的就绪状态,一旦有一个出现就绪状态就会马上处理。
例子钓鱼,赵六去钓鱼,拿了很多鱼竿并将鱼竿都放置好,然后来回走动观看那个鱼竿动了,就过去收杆。
异步IO
异步IO模式:内核把数据拷贝完成才通知上层,去处理拷贝完的数据,与信号驱动不同的是,信号驱动是数据就绪时通知,这个是完成拷贝通知。
高级IO概念
同步通信
同步就是发出一个调用时,没有得到结果之前是不返回的,但是一旦调用就得得到返回值,调用者主动等待调用的结果。
异步通信
调用发出后,这个调用就直接返回了,没有返回结果,一个异步调用发出后,调用者本身不会立刻得到结果,而是在调用发出后,被调用者通过状态,通知来告诉调用者,或通过回调函数来处理这个调用,就是调用者本身不参与钓鱼过程,只参与了拿到鱼步骤。
非阻塞函数
系统调用函数:fcntl
参数介绍
fd要操作的文件描述符,cmd要执行的操作命令,arg可选参数。
下面是fcntl函数五种功能,获取和设置文件状态标记,可以实现一个文件描述符变成非阻塞式。
cmd参数为F_GETFL时,fcntl将当前文件描述符的属性提取出来
cmd参数为F_SETFL时,fcntl用于设置文件描述符的状态标志。
F_DUPFD:复制文件描述符
用于创建一个新的文件描述符,它与原文件描述符
fd
指向同一个文件。新的文件描述符会是大于等于
arg
的最小可用值。F_GETFL 和 F_SETFL:获取和设置文件状态标志
F_GETFL:获取文件的当前状态标志(如
O_RDONLY
、O_WRONLY
、O_RDWR
等)。F_SETFL:设置文件的状态标志。常用于设置文件的非阻塞模式(
O_NONBLOCK
)。F_GETFD 和 F_SETFD:获取和设置文件描述符标志
用于控制文件描述符的行为,例如是否在
exec
时关闭文件描述符(FD_CLOEXEC
)。F_GETLK、F_SETLK 和 F_SETLKW:记录锁操作
用于在文件上设置或获取记录锁(文件锁定机制),用于进程间的同步。
int fcntl(int fd, F_SETFL, int flags);
flags:新的文件状态标志,如O_APPEND写操作时可以追加内容不会覆盖,O_NONBLOCK表示的是非阻塞模式。
多路转接函数
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
- nfds: 需要检查的文件描述符的最大值加1。即所有被监视的文件描述符中最大的那个加1。这样内核只需检查0到nfds-1的文件描述符。
- readfds: 指向fd_set结构的指针,用于监视文件描述符的读就绪状态。如果某个文件描述符上有数据可读(包括连接断开等),则会被标记。调用后,只有就绪的文件描述符会被保留在集合中。
- writefds: 指向fd_set结构的指针,用于监视文件描述符的写就绪状态。如果某个文件描述符可以写入数据(而不导致阻塞),则会被标记。
- exceptfds: 指向fd_set结构的指针,用于监视文件描述符的异常状态(如带外数据到达)。
- timeout: 指向timeval结构的指针,用于设置select的超时时间。如果设置为NULL,则select会一直阻塞直到有文件描述符就绪;如果timeval的两个字段都为0,则select会立即返回(轮询模式)。
readfds和writefds是一个集合,分别表示监察的可读写文件集合。初始调用时是输入输出型参数,输入型是要把检查的文件描述符传进去,输出型体现在执行完后会把就绪的文件描述符放到参数里,集合是以位图形式存储文件描述符状态,0表示为就绪,1表示就绪,然后第几位就表示第一个文件描述符就绪了。
select优点和缺点
可以被监控的文件描述符个数是有限的,通过sizeof(fd_set)看到,不同版本可能显示的不一样,
每次调用select需要手动设置fd集合,从接口方面说不方便。
同时每次调用select都要在内核遍历传递过来的fd,会有很大开销。
select支持的文件描述符数量太小。
每次调用select,都需要把fd集合从用户态拷贝到内核态,也有很大的开销。
select特点
可监控的文件描述符个数取决于sizeof(fd_set)的值,每一个bit表示一个文件描述符,则服务器最大支持文件描述符个数为512*8=4096,将fd加入select监控集时,还需要再使用一个数据结构保存放入的fd,一是select返回后,array作为源数据与fd_set进行FD_ISSET判断。第二是,select返回后会把以前加入的selectfd清空,每次开始都要重新从array中取地fd驱逐,扫描array时,可以得到fd最大值maxfd,作为select第一个参数。
select代码实现
size定义了文件描述符的最大容量是多少,fd_set是一个类型,是有上限的,不同环境不一样,一般是1024个,并且设置默认的数组值为-1。
const static int size = sizeof(fd_set) * 8;
const static int defaultfd = -1;
_listensock让智能指针管理监听套接字,unique_ptr模板保证了唯一指针指向套接字,数组保存要监控的文件描述符。
std::unique_ptr<Socket> _listensock; // 监听套接字
bool _isrunning; // 服务器运行状态
int _fd_array[size]; // 文件描述符管理数组
创建TCP套接字,初始化所有数组的元素为-1,将监听套接字的文件描述符放在数组首位位置,保证了select的n不为0,会执行处理事件函数,里面还有判断是否为监听套接字,是就表示有新连接到来,更新数组。
SelectServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false)
{
_listensock->BuildTcpSocketMethod(port);
for (int i = 0; i < size; i++)
_fd_array[i] = defaultfd;
_fd_array[0] = _listensock->Fd();
}
每次循环都会初始化fd_set结构,select函数会修改传入的fd_set。而且上一次监控的对象下一次可能不监控了,所以要清空,重新设置监控对象。
fd_set rfds; // 定义fds集合
FD_ZERO(&rfds); // 清空fds
maxfd+1监控范围,必须是最大文件描述符值+1,rfds监控读事件的文件描述符集合,第一个nullptr不监控写事件,第二个不监控异常事件,最后一个无超时限制,一直阻塞到有事件发生。
int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);
使用FD_ISSET宏检查特定的文件描述符是否就在集合中,一种是监听套接字就绪,表示的是有客服端连接请求,普通套接字表示有数据可读。 当有新客服端发起连接请求时,监听套接字读事件就绪,可以调用accept从内核中拿到新的fd。
void Dispatcher(fd_set &rfds)
{
for (int i = 0; i < size; i++)
{
if (_fd_array[i] == defaultfd)
continue;
if (FD_ISSET(_fd_array[i], &rfds)) // 检查fd是否就绪
{
if (_fd_array[i] == _listensock->Fd())
{
Accepter(); // 处理新连接
}
else
{
Recver(_fd_array[i], i); // 处理数据读取
}
}
}
}
acccept函数用于接收客服端请求,返回一个新的已连接的套接字,accept在这里不会阻塞,因为select确认了有套接字就绪了,就可以直接从内核中拿出套接字,新的文件描述符必须添加到_fd_array中,可以下次select监控。
void Accepter()
{
InetAddr client;
int sockfd = _listensock->Accept(&client);
if (sockfd >= 0)
{
// 寻找空闲位置存储新的文件描述符
int pos = 0;
for (; pos < size; pos++)
{
if (_fd_array[pos] == defaultfd)
break;
}
if (pos == size)
{
LOG(LogLevel::WARNING) << "select server full";
close(sockfd); // 服务器满载,关闭连接
}
else
{
_fd_array[pos] = sockfd; // 保存新的客户端文件描述符
}
}
}
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;using namespace SocketModule;
using namespace LogModule;class SelectServer
{const static int size=sizeof(fd_set)*8;const static int defaultfd=-1;public:SelectServer(int port):_listensock(std::make_unique<TcpSocket>()),_isrunning(false){_listensock->BuildTcpSocketMethod(port);for(int i=0;i<size;i++){_fd_array[i]=defaultfd;}_fd_array[0]=_listensock->Fd();}void Start(){_isrunning=true;while(_isrunning){fd_set rfds;FD_ZERO(&rfds);int maxfd=defaultfd;for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;FD_SET(_fd_array[i],&rfds);if(maxfd<_fd_array[i]){maxfd=_fd_array[i];}}PrintFd();int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);switch(n){case -1:LOG(LogLevel::ERROR)<<"select error";break;case 0:LOG(LogLevel::INFO)<<"time out";break;default:LOG(LogLevel::DEBUG)<<"有事件就绪"<<n;break;}}_isrunning=false;}void Dispatcher(fd_set& rfds){for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;if(FD_ISSET(_fd_array[i],&rfds)){if(_fd_array[i]==_listensock->Fd()){Accepter();}else{Recver(_fd_array[i],i);}}}}void Accepter(){InetAddr client;int sockfd=_listensock->Accept(&client);if(sockfd>=0){LOG(LogLevel::INFO)<<"get a new link sockfd"<<sockfd<<client.StringAddr();int pos=0;for(;pos<size;pos++){if(_fd_array[pos]==defaultfd)break;}if(pos==size){LOG(LogLevel::WARNING)<<"slect server full";close(sockfd);}else{_fd_array[pos]=sockfd;}}}void Recver(int fd,int pos){char buffer[1024];ssize_t n=recv(fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;std::cout<<"clinet say@"<<buffer<<std::endl;}else if(n==0){LOG(LogLevel::INFO)<<"client quit";_fd_array[pos]=defaultfd;close(fd);}else{LOG(LogLevel::ERROR)<<"recv error";_fd_array[pos]=defaultfd;close(fd);}}void PrintFd(){std::cout<<"fd_array[]";for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;}std::cout<<"\r\n";}void Stop(){_isrunning=false;}~SelectServer(){}private:std::unique_ptr<Socket>_listensock;bool _isrunning;int _fd_array[size];
};
poll介绍
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds:指向pollfd结构体数组的指针
nfds:要监控的文件描述符数量
timeout:超时时间设置
pollfd结构体
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 请求监控的事件 */
short revents; /* 返回的就绪事件 */
}
事件表
输入事件
-
POLLIN:普通数据或优先数据可读
-
POLLRDNORM:普通数据可读
-
POLLRDBAND:优先级带数据可读
-
POLLPRI:高优先级数据可读
输出事件
-
POLLOUT:普通数据可写
-
POLLWRNORM:普通数据可写
-
POLLWRBAND:优先级带数据可写
错误事件
-
POLLERR:发生错误
-
POLLHUP:文件描述符被挂起
-
POLLNVAL:文件描述符无效
补充
虽然代码是单线程的,但是可以满足多个客服端同时访问,因为poll同时检测多个socket同时就绪,会依次处理,因为是无阻塞的,每一个都是立即完成,让每一个客服端都及时响应。
poll与select区别
poll没有文件描述符数量的硬件限制,select使用位图管理文件描述符,Poll用结构体数组管理
poll特点
用户态到内核态需要拷贝,开销大,内核遍历所有的文件描述符,返回处理结果,将revents结果拷贝回用户空间。
poll代码实现
将所有的pollfd结构体的三个字段设置为初始值
监听套接字放在在数组首位,events设置为POLLIN监控读事件
PollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false)
{_listensock->BuildTcpSocketMethod(port);for(int i = 0; i < size; i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensock->Fd();_fds[0].events = POLLIN;
}
timeout为-1表示无限等待,直到有事件发生,_fds参数是直接传入整个数组,不需要像select每次重新设置监控集合。
int timeout = -1;
int n = poll(_fds, size, timeout);
用位检查运算revents&POLLIN来直到哪一位是就绪的,相同的数组用&是不变的,保证事件就绪,如何再判断fd是否为监听套接字,是就表明有新连接的到来,否则就是普通套接字。
void Dispatcher()
{for(int i = 0; i < size; i++){if(_fds[i].fd == defaultfd)continue;if(_fds[i].revents & POLLIN) // 位运算检测读事件{if(_fds[i].fd == _listensock->Fd())Accepter();elseRecver(i);}}
}
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/poll.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;class PollServer
{const static int size=4096;const static int defaultfd=-1;public:PollServer(int port):_listensock(std::make_unique<TcpSocket>()),_isrunning(false){_listensock->BuildTcpSocketMethod(port);for(int i=0;i<size;i++){_fds[i].fd=defaultfd;_fds[i].events=0;_fds[i].revents=0;}_fds[0].fd=_listensock->Fd();_fds[0].events=POLLIN;}void Start(){int timeout=-1;_isrunning=true;while(_isrunning){Printfd();int n=poll(_fds,size,timeout);switch(n){case -1:LOG(LogLevel::ERROR)<<"poll error";break;case 0:LOG(LogLevel::INFO)<<"poll time out...";break;default:LOG(LogLevel::DEBUG)<<"有事件就绪了"<<n;Dispatcher();break;}}_isrunning=false;}void Dispatcher(){for(int i=0;i<size;i++){if(_fds[i].fd==defaultfd)continue;if(_fds[i].revents&POLLIN){if(_fds[i].fd==_listensock->Fd()){Accepter();}else{Recver(i);}}}}void Accepter(){InetAddr client;int sockfd=_listensock->Accept(&client);if(sockfd>0){LOG(LogLevel::INFO)<<"get a new link,sockfd:"<<sockfd<<"client is"<<client.StringAddr();int pos=0;for(;pos<size;pos++){if(_fds[pos].fd==defaultfd){break;}}if(pos==size){LOG(LogLevel::WARNING)<<"poll server full";close(sockfd);}else{_fds[pos].fd=sockfd;_fds[pos].events=POLLIN;_fds[pos].revents=0;}}}void Recver(int pos){char buffer[1024];ssize_t n=recv(_fds[pos].fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;std::cout<<"client say@"<<buffer<<std::endl;}else if(n==0){LOG(LogLevel::INFO)<<"clinet quit...";close(_fds[pos].fd);_fds[pos].fd=defaultfd;_fds[pos].events=0;_fds[pos].revents=0;}}void Printfd(){std::cout<<"_fds[]";for(int i=0;i<size;i++){if(_fds[i].fd==defaultfd)continue;std::cout<<_fds[i].fd<<" ";}std::cout<<"\r\n";}void Stop(){_isrunning=false;}~PollServer(){}
private:std::unique_ptr<Socket> _listensock;bool _isrunning;struct pollfd _fds[size];
};