《Linux C编程实战》笔记:多路复用
select方式
#include <sys/select.h>int select(int nfds,fd_set *readfds,fd_set *writefds,fd_set *exceptfds,struct timeval *timeout);
参数名 | 含义 |
---|---|
nfds | 所有监听的 最大文件描述符+1(注意:不是数组长度) |
readfds | 你关心的 可读事件 |
writefds | 你关心的 可写事件 |
exceptfds | 你关心的 异常事件 |
timeout | 超时时间(为 nullptr 表示无限等待) |
timeval在《Linux C编程实战》笔记:文件属性操作函数_linux获取文件属性c语言函数-CSDN博客中介绍过 :
struct timeval {long tv_sec; // 秒long tv_usec; // 微秒(1秒 = 1000000微秒)
};
-
设置为
{0, 0}
:立即返回,不等待(轮询) -
设置为
{5, 0}
:等待 5 秒 -
设置为
nullptr
:阻塞等待直到某 fd 有事件
系统为文件描述符集合提供了一系列的宏方便操作:
FD_ZERO(fd_set *set)//清空一个文件描述符集合(即把所有位清零,相当于初始化)。
FD_SET(int fd, fd_set *set)//将一个文件描述符 fd 添加到集合 set 中。对应位被置为 1。
FD_CLR(int fd, fd_set *set)//将一个文件描述符从集合中移除(对应位清 0)。
FD_ISSET(int fd, fd_set *set)//检查一个文件描述符是否在集合中,即对应位是否为 1。通常在 select() 返回之后使用,用来判断某个 fd 是否就绪。
这些宏配合使用的结构:fd_set
typedef struct {unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(unsigned long))];
} fd_set;
-
它本质上是一个位图(bitset)结构,最多能表示
FD_SETSIZE
个文件描述符。 -
通常
FD_SETSIZE == 1024
,也就是说最多能监听 1024 个 fd。
select函数的功能相当于:当套接字上有事件发生时(如有数据到达),系统通知服务器进程告知哪个套接字上发生了什么事,服务器进程查询对应套接字并进行处理。若套接字上没有事件发生时,服务器进程不会去查询套接字的状态,不会浪费CPU时间
参数nfds
是需要监视的文件描述符数,要监视的文件描述符值为0~nfds
-1。参数readfds指定需
要监视的可读文件描述符集合,当这个集合中的一个描述符上有数据到达时,系统将通知调用
select函数的程序。参数writefds指定需要监视的可写文件描述符集合,当这个集合中的某个描
述符可以发送数据时,程序将收到通知。参数exceptfds指定需要监视的异常文件描述符集合,
当该集合中的一个描述符发生异常时,程序将收到通知。参数timeout指定了阻塞的时间,如果
在这段时间内监视的文件描述符上都没有事件发生,则函数selectO将返回0。
如果select设定的要监视的文件描述符集合中有描述符发生了事件,则select将返回发生事件的文件描述符个数。
如果监听的文件描述符集合中有的描述符无事件发生,则会把该描述符踢出集合中;所以最好事先拷贝一份原集合。
select
本质上是一种单线程的 I/O 多路复用机制。select
是单线程,但能同时管理多个 I/O
它的核心优势不是并行处理,而是:
-
通过一个线程同时监听多个文件描述符(fd);
-
避免一个线程阻塞在某个 fd 上;
-
一旦某些 fd “就绪”,再有选择地去
read()
或write()
。
select 带来的“并发性”是什么?
不是线程级别的并行,而是:
模式 | 特点 | 并发性能 |
---|---|---|
每连接一线程 | 简单易懂,线程多了开销大 | 差(线程爆炸) |
select | 单线程,多连接统一管理 | 中等 |
epoll / kqueue | 事件驱动,可多线程+高性能 | 高(现代首选) |
select 的并发场景举例
比如你有一个服务器同时连接 500 个客户端:
-
如果每个连接都分一个线程→ 系统崩溃;
-
如果用
select
,只需要一个线程就能管理所有 500 个连接; -
每次
select()
返回一批就绪的 fd,依次处理。
所以 select 适合什么?
场景 | 是否适合用 select |
---|---|
少量连接,逻辑简单 | ✅ 适合 |
中等连接(几十到上百) | ⚠️ 可用,但性能瓶颈明显 |
高并发(几千+连接) | ❌ 推荐用 epoll 或 io_uring |
单线程阻塞回声服务器
下面展示一个基础的回声服务器模型;单线程,一段时间只能由一个客户端占有服务器进行回声服务。
服务器代码:
#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include <sys/socket.h>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock, clnt_sock;char message[BUF_SIZE];int str_len;sockaddr_in serv_adr, clnt_adr;socklen_t clnt_adr_sz = sizeof(clnt_adr);if (argc != 2) {std::cout << "Usage:" << argv[0] << "<port>" << std::endl;exit(EXIT_FAILURE);}serv_sock = socket(AF_INET, SOCK_STREAM, 0);//tcperror_handling(serv_sock,"socket");serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(atoi(argv[1]));//由参数指定监听的端口memset(serv_adr.sin_zero, 0, sizeof(serv_adr.sin_zero));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind");//socket 绑定error_handling(listen(serv_sock, 5), "listen");//监听for (int i = 0; i < 5; i++) {//顺序取出连接的客户端请求,处理完一个客户端后才能处理下一个(也即只能等客户端自行断开连接)clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_adr), &clnt_adr_sz);//调用accepterror_handling(clnt_sock, "accept");std::cout << "Connected client " << i + 1 << std::endl;memset(message, 0, BUF_SIZE);while ((str_len = recv(clnt_sock, message, BUF_SIZE, 0)) != 0) {//阻塞等待客户端发送数据send(clnt_sock, message, str_len,0);//再发回去}close(clnt_sock);}close(serv_sock);return 0;
}
客户端代码
#include <cstdio>
#include<cstdlib>
#include<iostream>
#include<cstring>
#include<unistd.h>
#include<sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#define BUF_SIZE 1024
void error_handling(int res, const char* message) {if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc, char* argv[])
{int sock;char message[BUF_SIZE];int str_len;sockaddr_in serv_adr;if (argc != 3) {//指定服务器ip和端口printf("Usage : %s <IP> <port>\n", argv[0]);exit(EXIT_FAILURE);}sock = socket(PF_INET, SOCK_STREAM, 0);error_handling(sock, "socket");serv_adr.sin_family = AF_INET;inet_pton(AF_INET, argv[1], &serv_adr.sin_addr);//ipserv_adr.sin_port = htons(atoi(argv[2]));//端口error_handling(connect(sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)),"connect");std::cout << "Connected......" << std::endl;while (1) {memset(message, 0, BUF_SIZE);std::cout << "Input message(Q to quit):" << std::endl;std::cin >> message;if (strcmp(message, "Q") == 0) break;//退出send(sock, message, strlen(message) + 1,0);//发送memset(message, 0, BUF_SIZE);str_len = recv(sock, message, BUF_SIZE, 0);std::cout << "Message from server:" << message << std::endl;}close(sock);}
使用c++线程实现多连接
服务器代码
#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include<thread>
#include<vector>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock;int i = 0;sockaddr_in serv_adr;if (argc != 2) {std::cout << "Usage:" << argv[0] << "<port>" << std::endl;exit(EXIT_FAILURE);}serv_sock = socket(AF_INET, SOCK_STREAM, 0);//tcperror_handling(serv_sock,"socket");serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(atoi(argv[1]));//由参数指定监听的端口memset(serv_adr.sin_zero, 0, sizeof(serv_adr.sin_zero));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind");//socket 绑定error_handling(listen(serv_sock, 5), "listen");//监听std::vector<std::thread> workers;for (int k = 0; k < 5; k++) {//最多可以有5个客户端同时进行连接(服务器只能服务5个,再多就不行了)//如果想无限服务的话,在线程里进行无限的while循环即可workers.emplace_back([serv_sock]() {sockaddr_in clnt_addr;socklen_t clnt_adr_sz = sizeof(clnt_addr);int clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_addr), &clnt_adr_sz);error_handling(clnt_sock, "accept");char message[BUF_SIZE];int str_len;memset(message, 0, BUF_SIZE);while ((str_len = recv(clnt_sock, message, BUF_SIZE, 0)) != 0) {//阻塞等待客户端发送数据send(clnt_sock, message, str_len, 0);//再发回去memset(message, 0, BUF_SIZE);}close(clnt_sock);std::cout << "Client disconnected." << std::endl;});}for (auto& t : workers) {if (t.joinable())t.join();}close(serv_sock);return 0;
}
accept()
是线程安全的系统调用
在 Linux 下,accept()
调用是线程安全的(可以被多个线程同时调用在同一个监听 socket 上),这是操作系统内核保证的。
当你有多个线程在同时调用 accept()
时,它们的行为就像这样:
-
监听 socket 内核缓冲区中存放了客户端的连接请求队列(
backlog
队列)。 -
所有调用
accept()
的线程,阻塞等待这个队列中的连接。 -
一旦有连接到达,内核:
-
只唤醒一个线程;
-
将连接分配给它,其他线程继续阻塞或重新抢占。
-
-
所以不会两个线程“抢到”同一个连接,不会冲突。
使用select实现多路复用
#include <cstdio>
#include<iostream>
#include <cstdlib>
#include<cstring>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/socket.h>
#include <sys/select.h>
#define BUF_SIZE 1024
void error_handling(int res,const char* message) {//错误处理函数if (res == -1) {std::cerr << message << std::endl;exit(EXIT_FAILURE);}
}
int main(int argc,char *argv[])//程序执行时指定端口
{int serv_sock, clnt_sock;sockaddr_in serv_adr, clnt_adr;socklen_t clnt_adr_sz;if (argc != 2) {std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;return EXIT_FAILURE;}serv_sock = socket(AF_INET, SOCK_STREAM, 0);error_handling(serv_sock, "socket() error");memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_family = AF_INET;serv_adr.sin_addr.s_addr = INADDR_ANY;serv_adr.sin_port = htons(std::atoi(argv[1]));error_handling(bind(serv_sock, reinterpret_cast<sockaddr*>(&serv_adr), sizeof(serv_adr)), "bind() error");error_handling(listen(serv_sock, 5), "listen() error");fd_set reads, cpy_reads;//需要有两个集合,一个用来备份FD_ZERO(&reads);FD_SET(serv_sock, &reads);//把服务端socket加入reads集合中,监听它是否接收到连接int fd_max = serv_sock;//现在的fd最大值是服务器socketchar buf[BUF_SIZE];std::cout << "Server listening on port " << argv[1] << "..." << std::endl;while (true) {cpy_reads = reads;//cpy_reads是reads的备份,每次while循环都要复制一遍,因为循环里会增删reads集合timeval timeout;timeout.tv_sec = 5;//阻塞时间为5秒timeout.tv_usec = 0;//fd_max + 1;需要加一;请看select参数讲解int ret = select(fd_max + 1, &cpy_reads, nullptr, nullptr, &timeout);//select调用会修改cpy_reads,所以不能直接传reads进去if (ret == -1) break;if (ret == 0) continue;//无事件发生for (int fd = 0; fd <= fd_max; fd++) {//遍历if (FD_ISSET(fd, &cpy_reads)) {//如果fd在集合cpy_reads中if (fd == serv_sock) {//服务端 socket 有活动 → 说明有新客户端连入;clnt_adr_sz = sizeof(clnt_adr);clnt_sock = accept(serv_sock, reinterpret_cast<sockaddr*>(&clnt_adr), &clnt_adr_sz);error_handling(clnt_sock, "accept() error");FD_SET(clnt_sock, &reads);if (clnt_sock > fd_max) fd_max = clnt_sock;std::cout << "New client connected: fd=" << clnt_sock << std::endl;}else {//客户端 socket 有活动 → 有数据需要接收;int str_len = recv(fd, buf, BUF_SIZE, 0);if (str_len <= 0) {FD_CLR(fd, &reads);//该客户端发送完毕,不用再监听了close(fd);std::cout << "Client disconnected: fd=" << fd << std::endl;}else {send(fd, buf, str_len, 0);//回声}}}}}close(serv_sock);return 0;
}