当前位置: 首页 > news >正文

Netty内存池核心PoolArena源码解析

PoolArena 是 Netty 内存池化机制的核心组件之一,它负责管理一整块或多块内存(PoolChunk),并将这些内存分配给应用程序。每个 PoolArena 实例都与一个特定的线程相关联(通过 PoolThreadCache),或者在禁用线程缓存时被多个线程共享。Netty 会创建多个 PoolArena 来减少多线程环境下的锁竞争。

PoolArena 是一个抽象类,它有两个具体的子类:

  • HeapArena: 用于分配堆内存 (byte[])。
  • DirectArena: 用于分配直接内存 (ByteBuffer)。

主要成员变量和职责

让我们看一下 PoolArena 类中的一些关键字段:

// ...
abstract class PoolArena<T> implements PoolArenaMetric {// ...enum SizeClass {Small,Normal}final PooledByteBufAllocator parent; // 指向创建此 Arena 的 PooledByteBufAllocatorfinal PoolSubpage<T>[] smallSubpagePools; // 用于管理 Small 类型内存分配的 PoolSubpage 池数组// PoolChunkList 用于根据 PoolChunk 的使用率将其组织起来// qInit: 0-25% 使用率 (最初创建的 Chunk)// q000: < 50% 使用率// q025: 25-75% 使用率// q050: 50-100% 使用率// q075: 75-100% 使用率// q100: 100% 使用率 (已满,但仍可分配 Subpage)private final PoolChunkList<T> q050;private final PoolChunkList<T> q025;private final PoolChunkList<T> q000;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;private final List<PoolChunkListMetric> chunkListMetrics; // 用于收集 PoolChunkList 的度量信息// 各种分配和释放的计数器private long allocationsNormal; // Normal 类型分配次数private final LongAdder allocationsSmall = new LongAdder(); // Small 类型分配次数 (线程安全)private final LongAdder allocationsHuge = new LongAdder(); // Huge 类型分配次数 (线程安全)private final LongAdder activeBytesHuge = new LongAdder(); // Huge 类型活跃字节数 (线程安全)private long deallocationsSmall; // Small 类型释放次数private long deallocationsNormal; // Normal 类型释放次数private long pooledChunkAllocations; // 池化 Chunk 的分配次数private long pooledChunkDeallocations; // 池化 Chunk 的释放次数private final LongAdder deallocationsHuge = new LongAdder(); // Huge 类型释放次数 (线程安全)// 使用此 Arena 的线程缓存数量final AtomicInteger numThreadCaches = new AtomicInteger();private final ReentrantLock lock = new ReentrantLock(); // 用于保护 Arena 内部状态的锁final SizeClasses sizeClass; // 描述了 Arena 的大小规格配置 (pageSize, chunkSize 等)// ...
}

  • SizeClass: 枚举类型,表示内存分配的类型,分为 Small (小于等于 pageSize / 2,通常从 PoolSubpage 分配) 和 Normal (大于 pageSize / 2 但小于 chunkSize,直接从 PoolChunk 分配)。
  • parent: 指向 PooledByteBufAllocator,这是内存分配器的顶层入口。
  • smallSubpagePools: 这是一个 PoolSubpage 数组,数组的每个元素是一个双向链表的头节点。相同大小的 Small 类型的 PoolSubpage 会被链接到同一个链表上,便于快速查找和分配。
  • qInitq000q025q050q075q100: 这些是 PoolChunkList 对象,它们形成了一个双向链表结构。PoolArena 根据 PoolChunk 的内存使用率(usage())将其组织在不同的 PoolChunkList 中。例如,q050 存储使用率在 50% 到 100% 之间的 PoolChunk。这种组织方式有助于在分配内存时,优先从使用率较高的 PoolChunk 中分配,以期尽快填满并释放空闲的 PoolChunk,从而减少内存碎片。
  • Metrics Counters: 大量的计数器用于追踪不同类型(Small, Normal, Huge)的分配和释放次数,以及活跃的字节数和 Chunk 数量。这些信息对于监控内存池的性能和状态非常有用。LongAdder 用于在高并发场景下提供比 AtomicLong 更好的性能。
  • numThreadCaches: 记录了当前有多少个 PoolThreadCache 正在使用这个 PoolArena
  • lock: 一个可重入锁,用于在修改 PoolArena 的共享数据结构(如 PoolChunkList)时进行同步,防止并发冲突。
  • sizeClass: (实际上是 this.sizeClass,来自构造函数参数 SizeClasses sizeClass) 这是一个 SizeClasses 对象,它封装了关于内存规格的配置信息,如 pageSize(页大小)、pageShiftschunkSize(块大小)等,并提供了一些计算方法,如根据请求大小计算规格索引 (size2SizeIdx)。

构造函数

PoolArena.java

