前言:
本篇旨在熟悉 基于tcmalloc的高性能并发内存池项目之后,对于最后的优化进行的笔记梳理,项目完整代码 可点击:项目代码 进行查看。
优化思想风暴:
- 为了方便根据页号查找到对应的span, 这里我们可以使用红黑树或者哈希表这样一种结构,来存储他们的映射关系,如下:
- 比如: 在下面这个地方,需要根据对齐字节数,查找对应的span,以便于回收的时候使用span里面的_objsize,或者需要根据页号查找前后页的span, 此时都需要访问这个哈希表。
- 所以我们在创建span的时候就应该注入此信息,如下:
不管把此span所涉及的每一个页号都映射(给central cache用),还是只映射首位页(给pagecache cache合并大页的内存用),根据不同的场景,我们根据页号都能查找到对应的span。
当然, 我们在合并完大页的span之后,对于页号已经发生了改变,我们也需要进行及时的修订。
- 所以自然,我们在不同线程进行修改这个span的时候,调用此函数之前,需要加锁一把锁,我们放在page cache的成员里面
这里进行修改的地方,我们都需要加上锁,以防止多线程访问,引发的线程安全问题,当然,我们在读取数据的时候,也需要加上锁,如下:
看清楚!,这里我们加的还是那把pagecache成员变量的那把锁,只不过这把锁具有局部变量的特性,出了函数作用域会自动释放。
- 那么至此,我们可以此代码的性能测试,根据VS下的性能探测器,我们发现MapObjectToSpan非常的消耗时间,主要原因在于unique_lock导致线程阻塞,耗费大量时间,那么我们替换掉这个锁,在读取的时候进行无锁访问,至此,我们就引入了今天的主角(基数树)
简单了解一下基数树的代码实现:
下面这部分代码,对基数树进行了简化,主要有三种基数树,区别就是分别进行了一层,两层,三层的基数树实现,代码下面,博主自行画了一个草图,可以大致了解。
#pragma once//基数树
#include "Common.h"//一层
template<int BITS>
class TCMalloc_PageMap1
{
private:static const int LENGTH = 1 << BITS; //1<<19void** array_;
public:typedef uintptr_t Number;//只能显示调用的构造函数explicit TCMalloc_PageMap1() {//array_ = reinterpret_cast<void**>((*allocator)(sizeof(void*) << BITS));size_t size = sizeof(void*) << BITS; size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //1<<13 8KBarray_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //2^19 memset(array_, 0, sizeof(void*) << BITS);}// Return the current value for KEY. Returns NULL if not yet set,// or if k is out of range.void* get(Number k) const {if ((k >> BITS) > 0) {return NULL;}return array_[k];}// REQUIRES "k" is in range "[0,2^BITS-1]".// REQUIRES "k" has been ensured before.//// Sets the value 'v' for key 'k'.void set(Number k, void* v) {array_[k] = v;}
};// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.static const int ROOT_BITS = 5;static const int ROOT_LENGTH = 1 << ROOT_BITS;static const int LEAF_BITS = BITS - ROOT_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Leaf* root_[ROOT_LENGTH]; // Pointers to 32 child nodesvoid* (*allocator_)(size_t); // Memory allocatorpublic:typedef uintptr_t Number;//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {explicit TCMalloc_PageMap2() {//allocator_ = allocator;memset(root_, 0, sizeof(root_));PreallocateMoreMemory();}void* get(Number k) const {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 || root_[i1] == NULL) {return NULL;}return root_[i1]->values[i2];}void set(Number k, void* v) {const Number i1 = k >> LEAF_BITS;const Number i2 = k & (LEAF_LENGTH - 1);ASSERT(i1 < ROOT_LENGTH);root_[i1]->values[i2] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> LEAF_BITS;// Check for overflowif (i1 >= ROOT_LENGTH)return false;// Make 2nd level node if necessaryif (root_[i1] == NULL) {//Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));//if (leaf == NULL) return false;static ObjectPool<Leaf> leafPool;Leaf* leaf = (Leaf*)leafPool.New();memset(leaf, 0, sizeof(*leaf));root_[i1] = leaf;}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {// Allocate enough to keep track of all possible pagesEnsure(0, 1 << BITS);}
};//三层
// Three-level radix tree
template <int BITS>
class TCMalloc_PageMap3 {
private:// How many bits should we consume at each interior levelstatic const int INTERIOR_BITS = (BITS + 2) / 3; // Round-upstatic const int INTERIOR_LENGTH = 1 << INTERIOR_BITS;// How many bits should we consume at leaf levelstatic const int LEAF_BITS = BITS - 2 * INTERIOR_BITS;static const int LEAF_LENGTH = 1 << LEAF_BITS;// Interior nodestruct Node {Node* ptrs[INTERIOR_LENGTH];};// Leaf nodestruct Leaf {void* values[LEAF_LENGTH];};Node* root_; // Root of radix treevoid* (*allocator_)(size_t); // Memory allocatorNode* NewNode() {Node* result = reinterpret_cast<Node*>((*allocator_)(sizeof(Node)));if (result != NULL) {memset(result, 0, sizeof(*result));}return result;}public:typedef uintptr_t Number;explicit TCMalloc_PageMap3(void* (*allocator)(size_t)) {allocator_ = allocator;root_ = NewNode();}void* get(Number k) const {const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);if ((k >> BITS) > 0 ||root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL) {return NULL;}return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3];}void set(Number k, void* v) {ASSERT(k >> BITS == 0);const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1);const Number i3 = k & (LEAF_LENGTH - 1);reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v;}bool Ensure(Number start, size_t n) {for (Number key = start; key <= start + n - 1;) {const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1);// Check for overflowif (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH)return false;// Make 2nd level node if necessaryif (root_->ptrs[i1] == NULL) {Node* n = NewNode();if (n == NULL) return false;root_->ptrs[i1] = n;}// Make leaf node if necessaryif (root_->ptrs[i1]->ptrs[i2] == NULL) {Leaf* leaf = reinterpret_cast<Leaf*>((*allocator_)(sizeof(Leaf)));if (leaf == NULL) return false;memset(leaf, 0, sizeof(*leaf));root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);}// Advance key past whatever is covered by this leaf nodekey = ((key >> LEAF_BITS) + 1) << LEAF_BITS;}return true;}void PreallocateMoreMemory() {}
};
对于一层的基数树,其实就是一个哈希表,也可以理解为数组指针,一共有2^19个格子,存放32位下以8KB为单位的所有页号,它是在一开始就已经开辟好了内存
对于二层的基数树,原理如下,三层类比二层,例如再增加后几位作为第三层的下标,这里需要某个范围的页号对应的span时,会2^14次方一次,这样缓慢的开辟好,需要用到的范围就开辟,缓解了系统申请大块内存的负担,当然也是提前已经根据你需要的页号映射范围提前开辟好的。
我们可以把哈希表或者红黑树替换为这样一种基数树的结构,原因如下:
首先基数树的结构是固定的,在其他线程写入其他位置的时候,不会改变它的结构,而哈希表(可能会动态的扩容),红黑树(在修改节点时会进行旋转而改变结构)在写入其他位置的时候会改变结构,那么其他线程在读取的时候,可能就会访问到NULL等其他情况。
其次,读取的时候,我们需要根据页号访问到对应的span, 此时的span一定是已经被new出来之后,把页号和span*进行映射,所以我们访问时的那个span,一定时已经存在的,不会涉及正在读取此span,又被其他线程所修改,按照本项目访问所涉及到的代码进行讲解:
第一处,访问页号对应的span*,是为了替代掉释放时还需要传递size的情况,此时的span已经被new出来了,那么它所对应的页号早都已经映射上了,所以不会对此位置进行修改,读取和写入是分离的。
第二处,此时是需要根据小块内存地址,求出页号,根据页号找到对应的span, 进行内存的回收的,那么此时的span, 他也一定是被创建出来了以后,页号和span已经经过了映射,所以访问和写入不会同时发生,此时也是分离的。
此时再看两处涉及到修改基数树的位置,第一处就是初始化span的时候,此时只能有一个线程进行写入,此span的状态还是false, 还未被初始化,当它被创建好时,才会分配给centralcache,此时才能被使用,才会涉及到调用此函数获取objectsize,或者回收小块内存的情况,所以此时刻读写分离。
第二处同理,都是发生在pagecache中,此时是不再使用了,所以也不会涉及到上述两种访问的情况,所以此刻的读写也是分离的。

综上,同一个span之间, 访问和写入是互相分离的,而在不同的span之间,写入的时候使用红黑树或者哈希表可能会影响到其他的span, 会修改结构,但是基数树可以完美解决,故此,我们可以把unique这把锁移除删除掉。
此刻我们再次运行代码:
前:
后: