当前位置: 首页 > ops >正文

Utils系列之内存池(MultiSizePool)

MultiSizePool

MultiSizePool 意思是说我们会分配多个不同大小的chunk,来满足不同size的分配的要求。就像下面的图一样, chunk1~chunk5是不同大小size的chunk,每个chunk下面都有一个block链表来存储数据,同样我们会把空闲的block 放到freelist 中去。

在这里插入图片描述

为了增加性能,我们同样也开辟了了线程缓存,每个线程都有一份自己的缓存,每次取的时候都会先从ThreadCache中先取空闲的block,如果没有了我们再从对应大小的chunk中的freelist 来分配。

Chunk的数组

定义了一个chunk的不同size的数组,可以看到size从8 byte到2k byte 大小不等,这些为小对象的size, 对于超过这些size的,我们直接分配对应的大小(大对象)。 这里的每个size都是一个chunk, 每个chunk都存储了若干个block;block 会预先放到freelist 里面。这里会记录一个block data的大小(block_size), 还有总的大小(block 结构体 + data size);我们会使用 ALIGNMENT 对齐,这里我们是根据C++标准定义’std::max_align_t’,我们这里是16. block_count 的大小我们就可以计算出来啦,CHUNK_SIZE 我们预定义为64k。

我们再看下FreeBlock结构体的定义,定义了一个指向next的指针和size的大小。这里会把存储的数据区放到结构体的后面。

内存结构: struct FreeBlock + data

所以这里有两个函数用于从结构体的首地址指向data 数据区的指针; 和从数据区转到指向结构体地址的指针。

static constexpr std::array<size_t, 16> SIZE_CALSSES = {  //max 2k8, 16, 24, 32, 48, 64, 96, 128, 192, 256, 384, 512, 768, 1024, 1536, 2048
};struct ChunkClass {std::atomic<FreeBlock*> free_list;std::atomic<size_t> allocated_count;std::atomic<size_t> deallocated_count;size_t block_size;  // 每个block data sizesize_t total_block_size; // 一个block + data的大小size_t block_count; // 每个chunk的block数量ChunkClass() = default;explicit ChunkClass(size_t size):free_list(nullptr),block_size(size),total_block_size(align_of(sizeof(FreeBlock) + size, ALIGNMENT)),block_count(CHUNK_SIZE / total_block_size){}
};
// 内存结构: FreeBlock + data
struct FreeBlock {size_t size;std::atomic<FreeBlock*> next;FreeBlock(size_t s):size(s), next(nullptr) {}void* data() { // 指向data 数据return reinterpret_cast<std::byte*>(this) + sizeof(FreeBlock);}static FreeBlock* from_data(void* ptr) { // 从data 数据获取FreeBlock指针return reinterpret_cast<FreeBlock*>(reinterpret_cast<std::byte*>(ptr) - sizeof(FreeBlock));}
};

MultiSizeThreadCache 是一个高效的线程本地缓存实现,它在无锁内存池中扮演着至关重要的角色。这个结构通过减少线程间的竞争,显著提升内存分配和释放的性能。

  • 线程局部存储: 使用thread_local关键字确保每个线程拥有自己独立的缓存实例
  • 分层缓存结构: 为每种内存块大小(SIZE_CLASSES)维护独立的缓存队列
  • 高效访问: 通过数组实现O(1)时间复杂度的块访问

这里我们定义了一个ClassCache来缓存freelist的空闲block,每个chunk size都有一个大小32的block的缓存, count是记录还剩多少的缓存数量。

内存回收机制

当线程终止时,析构函数负责将缓存的所有内存块归还到全局池:

这个过程包含三个关键步骤:

