Kafka 内存池MemoryPool 设计
SimpleMemoryPool
SimpleMemoryPool
是 Kafka 客户端库中一个基础且重要的内存管理工具,它是一个简单的内存池实现,其核心功能是限制待处理(outstanding)内存的总量,防止无限制的内存分配导致程序内存溢出(OOM)。
SimpleMemoryPool
实现了 MemoryPool
接口,提供了一套标准的内存分配和释放的 API。
/*** a simple pool implementation. this implementation just provides a limit on the total outstanding memory.* any buffer allocated must be release()ed always otherwise memory is not marked as reclaimed (and "leak"s)*/
public class SimpleMemoryPool implements MemoryPool {// ...
}
注释里明确指出了它的核心职责:
- 提供一个内存上限:它只关心当前已经分配出去的总内存是否超过了设定的阈值。
- 需要手动释放:任何通过它分配的
ByteBuffer
都必须在使用完毕后调用release()
方法归还,否则这部分内存将被视为“泄漏”,永远不会被回收计入可用内存。
SimpleMemoryPool
通过几个关键的原子(atomic)变量来保证线程安全和高效的内存追踪。
// ... existing code ...
public class SimpleMemoryPool implements MemoryPool {protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendlyprotected final long sizeBytes;protected final boolean strict;protected final AtomicLong availableMemory;protected final int maxSingleAllocationSize;protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanosecondsprotected volatile Sensor oomTimeSensor;// ...
}
sizeBytes
:long
类型,表示这个内存池的总大小(容量),单位是字节。strict
:boolean
类型,一个非常重要的标志,决定了内存分配的策略(详见tryAllocate
方法分析)。availableMemory
:AtomicLong
类型,核心状态变量,用于原子地追踪当前可用的内存大小。初始化时等于sizeBytes
。maxSingleAllocationSize
:int
类型,限制单次能申请的最大内存。任何超过这个大小的请求都会直接失败。startOfNoMemPeriod
:AtomicLong
类型,用于记录内存开始耗尽(即无法满足分配请求)的起始时间点(纳秒),主要用于监控和度量。oomTimeSensor
:Sensor
类型,Kafka Metrics 框架的一部分。当内存池从耗尽状态恢复时,用它来记录从开始耗尽到恢复所经过的时间。
构造函数
构造函数负责初始化内存池的配置。
// ... existing code ...public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) {if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size."+ "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");this.sizeBytes = sizeInBytes;this.strict = strict;this.availableMemory = new AtomicLong(sizeInBytes);this.maxSingleAllocationSize = maxSingleAllocationBytes;this.oomTimeSensor = oomPeriodSensor;}
// ... existing code ...
它做了以下几件事:
- 参数校验:确保总大小
sizeInBytes
和单次最大分配maxSingleAllocationBytes
都为正数,并且单次最大分配不能超过总大小。 - 属性赋值:将传入的参数赋值给类的成员变量,并将
availableMemory
初始化为sizeInBytes
。
内存分配 tryAllocate
这是 SimpleMemoryPool
最核心的方法,实现了内存分配的逻辑。
// ... existing code ...@Overridepublic ByteBuffer tryAllocate(int sizeBytes) {if (sizeBytes < 1)throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");if (sizeBytes > maxSingleAllocationSize)throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);long available;boolean success = false;//in strict mode we will only allocate memory if we have at least the size required.//in non-strict mode we will allocate memory if we have _any_ memory available (so available memory//can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)long threshold = strict ? sizeBytes : 1;while ((available = availableMemory.get()) >= threshold) {success = availableMemory.compareAndSet(available, available - sizeBytes);if (success)break;}if (success) {maybeRecordEndOfDrySpell();} else {if (oomTimeSensor != null) {startOfNoMemPeriod.compareAndSet(0, System.nanoTime());}log.trace("refused to allocate buffer of size {}", sizeBytes);return null;}ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);bufferToBeReturned(allocated);return allocated;}
// ... existing code ...
其执行流程如下:
- 请求校验:检查请求的
sizeBytes
是否合法(大于0且小于等于maxSingleAllocationSize
)。 - 确定分配阈值
threshold
:- 严格模式 (
strict = true
):threshold
等于sizeBytes
。这意味着只有当可用内存足够满足本次请求时,才会尝试分配。 - 非严格模式 (
strict = false
):threshold
等于1
。这意味着只要还有任何一点可用内存,就会尝试分配。在这种模式下,availableMemory
可能会变成负数,总分配内存最多可能超出sizeBytes
接近maxSingleAllocationSize
。这是一种避免饥饿的策略,允许在资源紧张时进行“超额”分配。
- 严格模式 (
- CAS循环尝试分配:
- 在一个
while
循环中,首先检查当前可用内存available
是否大于等于threshold
。 - 如果满足条件,就使用
compareAndSet
(CAS) 原子操作尝试将availableMemory
减去sizeBytes
。CAS操作能保证在多线程环境下只有一个线程能成功修改availableMemory
。 - 如果CAS成功,
success
置为true
并跳出循环。如果失败,说明有其他线程抢先修改了availableMemory
,循环会继续,重新获取最新的availableMemory
值再次尝试。
- 在一个
- 处理分配结果:
- 成功:调用
maybeRecordEndOfDrySpell()
记录可能结束的OOM周期,然后通过ByteBuffer.allocate(sizeBytes)
在JVM堆上分配一块内存,并返回。 - 失败:说明可用内存不足。此时会记录OOM周期的开始时间(如果尚未记录),并返回
null
,表示分配失败。
- 成功:调用
内存释放 release
此方法用于将之前分配的 ByteBuffer
归还给内存池。
// ... existing code ...@Overridepublic void release(ByteBuffer previouslyAllocated) {if (previouslyAllocated == null)throw new IllegalArgumentException("provided null buffer");bufferToBeReleased(previouslyAllocated);availableMemory.addAndGet(previouslyAllocated.capacity());maybeRecordEndOfDrySpell();}
// ... existing code ...
逻辑很简单:
- 校验
previouslyAllocated
不为null
。 - 调用
bufferToBeReleased
钩子方法(用于子类扩展)。 - 通过
addAndGet
原子地将释放的ByteBuffer
的容量加回到availableMemory
。 - 调用
maybeRecordEndOfDrySpell()
,因为释放内存可能使得内存池从“耗尽”状态恢复,从而结束一个OOM周期。
监控与扩展
maybeRecordEndOfDrySpell()
: 这个方法是连接内存管理和监控系统的桥梁。它原子地读取并重置startOfNoMemPeriod
,如果之前的值不为0,说明一个OOM周期刚刚结束,它会计算持续时间并报告给oomTimeSensor
。bufferToBeReturned()
和bufferToBeReleased()
: 这两个protected
方法是为子类设计的扩展点。它们在内存分配给调用者之前和内存被标记为回收之前被调用。一个很好的例子是GarbageCollectedMemoryPool
,它重写了这两个方法来追踪每个分配的ByteBuffer
对象,以便在它们被GC回收时也能自动释放内存。
总结
SimpleMemoryPool
是一个线程安全的、基于计数器的非池化内存管理器。它本身不持有或缓存任何 ByteBuffer
对象(分配是直接 ByteBuffer.allocate
,释放也只是增加计数器),它的唯一目标是确保并发环境下,所有已分配内存的总和不超过一个预设的限制。
它的设计体现了几个原则:
- 简单高效:使用
AtomicLong
和 CAS 操作,避免了使用重量级锁,性能很高。 - 线程安全:天然适合多线程环境。
- 策略可配:通过
strict
参数,可以在“严格保证内存上限”和“尽量避免饥饿”之间做选择。 - 可监控:通过
Sensor
集成了 Kafka 的 Metrics 框架,可以方便地监控内存池的健康状况。
在 Kafka 源码中,它常被用于需要限制内存使用但又不需要复杂缓冲池功能的场景,例如 SocketServer
中用于限制网络请求队列所占用的内存。
BatchMemoryPool
BatchMemoryPool
和 SimpleMemoryPool
虽然都实现了 MemoryPool
接口,但它们的设计哲学和应用场景有显著的不同。BatchMemoryPool
是一个真正的缓冲池,它会缓存并复用 ByteBuffer
对象,而 SimpleMemoryPool
更像一个内存分配的“记账员”,只追踪内存使用量而不持有对象。
/*** Simple memory pool that tries to maintain a limited number of fixed-size buffers.** This type implements an unbounded memory pool. When releasing byte buffers they will get pooled* up to the maximum retained number of batches.*/
public class BatchMemoryPool implements MemoryPool {// ...
}
从注释中可以提炼出它的核心职责:
- 维护固定大小的缓冲区:这个池子里的所有
ByteBuffer
大小都是固定的(即batchSize
)。 - 有限的池化:它会尝试复用释放的缓冲区,但只会保留(retain)不超过
maxRetainedBatches
个缓冲区。 - 无界内存池:这是一个关键特性。它所谓的“无界”指的是分配请求总能被满足(只要系统内存足够),它不会因为达到某个设定的总容量而拒绝分配。如果池中没有可用的缓冲区,它会直接创建一个新的。
核心属性分析
// ... existing code ...
public class BatchMemoryPool implements MemoryPool {private final ReentrantLock lock;private final Deque<ByteBuffer> free;private final int maxRetainedBatches;private final int batchSize;private int numAllocatedBatches = 0;
// ... existing code ...
lock
: 一个ReentrantLock
。与SimpleMemoryPool
使用原子类(CAS)的无锁思想不同,BatchMemoryPool
采用显式的锁来保证对内部状态访问的线程安全。free
: 一个Deque<ByteBuffer>
(双端队列),用作空闲缓冲区列表。这是实现缓冲区复用的核心数据结构。maxRetainedBatches
:int
类型,指定了free
队列的最大容量,即池子最多保留多少个空闲的ByteBuffer
以备重用。batchSize
:int
类型,池中每个ByteBuffer
的固定大小。numAllocatedBatches
:int
类型,一个计数器,记录了当前由该池管理的缓冲区总数,包括在free
队列中的和已经分配出去正在使用的。
构造函数
// ... existing code ...public BatchMemoryPool(int maxRetainedBatches, int batchSize) {this.maxRetainedBatches = maxRetainedBatches;this.batchSize = batchSize;this.free = new ArrayDeque<>(maxRetainedBatches);this.lock = new ReentrantLock();}
// ... existing code ...
构造函数非常直接,就是初始化上述的几个核心属性。
内存分配 tryAllocate
这是 BatchMemoryPool
的分配逻辑,与 SimpleMemoryPool
的行为差异巨大。
// ... existing code ...@Overridepublic ByteBuffer tryAllocate(int sizeBytes) {if (sizeBytes > batchSize) {throw new IllegalArgumentException("Cannot allocate buffers larger than max " +"batch size of " + batchSize);}lock.lock();try {ByteBuffer buffer = free.poll();// Always allocation a new buffer if there are no free buffersif (buffer == null) {buffer = ByteBuffer.allocate(batchSize);numAllocatedBatches += 1;}return buffer;} finally {lock.unlock();}}
// ... existing code ...
- 加锁:首先获取锁,保证操作的原子性。
- 参数校验:检查请求的
sizeBytes
是否大于batchSize
。这个池子只能分配固定大小的ByteBuffer
,所以任何大于batchSize
的请求都是非法的。 - 尝试从池中获取:调用
free.poll()
尝试从空闲队列的头部取出一个ByteBuffer
。 - 池空则新建:如果
poll()
返回null
,说明空闲队列是空的。此时,它会直接创建一个新的ByteBuffer
(ByteBuffer.allocate(batchSize)
)。这是它“无界”特性的体现。同时,numAllocatedBatches
计数器加一,表示池管理的总缓冲区数量增加了。 - 返回缓冲区:返回获取到的(无论是复用的还是新建的)
ByteBuffer
。 - 释放锁:在
finally
块中确保锁被释放。
关键点:此方法永远不会返回 null
(除非JVM本身OOM),它总能满足分配请求。
内存释放 release
此方法负责回收用完的 ByteBuffer
。
// ... existing code ...@Overridepublic void release(ByteBuffer previouslyAllocated) {lock.lock();try {previouslyAllocated.clear();if (previouslyAllocated.capacity() != batchSize) {throw new IllegalArgumentException("Released buffer with unexpected size "+ previouslyAllocated.capacity());}// Free the buffer if the number of pooled buffers is already the maximum number of batches.// Otherwise return the buffer to the memory pool.if (free.size() >= maxRetainedBatches) {numAllocatedBatches--;} else {free.offer(previouslyAllocated);}} finally {lock.unlock();}}
// ... existing code ...
- 加锁:同样,先获取锁。
- 重置与校验:调用
previouslyAllocated.clear()
重置缓冲区的position
和limit
,为下一次使用做准备。然后校验归还的缓冲区容量是否等于batchSize
。 - 决定是否池化:
- 检查当前空闲队列
free
的大小是否已经达到maxRetainedBatches
的上限。 - 如果已达上限:不再将此
ByteBuffer
放入free
队列。它只是将numAllocatedBatches
减一,然后这个ByteBuffer
对象在方法结束后就没有任何引用了,会被JVM垃圾回收。 - 如果未达上限:调用
free.offer()
将这个ByteBuffer
加入到空闲队列的尾部,以备后续复用。
- 检查当前空闲队列
- 释放锁:在
finally
块中释放锁。
MemoryPool
接口的其他实现
size()
: 返回numAllocatedBatches * batchSize
,即当前池管理的所有缓冲区(无论空闲还是在用)的总内存大小。availableMemory()
: 恒定返回Long.MAX_VALUE
。这明确地告诉调用者,这个池子在逻辑上永远有可用内存,分配请求不会因为容量限制而被阻塞或拒绝。isOutOfMemory()
: 恒定返回false
。与availableMemory()
的行为一致。
总结与对比
BatchMemoryPool
是一个为特定场景优化的内存池,主要用在 Kafka Raft 内部的 BatchAccumulator
中,用于缓存待发送的日志批次。
与 SimpleMemoryPool
的对比:
特性 | SimpleMemoryPool | BatchMemoryPool |
---|---|---|
核心机制 | 基于计数的内存限制器 | 基于队列的 ByteBuffer 对象池 |
内存限制 | 有严格的、可配置的总内存上限 (sizeBytes ) | 逻辑上无界(总能分配),但池化的空闲对象数量有上限 |
分配行为 | 内存不足时返回 null | 总是成功分配(池中取或新建),不返回 null |
对象复用 | 不复用 ByteBuffer 对象,仅增减计数 | 复用 ByteBuffer 对象以减少 GC |
线程安全 | CAS 原子操作 (无锁) | ReentrantLock (高并发有锁,低并发AQS相当于CAS) |
适用场景 | 对总内存使用有严格控制的场景,如网络请求队列 | 需要频繁分配和释放固定大小缓冲区的场景,以减少 GC 开销 |
总而言之,BatchMemoryPool
通过复用固定大小的 ByteBuffer
,有效地减少了在处理 Raft 日志批次时因频繁创建和销毁对象而带来的GC压力,是典型的以空间换时间(减少GC时间)的优化策略。
GarbageCollectedMemoryPool
首先,从类的声明 public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable
我们可以得到两个关键信息:
- 它继承自
SimpleMemoryPool
,这意味着它拥有SimpleMemoryPool
所有的基础功能,即一个带限额的内存分配器。 - 它实现了
AutoCloseable
接口,这意味着它管理着需要显式关闭的资源,并且可以被用在try-with-resources
语句中。
正如类注释所说,这个类是 SimpleMemoryPool
的一个扩展,其主要目的是追踪已分配的缓冲区,并在它们“泄漏”(即在没有被 release()
的情况下被垃圾回收)时记录错误日志。注释也明确指出:这是一个用于开发和调试的辅助工具,不应该在生产环境中使用。
下面我们来逐一解析它的实现细节。
利用弱引用和引用队列检测内存泄漏
GarbageCollectedMemoryPool
的核心是利用了 Java 的弱引用(WeakReference
)和引用队列(ReferenceQueue
)机制来监控 ByteBuffer
对象是否被垃圾回收。
// ... existing code ...
public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable {private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>();//serves 2 purposes - 1st it maintains the ref objects reachable (which is a requirement for them//to ever be enqueued), 2nd keeps some (small) metadata for every buffer allocatedprivate final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>();private final Thread gcListenerThread;private volatile boolean alive = true;
// ... existing code ...
garbageCollectedBuffers
: 一个ReferenceQueue
。当一个被弱引用关联的对象被GC回收时,这个弱引用对象本身会被放入这个队列中。buffersInFlight
: 一个ConcurrentHashMap
,用于存储所有“在途”(已分配但未释放)的缓冲区信息。- Key是
BufferReference
,一个自定义的WeakReference
子类。 - Value是
BufferMetadata
,一个简单的内部类,只记录了缓冲区的大小。 这个 Map 有两个作用:1) 保持BufferReference
对象本身是强可达的,这样它们才有机会在关联的ByteBuffer
被回收时进入引用队列;2) 存储每个已分配缓冲区的元数据。
- Key是
gcListenerThread
: 一个后台守护线程,专门负责监听garbageCollectedBuffers
队列。alive
: 一个volatile
布尔值,用于控制后台线程的生命周期。
构造函数与后台线程
// ... existing code ...public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);GarbageCollectionListener gcListener = new GarbageCollectionListener();this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");this.gcListenerThread.setDaemon(true); //so we dont need to worry about shutdownthis.gcListenerThread.start();}
// ... existing code ...
构造函数在调用父类 SimpleMemoryPool
的构造函数之后,创建并启动了一个名为 memory pool GC listener
的后台线程。这个线程被设置为守护线程 (setDaemon(true)
),这意味着如果JVM中只剩下守护线程,JVM就会退出,我们不需要手动管理它的关闭(尽管它也提供了 close
方法)。
重写父类方法以实现追踪
GarbageCollectedMemoryPool
通过重写父类的两个 protected
钩子方法 bufferToBeReturned
和 bufferToBeReleased
来注入其追踪逻辑。
bufferToBeReturned(ByteBuffer justAllocated)
当一个缓冲区被分配后,在返回给调用者之前,这个方法会被调用。
// ... existing code ...@Overrideprotected void bufferToBeReturned(ByteBuffer justAllocated) {BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers);BufferMetadata metadata = new BufferMetadata(justAllocated.capacity());if (buffersInFlight.put(ref, metadata) != null)//this is a bug. it means either 2 different co-existing buffers got//the same identity or we failed to register a released/GC'ed bufferthrow new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!");log.trace("allocated buffer of size {} and identity {}", sizeBytes, ref.hashCode);}
// ... existing code ...
- 创建一个
BufferReference
(弱引用),将新分配的justAllocated
缓冲区与garbageCollectedBuffers
引用队列关联起来。 - 创建一个
BufferMetadata
来存储缓冲区的大小。 - 将这对
(ref, metadata)
存入buffersInFlight
这个 Map 中,进行“登记”。如果put
方法返回了非null
值,说明发生了逻辑错误,抛出异常。
bufferToBeReleased(ByteBuffer justReleased)
当调用者调用 release()
方法归还缓冲区时,在父类将内存加回 availableMemory
之前,这个方法会被调用。
// ... existing code ...@Overrideprotected void bufferToBeReleased(ByteBuffer justReleased) {BufferReference ref = new BufferReference(justReleased); //used ro lookup onlyBufferMetadata metadata = buffersInFlight.remove(ref);if (metadata == null)//its impossible for the buffer to have already been GC'ed (because we have a hard ref to it//in the function arg) so this means either a double free or not our buffer.throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool");if (metadata.sizeBytes != justReleased.capacity()) {//this is a bugthrow new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes);}log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);}
// ... existing code ...
- 创建一个临时的
BufferReference
,仅用于在 Map 中进行查找。 - 从
buffersInFlight
中移除对应的条目,完成“注销”。 - 进行严格的检查:
- 如果
remove
返回null
,说明这个缓冲区要么不是从这个池子分配的,要么被重复释放了(double free),抛出异常。 - 检查记录的元数据中的大小是否与归还的缓冲区容量一致,不一致则说明有 Bug,抛出异常。
- 如果
后台监听与泄漏检测
GarbageCollectionListener
是实现泄漏检测的核心。
// ... existing code ...private class GarbageCollectionListener implements Runnable {@Overridepublic void run() {while (alive) {try {BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocksref.clear();// ...BufferMetadata metadata = buffersInFlight.remove(ref);if (metadata == null) {// ...continue;}availableMemory.addAndGet(metadata.sizeBytes);log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode);} catch (InterruptedException e) {// ...}}log.info("GC listener shutting down");}}
// ... existing code ...
- 线程在一个
while(alive)
循环中运行。 - 核心操作是
garbageCollectedBuffers.remove()
,这是一个阻塞方法。它会一直等待,直到有一个与弱引用关联的ByteBuffer
对象被GC回收,然后该弱引用对象BufferReference
会被放入队列,remove()
方法返回这个BufferReference
。 - 一旦获取到一个被GC的
ref
,它会尝试从buffersInFlight
中移除这个ref
。 - 关键判断:
- 如果
metadata
不为null
,这意味着:一个缓冲区在没有被正常release()
(release
会把它从buffersInFlight
中移除)的情况下,就被GC回收了。这就是内存泄漏! - 此时,程序会:
- 手动将这部分内存加回到
availableMemory
中,防止这部分内存永远“丢失”。 - 打印一条
ERROR
级别的日志,报告发生了内存泄漏,并指明泄漏的缓冲区大小和标识。
- 手动将这部分内存加回到
- 如果
metadata
为null
,这通常是正常情况(缓冲区被release
后,过了一段时间才被GC),但注释中也提到了一种罕见情况:由于ConcurrentHashMap
的懒清理机制,可能一个被正常release
的缓冲区的引用在被移除后,仍然被GC线程捕捉到并入队。这种情况直接continue
忽略。
- 如果
- 当
close()
方法被调用,alive
变为false
,gcListenerThread
被中断,循环退出,线程结束。
总结
GarbageCollectedMemoryPool
是一个非常巧妙的调试工具。它通过继承 SimpleMemoryPool
复用了内存额度控制的逻辑,然后通过重写钩子方法和结合Java的引用机制,增加了一层“内存泄漏”的监控。
- 正常流程:
tryAllocate()
->bufferToBeReturned
(登记) -> 使用 ->release()
->bufferToBeReleased
(注销) ->ByteBuffer
失去引用 -> 被GC。 - 泄漏流程:
tryAllocate()
->bufferToBeReturned
(登记) -> 使用 -> 忘记调用release()
->ByteBuffer
失去引用 -> 被GC ->BufferReference
进入队列 -> 后台线程检测到,并从buffersInFlight
中找到了登记信息 -> 报告错误。