当前位置: 首页 > ops >正文

五种IO模型与阻塞IO

1.阻塞式

阻塞IO:在内核将数据准备就绪之前,系统调用会一直等待,套接字默认都是阻塞方式。

例子如钓鱼,张三哪一个鱼竿去钓鱼,会一直盯着水面看是否有鱼上钩,这个期间什么都不会做。

非阻塞式

非阻塞IO:内核没有将数据就绪好,系统调用仍然会直接返回,并返回错误码EWOULDBLOCK,表示当前操作在阻塞模式下执行,就会阻塞等待,因为不是阻塞就不会等待。

例子钓鱼,李四拿着鱼竿鱼钓鱼 ,把鱼漂扔进水里,就去干别的事情了,偶然注意鱼漂。

信号驱动式

信号驱动IO:内核将数据准备好,使用SIGIO信号通知上层进行IO操作。

例子钓鱼,王五拿着鱼竿去钓鱼,在鱼竿上吊着铃铛,铃铛响了就拉杆。

 

多路转接

IO多路转接:多路转接可以同时等待多个文件描述符的就绪状态,一旦有一个出现就绪状态就会马上处理。

例子钓鱼,赵六去钓鱼,拿了很多鱼竿并将鱼竿都放置好,然后来回走动观看那个鱼竿动了,就过去收杆。

异步IO

异步IO模式:内核把数据拷贝完成才通知上层,去处理拷贝完的数据,与信号驱动不同的是,信号驱动是数据就绪时通知,这个是完成拷贝通知。

高级IO概念

同步通信

同步就是发出一个调用时,没有得到结果之前是不返回的,但是一旦调用就得得到返回值,调用者主动等待调用的结果。

异步通信

调用发出后,这个调用就直接返回了,没有返回结果,一个异步调用发出后,调用者本身不会立刻得到结果,而是在调用发出后,被调用者通过状态,通知来告诉调用者,或通过回调函数来处理这个调用,就是调用者本身不参与钓鱼过程,只参与了拿到鱼步骤。

非阻塞函数

系统调用函数:fcntl

参数介绍

fd要操作的文件描述符,cmd要执行的操作命令,arg可选参数。

下面是fcntl函数五种功能,获取和设置文件状态标记,可以实现一个文件描述符变成非阻塞式。

cmd参数为F_GETFL时,fcntl将当前文件描述符的属性提取出来

cmd参数为F_SETFL时,fcntl用于设置文件描述符的状态标志。

  1. F_DUPFD:复制文件描述符

    • 用于创建一个新的文件描述符,它与原文件描述符 fd 指向同一个文件。

    • 新的文件描述符会是大于等于 arg 的最小可用值。

  2. F_GETFLF_SETFL:获取和设置文件状态标志

    • F_GETFL:获取文件的当前状态标志(如 O_RDONLYO_WRONLYO_RDWR 等)。

    • F_SETFL:设置文件的状态标志。常用于设置文件的非阻塞模式(O_NONBLOCK)。

  3. F_GETFDF_SETFD:获取和设置文件描述符标志

    • 用于控制文件描述符的行为,例如是否在 exec 时关闭文件描述符(FD_CLOEXEC)。

  4. F_GETLKF_SETLKF_SETLKW:记录锁操作

    • 用于在文件上设置或获取记录锁(文件锁定机制),用于进程间的同步。

 int fcntl(int fd, F_SETFL, int flags);

flags:新的文件状态标志,如O_APPEND写操作时可以追加内容不会覆盖,O_NONBLOCK表示的是非阻塞模式。

多路转接函数

#include <sys/select.h>

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

  1. nfds: 需要检查的文件描述符的最大值加1。即所有被监视的文件描述符中最大的那个加1。这样内核只需检查0到nfds-1的文件描述符。
  2. readfds: 指向fd_set结构的指针,用于监视文件描述符的读就绪状态。如果某个文件描述符上有数据可读(包括连接断开等),则会被标记。调用后,只有就绪的文件描述符会被保留在集合中。
  3. writefds: 指向fd_set结构的指针,用于监视文件描述符的写就绪状态。如果某个文件描述符可以写入数据(而不导致阻塞),则会被标记。
  4. exceptfds: 指向fd_set结构的指针,用于监视文件描述符的异常状态(如带外数据到达)。
  5. timeout: 指向timeval结构的指针,用于设置select的超时时间。如果设置为NULL,则select会一直阻塞直到有文件描述符就绪;如果timeval的两个字段都为0,则select会立即返回(轮询模式)。

