当前位置: 首页 > ops >正文

《Linux C编程实战》笔记:多路复用

select方式

#include <sys/select.h>int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
参数名含义
nfds所有监听的 最大文件描述符+1(注意:不是数组长度)
readfds你关心的 可读事件
writefds你关心的 可写事件
exceptfds你关心的 异常事件
timeout超时时间(为 nullptr 表示无限等待)

timeval在《Linux C编程实战》笔记:文件属性操作函数_linux获取文件属性c语言函数-CSDN博客中介绍过 :

struct timeval {long tv_sec;   // 秒long tv_usec;  // 微秒(1秒 = 1000000微秒)
};
  • 设置为 {0, 0}:立即返回,不等待(轮询)

  • 设置为 {5, 0}:等待 5 秒

  • 设置为 nullptr阻塞等待直到某 fd 有事件

系统为文件描述符集合提供了一系列的宏方便操作:

FD_ZERO(fd_set *set)//清空一个文件描述符集合(即把所有位清零,相当于初始化)。
FD_SET(int fd, fd_set *set)//将一个文件描述符 fd 添加到集合 set 中。对应位被置为 1。
FD_CLR(int fd, fd_set *set)//将一个文件描述符从集合中移除(对应位清 0)。
FD_ISSET(int fd, fd_set *set)//检查一个文件描述符是否在集合中,即对应位是否为 1。通常在 select() 返回之后使用,用来判断某个 fd 是否就绪。

这些宏配合使用的结构:fd_set

typedef struct {unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(unsigned long))];
} fd_set;
  • 它本质上是一个位图(bitset)结构,最多能表示 FD_SETSIZE 个文件描述符。

  • 通常 FD_SETSIZE == 1024,也就是说最多能监听 1024 个 fd。

select函数的功能相当于:当套接字上有事件发生时(如有数据到达),系统通知服务器进程告知哪个套接字上发生了什么事,服务器进程查询对应套接字并进行处理。若套接字上没有事件发生时,服务器进程不会去查询套接字的状态,不会浪费CPU时间

参数nfds是需要监视的文件描述符数,要监视的文件描述符值为0~nfds-1。参数readfds指定需
要监视的可读文件描述符集合,当这个集合中的一个描述符上有数据到达时,系统将通知调用
select函数的程序。参数writefds指定需要监视的可写文件描述符集合,当这个集合中的某个描
述符可以发送数据时,程序将收到通知。参数exceptfds指定需要监视的异常文件描述符集合,
当该集合中的一个描述符发生异常时,程序将收到通知。参数timeout指定了阻塞的时间,如果
在这段时间内监视的文件描述符上都没有事件发生,则函数selectO将返回0。

如果select设定的要监视的文件描述符集合中有描述符发生了事件,则select将返回发生事件的文件描述符个数。

如果监听的文件描述符集合中有的描述符无事件发生,则会把该描述符踢出集合中;所以最好事先拷贝一份原集合。

select 本质上是一种单线程的 I/O 多路复用机制select 是单线程,但能同时管理多个 I/O

它的核心优势不是并行处理,而是:

  • 通过一个线程同时监听多个文件描述符(fd);

  • 避免一个线程阻塞在某个 fd 上;

  • 一旦某些 fd “就绪”,再有选择地去 read()write()

select 带来的“并发性”是什么?

不是线程级别的并行,而是:

模式特点并发性能
每连接一线程简单易懂,线程多了开销大差(线程爆炸)
select单线程,多连接统一管理中等
epoll / kqueue事件驱动,可多线程+高性能高(现代首选)

select 的并发场景举例

比如你有一个服务器同时连接 500 个客户端:

  • 如果每个连接都分一个线程→ 系统崩溃;

  • 如果用 select,只需要一个线程就能管理所有 500 个连接;

  • 每次 select() 返回一批就绪的 fd,依次处理。

所以 select 适合什么?

场景是否适合用 select
少量连接,逻辑简单✅ 适合
中等连接(几十到上百)⚠️ 可用,但性能瓶颈明显
高并发(几千+连接)❌ 推荐用 epollio_uring

单线程阻塞回声服务器

下面展示一个基础的回声服务器模型;单线程,一段时间只能由一个客户端占有服务器进行回声服务。

服务器代码:

#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include <sys/socket.h>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock, clnt_sock;char message[BUF_SIZE];int str_len;sockaddr_in serv_adr, clnt_adr;socklen_t clnt_adr_sz = sizeof(clnt_adr);if (argc != 2) {std::cout << "Usage:" << argv[0] << "<port>" << std::endl;exit(EXIT_FAILURE);}serv_sock = socket(AF_INET, SOCK_STREAM, 0);//tcperror_handling(serv_sock,"socket");serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(atoi(argv[1]));//由参数指定监听的端口memset(serv_adr.sin_zero, 0, sizeof(serv_adr.sin_zero));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind");//socket 绑定error_handling(listen(serv_sock, 5), "listen");//监听for (int i = 0; i < 5; i++) {//顺序取出连接的客户端请求,处理完一个客户端后才能处理下一个(也即只能等客户端自行断开连接)clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_adr), &clnt_adr_sz);//调用accepterror_handling(clnt_sock, "accept");std::cout << "Connected client " << i + 1 << std::endl;memset(message, 0, BUF_SIZE);while ((str_len = recv(clnt_sock, message, BUF_SIZE, 0)) != 0) {//阻塞等待客户端发送数据send(clnt_sock, message, str_len,0);//再发回去}close(clnt_sock);}close(serv_sock);return 0;
}

客户端代码

#include <cstdio>
#include<cstdlib>
#include<iostream>
#include<cstring>
#include<unistd.h>
#include<sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define BUF_SIZE 1024
void error_handling(int res, const char* message) {if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc, char* argv[])
{int sock;char message[BUF_SIZE];int str_len;sockaddr_in serv_adr;if (argc != 3) {//指定服务器ip和端口printf("Usage : %s <IP> <port>\n", argv[0]);exit(EXIT_FAILURE);}sock = socket(PF_INET, SOCK_STREAM, 0);error_handling(sock, "socket");serv_adr.sin_family = AF_INET;inet_pton(AF_INET, argv[1], &serv_adr.sin_addr);//ipserv_adr.sin_port = htons(atoi(argv[2]));//端口error_handling(connect(sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)),"connect");std::cout << "Connected......" << std::endl;while (1) {memset(message, 0, BUF_SIZE);std::cout << "Input message(Q to quit):" << std::endl;std::cin >> message;if (strcmp(message, "Q") == 0) break;//退出send(sock, message, strlen(message) + 1,0);//发送memset(message, 0, BUF_SIZE);str_len = recv(sock, message, BUF_SIZE, 0);std::cout << "Message from server:" << message << std::endl;}close(sock);}

使用c++线程实现多连接

服务器代码

#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<thread>
#include<vector>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock;int i = 0;sockaddr_in serv_adr;if (argc != 2) {std::cout << "Usage:" << argv[0] << "<port>" << std::endl;exit(EXIT_FAILURE);}serv_sock = socket(AF_INET, SOCK_STREAM, 0);//tcperror_handling(serv_sock,"socket");serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(atoi(argv[1]));//由参数指定监听的端口memset(serv_adr.sin_zero, 0, sizeof(serv_adr.sin_zero));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind");//socket 绑定error_handling(listen(serv_sock, 5), "listen");//监听std::vector<std::thread> workers;for (int k = 0; k < 5; k++) {//最多可以有5个客户端同时进行连接(服务器只能服务5个,再多就不行了)//如果想无限服务的话,在线程里进行无限的while循环即可workers.emplace_back([serv_sock]() {sockaddr_in clnt_addr;socklen_t clnt_adr_sz = sizeof(clnt_addr);int clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_addr), &clnt_adr_sz);error_handling(clnt_sock, "accept");char message[BUF_SIZE];int str_len;memset(message, 0, BUF_SIZE);while ((str_len = recv(clnt_sock, message, BUF_SIZE, 0)) != 0) {//阻塞等待客户端发送数据send(clnt_sock, message, str_len, 0);//再发回去memset(message, 0, BUF_SIZE);}close(clnt_sock);std::cout << "Client disconnected." << std::endl;});}for (auto& t : workers) {if (t.joinable())t.join();}close(serv_sock);return 0;
}

accept() 是线程安全的系统调用

在 Linux 下,accept() 调用是线程安全的(可以被多个线程同时调用在同一个监听 socket 上),这是操作系统内核保证的。

当你有多个线程在同时调用 accept() 时,它们的行为就像这样:

  1. 监听 socket 内核缓冲区中存放了客户端的连接请求队列(backlog 队列)。

  2. 所有调用 accept() 的线程,阻塞等待这个队列中的连接

  3. 一旦有连接到达,内核:

    • 只唤醒一个线程

    • 将连接分配给它,其他线程继续阻塞或重新抢占。

  4. 所以不会两个线程“抢到”同一个连接,不会冲突。

使用select实现多路复用

#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include <sys/select.h>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock, clnt_sock;sockaddr_in serv_adr, clnt_adr;socklen_t clnt_adr_sz;if (argc != 2) {std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;return EXIT_FAILURE;}serv_sock = socket(AF_INET, SOCK_STREAM, 0);error_handling(serv_sock, "socket() error");memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(std::atoi(argv[1]));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind() error");error_handling(listen(serv_sock, 5), "listen() error");fd_set reads, cpy_reads;//需要有两个集合,一个用来备份FD_ZERO(&reads);FD_SET(serv_sock, &reads);//把服务端socket加入reads集合中,监听它是否接收到连接int fd_max = serv_sock;//现在的fd最大值是服务器socketchar buf[BUF_SIZE];std::cout << "Server listening on port " << argv[1] << "..." << std::endl;while (true) {cpy_reads = reads;//cpy_reads是reads的备份,每次while循环都要复制一遍,因为循环里会增删reads集合timeval timeout;timeout.tv_sec = 5;//阻塞时间为5秒timeout.tv_usec = 0;//fd_max + 1;需要加一;请看select参数讲解int ret = select(fd_max + 1, &cpy_reads, nullptr, nullptr, &timeout);//select调用会修改cpy_reads,所以不能直接传reads进去if (ret == -1) break;if (ret == 0) continue;//无事件发生for (int fd = 0; fd <= fd_max; fd++) {//遍历if (FD_ISSET(fd, &cpy_reads)) {//如果fd在集合cpy_reads中if (fd == serv_sock) {//服务端 socket 有活动 → 说明有新客户端连入;clnt_adr_sz = sizeof(clnt_adr);clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_adr), &clnt_adr_sz);error_handling(clnt_sock, "accept() error");FD_SET(clnt_sock, &reads);if (clnt_sock > fd_max) fd_max = clnt_sock;std::cout << "New client connected: fd=" << clnt_sock << std::endl;}else {//客户端 socket 有活动 → 有数据需要接收;int str_len = recv(fd, buf, BUF_SIZE, 0);if (str_len <= 0) {FD_CLR(fd, &reads);//该客户端发送完毕,不用再监听了close(fd);std::cout << "Client disconnected: fd=" << fd << std::endl;}else {send(fd, buf, str_len, 0);//回声}}}}}close(serv_sock);return 0;
}

