当前位置: 首页 > news >正文

基于RocketMQ源码理解顺序写、刷盘机制与零拷贝

顺序写加速文件写入磁盘

通常应用程序往磁盘写文件时,由于磁盘空间不是连续的,会有很多碎片。所以我们去写一个文件时,也就无法把一个文件写在一块连续的磁盘空间中,而需要在磁盘多个扇区之间进行大量的随机写。这个过程中有大量的寻址操作,会严重影响写数据的性能。而顺序写机制是在磁盘中提前申请一块连续的磁盘空间,每次写数据时,记录下次开始地址,就可以避免这些寻址操作,直接在之前写入的地址后面接着写就行。

在RocketMQ实现的DefaultMappedFile中就是用顺序写,并且在CommitLog.asyncPutMessage使用自适应自旋锁(AdaptiveBackOffSpinLockImpl)putMessageLock.lock();

public class DefaultMappedFile extends AbstractMappedFile {...public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,PutMessageContext putMessageContext) {assert messageExt != null;assert cb != null;int currentPos = WROTE_POSITION_UPDATER.get(this);//获取文件写入地址到哪了if (currentPos < this.fileSize) {ByteBuffer byteBuffer = appendMessageBuffer().slice();byteBuffer.position(currentPos);AppendMessageResult result;if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) messageExt).isInnerBatch()) {// traditional batch messageresult = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBatch) messageExt, putMessageContext);} else if (messageExt instanceof MessageExtBrokerInner) {// traditional single message or newly introduced inner-batch messageresult = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,(MessageExtBrokerInner) messageExt, putMessageContext);} else {return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());//维护下次文件写入地址this.storeTimestamp = result.getStoreTimestamp();return result;}log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);}
}

刷盘机制保证消息不丢失

在操作系统层面,为了提升性能,写文件的操作通常不会立即写入磁盘,而是先写入内存缓冲区(Page Cache),由操作系统或应用程序在合适的时机统一写入磁盘。这样虽然快,但有个问题:如果系统突然宕机,缓冲区中的数据会丢失。所以,我们就需要“刷盘”,确保数据已经真正写入磁盘,而不是还停留在内存里。
在这里插入图片描述
RocketMQ中,commitLog的异步刷盘服务是FlushRealTimeService,GroupCommitService是同步刷盘(间隔短的异步刷盘并且采用双缓冲(间隔10ms一刷))

        public DefaultFlushManager() {if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {this.flushCommitLogService = new CommitLog.GroupCommitService();} else {this.flushCommitLogService = new CommitLog.FlushRealTimeService();}this.commitRealTimeService = new CommitLog.CommitRealTimeService();}