readfds和writefds是一个集合,分别表示监察的可读写文件集合。初始调用时是输入输出型参数,输入型是要把检查的文件描述符传进去,输出型体现在执行完后会把就绪的文件描述符放到参数里,集合是以位图形式存储文件描述符状态,0表示为就绪,1表示就绪,然后第几位就表示第一个文件描述符就绪了。

select优点和缺点

可以被监控的文件描述符个数是有限的,通过sizeof(fd_set)看到,不同版本可能显示的不一样,

每次调用select需要手动设置fd集合,从接口方面说不方便。

同时每次调用select都要在内核遍历传递过来的fd,会有很大开销。

select支持的文件描述符数量太小。

每次调用select,都需要把fd集合从用户态拷贝到内核态,也有很大的开销。

select特点

可监控的文件描述符个数取决于sizeof(fd_set)的值,每一个bit表示一个文件描述符,则服务器最大支持文件描述符个数为512*8=4096,将fd加入select监控集时,还需要再使用一个数据结构保存放入的fd,一是select返回后,array作为源数据与fd_set进行FD_ISSET判断。第二是,select返回后会把以前加入的selectfd清空,每次开始都要重新从array中取地fd驱逐,扫描array时,可以得到fd最大值maxfd,作为select第一个参数。

select代码实现

size定义了文件描述符的最大容量是多少,fd_set是一个类型,是有上限的,不同环境不一样,一般是1024个,并且设置默认的数组值为-1。

const static int size = sizeof(fd_set) * 8;
const static int defaultfd = -1;

_listensock让智能指针管理监听套接字,unique_ptr模板保证了唯一指针指向套接字,数组保存要监控的文件描述符。 

 std::unique_ptr<Socket> _listensock;  // 监听套接字
bool _isrunning;                      // 服务器运行状态
int _fd_array[size];                  // 文件描述符管理数组

 创建TCP套接字,初始化所有数组的元素为-1,将监听套接字的文件描述符放在数组首位位置,保证了select的n不为0,会执行处理事件函数,里面还有判断是否为监听套接字,是就表示有新连接到来,更新数组。

SelectServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false)
{
    _listensock->BuildTcpSocketMethod(port);
    for (int i = 0; i < size; i++)
        _fd_array[i] = defaultfd;
    _fd_array[0] = _listensock->Fd();
}
 

 每次循环都会初始化fd_set结构,select函数会修改传入的fd_set。而且上一次监控的对象下一次可能不监控了,所以要清空,重新设置监控对象。

fd_set rfds;    // 定义fds集合
FD_ZERO(&rfds); // 清空fds

maxfd+1监控范围,必须是最大文件描述符值+1,rfds监控读事件的文件描述符集合,第一个nullptr不监控写事件,第二个不监控异常事件,最后一个无超时限制,一直阻塞到有事件发生。 

 int n = select(maxfd + 1, &rfds, nullptr, nullptr, nullptr);

使用FD_ISSET宏检查特定的文件描述符是否就在集合中,一种是监听套接字就绪,表示的是有客服端连接请求,普通套接字表示有数据可读。 当有新客服端发起连接请求时,监听套接字读事件就绪,可以调用accept从内核中拿到新的fd。