// ...protected PoolArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {assert null != sizeClass;this.parent = parent;this.sizeClass = sizeClass;smallSubpagePools = newSubpagePoolArray(sizeClass.nSubpages);for (int i = 0; i < smallSubpagePools.length; i ++) {smallSubpagePools[i] = newSubpagePoolHead(i);}q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, sizeClass.chunkSize);q075 = new PoolChunkList<T>(this, q100, 75, 100, sizeClass.chunkSize);q050 = new PoolChunkList<T>(this, q100, 50, 100, sizeClass.chunkSize); // 注意这里 nextList 是 q100q025 = new PoolChunkList<T>(this, q050, 25, 75, sizeClass.chunkSize);q000 = new PoolChunkList<T>(this, q025, 1, 50, sizeClass.chunkSize);qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, sizeClass.chunkSize);q100.prevList(q075);q075.prevList(q050);q050.prevList(q025);q025.prevList(q000);q000.prevList(null); // q000 的前一个 List 是 null,表示它是链表的头部(在查找时)qInit.prevList(qInit); // qInit 的 prevList 指向自身,它是一个特殊的 ListList<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);metrics.add(qInit);metrics.add(q000);metrics.add(q025);metrics.add(q050);metrics.add(q075);metrics.add(q100);chunkListMetrics = Collections.unmodifiableList(metrics);}private PoolSubpage<T> newSubpagePoolHead(int index) {PoolSubpage<T> head = new PoolSubpage<T>(index);head.prev = head;head.next = head;return head;}@SuppressWarnings("unchecked")private PoolSubpage<T>[] newSubpagePoolArray(int size) {return new PoolSubpage[size];}
// ...

构造函数主要做了以下几件事:

  1. 初始化 parent 和 sizeClass
  2. 初始化 smallSubpagePools 数组,其中每个元素都是一个 PoolSubpage 链表的头节点。newSubpagePoolHead 创建一个空的双向循环链表。
  3. 初始化 qInit 到 q100 这些 PoolChunkList。注意它们的 minUsage 和 maxUsage 参数,以及它们之间的 nextList 和 prevList 关系,形成了一个查找链。
    • qInit: 用于存放新创建的 PoolChunk,使用率范围是 Integer.MIN_VALUE 到 25%
    • q000: 使用率 1% 到 50%
    • q025: 使用率 25% 到 75%
    • q050: 使用率 50% 到 100%
    • q075: 使用率 75% 到 100%
    • q100: 使用率 100% 到 Integer.MAX_VALUE (实际上是100%)。 这些 PoolChunkList 通过 prevList 和 nextList 链接起来,方便在分配和释放时根据 PoolChunk 的使用率变化将其移动到合适的 PoolChunkList 中。

内存分配 (allocate)

内存分配是 PoolArena 的核心功能。

PoolArena.java

// ...PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity); // 创建一个 PooledByteBuf 对象 (具体类型由子类决定)allocate(cache, buf, reqCapacity); // 调用内部的分配方法return buf;}private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int sizeIdx = sizeClass.size2SizeIdx(reqCapacity); // 根据请求容量计算规格索引if (sizeIdx <= sizeClass.smallMaxSizeIdx) { // Small 类型分配tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);} else if (sizeIdx < sizeClass.nSizes) { // Normal 类型分配tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);} else { // Huge 类型分配 (大于 chunkSize)int normCapacity = sizeClass.directMemoryCacheAlignment > 0? sizeClass.normalizeSize(reqCapacity) : reqCapacity;// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, normCapacity);}}
// ...
  1. allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity): 这是外部调用的入口。它首先通过 newByteBuf(maxCapacity) 创建一个 PooledByteBuf 实例(具体是 PooledHeapByteBuf 还是 PooledDirectByteBuf 等由子类实现),然后调用内部的 allocate 方法来实际分配内存。
  2. allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity):
    • 首先,根据请求容量 reqCapacity 计算出对应的 sizeIdx (size index)。
    • Small Allocation: 如果 sizeIdx 小于等于 smallMaxSizeIdx (通常是 pageSize / 2 对应的索引),则认为是小内存分配,调用 tcacheAllocateSmall
    • Normal Allocation: 如果 sizeIdx 大于 smallMaxSizeIdx 但小于 nSizes (总规格数,对应 chunkSize 的索引),则认为是普通内存分配,调用 tcacheAllocateNormal
    • Huge Allocation: 如果 sizeIdx 超出了 nSizes,表示请求的内存大于 chunkSize,则认为是大内存分配,调用 allocateHuge。大内存分配不会使用线程缓存,并且会创建一个独立的、非池化的 PoolChunk

tcacheAllocateSmall (Small 类型分配)

PoolArena.java

