当前位置: 首页 > java >正文

记录一个典型的epoll socket

最近项目中用到epoll做server端,用的是本地套接字(UDS),将代码记录一下,用法一看就懂,就是调用run_server。改写ClientInfo就可以适配自己的项目。后面有一个更加整洁的封装,作为一个代码封装的案例保留。

#include <iostream>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <memory>
#include <vector>
#include <cerrno>
#include <csignal>
#include "spdlog/spdlog.h"using namespace std;extern std::shared_ptr<spdlog::logger> g_logger;static const int MAX_EVENTS = 10;    // epoll_wait 一次最多返回的事件数
static const int BUFFER_SIZE = 1024; // 读写缓冲区大小
static const int BACKLOG = 10;       // listen 队列长度// 客户端连接信息结构体
struct ClientInfo
{int fd;std::string buffer; // 用于累积接收到的数据std::string appname; // 应用名称ClientInfo(int sockfd) : fd(sockfd) {}~ClientInfo(){close(fd);g_logger->warn("[{}] Exist!", appname);}void write_log(){while (buffer.size() > 4){// cout << "buffer size: " << buffer.size() << endl;int len;std::memcpy(&len, buffer.data(), sizeof(int));if (buffer.size() < len + 4){break;}std::string log_str = buffer.substr(4, len);buffer = buffer.substr(len + 4);char type = toupper(log_str[0]);string log_content = log_str.substr(1);if (appname.empty()) {size_t pos = log_content.find("[");size_t pos1 = log_content.find("]");if (pos != string::npos && pos1 != string::npos) {appname = log_content.substr(pos + 1, pos1 - pos - 1);}}switch (type){case 'I':g_logger->info(log_content);break;case 'D':g_logger->debug(log_content);break;case 'W':g_logger->warn(log_content);break;case 'E':g_logger->error(log_content);break;case 'C':g_logger->critical(log_content);break;default:g_logger->info(log_content);break;}}}
};// 将文件描述符设置为非阻塞
/// @brief Add O_NONBLOCK to the status flags of a file descriptor.
/// @param fd Descriptor to modify.
/// @return 0 on success, -1 if either fcntl call fails (the reason is printed with perror).
static int set_sock_non_blocking(int fd)
{
    const int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1)
    {
        perror("fcntl F_GETFL");
        return -1;
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
    {
        perror("fcntl F_SETFL");
        return -1;
    }
    return 0;
}

/// @brief Register a descriptor with an epoll instance for readable events.
/// @param epfd epoll instance.
/// @param sock Descriptor to watch.
/// @return The result of epoll_ctl: 0 on success, -1 on failure with errno set.
static int add_sock_to_epoll(int epfd, int sock)
{
    struct epoll_event ev = {}; // Zero the whole struct, not only the union member used below
    ev.data.fd = sock;          // Descriptor to watch
    // EPOLLET selects edge-triggered mode (not level-triggered): a reader has to
    // drain the descriptor until EAGAIN, because the event is not repeated.
    ev.events = EPOLLIN | EPOLLET;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); // Register the epoll event
}

/// @brief Remove a descriptor from an epoll instance.
/// @param epfd epoll instance.
/// @param sock Descriptor to stop watching.
/// @return The result of epoll_ctl: 0 on success, -1 on failure with errno set.
static int del_sock_from_epoll(int epfd, int sock)
{
    return epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr); // Remove the epoll event
}