  1. 链表构建: 将离散的块连接成一个链表,准备批量归还
  2. 原子交换: 使用compare_exchange_weak确保线程安全地将链表插入全局链表头部
  3. 计数重置: 归还完成后将缓存计数归零

通过这种线程本地缓存设计,内存池实现了高吞吐量和低延迟的内存分配,特别适合高并发环境下的性能关键型应用。

struct MultiSizeThreadCache {struct ClassCache {FreeBlock* blocks[32]; // 每个大小类缓存块int count;  // 当前缓存的数量};ClassCache caches[SIZE_CALSSES.size()];LockFreeMultiSizePool* pool_ptr; // 指向全局内存池~MultiSizeThreadCache() {if (!pool_ptr) return; // 遍历所有大小类for (size_t index = 0; index < SIZE_CALSSES.size(); ++index) {auto& class_cache = caches[index];if (class_cache.count == 0) continue;// 将块连接成链表for (int i = 0; i < class_cache.count - 1; ++i) {class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);}auto& chunk_class = pool_ptr->chunk_classes[index];FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_relaxed);FreeBlock* cache_head = class_cache.blocks[0];FreeBlock* cache_tail = class_cache.blocks[class_cache.count - 1];// 归还到全局链表do {cache_tail->next.store(old_head, std::memory_order_relaxed);} while (!chunk_class.free_list.compare_exchange_weak(old_head, cache_head, std::memory_order_release, std::memory_order_relaxed));class_cache.count = 0;}}};std::array<ChunkClass, SIZE_CALSSES.size()> chunk_classes;
LockFreeStack<std::unique_ptr<std::byte[]>> allocated_chunks;static constexpr size_t ALIGNMENT = alignof(std::max_align_t);
static constexpr size_t CHUNK_SIZE = 64 * 1024; // 64kstatic inline thread_local MultiSizeThreadCache thread_cache;

初始化

我们在构造的时候就把每个chunk对象的size分配好,这样我们在调用allocate的函数的时候就不需要分配了。注意这里的new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]); 不是分配操作,而是placement new 操作符,在已分配的内存地址上构造对象,而不进行新的分配。

LockFreeMultiSizePool::LockFreeMultiSizePool() {for (size_t i = 0; i < SIZE_CALSSES.size(); ++i) {new (&chunk_classes[i]) ChunkClass(SIZE_CALSSES[i]);}thread_cache.pool_ptr = this;
}

填充缓存

在我们调用allocate分配函数之前,先熟悉下填充缓存的操作。主要的操作有:

  • 检查索引的有效性和缓存是否已有足够内存块
  • 计算批量获取的数量(缓存容量的一半)
  • 尝试从全局空闲列表中获取一串连续的内存块,如果全局空闲列表为空,则分配新的内存块(调用allocate_chunk_for_size_class
  • 使用无锁方式更新全局空闲列表头部
  • 将获取的内存块放入线程本地缓存
bool LockFreeMultiSizePool::fill_class_cache(size_t index) {if (index >= SIZE_CALSSES.size()) return false;ChunkClass& chunk_class = chunk_classes[index];auto& class_cache = thread_cache.caches[index];// 已经有缓存,不需要填充if (class_cache.count > 0) return true;int BATCH_SIZE = std::size(class_cache.blocks) / 2;// 尝试从全局链表获取多个块FreeBlock* old_head = chunk_class.free_list.load(std::memory_order_acquire);if (!old_head) {allocate_chunk_for_size_class(index);old_head = chunk_class.free_list.load(std::memory_order_acquire);if (!old_head) return false;}//获取一串块FreeBlock* current = old_head;FreeBlock* new_head = nullptr;int count = 0;for (int i = 0; i < BATCH_SIZE - 1 && current->next.load(std::memory_order_relaxed); ++i) {current = current->next.load(std::memory_order_relaxed);++count;}new_head = current->next.load(std::memory_order_relaxed);// 更新链表头if(!chunk_class.free_list.compare_exchange_strong(old_head, new_head, std::memory_order_release, std::memory_order_relaxed)) {return false;}//将获取的块放入本地缓存current = old_head;while (current != new_head) {class_cache.blocks[class_cache.count++] = current;current = current->next.load(std::memory_order_relaxed);}return true;
}void LockFreeMultiSizePool::allocate_chunk_for_size_class(size_t index) {if (index >= SIZE_CALSSES.size()) return; // 分配大对象ChunkClass& chunk_class = chunk_classes[index];size_t block_count = chunk_class.block_count;if (block_count == 0) block_count = 1;size_t actual_chunk_size = chunk_class.total_block_size * block_count;auto chunk = std::make_unique<std::byte[]>(actual_chunk_size); // 分配一个chunkstd::byte* ptr = chunk.get();FreeBlock* first_block = nullptr;FreeBlock* prev_block = nullptr;FreeBlock* block  = nullptr;// 将chunk 添加到对应大小的free_list 里面for (size_t i = 0; i < block_count; ++i) {// 获取每个blockblock = new(ptr) FreeBlock(chunk_class.block_size);ptr += chunk_class.total_block_size;if (i == 0) {first_block = block;}if (prev_block) {prev_block->next.store(block, std::memory_order_relaxed);}prev_block = block;}// 将block 添加到free_listFreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);do {block->next.store(old_block, std::memory_order_relaxed);} while(!chunk_class.free_list.compare_exchange_weak(old_block, first_block,  std::memory_order_release, std::memory_order_relaxed));allocated_chunks.push(std::move(chunk)); // 将chunk 添加到allocated_chunks 管理
}size_t LockFreeMultiSizePool::get_size_class_index(size_t size) {// 二分查找到对应的indexsize_t left = 0;size_t right = SIZE_CALSSES.size() - 1;while (left <= right) {size_t mid = left + (right - left) / 2;if (SIZE_CALSSES[mid] < size)left = mid + 1;else if (SIZE_CALSSES[mid] > size && mid > 0)right = mid - 1;elsereturn mid;}return (left < SIZE_CALSSES.size()) ? left : SIZE_CALSSES.size();
}

allocate 操作

这个函数是内存池的核心操作,我们通过下面步骤:

  1. 大小对齐与分类:首先将请求的内存大小按照指定对齐值(ALIGNMENT)进行对齐,然后确定对应的大小类别索引。
  2. 分配策略层次
    • 首先尝试从线程本地缓存获取内存块
    • 如果本地缓存为空,尝试批量填充本地缓存
    • 如果仍然无法获取,从全局共享的无锁空闲列表获取
    • 最后,如果空闲列表为空,则分配新的内存块
  3. 原子操作保证:使用无锁算法(CAS操作)确保在多线程环境中安全分配内存。
void* LockFreeMultiSizePool::allocate(size_t size) {if (size == 0) return nullptr;thread_cache.pool_ptr = this; // 设置当前线程的内存池实例size_t aligned_size = align_of(size, ALIGNMENT);size_t index = get_size_class_index(aligned_size);if (index >= SIZE_CALSSES.size()) {  // 分配大对象void* ptr = std::aligned_alloc(ALIGNMENT, aligned_size);return ptr;}ChunkClass& chunk_class = chunk_classes[index];FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_acquire);FreeBlock* block = nullptr;// 尝试从本地缓存分配auto& class_cache = thread_cache.caches[index];if (class_cache.count > 0) {FreeBlock* block = class_cache.blocks[--class_cache.count];chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);return block->data(); // 返回数据指针}// 尝试批量获取块到本地缓存if (fill_class_cache(index)) {FreeBlock* block = class_cache.blocks[--class_cache.count];chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);return block->data(); // 返回数据指针}// 尝试从空闲列表获取while(old_block) {if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {block = old_block;break;}}if (!block) {allocate_chunk_for_size_class(index);old_block = chunk_class.free_list.load(std::memory_order_acquire);// 再次尝试从空闲列表获取while(old_block) {if(chunk_class.free_list.compare_exchange_weak(old_block, old_block->next.load(std::memory_order_relaxed))) {block = old_block;break;}}}if(block) {chunk_class.allocated_count.fetch_add(1, std::memory_order_relaxed);return block->data(); // 返回数据指针}return nullptr;
}

deallocate 操作

deallocate 其实和allocate是一对相反的操作。主要把我们的分配的内存指针放回到cache里面中,如果是大对象,我们直接会释放。对于小对象,我们这里做了一个策略,如果检测到缓存已经满了,我们会先把缓存的空间归还到全局链表中,然后再放回到缓存,这样一次操作大大提高后面的释放的效率。

void LockFreeMultiSizePool::deallocate(void* ptr, size_t size) {if (ptr == nullptr) return;thread_cache.pool_ptr = this; // 设置当前线程的内存池实例size_t align_size = align_of(size, ALIGNMENT);size_t index = get_size_class_index(align_size);if (index >= SIZE_CALSSES.size()) {std::free(ptr);  // 大对象直接释放return;}ChunkClass& chunk_class = chunk_classes[index];FreeBlock* block_ptr = FreeBlock::from_data(ptr);  // 获取block// 尝试先放入本地缓存auto& class_cache = thread_cache.caches[index];if (class_cache.count < static_cast<int>(sizeof(class_cache.blocks) / sizeof(class_cache.blocks[0]))) {class_cache.blocks[class_cache.count++] = block_ptr;} else {// 本地缓存已满,将一半缓存归还全局链表int half_count = class_cache.count / 2;// 构建链表for (int i = 0; i < half_count - 1; ++i) {class_cache.blocks[i]->next.store(class_cache.blocks[i + 1], std::memory_order_relaxed);}// 将新块添加到链表尾部class_cache.blocks[half_count - 1]->next.store(block_ptr, std::memory_order_relaxed);// 归还给全局链表FreeBlock* old_block = chunk_class.free_list.load(std::memory_order_relaxed);do {block_ptr->next.store(old_block, std::memory_order_relaxed);} while(!chunk_class.free_list.compare_exchange_weak(old_block, class_cache.blocks[0], std::memory_order_release, std::memory_order_relaxed));// 压缩剩余缓存 更新class_cachefor (int i = 0; i < class_cache.count - half_count; ++i) {class_cache.blocks[i] = class_cache.blocks[i + half_count];}class_cache.count -= half_count;}chunk_class.deallocated_count.fetch_add(1, std::memory_order_relaxed);
}

最后我们还封装了一个类来简单使用内存池,用户只需要调用allocatedeallocate 就好。

class MemoryPoolAllocater {
private:LockFreeMultiSizePool pool;public:void* allocate(size_t size) {return pool.allocate(size);}void deallocate(void* ptr, size_t size) {pool.deallocate(ptr, size);}template<typename T, typename... Args>T* create(Args&&... args) {void* ptr = allocate(sizeof(T));if (ptr) {return new (ptr) T(std::forward<Args>(args)...);}return nullptr;}template<typename T>void destroy(T* ptr) {if (ptr) {ptr->~T();deallocate(ptr, sizeof(T));}}void print_stats() const {pool.print_stats();}void reset_global_state() {LockFreeMultiSizePool::reset_global_state();}
};

详细代码见: MultiSizePool

http://www.xdnf.cn/news/15274.html

相关文章:

  • 电商系统未来三年趋势:体验升级、技术赋能与模式重构
  • 关于ISO 26262的Single-Point Fault/Residual Fault/Latent Fault/Dual-Point Fault的整理
  • Android 响应式编程完整指南:StateFlow、SharedFlow、LiveData 详解
  • Docker 基于 Cgroups 实现资源限制详解【实战+源码】
  • CAU数据挖掘第四章 分类问题
  • Linux修炼:开发工具
  • 软件开发中的瀑布式开发与敏捷开发
  • 2025湖北省信息安全管理与评估赛项一阶段技能书
  • 在 JetBrains 系列 IDE(如 IntelliJ IDEA、PyCharm 等)中如何新建一个 PlantUML 文件
  • 新手向:使用Python构建高效的日志处理系统
  • Llama系列:Llama1, Llama2,Llama3内容概述
  • Web攻防-PHP反序列化魔术方法触发条件POP链构造变量属性修改黑白盒角度
  • Python爬虫实战:研究xlwings库相关技术
  • Qt 3D模块加载复杂模型
  • CA复习功课
  • 前端进阶之路-从传统前端到VUE-JS(第五期-路由应用)
  • react中为啥使用剪头函数
  • 【Java入门到精通】(三)Java基础语法(下)
  • 博途多重背景、参数实例--(二)
  • 多线程的区别和联系
  • 子数组最大平均数 I
  • Leetcode力扣解题记录--第3题(滑动窗口)
  • WildCard野卡已跑路(包含gpt plus升级方案)
  • 程序改错---字符串
  • 【notes】注意力和KV Cache
  • 检查输入有效性(指针是否为NULL)和检查字符串长度是否为0
  • 阻有形,容无声——STA 签核之RC Corner
  • 加法器学习
  • docker搭建 与镜像加速器
  • scrapy项目开发流程