【Netty4核心原理⑭】【Netty 内存分配 ByteBuf❷】
文章目录
- 一、前言
- 二、非池化内存分配
- 1. 堆内内存分配
- 1.1 构造函数
- 1.2 IO 读写
- 2. 堆外内存分配
- 2.1 构造函数
- 2.2 IO 读写
- 三、池化内存分配
- 1. threadCache 和 cache
- 1.1 PoolThreadLocalCache
- 1.2 PoolThreadCache
- 1.2.1 PoolThreadCache 构造
- 1.2.2 PoolThreadCache 的绑定
- 1.2.3 PoolThreadCache 的内存分配
- 2. DirectArena 内存分配流程
- 2.1 newByteBuf(maxCapacity)
- 2.2 allocate(cache, buf, reqCapacity)
- 3. 内存池的内存规格
- 4. 命中缓存的分配
- 4.1 PoolThreadCache#createSubPageCaches
- 4.2 PoolThreadCache#createNormalCaches
- 4.3 缓存数据的结构
- 5. PoolChunkList 结构
- 四、参考内容
一、前言
本系列虽说本意是作为 《Netty4 核心原理》一书的读书笔记,但在实际阅读记录过程中加入了大量个人阅读的理解和内容,因此对书中内容存在大量删改。
本篇涉及内容 :第十一章 Netty 内存分配 ByteBuf
本系列内容基于 Netty 4.1.73.Final 版本,如下:
<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.73.Final</version></dependency>
系列文章目录:
【Netty4核心原理】【全系列文章目录】
在 【Netty4核心原理⑬】【Netty 内存分配 ByteBuf❶】 中,我们介绍了ByteBuf 和 ByteBufAllocator 的基本内容,并且得知了 Netty 的内存分配依赖于 ByteBufAllocator ,而 ByteBufAllocator 有两个关键实现类:UnpooledByteBufAllocator (负责非池化的内存分配)和 PooledByteBufAllocator(池化内存分配)
本篇来具体看看这两个类是如何处理内存分配的。(本文相较于书中删减了大量源码阅读过程,如有需要可自行阅读书中内容)
二、非池化内存分配
Netty 中非池化内存分配是由 UnpooledByteBufAllocator 类来完成,而 UnpooledByteBufAllocator 的内存分配场景还分为堆内内存分配和堆外内存分配,因此我们分别来看这两种场景。
非池化内存分配的思想是:每次创建 ByteBuf 时直接向操作系统申请内存,释放时直接归还给操作系统,因此实现逻辑会比池化内存更简单一些。
1. 堆内内存分配
在 UnpooledByteBufAllocator 中堆内内存分配是交由 UnpooledByteBufAllocator#newHeapBuffer 方法的,其代码如下:
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 判断当前JVM是否支持Unsafe API// Unsafe是JDK内部的底层API,允许直接操作内存地址以提升性能return PlatformDependent.hasUnsafe() ?// 若支持Unsafe,使用基于Unsafe优化的堆缓冲区实现// Instrumented:会收集内存分配统计信息用于监控// Unpooled:表示非池化,每次创建都是新分配内存// Unsafe:使用Unsafe API优化内存操作new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :// 若不支持Unsafe,使用基于标准Java API的堆缓冲区实现// 仅依赖byte[]数组操作,兼容性更好但性能略低new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
可以看到这里是先判断了 JVM 是否支持 Unsafe ,并根据结果选择创建不同的实现类(Unsafe 的判断并不是本篇重点,所以这里不展开说明),
- 如果支持 Unsafe ,则选择 InstrumentedUnpooledUnsafeHeapByteBuf
- 如果不支持 Unsafe ,则选择 InstrumentedUnpooledUnsafeHeapByteBuf
InstrumentedUnpooledUnsafeHeapByteBuf 和 InstrumentedUnpooledUnsafeHeapByteBuf 在大逻辑上并无多少区别,因此我们下面以 InstrumentedUnpooledUnsafeHeapByteBuf 为例来说明:
1.1 构造函数
InstrumentedUnpooledUnsafeHeapByteBuf 的构造函数如下:
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {// 调用父类 UnpooledUnsafeHeapByteBuf 的构造函数,然后再调用更上层的父类 UnpooledHeapByteBuf 的构造函数super(alloc, initialCapacity, maxCapacity);}
这里的构造函数有三个入参,释义如下:
-
UnpooledByteBufAllocator alloc
:当前 ByteBuf 所属的内存分配器,负责管理内存的分配与释放。 -
int initialCapacity
:指定缓冲区的初始容量(创建时分配的内存大小,单位:字节)。初始容量决定了缓冲区刚创建时可容纳的字节数,若后续写入数据超过此容量,缓冲区会自动扩容(不超过 maxCapacity)。 -
int maxCapacity
:指定缓冲区的最大容量(允许扩容的上限,单位:字节)。 当缓冲区需要扩容时,最大不能超过此值,否则会抛出 IndexOutOfBoundsException。
InstrumentedUnpooledUnsafeHeapByteBuf 的构造函数会调用到 父类UnpooledHeapByteBuf 的构造函数,UnpooledHeapByteBuf 的构造函数如下:
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {/****************** 1. 设置缓冲区的最大容量 ******************/// 调用父类构造方法,设置缓冲区的最大容量super(maxCapacity);/****************** 2. 参数校验 ******************/// 校验初始容量不能大于最大容量,否则抛出异常if (initialCapacity > maxCapacity) {throw new IllegalArgumentException(String.format("initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));}// 校验分配器不为null,若为null则抛出异常this.alloc = checkNotNull(alloc, "alloc");/****************** 3. 初始化缓冲区(创建一个指定大小的 byte[] ) ******************/// 分配初始容量的byte数组,并设置为缓冲区的底层存储setArray(allocateArray(initialCapacity));// 初始化缓冲区的读写索引,均从0开始(此时缓冲区为空)setIndex(0, 0);}protected byte[] allocateArray(int initialCapacity) {// 创建一个 byte 数组return new byte[initialCapacity];}private void setArray(byte[] initialArray) {// 将数组赋值给 array 属性array = initialArray;tmpNioBuf = null;}@Overridepublic ByteBuf setIndex(int readerIndex, int writerIndex) {if (checkBounds) {checkIndexBounds(readerIndex, writerIndex, capacity());}// 设置读写范围都是0,因为是刚开始创建setIndex0(readerIndex, writerIndex);return this;}
可以看到 UnpooledHeapByteBuf 的构造函数逻辑并不复杂,主要就是一些基础属性的初始化工作,上面代码注释已经说明,这里不再赘述。
InstrumentedUnpooledHeapByteBuf 同样也是 UnpooledHeapByteBuf 子类,所以在构建 InstrumentedUnpooledHeapByteBuf 时也是调用的 UnpooledHeapByteBuf 构造函数。
1.2 IO 读写
InstrumentedUnpooledHeapByteBuf 和 InstrumentedUnpooledUnsafeHeapByteBuf 在 IO 读写上略有不同,主要区别在于:
- InstrumentedUnpooledUnsafeHeapByteBuf 调用
UnsafeByteBufUtil.getByte(array, index)
完成数据读取。 - InstrumentedUnpooledHeapByteBuf 调用
HeapByteBufUtil.getByte(array, index)
; 完成数据读取(因此这种场景是不支持 Unsafe 所以无法通过 Unsafe 读取)。
InstrumentedUnpooledUnsafeHeapByteBuf 和 InstrumentedUnpooledHeapByteBuf 的读取都会调用到 _getByte
方法,各自的实现如下:
// InstrumentedUnpooledUnsafeHeapByteBuf#_getByte 方法如下@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(array, index);}// InstrumentedUnpooledHeapByteBuf#_getByte 方法如下@Overrideprotected byte _getByte(int index) {return HeapByteBufUtil.getByte(array, index);}
2. 堆外内存分配
在 UnpooledByteBufAllocator 中堆外内存分配是交由 UnpooledByteBufAllocator#newDirectBuffer 方法的,其代码如下:
/*** 创建新的非池化直接缓冲区实例* 直接缓冲区的数据存储在操作系统的直接内存(堆外内存)中,不占用JVM堆空间* * @param initialCapacity 初始容量(字节数),缓冲区创建时的初始大小* @param maxCapacity 最大容量(字节数),缓冲区可扩展的上限* @return 新创建的直接缓冲区实例,可能已添加内存泄漏检测功能*/
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {final ByteBuf buf;// 判断当前JVM是否支持Unsafe API// Unsafe是JDK内部的底层API,允许直接操作内存地址,可提升直接内存操作性能if (PlatformDependent.hasUnsafe()) {// 根据是否禁用Cleaner机制选择不同的实现类// Cleaner机制用于在直接缓冲区对象被GC回收时释放对应的堆外内存buf = noCleaner ? // 不使用Cleaner机制的Unsafe直接缓冲区实现new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :// 使用Cleaner机制的Unsafe直接缓冲区实现new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);} else {// 不支持Unsafe时,使用基于标准NIO API的直接缓冲区实现buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}// 根据是否禁用内存泄漏检测器,决定返回原始缓冲区还是添加了泄漏检测的缓冲区// 内存泄漏检测是Netty提供的用于排查缓冲区未正确释放问题的功能return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
可以看到,基本思路与 堆内内存分配 的场景类似:
-
当支持 Unsafe 时,选择 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 或 InstrumentedUnpooledUnsafeDirectByteBuf 实现类
在支持 Unsafe 的场景下还会额外判断是否支持 Cleaner 机制 来选择不同的实现类,关于 Cleaner 机制在 【Netty4核心原理⑬】【Netty 内存分配 ByteBuf❶】 中有过介绍。
-
当不支持 Unsafe 时,InstrumentedUnpooledDirectByteBuf 实现类。
同样,我们这里选择 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 展开来看。
2.1 构造函数
当 JVM 支持 Unsafe 是,会选择 InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 或 InstrumentedUnpooledUnsafeDirectByteBuf ,这两个类都是 UnpooledDirectByteBuf 的子类,都会调用 UnpooledDirectByteBuf 的构造函数。
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 的构造函数会调用其父类 UnpooledDirectByteBuf 的构造函数(InstrumentedUnpooledUnsafeDirectByteBuf 也是 ),因此我们这里直接看UnpooledDirectByteBuf 子类,因此也会调用到这里 UnpooledDirectByteBuf 的构造函数,如下:
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {// 调用父类的构造函数,传入最大容量super(maxCapacity);// 忽略一些参数检查...// 将传入的 ByteBufAllocator 实例赋值给当前对象的 alloc 属性this.alloc = alloc;// 调用 allocateDirect 方法分配指定初始容量的直接字节缓冲区,并调用 setByteBuffer 方法设置缓冲区setByteBuffer(allocateDirect(initialCapacity), false);
}protected ByteBuffer allocateDirect(int initialCapacity) {// 调用 Java NIO 的 ByteBuffer 类的静态方法 allocateDirect 来分配直接字节缓冲区(返回类型是 DirectByteBuffer)return ByteBuffer.allocateDirect(initialCapacity);
}
这里的逻辑与堆内内存分配的场景也基本相同,区别在于这里的内存分配并不是直接创建一个 byte[]
,而是通过 ByteBuffer.allocateDirect(initialCapacity)
通过 JDK 底层分配一个直接缓冲区 DirectByteBuffer ,然后交由 UnpooledDirectByteBuf#setByteBuffer 来处理这个 Buffer。
这里我们需要了解 :
ByteBuffer.allocateDirect(initialCapacity)
通过 JDK 底层分配一个直接缓冲区,其类型是 DirectByteBuffer- UnpooledDirectByteBuf 本身不直接存储数据,而是通过代理这个 ByteBuffer 实现内存操作。
因此这里我们就来看 UnpooledDirectByteBuf#setByteBuffer 的实现,其代码如下:
/*** 设置缓冲区关联的ByteBuffer,并根据需要释放旧的直接内存缓冲区* * @param buffer 新的ByteBuffer实例,将作为当前缓冲区的底层存储* @param tryFree 是否尝试释放旧的ByteBuffer所占用的直接内存*/
void setByteBuffer(ByteBuffer buffer, boolean tryFree) {// 尝试释放旧缓冲区,防止堆外内存泄漏if (tryFree) {// 获取当前关联的旧ByteBufferByteBuffer oldBuffer = this.buffer;if (oldBuffer != null) {if (doNotFree) {// 如果标记了不释放旧缓冲区,则仅重置标记,不执行释放操作doNotFree = false;} else {// 释放旧ByteBuffer占用的直接内存freeDirect(oldBuffer);}}}// 更新当前缓冲区关联的ByteBuffer为新实例this.buffer = buffer;// 清空临时NIO缓冲区引用(确保后续操作使用新的buffer)tmpNioBuf = null;// 更新当前缓冲区的容量为新ByteBuffer的剩余可读字节数capacity = buffer.remaining();
}
UnpooledDirectByteBuf#setByteBuffer 方法的作用是 更新缓冲区关联的底层 ByteBuffer 实例,并根据配置处理旧缓冲区的内存释放。
这里 tryFree 的取值逻辑是 : 当 UnpooledDirectByteBuf 创建时为 false ,因为刚创建是初始化阶段,无需清理旧 ByteBuffer;而当调整当前非池化堆外缓冲区时(调用 UnpooledDirectByteBuf#capacity 方法时),此时需要重新初始化新的 ByteBuffer,所以此时 为true,需要清理旧 ByteBuffer。
简言之,该方法是 Netty 中直接缓冲区(如 DirectByteBuf)底层存储管理的核心逻辑,负责安全地完成新旧缓冲区的切换并处理内存释放,平衡了性能与内存安全。
2.2 IO 读写
与堆内内存分配类似,堆外内存的 IO 读写也因为 是否支持 Unsafe 有了不同的实现,在支持 Unsafe 的场景下, IO 的读写操作都是交由 UnsafeByteBufUtil 的方法来直接利用 Unsafe 机制操作内存地址 ,而非 Unsafe 场景下则是通过 数组的下标获取数据,具体如下:
// 非 Unsafe 场景下 在io.netty.buffer.UnpooledDirectByteBuf#_getByte 的实现@Overrideprotected byte _getByte(int index) {return buffer.get(index);}// Unsafe 场景下 在io.netty.buffer.UnpooledUnsafeDirectByteBuf#_getByte 的实现@Overrideprotected byte _getByte(int index) {return UnsafeByteBufUtil.getByte(addr(index));}
三、池化内存分配
池化内存分配的场景下要比非池化内存场景下更为复杂,在 【Netty4核心原理⑬】【Netty 内存分配 ByteBuf❶】 中介绍过 Netty 的池化内存分配是通过 PooledByteBufAllocator 完成的。因此我们以 PooledByteBufAllocator 为切入口来具体看。
PooledByteBufAllocator 中有 PooledByteBufAllocator#newHeapBuffer 和 PooledByteBufAllocator#newDirectBuffer 两个方法,分别用于池化内存场景下的堆内内存分配和堆外内存分配。实现具体如下:
@Overrideprotected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {// 获取当前线程的缓存对象,用于缓存内存分配相关资源PoolThreadCache cache = threadCache.get();// 从线程缓存中获取堆内存分配器(PoolArena)PoolArena<byte[]> heapArena = cache.heapArena;final ByteBuf buf;// 如果存在可用的堆内存分配器,则使用池化方式分配缓冲区if (heapArena != null) {buf = heapArena.allocate(cache, initialCapacity, maxCapacity);} else {// 如果没有可用的堆内存分配器,则使用非池化方式创建缓冲区// 根据当前平台是否支持Unsafe来选择不同的实现buf = PlatformDependent.hasUnsafe() ?new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);}// 将创建的缓冲区包装为具有内存泄漏检测功能的缓冲区并返回return toLeakAwareBuffer(buf);}@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {// 获取当前线程的缓存对象PoolThreadCache cache = threadCache.get();// 从线程缓存中获取直接内存分配器(PoolArena)PoolArena<ByteBuffer> directArena = cache.directArena;final ByteBuf buf;// 如果存在可用的直接内存分配器,则使用池化方式分配缓冲区if (directArena != null) {buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else {// 如果没有可用的直接内存分配器,则使用非池化方式创建缓冲区// 根据当前平台是否支持Unsafe来选择不同的实现buf = PlatformDependent.hasUnsafe() ?UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}// 将创建的缓冲区包装为具有内存泄漏检测功能的缓冲区并返回return toLeakAwareBuffer(buf);}
PooledByteBufAllocator#newHeapBuffer 和 PooledByteBufAllocator#newDirectBuffer 的大体逻辑都比较类似,简单总结如下:
-
通过
threadCache
获取线程缓存的对象cache
,并从cache
中获取内存分配器(堆内存分配器PoolArena<byte[]>
或 直接内存分配器PoolArena<ByteBuffer>
)。threadCache
的类型是 PoolThreadLocalCachecache
类型是 PoolThreadCache
-
如果存在可用的内存分配器 (
heapArena
和directArena
),则通过内存分配器来分配缓冲区。 -
如果不存在可用的内存分配器,则根据当前平台是否支持 Unsafe 来选择不同的方式分配缓冲区。
在具体分析内存分类流程前,我们需要先了解一下上面的 threadCache
和 cache
对象。
1. threadCache 和 cache
上面代码中的 threadCache
和 cache
对象类型分别是 PoolThreadLocalCache 和 PoolThreadCache,下面我们介绍一下这两个类。
1.1 PoolThreadLocalCache
PoolThreadLocalCache,是 FastThreadLocal 的子类,提供了类似 ThreadLocal 的功能。
FastThreadLocal 是 Netty 框架提供的一个高性能线程本地变量实现,旨在替代 JDK 原生的 ThreadLocal,提供更快的访问速度和更低的内存开销。 在 【Netty4核心原理⑯】【Netty高性能调优工具类解析】 中我们会做进一步介绍。
因为其功能与 ThreadLocal 类似,所以我们直接来看 PoolThreadLocalCache#initialValue 方法,如下:
/*** 初始化线程本地的内存缓存(PoolThreadCache)* 这是ThreadLocal的initialValue()方法重写,当首次调用get()时会执行* 采用synchronized修饰以保证线程安全* * @return 初始化后的PoolThreadCache实例*/
@Override
protected synchronized PoolThreadCache initialValue() {// 从堆内存分配器列表中选择负载最低的 PoolArena(或者说使用率最少的)final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);// 从直接内存分配器列表中选择负载最低的 PoolArena (或者说使用率最少的)final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);// 获取当前线程实例final Thread current = Thread.currentThread();// 判断是否需要使用缓存:配置为所有线程使用缓存,或者当前线程是FastThreadLocalThreadif (useCacheForAllThreads || current instanceof FastThreadLocalThread) {// 创建PoolThreadCache实例final PoolThreadCache cache = new PoolThreadCache(heapArena, directArena, smallCacheSize, normalCacheSize,DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);// 如果配置了缓存清理间隔(默认大于0)if (DEFAULT_CACHE_TRIM_INTERVAL_MILLIS > 0) {// 获取当前线程关联的EventExecutorfinal EventExecutor executor = ThreadExecutorMap.currentExecutor();if (executor != null) {// 定期调度缓存清理任务// 首次延迟、间隔时间都为DEFAULT_CACHE_TRIM_INTERVAL_MILLISexecutor.scheduleAtFixedRate(trimTask, DEFAULT_CACHE_TRIM_INTERVAL_MILLIS,DEFAULT_CACHE_TRIM_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);}}return cache;}// 不使用缓存的情况,创建一个所有大小都为0的PoolThreadCachereturn new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0);
}
上述代码的逻辑如下:
-
选择策略:通过
leastUsedArena()
方法选择负载最低的内存分配器,目的是为了实现负载器的负载均衡。- 这里是从
heapArenas
和directArenas
中选择内存分配器,而heapArenas
和directArenas
的定义为PoolArena<byte[]>[]
和PoolArena<ByteBuffer>[]
。 heapArenas
和directArenas
是在 PooledByteBufAllocator 的构造函数中进行初始化,数组大小默认是 CPU 核心数 * 2,数量与 EventLoopGroup的默认线程数数量一致,目的是为了保证 Netty 中的每一个任务线程都可以有一个独享的 Arena,保证在每个线程分配内存的时候不用加锁。
- 这里是从
-
缓存判断:根据
useCacheForAllThreads
配置 或 线程类型(是否为 FastThreadLocalThread)决定是否启用缓存。useCacheForAllThreads
变量值可以默认读取环境变量io.netty.allocator.useCacheForAllThreads
的值,默认为 true,也可以在构建 PooledByteBufAllocator 的时候通过构造函数入参指定具体值,级别高于环境变量的配置。
-
缓存配置:当启用缓存时,会创建具有指定大小的缓存实例,并可能定期调度缓存清理任务。
- 这里缓存的大小由 DEFAULT_MAX_CACHED_BUFFER_CAPACITY 和 DEFAULT_NUM_DIRECT_ARENA 来决定,这两个属性在静态代码块中初始化,默认大小是 CPU 核心数 * 2 。 而我们在之前客户端和服务端的分析文章中提到 EventLoopGroup 分配线程时,默认线程数也是 CPU 核数 * 2。这样就可以保证在默认情况下 Netty 中的每一个任务线程都可以独享一个 Arena。
-
无缓存情况:返回一个空缓存(所有尺寸为 0),实际上不进行任何缓存。
到这里我们可以知道了,当我们第一次调用 PoolThreadLocalCache#get 方法时会为当前线程创建一个 PoolThreadCache 对象(在useCacheForAllThreads || current instanceof FastThreadLocalThread
场景下),因此下面我们来看 PoolThreadCache 的内容。
1.2 PoolThreadCache
PoolThreadCache 是内存池(PooledByteBufAllocator)的核心组件之一,用于线程本地的内存缓存,旨在减少内存分配 / 释放的开销,提升多线程环境下的内存使用效率。
1.2.1 PoolThreadCache 构造
在上面的代码中,我们可以看到 PoolThreadCache 构造时传入了六个构造参数,如下:
final PoolThreadCache cache = new PoolThreadCache(heapArena, directArena, smallCacheSize, normalCacheSize,DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
这里六个构造入参分别代表的是
heapArena
:堆内存分配器(PoolArena<byte[]>
类型),负责管理 JVM 堆内存的分配与释放,用于缓存堆内存缓冲区directArena
:直接内存分配器(PoolArena<ByteBuffer>
类型),负责管理直接内存(堆外内存)的分配与释放,用于缓存直接内存缓冲区smallCacheSize
:用于设置小型缓冲区缓存槽的数量,小型缓冲区通常指容量较小的内存块normalCacheSize
:用于设置普通缓冲区缓存槽的数量,普通缓冲区指容量较大的内存块DEFAULT_MAX_CACHED_BUFFER_CAPACITY
:最大缓存缓冲区容量,超过此值的缓冲区不会被缓存DEFAULT_CACHE_TRIM_INTERVAL
:缓存整理间隔,指定缓存整理(修剪)的时间间隔,用于定期清理缓存中不常用的缓冲区,优化内存使用
1.2.2 PoolThreadCache 的绑定
我们这里来简单说明下:
- PooledByteBufAllocator 是 Netty 中的池化内存分配器,该类中有两个属性
PoolArena<byte[]>[] heapArenas;
和PoolArena<ByteBuffer>[] directArenas;
分别对应着堆内内存分配和堆外内存分配,这两个 PoolArena 数组的默认大小都是 CPU 核心数 * 2。 - 当 EventLoop 创建 ByteBuf 时,首先会通过 PoolThreadLocalCache#get 获取到一个 PoolThreadCache 对象(因为 PoolThreadLocalCache 的特性,该对象与当前线程绑定)。
- 在 PoolThreadCache 创建时会从
heapArenas
和directArenas
中分别选择一个使用率最低的内存分配器保存到当前 PoolThreadCache 对象中。 - 至此完成了每一个 EventLoop 与 PoolArena 的绑定,如下图:
PoolThreadCache 的具体内存分配逻辑下文会详细说明。
1.2.3 PoolThreadCache 的内存分配
PoolThreadCache 的内存缓存并不完全来自 PoolArena ,还可以在它底层维护的 ByteBuf 缓存列表进行分配。
PoolThreadCache 内部自身有基于多个内存缓存队列,当进行内存分配时,首先会尝试从内存缓存中分配,如果内存缓存队列中没有合适的块,则通过 PoolArena 来进行内存缓存。
具体来说如下:
-
PoolThreadCache 内部维护了多组不同规格的缓存列表(按内存块大小分类),当需要分配内存时,PoolThreadCache 会先检查自身缓存列表,如果存在匹配尺寸的空闲内存块,则直接从缓存中取出复用,无需经过 PoolArena。这种方式速度最快,因为完全是线程本地操作,无锁竞争。
- 在 PooledByteBufAllocator 中维护着两种规格大小的缓存列表,分别是两个值 smallCacheSize、normalCacheSize。PooledByteBufAllocator 在静态代码块中固定了常量 DEFAULT_SMALL_CACHE_SIZE 和 DEFAULT_NORMAL_CACHE_SIZE 大小,并在构造函数中对 smallCacheSize 和 normalCacheSize 进行赋值,因此 smallCacheSize 固定为 256,normalCacheSize 固定为64。
- 在早期 Netty 版本中,存在三种规格大小的缓存列表,额外还存在一个 tinyCacheSize,是比 smallCacheSize 更小的内存划分。
-
如果 PoolThreadCache 的缓存列表中没有合适的内存块(缓存未命中),则会委托给关联的 PoolArena(堆内存对应 HeapArena,直接内存对应 DirectArena)进行分配
PoolArena 会从自身管理的内存池(Chunk、Subpage 等)中切割出所需大小的内存块。分配成功后,该内存块会被返回给 PoolThreadCache,供当前线程使用。
-
当 ByteBuf 被释放时,内存块会被归还给 PoolThreadCache:
- 如果内存块尺寸符合缓存规格,且缓存未达上限,则会被存入对应的缓存列表,供下次复用。
- 如果缓存已满或内存块尺寸不适合缓存,则会被归还给 PoolArena,由 PoolArena 进行全局管理(可能复用或释放给操作系统)
PoolThreadCache 的设计体现了 “线程本地缓存 -> 全局内存池” 的分层复用策略:
- 优先使用线程本地缓存(自身维护的 ByteBuf 缓存列表),减少跨线程竞争。
- 缓存未命中时才向 PoolArena 申请,利用全局内存池的资源
2. DirectArena 内存分配流程
上面提到的内存分配有 PooledByteBufAllocator#newHeapBuffer 和 PooledByteBufAllocator#newDirectBuffer 两个方法,我们这里以 PooledByteBufAllocator#newDirectBuffer 的方法为例进行说明,如下:
@Overrideprotected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {// 获取当前线程的缓存对象PoolThreadCache cache = threadCache.get();// 从线程缓存中获取直接内存分配器(PoolArena)PoolArena<ByteBuffer> directArena = cache.directArena;final ByteBuf buf;// 如果存在可用的直接内存分配器,则使用池化方式分配缓冲区if (directArena != null) {// 调用 PoolArena#allocate 进行内存分配buf = directArena.allocate(cache, initialCapacity, maxCapacity);} else {// 如果没有可用的直接内存分配器,则使用非池化方式创建缓冲区// 根据当前平台是否支持Unsafe来选择不同的实现buf = PlatformDependent.hasUnsafe() ?UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);}// 将创建的缓冲区包装为具有内存泄漏检测功能的缓冲区并返回return toLeakAwareBuffer(buf);}
我们这里直接来看 directArena.allocate(cache, initialCapacity, maxCapacity);
的代码逻辑,其实现在 PoolArena#allocate 中,具体如下:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {// 1. 创建一个空的 PooledByteBuf 实例PooledByteBuf<T> buf = newByteBuf(maxCapacity);// 2. 执行内存分配逻辑allocate(cache, buf, reqCapacity);return buf;}
这里的逻辑比较清楚:
- 通过
newByteBuf(maxCapacity)
方法创建一个空的 PooledByteBuf 实例,传入 maxCapacity 参数指定缓冲区的最大容量。 - 调用
allocate(cache, buf, reqCapacity)
方法在线程私有的 PoolThreadCache 中分配一块内存,再对buf
里面的内存之类的值进行初始化。
这里我们按照注释看这两步:
2.1 newByteBuf(maxCapacity)
我们这个场景下 newByteBuf(maxCapacity)
的实现为 DirectArena#newByteBuf,具体如下:
@Overrideprotected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {// 根据是否支持 Unsafe 选择不同的实现类if (HAS_UNSAFE) {return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);} else {return PooledDirectByteBuf.newInstance(maxCapacity);}}
这里我们以 PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
为例看下实现:
final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {private static final ObjectPool<PooledUnsafeDirectByteBuf> RECYCLER = ObjectPool.newPool(new ObjectCreator<PooledUnsafeDirectByteBuf>() {@Overridepublic PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {return new PooledUnsafeDirectByteBuf(handle, 0);}});static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {// 从缓存池中获取缓存对象PooledUnsafeDirectByteBuf buf = RECYCLER.get();buf.reuse(maxCapacity);return buf;}...}
这里我们需要注意上面代码中的三个对象,如下:
ObjectPool
是 Netty 提供的轻量级对象池工具,用于缓存和复用对象,避免频繁创建对象导致的 GC 压力。RECYCLER
是一个静态对象池,专门缓存 PooledUnsafeDirectByteBuf 实例。ObjectCreator
用于在对象池为空时创建新的 PooledUnsafeDirectByteBuf 实例,创建时会传入一个 Handle(对象池句柄),用于后续将对象 “归还” 给池。
因此调用 PooledUnsafeDirectByteBuf#newInstance 方法时便优先从对象池 RECYCLER 中获取复用实例,而非直接 new 一个新对象。具体来说如下:
- 首先通过
RECYCLER.get()
从 对象池 RECYCLER 中获取一个缓存的实例buf
,若池中无实例,则通过 ObjectCreator 新建一个。 - 因为获取到的
buf
可能是新创建的也可能是从 RECYCLER 中获取到,所以需要通过buf.reuse(maxCapacity)
重置该实例的状态(如清空之前的数据、设置新的最大容量等),确保复用前状态干净。
2.2 allocate(cache, buf, reqCapacity)
上面已经完成了一个 PooledUnsafeDirectByteBuf 对象的获取,而这一步则开始对 PooledUnsafeDirectByteBuf 进行内存分配。
allocate(cache, buf, reqCapacity)
的实现是 PoolArena#allocate,实现如下:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {// 1. 计算大小索引// Netty 预定义了一系列标准内存大小(如 16B、32B、64B、128B... 8KB、16KB 等),形成一个递增的数组,sizeIdx 就是该数组的下标。// 例如:若 reqCapacity=100B,可能会映射到 128B 对应的索引(向上取整到最近的标准大小)。final int sizeIdx = size2SizeIdx(reqCapacity);// 2. 根据内存大小的不同选择不同的内存块分配if (sizeIdx <= smallMaxSizeIdx) {// 小型内存分配 : 从线程本地缓存(PoolThreadCache)中分配小型内存块。tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);} else if (sizeIdx < nSizes) {// 常规内存分配 : 从缓存中分配常规大小的内存块。tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);} else {// 先通过 normalizeSize 对容量进行内存对齐(仅堆外内存需要,避免碎片)。int normCapacity = directMemoryCacheAlignment > 0? normalizeSize(reqCapacity) : reqCapacity;// 大型内存分配 :调用 allocateHuge 直接分配,不经过线程缓存(大内存复用率低,缓存意义不大,反而可能占用过多内存)。// 大内存通常直接从操作系统申请(如堆外内存通过 Unsafe 分配,堆内内存直接创建 byte[])。allocateHuge(buf, normCapacity);}}
这里的内容简单来说就是根据申请的内存大小,判断是否有合适规格的内存缓存可用,如果所有规格都不满足,那就直接调用 allocateHuge 进行真实的内存分配。这里需要注意的是 tcacheAllocateSmall
和 tcacheAllocateNormal
都是从缓存中获取内存块,而 allocateHuge则是直接分配新的内存。
关于
tcacheAllocateSmall
和tcacheAllocateNormal
方法暂不进行分析(感觉过于复杂其意义不大,兴趣不大)
3. 内存池的内存规格
上面 allocate(cache, buf, reqCapacity)
中我们看到了 Netty 会根据实际需要选择不同的内存块来进行内存分配,因此这里我们就来介绍下 Netty 内存池的规格。
在 谈一谈 Netty 的内存管理 —— 且看 Netty 如何实现 Java 版的 Jemalloc(上) 一文中有如下描述:
在 4.1.52.Final 之前 Netty 内存池是基于 jemalloc3 的设计思想实现的,由于在该版本的实现中,内存规格的粒度设计的比较粗,可能会引起比较严重的内存碎片问题。所以为了近一步降低内存碎片,Netty 在 4.1.52.Final 版本中重新基于 jemalloc4 的设计思想对内存池进行了重构,通过将内存规格近一步拆分成更细的粒度,以及重新设计了内存分配算法尽量将内存碎片控制在比较小的范围内。
随后在 4.1.75.Final 版本中,Netty 为了近一步降低不必要的内存消耗,将 ChunkSize 从原来的 16M 改为了 4M 。而且在默认情况下不在为普通的用户线程提供内存池的 Thread Local 缓存。在兼顾性能的前提下,将不必要的内存消耗尽量控制在比较小的范围内。
因此由于本文依赖版本是 4.1.73.Final,所以下面内容会与书中差异较大。
在 Netty 内存池(PooledByteBufAllocator)中,内存规格的划分是基于预定义的标准大小进行的,目的是通过标准化内存块尺寸减少碎片、提升复用率,并配合线程本地缓存(PoolThreadCache)实现高效分配。在上面的代码中 Netty 预定义了一系列标准内存大小(以字节为单位),并通过索引(sizeIdx) 快速定位规格(在上面的 size2SizeIdx(reqCapacity);
方法中完成)。
在新版本中,Netty 划分了下面三种内存规格 (去除了 tiny 内存规格):
- small(小内存块):指 0 Byte-28KB 规格
- normal(中等内存块):指 28KB-16MB 的规格大小
- huge(大内存块):指 16MB 以上的规格大小。huge 规格的内存块不复用线程缓存,直接从操作系统申请(堆外内存通过 Unsafe 分配,堆内内存用 byte[]),释放时直接归还给系统。
当 Netty 内存池不足时,Netty 会动态向系统申请新的内存(申请的大小为配置的 Chunk 尺寸,默认为 16MB,可通过 PooledByteBufAllocator 的构造函数自定义 pageSize 或 maxOrder 来调整),申请后会被纳入内存池管理,供后续分配使用。
-
Netty 4.1.52.Final 版本及之后的内存规格划分:
-
Netty 4.1.52.Final 之前版本的四种内存规格划分:
4. 命中缓存的分配
在 PoolThreadCache 中维护了四个类型为 MemoryRegionCache 的缓存数组(smallSubPageHeapCaches、smallSubPageDirectCaches、normalHeapCaches、normalDirectCaches)分别针对上面说的Netty 内存规格中的 small 和 normal 结构 (有四个是因为堆外内存缓存和堆内内存缓存各有两个)。
在 PoolThreadCache 的构造函数中会根据参数(堆外内存或堆内内存)对其中两个数组进行初始化,如下:
-
堆外内存场景的初始化
smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.numSmallSubpagePools);normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);
-
堆内内存的初始化
smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.numSmallSubpagePools);normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);
可以看到, small
和 normal
缓存数组的初始化就是就交由 PoolThreadCache#createSubPageCaches 和 PoolThreadCache#createNormalCaches 完成的,下面我们具体来看。
4.1 PoolThreadCache#createSubPageCaches
PoolThreadCache#createSubPageCaches 的代码如下:
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {if (cacheSize > 0 && numCaches > 0) {@SuppressWarnings("unchecked")// 初始化一个长度为 numCaches 的 MemoryRegionCache数组MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];// 初始化 cache 数组for (int i = 0; i < cache.length; i++) {// TODO: maybe use cacheSize / cache.length// cacheSize 是控制单个缓存实例所能容纳的最大内存块数量cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);}return cache;} else {return null;}}
PoolThreadCache#createSubPageCaches 的实现并不复杂,简单来说就是创建了一个大小为 numCaches
的 MemoryRegionCache 缓存数组。
small 缓存数组的长度为 39
4.2 PoolThreadCache#createNormalCaches
PoolThreadCache#createNormalCaches 的代码如下:
private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {// 只有当缓存容量和最大可缓存容量均为正数时,才创建缓存数组if (cacheSize > 0 && maxCachedBufferCapacity > 0) {// 计算实际允许缓存的最大内存块容量:取内存池Chunk容量和配置的最大可缓存容量中的较小值// area.chunkSize是常规内存块的理论最大容量(由pageSize和maxOrder决定,默认16MB)int max = Math.min(area.chunkSize, maxCachedBufferCapacity);// 临时使用ArrayList存储缓存实例,方便动态添加List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>();// 遍历所有符合条件的常规内存块规格索引,为每个规格创建对应的缓存实例// 1. 起始索引:area.numSmallSubpagePools是小型内存块(Small)的规格总数,从此索引开始为常规内存块规格// 2. 终止条件1:idx < area.nSizes 确保不超过内存池定义的所有规格总数// 3. 终止条件2:area.sizeIdx2size(idx) <= max 确保规格对应的内存块大小不超过最大可缓存容量for (int idx = area.numSmallSubpagePools; idx < area.nSizes && area.sizeIdx2size(idx) <= max; idx++) {// 为当前规格创建一个常规内存缓存实例,容量为cacheSizecache.add(new NormalMemoryRegionCache<T>(cacheSize));}// 将列表转换为MemoryRegionCache数组并返回return cache.toArray(new MemoryRegionCache[0]);} else {// 若缓存容量或最大可缓存容量不合法,则不创建缓存,返回nullreturn null;}}
PoolThreadCache#createNormalCaches 的实现会更复杂一些,实际目的与 PoolThreadCache#createSubPageCaches 类似,即初始化 normal 缓存数组。(太复杂了,看不动了 )
normal 缓存数组的长度为 37
4.3 缓存数据的结构
上面我们提到 Small 缓存数组长度是 39, 缓存对象范围是 16Byte - 28KB,Normal 缓存数组的长度是 37, 缓存对象范围是 28KB - 4M,大于 4M 的范围则属于 Huge,Huge范围的内存并不会做缓存,而是会在使用时向操作系统申请,使用完成后直接释放。
以 smallSubPageDirectCaches
为例,上面我们提到了 smallSubPageDirectCaches
数组的长度是39,而 smallSubPageDirectCaches 的缓存范围是 16Byte - 28KB,因此这里 smallSubPageDirectCaches[0]
的缓存对象所缓存的 ByteBuf 的缓冲区大小就是 16Byte,在 smallSubPageDirectCaches[2]
中缓存的 ByteBuf 大小为 32 Byte,依此类推,smallSubPageDirectCaches[38]
中缓存的 ByteBuf 大小为 28KB (这里数组中每个下标的缓存对象的大小差异并非是等差数列)
综上,Netty 中的内存缓存结构如下图(参照 谈一谈 Netty 的内存管理 —— 且看 Netty 如何实现 Java 版的 Jemalloc(上) 所画):
5. PoolChunkList 结构
在上面我们曾简单提到过 Netty 内存分配的单位 Chunk(Netty 中的实现为 PoolChunk
),一个 Chunk 的大小是 16MB,而 每个 Chunk 都以双向链表的形式保存在一个 ChunkList 中(Netty 中的实现为 PoolChunkList
),多个 ChunkList 也是以双向链表的形式进行关联的,因此,ChunkList 和 Chunk 之间的结构如下:
简单来说 :PoolChunkList
是一个双向链表,每个节点是一个 PoolChunk
。Netty 中有多个 PoolChunkList
,每个 PoolChunkList
负责管理一定使用率范围内的 PoolChunk
,而使用率是指 PoolChunk
中已分配内存的比例。
Netty 在 PoolArena 中定义了关于 ChunkList 的六个变量并在构造函数中进行初始化,如下:
private final PoolChunkList<T> q050;private final PoolChunkList<T> q025;private final PoolChunkList<T> q000;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;// 构造函数中初始化protected PoolArena(PooledByteBufAllocator parent, int pageSize,int pageShifts, int chunkSize, int cacheAlignment) {...q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);// 使用双向链表的形式进行连接q100.prevList(q075);q075.prevList(q050);q050.prevList(q025);q025.prevList(q000);q000.prevList(null);qInit.prevList(qInit);...}
我们以 q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
为例进行简单介绍 :
q075
表示当前q050
的下一个节点,- 参数
50
:表示当前 ChunkList 中存储的 Chunk 的内存使用率都在 50%以上。 - 参数
100
:表示当前 ChunkList 中存储的 Chunk 的内存使用率都在 100%以下。 chunkSize
:表示设置的 Chunk 的容量大小。
所以上面多个 ChunkList 就是按照使用率进行划分,如 q050
代表使用率在 50%~75%,q025
则代表使用率在 25% ~ 50%,依次类推。
在初始化完成后,ChunkList 的节点关系如下图:
Netty 中,Chunk 又包含了多个 Page,每个 Page 的大小为 8KB,如果要分配 16KB 的内存,则在 Chunk 中找到连续的两个 Page 即可,对应关系如下图:
但在很多场景下,为缓冲区分配 8KB 的内存也是一种浪费,比如只需要分配 2KB 的缓冲区,如果使用 8KB 会造成 6KB 的浪费,这种情况下 Netty 又会将 Page 切分成多个 SubPage,每个 SubPage 大小要根据分配的缓冲区大小而定,比如要分配 2KB 的内存,就会将一个 Page 切分成 4 个 SubPage ,每个 SubPage 的大小为 2KB,如下图所示:
PoolSubpage 的基本结构的代码如下:
final PoolChunk<T> chunk;final int elemSize;private final int pageShifts;private final int runOffset;private final int runSize;private final long[] bitmap;PoolSubpage<T> prev;PoolSubpage<T> next;
其中几个参数的如下:
- chunk 代表其子页属于哪个 Chunk;
- bitmap 用于记录子页的内存分配情况;
- prev 和 next 代表 子页是按照双向链表进行关联的,分别指向上一个节点和下一个节点;
- elemSize 属性代表的事子页是按照多大内存进行划分的,如果按照 1KB 划分,则可以划分出 8 个子页。
四、参考内容
- 《Netty4核心原理》
- 豆包
- 死磕 Netty 之内存篇:再探 Netty 池化内存分配管理
- 谈一谈 Netty 的内存管理 —— 且看 Netty 如何实现 Java 版的 Jemalloc(上)