// C++11-compatible make_unique implementation
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args &&...args)
{return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}void run_server(const char *sock_path)
{try{int listenfd = -1, epoll_fd = -1;std::unordered_map<int, std::unique_ptr<ClientInfo>> clients; // 存储所有客户端连接// 1. 创建 UDS socketlistenfd = socket(AF_UNIX, SOCK_STREAM, 0);if (listenfd == -1){throw std::runtime_error("socket() failed: " + std::string(strerror(errno)));}std::cout << "Server socket created (fd: " << listenfd << ")" << std::endl;// 2. 设置服务器 socket 为非阻塞if (set_sock_non_blocking(listenfd) == -1){throw std::runtime_error("set_sock_non_blocking() failed for server socket");}int one = 1;if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0){throw std::runtime_error("setsockopt SO_REUSEADDR error");}// 3. 绑定 socket 到路径struct sockaddr_un addr;memset(&addr, 0, sizeof(addr));addr.sun_family = AF_UNIX;strncpy(addr.sun_path, sock_path, sizeof(addr.sun_path) - 1);// 如果 socket 文件已存在,先删除 (优雅重启)unlink(sock_path);if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) == -1){throw std::runtime_error("bind() failed: " + std::string(strerror(errno)));}std::cout << "Socket bound to " << sock_path << std::endl;// 4. 监听连接if (listen(listenfd, BACKLOG) == -1){throw std::runtime_error("listen() failed: " + std::string(strerror(errno)));}std::cout << "Server listening for connections..." << std::endl;// 5. 创建 epoll 实例epoll_fd = epoll_create1(EPOLL_CLOEXEC);if (epoll_fd == -1){throw std::runtime_error("epoll_create1() failed: " + std::string(strerror(errno)));}std::cout << "Epoll instance created (fd: " << epoll_fd << ")" << std::endl;// 6. 
将服务器 socket 添加到 epoll 监听可读事件 (新连接)struct epoll_event events[MAX_EVENTS];if (add_sock_to_epoll(epoll_fd, listenfd) == -1){throw std::runtime_error("epoll_ctl(EPOLL_CTL_ADD) failed for server socket: " + std::string(strerror(errno)));}printf("======waiting for client's request======\n");while (1){int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < nfds; i++){if (events[i].data.fd == listenfd){int client_fd;if ((client_fd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1){printf("accept socket error: %s(errno: %d)", strerror(errno), errno);continue;}int flags = fcntl(client_fd, F_GETFL, 0);fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);if (add_sock_to_epoll(epoll_fd, client_fd) == -1){throw std::runtime_error("epoll_ctl(EPOLL_CTL_ADD) failed for client socket: " + std::string(strerror(errno)));}// 创建客户端信息并加入管理auto client_info = make_unique<ClientInfo>(client_fd);clients[client_fd] = std::move(client_info);}else{int sockfd = events[i].data.fd;auto &client = clients[sockfd];// cout << "client " << sockfd << " is ready to read" << endl;char buff[BUFFER_SIZE];int nbytes, all_bytes = 0;string content;int len = 0;while ((nbytes = recv(sockfd, buff, sizeof buff, 0)) > 0){all_bytes += nbytes;client->buffer.append(buff, nbytes);// cout << "received " << nbytes << " bytes" << endl;}bool be_closed = false;if (nbytes < 0){// 最后一次读取会返回EAGAINif (errno == EINTR || errno == EWOULDBLOCK || errno == EAGAIN){// 正常情况,继续接收}else{printf("recv msg error: %s(errno: %d)\n", strerror(errno), errno);del_sock_from_epoll(epoll_fd, sockfd);be_closed = true;}}if (all_bytes == 0){del_sock_from_epoll(epoll_fd, sockfd);be_closed = true;}client->write_log();if (be_closed) {clients.erase(sockfd);}}}}}catch (const std::exception &e){std::cerr << "Exception: " << e.what() << std::endl;}catch (...){std::cerr << "Unknown exception occurred!" << std::endl;}
}

更好的封装方式是把 run_server 放进动态库里,项目中直接调用即可。
如下所示:
1 uds_server.h

#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <memory>
#include <vector>
#include <cerrno>
#include <csignal>
#include "socket_client_base.h"using namespace std;__attribute__((visibility("default"))) void run_server(const char *sock_path, std::function<std::unique_ptr<SocketClientBase>(int)> create_client);

2 uds_server.cpp

#include <iostream>
#include <string>
#include <cstring>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <unordered_map>
#include <memory>
#include <vector>
#include <cerrno>
#include <csignal>
#include "uds_server.h"using namespace std;static const int MAX_EVENTS = 10;    // epoll_wait 一次最多返回的事件数
static const int BACKLOG = 10;       // listen 队列长度// 将文件描述符设置为非阻塞
static int set_sock_non_blocking(int fd)
{int flags = fcntl(fd, F_GETFL, 0);if (flags == -1){perror("fcntl F_GETFL");return -1;}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1){perror("fcntl F_SETFL");return -1;}return 0;
}static int add_sock_to_epoll(int epfd, int sock)
{struct epoll_event ev;ev.data.fd = sock;                                // 要监视的文件描述符,可以是任何打开的在/proc/pid/fd/目录下的fdev.events = EPOLLIN | EPOLLET;                    // 监听读状态同时设置LT模式return epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev); // 注册epoll事件
}static int del_sock_from_epoll(int epfd, int sock)
{return epoll_ctl(epfd, EPOLL_CTL_DEL, sock, NULL); // 移除epoll事件
}void run_server(const char *sock_path, std::function<std::unique_ptr<SocketClientBase>(int)> create_client)
{try{int listenfd = -1, epoll_fd = -1;// 1. 创建 UDS socketlistenfd = socket(AF_UNIX, SOCK_STREAM, 0);if (listenfd == -1){throw std::runtime_error("socket() failed: " + std::string(strerror(errno)));}std::cout << "Server socket created (fd: " << listenfd << ")" << std::endl;// 2. 设置服务器 socket 为非阻塞if (set_sock_non_blocking(listenfd) == -1){throw std::runtime_error("set_sock_non_blocking() failed for server socket");}int one = 1;if (setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one)) < 0){throw std::runtime_error("setsockopt SO_REUSEADDR error");}// 3. 绑定 socket 到路径struct sockaddr_un addr;memset(&addr, 0, sizeof(addr));addr.sun_family = AF_UNIX;strncpy(addr.sun_path, sock_path, sizeof(addr.sun_path) - 1);// 如果 socket 文件已存在,先删除 (优雅重启)unlink(sock_path);if (bind(listenfd, (struct sockaddr *)&addr, sizeof(addr)) == -1){throw std::runtime_error("bind() failed: " + std::string(strerror(errno)));}std::cout << "Socket bound to " << sock_path << std::endl;// 4. 监听连接if (listen(listenfd, BACKLOG) == -1){throw std::runtime_error("listen() failed: " + std::string(strerror(errno)));}std::cout << "Server listening for connections..." << std::endl;// 5. 创建 epoll 实例epoll_fd = epoll_create1(EPOLL_CLOEXEC);if (epoll_fd == -1){throw std::runtime_error("epoll_create1() failed: " + std::string(strerror(errno)));}std::cout << "Epoll instance created (fd: " << epoll_fd << ")" << std::endl;// 6. 
将服务器 socket 添加到 epoll 监听可读事件 (新连接)struct epoll_event events[MAX_EVENTS];if (add_sock_to_epoll(epoll_fd, listenfd) == -1){throw std::runtime_error("epoll_ctl(EPOLL_CTL_ADD) failed for server socket: " + std::string(strerror(errno)));}std::unordered_map<int, std::unique_ptr<SocketClientBase>> clients; // 存储所有客户端连接printf("======waiting for client's request======\n");while (1){int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);for (int i = 0; i < nfds; i++){if (events[i].data.fd == listenfd){int new_client_fd;if ((new_client_fd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1){printf("accept socket error: %s(errno: %d)", strerror(errno), errno);continue;}int flags = fcntl(new_client_fd, F_GETFL, 0);fcntl(new_client_fd, F_SETFL, flags | O_NONBLOCK);if (add_sock_to_epoll(epoll_fd, new_client_fd) == -1){throw std::runtime_error("epoll_ctl(EPOLL_CTL_ADD) failed for client socket: " + std::string(strerror(errno)));}// 创建用户客户端对象auto client = create_client(new_client_fd);clients[new_client_fd] = std::move(client);}else{int one_client_fd = events[i].data.fd;auto it = clients.find(one_client_fd);if (it == clients.end()) continue;auto& client = it->second;// cout << "client " << one_client_fd << " is ready to read" << endl;if (!client->read_all()){// 连接关闭或出错epoll_ctl(epoll_fd, EPOLL_CTL_DEL, one_client_fd, nullptr);client->on_close();clients.erase(it);}else{client->on_data(); // 处理数据}}}}}catch (const std::exception &e){std::cerr << "Exception: " << e.what() << std::endl;}catch (...){std::cerr << "Unknown exception occurred!" << std::endl;}
}

3 socket_client_base.h

#pragma once

#include <string>
#include <memory>
#include <utility> // std::forward

// C++11-compatible make_unique implementation
template <typename T, typename... Args>
std::unique_ptr<T> make_unique(Args &&...args)
{
    return std::unique_ptr<T>(new T(std::forward<Args>(args)...));
}

/// Base class for one client connection of the UDS server. It holds the socket
/// descriptor and the receive buffer; subclasses implement on_data() to consume
/// what read_all() has appended to `buffer`.
class __attribute__((visibility("default"))) SocketClientBase
{
protected:
    int fd;             // Socket descriptor of this connection
    std::string buffer; // Bytes received so far and not yet consumed

public:
    explicit SocketClientBase(int sockfd);
    virtual ~SocketClientBase();

    // Copying is disabled
    SocketClientBase(const SocketClientBase &) = delete;
    SocketClientBase &operator=(const SocketClientBase &) = delete;

    // Hook functions (may be overridden)
    virtual void on_connect() {}
    virtual void on_data() = 0; // Must be implemented by subclasses
    virtual void on_close() {}

    // Utility: non-blocking read of all data currently available
    bool read_all();

    // Returns the file descriptor
    int get_fd() const { return fd; }
};

4 socket_client_base.cpp

#include <sys/socket.h>
#include <unistd.h>
#include <cerrno>
#include "socket_client_base.h"SocketClientBase::SocketClientBase(int sockfd) : fd(sockfd) {on_connect();
}SocketClientBase::~SocketClientBase()
{if (fd != -1) {close(fd);fd = -1;}
}bool SocketClientBase::read_all()
{char tmp_buf[1024];ssize_t n;bool closed = false;while ((n = recv(fd, tmp_buf, sizeof(tmp_buf), 0)) > 0) {buffer.append(tmp_buf, static_cast<size_t>(n));}if (n == 0) {closed = true;} else if (n < 0) {if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {closed = true;}}return !closed;
}

将上述代码编译成动态库的Makefile,根据需求自己修改一下:

# Builds the shared library libappwatching.so.
# Select the target with `make dst=arm` (default, cross-compiles for aarch64) or
# any other value of dst for a native g++ build.
.PHONY : all clean

ROOTDIR = ../..
INCLUDE = -I../$(ROOTDIR)/lib/inc -I$(ROOTDIR)/public
LDLIBS = -lspdlog
runlibs = -Wl,-rpath=/web/lib -Wl,-rpath=.
CFLAGS = -g -fPIC -DSPDLOG_COMPILED_LIB

dst = arm
ifeq ($(dst),arm)
CXX = aarch64-linux-gnu-g++ --std=c++11
LIBDIR = ../$(ROOTDIR)/lib/so.aarch64
else
CXX = g++ --std=c++11
LIBDIR = ../$(ROOTDIR)/lib/so.x86_64
endif
LDLIBS += -L$(LIBDIR)

# The library goes into the directory of the selected architecture, the same
# directory that is searched with -L above.
bin = $(LIBDIR)/libappwatching.so

# NOTE(review): no rules for $(ROOTDIR)/bin/test and $(ROOTDIR)/bin/test_send_log
# are defined in this Makefile; provide them or drop the two prerequisites.
all : $(bin) $(ROOTDIR)/bin/test $(ROOTDIR)/bin/test_send_log

src = $(wildcard *.cpp ../../public/json.cpp)
obj = $(patsubst %.cpp, %.o, $(src))

$(bin) : $(obj)
	$(CXX) -shared -Wl,-Bsymbolic -Wl,--exclude-libs,ALL $(runlibs) $^ -o $@ $(LDLIBS)

$(obj): %.o : %.cpp
	$(CXX) $(CFLAGS) -c -fvisibility=hidden $(INCLUDE) $< -o $@

clean :
	rm -f $(obj) $(bin) $(ROOTDIR)/bin/test $(ROOTDIR)/bin/test_send_log

调用示例:
log_client.h 继承了基类:

// log_client.h
#pragma once
#include "../appwatchinglib/socket_client_base.h"
#include <memory>
#include "spdlog/spdlog.h"class LogClient : public SocketClientBase
{std::string appname;std::shared_ptr<spdlog::logger> logger;public:LogClient(int fd, std::shared_ptr<spdlog::logger> l);void on_data() override;void on_close() override;
};

log_client.cpp内容:

// log_client.cpp
#include "log_client.h"
#include <cstring>
#include <cctype>LogClient::LogClient(int fd, std::shared_ptr<spdlog::logger> l): SocketClientBase(fd), logger(std::move(l))
{
}void LogClient::on_data()
{while (buffer.size() > 4){int len = 0;std::memcpy(&len, buffer.data(), sizeof(int));if (buffer.size() < len + 4){break; // 数据不完整,等待下一次}std::string log_str = buffer.substr(4, len);buffer.erase(0, len + 4);if (log_str.empty())continue;char level_char = std::toupper(static_cast<unsigned char>(log_str[0]));std::string content = log_str.substr(1);// 提取 appnameif (appname.empty()){auto pos = content.find("[");auto pos1 = content.find("]");if (pos != std::string::npos && pos1 != std::string::npos){appname = content.substr(pos + 1, pos1 - pos - 1);}}// 去掉content最右边的1个换行符if (content.size() && content[content.size() - 1] == '\n'){content.pop_back();}// 分发日志switch (level_char){case 'I':logger->info(content);break;case 'D':logger->debug(content);break;case 'W':logger->warn(content);break;case 'E':logger->error(content);break;case 'C':logger->critical(content);break;default:logger->info(content);break;}}
}void LogClient::on_close()
{if (!appname.empty()){logger->warn("[{}] Connection closed.", appname);}else{logger->warn("Client (fd={}) disconnected.", get_fd());}
}

main.cpp

#include "spdlog/spdlog.h"
#include "common.h"
#include "log_client.h"
#include "../appwatchinglib/app.h"
#include "../appwatchinglib/uds_server.h"

#include <thread> // std::thread, used in main()

#define SOCK_PATH "/tmp/unix_udp_applogging.sock"
#define APP_NAME "applogging"

int g_fd = -1;                            // Application lock, keeps the application a single instance
std::shared_ptr<spdlog::logger> g_logger; // spdlog logger pointer

// Socket listener thread
void uds_thread()
{// 客户端工厂函数auto factory = [](int fd) {return make_unique<LogClient>(fd, g_logger);};run_server(SOCK_PATH, factory);
}void flush_thread()
{while (true){sleep(5);g_logger->flush();}
}// ====================== init ======================
void cleanup()
{string pid_path = app::get_pid_filename(APP_NAME);app::delete_pid_file(pid_path.c_str());app::unlock_app_running_file(APP_NAME, g_fd);
}void handle_sigint(int sig)
{exit(EXIT_SUCCESS);
}void sign_signals()
{struct sigaction sa;sa.sa_handler = handle_sigint;sigemptyset(&sa.sa_mask);sa.sa_flags = 0;if (sigaction(SIGINT, &sa, NULL) == -1){g_logger->error("sigaction failed: {}", strerror(errno));exit(-1);}if (sigaction(SIGTERM, &sa, NULL) == -1){g_logger->error("sigaction failed: {}", strerror(errno));exit(-1);}
}void init_main(int argc, char **argv, const char *app_name)
{// 切换至应用程序所在目录app::change_dir_to_app_path(argv[0]);// 检查应用是否已经运行g_fd = app::lock_app_running_file(app_name);// 初始化日志g_logger = app::init_log_output(app_name);if (argc == 1 || argc >= 2 && strcmp(argv[1], "-d") == 0){app::init_daemon(); // 守护进程}// 程序退出时清理pid文件if (atexit(cleanup) != 0){g_logger->error("Failed to register exit function.");exit(-1);}// 需要处理多个信号sign_signals();// 创建pid文件,记录主进程的pidstring pid_path = app::get_pid_filename(app_name);app::write_pid_to_file(pid_path.c_str());
}
// ====================== init ======================int main(int argc, char *argv[])
{const char *app_name = APP_NAME;init_main(argc, argv, app_name);string pid_path = app::get_pid_filename(app_name);std::thread uds_sock(uds_thread);uds_sock.detach();std::thread flush_log(flush_thread);flush_log.detach();while (1){app::flush_pid_file_mtime(pid_path.c_str()); // 更新pid文件修改日期,表示应用正常运行sleep(10);}return 0;
}

Makefile

# Builds the applogging executable and links it against libappwatching.
# Select the target with `make dst=arm` (default, cross-compiles for aarch64) or
# any other value of dst for a native g++ build.
.PHONY : clean

ROOTDIR = ../..
INCLUDE = -I../$(ROOTDIR)/lib/inc -I$(ROOTDIR)/public
LDLIBS = -lspdlog -lappwatching -lpthread
CFLAGS = -g -DSPDLOG_COMPILED_LIB
LDFLAGS = -Wl,-rpath=/web/lib

dst = arm
ifeq ($(dst),arm)
CXX = aarch64-linux-gnu-g++ --std=c++11
LDFLAGS += -L../$(ROOTDIR)/lib/so.aarch64
else
CXX = g++ --std=c++11
LDFLAGS += -L../$(ROOTDIR)/lib/so.x86_64
endif

bin = $(ROOTDIR)/bin/applogging

src = $(wildcard *.cpp ../../public/common.cpp ../../public/json.cpp)
obj = $(patsubst %.cpp, %.o, $(src))

$(bin) : $(obj)
	$(CXX) $(LDFLAGS) $^ -o $@ $(LDLIBS)

$(obj): %.o : %.cpp
	$(CXX) $(CFLAGS) -c $(INCLUDE) $< -o $@

clean :
	rm -f $(obj) $(bin)
http://www.xdnf.cn/news/19363.html

相关文章:

  • 深度解析Fluss LockUtils类的并发艺术
  • Linux学习----归档和传输文件实用指南
  • Xshell自动化脚本大赛
  • LightGBM(Light Gradient Boosting Machine,轻量级梯度提升机)梳理总结
  • 互联网大厂AI面试:从大模型原理到场景应用的深度解析
  • 【shell】Shell脚本中的if判断条件和文件测试操作符
  • shell编程基础入门-1
  • Spring : 事务管理
  • 深度学习函数
  • 洛谷 P1395 会议 -普及/提高-
  • 一款基于selenium的前端验证码绕过爆破工具
  • java怎么实现根据指标预警的功能
  • C++多态介绍
  • 【Leetcode】17、电话号码的字母组合
  • 哪些人需要考道路运输安全员证?政策要求与适用范围
  • C++day2作业
  • 突破视界的边界:16公里远距离无人机图传模块全面解析
  • 毕业项目推荐:47-基于yolov8/yolov5/yolo11的焊缝质量检测识别系统(Python+卷积神经网络)
  • pip 镜像源配置(清华/阿里/豆瓣)详解
  • 智瞰风评 - 基于大语言模型的个人征信报告风险分析师
  • k8s--efk日志收集
  • 用简单仿真链路产生 WiFi CSI(不依赖专用工具箱,matlab实现)
  • Java数组入门教程:零基础掌握数组定义与遍历+新手避坑指南
  • Python3 lambda(匿名函数)
  • 轻量xlsx读取库xlsx_drone的编译与测试
  • 元素滚动scrollIntoView
  • A5M2(数据库管理工具)下载安装
  • 谈物质的运动与运动的物质
  • 智能消防栓闷盖终端:让城市消防管理更智慧高效
  • Robolectric拿到当前的Activity