1. Project Introduction
This project is modeled on Google's open-source project tcmalloc. tcmalloc stands for Thread-Caching Malloc, a thread-caching allocator that implements efficient multi-threaded memory management and is used as a replacement for the allocation functions malloc and free.
We extract tcmalloc's core framework and build a simplified high-concurrency memory pool, with the goal of learning the essentials of tcmalloc.
Main technology stack:
- Data structures: linked lists, hash buckets
- C/C++: core language features
- Operating systems: memory management, multithreading
- Design patterns: singleton
2. Pooling
Pooling means requesting more resources from the system than are immediately needed, and managing the surplus ourselves for later use.
The reason for over-requesting is that every request to the system carries significant overhead; asking the system for resources frequently drags down overall efficiency.
If we request a surplus up front, later requests can be satisfied without going to the system at all, which improves the performance of the whole program.
Memory pools, process pools, thread pools and connection pools all apply this idea.
Memory Pool
A memory pool is a large block of memory that the program requests from the operating system in advance. Allocations are then served from the pool, and freed memory goes back into the pool; only when the program exits is the pooled memory returned to the operating system.
What a memory pool buys us:
- better overall efficiency
- less external fragmentation
In the figure above, two blocks of memory have been freed but are not contiguous; the unusable gaps between such blocks are called external fragmentation.
malloc
Normally, to get memory from the heap we call malloc.
Under the hood, malloc itself uses a memory pool: calling malloc does not go straight to the heap, it draws from malloc's internal pool.
If malloc is already built on a memory pool, why design our own?
malloc has to work for most scenarios; being general-purpose means its performance cannot be outstanding in any particular one. The pools we design next are specialized: a different scenario gets a different, purpose-built pool.
3. Fixed-Size Memory Pool
- Request one big chunk of memory from the system up front and hang it off `_memory`.
- To allocate, first check `_freeList`: if it is not empty, pop a block off it and return that; otherwise cut a block from `_memory` and return it.
- To deallocate, hang the returned block on `_freeList`, organized as a singly linked list.
- The first 4/8 bytes of every block on `_freeList` point to the next block.
#include <new>       // std::bad_alloc
#ifdef _WIN32
#include <windows.h>
#else
// Linux headers would go here
#endif

// Request kpage pages of memory directly from the OS, bypassing malloc
void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
    void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
    // On Linux, use a system call such as mmap/brk to get the pages
#endif
    if (ptr == nullptr)
        throw std::bad_alloc();
    return ptr;
}

// Returns a reference to the "next block" pointer stored in the first 4/8 bytes of a block
void*& NextObj(void* ptr)
{
    return *(void**)ptr;
}

template<class T>
class ObjectPool
{
public:
    T* New()
    {
        T* obj = nullptr;
        if (_freeList)
        {
            // Reuse a previously returned block first
            void* next = NextObj(_freeList);
            obj = (T*)_freeList;
            _freeList = next;
        }
        else
        {
            // A block must be at least large enough to hold a pointer
            size_t objsize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
            if (_remain < objsize)
            {
                // Out of memory in the current chunk: grab another 128 KB from the OS
                _remain = 128 * 1024;
                _memory = (char*)SystemAlloc(_remain >> 13);
            }
            obj = (T*)_memory;
            _memory += objsize;
            _remain -= objsize;
        }
        // Placement new: explicitly invoke T's constructor
        new(obj)T;
        return obj;
    }

    void Delete(T* ptr)
    {
        // Explicitly invoke T's destructor
        ptr->~T();
        // Hang the block back on the free list
        NextObj(ptr) = _freeList;
        _freeList = ptr;
    }

private:
    char* _memory = nullptr;    // cursor into the current big chunk
    size_t _remain = 0;         // bytes left in the current big chunk
    void* _freeList = nullptr;  // freed blocks, linked through their first bytes
};
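A quick way to exercise the pool (a minimal sketch; `TreeNode` here is just an illustrative type, not part of the project):

struct TreeNode
{
    int _val = 0;
    TreeNode* _left = nullptr;
    TreeNode* _right = nullptr;
};

void TestObjectPool()
{
    ObjectPool<TreeNode> pool;
    TreeNode* n1 = pool.New();  // cut from the big chunk (or taken from the free list)
    TreeNode* n2 = pool.New();
    pool.Delete(n1);            // goes back onto the free list
    TreeNode* n3 = pool.New();  // typically reuses n1's block
    pool.Delete(n2);
    pool.Delete(n3);
}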
4. Overall Architecture of the Concurrent Memory Pool
Most real application scenarios are multi-threaded, so lock contention is unavoidable. tcmalloc outperforms malloc under concurrent, multi-threaded load, so our simplified tcmalloc focuses on:
- lock contention between threads
- performance
- memory fragmentation
The concurrent memory pool is built from three components.
ThreadCache
- Each thread owns its own ThreadCache, so no locking is needed inside it.
- A ThreadCache holds many free lists, each hanging blocks of one size. Allocations are served from here first, and freed blocks come back here, hung on the matching free list.
- When the number of blocks on a free list exceeds a threshold, all of that list's blocks are returned to the CentralCache.
CentralCache
- When the ThreadCache cannot satisfy a request, it fetches a batch of blocks from the CentralCache, sized to demand.
- The CentralCache also reclaims objects from ThreadCaches at appropriate times, so that no single thread hoards memory and allocation stays balanced across threads.
- All threads share one CentralCache, so it must be locked.
PageCache
- Every object in this layer is a large, page-granular chunk of memory.
- Once all the small blocks cut from a chunk have been returned, the PageCache reclaims the corresponding object from the CentralCache and merges it with neighbouring chunks into larger runs of pages, which mitigates fragmentation.
- The PageCache is shared by all threads, so it must be locked as well.
5. Allocation Logic
ThreadCache
ThreadCache is a hash-bucket structure: each bucket corresponds to one block size.
To allocate, the requested size is first rounded up to an alignment boundary; the aligned size determines which bucket the request maps to, and a block is then taken from that bucket.
The relationship between size, alignment and bucket index:
Size range (bytes) | Alignment (bytes) | Bucket indexes |
---|---|---|
[1, 128] | 8 | [0, 15] |
[128 + 1, 1024] | 16 | [16, 71] |
[1024 + 1, 8 * 1024] | 128 | [72, 127] |
[8 * 1024 + 1, 64 * 1024] | 1024 | [128, 183] |
[64 * 1024 + 1, 256 * 1024] | 8 * 1024 | [184, 207] |
The largest request a ThreadCache serves is 256 KB, and there are 208 buckets in total.
Rounding up wastes some memory inside each block; this is internal fragmentation. For example, a request for 129 bytes is rounded up to 144 bytes, wasting 15 bytes, about 10.4% of the block.
With these alignment rules the overall waste stays at roughly 10%.
How do we compute the aligned size and the bucket index from a request size?
// Communal.h
#include <cassert>

class SizeClass
{
public:
    inline static size_t _AlignUp(size_t size, size_t align)
    {
        if (size % align == 0)
            return size;
        return (size / align + 1) * align;
    }

    // Round size up to its alignment boundary
    static size_t AlignUp(size_t size)
    {
        if (size <= 128) return _AlignUp(size, 8);
        else if (size <= 1024) return _AlignUp(size, 16);
        else if (size <= 8 * 1024) return _AlignUp(size, 128);
        else if (size <= 64 * 1024) return _AlignUp(size, 1024);
        else if (size <= 256 * 1024) return _AlignUp(size, 8 * 1024);
        else
        {
            assert(false);
            return -1;
        }
    }

    inline static size_t _Index(size_t size, size_t align)
    {
        if (size % align == 0)
            return size / align - 1;
        return size / align;
    }

    // Compute the index of the bucket a given size maps to
    static size_t Index(size_t size)
    {
        // number of buckets in each of the first four groups
        int array[4] = { 16, 56, 56, 56 };
        if (size <= 128)
            return _Index(size, 8);
        else if (size <= 1024)
            return _Index(size - 128, 16) + array[0];
        else if (size <= 8 * 1024)
            return _Index(size - 1024, 128) + array[0] + array[1];
        else if (size <= 64 * 1024)
            return _Index(size - 8 * 1024, 1024) + array[0] + array[1] + array[2];
        else if (size <= 256 * 1024)
            return _Index(size - 64 * 1024, 8 * 1024) + array[0] + array[1] + array[2] + array[3];
        else
        {
            assert(false);
            return -1;
        }
    }
};
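The code throughout this write-up also relies on a few project-wide constants and types that are never listed explicitly. The following is my reconstruction of a minimal set consistent with the text (208 buckets, a 256 KB ThreadCache cap, 8 KB pages, PageCache buckets for 1..128 pages), not necessarily the project's exact header:

// Communal.h (reconstructed constants, assumed)
static const size_t NFREELIST = 208;        // buckets in ThreadCache / CentralCache
static const size_t MAX_SIZE = 256 * 1024;  // largest size served by ThreadCache (256 KB)
static const size_t NPAGE = 129;            // PageCache buckets, indexed by page count 1..128
static const size_t PAGE_SHIFT = 13;        // one page = 2^13 bytes = 8 KB

typedef size_t PAGE_ID;                     // page number type (wide enough on 32-bit x86)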
At the very top sits ConcurrentAlloc: if the thread's ThreadCache pointer is still null, a ThreadCache is created first, and the request is then forwarded to it.

void* ConcurrentAlloc(size_t size)
{
    assert(size <= MAX_SIZE); // adjusted later to handle larger requests
    if (pThreadCache == nullptr)
        pThreadCache = new ThreadCache;
    return pThreadCache->Allocate(size);
}
The structures ThreadCache allocates from:

// Communal.h
static void*& NextObj(void* ptr)
{
    return *(void**)ptr;
}

class FreeList
{
public:
    bool Empty()
    {
        return _freeList == nullptr;
    }

    void* PopFront()
    {
        void* ptr = _freeList;
        _freeList = NextObj(_freeList);
        return ptr;
    }

    // Hang a range of n blocks [start, end] on the front of the list
    void PushRange(void* start, void* end, size_t n)
    {
        NextObj(end) = _freeList;
        _freeList = start;
    }

    size_t& Factor()
    {
        return _factor;
    }

private:
    void* _freeList = nullptr;
    size_t _factor = 1; // slow-start tuning factor
};
// ThreadCache.h
class ThreadCache
{
public:
    // Take a block out of the ThreadCache
    void* Allocate(size_t size);
    // Fetch a batch of small blocks from the CentralCache
    void* FetchMemoryFromCentralCache(size_t size);

private:
    FreeList _freeLists[NFREELIST];
};

// Each thread owns one ThreadCache (thread-local storage)
static __declspec(thread) ThreadCache* pThreadCache = nullptr;
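__declspec(thread) is MSVC-specific. On other compilers, or if portability matters, the standard C++11 keyword gives the same one-instance-per-thread behaviour (a drop-in alternative, not what the Windows build above uses):

// Portable alternative using standard C++11 thread-local storage
static thread_local ThreadCache* pThreadCache = nullptr;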
// ThreadCache.cpp
void* ThreadCache::Allocate(size_t size)
{
    size_t alignsize = SizeClass::AlignUp(size);
    size_t index = SizeClass::Index(size);
    if (!_freeLists[index].Empty())
        return _freeLists[index].PopFront();
    return FetchMemoryFromCentralCache(size);
}

void* ThreadCache::FetchMemoryFromCentralCache(size_t size)
{
    size_t alignsize = SizeClass::AlignUp(size);
    size_t index = SizeClass::Index(size);
    /*
    * Slow-start throttling:
    * - NumOfMemory gives a per-size upper bound: more blocks for small sizes, fewer for large ones.
    * - Each FreeList also has a tuning factor that grows with every fetch, so the batch size
    *   increases the more often this bucket is used.
    * - Take the smaller of the two, so we never hand out far more blocks than will actually
    *   be used at the beginning.
    */
    size_t num = std::min(_freeLists[index].Factor(), SizeClass::NumOfMemory(alignsize));
    if (num == _freeLists[index].Factor())
        _freeLists[index].Factor()++;

    void* start = nullptr, * end = nullptr;
    size_t actual = CentralCache::GetSingleton()->RemoveMemory(size, start, end, num);
    if (actual == 1)
        assert(start == end);
    else
        // Hand the first block to the caller and hang the rest on the bucket
        _freeLists[index].PushRange(NextObj(start), end, actual - 1);
    return start;
}
CentralCache
Every bucket in the CentralCache holds a SpanList, a doubly linked circular list whose nodes are Span objects.
A Span records the starting page id of the big chunk it manages, the number of pages, how many small blocks have been handed out to ThreadCaches, and so on.
Different threads may hit different SpanLists at the same time, so each bucket gets its own lock (a bucket lock).
// Communal.h
struct Span
{
    PAGE_ID _page_id = 0;      // id of the first page managed by this Span
    size_t _n = 0;             // number of pages managed by this Span
    size_t _count = 0;         // number of small blocks handed out by the CentralCache
    Span* _prev = nullptr;
    Span* _next = nullptr;
    void* _freeList = nullptr; // free list of small blocks cut from this Span
};

class SpanList
{
public:
    SpanList()
    {
        _head = new Span;
        _head->_next = _head;
        _head->_prev = _head;
    }

private:
    Span* _head = nullptr;

public:
    std::mutex _mtx; // bucket lock
};
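Later code calls Begin(), End(), Empty(), PushFront(), PopFront() and Erase() on SpanList, none of which are shown above. A plausible sketch of these members over the sentinel-headed circular list (to be added inside class SpanList; my reconstruction, not the project's exact code):

// SpanList members assumed by the code below (sketch)
Span* Begin() { return _head->_next; }
Span* End() { return _head; }
bool Empty() { return _head->_next == _head; }

void PushFront(Span* span)
{
    // insert right after the sentinel head
    span->_next = _head->_next;
    span->_prev = _head;
    _head->_next->_prev = span;
    _head->_next = span;
}

Span* PopFront()
{
    Span* front = _head->_next;
    Erase(front);
    return front;
}

void Erase(Span* span)
{
    // unlink only; the Span object itself is not freed here
    span->_prev->_next = span->_next;
    span->_next->_prev = span->_prev;
}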
All threads share one CentralCache, so it is designed as a singleton; the singleton object itself is defined in the .cpp file.

// CentralCache.h
class CentralCache
{
public:
    // Get the singleton instance
    static CentralCache* GetSingleton()
    {
        return &_singleton;
    }

    // Hand at most num small blocks to a ThreadCache; returns how many were actually handed out
    size_t RemoveMemory(size_t size, void*& start, void*& end, size_t num);

    // Find (or fetch) a Span whose free list is not empty
    Span* GetOneSpan(SpanList& spanList, size_t size);

private:
    SpanList _spanLists[NFREELIST];

    static CentralCache _singleton; // the singleton
    CentralCache() {}
    CentralCache(const CentralCache&) = delete;
};
When the CentralCache hands a batch of small blocks to a ThreadCache, it first locates the right bucket and looks there for a Span whose free list is not empty; that is GetOneSpan's job.
Once a Span is obtained, its free list may not hold as many blocks as were requested; in that case the ThreadCache simply gets whatever is available.
RemoveMemory therefore has two steps: (1) get a Span, (2) detach small blocks from it. Both steps modify _spanLists, so the bucket is locked before the work starts. The function returns the number of blocks actually handed out.
// CentralCache.cpp
size_t CentralCache::RemoveMemory(size_t size, void*& start, void*& end, size_t num)
{
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock();

    Span* span = GetOneSpan(_spanLists[index], size);
    assert(span);
    assert(span->_freeList);

    // Take up to num blocks off the Span's free list; stop early if it runs out
    start = span->_freeList;
    end = start;
    size_t actual = 1;
    while (actual < num && NextObj(end) != nullptr)
    {
        end = NextObj(end);
        actual++;
    }
    span->_freeList = NextObj(end);
    NextObj(end) = nullptr;
    span->_count += actual; // record how many blocks this Span has handed out

    _spanLists[index]._mtx.unlock();
    return actual;
}
Inside GetOneSpan, if no suitable Span is found in the bucket, one is requested from the PageCache.
Before going to the PageCache the bucket lock is released, because other threads may want to allocate from or release into this bucket in the meantime; the lock is re-acquired before the new Span is inserted into the bucket. This keeps overall throughput up.
After the new Span arrives, its start and end addresses are computed from the page id and page count, the pages are cut into small blocks, and the Span is hung in the bucket.
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
    // Look for a Span in this bucket whose free list still has blocks
    Span* cur = spanList.Begin();
    while (cur != spanList.End())
    {
        if (cur->_freeList) return cur;
        cur = cur->_next;
    }

    // No usable Span in this bucket: release the bucket lock first, because other threads
    // may allocate from or release into this bucket while we go to the PageCache
    spanList._mtx.unlock();

    PageCache::GetSingleton()->_mtx.lock();
    Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
    PageCache::GetSingleton()->_mtx.unlock();

    // Cut the new Span's pages into size-byte blocks and link them into its free list.
    // Only link a block if a whole size-byte block still fits before end.
    char* start = (char*)(span->_page_id << PAGE_SHIFT);
    char* end = start + (span->_n << PAGE_SHIFT);
    span->_freeList = start;
    void* prev = start;
    while (start + size + size <= end)
    {
        start += size;
        NextObj(prev) = start;
        prev = start;
    }
    NextObj(prev) = nullptr;

    // Re-lock the bucket before inserting the new Span
    spanList._mtx.lock();
    spanList.PushFront(span);
    return span;
}
PageCache
The PageCache also consists of many buckets, each holding a SpanList with multiple Spans; the difference is that here every Span manages a page-granular chunk of memory. The PageCache uses a single global lock rather than bucket locks: when the requested page count has no Span available, the search moves on to larger page counts, and locking and unlocking a different bucket at every step would hurt overall efficiency.
// PageCache.h
class PageCache
{
public:
    static PageCache* GetSingleton()
    {
        return &_singleton;
    }

    // Hand a Span to the CentralCache
    Span* NewSpan(size_t kpage);

private:
    SpanList _pageLists[NPAGE];

    static PageCache _singleton;
    PageCache() {}
    PageCache(const PageCache&) = delete;

public:
    std::mutex _mtx; // global lock
};
The number of pages requested scales with the object size: large objects get Spans with more pages, small objects fewer.
// Communal.h (member of class SizeClass)
// How many pages the CentralCache should request for objects of this size
static size_t NumOfPage(size_t size)
{
    size_t num = NumOfMemory(size);
    size_t npage = num * size;
    npage >>= PAGE_SHIFT;
    if (npage == 0) npage = 1;
    return npage;
}
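NumOfMemory is used here and in FetchMemoryFromCentralCache but is not listed in this write-up. Following tcmalloc's convention, a plausible sketch is an upper bound clamped between 2 and 512 blocks per batch (the exact bounds are my assumption):

// Communal.h (member of class SizeClass, assumed implementation)
// Upper bound on how many blocks of this size a ThreadCache fetches at once:
// small sizes get many blocks per batch, large sizes only a few.
static size_t NumOfMemory(size_t size)
{
    assert(size > 0);
    size_t num = MAX_SIZE / size;
    if (num < 2)   num = 2;    // at least 2 blocks even for the largest sizes
    if (num > 512) num = 512;  // at most 512 blocks for the smallest sizes
    return num;
}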
In NewSpan, if the bucket for the requested page count has no Span, larger buckets are searched and a bigger Span is split into two smaller ones; if nothing is found all the way up to 128 pages, a 128-page chunk is requested from the system.
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
    assert(kpage < NPAGE);

    // A Span with exactly kpage pages is available
    if (!_pageLists[kpage].Empty())
        return _pageLists[kpage].PopFront();

    // Otherwise look in the larger buckets and split one Span into two
    for (size_t i = kpage + 1; i < NPAGE; i++)
    {
        if (_pageLists[i].Empty()) continue;
        Span* nSpan = _pageLists[i].PopFront();
        Span* kSpan = new Span;
        // kSpan takes kpage pages from the tail of nSpan
        nSpan->_n -= kpage;
        kSpan->_n = kpage;
        kSpan->_page_id = nSpan->_page_id + nSpan->_n;
        _pageLists[nSpan->_n].PushFront(nSpan);
        return kSpan;
    }

    // Nothing available up to 128 pages: ask the system for a 128-page chunk, then retry
    void* ptr = SystemAlloc(NPAGE - 1);
    Span* bigSpan = new Span;
    bigSpan->_n = NPAGE - 1;
    bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
    _pageLists[NPAGE - 1].PushFront(bigSpan);
    return NewSpan(kpage);
}
Unit tests
// UnitTest.cpp
#include "ConcurrentAlloc.h"
#include <iostream>
#include <thread>

void Alloc1()
{
    for (size_t i = 0; i < 5; ++i)
    {
        void* ptr = ConcurrentAlloc(6);
    }
}

void Alloc2()
{
    for (size_t i = 0; i < 5; ++i)
    {
        void* ptr = ConcurrentAlloc(7);
    }
}

void TLSTest()
{
    std::thread t1(Alloc1);
    t1.join();
    std::thread t2(Alloc2);
    t2.join();
}

void TestConcurrentAlloc1()
{
    void* p1 = ConcurrentAlloc(6);
    void* p2 = ConcurrentAlloc(8);
    void* p3 = ConcurrentAlloc(1);
    void* p4 = ConcurrentAlloc(7);
    void* p5 = ConcurrentAlloc(8);
    std::cout << p1 << std::endl;
    std::cout << p2 << std::endl;
    std::cout << p3 << std::endl;
    std::cout << p4 << std::endl;
    std::cout << p5 << std::endl;
}

void TestConcurrentAlloc2()
{
    for (size_t i = 0; i < 1024; ++i)
    {
        void* p1 = ConcurrentAlloc(16);
        std::cout << p1 << std::endl;
    }
    void* p2 = ConcurrentAlloc(8);
    std::cout << p2 << std::endl;
}

int main()
{
    //TestConcurrentAlloc1();
    //TestConcurrentAlloc2();
    //TLSTest();
    return 0;
}
6. Deallocation Logic
Memory is released back into the bucket it was allocated from, so freeing also has to compute the bucket index.
For now, that means the caller must pass the block size when freeing (we remove this requirement later).
void ConcurrentFree(void* ptr, size_t size)
{
    assert(pThreadCache);
    pThreadCache->Free(ptr, size);
}
ThreadCache
// ThreadCache.h
class ThreadCache
{
public:
    // Take a block out of the ThreadCache
    void* Allocate(size_t size);
    // Fetch a batch of small blocks from the CentralCache
    void* FetchMemoryFromCentralCache(size_t size);
    // Release a block back into the ThreadCache
    void Free(void* ptr, size_t size);
    // When a free list holds too many blocks, return them to the CentralCache
    void ListTooLong(FreeList& freeList, size_t size);

private:
    FreeList _freeLists[NFREELIST];
};

// Each thread owns one ThreadCache (thread-local storage)
static __declspec(thread) ThreadCache* pThreadCache = nullptr;
Releasing a block back to the ThreadCache simply hangs it on the free list of the matching bucket.
To keep one thread from hoarding memory, the rule is: once the number of blocks on a bucket's free list reaches that bucket's tuning factor, all of the bucket's blocks are returned to the CentralCache.
void ThreadCache::Free(void* ptr, size_t size)
{
    assert(ptr);
    assert(size <= MAX_SIZE);
    size_t index = SizeClass::Index(size);
    _freeLists[index].PushFront(ptr);
    // Too many cached blocks in this bucket: give them all back to the CentralCache
    if (_freeLists[index].Size() >= _freeLists[index].Factor())
        ListTooLong(_freeLists[index], size);
}

void ThreadCache::ListTooLong(FreeList& freeList, size_t size)
{
    void* start = nullptr, * end = nullptr;
    freeList.PopRange(start, end, freeList.Size());
    CentralCache::GetSingleton()->ReleaseToCentralCache(start, size);
}
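Free() and ListTooLong() rely on three FreeList members that were not shown when FreeList was introduced: PushFront, Size and PopRange. A sketch of them, assuming the list also tracks its length in a `_size` member (the earlier PushRange/PopFront would then update `_size` too, which is what PushRange's otherwise unused `n` parameter is for):

// FreeList members assumed by the code above (sketch; add inside class FreeList)
void PushFront(void* ptr)
{
    NextObj(ptr) = _freeList;
    _freeList = ptr;
    ++_size;
}

size_t Size()
{
    return _size;
}

// Pop the first n blocks as a range [start, end]
void PopRange(void*& start, void*& end, size_t n)
{
    assert(n >= 1 && n <= _size);
    start = end = _freeList;
    for (size_t i = 0; i < n - 1; ++i)
        end = NextObj(end);
    _freeList = NextObj(end);
    NextObj(end) = nullptr;
    _size -= n;
}

private:
    size_t _size = 0; // number of blocks currently on the list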
CentralCache
To release a small block back to the CentralCache, we first need to know which Span it was cut from.
The code therefore needs a mapping from PAGE_ID to Span*, because the page id can be computed from any address inside the block:
PAGE_ID = block address / page size
We store this mapping in a std::unordered_map. The map lives in the PageCache, because the PageCache needs the PAGE_ID-to-Span* mapping as well.
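As a concrete example of the formula above (with PAGE_SHIFT = 13, i.e. 8 KB pages), every address inside the same page shifts down to the same id, so any small block can be traced back to its page and, via the map, to its Span:

// e.g. block address 0x5000 (20480) -> page id 20480 >> 13 = 2
//      block address 0x5FF8 (24568) -> page id 24568 >> 13 = 2   (same page, same Span)
PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;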
// PageCache.h
class PageCache
{
    // ...
private:
    // ...
    std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    // ...
};
Lookups from PAGE_ID to Span* are guarded by the PageCache's global lock:
// PageCache.cpp
Span* PageCache::IdToSpan(PAGE_ID id)
{
    std::unique_lock<std::mutex> lock(_mtx);
    auto it = _idSpanMap.find(id);
    if (it == _idSpanMap.end())
    {
        assert(false);
        return nullptr;
    }
    return it->second;
}
Consequently, before the PageCache hands a Span to the CentralCache, it must record the mapping from every PAGE_ID covered by that Span to the Span*.
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
    // ...
    for (size_t i = kpage + 1; i < NPAGE; i++)
    {
        if (_pageLists[i].Empty()) continue;
        Span* nSpan = _pageLists[i].PopFront();
        Span* kSpan = new Span;
        nSpan->_n -= kpage;
        kSpan->_n = kpage;
        kSpan->_page_id = nSpan->_page_id + nSpan->_n;

        // nSpan stays in the PageCache: mapping its first and last page is enough
        _idSpanMap[nSpan->_page_id] = nSpan;
        _idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;
        // kSpan goes to the CentralCache: every one of its pages must map to it
        for (size_t i = 0; i < kSpan->_n; i++)
            _idSpanMap[kSpan->_page_id + i] = kSpan;

        _pageLists[nSpan->_n].PushFront(nSpan);
        return kSpan;
    }
    // ...
}
ReleaseToCentralCache() works as follows:
Lock the bucket, then walk the list of returned blocks. For each block, find its Span, push the block onto the front of that Span's free list, and decrement the Span's count. When the count reaches 0, every block the Span handed out has come back, so the Span is returned to the PageCache. Before that hand-off the bucket is unlocked, because other threads may want to work on this bucket in the meantime, and it is re-locked afterwards. This repeats until all returned blocks are processed, and finally the bucket is unlocked.
// CentralCache.cpp
void CentralCache::ReleaseToCentralCache(void* start, size_t size)
{
    size_t index = SizeClass::Index(size);
    _spanLists[index]._mtx.lock();

    while (start)
    {
        void* next = NextObj(start);
        // Find the Span this block was cut from
        PAGE_ID id = (PAGE_ID)start >> PAGE_SHIFT;
        Span* span = PageCache::GetSingleton()->IdToSpan(id);
        NextObj(start) = span->_freeList;
        span->_freeList = start;
        span->_count--;

        // Every block this Span handed to ThreadCaches has been returned
        if (span->_count == 0)
        {
            _spanLists[index].Erase(span);
            span->_prev = span->_next = nullptr;
            span->_freeList = nullptr;

            // Hand the Span back to the PageCache; unlock the bucket while doing so
            _spanLists[index]._mtx.unlock();
            PageCache::GetSingleton()->_mtx.lock();
            PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
            PageCache::GetSingleton()->_mtx.unlock();
            _spanLists[index]._mtx.lock();
        }

        start = next;
    }

    _spanLists[index]._mtx.unlock();
}
PageCache
// PageCache.h
class PageCache
{
public:
    static PageCache* GetSingleton()
    {
        return &_singleton;
    }

    // Hand a Span to the CentralCache
    Span* NewSpan(size_t kpage);
    // Find the Span that a page id belongs to
    Span* IdToSpan(PAGE_ID id);
    // Take back a Span that the CentralCache no longer needs
    void ReleaseSpanToPageCache(Span* span);

private:
    SpanList _pageLists[NPAGE];
    std::unordered_map<PAGE_ID, Span*> _idSpanMap;

    static PageCache _singleton;
    PageCache() {}
    PageCache(const PageCache&) = delete;

public:
    std::mutex _mtx; // global lock
};
ReleaseSpanToPageCache() works as follows:
When the CentralCache returns a Span, the PageCache tries to merge it with adjacent Spans into a larger run of pages. It first looks backwards for the Span ending at the page just before this Span's first page (starting PAGE_ID - 1), then forwards for the Span starting right after this Span's last page (ending PAGE_ID + 1). Merging stops whenever one of the following three situations occurs:
- the neighbouring Span does not exist
- the neighbouring Span is in use (it has been handed to the CentralCache)
- merging would produce a Span of more than 128 pages
How do we mark a Span as "in use"? Add an _isusing flag to Span; right after the CentralCache obtains a Span from NewSpan(), it sets _isusing to true.
// Communal.h
struct Span
{
    // ...
    bool _isusing = false; // is this Span currently held by the CentralCache?
};
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
    // ...
    Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
    span->_isusing = true;
    // ...
}
Because merging requires looking Spans up while they sit in the PageCache, a Span hung in the PageCache must have its first and last PAGE_ID recorded in _idSpanMap.
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
    assert(kpage < NPAGE);

    if (!_pageLists[kpage].Empty())
    {
        Span* span = _pageLists[kpage].PopFront();
        // This Span is about to leave for the CentralCache: map every page to it
        for (size_t i = 0; i < span->_n; i++)
            _idSpanMap[span->_page_id + i] = span;
        return span;
    }

    for (size_t i = kpage + 1; i < NPAGE; i++)
    {
        if (_pageLists[i].Empty()) continue;
        Span* nSpan = _pageLists[i].PopFront();
        Span* kSpan = new Span;
        nSpan->_n -= kpage;
        kSpan->_n = kpage;
        kSpan->_page_id = nSpan->_page_id + nSpan->_n;

        // nSpan stays in the PageCache: first and last page are enough for merging
        _idSpanMap[nSpan->_page_id] = nSpan;
        _idSpanMap[nSpan->_page_id + nSpan->_n - 1] = nSpan;
        // kSpan leaves for the CentralCache: map every page to it
        for (size_t i = 0; i < kSpan->_n; i++)
            _idSpanMap[kSpan->_page_id + i] = kSpan;

        _pageLists[nSpan->_n].PushFront(nSpan);
        return kSpan;
    }

    // Nothing available up to 128 pages: ask the system for a 128-page chunk and retry
    void* ptr = SystemAlloc(NPAGE - 1);
    Span* bigSpan = new Span;
    bigSpan->_n = NPAGE - 1;
    bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
    _pageLists[NPAGE - 1].PushFront(bigSpan);
    return NewSpan(kpage);
}
After merging, remember to destroy the Spans that were absorbed, record the merged Span's first and last PAGE_ID in _idSpanMap, mark it as not in use, and hang it in the appropriate PageCache bucket.
// PageCache.cpp
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // Merge backwards (towards lower page ids)
    while (1)
    {
        PAGE_ID prev = span->_page_id - 1;
        auto it = _idSpanMap.find(prev);
        if (it == _idSpanMap.end()) break;
        Span* prevSpan = it->second;
        if (prevSpan->_isusing == true) break;
        if (prevSpan->_n + span->_n > NPAGE - 1) break;

        _pageLists[prevSpan->_n].Erase(prevSpan);
        span->_page_id = prevSpan->_page_id;
        span->_n += prevSpan->_n;
        delete prevSpan;
    }

    // Merge forwards (towards higher page ids)
    while (1)
    {
        PAGE_ID next = span->_page_id + span->_n;
        auto it = _idSpanMap.find(next);
        if (it == _idSpanMap.end()) break;
        Span* nextSpan = it->second;
        if (nextSpan == nullptr) break;
        if (nextSpan->_isusing == true) break;
        if (nextSpan->_n + span->_n > NPAGE - 1) break;

        _pageLists[nextSpan->_n].Erase(nextSpan);
        span->_n += nextSpan->_n;
        delete nextSpan;
    }

    _idSpanMap[span->_page_id] = span;
    _idSpanMap[span->_page_id + span->_n - 1] = span;
    span->_isusing = false;
    _pageLists[span->_n].PushFront(span);
}
Unit tests
// UnitTest.cpp
void MultiThreadAlloc1()
{
    std::vector<void*> v;
    for (size_t i = 0; i < 1000; ++i)
    {
        void* ptr = ConcurrentAlloc(6);
        v.push_back(ptr);
    }
    for (auto e : v)
    {
        ConcurrentFree(e, 6);
    }
}

void MultiThreadAlloc2()
{
    std::vector<void*> v;
    for (size_t i = 0; i < 1000; ++i)
    {
        void* ptr = ConcurrentAlloc(16);
        v.push_back(ptr);
    }
    for (auto e : v)
    {
        ConcurrentFree(e, 16);
    }
}

void TestMultiThread()
{
    std::thread t1(MultiThreadAlloc1);
    std::thread t2(MultiThreadAlloc2);
    t1.join();
    t2.join();
}

int main()
{
    TestMultiThread();
    return 0;
}
7. Benchmarking
// BenchMark.cpp
#include "ConcurrentAlloc.h"
#include <iostream>
#include <vector>
#include <thread>
#include <atomic>
#include <ctime>
#include <cstdio>

// ntimes: allocations/frees per round
// nworks: number of threads
// rounds: number of rounds
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime{ 0 };
    std::atomic<size_t> free_costtime{ 0 };

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&, k]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(malloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    free(v[i]);
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%zu threads ran %zu rounds, %zu malloc calls per round: %zu ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());
    printf("%zu threads ran %zu rounds, %zu free calls per round: %zu ms\n",
        nworks, rounds, ntimes, free_costtime.load());
    printf("%zu threads, %zu malloc&free calls in total: %zu ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

// ntimes: allocations/frees per round, nworks: threads, rounds: rounds
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
    std::vector<std::thread> vthread(nworks);
    std::atomic<size_t> malloc_costtime{ 0 };
    std::atomic<size_t> free_costtime{ 0 };

    for (size_t k = 0; k < nworks; ++k)
    {
        vthread[k] = std::thread([&]() {
            std::vector<void*> v;
            v.reserve(ntimes);
            for (size_t j = 0; j < rounds; ++j)
            {
                size_t begin1 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
                }
                size_t end1 = clock();

                size_t begin2 = clock();
                for (size_t i = 0; i < ntimes; i++)
                {
                    ConcurrentFree(v[i], (16 + i) % 8192 + 1);
                }
                size_t end2 = clock();
                v.clear();

                malloc_costtime += (end1 - begin1);
                free_costtime += (end2 - begin2);
            }
        });
    }
    for (auto& t : vthread)
    {
        t.join();
    }

    printf("%zu threads ran %zu rounds, %zu concurrent alloc calls per round: %zu ms\n",
        nworks, rounds, ntimes, malloc_costtime.load());
    printf("%zu threads ran %zu rounds, %zu concurrent dealloc calls per round: %zu ms\n",
        nworks, rounds, ntimes, free_costtime.load());
    printf("%zu threads, %zu concurrent alloc&dealloc calls in total: %zu ms\n",
        nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
    size_t n = 10000;
    BenchmarkConcurrentMalloc(n, 4, 10);
    std::cout << std::endl;
    //BenchmarkMalloc(n, 4, 10);
    return 0;
}
8. Refinements
Allocating and freeing blocks larger than 32 pages
If the memory being allocated or freed exceeds 32 pages (256 KB), the request bypasses the ThreadCache and CentralCache and goes straight to the PageCache.
Inside the PageCache, requests larger than 128 pages go straight to the system.
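Note that ConcurrentAlloc below calls SizeClass::AlignUp for sizes above MAX_SIZE, while the AlignUp shown in section 5 asserts in that case. The project therefore needs one extra branch; a sketch of the addition (aligning oversized requests up to whole pages, which is my assumption of how it is handled):

// Communal.h (extra branch added to SizeClass::AlignUp)
static size_t AlignUp(size_t size)
{
    if (size <= 128) return _AlignUp(size, 8);
    else if (size <= 1024) return _AlignUp(size, 16);
    else if (size <= 8 * 1024) return _AlignUp(size, 128);
    else if (size <= 64 * 1024) return _AlignUp(size, 1024);
    else if (size <= 256 * 1024) return _AlignUp(size, 8 * 1024);
    else return _AlignUp(size, 1 << PAGE_SHIFT); // > 256 KB: align up to whole pages
}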
// ConcurrentAlloc.h
void* ConcurrentAlloc(size_t size)
{
    // Larger than 32 pages: go straight to the PageCache
    if (size > MAX_SIZE)
    {
        size_t alignsize = SizeClass::AlignUp(size);
        size_t kpage = SizeClass::NumOfPage(alignsize);

        PageCache::GetSingleton()->_mtx.lock();
        Span* span = PageCache::GetSingleton()->NewSpan(kpage);
        PageCache::GetSingleton()->_mtx.unlock();

        return (void*)(span->_page_id << PAGE_SHIFT);
    }

    assert(size <= MAX_SIZE);
    if (pThreadCache == nullptr)
        pThreadCache = new ThreadCache;
    return pThreadCache->Allocate(size);
}
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
    // More than 128 pages: go straight to the system
    if (kpage > NPAGE - 1)
    {
        void* ptr = SystemAlloc(kpage);
        Span* span = new Span;
        span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = kpage;
        _idSpanMap[span->_page_id] = span;
        return span;
    }
    // ...
}
// ConcurrentAlloc.h
void ConcurrentFree(void* ptr, size_t size)
{
    // Larger than 32 pages: release straight to the PageCache
    if (size > MAX_SIZE)
    {
        PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
        Span* span = PageCache::GetSingleton()->IdToSpan(id);

        PageCache::GetSingleton()->_mtx.lock();
        PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
        PageCache::GetSingleton()->_mtx.unlock();
    }
    else
    {
        assert(pThreadCache);
        pThreadCache->Free(ptr, size);
    }
}
// PageCache.cpp
void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // More than 128 pages: return the memory straight to the system
    if (span->_n > NPAGE - 1)
    {
        void* ptr = (void*)(span->_page_id << PAGE_SHIFT);
        SystemFree(ptr);
        delete span;
        return;
    }
    // ...
}
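SystemFree is used above but never shown. A sketch mirroring SystemAlloc (on Windows, VirtualFree with MEM_RELEASE and size 0 releases the whole reservation; the Linux branch is left as a placeholder, matching how SystemAlloc was presented):

// Communal.h (assumed counterpart to SystemAlloc)
inline void SystemFree(void* ptr)
{
#ifdef _WIN32
    VirtualFree(ptr, 0, MEM_RELEASE);
#else
    // On Linux, release with munmap/brk, matching how SystemAlloc acquired the memory
#endif
}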
Freeing without passing the size
Add an _objSize field to Span; whenever the CentralCache obtains a new Span from the PageCache, it records the block size the Span is cut into.
// Communal.h
struct Span
{
    // ...
    size_t _objSize = 0;
};
// CentralCache.cpp
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
    // ...
    Span* span = PageCache::GetSingleton()->NewSpan(SizeClass::NumOfPage(size));
    span->_isusing = true;
    span->_objSize = size;
    // ...
}
// ConcurrentAlloc.h
void* ConcurrentAlloc(size_t size)
{
    // Larger than 32 pages: go straight to the PageCache
    if (size > MAX_SIZE)
    {
        size_t alignsize = SizeClass::AlignUp(size);
        size_t kpage = SizeClass::NumOfPage(alignsize);

        PageCache::GetSingleton()->_mtx.lock();
        Span* span = PageCache::GetSingleton()->NewSpan(kpage);
        span->_objSize = alignsize;
        PageCache::GetSingleton()->_mtx.unlock();

        return (void*)(span->_page_id << PAGE_SHIFT);
    }

    assert(size <= MAX_SIZE);
    if (pThreadCache == nullptr)
        pThreadCache = new ThreadCache;
    return pThreadCache->Allocate(size);
}

void ConcurrentFree(void* ptr)
{
    // Look up the Span the block belongs to and read the block size from it
    PAGE_ID id = (PAGE_ID)ptr >> PAGE_SHIFT;
    Span* span = PageCache::GetSingleton()->IdToSpan(id);
    size_t size = span->_objSize;

    // Larger than 32 pages: release straight to the PageCache
    if (size > MAX_SIZE)
    {
        PageCache::GetSingleton()->_mtx.lock();
        PageCache::GetSingleton()->ReleaseSpanToPageCache(span);
        PageCache::GetSingleton()->_mtx.unlock();
    }
    else
    {
        assert(pThreadCache);
        pThreadCache->Free(ptr, size);
    }
}
Plugging in the fixed-size object pool
Replace every use of new and delete in the project with allocation from the fixed-size ObjectPool.
class PageCache
{
    // ...
    ObjectPool<Span> _spanPool;
    // ...
};
Then use _spanPool wherever the project previously used new Span and delete:
void* ConcurrentAlloc(size_t size)
{
    // ...
    // Note: ObjectPool itself is not thread-safe, so in a full build this shared pool
    // needs its own lock around New()
    static ObjectPool<ThreadCache> pool;
    if (pThreadCache == nullptr)
        pThreadCache = pool.New();
    // ...
}
Radix tree optimization
After several rounds of testing, our concurrent memory pool turns out to be slower overall than malloc and free.
The reason is that every lookup from a PAGE_ID to its Span has to be thread-safe, and the frequent locking and unlocking around those lookups is the main drag on performance.
We can store the PAGE_ID-to-Span mapping in a radix tree instead.
In essence, the array index is the PAGE_ID and the array element is the Span*; with this layout, lookups need no locking at all and still finish in O(1) time.
On a 32-bit x86 system the address space is 2^32 bytes and one page is 2^13 bytes, so there are 2^32 / 2^13 = 2^19 pages, i.e. 2^19 possible PAGE_IDs.
// PageMap.h
template<size_t BITS>
class PageMap
{
public:
    PageMap()
    {
        // One slot per possible PAGE_ID, allocated once up front
        size_t size = sizeof(void*) * _length;
        size_t alignSize = SizeClass::AlignUp(size);
        _array = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
        memset(_array, 0, alignSize);
    }

    void set(PAGE_ID id, void* ptr)
    {
        _array[id] = ptr;
    }

    void* get(PAGE_ID id)
    {
        if ((id >> BITS) > 0) return nullptr; // out of range
        return _array[id];
    }

private:
    size_t _length = 1 << BITS;
    void** _array = nullptr;
};
class PageCache
{
    // ...
    //std::unordered_map<PAGE_ID, Span*> _idSpanMap;
    PageMap<32 - PAGE_SHIFT> _idSpanMap;
    // ...
};
All that remains is to replace the _idSpanMap operations throughout the project with the radix tree's set/get:
// PageCache.cpp
Span* PageCache::NewSpan(size_t kpage)
{
    // More than 128 pages: go straight to the system
    if (kpage > NPAGE - 1)
    {
        void* ptr = SystemAlloc(kpage);
        Span* span = _spanPool.New();
        span->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
        span->_n = kpage;
        _idSpanMap.set(span->_page_id, span);
        return span;
    }

    assert(kpage < NPAGE);
    if (!_pageLists[kpage].Empty())
    {
        Span* span = _pageLists[kpage].PopFront();
        for (size_t i = 0; i < span->_n; i++)
            _idSpanMap.set(span->_page_id + i, span);
        return span;
    }

    for (size_t i = kpage + 1; i < NPAGE; i++)
    {
        if (_pageLists[i].Empty()) continue;
        Span* nSpan = _pageLists[i].PopFront();
        Span* kSpan = _spanPool.New();
        nSpan->_n -= kpage;
        kSpan->_n = kpage;
        kSpan->_page_id = nSpan->_page_id + nSpan->_n;

        _idSpanMap.set(nSpan->_page_id, nSpan);
        _idSpanMap.set(nSpan->_page_id + nSpan->_n - 1, nSpan);
        for (size_t i = 0; i < kSpan->_n; i++)
            _idSpanMap.set(kSpan->_page_id + i, kSpan);

        _pageLists[nSpan->_n].PushFront(nSpan);
        return kSpan;
    }

    // Ask the system for a 128-page chunk, hang it in the last bucket, then retry
    void* ptr = SystemAlloc(NPAGE - 1);
    Span* bigSpan = _spanPool.New();
    bigSpan->_n = NPAGE - 1;
    bigSpan->_page_id = (PAGE_ID)ptr >> PAGE_SHIFT;
    _pageLists[NPAGE - 1].PushFront(bigSpan);
    return NewSpan(kpage);
}

Span* PageCache::IdToSpan(PAGE_ID id)
{
    // No lock needed: the radix tree array is preallocated and never moves or rehashes
    Span* span = (Span*)_idSpanMap.get(id);
    return span;
}

void PageCache::ReleaseSpanToPageCache(Span* span)
{
    // More than 128 pages: return the memory straight to the system
    if (span->_n > NPAGE - 1)
    {
        void* ptr = (void*)(span->_page_id << PAGE_SHIFT);
        SystemFree(ptr);
        _spanPool.Delete(span);
        return;
    }

    // Merge backwards
    while (1)
    {
        PAGE_ID prev = span->_page_id - 1;
        Span* prevSpan = (Span*)_idSpanMap.get(prev);
        if (prevSpan == nullptr) break;
        if (prevSpan->_isusing == true) break;
        if (prevSpan->_n + span->_n > NPAGE - 1) break;

        _pageLists[prevSpan->_n].Erase(prevSpan);
        span->_page_id = prevSpan->_page_id;
        span->_n += prevSpan->_n;
        _spanPool.Delete(prevSpan);
    }

    // Merge forwards
    while (1)
    {
        PAGE_ID next = span->_page_id + span->_n;
        Span* nextSpan = (Span*)_idSpanMap.get(next);
        if (nextSpan == nullptr) break;
        if (nextSpan->_isusing == true) break;
        if (nextSpan->_n + span->_n > NPAGE - 1) break;

        _pageLists[nextSpan->_n].Erase(nextSpan);
        span->_n += nextSpan->_n;
        _spanPool.Delete(nextSpan);
    }

    _idSpanMap.set(span->_page_id, span);
    _idSpanMap.set(span->_page_id + span->_n - 1, span);
    span->_isusing = false;
    span->_objSize = 0;
    _pageLists[span->_n].PushFront(span);
}
After several more rounds of testing, our concurrent memory pool is now considerably faster than malloc and free.
Project location: high-concurrency memory pool