void Dispatcher(fd_set &rfds)
{
    for (int i = 0; i < size; i++)
    {
        if (_fd_array[i] == defaultfd)
            continue;
        if (FD_ISSET(_fd_array[i], &rfds))  // 检查fd是否就绪
        {
            if (_fd_array[i] == _listensock->Fd())
            {
                Accepter();  // 处理新连接
            }
            else
            {
                Recver(_fd_array[i], i);  // 处理数据读取
            }
        }
    }

acccept函数用于接收客服端请求,返回一个新的已连接的套接字,accept在这里不会阻塞,因为select确认了有套接字就绪了,就可以直接从内核中拿出套接字,新的文件描述符必须添加到_fd_array中,可以下次select监控。 

void Accepter()
{
    InetAddr client;
    int sockfd = _listensock->Accept(&client);
    if (sockfd >= 0)
    {
        // 寻找空闲位置存储新的文件描述符
        int pos = 0;
        for (; pos < size; pos++)
        {
            if (_fd_array[pos] == defaultfd)
                break;
        }
        if (pos == size)
        {
            LOG(LogLevel::WARNING) << "select server full";
            close(sockfd);  // 服务器满载,关闭连接
        }
        else
        {
            _fd_array[pos] = sockfd;  // 保存新的客户端文件描述符
        }
    }
}
 

#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;using namespace SocketModule;
using namespace LogModule;class SelectServer
{const static int size=sizeof(fd_set)*8;const static int defaultfd=-1;public:SelectServer(int port):_listensock(std::make_unique<TcpSocket>()),_isrunning(false){_listensock->BuildTcpSocketMethod(port);for(int i=0;i<size;i++){_fd_array[i]=defaultfd;}_fd_array[0]=_listensock->Fd();}void Start(){_isrunning=true;while(_isrunning){fd_set rfds;FD_ZERO(&rfds);int maxfd=defaultfd;for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;FD_SET(_fd_array[i],&rfds);if(maxfd<_fd_array[i]){maxfd=_fd_array[i];}}PrintFd();int n=select(maxfd+1,&rfds,nullptr,nullptr,nullptr);switch(n){case -1:LOG(LogLevel::ERROR)<<"select error";break;case 0:LOG(LogLevel::INFO)<<"time out";break;default:LOG(LogLevel::DEBUG)<<"有事件就绪"<<n;break;}}_isrunning=false;}void Dispatcher(fd_set& rfds){for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;if(FD_ISSET(_fd_array[i],&rfds)){if(_fd_array[i]==_listensock->Fd()){Accepter();}else{Recver(_fd_array[i],i);}}}}void Accepter(){InetAddr client;int sockfd=_listensock->Accept(&client);if(sockfd>=0){LOG(LogLevel::INFO)<<"get a new link sockfd"<<sockfd<<client.StringAddr();int pos=0;for(;pos<size;pos++){if(_fd_array[pos]==defaultfd)break;}if(pos==size){LOG(LogLevel::WARNING)<<"slect server full";close(sockfd);}else{_fd_array[pos]=sockfd;}}}void Recver(int fd,int pos){char buffer[1024];ssize_t n=recv(fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;std::cout<<"clinet say@"<<buffer<<std::endl;}else if(n==0){LOG(LogLevel::INFO)<<"client quit";_fd_array[pos]=defaultfd;close(fd);}else{LOG(LogLevel::ERROR)<<"recv error";_fd_array[pos]=defaultfd;close(fd);}}void PrintFd(){std::cout<<"fd_array[]";for(int i=0;i<size;i++){if(_fd_array[i]==defaultfd)continue;}std::cout<<"\r\n";}void Stop(){_isrunning=false;}~SelectServer(){}private:std::unique_ptr<Socket>_listensock;bool _isrunning;int _fd_array[size];
};

poll介绍

#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

fds:指向pollfd结构体数组的指针

nfds:要监控的文件描述符数量

timeout:超时时间设置

pollfd结构体

struct pollfd {
    int fd;        /* 文件描述符 */
    short events;  /* 请求监控的事件 */
    short revents; /* 返回的就绪事件 */
}

事件表

输入事件

  • POLLIN:普通数据或优先数据可读

  • POLLRDNORM:普通数据可读

  • POLLRDBAND:优先级带数据可读

  • POLLPRI:高优先级数据可读

输出事件

  • POLLOUT:普通数据可写

  • POLLWRNORM:普通数据可写

  • POLLWRBAND:优先级带数据可写

错误事件

  • POLLERR:发生错误

  • POLLHUP:文件描述符被挂起

  • POLLNVAL:文件描述符无效

补充

虽然代码是单线程的,但是可以满足多个客服端同时访问,因为poll同时检测多个socket同时就绪,会依次处理,因为是无阻塞的,每一个都是立即完成,让每一个客服端都及时响应。

poll与select区别

poll没有文件描述符数量的硬件限制,select使用位图管理文件描述符,Poll用结构体数组管理

poll特点

用户态到内核态需要拷贝,开销大,内核遍历所有的文件描述符,返回处理结果,将revents结果拷贝回用户空间。

poll代码实现

将所有的pollfd结构体的三个字段设置为初始值

监听套接字放在在数组首位,events设置为POLLIN监控读事件

PollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isrunning(false)
{_listensock->BuildTcpSocketMethod(port);for(int i = 0; i < size; i++){_fds[i].fd = defaultfd;_fds[i].events = 0;_fds[i].revents = 0;}_fds[0].fd = _listensock->Fd();_fds[0].events = POLLIN;
}

timeout为-1表示无限等待,直到有事件发生,_fds参数是直接传入整个数组,不需要像select每次重新设置监控集合。

int timeout = -1;
int n = poll(_fds, size, timeout);

用位检查运算revents&POLLIN来直到哪一位是就绪的,相同的数组用&是不变的,保证事件就绪,如何再判断fd是否为监听套接字,是就表明有新连接的到来,否则就是普通套接字。 

