从零实现一个高并发内存池 - 4
上一篇https://blog.csdn.net/Small_entreprene/article/details/147962446?fromshare=blogdetail&sharetype=blogdetail&sharerId=147962446&sharerefer=PC&sharesource=Small_entreprene&sharefrom=from_link
高并发内存池 - page cache
上一篇说到:和central cache一样,thread cache没有对象了,就需要向下一层central cache申请,central cache没有对象了,也是需要向下一层,也就是page cache,进行对象的申请!这是这三层的最后一层了!!!
我们看图,其实page cache的本质也是一个哈希桶的!!!我们下面会详细的说明对象申请的过程,以及和上一层的central cache的哈希桶有什么区别!
一、申请内存
(一)基本检查与分裂操作
当 central cache
向 page cache
申请内存时,page cache
首先会检查申请位置对应的 span
是否存在。如果不存在,page cache
将在其后方寻找一个足够大的 span
。例如,当申请 4 页的内存时,若在第 4 页的位置没有挂载任何 span
,page cache
将向后寻找更大的 span
。假设在第 10 页的位置找到了一个 10 页大小的 span
,那么这个 10 页大小的 span
将会被分裂为两个新的 span
,一个 4 页大小的 span
用于满足此次申请,另一个 6 页大小的 span
则继续保留在自由链表中等待后续的申请。
(二)系统级申请与再次尝试
如果在 _spanList[128]
中都未能找到合适的 span
,page cache
将会向操作系统申请内存。具体来说,它会调用 mmap
、brk
或者在 Windows 系统下使用 VirtualAlloc
等系统调用,申请一个 128 页大小的 span
并将其挂在自由链表中。完成这一步后,page cache
会再次重复第一步的操作,尝试在这个新申请的 span
中找到合适的内存分配方案。
(三)核心结构的区别
需要注意的是,central cache
和 page cache
的核心结构虽然都是基于 spanlist
的哈希桶,但它们在设计和功能上存在本质区别。central cache
中的哈希桶是按照与 thread cache
相同的大小对齐关系映射的。例如,在 thread cache
中,内存块的大小可能被划分成固定的一些大小(如 64 字节、128 字节等),central cache
的 spanlist
中挂载的 span
中的内存也会按照这种映射关系被切成小块,并且这些小块内存会通过自由链表链接起来,以便在需要时快速分配给 thread cache
。而 page cache
中的 spanlist
则是按下标桶号映射的(直接定址法)。也就是说,第 i 号桶中挂载的 span
所表示的内存块大小正好是 i 页。例如,第 1 号桶的 span
表示 1 页大小的内存块,第 10 号桶的 span
表示 10 页大小的内存块,这种映射方式使得 page cache
在处理大块内存分配时更加高效,能够快速定位到合适的 span
。(page cache的对象是不切的!因为这是针对对central cache的span服务的)(thread cache找central cache ,central cache就去找span,如果对应的桶位没有空闲的span,central cache就会去找下一层page cache ,如果上面thread cache要的单个对象小,比如说是8字节的桶(central cache对应的),那么central cache就要一页或两页的(看页的大小是4KB还是8KB)就行了,如果要的是256K的,那么要的对应的span就是很多个页的!也就是central cache去找page cache的时候,关注的是我要的是几页的span!!!,我们通过直接定址法就可以高效的找到对应的哈希桶的位置)
我们应该注意的是,如果thread cache没有对象的话,就会像下一层central cache去申请对象,发现central cache当中也没有对象,那么就会往下一层page cache去申请对象,所以说page cache这一层也是需要进行加锁的,还是按照上一层ccentral cache的桶锁的方式吗?这是不行的!这样后面要实现的分裂和合并有关,这里就不能使用桶锁了,需要使用使用一个全局的锁来操作!
不可以使用桶锁因为,假设我们已经到了去page cache去申请一个两页的Span,但是对应的哈希桶的位置下已经没有了对应的两页的Span了,这时候只能是向堆申请了,不过向堆申请的时候,又不可以说是每一次都申请2页,肯定是一次就像申请一大块空间,这样就可以保证内存空间的连续性,然后切成对应的两页一个Span再挂入到对应的哈希桶的位置下,这样当要合并的时候还可以保证先将这原来堆中申请的一个连续的大块内存的完整合并!!!我要一个连续很多个页的内存,连续的就涉及我切小了他还回来后还可以合并成大的,如果反复要一个2页的Span就去找堆拿,这就不一定连续,所以其实整体的逻辑是:2页的Span没有,就往下去找,比2页大的,比如说3页的,可以将这3页的Span切成一个2页的和1页的,2页的就使用,一页的挂到对应的1页的Span的哈希桶下。
更好理解的就是:最开始的时候,page cache对应的哈希桶下是一个都没有挂Span对象的!然后假设上面到下面来,目前需要2页的Span,然后这时候可不是直接向堆申请2页的内存空间,而是往下面的哈希桶下找更大的且存在的大于2页的Span,准备进行切分,这就是极端了,后面的一个Span一个都没有,现在只能,也是直接向系统(堆)申请一个128页的连续内存空间,构成一个128页的Span,挂到对应的位置,现在就可以将两页的Span切出来用,剩下的126页的Span挂到对应的126页的哈希桶下!
如果central cache中的Span useCount等于0了,说明切分给thread cache的小块内存都回来了,则central cache把这个Span还给page cache ,page cache通过查页号,查看前后相邻页是否空闲,是的话就合并,合并出更大的页,解决内存碎片问题。
如果我们使用桶锁,假设两个线程同时获取到相同的 Span
,并且同时在对其进行切分操作(假如一个是本来要3页的,一个是要2页的),这时候就可能出现问题。由于两个线程都持有相同的锁,它们可能会同时对同一个 Span
进行切分,导致数据不一致和内存管理混乱。(其实桶锁也行,就是往下找的时候要不断的加锁解锁,这消耗也是很大的)此外,在申请内存的过程中,如果找不到合适的 Span
,需要向系统申请大块内存并切分,这个过程涉及到多个哈希桶的操作。如果使用桶锁,每个桶的锁只能保护该桶内的操作,但在切分和合并过程中,可能会跨多个桶,这会导致锁的竞争和复杂的锁管理问题。例如,当一个线程正在将一个大 Span
切分成小 Span
并挂载到不同的哈希桶时,其他线程可能同时尝试访问这些桶,从而引发锁冲突和潜在的数据不一致问题。因此,桶锁机制在这种复杂的内存分配和释放场景下,并不能有效地保证线程安全和数据一致性,反而会增加系统的复杂性和出错的概率。
还有一个需要理解的:我们既然central cache要使用桶锁,那么如果满足一定条件,要往下一层走,需不需要解锁???
其实是解了比较好的,如果走到下一层,桶锁解了,那么其他线程也就可以进来了,不过page cache不是用来一把大锁锁住了吗,不是一样的只能容许一个线程访问page cache吗?是的,但是我们不要忘了,我们除了申请,我们还需要释放的,如果不解相应的桶锁,直接去找page cache带来的问题就是:释放的时候需要返还Span内存,释放的时候就会堵住,所以说最好解了!我们在central cache的逻辑中进行解锁!!!
thread cache都是独立的,都去找唯一的central cache ,central cache又去找唯一的page cache,所以page cache和central cache一样要设计成单例模式!!!
我们的代码实现会在注释部分详细说明细节,请好好看代码,理清楚代码的调用逻辑!!!
二、释放内存
当 central cache
释放一个 span
回到 page cache
时,会进行一系列的合并操作以减少内存碎片。具体来说,page cache
会依次检查该 span
前后的 page id
,寻找那些没有在使用的空闲 span
。如果发现可以合并,page cache
将会将这些连续的空闲 span
合并成一个更大的 span
,并且继续向前寻找是否有更多的空闲 span
可以进一步合并。通过这种方式,原本被切割成小块的内存能够被重新组合成较大的 span
,从而减少内存碎片化,提高内存的利用率,使得在后续的内存申请时,page cache
能够更容易地找到合适大小的 span
来满足需求。
以上就是 page cache
在申请和释放内存时的详细过程。这种设计既保证了内存分配的高效性,又能有效减少内存碎片,对于高并发场景下的内存管理尤为重要,能够显著提升系统的性能和稳定性。
同样的,我们代码只是完成了申请的流程,对于归还“释放”内存对象,我们现在就已经可以进行调试代码,走通申请流程了,后面我们就可以完成归还操作了!!!
代码部分
common.h
#pragma once
#include<iostream>
#include<vector>
#include<time.h>
#include<assert.h>
#include<thread>
#include<mutex>
#include<algorithm>#ifdef _WIN32
#include<windows.h>
#else
//...
#endif//方便,不使用using namespace std;是因为防止污染
using std::cout;
using std::endl;static const size_t MAX_BYTES = 256 * 1024;
static const size_t NFREELIST = 208;//前两层的哈希桶的数量
static const size_t NPAGES = 129;//pagecache层的哈希桶的数量//数量是128,但是为了对应映射关系,因为数组的下标是从0开始的
static const size_t PAGE_SHIFT = 13;//假设一页是8K,这是进行页的转化的参数#ifdef _WIN64typedef unsigned long long PAGE_ID;
#elif _WIN32typedef size_t PAGE_ID;
#else//Linux
#endif// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}static void*& NextObj(void* obj)
{return *(void**)obj;
}// 管理切分好的小对象的自由链表
class FreeList
{
public:void Push(void* obj){assert(obj);// 头插//*(void**)obj = _freeList;NextObj(obj) = _freeList;_freeList = obj;}//支持给一个范围,插入到自由链表---头插void PushRange(void* start, void* end){NextObj(end) = _freeList;_freeList = start;}void* Pop(){assert(_freeList);// 头删void* obj = _freeList;_freeList = NextObj(obj);return obj;}bool Empty(){return _freeList == nullptr;}size_t& MaxSize(){return _maxSize;}private:void* _freeList = nullptr;//这个要写,因为我们没有写构造size_t _maxSize = 1;//慢调节算法的要点之一
};// 计算对象大小的对其映射规则
class SizeClass
{
public:// 提供函数来计算给一个字节数,对应到正确的桶位置;// 规则如下:// 整体控制在最多10%左右的内碎片浪费!!!// ***************************************************************************************************// ***************************************************************************************************// [1,128] 8byte对齐 freelist[0,16) 这个没办法>^<// [128+1,1024] 16byte对齐 freelist[16,72) 15/(129+15)=0.10....// [1024+1,8*1024] 128byte对齐 freelist[72,128) 127/(1025+127)=0.11....// [8*1024+1,64*1024] 1024byte对齐 freelist[128,184) //......// [64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) //......// ***************************************************************************************************// ***************************************************************************************************//相当于RoundUp的子函数:给定当前size大小和对应规则的对齐数AlignNum,用于处理//static inline size_t _RoundUp(size_t size, size_t alignNum)//{// size_t alignSize;// if (size % alignNum != 0)// {// alignSize = (size / alignNum + 1) * alignNum;// }// else//等于0就不需要处理了,已经对齐了// {// alignSize = alignNum;// }//}//上面的是我们普通人玩出来的,下面我们来看看高手是怎么玩的!static inline size_t _RoundUp(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}//你给我一个size,就需要算出对其以后是多少 --- 比如说:8->8 7->8static inline size_t RoundUp(size_t size){if (size <= 128){return _RoundUp(size, 8);}else if (size <= 1024){return _RoundUp(size, 16);}else if (size <= 8 * 1024){return _RoundUp(size, 128);}else if (size <= 64 * 1024){return _RoundUp(size, 1024);}else if (size <= 256 * 1024){return _RoundUp(size, 8 * 1024);}else{//说明大于256KB了,那么就有点问题了assert(false);return -1;}}//一样的,我们来自己写写看,待会换成别人的刚好的方式//size_t _Index(size_t bytes, size_t alignNum)//{// if (bytes % alignNum == 0)// {// return bytes / alignNum - 1;// }// else// {// return bytes / alignNum;// }//}//高手的想法:static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);// 每个区间有多少个链static int group_array[4] = { 16, 56, 56, 56 };if (bytes <= 128) {return _Index(bytes, 3);}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}// 一次thread cache从中心缓存获取多少个static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值// 小对象一次批量上限高// 小对象一次批量上限低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}// 计算一次向系统获取几个页// 单个对象 8byte// ...// 单个对象 256KBstatic size_t NumMovePage(size_t size){size_t num = NumMoveSize(size);size_t npage = num * size;npage >>= PAGE_SHIFT;if (npage == 0)npage = 1;return npage;}
};//管理多个连续页的大块内存跨度的结构
struct Span
{PAGE_ID _pageId = 0;//页号:大块内存的起始页的页号size_t _n = 0;//页数//双向链表Span* _next = nullptr;Span* _prev = nullptr;size_t _useCount = 0;//切好的小块内存,被分配给thread cache的计数void* _freeList = nullptr;//用于管理切好的小块内存的自由列表
};//带头双向循环链表
class SpanList
{
public:SpanList(){_head = new Span;_head->_next = _head;_head->_prev = _head;}//这里我们没必要实现一个迭代器,我们利用带头双向循环链表的特性:Span* Begin(){return _head->_next;}Span* End(){return _head;}bool Empty(){return _head->_next == _head;}//插入Span:头插void PushFront(Span* span){Insert(Begin(), span);}//弹出一个SpanSpan* PopFront(){Span* front = _head->_next;Erase(front);return front;}void Insert(Span* pos, Span* newSpan){assert(pos);assert(newSpan);Span* prev = pos->_prev;//prev newSpan posprev->_next = newSpan;newSpan->_prev = prev;newSpan->_next = pos;pos->_prev = newSpan;}void Erase(Span* pos){assert(pos);assert(pos != _head);Span* prev = pos->_prev;Span* next = pos->_next;prev->_next = next;next->_prev = prev;}private:Span* _head;
public:std::mutex _mtx;//桶锁
};
CentralCache.cpp
#include"CentralCache.h"
#include"PageCache.h"CentralCache CentralCache::_sInst;//获取一个非空的span
Span* CentralCache::GetOneSpan(SpanList& list, size_t size)
{//需要遍历我们的SpanList,如果有就给,没有就需要向下一层了Span* it = list.Begin();while (it != list.End()){if (it->_freeList != nullptr){//不为空,说明下面挂着有对象return it;}else{//it++;我们没有封装迭代器,不要使用++,但是也是很简单的it = it->_next;}}//先把central cache的桶锁解掉,如果其他线程释放内存对象回来,不会阻塞 --- 提高效率list._mtx.unlock();//走到这说明已经没有空闲的Span了,只能找下一层page cache要了PageCache::GetInstance()->_pageMtx.lock();Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));PageCache::GetInstance()->_pageMtx.unlock();//这里加锁和解锁的锁是同一个锁,即 PageCache 单例对象的 _pageMtx 锁,所以不存在解锁解的不是上次加锁的的情况。//因为 PageCache::GetInstance() 返回的是同一个 PageCache 类的实例,所以 _pageMtx 是同一个互斥锁对象。//锁匹配的本质:锁匹配的核心是加锁和解锁操作必须作用于同一个锁对象。只要满足这一点,加锁和解锁就是匹配的。//单例模式的作用:单例模式在这里的作用是确保 _pageMtx 的唯一性,即每次调用 PageCache::GetInstance() 都返回同一个 PageCache 对象,从而保证 _pageMtx 是同一个锁对象。//因果关系:单例模式是通过确保锁对象的唯一性,间接保证了加锁和解锁操作的匹配性。但锁匹配的直接原因是加锁和解锁操作作用于同一个锁对象。//对获取的Span进行切分是不需要加锁的:因为其他线程已经拿不到这个Span了//我们可以通过页号,计算页的起始地址:页号<<PAGE_SHIFT(0*8K,1*8K,.......}char* start = (char*)(span->_pageId << PAGE_SHIFT);//算到起始地址//要插入到指定桶下,但是要先切size_t bytes = span->_n << PAGE_SHIFT;//计算大块内存的字节数大小//切!!!将大块内存切成自由链表链接起来char* end = start + bytes;//切好了进行尾插比较好:因为这样就会分割的地址是连续链接的//1.先切一块下来去做头,方便尾插span->_freeList = start;start += size;void* tail = span->_freeList;while (start < end){NextObj(tail) = start;tail = NextObj(tail);//tail = start;start += size;}//恢复加锁:切好之后,需要将Span挂到桶里面去的时候,再加锁list._mtx.lock();list.PushFront(span);return span;
}//重中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size)//拿batchNum个size大小的对象,前两个为输出型参数
{size_t index = SizeClass::Index(size);//获取对应的哈希桶的位置//因为多个线程可能同时竞争同一把桶锁,我们需要加锁_spanLists[index]._mtx.lock();Span* span = GetOneSpan(_spanLists[index], size);assert(span);assert(span->_freeList);//从span中获取batchNum个对象//如果不够batchNum个,有多少取多少start = span->_freeList;end = start;size_t i = 0;size_t actualNum = 1;//注意!!!这个和我们的下面的end的走的逻辑要对应上,不可以取成0!(既然申请到了非空的Span,那么该Span下也就起码有一个对象)//这里我们需要注意我们真的可以申请这么多吗,这个非空的Span真的有这么多吗while (i < batchNum - 1 && NextObj(end) != nullptr){end = NextObj(end);++i;++actualNum;}span->_freeList = NextObj(end);NextObj(end) = nullptr;span->_useCount += actualNum;_spanLists[index]._mtx.unlock();//加锁就需要解锁,那么这时候也应该考虑死锁的问题了return actualNum;
}
PageCache.h
#pragma once
#include"Common.h"//注意这里的哈希桶的映射关系不是上两层的规则了,但是是更简单的直接定址法!!!
class PageCache
{
public:static PageCache* GetInstance(){return &_sInst;}//获取一个k页的SpanSpan* NewSpan(size_t k);std::mutex _pageMtx;//全局的锁,锁住整体
private:SpanList _spanLists[NPAGES];//128个桶,每个桶下挂的是Span!!!PageCache(){}PageCache(const PageCache&) = delete;static PageCache _sInst;//单例模式 --- 饿汉模式
};
PageCache.cpp
#include"PageCache.h"PageCache PageCache::_sInst;//获取一个k页的Span
Span* PageCache::NewSpan(size_t k)
{assert(k > 0 && k < NPAGES);//先检查第k个桶里面有没有Spanif (!_spanLists[k].Empty()){//有就弹一个出去使用return _spanLists->PopFront();}//说明第k个桶式空的,需要往下检查后面的桶里面有没有Span,如果有,可以进行切分使用for (size_t i = k + 1; i < NPAGES; ++i){if (!_spanLists[i].Empty()){//开切:拿下-切割-分配->利用+挂起(k,n-k)//k页的Span返回给CentralCache//n-k页的Span挂到n-k的桶的位置下Span* nSpan = _spanLists[i].PopFront();Span* kSpan = new Span;//在nSpan的头部切一个k页下来kSpan->_pageId = nSpan->_pageId;kSpan->_n = k;nSpan->_pageId += k;nSpan->_n -= k;_spanLists[nSpan->_n].PushFront(nSpan);//挂起return kSpan;}}//走到这个位置就说明后面没有大页的Span了//这时候就需要去找堆要一个128KB的Span了Span* bigSpan = new Span;void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_n = NPAGES - 1;_spanLists[bigSpan->_n].PushFront(bigSpan);//到这还需要像上面一样进行切割分配,我们可以使用挂起+递归一次就很明智的解决了//不过我们是需要加锁的,一些普通的锁就不行了,因为递归,会产生死锁,我们可以使用递归锁!!!recursive_mutex//我们也可以分理出来一个_NewSpan来解决//递归调用自己慢了一点,但是计算机太快了,所以这个完全不是事,我们也当然可以选择一次就直接进行切割分配//自我认为复用更加重要return NewSpan(k);
}