FlushRealTimeService刷盘关键代码

    class FlushRealTimeService extends FlushCommitLogService {@Overridepublic void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();//获取刷盘间隔500int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();int flushPhysicQueueThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();boolean printFlushProgress = false;// Print flush progresslong currentTimeMillis = System.currentTimeMillis();if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;flushPhysicQueueLeastPages = 0;printFlushProgress = (printTimes++ % 10) == 0;}try {if (flushCommitLogTimed) {Thread.sleep(interval);//休眠刷盘间隔} else {this.waitForRunning(interval);//等待刷盘间隔}if (printFlushProgress) {this.printFlushProgress();}long begin = System.currentTimeMillis();CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);//参数控制刷盘的最小页数,避免频繁小数据量刷盘long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();if (storeTimestamp > 0) {CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);}long past = System.currentTimeMillis() - begin;CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int) past);if (past > 500) {log.info("Flush data to disk costs {} ms", past);}} catch (Throwable e) {CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);this.printFlushProgress();}}// Normal shutdown, to ensure that all the flush before exitboolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.flush(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}this.printFlushProgress();CommitLog.log.info(this.getServiceName() + " service end");}}

RocketMQ对于何时进行刷盘,也设计了两种刷盘机制,同步刷盘和异步刷盘。只需要在broker.conf中进行配置就行。
在这里插入图片描述

零拷贝加速文件读写

零拷贝(Zero-Copy) 是一种优化技术,旨在减少数据在用户态(User Space)和内核态(Kernel Space)之间的拷贝次数,从而提高 I/O 性能,尤其在处理大数据量(如网络传输、文件传输)时效果显著。

所谓的零拷贝技术,其实并不是不拷贝,而是要尽量减少CPU拷贝

  1. CPU拷贝和DMA拷贝
    操作系统对于内存空间,是分为用户态和内核态的。用户态的应用程序无法直接操作硬件,需要通过内核空间进行操作转换,才能真正操作硬件。这些IO接口都是由CPU独立负责,所以当发生大规模的数据读写操作时,CPU的占用率会非常高。
    在这里插入图片描述

操作系统为了避免CPU完全被各种IO调用给占用,引入了DMA(直接存储器存储)。由DMA来负责这些频繁的IO操作。DMA是一套独立的指令集,不会占用CPU的计算资源。这样,CPU就不需要参与具体的数据复制的工作,只需要管理DMA的权限即可。

DMA在拷贝数据的过程cpu就可以继续工作了

在这里插入图片描述
引入DMA拷贝之后,在读写请求的过程中,CPU不再需要参与具体的工作,DMA可以独立完成数据在系统内部的复制。但是,数据复制过程中,依然需要借助数据总进线。当系统内的IO操作过多时,还是会占用过多的数据总线,造成总线冲突,最终还是会影响数据读写性能。在早期大型计算机中,为了减轻 CPU I/O 负担,引入了专门的 I/O Channel(硬件处理器)来执行 I/O 操作。现代编程语言如 Java 中的 Channel 继承了这一思想,提供了更高效的 I/O 抽象,但它本质是一个软件接口,不具备硬件通道的能力。​

  1. mmap文件映射机制
    mmap 是一种操作系统提供的内存映射机制,允许进程将一个文件或设备的内容直接映射到进程的虚拟内存空间。
    主要是通过java.nio.channels.FileChannel的map方法完成映射。以一次文件的读写操作为例,应用程序对磁盘文件的读与写,都需要经过内核态与用户态之间的状态切换,每次状态切换的过程中,就需要有大量的数据复制。
    在这里插入图片描述
    在这个过程中,总共需要进行四次数据拷贝。而磁盘与内核态之间的数据拷贝,在操作系统层面已经由CPU拷贝优化成了DMA拷贝。而内核态与用户态之间的拷贝依然是CPU拷贝。所以,在这个场景下,零拷贝技术优化的重点,就是内核态与用户态之间的这两次拷贝。

而mmap文件映射的方式,就是在用户态不再保存文件的内容,进程可以像访问普通内存一样,通过指针访问文件内容,只保存文件的映射,包括文件的内存起始地址,文件大小等。真实的数据也不需要在用户态留存,可以直接通过操作映射,在内核态完成数据复制。
在这里插入图片描述
在JDK的NIO包中,java.nio.HeapByteBuffer映射的就是JVM的一块堆内内存,在HeapByteBuffer中,会由一个byte数组来缓存数据内容,所有的读写操作也是先操作这个byte数组。这其实就是没有使用零拷贝的普通文件读写机制。

 HeapByteBuffer(int cap, int lim) {            // package-privatesuper(-1, 0, lim, cap, new byte[cap], 0);/*hb = new byte[cap];offset = 0;*/}

而NIO把包中的另一个实现类java.nio.DirectByteBuffer则映射的是一块堆外内存。在DirectByteBuffer中,并没有一个数据结构来保存数据内容,只保存了一个内存地址。所有对数据的读写操作,都通过unsafe魔法类直接交由内核完成,这其实就是mmap的读写机制。可以看到没有直接获取文件的数据

// DirectByteBuffer构造函数核心逻辑
DirectByteBuffer(int cap) {// 判断虚拟机是否要求直接内存地址页对齐(Page Aligned)boolean pa = VM.isDirectMemoryPageAligned();// 获取系统页面大小(通常4KB)int ps = Bits.pageSize();// 计算申请内存大小:如果需要页对齐,则多申请一个页面大小的空间以方便对齐// Math.max确保至少分配1字节,避免0字节申请异常long size = Math.max(1L, (long)cap + (pa ? ps : 0));// 预先申请直接内存配额,防止超出允许的直接内存限制Bits.reserveMemory(size, cap);long base = 0;try {// 通过Unsafe调用本机接口,分配size字节的堆外内存,返回内存起始地址base = UNSAFE.allocateMemory(size);} catch (OutOfMemoryError x) {// 如果分配失败,撤销之前的内存配额申请,防止配额泄漏Bits.unreserveMemory(size, cap);// 抛出内存不足错误throw x;}// 将申请的内存全部置零,避免使用到未初始化的脏数据UNSAFE.setMemory(base, size, (byte) 0);// 计算页对齐后的起始地址:// (base & (ps - 1)) 获取内存地址的页内偏移,// ps - 页内偏移即对齐到下一个页边界的偏移量,// base + 偏移值即对齐后的内存地址address = (base + ps - (base & (ps - 1)));// 创建Cleaner对象,绑定一个Deallocator回调任务,用于在DirectByteBuffer对象回收时释放本机内存cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
}
  1. sendFile机制
    sendFile 是一种高效的文件传输机制,允许将文件直接从文件系统传输到网络套接字,而不需要经过用户空间缓冲区的拷贝。
    ​ sendFile机制的具体实现参见配套示例代码。主要是通过java.nio.channels.FileChannel的transferTo方法完成。

​ 早期的sendfile实现机制其实还是依靠CPU进行页缓存与socket缓存区之间的数据拷贝。
在这里插入图片描述
在后期的不断改进过程中,sendfile优化了实现机制,在拷贝过程中,并不直接拷贝文件的内容,而是只拷贝一个带有文件位置和长度等信息的文件描述符FD,这样就大大减少了需要传递的数据。而真实的数据内容,会交由DMA控制器,从页缓存中打包异步发送到socket中。
在这里插入图片描述
2.6.33版本以前的Linux内核中,out_fd只能是一个socket。但是现在版本已经没有了这个限制。

since linux 2.6.33 it can be any file. if it is a regular file, then sendfile() changes the file offset appropriately

在这里插入图片描述

http://www.xdnf.cn/news/986311.html

相关文章:

  • 海康对接摄像头
  • Chromium 136 编译指南 Windows篇:获取源代码(五)
  • 基于贝叶斯学习方法的块稀疏信号压缩感知算法
  • Spring核心框架完全指南 - 基础知识全解析
  • 关于界面存在AB测试后UI刷新空白的问题
  • 计算机网络 : 传输层协议UDP与TCP
  • 设计原则——KISS原则
  • 过拟合和欠拟合
  • RAG技术全解析:从概念到实践,构建高效语义检索系统——嵌入模型与向量数据库搭建指南
  • java每日精进 6.11【消息队列】
  • C++11的特性上
  • Cursor 编程实践 — 开发环境部署
  • 案例8 模型量化
  • 使用MyBatis-Plus实现数据权限功能
  • 【Unity3D优化】优化多语言字体包大小
  • swagger通过配置将enum自动添加到字段说明中
  • PHP如何检查一个字符串是否是email格式
  • 【微信小程序】| 在线咖啡点餐平台设计与实现
  • 华为云Flexus+DeepSeek征文 | 基于华为云ModelArts Studio打造AingDesk AI聊天助手
  • list类型
  • SCADA|测试KingSCADA4.0信创版采集汇川PLC AC810数据
  • 开源夜莺支持MySQL数据源,更方便做业务指标监控了
  • xss分析
  • C2f模块 vs Darknet-53——YOLOv8检测效率的提升
  • 9.IP数据包分片计算
  • HNCTF2025 - Misc、Osint、Crypto WriteUp
  • 第三讲 基础运算之整数运算
  • 什么是数字化项目风险管理?如何实现项目风险管理数字化?
  • IIS 实现 HTTPS:OpenSSL证书生成与配置完整指南
  • 突然虚拟机磁盘只剩下几十K