void Dispatcher()
{for(int i = 0; i < size; i++){if(_fds[i].fd == defaultfd)continue;if(_fds[i].revents & POLLIN)  // 位运算检测读事件{if(_fds[i].fd == _listensock->Fd())Accepter();elseRecver(i);}}
}

 

#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <sys/poll.h>
#include "Socket.hpp"
#include "Log.hpp"using namespace SocketModule;
using namespace LogModule;class PollServer
{const static int size=4096;const static int defaultfd=-1;public:PollServer(int port):_listensock(std::make_unique<TcpSocket>()),_isrunning(false){_listensock->BuildTcpSocketMethod(port);for(int i=0;i<size;i++){_fds[i].fd=defaultfd;_fds[i].events=0;_fds[i].revents=0;}_fds[0].fd=_listensock->Fd();_fds[0].events=POLLIN;}void Start(){int timeout=-1;_isrunning=true;while(_isrunning){Printfd();int n=poll(_fds,size,timeout);switch(n){case -1:LOG(LogLevel::ERROR)<<"poll error";break;case 0:LOG(LogLevel::INFO)<<"poll time out...";break;default:LOG(LogLevel::DEBUG)<<"有事件就绪了"<<n;Dispatcher();break;}}_isrunning=false;}void Dispatcher(){for(int i=0;i<size;i++){if(_fds[i].fd==defaultfd)continue;if(_fds[i].revents&POLLIN){if(_fds[i].fd==_listensock->Fd()){Accepter();}else{Recver(i);}}}}void Accepter(){InetAddr client;int sockfd=_listensock->Accept(&client);if(sockfd>0){LOG(LogLevel::INFO)<<"get a new link,sockfd:"<<sockfd<<"client is"<<client.StringAddr();int pos=0;for(;pos<size;pos++){if(_fds[pos].fd==defaultfd){break;}}if(pos==size){LOG(LogLevel::WARNING)<<"poll server full";close(sockfd);}else{_fds[pos].fd=sockfd;_fds[pos].events=POLLIN;_fds[pos].revents=0;}}}void Recver(int pos){char buffer[1024];ssize_t n=recv(_fds[pos].fd,buffer,sizeof(buffer)-1,0);if(n>0){buffer[n]=0;std::cout<<"client say@"<<buffer<<std::endl;}else if(n==0){LOG(LogLevel::INFO)<<"clinet quit...";close(_fds[pos].fd);_fds[pos].fd=defaultfd;_fds[pos].events=0;_fds[pos].revents=0;}}void Printfd(){std::cout<<"_fds[]";for(int i=0;i<size;i++){if(_fds[i].fd==defaultfd)continue;std::cout<<_fds[i].fd<<" ";}std::cout<<"\r\n";}void Stop(){_isrunning=false;}~PollServer(){}
private:std::unique_ptr<Socket> _listensock;bool _isrunning;struct pollfd _fds[size];
};

 

http://www.xdnf.cn/news/13465.html

相关文章:

  • LeetCode - 1047. 删除字符串中的所有相邻重复项
  • dockerfile 简单搭建 和 supervisor 进程管理工具
  • JAVASE:方法
  • 亚远景-ASPICE在汽车软件全生命周期管理中的作用
  • 7. 整数反转
  • 探索奇妙的LLM应用:提高工作效率的AI代理和RAG合集
  • Jemily张洁领域成就概述:匠心筑品牌,革新引航家用电梯新征程
  • 31.Python编程实战:自动化批量压缩与解压文件
  • GoldenDB简述
  • 【DVWA系列】——xss(DOM)——High详细教程
  • debian12 修改MariaDB数据库存储位置报错
  • 界面控件Kendo UI在实战应用——打通数据链路,重塑业务效率
  • UE5 蓝图按键控制物体旋转、暂停
  • Android NDK: Could not find application project directory
  • 【Mac技巧】修复Mac应用程序无法打开的解决办法
  • tryhackme 之反弹 shell 理解
  • FastAPI的数据契约:Pydantic与SQLModel联手打造健壮API
  • 斐讯N1部署Armbian与CasaOS实现远程存储管理
  • JS之Dom模型和Bom模型
  • strs[0] == “0“是否为字符串内容比较
  • 在GIS 工作流中实现数据处理(2)
  • 想考Kubernetes认证?CKA考试内容与报名全解析
  • 华测CGI-430配置
  • RAG文档解析难点3:Excel多层表头的智能解析与查询方法
  • Linux操作系统-性能优化
  • 电路图识图基础知识-行程开关自动往返运行控制电路详解(二十三)
  • SSL错误无法建立安全连接
  • BIRT交叉表维度自定义排序
  • Spring Cloud与Alibaba微服务架构全解析
  • vue封装移动端日历,可折叠展开,以及考勤