// ...private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,final int sizeIdx) {if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { // 尝试从线程缓存分配// was able to allocate out of the cache so move onreturn;}/** Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/final PoolSubpage<T> head = smallSubpagePools[sizeIdx]; // 获取对应 sizeIdx 的 Subpage 链表头final boolean needsNormalAllocation;head.lock(); // 对 Subpage 链表头加锁try {final PoolSubpage<T> s = head.next;needsNormalAllocation = s == head; // 如果链表为空,则需要进行 Normal Allocation 来创建新的 Subpageif (!needsNormalAllocation) {assert s.doNotDestroy && s.elemSize == sizeClass.sizeIdx2size(sizeIdx) : "doNotDestroy=" +s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx;long handle = s.allocate(); // 从 Subpage 中分配一个元素assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); // 初始化 ByteBuf}} finally {head.unlock();}if (needsNormalAllocation) { // 如果没有可用的 Subpagelock(); // 获取 Arena 的全局锁try {allocateNormal(buf, reqCapacity, sizeIdx, cache); // 进行 Normal Allocation (可能会创建新的 Chunk 和 Subpage)} finally {unlock();}}incSmallAllocation(); // 增加 Small 分配计数}
// ...
  1. 首先尝试从 PoolThreadCache 中分配。如果成功,则直接返回。
  2. 如果线程缓存分配失败,则从 smallSubpagePools 中查找对应 sizeIdx 的 PoolSubpage 链表。
  3. 对该链表的头节点 head 加锁(head.lock()),这是为了保护 PoolSubpage 链表的并发修改。
  4. 如果链表中有可用的 PoolSubpage (s != head),则从该 PoolSubpage (s) 中调用 s.allocate() 分配一个元素(得到一个 handle),然后用这个 handle 初始化 PooledByteBuf
  5. 如果链表为空 (s == head),说明当前没有合适的 PoolSubpage 可供分配。此时,needsNormalAllocation 为 true
  6. 释放 head 的锁。
  7. 如果 needsNormalAllocation 为 true,则需要进行一次“普通分配”(allocateNormal)。这通常意味着需要从某个 PoolChunk 中分配一个新的 PoolSubpage。这个过程需要获取 PoolArena 的全局锁 (lock())。
  8. 最后,增加 allocationsSmall 计数。

tcacheAllocateNormal (Normal 类型分配)

PoolArena.java

// ...private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,final int sizeIdx) {if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { // 尝试从线程缓存分配// was able to allocate out of the cache so move onreturn;}lock(); // 获取 Arena 的全局锁try {allocateNormal(buf, reqCapacity, sizeIdx, cache); // 进行 Normal Allocation++allocationsNormal; // 增加 Normal 分配计数} finally {unlock();}}
// ...
  1. 首先尝试从 PoolThreadCache 中分配。如果成功,则直接返回。
  2. 如果线程缓存分配失败,则获取 PoolArena 的全局锁 (lock())。
  3. 调用 allocateNormal 方法进行实际的分配。
  4. 增加 allocationsNormal 计数。
  5. 释放锁。

allocateNormal (核心普通分配逻辑)

// ...private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {assert lock.isHeldByCurrentThread(); // 确认当前线程已持有 Arena 锁if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q050 分配q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q025 分配q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q000 分配qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 qInit 分配q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { // 最后尝试从 q075 分配 (q100 通常是满的)return;}// Add a new chunk.// 如果所有 PoolChunkList 都分配失败,则创建一个新的 PoolChunkPoolChunk<T> c = newChunk(sizeClass.pageSize, sizeClass.nPSizes, sizeClass.pageShifts, sizeClass.chunkSize);boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); // 从新 Chunk 中分配assert success; // 新创建的 Chunk 必然能分配成功qInit.add(c); // 将新 Chunk 加入到 qInit 列表++pooledChunkAllocations; // 增加 Chunk 分配计数}
// ...

此方法在持有 PoolArena 全局锁的情况下执行:

  1. 按顺序尝试从 PoolChunkList 分配
    • q050 (50-100% usage)
    • q025 (25-75% usage)
    • q000 (1-50% usage)
    • qInit (newly created chunks, <25% usage)
    • q075 (75-100% usage)
    • q100 列表中的 PoolChunk 通常是满的,但仍可能用于分配 PoolSubpage,这个逻辑在 PoolChunkList.allocate -> PoolChunk.allocate 中处理)。 这个顺序的目的是优先使用那些已经分配了一部分内存的 PoolChunk,以期更快地填满它们,从而减少内存碎片。
  2. 如果上述所有 PoolChunkList 都无法成功分配(即它们内部的 PoolChunk 都没有足够的空间或者无法分配出请求大小的内存块/子页),则需要创建一个新的 PoolChunk
  3. 创建新 PoolChunk: 调用 newChunk(...) 方法(由子类 HeapArena 或 DirectArena 实现)创建一个新的 PoolChunk
  4. 从这个新创建的 PoolChunk 中调用 c.allocate(...) 来分配内存给 buf。新创建的 PoolChunk 肯定是空的,所以这次分配一定会成功。
  5. 将新创建的 PoolChunk 添加到 qInit 列表中。
  6. 增加 pooledChunkAllocations 计数。

为什么对于chunkList 是这样的分配顺序

观察到的分配顺序是 q050 , q025 , q000 , qInit , 最后是 q075 。这个顺序并非简单地按利用率从低到高或从高到低,而是基于一种旨在减少内存碎片的“最佳适应”(Best-Fit)策略的变体,这种策略深受 jemalloc 内存分配器的影响。

首先,我们先明确一下这些 PoolChunkList (也就是 q 系列的链表)各自管理的 PoolChunk 的内存使用率范围。根据 PoolArena.java 中的定义:

  • qInit : 使用率低于 25% 的 Chunk (通常是新创建的)。

  • q000 : 使用率在 1% 到 50% 之间的 Chunk 。

  • q025 : 使用率在 25% 到 75% 之间的 Chunk 。

  • q050 : 使用率在 50% 到 100% 之间的 Chunk 。

  • q075 : 使用率在 75% 到 100% 之间的 Chunk 。

  • q100 : 使用率达到 100% 的 Chunk (已满)。

现在我们来分析这个分配顺序:

  1. 主要分配策略 ( q050 -> q025 -> q000 -> qInit ) : 这个顺序是从使用率较高的 Chunk 列表开始,逐步到使用率较低的列表。这体现了“最佳适应”的思想。 目标是优先填满那些已经分配了较多内存的 Chunk 。这样做的好处是:

    1. 减少内存碎片 :通过集中在少数 Chunk 中进行分配,可以更快地将它们填满(达到100%使用率),然后将它们移到 q100 链表中。一个全满的 Chunk 不再参与后续的分配,当它内部的所有 ByteBuf 都被释放后,这个 Chunk 就可以被完全回收,将内存归还给操作系统。

    2. 提高效率 :如果总是从利用率最低的 Chunk (如 qInit )开始分配,会导致大量 Chunk 都处于“部分使用”的状态,使得内存碎片化严重,难以分配较大的连续内存块,并且降低了内存归还给系统的可能性。

  2. q075 为什么排在最后 : q075 列表中的 Chunk 使用率已经非常高(75%-100%),意味着它们的剩余空间很小。 allocateNormal 方法用于分配“正常大小”的内存块(大于 small ,小于 huge ),这种大小的内存在一个几乎已满的 Chunk 中找到合适空间的概率较低。

    1. 性能优化 :将 q075 放在最后检查是一种性能优化。与其一开始就徒劳地在这些几乎已满的 Chunk 中搜索,不如先尝试其他更有可能成功的 Chunk 列表。只有当其他所有列表都无法满足分配请求时,才最后尝试在这些“残羹剩饭”中寻找机会。这减少了不必要的搜索开销。

总结来说,Netty 的这个分配顺序是一个精心设计的权衡:

  • 主体顺序 ( q050 -> qInit ) 是为了 对抗内存碎片 ,倾向于“物尽其用”,尽快填满并回收 Chunk 。

  • q075 的特殊位置 是为了 提升分配性能 ,避免在成功率低的 Chunk 上浪费时间。

这种设计使得 PooledByteBufAllocator 在高并发和长时间运行的场景下依然能保持高效和稳定的内存管理。

allocateHuge (Huge 类型分配)

内存池的主要优势在于复用频繁申请和释放的小到中等大小的内存块,以避免频繁向操作系统申请内存和垃圾回收带来的开销。

对于“巨大”(Huge)的内存分配(通常大于 chunkSize ,默认16MB),这种分配本身就不频繁。如果将这些巨大的内存块池化,意味着Netty需要长期持有一个或多个非常大的内存块,即使它们在大部分时间里是空闲的。这会导致严重的内存资源浪费,尤其是在高并发环境下,可能会长时间占用大量内存,降低了整体的内存使用效率。

// ...private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); // 创建一个非池化的、大小刚好满足需求的 ChunkactiveBytesHuge.add(chunk.chunkSize()); // 增加 Huge 类型活跃字节数buf.initUnpooled(chunk, reqCapacity); // 用这个非池化 Chunk 初始化 ByteBufallocationsHuge.increment(); // 增加 Huge 分配计数}
// ...
  1. 调用 newUnpooledChunk(reqCapacity) 创建一个非池化的 PoolChunk。这个 PoolChunk 的大小就是 reqCapacity,它不会被放入任何 PoolChunkList 中,也不会被其他分配请求共享。
  2. 更新 activeBytesHuge 和 allocationsHuge 计数。
  3. 调用 buf.initUnpooled(...) 来初始化 PooledByteBuf

内存释放 (free)

// ...void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {chunk.decrementPinnedMemory(normCapacity); // 减少 Chunk 的 pinned 内存计数if (chunk.unpooled) { // 如果是 Huge Allocation (非池化 Chunk)int size = chunk.chunkSize();destroyChunk(chunk); // 直接销毁 ChunkactiveBytesHuge.add(-size); // 更新 Huge 类型活跃字节数deallocationsHuge.increment(); // 更新 Huge 类型释放计数} else { // 池化 ChunkSizeClass sizeClass = sizeClass(handle); // 判断是 Small 还是 Normal 释放if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {// 尝试将内存在线程缓存中缓存起来// cached so not free it.return;}// 如果线程缓存失败或没有缓存,则真正释放回 ArenafreeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);}}private static SizeClass sizeClass(long handle) {return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;}void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,boolean finalizer) {final boolean destroyChunk;lock(); // 获取 Arena 全局锁try {// We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this// may fail due lazy class-loading in for example tomcat.if (!finalizer) { // 如果不是由 finalizer 触发的释放switch (sizeClass) {case Normal:++deallocationsNormal;break;case Small:++deallocationsSmall;break;default:throw new Error();}}// 调用 PoolChunkList 的 free 方法,该方法内部会调用 PoolChunk 的 free// 如果 PoolChunk 完全空闲,则 !chunk.parent.free(...) 返回 true,表示需要销毁 ChunkdestroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);if (destroyChunk) {// all other destroyChunk calls come from the arena itself being finalized, so don't need to be counted++pooledChunkDeallocations; // 更新 Chunk 释放计数}} finally {unlock();}if (destroyChunk) {// destroyChunk not need to be called while holding the synchronized lock.destroyChunk(chunk); // 在锁外销毁 Chunk}}
// ...
  1. free(...):
    • 首先减少 chunk 的 pinnedMemory 计数。
    • 非池化 Chunk (Huge): 如果 chunk.unpooled 为 true,说明是为大内存分配创建的独立 PoolChunk。直接调用 destroyChunk(chunk) 销毁它,并更新相应的 Huge 类型计数器。
    • 池化 Chunk:
      • 通过 isSubpage(handle) 判断是 Small 还是 Normal 类型的释放。
      • 尝试将这块内存添加到 PoolThreadCache 中。如果添加成功,则直接返回,内存被缓存了。
      • 如果缓存失败或没有线程缓存,则调用 freeChunk(...) 将内存真正释放回 PoolArena
  2. freeChunk(...):
    • 获取 PoolArena 的全局锁。
    • 更新 deallocationsNormal 或 deallocationsSmall 计数(如果不是由 finalizer 触发)。
    • 调用 chunk.parent.free(...),这里的 chunk.parent 是指该 PoolChunk 所在的 PoolChunkListPoolChunkList.free(...) 方法会进一步调用 PoolChunk.free(handle) 来释放 PoolChunk 内部的内存。如果 PoolChunk 在释放这块内存后变为空闲(freeBytes == chunkSize),PoolChunkList.free(...) 会将该 PoolChunk 从链表中移除,并返回 false。因此,destroyChunk 变量会是 true
    • 如果 destroyChunk 为 true,增加 pooledChunkDeallocations 计数。
    • 释放锁。
    • 如果 destroyChunk 为 true,则在锁外部调用 destroyChunk(chunk) 来实际销毁 PoolChunk(例如,释放底层的 ByteBuffer 或 byte[])。

内存重分配 (reallocate)

reallocate 函数主要在 PooledByteBuf 的容量需要改变,并且无法在当前已分配的内存块( PoolChunk )内完成调整时被调用。具体来说,调用链是这样的:

  1. 当调用 ByteBuf.capacity(int newCapacity) 方法来调整一个池化的 ByteBuf (即 PooledByteBuf )的大小时。

  2. capacity 方法内部,会检查新的容量 newCapacity 是否可以直接在当前内存区域( maxLength )内容纳。

  3. 如果 newCapacity 超出了当前内存块能支持的范围(例如,比当前 length 大,且大于 maxLength ),或者不满足一些特定的收缩条件,就需要进行真正的内存重分配。此时,它会调用 chunk.arena.reallocate(this, newCapacity) ,也就是我们正在讨论的 reallocate 方法。

// ...void reallocate(PooledByteBuf<T> buf, int newCapacity) {assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();final int oldCapacity;final PoolChunk<T> oldChunk;final ByteBuffer oldNioBuffer;final long oldHandle;final T oldMemory;final int oldOffset;final int oldMaxLength;final PoolThreadCache oldCache;// We synchronize on the ByteBuf itself to ensure there is no "concurrent" reallocations for the same buffer.// ...synchronized (buf) { // 对 ByteBuf 对象本身加锁,防止对同一个 buf 的并发重分配oldCapacity = buf.length;if (oldCapacity == newCapacity) {return;}// 保存旧 buf 的信息oldChunk = buf.chunk;oldNioBuffer = buf.tmpNioBuf;oldHandle = buf.handle;oldMemory = buf.memory;oldOffset = buf.offset;oldMaxLength = buf.maxLength;oldCache = buf.cache;// This does not touch buf's reader/writer indices// 为 buf 分配新的内存空间allocate(parent.threadCache(), buf, newCapacity);}int bytesToCopy;if (newCapacity > oldCapacity) {bytesToCopy = oldCapacity;} else {buf.trimIndicesToCapacity(newCapacity); // 如果新容量更小,调整读写指针bytesToCopy = newCapacity;}memoryCopy(oldMemory, oldOffset, buf, bytesToCopy); // 将旧内存数据拷贝到新内存free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, oldCache); // 释放旧的内存块}
// ...

reallocate 函数的核心作用是“搬家”:为 ByteBuf 申请一块新的、符合 newCapacity 大小的内存,将旧内存中的有效数据复制过去,然后释放旧的内存。这个过程确保了 ByteBuf 扩容或缩容时数据的完整性。

其主要步骤如下:

  1. 线程安全保障 : synchronized (buf) 会锁定当前的 ByteBuf 实例,防止多个线程同时对同一个 ByteBuf 进行重分配,避免状态错乱。

  2. 保存旧内存信息 :记录下当前 ByteBuf 的旧容量、所属的 PoolChunk 、句柄 handle 、内存对象 memory 等所有与旧内存位置相关的信息。

  3. 分配新内存 :调用 allocate(parent.threadCache(), buf, newCapacity) 方法,从内存池中申请一块新的内存。这个调用会更新 buf 对象内部的字段(如 chunk , handle , memory 等),使其指向新的内存位置。

  4. 数据拷贝 :调用 memoryCopy(...) 方法,将旧内存中的数据拷贝到新分配的内存中。拷贝的字节数是新旧容量中较小的那一个,以防止越界。

  5. 释放旧内存 :调用 free(...) 方法,将之前保存的旧内存块信息传入,把旧内存归还给内存池,以便后续复用。

HeapArena (堆内存)

  • lastDestroyedChunkHeapArena 会尝试缓存最后一个被销毁的 PoolChunk<byte[]>。当需要创建新的 PoolChunk 时,如果参数匹配,会复用这个缓存的 PoolChunk,避免重新分配 byte[] 数组的开销。
  • newByteArray(int size): 使用 PlatformDependent.allocateUninitializedArray(size) 来分配字节数组,这可能比 new byte[size] 更高效,因为它可能不会对数组内容进行零初始化(取决于 JVM 实现和 -XX:+AlwaysZeroTLAB 等标志)。
  • newChunk(...): 尝试从 lastDestroyedChunk 复用,否则创建新的 PoolChunk,其内存是新分配的 byte[]
  • newUnpooledChunk(...): 创建新的 PoolChunk,其内存是新分配的 byte[]
  • destroyChunk(...): 如果是池化的 PoolChunk 并且 lastDestroyedChunk 为空,则将其缓存起来。否则依赖 GC 回收 byte[]
  • newByteBuf(...): 根据 PlatformDependent.hasUnsafe() 的结果,创建 PooledUnsafeHeapByteBuf 或 PooledHeapByteBuf
  • memoryCopy(...): 使用 System.arraycopy 进行内存拷贝。

HeapArena 是 Netty 高性能内存池在堆内存管理上的具体体现。它通过继承 PoolArena 的通用分级管理框架(PoolChunkListPoolSubpage),并结合线程缓存(PoolThreadCache)机制,实现了高效的内存分配与回收。

其针对堆内存的特有实现,如复用 PoolChunk 对象和使用 System.arraycopy,进一步优化了性能,有效降低了 GC 压力和内存碎片,是 Netty 实现高性能网络通信的重要基石之一。

HeapArena 继承自 PoolArena,因此其核心结构与 PoolArena 保持一致,主要包括:

  • smallSubpagePools​:PoolSubpage<byte[]>[] 数组,用于管理小规格内存(小于 pageSize)的分配。每个 PoolSubpage 内部通过位图(bitmap)来跟踪更小内存块(element)的分配状态,实现了对小内存的高效利用。
  • PoolChunkList 链表​​(qInitq000q025q050q075q100):这些链表用于管理不同内存使用率区间的 PoolChunkPoolChunk 是内存池的基本分配单元(默认为 16MB)。通过将 PoolChunk 按使用率分组,可以快速找到合适的 Chunk 进行内存分配,实现了分级管理。
  • SizeClasses​:一个用于规格化请求容量的工具类,它将任意大小的内存请求映射到预定义的规格(size index),并提供相关的计算方法。
  • ​锁机制​​:使用 ReentrantLock 来保证在多线程环境下对 Arena 内部数据结构访问的线程安全。

内存分配 (allocate)

HeapArena 作为 PoolArena 的子类,重写了几个关键的抽象方法,以适配堆内存的特性:

  • isDirect()​:返回 false,表明它管理的是非直接内存(堆内存)。
  • newChunk(...)​:创建一个新的 PoolChunk<byte[]>。值得注意的是,这里有一个优化:它会尝试复用一个最近被销毁的 PoolChunk(通过 lastDestroyedChunk 字段)。如果复用失败,则通过 newByteArray(chunkSize) 创建一个新的 byte[] 数组作为 PoolChunk 的底层内存。
        private final AtomicReference<PoolChunk<byte[]>> lastDestroyedChunk;protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {PoolChunk<byte[]> chunk = lastDestroyedChunk.getAndSet(null);if (chunk != null) {assert chunk.chunkSize == chunkSize &&chunk.pageSize == pageSize &&chunk.maxPageIdx == maxPageIdx &&chunk.pageShifts == pageShifts;return chunk; // The parameters are always the same, so it's fine to reuse a previously allocated chunk.}return new PoolChunk<byte[]>(this, null, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);}
  • newUnpooledChunk(...)​:创建一个用于大内存分配的非池化 PoolChunk,底层同样是 byte[]
  • newByteArray(...)​:内部调用 PlatformDependent.allocateUninitializedArray(size) 来分配 byte[] 数组。使用 allocateUninitializedArray 可以避免数组初始化时的额外开销。
  • destroyChunk(...)​:销毁一个 PoolChunk。对于堆内存,销毁操作实际上依赖于 GC。但为了性能,HeapArena 会尝试缓存最后一个被销毁的 ChunklastDestroyedChunk.set(chunk)),以便在下次创建新 Chunk 时可以复用。
  • newByteBuf(...)​:根据 PlatformDependent.hasUnsafe() 的结果,创建 PooledUnsafeHeapByteBufPooledHeapByteBuf 实例。
  • memoryCopy(...)​:使用 System.arraycopy 来实现内存复制,这是针对 byte[] 数组最高效的方式。

DirectArena

​DirectArena​​ 是 PoolArena 的一个静态内部类,专门用于管理堆外内存(Direct Memory),其管理的内存类型是 java.nio.ByteBuffer。它继承了 PoolArena<ByteBuffer>,复用了 PoolArena 中通用的内存池管理算法(如伙伴算法的变体、多层级的 PoolChunkList 管理等),但对直接内存的分配、释放和操作等关键部分提供了专门的实现。


类的定义和构造函数

static final class DirectArena extends PoolArena<ByteBuffer> {DirectArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {super(parent, sizeClass);}
}
  • extends PoolArena<ByteBuffer>
    泛型参数 <ByteBuffer> 表明这个 Arena 管理的底层内存是 ByteBuffer 对象,与管理 byte[]HeapArena 形成对比。
  • static final
    static 意味着 DirectArena 的实例不依赖于外部 PoolArena 的实例;final 表示它不能被继承。


内存块的创建(newChunknewUnpooledChunk

@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {if (sizeClass.directMemoryCacheAlignment == 0) {CleanableDirectBuffer cleanableDirectBuffer = allocateDirect(chunkSize);return new PoolChunk<ByteBuffer>(this, cleanableDirectBuffer, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);}CleanableDirectBuffer cleanableDirectBuffer = allocateDirect(chunkSize + sizeClass.directMemoryCacheAlignment);final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, sizeClass.directMemoryCacheAlignment);return new PoolChunk<ByteBuffer>(this, cleanableDirectBuffer, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
  • ​核心功能​
    HeapArena 使用 new byte[chunkSize] 不同,DirectArena 通过 PlatformDependent.allocateDirect(capacity) 分配堆外内存(通常调用 ByteBuffer.allocateDirect())。
  • ​内存对齐(directMemoryCacheAlignment)​
    directMemoryCacheAlignment > 0,会分配稍大的内存并通过 PlatformDependent.alignDirectBuffer() 对齐地址,优化 CPU 缓存行利用,防止伪共享(False Sharing)。
  • CleanableDirectBuffer
    包装分配的内存,利用 Java 9+ 的 Cleaner(或旧版类似机制)确保 PoolChunkByteBuffer 被垃圾回收时,底层堆外内存可靠释放,防止泄漏。

内存块的销毁和创建

@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {chunk.cleanable.clean();
}
  • ​与 HeapArena 的区别​
    HeapArena 会缓存最后销毁的 PoolChunk 以供复用(lastDestroyedChunk),而 DirectArena 直接调用 chunk.cleanable.clean() 立即释放内存。

理解为什么可以“销毁”,我们需要知道 destroyChunk 是在什么条件下被触发的。

  • 当一个 PoolChunk 内的所有内存都被释放后,它的使用率会降为 0% ( usage() == 0 )。此时, PoolChunkList 的 free 方法会尝试将这个完全空闲的 chunk 移动到前一个 PoolChunkList (即使用率更低的 List )。
  • 然而,对于管理着 0%-25% 使用率的 qInit 这个 PoolChunkList 来说,它的 prevList 是 null 。当它试图移动一个使用率为 0 的 chunk 时, move0 方法会因为 prevList == null 而返回 false 。这个 false 返回值会一路传递,最终导致 PoolArena 调用 destroyChunk 。

因此 当destroyChunk 被调用,意味着这个 PoolChunk 已经完全变空了 。它不再服务于任何内存分配。


ByteBuf 的创建(newByteBuf

@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {if (HAS_UNSAFE) {return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);} else {return PooledDirectByteBuf.newInstance(maxCapacity);}
}
  • ​创建策略​
    根据 PlatformDependent.hasUnsafe() 结果选择实现:
    • PooledUnsafeDirectByteBuf​(支持 sun.misc.Unsafe):使用 Unsafe API 直接操作内存地址,性能最高。
    • PooledDirectByteBuf​(不支持 Unsafe):回退到标准 ByteBuffer API(get/put),兼容性更好。

内存复制(memoryCopy

@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {if (HAS_UNSAFE) {PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcOffset,PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);} else {src.position(srcOffset).limit(srcOffset + length);dst.position(dstBuf.offset);dst.put(src);}
}
  • ​高性能复制​
    • 支持 Unsafe 时调用 PlatformDependent.copyMemory(底层为 Unsafe.copyMemory),实现高效内存块复制。
    • 否则回退到 ByteBuffer.put(ByteBuffer) 的标准方法。

度量信息 (Metrics)

PoolArena 实现了 PoolArenaMetric 接口,提供了大量关于内存池状态的度量信息,例如:

  • numThreadCaches(): 使用此 Arena 的线程缓存数量。
  • numSmallSubpages()numChunkLists(): Subpage 和 ChunkList 的数量。
  • smallSubpages()chunkLists(): 返回 Subpage 和 ChunkList 的度量信息列表。
  • numAllocations()numDeallocations(): 总的分配和释放次数。
  • numSmallAllocations()numNormalAllocations()numHugeAllocations(): 不同类型的分配次数。
  • numChunkAllocations()numChunkDeallocations(): Chunk 的分配和释放次数。
  • numActiveAllocations()numActiveSmallAllocations(), etc.: 当前活跃的分配数量。
  • numActiveBytes(): 当前活跃的总字节数。
  • numPinnedBytes(): 当前被固定的字节数(用于直接 I/O 等)。

这些方法大多通过读取内部的计数器或遍历 chunkListMetrics 来获取数据。访问某些计数器(如 allocationsNormal)时会加锁。

总结

PoolArena 是 Netty jemalloc 风格内存池的核心,它通过管理 PoolChunk 和 PoolSubpage 来高效地分配和释放内存。

  • 分级管理: 将内存请求分为 Small, Normal, Huge 三种类型,采用不同的分配策略。
  • Chunk 列表: 使用多个 PoolChunkList (qInit, q000, q025, q050, q075, q100) 根据 PoolChunk 的使用率对其进行组织,优化分配查找。
  • Subpage 池: 对 Small 类型的分配,使用 PoolSubpage 进一步细化内存管理,减少碎片。
  • 线程缓存: 与 PoolThreadCache 配合,为每个线程提供本地缓存,极大减少了对 PoolArena 全局锁的竞争。
  • 同步: 使用 ReentrantLock 保护 Arena 级别的共享数据,使用 PoolSubpage 自身的锁保护其内部链表,使用 synchronized(buf) 保护 reallocate 操作。
  • 堆外/堆内支持: 通过 HeapArena 和 DirectArena 子类分别支持堆内存和直接内存的池化。
  • 度量: 提供丰富的度量信息,方便监控和调优。

PoolArena 的设计体现了 Netty 在高性能网络编程中对内存管理的极致追求,通过精细化的管理和多层次的缓存来提高内存分配效率并减少GC压力。

http://www.xdnf.cn/news/1067563.html

相关文章:

  • 机器学习×第十四卷:集成学习中篇——她从每次错误中修正自己
  • 基于目标驱动的分布式敏捷开发
  • 闲庭信步使用SV搭建图像测试平台:第九课——初步使用类
  • 浅谈开源在线客服系统与 APP 集成的技术方案与优劣势
  • 基于单片机的语音控制设计(论文)
  • 黑马Day01-03集开始
  • Springboot项目中使用手机号短信验证码注册登录实现
  • 北京及其周边理工科大学高考招生情况
  • 前端登录状态管理:主流方案对比与安全实践指南
  • Android系统常见有线网卡丢包问题的调试排查方案
  • 【Linux网络编程】多路转接I/O(一)select,poll
  • ci | cd
  • mapbox基础,导出地图
  • Java+GcExcel,生成自定义工作表
  • Rust 项目实战:多线程 Web 服务器
  • 报错:macOS 安装 sentencepiece
  • CentOS 7 通过YUM安装MySQL 8.0完整指南
  • 专题:2025大模型2.0:GPT到DeepSeek技术演进与产业落地报告|附200+份报告PDF汇总下载
  • 云原生周刊:Argo CD v3.1 正式发布
  • MySQL优化:使用 LIMIT 进行分页查询时,偏移量过大造成查询性能降低问题分析
  • AS32A601与ASM1042芯片在电力系统自动化监控中的应用效能分析
  • 基于PostgreSQL的百度或高德等POI多层级分类的数据库设计
  • Towards Generalizable Diabetic Retinopathy Grading in Unseen Domains
  • 【ARM 嵌入式 编译系列 7.5 -- GCC 打印链接脚本各段使用信息】
  • c++IO类
  • HTML语义化标签
  • ubuntu安装postman教程并中文汉化详细教程
  • 互联网大数据求职面试:从Zookeeper到Flink的技术探讨
  • Gateway路径匹配规则易错点
  • 自回归(AR)与掩码(MLM)的核心区别:续写还是补全?