http://www.xdnf.cn/news/10786.html

相关文章:

  • vue3:Table组件动态的字段(列)权限、显示隐藏和左侧固定
  • Oracle中的循环——FOR循环、WHILE循环和LOOP循环
  • 免费批量文件重命名软件
  • Spring AI Alibaba + Nacos 动态 MCP Server 代理方案
  • 重新审视自回归语言模型的知识蒸馏
  • 总览四级考试
  • 用AI(Deepseek)做了配色网站-功能介绍【欢迎体验】
  • 电子电路:全面深入了解晶振的定义、作用及应用
  • linux安全加固(非常详细)
  • Redis:常用数据结构 单线程模型
  • 多线程编程中的数据竞争与内存可见性问题解析
  • [Java 基础]变量,装东西的盒子
  • 基于QwenAgent解锁Qwen3无思考高效模式:vLLM部署实战与Ollama模板定制
  • 美尔斯通携手北京康复辅具技术中心开展公益活动,科技赋能助力银龄健康管理
  • RabbitMQ在SpringBoot中的应用
  • 六步完成软件验收:从计划到终验的全面指南(二)
  • smartGit 试用突破30天
  • HCIP(BGP基础)
  • 工厂方法模式深度解析:从原理到应用实战
  • 【灵动Mini-F5265-OB】vscode+gcc工程创建、下载、调试
  • Unity——QFramework框架 内置工具
  • 强制卸载openssl-libs导致系统异常的修复方法
  • 无人机智能识别交通目标,AI视觉赋能城市交通治理新高度
  • 【OCCT+ImGUI系列】012-Geom2d_AxisPlacement
  • EPSON差分晶振X1G005331000100,SG7050VEN晶振6G无线应用
  • JVM简介
  • 二叉树(二)
  • 深入理解前端DOM:现代Web开发的基石
  • Ansys Zemax | 手机镜头设计 - 第 4 部分:用 LS-DYNA 进行冲击性能分析
  • Android Native 内存泄漏检测全解析:从原理到工具的深度实践