在第一篇文章中,我们介绍了内存池的基本概念,引入了新话题,内存碎片,在内存碎片这个话题中,我们了解到了有两个概念,一个是外碎片,对于外碎片我们的理解是地址空间中因为容量太小,没法直接被分配的空间,另一个是内碎片,对于内碎片的问题在定长内存池我们暂时无法理解,但是在本节内容,我们可以对内碎片有一个非常形象的理解。
那么对于上节内容的定长内存池,我们对于定长的理解不是该内存池的空间是定的,而是申请出去的对象的大小是定的,所以对于定长内存池来说,能使用的只能某个对象,局限性比较大。
当然了,毕竟只是一个开胃菜,有了对内存池的基本了解,我们就开始认识高并发内存池的一个基本框架。
直接进入主题吧!
高并发内存池效率高的其中一个原因就是因为它的三层架构:
其中的框架分为了三层。
这里就需要先考考同学们对于线程的一个基本理解了,提问:线程之间虽然是共享进程中的某些数据,但是哪些资源是线程自己独享的呢?
最重要的答案肯定是栈空间啦,不过,在这里我们并没有使用到线程独有的一个栈空间,而是使用到了线程局部存储的资源,因为也就是说,对于局部存储资源来说,是每个线程自己的,其他线程访问不了。
那么第一个效率提升的点就来了,既然这个资源是别人访问不了的,那么也就是说不管什么时候,对于该资源只有由某个线程访问。既然只有一个线程,那么是否还需要锁呢?
不需要了吧?好了,既然是不需要锁了,每个线程都有这么一份资源,我们假定这个空间的大小在256KB之下,对于用户申请的小于的256KB的空间大小,我们直接就让一个线程从自己的那份局部存储空间里面薅不就可以了吗?
那么对于高并发的场景,假设有一万个用户,对于不同的用户都要申请小于256kb的空间,如果我们使用的是malloc,那肯定是要加锁的,加锁之后,效率慢下来了吧?对于thread cache来说,直接就是随便来,反正我每个线程都可以分配,相当于同时分配好几份空间,效率一下就上来了。
这是对于thread cache的基本认识。
这两个内存在这里咱们先简单认识一下,对于central cache来说,如果是thread cache内存吃满了,需要往上级内存申请,那么就是向central cache申请,当然了,不能只拿不放,对于central cache来说,如果一个线程获取了太多的内存,那么central cache需要在合适的时机回收thread cache中的内存。
不过,因为central cache是所有线程都共享的资源,自然需要加锁,怎么加锁,咱们后面再看~
对于page cache,也是和central cache和 thread cache之间的关系差不多的,当central cache的内存吃紧之后,就从page cache中申请,page cache可就厉害了,它是以页的方式给内存的,它把页分成多个不同的小块给central cache,当合适的时候,可以将相邻的页合并起来组成更大的页,缓解内存碎片的问题。
以上是对于三层架构的一个基本认识。那么,我们直接进入到第一层,即thread cache。
编写thread cache之前,我们先来了解一下thread cache的基本结构。
thread cache的基本结构是这样的:
即抛开了定长内存池的局限性,结合了定长内存池之前的freelist,加上数组用来集中管理,采用哈希映射的方式来分配规则。
这个映射规则是一个向上补齐,比如我要3字节,就分配8字节给我,我要12字节,就分配16字节给我。
但是为什么要这么补齐呢?为什么我们不可以按照4字节补齐?我要3字节你给我4字节不行吗?当然不太行,因为我们需要freelist集中管理,即至少要确保每个内存块能存放一个指针,所以我们至少按照8字节对齐,这样就好满足64位系统了。
可是,这样的话,就存在了一个问题,内碎片。
给用户分配了8字节,但是别人只要3字节,那么多出来的5字节就变成了用户无法访问的空间,但是这个空间又不能回收,即内碎片就是分配出去,但是用户没有办法访问的空间。
那么在thread cache我们就要减少这个内碎片的平均值,即我们要好好斟酌一下这个映射规则了,对于tcmalloc这个项目中的映射规则是有点复杂的,我们采用另一种映射规则:
[1,128] 8byte 对齐 [128+1,1024] 16byte 对齐 [1024+1,8*1024] 128byte 对齐 [8*1024+1,64*1024] 1024byte 对齐 [64*1024+1,256*1024] 8*1024byte 对齐
这里可能让人觉得有点点蒙圈,不是说好的,小于多少字节就网上补齐吗?那么我31字节不应该是32字节吗?为什么还是8字节对齐?
这里说的补齐,实际上是补齐之后是补齐数的多少多少倍,而不是说真的就是8字节的多少多少倍,那到后面真的就亏死了,比如我33字节,给40,亏了五分之一了都。
对于tcmalloc中的映射规则大家下来可以自行了解,我们在这里就按照上述规则来对齐。
那么有了上面的对齐规则,对于freelist肯定是跑不掉的,所以我们需要实现一个类用来管理相同的内存块,这个类就是freelist,我们不妨把它放在公共的头文件里面实现:
class Freelist
{
public:
private:
void* _freelist = nullptr;
};
对于不同内存块的操作,在定长内存池里面我们已经操作过了,即push和pop,所以这里实现起来和定长内存池没有太大的区间,不过我们为了方便好看,可以将内存块前几个空间给为指针的操作单独封装为一个函数:
static void*& NextObj(void* obj)
{
return *(void**)obj;
}
// 内存链表
class Freelist
{
public:
void push(void* obj)
{
assert(obj);
NextObj(obj) = _freelist; // NextObj函数应该加引用 ->error: 可修改的左值
_freelist = obj;
}
void* pop()
{
assert(_freelist);
void* obj = _freelist;
_freelist = NextObj(obj);
return obj;
}
bool empty()
{
return _freelist == nullptr;
}
private:
void* _freelist = nullptr;
};
这段和定长内存池代码中的区别是有两点:
第一点:函数的单独封装
对于内存块的一个解引用的函数我们单独拿出来了,本来在单文件操作里面没啥影响,但是因为这是多文件操作,并且几乎所有的头文件源文件都要包含该文件,那么该函数势必就被重复包含了,如果我们不设置为静态的,报错就是在某某文件.obj里面重复定义,好了,这里为什么是.obj文件呢?
因为只有在链接部分,即.o文件才会开始从符号表找函数了,这里是编译部分的知识。
并且如果我们返回的不是引用,返回的是一个局部变量,也会报错,因为局部变量出了之后就销毁了,所以我们应该返回引用。那有意思的来了,我用引用接收不行吗?返回指针。
这也会报错,毕竟引用,它本身也是在该函数栈帧里面的一个局部变量~
第二点:封装性的保护
其实对于pop函数,我们完全可以_freelist = NextObj(_freelist);return _freelist,这样并不会报错,但是为什么我们不能这样干呢?因为这样我们相当于提供了一个get函数,将该类的私有变量给到了用户,极大的破坏了代码的封装性。如果不是为了封装,这里的代码因为=是从右往左走的,所以_freelist肯定是可以走到正确值的,但是我们可要面向对象~
有了Freelist,我们自然可以动手操作Threadcache.h了:
class ThreadCache
{
public:
// apply/free object
void* Allocate(size_t size);
void Deal_locate(void* ptr, size_t size);
//from central cache get cache
void* FetchFromCentralCache(size_t index, size_t size);
private:
Freelist _freelist[NFREELIST];
};
对于Threadcache来说,分配空间出去和回收空间是必要的,即Allocate和Deal_locate函数,同样,向central cache申请内存的函数也是要有的,不过呢~这里我们先不实现,我们主要实现的还是Allocate Deal_locate函数。
这里的NFREELIST是一个宏,虽然说C++不喜欢用宏,但是这里用宏也无伤大雅,也可以使用static const int来定义:
#define NFREELIST 208
可是为什么是208呢?因为按照上面的映射规则:
按照8字节对齐的时候,freelist的内存块有16个,后面按照16字节和128字节和其他字节数对齐的时候都是56个内存块,加起来也就是208个内存块了。
static thread_local ThreadCache* pTLSthreadcache = nullptr;
对于这个代码我们实际上是已经介绍过了,即每个线程创建的时候,应该申请自己的threadcache对象,那么就要用到TLS技术,即Thread local storage,线程局部存储。
这里我们只用加这个代码就可以了,不过MSVC编译器有自己的一个延申,我们同样也可以这样写:
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;
不过_declspec这个关键字并不具备跨平台性,所以建议使用C++11的thread_local关键字。
当我们着急编写allocate函数的时候,我们是否记得映射规则还没有编写?对于映射规则如果我们直接借助面向过程的方式编写实在是麻烦,所以我们不妨使用一个类,将它的函数声明为static,在该函数里面编写映射规则即可,同freelist一样,我们将它放在common.h里面:
class SizeClass
{
public:
};
对于该类,我们要编写的有两个函数,一个是计算分配到哪个桶里面去,即我们要计算对齐数,一个是计算桶里面的内存块的具体位置,要知道上面的图只是一个形象的画法,我们实际上还是对一块连续的空间划分。
那么对于计算对齐数这里硬憋是憋不出来的,我们提供两个版本,一个是较为理解的,一个是大佬们写的:
// first version
//size_t _AlignNum(size_t bytes, size_t alignnum)
//{
// size_t size;
// if (bytes % alignnum != 0)
// {
// size = (bytes / alignnum + 1) * alignnum;
// }
// else
// size = bytes;
// return size;
//}
// second version
static inline size_t _AlignNum(size_t bytes, size_t alignNum)
{
return ((bytes + alignNum - 1) & ~(alignNum - 1));
}
static size_t AlignNum(size_t bytes)
{
assert(bytes <= MAXBYTES);
if (bytes <= 128)
return _AlignNum(bytes, 8);
else if (bytes <= 1024)
return _AlignNum(bytes, 16);
else if (bytes <= 8 * 1024)
return _AlignNum(bytes, 128);
else if (bytes <= 64 * 1024)
return _AlignNum(bytes, 1024);
else if (bytes <= 256 * 1024)
return _AlignNum(bytes, 8 * 1024);
else
return -1;
}
因为是纯数学问题,就不过多阐述,不过选择second version也是清理之中,因为计算机的按位运算是非常快的嘛。
对于计算索引位置:
static inline size_t _Index(size_t bytes, size_t align_shift)
{
return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;
}
// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t bytes)
{
assert(bytes <= MAXBYTES);
// 每个区间有多少个链
static int group_array[4] = { 16, 56, 56, 56 };
if (bytes <= 128)
return _Index(bytes, 3);
else if (bytes <= 1024)
return _Index(bytes - 128, 4) + group_array[0];
else if (bytes <= 8 * 1024)
return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];
else if (bytes <= 64 * 1024)
return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];
else if (bytes <= 256 * 1024)
return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];
else
assert(false);
return -1;
}
怎么说呢,也是一个纯数学问题,我们普通人还是不太容易想出来的,所以这里直接给代码了,我们能直接理解思想即可。
有了对齐数和索引位置的函数,我们就可以编写threadcache.cpp的函数了,也就没有什么难度了~
void* ThreadCache::Allocate(size_t size)
{
size_t index = SizeClass::Index(size);
size_t alignsize = SizeClass::AlignNum(size);
if (!_freelist[index].empty())
return _freelist[index].pop();
else
return FetchFromCentralCache(index, alignsize);
return nullptr;
}
void ThreadCache::Deal_locate(void* ptr, size_t size)
{
assert(ptr);
assert(size <= MAXBYTES);
size_t index = SizeClass::Index(size);
_freelist[index].push(ptr);
}
基本的框架我们已经搭建出来了,我们得测试一下吧?所以可以来个源文件concurrent.cpp,结合main.cc里面的用例,我们测试一下:
static void* ConcurrentAlloc(size_t size)
{
if (pTLSthreadcache == nullptr)
pTLSthreadcache = new ThreadCache;
cout << std::this_thread::get_id() << ":" << pTLSthreadcache << endl;
return pTLSthreadcache->Allocate(size);
}
static void ConcurrentFree(void* ptr, size_t size)
{
assert(pTLSthreadcache);
pTLSthreadcache->Deal_locate(ptr, size);
}
void Alloc1()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(600);
}
}
void Alloc2()
{
for (size_t i = 0; i < 5; ++i)
{
void* ptr = ConcurrentAlloc(7);
}
}
void TLSTest()
{
std::thread t1(Alloc1);
t1.join();
std::thread t2(Alloc2);
t2.join();
}
int main()
{
//TestObjectPool();
TLSTest();
return 0;
}
最后的运行是:
当然了,对于threadcache里面还有很多很多细节我们没有捋清楚,这不重要,我们把后面的两个搭建好了,回来一看,就全部能串通了~
感谢阅读!