高并发内存池|六、page cache的设计
六、page cache的设计
1. page cache的结构
page cache 也是一个哈希桶结构,但它的映射结构与前两个 cache 不同。它的每一个桶是容量从 1 到 128 页大小的内存块,桶中的每个内存块由 SpanList 管理。page cache 的内存由其向系统申请所得,我们在向系统申请内存时,不会每个桶单独申请对应大小的内存。而是一次申请 128 页的内存,再由 128 页拆分成不同大小的页,交给对应大小的页的 SpanList 管理。
2. 设计细节
2.1 单例模式
central cache 不需要让线程独有,所以我们就设计成只允许存在一个 page cache 即可。
class PageCache
{
public:static PageCache* GetInstance(){return &_instance;}private:PageCache(){}PageCache(const PageCache&) = delete;public:std::mutex _pageMtx;
};
2.2 基数树
在 page cache 中使用全局锁,还是会影响程序的性能。于是在 page cache 的内部,要引入基数树,这个数据结构使得在 page cache 内部,不需要使用锁就能保证线程安全,以此来提升线程并发的性能。
因为基数树在写之前会提前开好空间,在写数据的过程中,不会动树的结构。同时其读写是分离的,一个线程对一个位置读写时,另一个线程不可能对这个位置进行读写。
在 page cache 的外部,还是需要使用全局锁来保证线程安全。
2.3 页号映射管理
在高并发内存池中,central cache 向 page cahce 申请内存时,会以页为单位获得一整块 span,然后将其切分为多个小对象。在对象释放时,为了将其归还给正确的 span,必须实现 “向谁借的,就还给谁” 的原则。
为了实现这一点,page cache 中需要使用一个哈希表来管理内存的页号映射,用于根据对象所在地址快速找到它所归属的 span:
std::unordered_map<PAGE_ID, Span*> _idSpanMap;
这个映射的逻辑很简单,因为我们规定,高并发内存池中的一页的长度是 8KB
,而来自同页的内存的地址的差距必定不会超过 8KB。所以,用内存的地址除以 8KB,如果两个内存来自同页,那么它们经过此运算后所得的值必然相同,这也是页的页号的设定方法:
PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;
3. page cache的内存管理
page cache 是直接向内存申请内存的,为了避免频繁地向系统申请,我们规定每次 page cache 都申请 128 页大小的内存块。当 central cache 向 page cache 申请内存时,再对 page cahce 中的大页内存块进行拆分。如,central cahce 向 page cache 申请 4 页的内存块,此时 page cache 内只有一个 128 页的内存块,page cache 就会将这个 128 页的内存块拆分成一个 4 页和一个 120 页的内存块,将 4 页的内存块分给 central cahce,120 页的内存块映射到存放大小为 120 页内存块的哈希桶中。
4. 代码实现
PageCache.h:
#pragma once
#include "Common.h"
#include <unordered_map>
#include "ObjectPool.h"
#include "PageMap.h"class PageCache
{
public:static PageCache* GetInstance(){return &_instance;}Span* NewSpan(size_t k);Span* MapObjectToSpan(void* obj);void ReleaseSpanToPageCache(Span* span);private:SpanList _spanLists[NPAGES];PageCache(){}PageCache(const PageCache&) = delete;public:std::mutex _pageMtx;
private:static PageCache _instance;//std::unordered_map<PAGE_ID, Span*> _idSpanMap;TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;ObjectPool<Span> _spanPool;
};
PageCache.cpp:
#include "PageCache.h"PageCache PageCache::_instance;Span* PageCache::NewSpan(size_t k)
{assert(k > 0);if (k > NPAGES - 1){void* ptr = SystemAlloc(k);Span* span = _spanPool.New();span->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;span->_pageNum = k;//_idSpanMap[span->_pageID] = span;_idSpanMap.set(span->_pageID, span);return span;}if (!_spanLists[k].Empty()){//return _spanLists[k].PopFront();Span* kspan = _spanLists[k].PopFront();for (PAGE_ID i = 0; i < kspan->_pageNum; i++){//_idSpanMap[kspan->_pageID + i] = kspan;_idSpanMap.set(kspan->_pageID + i, kspan);}return kspan;}for (size_t i = k + 1; i < NPAGES; i++){if (!_spanLists[i].Empty()){Span* nSpan = _spanLists[i].PopFront();Span* kSpan = _spanPool.New();kSpan->_pageID = nSpan->_pageID;kSpan->_pageNum = k;nSpan->_pageID += k;nSpan->_pageNum -= k;_spanLists[nSpan->_pageNum].PushFront(nSpan);//nSpan可能会继续再分,所以先映射它的头尾页//_idSpanMap[nSpan->_pageID] = nSpan;_idSpanMap.set(nSpan->_pageID, nSpan);//_idSpanMap[nSpan->_pageID + nSpan->_pageNum - 1] = nSpan;_idSpanMap.set(nSpan->_pageID + nSpan->_pageNum - 1, nSpan);for (PAGE_ID i = 0; i < kSpan->_pageNum; i++){//kSpan不会再分,所有对应pageID都映射到kSpan//_idSpanMap[kSpan->_pageID + i] = kSpan;_idSpanMap.set(kSpan->_pageID + i, kSpan);}return kSpan;}}//没有大页span,向堆要一个128页的spanSpan* bigSpan = _spanPool.New();void* ptr = SystemAlloc(NPAGES - 1);bigSpan->_pageID = (PAGE_ID)ptr >> PAGE_SHIFT;bigSpan->_pageNum = NPAGES - 1;_spanLists[bigSpan->_pageNum].PushFront(bigSpan);return NewSpan(k);
}Span* PageCache::MapObjectToSpan(void* obj)
{PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;//std::unique_lock<std::mutex> lock(_pageMtx);//auto ret = _idSpanMap.find(id);//if (ret != _idSpanMap.end())//{// return ret->second;//}//else//{// assert(false);// return nullptr;//}auto ret = (Span*)_idSpanMap.get(id);assert(ret != nullptr);return ret;}void PageCache::ReleaseSpanToPageCache(Span* span)
{if (span->_pageNum > NPAGES - 1){void* ptr = (void*)(span->_pageID << PAGE_SHIFT);SystemFree(ptr);_spanPool.Delete(span);return;}//span合并前页while (1){PAGE_ID prevID = span->_pageID - 1;//auto ret = _idSpanMap.find(prevID);auto ret = (Span*)_idSpanMap.get(prevID);//如果prevID是不存在的页号,就不合并Span* prevSpan = ret;if (ret == nullptr){break;}//如果prevID是正在使用的页号,就不合并if (prevSpan->_isUse == true){break;}//如果合并后的span超过128页,就不合并if (prevSpan->_pageNum + span->_pageNum > NPAGES - 1){break;}//span合并prevSpanspan->_pageID = prevSpan->_pageID;span->_pageNum += prevSpan->_pageNum;_spanLists[prevSpan->_pageNum].Erase(prevSpan);_spanPool.Delete(prevSpan);}//span合并后页while (1){PAGE_ID nextID = span->_pageID + span->_pageNum;auto ret = (Span*)_idSpanMap.get(nextID);//如果nextID是不存在的页号,就不合并if (ret == nullptr){break;}//如果nextID是正在使用的页号,就不合并Span* nextSpan = ret;if (nextSpan ->_isUse == true){break;}//如果合并后的span超过128页,就不合并if (nextSpan->_pageNum + span->_pageNum > NPAGES - 1){break;}//span合并nextSpanspan->_pageNum += nextSpan->_pageNum;_spanLists[nextSpan->_pageNum].Erase(nextSpan);_spanPool.Delete(nextSpan);}_spanLists[span->_pageNum].PushFront(span);span->_isUse = false;_idSpanMap.set(span->_pageID, span);_idSpanMap.set(span->_pageID + span->_pageNum - 1, span);
}