【高并发内存池】四、中心缓存的设计
文章目录
- Ⅰ. `central cache`中心缓存的结构设计
- Ⅱ. 切分小内存后的页数类型
- Ⅲ. `span`结构的实现
- Ⅳ. `central cache`的实现
- 1、主体框架
- 2、完善`thread cache`中的 `fetch_from_CentralCache()` 函数
- 3、实现 **`give_memory_to_ThreadCache()`** 函数

Ⅰ. central cache
中心缓存的结构设计
central cache
其实也是一个哈希桶结构,它的哈希桶与对象大小的映射关系跟 thread cache
是一样的,如下图所示:
不同的是它的 每个哈希桶位置挂是一个 SpanList
带头节点的双向链表结构(注意这里是双向链表,不是单向链表),其中每个节点都是一个 span
结构,它是一块大内存块,会根据对应的映射关系切成了一个个小内存块对象!
之所以 central cache
中的哈希桶采用的是 待头节点的双向链表,而不是像 thread cache
中的哈希桶那样是单向链表,是因为 central cache
需要协调好不同线程缓存的内存块调度问题,那么就会经常有插入和删除操作,此时我们用双向链表的结构的需求就大大增加了!
如下图所示:(不太准确,应该是和上图一样的对齐数才对,但是这里将就看就行)
如上图所示,假设一个 span
的大小是 8KB
的话,如果当前哈希桶的映射关系是 8B
的话,那么就可以切分为 1024
个小内存块,可以看出,如果对齐数越小的话,切分的内存块就会越多!
它是取内存块规则是这样子的:先从第一个 span
结构中找是否有空闲的内存块,有的话直接就取出使用,没有的话则遍历第二个 span
结构继续判断,以此类推……如果最后还是没找到可用的内存块的话,再去下一层 page cache
中申请内存块!
这就引出一个问题,不同线程来同一个 central cache
中申请内存块,势必涉及到加锁问题,而这里采用的方案是 桶锁!只有 不同线程访问同一个哈希桶的时候对该哈希桶进行加锁,而不是对整个缓存进行加锁,这能很好的提高效率!
有人可能会问,上面的图片不对劲啊,为什么对齐数为
8B
的哈希桶中最后一个的内存块都被取走了,不是说从头开始取内存块的吗❓❓❓ 上面图片是没错的,这个问题也不错,但是忽略了一个点,因为当前可能线程正在执行,而
central cache
在分配出去空间的时候,也会拿回空间,此时有可能拿回来的空间就插在了前面,所以就出现了上图的情况!
申请内存的过程:
- 当
thread cache
中没有内存时,就会向central cache
申请批量的内存对象,这里的批量获取对象的数量使用了类似网络tcp
协议中拥塞控制的慢开始算法,不是一下子获取一大批,而是递进式的获取批量对象。central cache
从对应的span
中取出对象给thread cache
,这个过程进行加锁,不过这里使用的是一个桶锁,尽可能提高效率! - 当
central cache
映射的spanlist
中所有span
结构都没有可用内存块以后,则需要向page cache
申请一个新的span
对象,当申请成功以后就将span
管理的内存按大小切好作为空闲链表链接到一起,然后从span
中取对象给thread cache
。 span
结构中有个use_count
变量用于记录当前分配了多少个对象出去,当分配一个对象出去,就让use_count++
。
释放内存的过程:
- 当
thread cache
内存过多或者线程销毁了,则会将内存释放回central cache
中的,释放回来时进行use_count--
操作,表示回收了一个对象。当use_count
减到0
时则表示所有对象都回到了span
,则将span
结构还给page cache
,在page cache
中会对前后相邻的空闲页进行合并,减少内存碎片!
Ⅱ. 切分小内存后的页数类型
在设计 span
结构之前,我们得先解决一个问题:假设我们设置一个 span
也就是一个页的大小为 8KB
,那么在 32
位环境之下,此时内存大小为 2^32
,那么可以切出 2^19
个页;而在 64
位环境之下,此时内存大小为 2^64
,那么可以切出 2^51
个页,可以看到两个环境之下可以切分的页数差距是很大的!
此时我们就要根据不同的环境给出不同的数据类型来定义页数大小,防止溢出,所以我们 用 size_t
类型来表示 32
位环境下可以切分出来的页数,而 用 unsigned long long
表示 64
位环境下可以切分出来的页数,此时就需要使用 条件编译,如下所示:
#ifdef _WIN64using page_t = unsigned long long;
#elif _WIN32using page_t = size_t;
#endif
可能有人问,为什么
_WIN64
要写在前面呢❓❓❓ 这是因为
_WIN32
表示的是当前为32
位环境下,而不是64
位环境;但是_WIN64
被设置的时候既可以是32
位环境,也可以是64
位环境!如果我们让_WIN32
放在前面判断的话,此时无论是否为64
位环境,都只会走_WIN32
的判断语句,但是这和我们预期不符,所以得先判断_WIN64
才对!
Ⅲ. span
结构的实现
根据上面的设计,可以给出 span
的结构体如下所示:
// 管理以页为单位的大内存块
struct Span
{page_t _pid = 0; // 大块内存起始页的页号size_t _num = 0; // 页的个数Span* _next = nullptr; // 双向链表结构Span* _prev = nullptr;size_t _use_count = 0; // 当前分配给ThreadCache对象的小内存块个数void* _freelist = nullptr; // 当前大内存块对应的空闲链表
};
接着就是要将这些 span
连起来变成一个带头结点的双向链表进行管理,然后提供插入和删除的操作,此外因为涉及到多线程环境,所以要有一个互斥锁来进行保护,最后该结构体的定义如下所示:
// 管理Span结构的带头双向链表
class SpanList
{
private:Span* _head; // 头节点std::mutex _mtx; // 互斥锁,在central cache中用于桶锁保护
public:SpanList(){// 构造函数进行头节点的初始化,让其指向自己_head = new Span;_head->_next = _head;_head->_prev = _head;}// 将node插入到pos位置前(node来自于page cache)void insert(Span* pos, Span* node){assert(pos != nullptr && node != nullptr);Span* prev = pos->_prev;prev->_next = node;node->_prev = prev;node->_next = pos;pos->_prev = node;}// 将pos处的节点从链表中删掉void erase(Span* node){assert(node != nullptr && node != _head);Span* prev = node->_prev;Span* next = node->_next;prev->_next = next;next->_prev = prev;}// 方便后面获取锁来加锁std::mutex& get_mutex() { return _mtx; }
};
Ⅳ. central cache
的实现
1、主体框架
首先和线程缓存不一样的是,中心缓存只有一个,所以我们要保证在程序生命周期中只有一个中心缓存,所以要使用 单例模式!这里采用饿汉和懒汉其实都可以,这里我们就 采用饿汉模式 进行设计!
下面给出 central cache
的主体框架:
// CentralCache.h文件
#pragma once
#include "Common.h"// 单例模式:饿汉方式
class CentralCache
{
private:SpanList _spanlists[FREELIST_NUMS];static CentralCache _central_instance; // 该类的单例对象(需要在cpp文件中定义)
public:// 获取单例对象static CentralCache* get_instance(){return &_central_instance;}// 从central cache中获取一定数量的对象给thread cache// 参数:// start和end分别是返回内存块的起始位置地址,为输出型参数// num是根据慢开始启动算法得到的要申请的内存块个数// size为线程缓存申请的内存块空间的总大小// index为线程缓存当前申请的内存块空间对应的哈希桶下标// 返回值:// 实际从central cache中拿到的内存块个数size_t give_memory_to_ThreadCache(void*& start, void*& end, size_t num, size_t size, size_t index);// 获取一个非空的span对象Span* get_span(SpanList& list, size_t size);
private:// 将构造函数私有化,将拷贝构造和赋值重载封掉CentralCache() {}CentralCache(const CentralCache&) = delete;CentralCache& operator=(const CentralCache&) = delete;
};
在讲 give_memory_to_ThreadCache()
函数之前,我们得先来完善一下 thread cache
之前剩下的一个 fetch_from_CentralCache()
函数,然后 get_span()
函数需要等我们最后了解 page cache
的结构之后再来实现!
2、完善thread cache
中的 fetch_from_CentralCache()
函数
之前因为我们还没接触到中心缓存的结构,现在了解了之后我们就能完善一下前面的线程缓存中的从中心缓存获取内存块的函数了!这个函数的大概实现思路是这样子的:
- 采用慢开始反馈调节算法获取实际申请的内存数量。
- 通过
CentralCache
实例获取实际拿到的内存块的个数,以及首尾地址。 - 根据实际拿到的内存块是否有多申请的内存块:
- 如果只有一个内存块,则直接返回给用户就行。
- 注意这里第一个内存块是要返回给用户使用的,但是因为申请的内存块有多个,所以剩下的内存块要链接到
thread cache
的哈希桶上备用!
下面我们根据该步骤来解释一下:
这里我们获从中心缓存中获取内存块是 一次性获取多个内存块,这样子可以减少线程缓存对中心缓存的频繁申请,顺带的就减少了桶锁的开销,提高了效率!但是这里的一次性获取多个内存块也是有讲究的,如果说我们一次性给太多的话,那么该线程不一定会全部用到,那就浪费了;而如果一次性给的太少的话,那么该线程又会频繁的来调用该函数进行内存申请,所以我们这里采用的获取策略是 慢开始反馈调节算法 进行获取!(这和 TCP
中的慢启动策略是很类似的)
此时我们就得添加一些辅助函数和变量来帮助我们完成该策略的实施!我们需要有当前 FreeList
也就是空闲链表申请空间块时允许获得的最大个数,所以我们需要在 FreeList
类中添加一个变量 _maxsize
,将其初始化为 1
,然后 后续该空闲链表在不断地向中心缓存申请内存块的时候,该 _maxsize
就会递增,而具体如何递增是有多种策略的,可自主尝试!
然后我们再搞个接口来获取这个 _maxsize
即可,如下所示:(只需要看注释部分,这是添加的内容!)
// Common.h文件
class FreeList
{
private:void* _freelist = nullptr;size_t _maxsize = 1; // 表示当前空闲链表申请空间块时允许获得的最大个数(会不断递增)
public:void push(void* obj);void* pop();bool empty();// 获取_maxsize的接口size_t& get_maxsize() { return _maxsize; }
};
有了这个 _maxsize
还不够,我们需要有一个函数来控制我们获取内存块的一个大概范围的极限值,这是因为如果申请的 size
越大,那么一次向中心缓存要的内存块个数就越少;而如果申请的 size
越小,那么一次向中心缓存要的内存块就越多,但是我们要注意的就是不能让它们都给的太多或者太少,所以就需要有一个 get_upper_memory()
函数来控制线程缓存向中心缓存获取内存块的一个大概范围的极限值。
我们将其定义在之前的 AlignClass
类中,如下所示:(同样也是只需要看注释部分!)
// Common.h文件
// 管理对象大小的对齐与映射规则
class AlignClass
{
public:static inline size_t get_align(size_t size);static inline size_t get_index(size_t size);// 根据size返回thread cache从central cache中一次获取多少内存块static inline size_t get_upper_memory(size_t size){assert(size > 0);// 小对象一次获取批量的上限高,控制在512个对象// 大对象一次获取批量的上限低,控制在2个对象// 也就是说控制获取的对象在[2, 512]区间内size_t num = THREAD_MAX_SIZE / size;if (num < 2) num = 2;else if (num > 512) num = 512;return num;}
private:static inline size_t align(size_t bytes, size_t alignNum);static inline size_t index(size_t bytes, size_t alignShift);
};
有了这两个机制之后,我们就能完成慢开始反馈调节了,我们只需要让该申请大小的极限值,与当前空闲链表允许允许获得的最大内存块个数取最小值,得到的就是实际申请的内存块个数,然后判断如果还没到申请大小的极限值的话,则让其申请内存块的数量递增,这里采用的是加二,这样子以后该空闲链表来申请内存块的时候就会多申请两个内存块了!
接着通过 CentralCache
实例获取到实际拿到的内存块的个数,以及首尾地址,这是通过其定义的 give_memory_to_ThreadCache()
函数来实现的,这个我们后面会实现!
至于这里为什么要获取的是内存块的首尾地址,是因为我们获取到的内存块可能不只是一个,而是多个,所以要知道内存块的首尾位置才行,而返回值就是实际拿到的内存块个数,通过判断它来进行不同结果的返回!
如果只有一个内存块的话,则直接返回给用户就行;而如果有多个内存块的话,我们要将第一个内存块分配给用户使用,然后将剩下的插入到对应的空闲链表中,所以我们这里还得在 FreeList
中实现一个 append()
接口,用于插入多个内存块,如下所示:(同样也是只看注释部分即可!)
// Common.h
class FreeList
{
private:void* _freelist = nullptr; size_t _maxsize = 1;
public:void push(void* obj);void* pop();bool empty();size_t& get_maxsize();// 将多个内存对象头插插入空闲链表void append(void* start, void* end){assert(start != nullptr && end != nullptr);get_next(end) = _freelist;_freelist = start;}};
有了这些基础,我们就能实现完整的 fetch_from_CentralCache()
函数了,如下所示:
// ThreadCache.cpp文件
void* ThreadCache::fetch_from_CentralCache(size_t index, size_t size)
{// 1. 采用慢开始反馈调节算法获取实际申请的内存数量/*原理:让该申请大小的极限值,与当前空闲链表允许允许获得的最大内存块个数取最小值,得到的就是实际申请的内存块个数其作用如下:1、最开始不会一次向central cache一次批量要太多,因为要太多了可能用不完2、如果你不要这个size大小内存需求,那么num就会不断增长,直到上限3、size越大,一次向central cache要的num就越小4、size越小,一次向central cache要的num就越大*/size_t& maxsize = _freelists[index].get_maxsize();size_t upper_size = AlignClass::get_upper_memory(size);size_t num = min(maxsize, upper_size);if (num == maxsize) // 如果还不是到申请大小的极限值的话,则让其申请内存块的数量递增maxsize += 1;// 2. 通过CentralCache实例获取实际拿到的内存块的个数,以及首尾地址void* start = nullptr;void* end = nullptr;size_t actual_num = CentralCache::get_instance()->give_memory_to_ThreadCache(start, end, num, size, index);// 3. 根据实际拿到的内存块是否有多申请的内存块if (actual_num == 1){// 如果只有一个内存块,则直接返回给用户就行assert("start == end");return start;}else{/* 注意这里第一个内存块是要返回给用户使用的,但是因为申请的内存块有多个,所以剩下的内存块要链接到thread cache的哈希桶上备用!*/_freelists[index].append(get_next(start), end);return start;}
}
3、实现 give_memory_to_ThreadCache()
函数
这个函数的任务就是从中心缓存中获取一个非空的 span
对象然后将要分配给 ThreadCache
的那一部分内存块进行首尾定位后从 span
对象中取出,其实就是一个链表删除操作,然后将实际拿到的内存块的个数返回给 ThreadCache
即可!
因为这个过程可能有多个线程来申请,所以 需要加锁!
然后我们把获取一个非空的 span
对象的任务交给另一个函数 get_span()
,这是因为有可能中心缓存当前的 span
对象中空闲链表都为空了,那么就得向下一层的 page cache
进行内存申请,但是目前我们还没了解 page cache
的结构,所以我们将这个函数放到后面再去实现!
下面是该函数的实现,细节参考代码:
// CentralCache.cpp文件
// 从central cache中获取一定数量的对象给thread cache
size_t CentralCache::give_memory_to_ThreadCache(void*& start, void*& end, size_t num, size_t size, size_t index)
{// 1. 因为可能多个线程都来获取,所以需要加锁,并且这里是桶锁,所以效率不会很低!_spanlists[index].get_mutex().lock();// 2. 调用函数获取一个span对象(函数内部会将span对象都插入到spanlists中)Span* span = get_span(_spanlists[index], size);assert(span != nullptr && span->_freelist != nullptr);// 3. 在span对象中定位要获取的内存块的首尾地址(因为有可能不足num个小内存块,所以用actual_num记录实际拿到的内存块)size_t actual_num = 1;start = span->_freelist;end = start;int i = 0;while (i < num - 1 && get_next(end) != nullptr){// 因为我们需要将span中的小内存块取走,就需要考虑小内存块个数不足num的情况,就需要判断一下是否下个位置为空// 并且我们每次只走num-1步就停下来,这样子方便下面取走小内存块的操作,因为freelist是单向链表!end = get_next(end);++i;++actual_num;}// 4. 根据首尾地址将内存块从span对象中取出,就是链表删除操作!span->_freelist = get_next(end);get_next(end) = nullptr;// 5. 将使用数量增加实际取到的个数span->_use_count += actual_num;// 6. 别忘了释放锁_spanlists[index].get_mutex().unlock();return actual_num;
}