当前位置: 首页 > news >正文

​多线程 + io_uring 实现高效大文件写入

多线程 + io_uring 实现高效大文件写入(64MB数据块,2GB文件分割)​​

多线程 + io_uring 实现高效大文件写入 - LinuxGuideLinuxGuide

以下是完整的代码实现,使用 ​​两个线程​​:

  1. ​生产者线程​​:生成 ​​64MB 数据块​​,放入队列。
  2. ​消费者线程​​:从队列取出数据,通过 io_uring​异步写入文件​​,并在文件超过 ​​2GB 时自动切分​​。


​1. 完整代码​ 参考:多线程 + io_uring 实现高效大文件写入 - LinuxGuideLinuxGuide​

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>
#include <sys/stat.h>
#include <stdatomic.h>#define BLOCK_SIZE (64 * 1024 * 1024)  // 64MB 数据块
#define MAX_FILE_SIZE (2ULL * 1024 * 1024 * 1024)  // 2GB 文件分割阈值
#define QUEUE_SIZE 8  // 队列容量(防止内存爆炸)// 数据块结构
typedef struct {char *data;size_t size;
} DataBlock;// 线程安全队列
typedef struct {DataBlock blocks[QUEUE_SIZE];atomic_int head, tail;pthread_mutex_t mutex;pthread_cond_t not_empty, not_full;
} BlockQueue;// 全局队列
BlockQueue block_queue;
atomic_int file_counter = 0;  // 文件计数器(用于切分)
atomic_ullong current_file_size = 0;  // 当前文件大小// 初始化队列
void init_queue(BlockQueue *q) {q->head = q->tail = 0;pthread_mutex_init(&q->mutex, NULL);pthread_cond_init(&q->not_empty, NULL);pthread_cond_init(&q->not_full, NULL);
}// 生产者:生成随机数据并放入队列
void *producer_thread(void *arg) {while (1) {DataBlock block;block.data = malloc(BLOCK_SIZE);if (!block.data) {perror("malloc");exit(EXIT_FAILURE);}block.size = BLOCK_SIZE;// 填充随机数据for (size_t i = 0; i < BLOCK_SIZE; i++) {block.data[i] = rand() % 256;}// 放入队列pthread_mutex_lock(&block_queue.mutex);while ((block_queue.tail + 1) % QUEUE_SIZE == block_queue.head) {pthread_cond_wait(&block_queue.not_full, &block_queue.mutex);}block_queue.blocks[block_queue.tail] = block;block_queue.tail = (block_queue.tail + 1) % QUEUE_SIZE;pthread_cond_signal(&block_queue.not_empty);pthread_mutex_unlock(&block_queue.mutex);}return NULL;
}// 消费者:从队列取出数据,用 io_uring 写入文件
void *consumer_thread(void *arg) {struct io_uring ring;int fd = -1;char filename[256];// 初始化 io_uringif (io_uring_queue_init(8, &ring, 0) < 0) {perror("io_uring_queue_init");exit(EXIT_FAILURE);}while (1) {DataBlock block;// 从队列取出数据pthread_mutex_lock(&block_queue.mutex);while (block_queue.head == block_queue.tail) {pthread_cond_wait(&block_queue.not_empty, &block_queue.mutex);}block = block_queue.blocks[block_queue.head];block_queue.head = (block_queue.head + 1) % QUEUE_SIZE;pthread_cond_signal(&block_queue.not_full);pthread_mutex_unlock(&block_queue.mutex);// 检查是否需要切分文件if (fd == -1 || current_file_size + block.size > MAX_FILE_SIZE) {if (fd != -1) close(fd);snprintf(filename, sizeof(filename), "large_file_%d.bin", file_counter++);fd = open(filename, O_WRONLY | O_CREAT | O_TRUNC, 0644);if (fd < 0) {perror("open");exit(EXIT_FAILURE);}current_file_size = 0;printf("Created new file: %s\n", filename);}// 提交异步写入请求struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);io_uring_prep_write(sqe, fd, block.data, block.size, current_file_size);io_uring_sqe_set_data(sqe, block.data);  // 关联数据块(用于释放)io_uring_submit(&ring);// 等待写入完成struct io_uring_cqe *cqe;int ret = io_uring_wait_cqe(&ring, &cqe);if (ret < 0) {perror("io_uring_wait_cqe");exit(EXIT_FAILURE);}if (cqe->res < 0) {fprintf(stderr, "Write error: %s\n", strerror(-cqe->res));exit(EXIT_FAILURE);}// 更新文件大小并释放内存current_file_size += cqe->res;io_uring_cqe_seen(&ring, cqe);free(block.data);}io_uring_queue_exit(&ring);return NULL;
}int main() {pthread_t producer, consumer;// 初始化队列init_queue(&block_queue);// 启动生产者线程if (pthread_create(&producer, NULL, producer_thread, NULL) != 0) {perror("pthread_create");exit(EXIT_FAILURE);}// 启动消费者线程if (pthread_create(&consumer, NULL, consumer_thread, NULL) != 0) {perror("pthread_create");exit(EXIT_FAILURE);}// 等待线程结束(实际上不会结束)pthread_join(producer, NULL);pthread_join(consumer, NULL);return 0;
}


​2. 关键设计​

​(1)线程分工​

线程任务
​生产者线程​生成 ​​64MB 随机数据​​,放入队列
​消费者线程​从队列取出数据,用 io_uring​异步写入文件​

​(2)线程安全队列​

  • ​环形缓冲区​​(BlockQueue)避免频繁 malloc/free
  • ​互斥锁(pthread_mutex_t)​​ + ​​条件变量(pthread_cond_t)​​ 保证线程安全:
    • not_empty:队列非空时唤醒消费者。
    • not_full:队列未满时唤醒生产者。

​(3)文件切分逻辑​

  • current_file_size​ 记录当前文件大小。
  • ​超过 2GB 时​​:
    • 关闭当前文件。
    • 创建新文件(large_file_0.bin, large_file_1.bin, ...)。

​(4)io_uring 优化​

  • ​批量化提交​​:可调整 QUEUE_SIZE 提高并发。
  • ​内存对齐​​:建议 posix_memalign 分配内存(减少内核拷贝)。
  • ​轮询模式​​:可启用 IORING_SETUP_SQPOLL 减少系统调用(需root)。


​3. 编译与运行​

gcc -o io_uring_multi_thread io_uring_multi_thread.c -luring -lpthread
./io_uring_multi_thread

​输出示例​​:

Created new file: large_file_0.bin
Created new file: large_file_1.bin
...


​4. 验证结果​

ls -lh large_file_*.bin  # 检查文件大小
md5sum large_file_*.bin  # 验证数据完整性


​5. 性能优化建议​

优化点说明
​内存池​预分配多个 64MB 块,减少 malloc/free 开销
​批量提交​一次提交多个 io_uring 请求(提高吞吐)
​O_DIRECT​直接 I/O 绕过页缓存(需内存对齐)
​多消费者线程​多个消费者线程并行处理队列


​6. 总结​

  • ​生产者-消费者模型​​:解耦数据生成和写入,提高并行度。
  • io_uring 异步 I/O​​:最大化磁盘写入性能。
  • ​自动文件切分​​:避免单个文件过大(2GB 限制)。

适用于 ​​日志系统、数据库、大数据存储​​ 等场景。 🚀

多线程 + io_uring 实现高效大文件写入 - LinuxGuideLinuxGuide

http://www.xdnf.cn/news/1172899.html

相关文章:

  • MCP:UVX的安装
  • JS逆向基础( AES 解密密文WordArray和Uint8Array实战②)
  • 在线深凹槽深检测方法都有哪些 —— 激光频率梳 3D 轮廓检测
  • 【Word Press基础】创建一个动态的自定义区块
  • 探索大语言模型(LLM):提升 RAG 性能的全方位优化策略
  • 主要分布在背侧海马体(dHPC)CA1区域(dCA1)的时间细胞对NLP中的深层语义分析的积极影响和启示
  • OpenCV(01)基本图像操作、绘制,读取视频
  • 【趣味解读】淘宝登录的前后端交互机制:Cookie-Session 如何保障你的账户安全?
  • 网络基础DAY18-动态路由协议基础
  • Python笔记之跨文件实例化、跨文件调用、导入库
  • 基于 XGBoost 与 SHAP 的医疗自动化办公与可视化系统(下)
  • 用Phi-3 Mini微调实现英文到尤达语翻译
  • SpringCloud sentinel服务熔断 服务降级
  • 希尔排序cc
  • 电子电气架构 --- 汽车软件全生命周期
  • Cesium绘制圆锥
  • 「源力觉醒 创作者计划」深度讲解大模型之在百花齐放的大模型时代看百度文心大模型4.5的能力与未来
  • 深度图像滤波
  • Java 时间处理 API 全解析:从 JDK7 到 JDK8 的演进
  • Linux基本命令
  • Python实战:基于Streamlit的股票筛选系统,实时K线图+数据缓存优化
  • 应急响应基础
  • 通用图片 OCR 到 Word API 数据接口
  • 增强LLM最后隐藏层的意义与效果
  • 代码随想录算法训练营第五十二天|图论part3
  • 分享鸢尾花数据集:iris.csv,以及简单数据分析与分类预测示例(决策树)
  • 动态IP+AI反侦测:新一代爬虫如何绕过生物行为验证?
  • PyTorch中nn.Module详解和综合代码示例
  • 【前端】ikun-pptx编辑器前瞻问题三: pptx的图片如何提取,并在前端渲染。
  • 7月23日华为机考真题第二题-200分