【项目(建议收藏)】九万字手把手教你写高并发内存池(化简版tcmalloc)

发布时间:2024年01月18日

高并发内存池

在这里插入图片描述

前言

本篇所讲项目可以直接往简历中写,但前提是你对这个项目足够熟悉。

强调一下,这个项目只是为了学习大佬的研究成果,从而提升自己,本项目只是把tcmalloc中特别核心的内容拿出来讲一讲,并不是为了完全复刻tcmalloc的源码,不要想着自己一个人复刻一个tcmalloc出来,源码中的tcmalloc得要好几万行的,本项目最终代码量也才不过两千行,如果你对于tcmalloc的源码感兴趣,后面会给出相关的链接。

引例介绍

池化技术,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。

什么是过量的资源呢?
就是现在用不完的资源。

就拿生活中的例子来说。

比如说你宿舍楼道里有饮水机,如果你只有一个杯子,那每次想要喝水的时候就要跑到饮水机那里接水,不过这样好麻烦。想要提高一点效率的话可以买一个水壶,每次去接水的时候拿水壶去接,接完之后放宿舍,这样想喝水的话就从水壶里面倒一点,等水壶里的水喝完之后再拿着水壶去接水……这样的话,喝等量的水,需要接水的次数就大不一样了。显然后者能更方便一点。这就算是生活中的一种池化技术。

上面的这个例子中,拿着水壶接水就是在向系统申请过量的资源。

本篇要讲的内存池就是先向系统申请一大块空间,这样后续使用空间的时候不是再向系统申请资源了,而是到已经申请过的这一大块空间中去取。

同样的,在我前面的博客中也提到过一些池化技术,都有:

  • 连接池。比如说数据库的连接池,建立连接是会有一定消耗的,如果建立连接之后只是执行一条简单的SQL就关闭连接的话就会很浪费时间,所以直接搞很多的连接,有SQL要执行的时候挑出来一个闲置的连接让它去执行就行,执行完了不用关闭,继续连着,等待下一条SQL的到来。这样就能避免频繁地与数据库建立连接和释放连接。
  • 线程池。主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。在我前面的博客中也是模拟实现过线程池的,感兴趣的同学可以看看:【Linux】线程详解完结篇——信号量 + 线程池 + 单例模式 + 读写锁

还有其他的池,这里就不再过多介绍了。这些池化计数在思想上都是很类似的。

这个项目会用到C/C++、数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等等方面的知识。本篇对于这些基础知识不会做过多讲解,所以如果你对于这些内容不太熟悉,可以看看我前面的博客,或者是自己在网上找找相关的资料,先把这些基础内容搞清楚后再看这篇博客。

正式开始

项目介绍

当前项目是实现一个高并发的内存池,原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存分配相关的函数(malloc、free)。

本篇所讲的项目是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个化简版的高并发内存池,目的就是学习tcamlloc的精华,就像我之前在讲STL容器的时候那样,先简单介绍,然后模拟实现。但是相比STL容器部分,tcmalloc的代码量和复杂度上升了很多。虽然难度虽然上升了,但里面的知识对于我们的提升也是很有帮助的。

另一方面,tcmalloc是全球大厂google开源的,可以认为是由当时顶尖的C++高手写出来的,其的知名度也是非常高的,不少公司都在用它,Go语言直接用它做了自己内存分配器。所以很多程序员是熟悉这个项目的,那么有好处,也有坏处。好处就是把这个项目理解扎实了,会很受面试官的认可。坏处就是面试官可能也比较熟悉项目,对项目会问得比较深,比较细。如果你对项目掌握得不扎实,那么就容易碰钉子。

tcmalloc的源码:tcmalloc

内存池的作用

内存池主要解决的问题有两个:

  1. 效率问题
  2. 内存碎片问题

分开来说。

效率问题

其实本篇最开始讲的那个例子就算是一个解决效率问题的例子了。这里再来说一个生活中的例子。

把进程向os申请内存想象成要生活费。

如果没有池化技术,就是每顿饭花了多少钱都要给父母汇报一下,然后父母再给你打钱,如果早上花了5块就打电话给你爸说 “早饭花了5块钱,给我转一下。” 假如中午花了十块,再找你爸要,下午又是这样……这样每次花钱都要找父母要一下,假如说一个月要花1200,这1200块钱都是零碎的找父母要的,那这个过程效率就会很低,每次要钱都要进行沟通,非常不方便。

那用上池化技术呢?
就是先算一下一个月大概就是1100 ~ 1300,那父母在月初的时候给你转1300,这1300就是你的money池,用钱的时候就去money池中取,下个月再给你转1300,这样每次要用钱的时候就不需要沟通了,去money池中拿就行,这样就高效了很多。

内存池也是这样的道理,每次用空间时都要向os申请一下效率太低了,建一个内存池,保存一大块空间,这样就解决了效率上的问题。

内存碎片

平时我们动态申请内存都是在堆上进行申请,假如说现在申请的资源如下:
在这里插入图片描述

申请时空间由低到高申请(堆),但是释放的时候谁先释放或谁后释放是无法确定的。

动态申请的空间是非常自由的,谁不用了谁就直接释放,比如说:
在这里插入图片描述

虽然此时vector和list的空间释放了,总共回收了384B的空间,但是从当前的图中来看的话,如果此时我们再去申请空间就无法申请超过256B(vector的那一块)的连续空间,因为动态申请的空间得要连续,但是这里的256B和128B是不连续的,这就是所谓的碎片化了,申请的空间只要大于256B就会失败,无法申请出来。导致看起来空间虽然够,但是不连续,申请不出来完整的空间也是白瞎。

所以如果我们直接从堆上申请就会存在这样的内存碎片的问题,而内存池就可以想办法去解决这样的问题,至于怎么实现,等后面讲的时候就知道了。

补充一点小知识,内存碎片其实分两种,一种叫外碎片,一种叫内碎片。

刚刚这里将的就是外碎片。

内碎片是指已经有一块空间开好了,但是这块空间中可能会因一些对齐的问题导致用不上,注意是已经开好的空间,内碎片的问题这里不太方便解释,下面项目中会遇到,遇到了之后再细说,到时候再正好与外碎片做一下对比。

所以内存池要解决的就是效率问题和内存碎片问题。

tcmalloc和malloc的关系

前面说了动态申请和释放内存是在进程地址空间的堆区上申请的,但实际上一般不会直接去堆上申请,就是因为效率和内存碎片这两个问题。

实际上C/C++程序不是申请空间的时候不是直接去堆上,而是借助一个函数申请的,这个函数叫做malloc,释放的时候就是free,也就是C标准库提供的这两个函数。

说到这可能有的同学会有点疑问:C++中不是在用new和delete吗?

  • 这个问题我在前面的博客中也是讲过的(想要了解详情的同学可以看我这篇:【C,C++】内存管理new和delete),new和delete归根结底中还是用了malloc和free的。
  • new和delete中封装了operator new和operator delete。 operator new中又调用了malloc来帮助new来开空间,封装malloc复合C++中new失败了抛异常的机制。 operator delete中也是通过宏定义后的free来释放空间。
  • 所以说C/C++中申请和释放内存块最终还是通过malloc和free来实现的。

malloc和free都属于是用户操作接口这一层(库),而进程是用户层的,进程想要空间,不是直接去堆上找,都是去调用malloc,malloc再去调用系统提供的相关申请空间的接口,然后os再去堆上找空闲空间。

在这里插入图片描述

而malloc实际上就是一个内存池,比如说我们现在想要8B,malloc不是只向os申请8B,而是申请过量的,比如说要个1M,下次用户进程再要空间的时候malloc就不会再去向os申请了,而是在刚申请出来的1M中去拿,同时还要尝试解决一些内存碎片问题。

这里还要知道一个东西,malloc的实现有很多种(你没有听错),也就是实现内存池有很多种方式,C标准库只是规定了要实现malloc这个内存池,只是规定了malloc()相当于向os“批发”了一块较大的内存空间,然后“零售”给程序用。

一般不同编译器平台用的都是不同的。比如windows的vs系列用的微软自己写的一套,linux gcc用的glibc中的ptmalloc。下面有几篇关于这块的文章,大概可以去简单看看了解一下,关于ptmalloc,学完本篇的项目以后,有兴趣大家可以去看看它的实现细节。
一文了解,Linux内存管理,malloc、free 实现原理
malloc()背后的实现原理——内存池
malloc的底层实现(ptmalloc)

可能有的同学就要问malloc都是内存池了,为啥还要学tcmalloc?
其实tcmalloc是用于优化C++写的多线程应用,比glibc2.3的malloc块(在多线程的场景下)。

所以tcmalloc是比malloc更好的一个东西,但不仅有tcmalloc,还有jemalloc等,都是很优秀的内存池。学完tcmalloc后,有可能面试时会问malloc的底层,可以详细看看我上面三篇文章中的第一篇。推荐是项目学完了之后再看,尤其是当问到为什么tcmalloc更快的时候,可以大致了解一下malloc。

定长内存池

做一个开胃菜,实现一个定长的内存池,前面刚说了实现内存池有不同的方式,俗话说:“尺有所短寸有所长”。说的就是一些东西就算很厉害也会有自己局限的地方,一些东西很简单,也是有自己适合的地方的。

那malloc其实是实现了一个通用的,针对各种场景的,各种大小的内存池,就是因为是通用的,设计的相对复杂,整体也就会有一些问题,包括本篇要讲的tcmalloc。

如果说某些场景下需要固定大小的内存池,如何设计呢?能不能设计简单一点,还能让效率达到极致的内存池呢?

  • 答案是可以做到的,下面就来实现一个针对定长场景的内存池来作为一个开胃菜,对内存池建立一个初步的认识。

可能有的同学要问为什么不直接搞tcmalloc呢?

  1. tcmalloc相对很复杂
  2. 这里要实现的定长内存池将会作为后面tcmalloc中的的一个小组件来解决tcmalloc中某些方面的问题,具体什么问题等后面学到了再说。

这里的定长内存池代码不多,不到100行就能搞定。

注意事项

  1. 需要一个池子,将申请的一大块内存存起来,这里可以用一个char*的指针来实现:
    在这里插入图片描述
  2. 内存池的所有管理不止申请,还有释放,用户用完的空间还到内存池中怎么办呢?不能说直接给丢了,也不能说把还回来的空间直接还给操作系统,因为向os申请和释放是有要求的,申请的时候假如申请了1M,那还回去的时候就要把这些空间全还回去,但是现在申请了一大块内存,把它切成一小块一小块的拿去用,用之后还回来怎么管理呢?用自由链表来解决。
    ? 还回来的都是一块一块的小空间,这些小空间要组织起来,用链表会很方便,直接从头到尾串起来就行。如图:
    在这里插入图片描述
    每一小块中提取出一个指针大小,然后指向其他的小块,这样就能将所有的小块空间串一个串。这样这些小的块还可以重复利用。具体怎么实现等会写代码的时候就知道了。

定长内存池的意思是解决固定大小的内存申请释放需求,特点有二:

  1. 性能达到极致。
  2. 不考虑内存碎片等问题。

编写代码

搞一个新项目,项目名就直接叫做ConcurrentMemoryPool,也就是本篇要讲的高并发内存池:
在这里插入图片描述

进去之后搞两个文件,一个头,一个源:
在这里插入图片描述
这里头文件为ObjectPool.h,意思就是针对特定对象的定长内存池。

然后就来写这里的定长内存池。现在做项目就不要用using namespace std了,防止命名污染,把常用的给出来就行:
在这里插入图片描述
先给这两个。

定长内存池的类名就叫做ObjectPool:
在这里插入图片描述

因为是定长内存池嘛,可以给一个非类型模版参数: 在这里插入图片描述
这样可以指定内存池大小,不过刚刚也说了这里是一个针对不同类型对象的定长内存池,也为了能充当后面tcmalloc中的一个小组件,这里就用普通的类模版了,这样针对不同的类可以生成不同的定长内存池:
在这里插入图片描述

两个成员变量

其成员有两个,一个是_memory,用来指向大块的内存:
在这里插入图片描述

这里可以将其类型给为void*吗?
不可以,因为我们需要通过_memeory来对整个大块的内存进行切分,那什么又是切分呢?

看图:
在这里插入图片描述

就是这样,申请一块空间之后_memory就要对应的往后移动一个T类型(模版类型)的大小,假如说为10B,申请10B后,_memory就要指向其原来位置后的10B处,就这样要多少_memory就要挪多少,当剩余空间不够的时候就再向os申请一块等大的定长空间。

如果_memory类型是void*的,那就没有办法往后挪动(也不是没有办法,只不过麻烦了点),因为用void*是没法进行+=或者解引用的操作的,所以直接给成char*会更加方便:
在这里插入图片描述

还有一个成员是用来管理还回来的空间的,这个成员就是自由链表的头指针_freelist。

将还回来的内存空间用链表管理起来,那这样需要像普通的链表那样穿件一个结构体,结构体里面是一个data和一个指针吗?
不用,因为这里连接起来的就是空间,内存空间与内存空间直接互相连接起来形成一个链表,如图:
在这里插入图片描述

这里_freelist直接给成void*就行,因为这里不会对_freelist进行解引用或者增减操作,只是单纯的指向而已。

两个指针都用缺省参数给成nullptr就行:
在这里插入图片描述

接口New——申请T类型大小的空间

再给一个接口New()用来开空间,先来写一个有点问题的:
在这里插入图片描述

有什么问题呢?
这里指定开128K的空间,假设一个类型是4B,那这样恰好可以将128K分完,但如果是5B呢?会有剩余的空间没有分完,但这样也没有关系,直接将剩余的空间保留就行,不分除去就好(浪费那点不会超过5B)。这都是小问题,不必过度纠结。

最重要的问题是当128K的空间都分配完之后,如果再向当前内存池要空间,_memory应该怎么办?这里空间分配完之后并没有把malloc搞成空,想搞也有点难,那干脆再搞一个成员size_t _remanentBytes,表示_memory指向空间中剩余多少字节:
在这里插入图片描述

再改一下New:
在这里插入图片描述

这里红色框出来的就是改动的地方。

这样就好了,这样在申请空间的时候就是一个T类型一个T类型大小的给,然后不够一个T类型大小的时候再向OS申请128K的空间。

所以我前面画的这样的图:
在这里插入图片描述

都是逻辑图,这些块的真实物理地址都还是连续的,并不是图中这样分开的,只不过这里逻辑图画出来之后更好理解。

还有一个小问题,T如果小于一个指针的大小(4/8字节)该怎么办?
这个问题现在先不管,先默认都是大于指针大小的,等一会解决。

接口Delete——回收还回来的小空间

对应的New写好之后再来写一个Delete用来回收还回来的小块空间:
在这里插入图片描述

这里就要用到_freelist了,刚刚也画图了,就是直接将还回来的块放到_freelist后面就行,我先来搞的麻烦一点,分个情况,当_freelist为空的时候,再来看一下刚刚的图:
在这里插入图片描述
就拿32位来说,如何取出块中的前四个字节并进行赋值呢?
简单,直接将块强转成int*就行,这样再解引用拿到的就是4个字节的大小,也就是这样:*(int*)obj = nullptr。

但是64位下呢?
可以再强转成long long*,然后用sizeof(void*)判断一下当前是32位还是64位,是4就是32位,对应就用int*,是8就用long long*,但是这样好麻烦。有一个更好的办法。

可以用一个二级指针强转obj后再解引用,这样就会得到一个指针大小的空间,直接用*(void**)obj = nullptr,直接用这个赋值,此时对指针赋值就完全取决于平台了。这里二级指针用什么类型的都可以,因为不会解引用到最后的void,所有的指针大小都取决于是32位还是64位,也就是说*(int**)obj = nullptr也是可以的,最终得到的是一个指针大小的空间。这样就不用再判断一下当前平台位数了。

那么这里的代码就应该是这样的:
在这里插入图片描述

然后再说_freelist不为空的情况:
在这里插入图片描述

这里图中也是说了,采用头插的方式,这样效率会更高,我再把图画的详细一点:
在这里插入图片描述

所以_freelist不为空的时候代码应该是这样的:
在这里插入图片描述

但这里实际上else中也是能用在_freelist为空的情况的,不信你看:
在这里插入图片描述

所以代码写简单点:
在这里插入图片描述

不过这里也会有和New中一样的问题,就是T类型的大小可能会小于指针的大小,这里也是先搁置,等一会和New中的一块解决。

New中添加利用回收空间

Delete可以回收还回来的小空间,此时_freelist中的就是还回来的,那么这些空间都还可以重复利用,那么此时就可以在New中重复利用这些空间,那么此时就可以对自由链表进行头删以拿出来第一个块的空间:
在这里插入图片描述

所以在New的开头加上这个:
在这里插入图片描述

剩下原先New中的逻辑就是else了,除了定义obj和return obj:
在这里插入图片描述

解决 sizeof(T) 小于指针大小的问题

假如说sizeof(T)大小为3吧,那其实是可以给它直接开一个完整的指针大小的空间,也不是完全给,主要是让整体结构能保持这里所规定的结构,也就是_freelist得要能派上用场,不然整个逻辑就是有问题的,当然给出来的空间T也不会完全使用上,比如说这里3B,在32位下就是要浪费1B,在64位下就是要浪费5B,前面也说了这里的定长内存池不考虑内存碎片问题,所以为了完成这里的逻辑,代码应该稍微改动一下。

在这里插入图片描述

这里只需要改一下_memory指向的空间就行,_freelist的空间就是来自_memory指向的空间的,不用改。

初始化和清理

还有两个小问题:

  1. New中知识开空间了,没有对开好的空间进行初始化,这里在开好空间后用定位new(replacement new)来调用对应T类型的构造函数是再好不过了。
  2. Delete在回收空间前显示调用一下T的析构函数来进行最后的清理。

所以代码再改一下,对于New:
在这里插入图片描述

对于Delete:
在这里插入图片描述

现在这个定长内存池就基本完美了,代码不多。因为我等会还要改一点,这里暂时先不把所有代码完整给出,等彻底写完了再给出完整的代码。

性能测试

下面来测试一下和malloc相比谁的性能更优,测试代码我就直接给出来了:

struct TreeNode // 一个树结构的节点,等会申请空间的时候就用这个树节点来申请
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;

	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};

void TestObjectPool() // malloc和当前定长内存池性能对比
{
	// 申请释放的轮次
	const size_t Rounds = 5;

	// 每轮申请释放多少次
	const size_t N = 100000;
	
	// 这里总共申请和释放的次数就是Rounds * N次,测试这么些次谁更快

	std::vector<TreeNode*> v1;
	v1.reserve(N);
	
	// 测试malloc的性能
	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode); // 这里虽然用的是new,但是new底层用的也是malloc
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i]; // 同样的,delete底层也是free
		}
		v1.clear(); // 这里clear作用就是将vector中的内容清空,size置零,
		// 但capacity保持不变,这样才能循环上去重新push_back
	}
	size_t end1 = clock();
	
		
	std::vector<TreeNode*> v2;
	v2.reserve(N);
	
	// 定长内存池,其中申请和释放的T类型就是树节点
	ObjectPool<TreeNode> TNPool;
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New()); // 定长内存池中的申请空间
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]); // 定长内存池中的回收空间
		}
		v2.clear();// 这里clear作用就是将vector中的内容清空,size置零,
		// 但capacity保持不变,这样才能循环上去重新push_back
	}
	size_t end2 = clock();


	cout << "new cost time:" << end1 - begin1 << endl; // 这里可以认为时间单位就是ms
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

在Release版本下测试的结果如下:

Rounds = 5,N = 100000:
在这里插入图片描述

Rounds = 10,N = 10 0000:
在这里插入图片描述

Rounds = 10,N = 100 0000:
在这里插入图片描述

就测试这么多,想再测一点的同学可以自己试试。

通过上面的测试可以看到,这里定长内存池的性能还是很理想的,比普通的malloc要快十倍以上,虽然这里用的是new,但是new底层也是malloc。

malloc是针对各种场景取最优而得来的产物,这里的定长内存池是针对单一的场景更加快速,后面项目中也是会用到这里的定长内存池的。

将malloc换成系统调用接口

不过这里定长内存池也是调用了malloc去申请大块的空间,能不能直接调用系统申请空间的接口呢?
可以的,只不过Windows和Linux下的接口不同,Windows下用的是VirtualAlloc函数,Linux下是brk和mmap,brk是把堆往上推,mmap是从共享区中取虚存。关于系统调用接口我这里就不做过多的详细介绍了,想要了解的同学可以自行查找一下资料。

那这里就修改一下刚刚的代码,不用malloc,用条件编译来整一个我们自己的申请空间的函数:
在这里插入图片描述

稍微解释一点,参数kpage表示页数,然后VirtualAlloc中的第二个参数表示的是你想要申请多少B的空间,这里因为我规定的是kpage表示有多少页,比如说kpage为16,意思就是我想申请16页的空间,此时16左移13,就表示 16 ? 2 13 16 * 2^{13} 16?213,也就是16 * 8KB,我这里把一页当8KB来算,那么这里算出来的就是16页的字节数为 16 ? 2 13 16 * 2^{13} 16?213B。

这里再把New中的malloc改为这里的SystemAlloc就可以了:
在这里插入图片描述

现在就是在直接调用系统调用接口了,跳过中间的malloc,直接去找堆,按照页为单位调内存。

OK,到这里定长内存池就算写完了,没有那么难吧。

定长内存池完整代码

这里把完整代码给出:

#pragma once

/*定长内存池*/

#include<iostream>
using std::cout;
using std::endl;

#ifdef _WIN32
	#include<Windows.h> // Windows下的头文件
#else
	// 这里是Linux相关的头文件,我就不写出来了
#endif // _WIN32

// 直接去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32 // Windows下的系统调用接口
	void* ptr = VirtualAlloc(0, kpage << 12, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif

	if (ptr == nullptr)
		throw std::bad_alloc();

	return ptr;
}

template<class T>
class ObjectPool
{
public:
	T* New() // 申请一个T类型大小的空间
	{
		T* obj = nullptr; // 最终返回的空间

		if (_freelist)
		{ // _freelist不为空,表示有回收的T大小的小块可以重复利用
			void* next = *(void**)_freelist;
			obj = (T*)_freelist;
			_freelist = next;
			// 头删操作
		}
		else
		{ // 自由链表中没有块,也就没有可以重复利用的空间
			// _memory中剩余空间小于T的大小的时候再开空间
			if (_remanentBytes < sizeof(T)) // 这样也会包含剩余空间为0的情况
			{
				_remanentBytes = 128 * 1024; // 开128K的空间
				//_memory = (char*)malloc(_remanentBytes);
				
				// 右移13位,就是除以8KB,也就是得到的是16,这里就表示申请16页
				_memory = (char*)SystemAlloc(_remanentBytes >> 12); 
				
				if (_memory == nullptr) // 开失败了抛异常
				{
					throw std::bad_alloc();
				}
			}

			obj = (T*)_memory; // 给定一个T类型的大小
			// 判断一下T的大小,小于指针就给一个指针大小,大于指针就还是T的大小
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			_memory += objSize; // _memory后移一个T类型的大小
			_remanentBytes -= objSize; // 空间给出后_remanetBytes减少了T类型的大小
		}

		new(obj)T; // 通过定位new调用构造函数进行初始化

		return obj;
	}

	void Delete(T* obj) // 回收还回来的小空间
	{
		// 显示调用析构函数进行清理工作
		obj->~T();

		// 头插
		*(void**)obj = _freelist; // 新块指向旧块(或空)
		_freelist = obj; // 头指针指向新块
	}

private:
	char* _memory = nullptr; // 指向内存块的指针
	size_t _remanentBytes = 0; // 大块内存在切分过程中的剩余字节数
	void* _freelist = nullptr; // 自由链表,用来连接归还的空闲空间
};

【注】这里定长内存池是不会将空间释放的,这样并不会造成内存泄漏的问题,因为这些空间由某个对象申请,不断地申请归还,只要进程是正常结束的,最后还是会归还所有的空间,因为进程结束之后OS会自动回收整个进程的空间。

高并发内存池

从现在开始才是真正有挑战的地方,也就是本篇的核心中的核心——高并发内存池。

整体框架设计

这里高并发内存池设计三个大部分,也就是可以将整个框架分为三层:
在这里插入图片描述

第一层thread cache,第二层central cache,第三层page cache,这三层在目前的进度来说是完全看不懂的,等后面学的细点了才能看懂,这里只是先对项目做个整体介绍。

现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。

前面说了正常内存池都要考虑两个大问题,一个是性能问题,一个是内存碎片问题,malloc对于这两个问题都考虑了,而tcmalloc还考虑了多线程环境下的锁竞争问题。这一点相对于malloc是更胜一筹的,二者整体设计框架也是完全不一样的。

下面来从理论部分来讲解一下thread cache、central cache和page cache。后面对于这三者我就直接用tc(thread cache)、cc(central cache)、pc(page cache)来表示,方便一点,这里各位记一下对应表示那个缓存。

tc——thread cache

在这里插入图片描述

  • 一个进程中有几个线程,就会有几个tc,也就是每一个线程都会有其对应的tc,可以认为tc就是一个类的对象,内部包含一些数据结构,并保存了一些空间。你可以简单理解成刚刚的那个定长内存池,也是一些数据结构,并保存一些空间,tc的相关细节后面会讲。

  • 如上图所示,假设有三个线程,分别为t1、t2、t3。每个线程去动态申请内存时不需要加锁,因为每个线程独享一个tc,如果tc中有空闲空间,线程在申请的时候只会去自己的tc中申请。tc最大为256KB。

  • 如果单次申请的空间是小于256KB的,线程就在自己的tc中申请,且大部分情况下申请空间是不会大于256KB的,毕竟256KB都已经26万多字节了,这样就可以解决大部分情况下的锁竞争问题,因为自己向自己的tc申请空间不需要加锁。

  • 如果单次申请的空间大于了256KB,tc就会用其他方式去申请空间,具体怎么搞后面再说。

cc——central cache

在这里插入图片描述

  • 当线程的tc空间用完后,会向cc申请空间。
  • 当且仅当两个或两个以上的线程用完自己的tc后再回并发的访问cc去申请空间,但此时不一定会发生线程安全问题。
  • 因为cc内部是由哈希桶实现的,哈希桶有多个,每个桶有一串空间,只有多个线程同时并发申请同一个桶的时候才会有线程安全问题,每一个桶自身会有一个桶锁。
  • 比如说t1申请3号桶的空间,t2申请5号桶的空间,此时是不用加锁的;再比如说t1和t2同时申请4号桶的空间,此时才需要对4号桶的申请加锁,所以就算是多线程并发向cc申请锁也不会有非常激烈的竞争。

cc会在合适的时机回收tc中的对象空间,比如t1原先申请了很多内存,用完之后空闲了很多,那么此时cc就可以回收掉t1的tc中空闲的空间,如果此时t2急着用空间,正好就可以将这些空闲空间给t2。这样就起到了一个均衡调度的功能,不至于说一个人吃得太多而另一个没得吃。

当cc中空间不够时会再向page cache申请。而且如果cc回收回来的内存不急着走时还会交给pc来解决内存碎片的问题。

pc——page cache

在这里插入图片描述

pc中会组织很多个叫做span的结构体,span中会管理多个页大小的空间,且会通过一些方式去标记这些span,当这几个页的内存都回来之后pc就会回收cc中满足条件的span。

page cache根据名字就可以知道其是管理页的,一页就是4KB或者8KB,回收回来的页,如果页号与其它页的页号可以拼成很多相邻页号,就会将这些相邻的页合并成更大的页,通过这样的方式去合并来解决内存碎片问题。

thread cache初步实现

tc框架讲解

回顾一下前面的定长内存池:
在这里插入图片描述

里面有一个_freelist,也就是自由链表,用来管理空闲的小块内存空间,不过这里的_freelist是针对定长情况而言的,可以认为是针对特定对象来申请空间,针对某种对象每次申请固定的长度。

而线程申请空间时可不一定每次都申请固定长度的空间,此时想要满足线程申请的不同大小的空间(比如说3B、6B、18K、236K等)就要对应产生不同的_freelist用来回收对应大小的小块空间。

不过前面介绍了tc中单次能申请的最大空间是256KB,如果这256KB中每一个字节都搞一个自由链表就有点太多了,256都20多万字节了,搞20多万个_freelist没有那个必要。

这里可以选择做一些平衡的牺牲,第一个自由链表都挂8B的(8B主要是能够保证在64位下能取出来一个指针的空间,方便挂在_freelist后面),第二个自由链表都挂16B的,再往下就是24B、36B……256KB,就像图中这样:
在这里插入图片描述

不过实际上并不是都是两两相隔8B,如果只是简单的两两相隔8B最后也是有3万多个_freelist,还是太多了,不过这里先简单这样记,后面实现的时候会详细说。

那么这样的话,如果线程还回来的空间小于等于8的就放在第一个_freelist后面,如果大于8但是小于等于16那就放在第二个_freelist后面,同理往下…… 申请空间的时候也是这样划分范围,例如线程申请5B就给8B,要8B还是给8B,要10B就给16B,要15B就给16B,要16B给16B……每次都是对齐着给。

【注】虽然这时会存在一定程度上的空间浪费,但是多给一定是比少给好的,比如说假设你家里很有钱,有天你想要买衣服,你给你妈说我想买件300块的裤子,你妈一听感觉太少了,直接给了你500,那你不就乐开了花了么,要是你妈只给了你50让你去地摊上淘一淘,你肯定是很难接受的。

所以多给一点是OK的,而这里5B给8B,6B给8B,12B给16B,都会浪费一点空间,这些内存空间都是在用的地方碎片化了,这里的碎片就是内碎片,也即因为一些对齐的原因而产生的一些用不上的空间,这就是内碎片。

前面讲的外碎片是一片连续的空间被切分成好多块分出去,只有部分还回来了,但是它们不连续,导致虽然有足够的空间,但是申请不出想要的大块空间。

ok,那现在如果线程想要size大小的空间,就会向tc去申请,而tc内部其实就是哈希桶的结构,哈希中最重要的就是映射么,那第一个位置映射8B的_freelist,第二个位置映射16B的_freelist……一直到256KB:
在这里插入图片描述

每来一个size都要有一套规则去计算对应的桶在哪里,比如要20B就找对应24B的桶,看24B对应的自由链表下面有没有挂空闲的空间,如果有了就拿一块给对应线程,如果没有就向下一层的central cache申请。

tc的大致框架就是这样,下面就来写写代码。

tc代码实现

头文件给个ThreadCache.h:
在这里插入图片描述

因为后面还会有central cache、 page cache等其他头文件,所以这里搞一个Common.h用来把一些要包的头文件都放到这个Common.h里面,然后其他头文件只要引这一个Common.h头文件就可以了:
在这里插入图片描述

ThreadCache.h中定义同名的类,类中要提供两个接口,一个Allocate用来申请空间,一个Deallocate用来回收空间:
在这里插入图片描述

然后tc中用到的数据结构是一个哈希表,而哈希表中每个桶可看做是自由链表,所以这里先来实现一下这些基础的数据结构。

FreeList自由链表类

先来说自由链表,和前面定长内存池中的差不多,不需要搞链表节点,空间就是节点,这里把自由链表实现放在Common.h中,后面cc也会用到,类名就叫FreeList,里面一个指针,然后两个接口,一个Push用来回收块,一个Pop用来提供块:
在这里插入图片描述

回忆一下前面定长内存池是怎么搞的,就是进行头插和头删。

Push:
在这里插入图片描述

Push代码:
在这里插入图片描述

不过这里的*(void**)可读性不太高,来专门搞一个接口,用来返回这个值:
在这里插入图片描述
注意这里返回值记得要给成引用,因为后面会对这个返回值进行赋值,也就是这样:

ObjNext(obj) = _freeList;

如果没有引用,返回的是一个右值,因为ObjNext返回值是一个拷贝,是一个临时对象,而临时对象具有常属性,不能被修改,也就是一个右值,右值无法进行赋值操作:
在这里插入图片描述
只有在加了引用之后才可进行赋值,加了引用等于是直接把那一块空间的地址搞回来了,可以对obj中的前4/8个字节直接修改:
在这里插入图片描述
这样写出来的代码可读性更好一点。

ObjNext用static修饰,防止多个.cpp文件重复包含该Common头文件导致链接时产生冲突

Pop:
在这里插入图片描述

代码:
在这里插入图片描述

为了让代码更严谨一点,加上一些assert:
在这里插入图片描述

tc中的哈希结构

刚刚也说了,tc中要有一个哈希表,每个哈希桶都是一个自由链表,那就可以给一个存放FreeList的数组,不过数组开多大呢?

在这里插入图片描述

这是个问题,前面说了不能两两相隔8B,这样还是会有太多的哈希桶,所以真正的对齐规则应该是这样的:
在这里插入图片描述

我来解释一下,size范围就是指线程申请的空间范围,[1, 128]就是指申请空间在1~128B以内的,对齐数就是8B,也就是如果申请3B,就要对齐到8B,如果申请的是13B就要对齐到16B,就像这样找到对应大于size的8的最小倍数。对齐到几B就给线程提供几B的空间。

那么对应哈希桶下标范围也就很好理解了,下标为0的哈希桶(自由链表)连接的就是大小为8B的块空间,下标为1的哈希桶(自由链表)连接的就是大小为16B的块空间,下标为2的哈希桶(自由链表)连接的就是大小为24B的块空间……

[128 + 1, 1024]就是指申请空间在129~1024B以内的,对齐数就是16B,也就是如果申请130B,就要对齐到128 + 16 = 144B,也就是申请130B就会给线程144B。下标为16的哈希桶连接的就是144B的块空间。

再往下就不说了,都是一样的规律,这里是对tcmalloc中的对齐规则做了化简,实际tcmalloc中是要比这里复杂一点的,但大差不差。

那么我来计算一下空间浪费率,对于1~128字节的,这里浪费率会比较高,比如1B的,开8B,浪费了7/8的空间,不过这里是小空间,问题不大。每个范围中浪费率最高的是第一个,也就是1B、128+1B、1024+1B、8*1024+1B、64*1024+1B,这些浪费率是最高的,因为每个范围中的size,越往后,分母(分配的空间)越大,而浪费的空间是定的,我就算一下这剩下的四个:

128 + 1B空间浪费率:
在这里插入图片描述
1024 + 1B空间浪费率:
在这里插入图片描述

8*1024 + 1B空间浪费率:
在这里插入图片描述

64*1024 + 1B空间浪费率:

这里都是将所有的空间浪费率控制在10%左右。

这里最重要的就是知道为什么要对齐以及如何对齐,前期对齐数小一点,后面对齐数大一点,减少桶的个数,并保持空间浪费率在10%左右。

所以按照这样的规则对齐的话,最终总共只需要208个桶。直接在代码中定义一个FREE_LIST_NUM表示哈希表中自由链表的个数:
在这里插入图片描述
C++中尽量避免用宏,与之替换的就是static const。

又因为这里单次申请的最大空间不会超过256KB,所以再定义一个MAX_BYTES:
在这里插入图片描述

ThreadCache中:
在这里插入图片描述

SizeClass类

当线程申请一个size时,要计算出对应对齐之后的字节数,那么专门搞一个SizeClass类来计算对齐后的字节数,内部提供接口:
在这里插入图片描述

SizeClass中的接口搞成静态的,虽然其中的函数是被封装了的,但要独立使用。

计算的时候需要根据size来进行分区,就是[1, 128]、[128 + 1, 1024]……在哪一个区中,然后再根据对应的区来计算对应的对齐后的字节数。

那么分区:
在这里插入图片描述

因为每个分区对应的计算对齐后的字节数逻辑都很像,所以这里直接给一个子函数,用来计算对应分区中size对齐后的字节数:
在这里插入图片描述

然后RoundUp中每个分区调用这个子函数:
在这里插入图片描述

这样就能最终求出来size对齐之后的字节数是多少了。

不过子函数_RoundUp还有其他写法,直接用二进制写的:
在这里插入图片描述

大佬写的东西,直接用二进制来搞,效率会高一点,这里我来个例子演示一下:
在这里插入图片描述

剩下的就不演示了,反正是对的,各位自己想算的可以自己算算。

这里只是把size对齐后的字节数算出来了,线程在申请size的时候还要计算出该字节数应该对应到哪一个哈希桶,因为这里实现哈希是直接用数组搞的,所以就是要求出对应的下标。所以还要在SizeClass中加一个Index接口用来求size对应下标,我这里就直接把代码给出来了:

// 求size对应在哈希表中的下标
static inline size_t _Index(size_t size, size_t align_shift)
{							/*这里align_shift是指对齐数的二进制位数。比如size为2的时候对齐数
							为8,8就是2^3,所以此时align_shift就是3*/
	return ((size + (1 << align_shift) - 1) >> align_shift) - 1;
	//这里_Index计算的是当前size所在区域的第几个下标,所以Index的返回值需要加上前面所有区域的哈希桶的个数
}

// 计算映射的哪一个自由链表桶
static inline size_t Index(size_t size)
{
	assert(size <= MAX_BYTES);

	// 每个区间有多少个链
	static int group_array[4] = { 16, 56, 56, 56 };
	if (size <= 128)
	{ // [1,128] 8B -->8B就是2^3B,对应二进制位为3位
		return _Index(size, 3); // 3是指对齐数的二进制位位数,这里8B就是2^3B,所以就是3
	}
	else if (size <= 1024)
	{ // [128+1,1024] 16B -->4位
		return _Index(size - 128, 4) + group_array[0];
	}
	else if (size <= 8 * 1024)
	{ // [1024+1,8*1024] 128B -->7位
		return _Index(size - 1024, 7) + group_array[1] + group_array[0];
	}
	else if (size <= 64 * 1024)
	{ // [8*1024+1,64*1024] 1024B -->10位
		return _Index(size - 8 * 1024, 10) + group_array[2] + group_array[1]
			+ group_array[0];
	}
	else if (size <= 256 * 1024)
	{ // [64*1024+1,256*1024] 8 * 1024B  -->13位
		return _Index(size - 64 * 1024, 13) + group_array[3] +
			group_array[2] + group_array[1] + group_array[0];
	}
	else
	{
		assert(false);
	}
	return -1;
}

然后再回来写ThreadCache中的Allocate。

ThreadCache::Allocate

这里搞一个ThreadCache.cpp专门用来实现ThreadCache的接口:
在这里插入图片描述

Allocate申请size大小空间的时候首先就是用刚才写的两个函数RoundUp和Index来求出size对齐后的字节数和在哈希表中的下标:
在这里插入图片描述

这两个搞好之后就简单了,下面就是去对应哈希桶里面拿空间就行。

如果对应哈希桶里面有空间就直接从哈希桶里面拿(调用_freeList中的Pop就行),如果没有了那就是ThreadCache中不够了,得向下一层的CentralCache拿,不过CentralCache还没有实现,这里就先简单给一个让 ThreadCache 向 CentralCache 申请空间的接口FetchFromCentralCache:
在这里插入图片描述
至于为什么是这两个参数,这里还不能解答,因为CentralCache还没实现,后面等我讲到的时候就知道了。ThreadCache.cpp中给出函数实现(只返回一个空,等会方便测试一下):
在这里插入图片描述

Allocate申请空间的时候如何判断size对应哈希桶中还有没有空间呢?
很简单,通过成员_freeLists,下标index已经找到了,那_freeLists[index]就是对应的哈希桶,再给FreeList类中添加一个接口Empty来判断自由链表是否为空就行:
在这里插入图片描述

然后把Allocate中的代码补充完整:
在这里插入图片描述

这里FetchFromCentralCache第二个参数直接给alignSize,就是指tc向cc申请空间的时候就不需要考虑对齐的问题了,直接申请整块的大小。

ThreadCache::Deallocate

再来把Deallocate完善一下:
在这里插入图片描述

就这么简单。

但其实还有一些东西没搞的,前面讲ThreadCache基本框架也说了,当tc中有空闲空间的时候是可以将空闲空间还给cc的,所以这里Deallocate中还是有一些代码没写的,这些代码得等后续cc实现了之后再考虑。

TLS——thread local storage

这里要说的TLS不是网络中的那个TLS。这里的TLS是thread local storage,也就是线程的本地存储。

前面讲了每个线程都会有一个自己的ThreadCache,但是如何实现呢?
我在讲Linux的时候说过,一个进程可能有多个线程,多个线程几乎是共享整个进程的虚拟地址空间,每个线程有独立的栈、寄存器等独有的空间或数据,这里想要每个线程都有一个ThreadCache,那如何让线程与线程之间的ThreadCache不会相互影响呢?如何控制某个ThreadCache一定属于某个线程呢?一个进程要创建多少个ThreadCache呢?一堆杂七杂八的问题。

这些问题只需要通过TLS就能解决,线程的本地存储。

进程的全局变量是每个线程共享的,那有没有一种全局变量能让某个线程自己独有但是其他线程看不见呢?答案是有的,就是TLS。线程局部存储(TLS),是一种变量的存储方法,这个变量在它所在的线程内是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性,避免了一些加锁操作,控制成本更低。

Linux下和Windows下各自有各自的TLS,我就不细讲TLS相关的知识了,等会只需要用一个声明就可以,这里给出两篇文章,感兴趣的同学可以看看:
TLS 线程局部存储(Windows)
linux gcc下 tls

这里我用的是vs2019,所以就是要用到Windows下的TLS,TLS分静态的和动态的,动态的用着复杂了点,还要调用接口,静态的只需要在定义对象的时候前面加个_declspec(thread)_声明就可以了。

这里再对代码修改修改。

首先直接在ThreadCache.h中定义一个TLS的全局对象:
在这里插入图片描述
这样每个线程都会在最初的时候创建一个全局的pTLSThreadCache指针,相互独立,用这个全局指针进行的任何操作都不会对其他线程产生干扰。

到此ThreadCache就已经大体实现完毕了,不过有很多细节上的东西要做优化,等后期再详谈这些东西。

线程申请和释放空间的接口

整个项目线程要直接调用的就是两个接口,一个叫做ConcurrentAlloc,用来申请空间,一个叫做ConcurrentFree,用来释放空间。

先来搞一个头文件ConcurrentAlloc.h,里面提供这两个接口:
在这里插入图片描述
这里注释也写了,ConcurrentAlloc其实就是原项目中的tcmalloc,这两个接口使用起来就和malloc和free一样,调用malloc传参就是传一个大小,调用free就是传一个指针。这里的两个函数同理,等于是最外部申请和释放空间的接口。

来简单实现一下,主要是为了测试前面的代码能跑通不。

ConcurrentAlloc

这里是线程会直接调用这个ConcurrentAlloc函数,内部实现的时候首先要找对应的ThreadCache,先通过ThreadCache来申请空间,此时刚刚定义的TLS全局的ThreadCache指针pTLSThreadCache就排上用场了,直接通过pTLSThreadCache指针new一个对象出来:
在这里插入图片描述

那么此时就可以直接通过这个pTLSThreadCache来调用ThreadCache中的相关接口了,其实就只需要调用一个接口,那就是ThreadCache中的Allocate:
在这里插入图片描述
可能有的同学有点懵,其实这里就是线程向tc申请,tc中的不够的时候再向cc申请,cc不够的时候再向pc申请,因为这里代码写的不全,所以逻辑上差的比较多,等后面CentralCache代码实现的差不多了就能摸索到其中的门道了。

再来搞一个专门用来单元测试的文件UnitTest.cpp,里面来搞两个线程来测试一下这里的代码能跑不能,两个线程都是调用ConcurrentAlloc来申请空间:
在这里插入图片描述

【注】这里我在调试的时候出了点问题,并行监视窗口添加不了变量,没法直接查看线程id和对应的pTLSThreadCache,所以我这里只能将就着用鼠标放在pTLSThreadCache旁边才能看到线程id和对应pTLSThreadCache:

在这里插入图片描述

这里将这两个线程的执行流大致给出:
在这里插入图片描述

在这里插入图片描述

我再在ConcurrentAlloc中加上打印线程id和pTLSThreadCache的值,并修改一下两个线程的执行顺序:
在这里插入图片描述

打印结果:
在这里插入图片描述

这里是重新编译了,所以得到的线程id和空间和刚刚图中的不一样。

这里测试ConcurrentAlloc主要就是验证一下定义的TLS变量不需要加锁就能帮每个线程申请独有的ThreadCache对象。

ConcurrentFree

再来写写ConcurrentFree:
在这里插入图片描述

这里有点问题,因为我前面ThreadCache中Deallocate给了两个参数,这里第二个参数不能确定,但是为了测试这里就先给ConcurrentFree多给一个参数size,不然这里跑不了,后面在写代码的时候再将Concurrent的第二个参数去掉:
在这里插入图片描述
这个就不测试了,Deallocate代码实现欠缺的有点多,得后面相关逻辑补充的差不多了才能测试。

ThreadCache讲的差不多了,来接着讲讲第二层的CentralCache。

CentralCache初步实现

cc与tc的相同之处

前面说了线程像tc申请空间,如果tc中不够了就需要向cc申请空间。上面实现ThreadCache的时候留下了一个FetchFromCentralCache的接口,这里讲完CentralCache就能实现一下这个接口了。

tc和cc还是很相似的,cc内部其实也是哈希的结构,也是根据块大小来映射,甚至二者的映射规则都是一样的,这样就会有很多好处,比如当tc对应哈希桶没有空间的时候可以直接找cc中相同下标的哈希桶,如tc的1号桶(16B)不够了就可以直接从cc中1号桶拿,这样一一对应寻找起来更加方便。

cc与tc的不同之处

线程向tc申请空间是自由的,不需要加锁,但是当多个tc相同位置的哈希桶没有空间的时候会并发向cc相同位置的哈希桶申请空间(因为cc在整个进程中只有一个),此时就需要加锁。

  • 比如说t1线程向其tc的2号桶(24B)申请空间时该桶为空,且同时t2线程向其tc的2号桶(24B)申请空间该桶也为空,那么此时二者就都会并发向cc的2号桶申请空间,这时候就要加锁了,谁先拿到锁谁就先获取到空间。

前面讲整体框架的时候说过cc中会有桶锁,意思就是每一个桶都会有一把锁,只有当多个线程向同一个桶申请空间的时候才会发生竞争,此时才需要加锁,否则就不需要加锁。

  • 比如t1线程向其tc的2号桶(24B)申请空间时该桶为空,且同时t2线程向其tc的0号桶(8B)申请空间该桶也为空,那此时t1会去找cc中的2号桶,t2会去找cc中的0号桶,二者并不会竞争同一个桶上的空间,那么就不需要加锁。

除了cc比tc多了锁,二者还有不一样的地方。

tc中自由链表挂的是一个个空间块,而cc中自由链表挂的是一个个span(跨度)结构体,如图:
在这里插入图片描述

span是管理以页为单位的大块内存,也就是说span中可以有多个页。span结构体中有一个size_t类型的_n成员,这个成员表示的就是span管理了多少页。

span挂在哪个桶下面就会将span总的空间划分成多个对应桶表示的字节大小的空间。

  • 比如说span挂在0号桶下面,就会被划分成多个8B的小空间,也就是上面图中那样,多个小块的空间还会用链表连起来。span结构体中会有一个void* _list来表示划分好的多个小空间的链表的头节点。当tc空间不够向cc申请时,就是从span管理的小块空间中拿小块空间的。

每个桶下挂的span所包含的页数是不同的,桶对应代表的字节数越小,页数也就越少,代表字节越大页数也就越大。

  • 如0号桶代表的是4B的块,那一个span一两页就够了,假设按照一页4KB来算,那一页也都可以划分成512个4B的块了,两页都1024个了。
  • 再比如最后一号桶为256KB,按照一页4KB的话,那这一个span就要多给些页才能表示一个256KB。

那么要管理这些页,还要区分多个span所管理的页是从第几页到第几页,所以span还要有一个_pageID(我这里故意没有给_pageID的类型,等会细说)字段来表示当前span所管理的开始页是几号页。

可以看到图中span串起来的箭头是双向的,而小块空间的都是单项的,因为span实现之后其实是一个双向链表,这样在对span链表进行增删查改的时候会更方便一些。

  • 因为span是CentralCache中的么,前面说了CentralCache既要给tc分配空间,还要回收tc中空闲的空间,同时cc空间不够的时候还要向pc申请空间,而且cc中有span空闲的时候还要还给pc,这样cc在中间就是起一个承上启下的作用,对其相关的操作效率要高。所以span中还有两个字段,那就是span* _prev和span* _next;

span中还有一个字段要介绍,size_t _usecount。

  • 这个_usecount是用来记录当前span分配出去了多少个块空间,分配一块给tc,对应就要++use_count,如果tc还回来了一块,那就- -use_count。_usecount初始值为0。
  • 当span中的use_count为0的时候可以将其还给pc以供pc拼接更大的页,用来解决内存碎片问题(外碎片)。

那么当前所讲的span中共有这么些字段:
在这里插入图片描述

上面span还有部分字段没有给出来,因为目前阶段暂时用不上,直接给出来的话会有些突兀,等后面讲相关的内容的时候会再补上一些字段。

所以span就是一个管理多页块空间的结构体,多个span组成一个双向链表,一个双向链表就是cc哈希表中的一个哈希桶。

这些等下面实现的时候就知道了,现在先了解一下。

cc代码实现
文件创建

搞两个文件,一个CentralCache.cpp,一个CentralCache.h,搞一个CentralCache类,两个文件一个实现一个声明:
在这里插入图片描述

Span实现

和tc一样,实现CentralCache前先将基础的数据结构给出来。

首先就是span,说说刚刚遗留的一个问题,就是PageID,也就是成员_pageID的类型,那这个类型是啥类型呢?

这个问题是根据平台而定的,这里假设一页是8KB。

  • 32位下,一共有 2 32 / 2 13 ? 2 19 2^{32} / 2^{13} ? 2^{19} 232/213?219 个页,当然这么多页不可能完全申请,用size_t类型是完全能够表示这所有的页的。
  • 但是如果是64位下的呢?一共会有 2 64 / 2 13 ? 2 51 2^{64} / 2^{13} ? 2^{51} 264/213?251 个页,这页数可就比32位下多了太多了,普通的size_t类型是表示不了这么多页的。

所以这里要依据平台位数而决定_pageID的类型,不然平台一换就会出问题。这里就要用到条件编译来实现不同平台下类型的转换,等会就实现。

先来在Common.h中实现一下span,因为PageCache中也会用到这个span。
在这里插入图片描述

这里的PageID怎么用条件编译来搞呢?
其实很简单,通过_WIN32和_WIN64就可以(我这里用的是Windows的,Linux下的就不搞了,Linux判断多少位的可以用__x86_64啥的):
在这里插入图片描述
这里x86就指的是32位的,可以看到高亮的就是size_t,

换成x64的:
在这里插入图片描述
好像出现了点小问题,不要慌,我来给你解释一下,再64位下其实不止定义了一个_WIN64的,还定义了一个_WIN32的,所以这里就算切换成了64位的还是会有_WIN32,那判断的时候就直接走ifdef了,所以这里条件编译写法有点问题。

怎么解决呢?很简单,调换一下位置就行:
在这里插入图片描述
可以看到这里就是unsigned long long的高亮了。

还有Linux的我就不写出来了:
在这里插入图片描述

换回32位,此时PageID就是size_t的:
在这里插入图片描述

_list改一下名字吧,就叫_freeList,和前面ThreadCache对上,因为这里_list就是挂的小块空间么,和FreeList结构体中的_freeList功能一样:
在这里插入图片描述

SpanList实现——CentralCache中的哈希桶

然后来搞一个SpanList,就是一个以Span为基本单位的带头双向循环链表,因为这个结构可以说效率是最高的,关于双链表这种基础上数据结构我就不讲了,我前面的博客也是模拟实现过STL中的list的,不懂的同学可以看看我这篇博客:【C++】手把手教你模拟实现list的基本功能

这里SpanList就是CentralCache中的哈希桶,因为要搞带头的,直接搞一个Span成员,构造函数里面创建一个哨兵位的头结点:
在这里插入图片描述

这里就没必要搞什么迭代器了,重点不在遍历上,就给两个接口就行,一个进行插入Span的Insert:
在这里插入图片描述

相关细节就不说了,很简单。

一个进行删除Span的Erase:
在这里插入图片描述
前面说了Span是可以让PageCache回收的,不过PageCache还没实现,所以相关代码先保留。

然后CentralCache类中就需要一个以SpanList为元素的数组,以表示cc中的哈希结构:
在这里插入图片描述

前面说CentralCache中的哈希和ThreadCache中的哈希映射规则一样,所以等会可以直接复用SizeClass类中的接口,也说了CentralCache中每个哈希桶都要有一个桶锁,那么对于这里的哈希桶每个桶就是SpanList,也就是要对每一个SpanList提供一个锁,那就直接在SpanList类中添加一个互斥锁成员就行(记得引头文件):
在这里插入图片描述
这样就正好,没必要再在CentralCache类中搞一堆锁了。

饿汉创建单个CentralCache

前面TreadCache是每个线程都有一个,通过TLS实现了这个一点。而整个进程中CentralCache是只有一个的,想要让所有tc都能访问到这一个cc对象,那用单利模式来实现这一个CentralCache是再好不过了。单例我也就不细讲了,这里直接用,不懂的同学可以看看我前面的博客。

懒汉模式有点麻烦,这里就直接用一个饿汉模式就够了,在cc中搞一个静态的对象成员:
在这里插入图片描述

注意静态成员需要在类外声明,类外初始化,但是这里不要直接在.h中初始化,因为可能会有多个cpp文件包含这里的CentralCache.h,如果在.h中初始化了就会发生连接错误,所以要专门在.cpp中初始化:
在这里插入图片描述

然后构造函数私有、delete掉拷贝构造和复制构造,并提供GetInstance接口,返回_sInst的地址:
在这里插入图片描述

下面来写写tc与cc交互的逻辑,也就是ThreadCache中留下的那个接口FetchFromCentralCache。

ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize)

先来回忆一下。

线程向tc申请空间,tc对应哈希桶中没有空间的时候就要向cc对应哈希桶中申请空间,那么此时就要从cc对应哈希桶中的span中取空间,并给线程提供一块空间。看图:
在这里插入图片描述

不过给tc提供空间要确定一次提供多少,不能多也不能少,如何解决呢?

这里采用慢开始反馈调节算法,就像网络中的拥塞控制一样,刚开始少给一点,如果tc对于单个大小的空间块需求次数在不断增多,那就不断增加单次提供的块数。

  • 比如说线程对于16B(对应1号桶)空间需求较多,那么当tc的1号桶中没有空间块的时候就要向cc的1号桶中的span申请空间。
    • 假如tc第一次向cc的1号桶申请空间时,cc只先给一块16B的空间,并标记一下对应线程的tc向当前1号桶中申请过了1次空间,下次这个tc申请1号桶的时候就多给一块,也就是两块;
    • 那么如果当第二次同样线程的tc过来且还是申请1号桶的空间,那cc根据前一次的统计,发现这个tc之前申请空间的时候给了一块,那这次又来了,可能后面还会再来,那就再在前一次的基础上多给这个tc一块,所以这次就给了这个tc两块16B的空间,同时也做一下标记。
    • 那么如果当第三次同样线程的tc过来且还是申请1号桶的空间,那cc根据前一次的统计,发现这个tc之前申请空间的时候给了两块,那这次又来了,可能后面还会再来,那就再在前一次的基础上多给这个tc一块,所以这次就给了这个tc三块16B的空间,同时也做一下标记。
    • ……
  • 同样的道理,其他块大小的空间也是这样。

不过还要注意一点,不能说次次来次次都要多给一块,cc单次提供的某一大小的块数也应该是有个上限的,一直增长有可能出现后续对于某一空间大小的块单次开的块数过多而导致浪费过多空间的情况,这里等会实现的时候再细说。

那么来写写FetchFromCentralCache这个函数。刚刚讲了两个东西,主要就是为了实现慢开始反馈调节算法。

首先得要有一个能度量tc档次应该申请固定空间块的块数,空间块的大小由哈希桶所在下标的位置来决定,那也就是对应自由链表来决定,那么这个量就应该有对应需求块大小的自由链表来提供,所以在FreeList中添加一个成员size_t _maxSize用来表示未达到上限时当前能够申请的最大块空间是多少,并提供一个MaxSize接口返回该值:
在这里插入图片描述

这里tc在申请块空间的时候,cc会根据MaxSize来决定应该为tc提供多少块固定空间块。初始值给成1就是第一次提供一块空间。MaxSize的返回值给成引用是为了让申请过一次之后就让_maxSize++一下,好进行下一次的判断。

再来实现一下单次申请上限的算法,直接写在SizeClass类中:
在这里插入图片描述

通过这两个东西就可以控制单次申请的块数,下面就来实现FetchFromCentralCache:
在这里插入图片描述

到此就知道了cc本次要给tc提供多少块空间了,然后就是让cc拿出来这么些块空间给tc。不过这个操作也可以直接写成一个函数,名字就叫做FetchRangeObj,意思就是拿出一段范围的obj空间。

  • 这个函数就是从CentralCache对应index下标的哈希桶中拿出batchNum块大小为alignSize的块空间(batchNum是上面图中min的返回值,alignSize是上面图中函数的第二个参数)。

  • 而这个函数应该返回来大小为 batchNum * alignSize 的一段空间,而这一段空间就是从对应index的SpanList中挑出一个Span,然后再从Span中挑出大小为 batchNum * alignSize 的一段空间。

  • 不过此时有可能会出现Span中空间不足以提供这么多的情况(单个Span中的小块空间可能是被多个tc拿走的,那么就可能出现某个tc要的时候不够的情况,此时有多少就给多少,或者是完全没有的时候cc就会去找pc要)。

  • 所以此时就算不够还是要返回一段空间的,那么如何确定返回了多少块呢?
    规定一下返回值返回的是实际提供的大小为alignSize的空间的块数,并且应该给两个指针的参数,一个void* start,一个void* end,用来划定cc所提供的空间的开始和结尾,所以这个函数声明应该长这样:
    在这里插入图片描述
    注意这里参数中start和end给了引用,就是要做输出型参数,方便改传入的指针,当然也可以给二级指针,不过没有引用用起来方便。

这里的这个函数先不实现,先来继续看FetchFromCentralCache,里面调用FetchRangeObj就是这样:
在这里插入图片描述

然后再根据actualNum来判断后续处理动作。

刚刚说了actual可能实际上不会等于batchNum,因为单个span空间可能不够,不过FetchRangeObj一定能保证这里actualNum一定是大于等于1的,这一点在后续代码实现的时候就知道了,这里不做解释。

那么分配回来的空间就是[start, end],这里还需要把这段空间放到tc的对应自由链表后面,此时tc对应index位置的自由链表一定是空的,因为只有为空时才会走这里的FetchFromCentralCache,此时插入只需要调用FreeList中的push就可以。看图:
在这里插入图片描述

图中就是4块8B的空间,那么此时start和end就是这样:
在这里插入图片描述

但前面tc中Allocate的实现逻辑是向cc申请空间时不但要给tc对应的自由链表提供多个块空间,还要同时直接给线程返回一小块空间,这里我怕有的同学可能已经忘了,我再把截图给出来:
在这里插入图片描述

所以应该给tc对应自由链表中插入的是[ObjNext(start), end],然后给线程返回的是start:
在这里插入图片描述

那就要再给FreeList提供一个接口用来插入多块空间的接口(其实也可以只用Push,但是一个一个Push有点麻烦,不如直接将整条链直接链接到自由链表中):
在这里插入图片描述

那么此时就应该考虑一下actualNum是否为1了,为1就不需要给tc的自由链表Push啥的了,直接将这一块空间给线程即可,不为1的时候才需给tc对应自由链表插入[ObjNext(start), end]的空间,故代码应该这样写:
在这里插入图片描述

整个FetchFromCentralCache的代码给出来:

// ThreadCache中空间不够时,向CentralCache申请空间的接口
void* ThreadCache::FetchFromCentralCache(size_t index, size_t alignSize)
{
	// 通过MaxSize和NumMoveSie来控制当前给tc提供多少块alignSize大小的空间
	size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(alignSize));
		/*MaxSize表示index位置的自由链表单次申请未到上限时,能够申请的最大块空间是多少*/
		/*NumMoveSize表示tc单次向cc申请alignSize大小的空间块的最多块数是多少*/
		/*二者取小,得到的就是本次要给tc提供多少块alignSize大小的空间*/
		/*比如说alignSize为8B,MaxSize为1,NumMoveSize为512,那就要给一块8B的空间*/
		/*也就是没到上限就给MaxSize,到了上限就给上限的NumMoveSize*/

	if (batchNum == _freeLists[index].MaxSize())
	{ //如果没有到达上限,那下次再申请这块空间的时候可以多申请一块
		_freeLists[index].MaxSize()++; // 下次多给一块
		// 这里就是慢开始反馈调节的核心
	}

	/*上面就是慢开始反馈调节算法*/

	// 输出型参数,返回之后的结果就是tc想要的空间
	void* start = nullptr;
	void* end = nullptr;

	// 返回值为实际获取到的块数
	size_t actulNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, alignSize);
	
	assert(actulNum >= 1); //actualNum一定是大于等于1的,这是FetchRangeObj能保证的

	if (actulNum == 1)
	{ // 如果actulNum等于1,就直接将start返回给线程
		assert(start == end);
		return start;
	}
	else
	{ // 如果actulNum大于1,就还要给tc对应位置插入[ObjNext(start), end]的空间
		_freeLists[index].PushRange(ObjNext(start), end);

		// 给线程返回start所指空间
		return start;
	}
}

再来写FetchRangeObj。

CentralCache::FetchRangeObj

这个函数作用就是让cc拿出来一段空间来提供给tc,再来看一下声明:
在这里插入图片描述

第一步要算出来size所在桶的对应下标,直接调用前面SizeClass类中的Index接口就行,假设算出来的下标就是index:
在这里插入图片描述

那么此时_spanLists[index]位置处所挂的span情况是不能确定的,大致可分为下面几种(以8B块空间为例):

有span且span所管理的空间不为空:
在这里插入图片描述
当前情况下,直接获取到这个管理空间非空的span即可。

有span但span所管理的空间为空:
在这里插入图片描述
当前情况下需要向pc申请一个新的span。

没有span(双向循环的链表,_head->next = _head):
在这里插入图片描述

当前情况下需要向pc申请一个新的span。

其他的情况都是在这三种情况之上,无非就是多挂了几个span。

但是不管怎么样,都要获取到一个管理空间不为空的span,如果没有就需要向pc申请,所以再提供一个接口,用来获取一个管理空间不为空的span:
在这里插入图片描述
至于为啥传这两个参数等会说,这个函数的逻辑先不写,再试先知道这个函数就是获取一个非空span就行,先实现成返回一个nullptr:
在这里插入图片描述

继续来写FetchRangeObj,因为整个进程中只有一个cc,而且可能会有多个线程向同一个cc中的SpanList申请span中的块空间,所以要对SpanList的操作加锁,而获取到index之后就要访问特定SpanList了,所以下面就要开始加锁了:
在这里插入图片描述
在这里插入图片描述

然后就是获取到一个管理空间非空的Span:在这里插入图片描述

这里把参数n改一下,改成batchNum,不然和传入的参数名字不一样,不太好辨认:
在这里插入图片描述

然后就是在获取到的span中取出batchNum个大小为size的空间,如果不够batchNum个就有几个取几个,首先就是让start和end都指向span的_freeList,也就是第一块size大小的块空间:
在这里插入图片描述
然后让end往后挪动batchNum - 1块空间,如图:
在这里插入图片描述
这样start和end就指向了一段batchNum个size大小的块空间。然后把这块空间给ThreadCache,然后让Span的_freeList指向end的next就可以了:
在这里插入图片描述

不过上面是当Span中有足够的块空间的时候才能获取到batchNum个,还存在不够batchNum个的时候,也就是这样(假设此时Span中只有两块size大小的块空间):
在这里插入图片描述

此时如果要4块的话就不够,所以应该是当end的next为空的时候就要停止了,不然就越界了:
在这里插入图片描述

所以此时返回的就是这两块size大小的空间,这里还要记录一下end走了多少块,以便作为函数的返回值返回(函数返回值为实际获取到的块个数),同样的,也要将Span的_freeList指向end的next:
在这里插入图片描述

那把代码写一下:
在这里插入图片描述
这里actualNum为啥从1开始可以自己推一下,我就不说了。

这里没法测试,申请空间的逻辑还差不少,PageCache还没写,cc还要向pc申请span,所以暂时编译能通过就行:
在这里插入图片描述

另外释放空间的逻辑(严格来说是回收空间的逻辑)也没写,也就是tc中的Deallocate,前面说了tc中空闲的空间可以还给cc,cc中空闲的span可以还给pc,但这里先不考虑回收空间的事,先把申请空间的写完再搞回收空间,这样也更容易理解回收空间的逻辑。

下面来讲PageCache。

PageCache初步实现

pc和cc一样,二者核心结构都是以SpanList为哈希桶,但是二者还有一些本质区别。

PageCache框架介绍

cc的哈希桶和tc的哈希桶映射规则一样。而pc中的SpanList则是按span页数进行映射的,也就是说第i号桶中挂的span都是i页内存,如图:
在这里插入图片描述

span中有多少页,就会挂在第几号桶,我来标注一下:
在这里插入图片描述

所以pc中的哈希是直接按照页数来映射的,而且span的内部不会再切分成小块空间,也就是不会按照8B啥的块切分了。每次cc向pc要到新的span之后,需要cc自己将span切分。

span管理的页数在[1, 128]这个范围内。假设一页有8KB,那这里128页已经1M了,最大的块空间也就256K,也就是说最大的span可以提供4个256KB的块,完全够用了,如果想要span管理的空间再大一些也是可以的,但这里就按照128页来搞。

pc相关申请空间和释放空间的逻辑等会代码实现的时候再来讲。

那这里就直接开始整代码吧。

基本工作

还是两个,一个.cpp,一个.h:
在这里插入图片描述

刚刚讲pc和cc一样,都是SpanList为哈希桶的,所以PageCache的成员就是一个以SpanList为单位的数组,同时刚刚也说一个span最大的页数为128页,那么这里数组就有128个元素,那就在Common.h中定义一个PAGE_NUM表示span的最大页数,同时也是这个数组的元素个数:
在这里插入图片描述
这里我故意给了129,因为这里要用的是直接映射,如果开成128个SpanList的数组的话,映射的时候还要减去一个1,写的时候还麻烦了点,不如多开一个,1号桶(下标1)映射的就是1页的span,n号桶(下标n)映射的就是n页的span。

PageCache中定义成员:
在这里插入图片描述

pc中span的分裂与合并

整个项目中的每一层都少不了申请和回收空间的过程,这里就先简单讲讲向pc中申请span和pc回收cc中的span的过程。

向pc申请span

① 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。

  • 比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页page span和一个6页page span。

② 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复①中的过程。

再来画画图,直观一点,假设现在pc是空的(SpanList是双向循环的,初始时_head->next = _head):
在这里插入图片描述

如果cc向pc要了一个管理页数为2的span,整个流程是这样的:
在这里插入图片描述

之后再将2page的span交给cc。

这就是分裂。

pc回收cc中的span

如果central cache释放回一个span,则依次寻找span所管理的页号的前后页号的页有没有空闲,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。

比如说现在有一个页号为100的span,该span管理了10页空间,那么此时这个span管理的范围就是[100, 110],那么此时就看一下这个范围两边的页号对应的页有没有空闲,也就是99页和111页是否空闲,如果空闲就合并到这个span中,假设这里99页和111页都是空闲的,那么合并之后span就变成了页号为99,管理页数为12的span,此时管理的范围就是[99, 111]。再重复上述步骤,不断往两边扩展,当有一边找不到连续的空闲页的时候就让这一边停止,另一边也是这样找。

这就是span的合并。

pc中的锁

前面cc要加桶锁,这里pc也是要加锁的,因为可能存在多个执行在cc中的多个桶中向pc申请span。

注意cc只是对单个桶进行加锁,不是整个cc加锁。当多个线程向其对应tc申请空间的时候,可能出现多个tc同时向cc申请空间,而cc中又可能出现多个桶都没有空间的情况,那么就会有多执行流向pc申请span,也就是图中的这样:
在这里插入图片描述

但是pc中加的就不是桶锁了,而是对pc整体加锁,这一点和前面讲的span的分裂和合并有关,这里先暂时记住要加一个整体的锁,所以PageCache中加一个锁成员:
在这里插入图片描述

单例创建pc

全局只有一个pc,所以也搞成一个饿汉单例,方法和cc一样,我就不讲了,直接写出来:
在这里插入图片描述

一样的,静态的成员不要在.h中定义,定义在.cpp中,防止链接错误:
在这里插入图片描述

CentralCache::GetOneSpan

前面讲cc中span不够的时候要找pc,也就是调用GetOneSpan,这个GetOneSpan是用来让cc向pc申请的,调用后cc会获得一个管理空间不为空的span,上面讲cc获得到的span基本分三种情况,也就是这样:
在这里插入图片描述

而第1种是cc自己有span的情况,后两种都是需要向pc申请一个全新的span的,那根据后两种情况来让pc提供一个接口NewSpan(size_t k),表示pc从自己的哈希桶中拿出来一个k页的span:
在这里插入图片描述

这里实现先给一个空,后续再实现:
在这里插入图片描述

那么到这来回忆一下当所有Cache都没有空间的时候的整个流程是啥样的:
在这里插入图片描述

这里串一下是不是清晰了很多。

下面来细讲一下GetOneSpan的逻辑

GetOneSpan是为了让cc拿到一个管理空间非空的span,cc中可能有这样的span,也可能没有。

  • 首先要判断cc对应index下挂的有没有管理非空空间的span,若有,则将该span返回。若没有则向pc申请新的span,直接调用NewSpan即可。

想要判断cc中的某个SpanList有没有非空span,就要能够遍历SpanList这个链表,所以这里要在SpanList中加上两个接口Begin和End,分别代表链表的头部和尾部:
在这里插入图片描述
这里就不实现迭代器了,直接用Span指针代替迭代器。

那么首先在cc中找一下有没有管理空间非空的span:
在这里插入图片描述

此时没找到,就要向pc中申请全新的span了,也就是调用NewSpan,但是NewSpan参数为span管理的页数,因为向pc申请span的时候要给pc说要多少页的span,这样pc才能根据自己的映射关系从其对应页数下标处找到对应span。

而这里GetOneSpan的第二个参数size表示的是单个块空间的大小,所以要将这个size转成对应管理页数才能调用NewSpan,不同大小的size所需要的页数是不一样的,假设1页有8KB的话,那一页就可以提供1024个8B的块,但是32页才能提供一个256KB的块,所以需要专门实现一个较为匹配的块页匹配算法。我这里就直接给出来了:
在这里插入图片描述

再举一个size为256KB的例子,num算出来是2,npage算出来是2 * 256KB = 512KB,除以8KB得64页。故要满足256KB的块的单次申请上限,就需要用管理64页的Span。

注释很详细,就不多说啥了,里面的PAGE_SHIFT定义如下:
在这里插入图片描述

那这里就可以继续写GetOneSpan了:
在这里插入图片描述

这样就能让CentralCache从PageCache中获取到一个没有划分过的全新span,注意我加粗了没有划分过,前面说了PageCache中的span都是未划分过的span,所以CentralCache获取到这些未划分的span之后需要自己根据size划分一下这些span,那么怎么划分呢?

首先要拿到span所管理空间的首地址,可以直接通过页号来获取,即 页号 * 单页大小(或者页号左移PAGE_SHIFT) 即为span所管理页的首地址(后续会再NewSpan中保证这一点,不懂的同学暂时记下来)。

NewSpan中实现的时候会设置好返回的span中的_pageID和_n,也就是页号和管理页数。其中的其他项都是默认值,比如_freeList是空的。

再算出span所管理空间的大小,即span->_n * 单页大小,这样首地址加空间大小即end,这样再通过第二个参数size就可以对span所管理的空间进行划分,划分好的空间就直接放到_freeList后面。

如图:
在这里插入图片描述

这样先算出来首地址start = 页号 * 单页大小:
在这里插入图片描述

再通过页数算出总空间大小 = 页数 * 页大小,start加上总空间大小就是end:
在这里插入图片描述

再通过size来划分块:
在这里插入图片描述
虽然上面画出来的看起来像线性表,其实还要用一个个块的前4/8个字节作为下一块的指针,这样就是一个链表的结构。其实也是顺序表,因为新的span管理的空间一定是一块连续的空间,都是从128页的span过来的,而128页的span是直接用系统接口申请的,那就是连续的空间。

然后再将这块划分好的空间放到span的_freeList中(或者最开始得到start的时候就可以直接这样做):
在这里插入图片描述

再来详细说一下划分整个空间的过程,定义一个tail表示当前已经划分过的末尾位置,那tail的初始位置就应该是start,然后不断让tail往后挪动,并链接tail后面的块空间:
在这里插入图片描述

如果想要连接前后的块,那tail就要用到ObjNext,那么此时tail就应该是void*类型的(ObjNext参数为void*的),而成为void*之后就没法往后移动size个大小,此时就应该搞一个char*类型的方便进行+= size操作,不过不用定义别的指针,直接用start就行,因为start也要加整个span管理空间的大小,那么现在就直接实现一下这里:
在这里插入图片描述

这里获得到的span是一个没有被管理的span,所以需要把这个span放到cc对应的哈希桶中,所以我前面声明的时候参数中给了一个对应位置的SpanList:
在这里插入图片描述

这里可以直接调用SpanList中的Insert,不过这里再对SpanList实现一个头插的函数,直接复用Insert就行:
在这里插入图片描述

把这个新span插入到对应cc的SpanList中,然后返回这个span就行:
在这里插入图片描述

所以这里所有的代码就是这样:
在这里插入图片描述

下面来写从PageCache中获取新的span的代码,也就是NewSpan。

PageCache::NewSpan

回顾一下:
在这里插入图片描述

这个函数就是让pc拿出来一个全新的k页span给cc用。

那么来捋一捋这里的逻辑:

① pc先检查自己第k个桶里面有没有span,如果有就返回一个span。
② 如果第k个桶中没有就再往下找更大页的桶中有没有span,找到一个就把其他的span拿出来拆开,假设找到的span中起始页号为id,管理的页数为n,那么就把这个span切分成一个起始页号为id,管理页数为k的span,和一个其实页号为id + k,管理页数为n - k的span(当然你也可以从后往前切),也就是下图所示的这样:
在这里插入图片描述

③如果k号桶中没有,且比k大的桶下面也没有,就要向系统申请一个128页的span,此时申请完后同样进行切分并返回所需要的k页span,和②中的步骤差不多,只是多了个向系统申请的步骤。

那么根据上面这三条,就可以写写代码了,不过写代码前再给SpanList提供两个接口,一个判断链表为空的接口(链表为空指的是是否有span,而不是span所管理的空间是否为空,后者是判断span中的_freeList的):
在这里插入图片描述

这个函数在判断桶中是否有span的时候会有用。

另一个接口是PopFront,用来获取到非空SpanList中的第一个span:
在这里插入图片描述
这个函数能直接获取到非空SpanList中的第一个span,①中找到一个span就可以直接调用这个函数返回,②中找到一个大的span也可以直接调用这个函数来获取到那个span。

那么下面就来写写NewSpan的代码,也是分三种情况:
在这里插入图片描述

第一种:
在这里插入图片描述

第二种:
在这里插入图片描述

当②不满足的时候就走③:
在这里插入图片描述

正好前面在写ObjecPool的时候封装了一个SystemAllco函数,这里就直接将那个函数挪到Common.h里面:
在这里插入图片描述

不过这里kpage左移的时候用一个常数13改的时候不太方便,比如说想要给页大小改为4KB的时候还要专门改一下这里的13,前面定义了一个PAGE_SHIFT,用这个就好:

但是此时编译的话ThreadCache中会出问题:
在这里插入图片描述

因为这里Windows.h中也有一个min,系统的这个min是一个宏,而std中的min是个函数模版,min在编译的时候会首选宏,这里我鼠标挪上去的时候其定义是Windows中的宏,这里也是一个不用命名空间的坏处,出现命名污染了,很烦,那想要解决的话也是用条件编译,当是Windows的时候就用Windows.h中的宏,当是Linux的时候就直接用C++的std中的min:
在这里插入图片描述

这样就没事了,解决了一个小插曲,接着说第三种情况,让pc直接向系统申请空间:
在这里插入图片描述

这里递归用的很妙,可能有的同学会觉得这里递归要比直接写出来将128页分成k页和128-k页的span慢,其实并不会,现在的CPU性能都很高了,一点栈帧的消耗能算啥,同时也要注重代码复用性,如果说这里再写一个切分的逻辑错了还得要改,不如直接用现成的。

这里NewSpan的完整代码:
在这里插入图片描述

PageCache的加锁问题

PageCache和CentralCache不太一样,PageCache中只有一把锁,用来锁整个PageCache,就是因为Span的分裂和合并会直接影响到多个桶,并不是向CentralCache中只会在单个桶上操作,而且如果PageCache中给了PAGE_NUM个桶,对多个桶的操作可能很频繁,而且对桶的操作就几行代码,这样加上锁了之后就执行一点代码就停止了反而会因为短时间内频繁的加锁解锁而导致效率的下降。所以不如说直接用一个锁来将整体PageCache锁住。

那么PageCache中如何进行加锁呢?
个人觉得如果直接在刚刚的NewSpan中加一个互斥锁,也就是这样:
在这里插入图片描述

不过这样后面解锁的时候有点麻烦,因为这三种情况都要再return前进行手动解锁,尤其是第三种递归调用NewSpan的时候,前两种是可以使用lockGuard来进行上锁和解锁的,但是第三种就不行了,因为lockGuard只有在函数栈帧销毁的时候才会释放锁,前两种return时栈帧就没了,此时会将这个互斥锁解锁,但是第三种用lockGuard的时候递归调用NewSpan,当前函数栈帧并不会销毁,进而就不会解锁,那么递归的NewSpan在申请锁上锁的时候会直接阻塞,从而产生死锁的情况,所以就需要在递归调用NewSpan前就解锁,这样在递归调用NewSpan的时候在递归的NewSpan内部才能拿到锁并进行加锁。

这里说三种解决方法:
第一种解法:是直接用recursive_mutex,递归锁能够识别自己,这样在递归调用的时候同一线程进行重复加锁的时候并不会阻塞,那么也就不会产生死锁。代码我就不写了,我等会不用这个。
第二种解法:把这里的NewSpan改成一个子函数_NewSpan,然后再搞一个NewSpan用来调用这个_NewSpan,在调用的两边进行加锁,也就是这样:
在这里插入图片描述
不过我等会也不用这个。
第三种解法是在第二种解法的基础之上的,就是在调用NewSpan的位置两端加锁,也就是在GetOneSpan中:
在这里插入图片描述
这里的_pageMtx和前面SpanList中的桶锁一样,可以给成私有然后提供一个返回锁的接口,但是我嫌麻烦,直接搞成公有的方便一些。

  • 这里tcmalloc源码中相关代码就是这样实现的,相对于前两种好多了,没那么麻烦,我这里也就采用第三种方法了。
cc向pc申请span时解锁

cc向pc申请span的时候要解锁吗?
推荐解掉,也就是这里:
在这里插入图片描述

因为虽然可能出现多个线程的tc向cc同一个桶申请空间的情况,但同时也可能存在多执行流释放空间的情况,如果当一个cc某个桶(准确的来说还是线程在执行)在向pc申请新span的时候还占着span不放,那如果有其他线程的tc向cc中的那个桶归还空间的时候就没法还,那这样既然申请新span的时候用不到这个桶的空间,那不如让还空间的线程对该桶执行相应归还的操作。

当然这样也可能存在多个线程同时向该桶申请空间的情况,但是也就是那一点代码,执行完之后还是会向pc申请span,一样的。所以不如把锁放出来大家一块用,能归还空间的就还,不影响还空间的线程。

所以在这里解一下锁:
在这里插入图片描述

可能有的同学有疑问,这锁哪来的啊?
就是调用GetOneSpan的函数,FetchRangeObj加的锁:
在这里插入图片描述

那么申请到新的span后什么时候再加锁呢?
反正不是当新span拿回来的时候:
在这里插入图片描述
而是将新的span挂到cc的某个桶上前,因为拿回来的新span,cc还要对其进行切分,切分时其他线程是拿不到这个span的,因为这个span还没有被挂在对应的桶上,所以不存在竞争的问题,只有被挂在cc桶上的span才会被其他线程的tc访问到。

所以应该是将span挂到cc桶上之前进行加锁,那就是这里:
在这里插入图片描述

那这里有的同学可能会担心咋解锁,没有关系,外面在调用GetOneSpan的时候会解锁的,也就是还是FetchRangeObj函数:
在这里插入图片描述

这里FetchRangeObj的代码都没有变,加锁的位置还是原来的位置,再来梳理一下这里对cc和pc加锁的流程。

首先向cc自己的哈希桶中拿span的时候要加锁,如果没有span就要解锁,如果cc的桶中没有span就要像pc申请span,那么在cc向pc申请span的时候需要将cc的桶锁解除,然后加上pc的整体的锁,申请到新的span后解除pc的整体锁,然后cc对新span进行切分,切分完毕后,再将切完的span挂到对应cc桶中时就加上锁,然后给将这个新挂上去的span管理的空间交给需要的tc之后再解锁。

这块有点绕,不懂的同学多看两遍。

至此整个申请流程已经完了,下面来对这整个申请的过程调试一下,看看能不能走通。

整个申请流程的两个测试用例

我这里只能说是把一些关键的地方点出来,想要搞懂下面的这两个测试用例必须要调试,我就不带着你们调试了,调试这东西必须要你们自己亲手来搞,我带着调试没啥用,细节太多了,如果你不会调试我也没有啥办法,只能说好好学一下吧,学会调试还是很重要的。

测试用例一:
在这里插入图片描述

先大概讲一下该用例流程,可以看到这里都是小于等于8B的空间,所以在申请的时候对齐后都是8B,那么这整个流程就是这样:
在这里插入图片描述

这里调试的时候注意一下pc向os申请空间的时候得到的首地址:
在这里插入图片描述

这里系统在申请空间的时候能够保证申请的空间是对齐的,所以这里才能用_pageID来规定页号,后续cc在得到新span后才能通过页号计算出首地址。

测试用例二:
在这里插入图片描述

还是都申请8B块,这里for循环目的是申请完cc中0号桶中的那个span,然后再申请一块的时候tc中没有,向cc申请,cc中也没有,向pc中申请,pc中有一个127页的span,分出来一页给cc,自己留下126页的span,然后剩下的流程就和前面一样了,就不细说了。

不光是这里有调试,后面在测试释放空间的时候也是需要调试的,不懂调试的同学好好找找资料学一学。

不过前面有个小问题没处理,useCount忘记修改了,就是Span中的那个统计当前多少块分配出去的字段:
在这里插入图片描述

不过并不会影响到申请的结果,useCount主要是在释放空间的时候才会依赖这个东西,这里只需要在FetchRangeObj中给start和end空间时才会改useCount,就是将cc中span中的空间分出去的时候:
在这里插入图片描述

这里测试的时候,在第二个测试用例中查看一下对应桶下面第二个span的useCount是不是1024(span是头插的,所以cc中的第一个span在最后面):
在这里插入图片描述

在这两个测试用例都能通过的前提下再看下面的内容,如果你没通过上面的这两个用例,先把错误的地方改好再往下看。

回收空间

下面来说说回收空间的流程,先来讲tc回收空间。

ThreadCache回收空间

前面专门把回收空间的代码保留了:
在这里插入图片描述

回收线程还回来的size大小的空间,这里目前是将size大小的块对齐一下,然后挂到tc对应的自由链表中。

那么当tc中单个自由链表中的块数过多的时候就要去掉一部分,放到cc对应的span中,那么什么时候是过多了呢?

  • 这里规定一下:当tc某个桶中块数大于当前单次批量申请块数(也就是MaxSize)的时候就返回档次批量申请块空间。就是tc中某桶中块数超过MaxSize的时候就要还MaxSize个块给cc。

【注】在不断申请同一块大小的空间时,同一个FreeList中的MaxSize就是从1开始,每次加1的不断往上增长,也就是一个首项为1,公差为1的等差数列。那么假如说tc向cc申请时,第一次申请得一块,第二次申请得两块,第三层申请得三块……,这样总的申请块数就是MaxSize这个等差数列之和,申请多了后总块数一定是会大于单次申请的MaxSize的(公差为正,前n项和总是大于第n项),只是申请的这些块中有些块正在被线程用着,没有还回来,不用的时候还回来一堆相同大小的块时就可能出现还回来的块数大于MaxSize的情况。

所以这里按照这样的规则让tc向cc归还空间。注意tcmalloc源码中的归还规则可没有这么简单,里面考虑了非常多的因素,比如说整个tc管理的总空间不能超过2M啥的,超过了就要还,还有其他的规则,这里只是化简了之后的归还规则。

FreeList::_size

这里要有一个能够统计某个自由链表中块数的量,所以再在FreeList中加一个成员_size,表示当前自由链表中有多少块空间:
在这里插入图片描述
初始值为0,就表示最开始的时候链表中一个块都没有。

那么就要修改一下里面对于链表增删的接口了,加上对size的修改,Pop:
在这里插入图片描述

Push:
在这里插入图片描述

还有一个PushRange:
在这里插入图片描述

这里我并没有给出修改size的代码,因为如果直接遍历start和end来统计块个数的话太麻烦了,没必要,再传一个参数用来表示当前这个范围内的块数:
在这里插入图片描述

这样就方便多了,那么在调用这个函数的地方改一下:
在这里插入图片描述

再提供一个Size接口用来获取这里的size:
在这里插入图片描述

ThreadCache::ListTooLong

再来专门在ThreadCache中写一个ListTooLong的接口,提供一个参数FreeList& list表示哪个桶中的块数过多了,一个参数size_t size表示该桶下块大小是多少,这个函数用来实现某个桶中块数过多时归还某个桶空间的逻辑:

在这里插入图片描述

先不给这个函数的实现,等会说。

这样满足条件的时候就归还空间:
在这里插入图片描述

tc归还空间的时候需要将MaxSize个块归还给cc,那么就要在一个桶中去掉MaxSize个块,所以在FreeList中提供一个PopRange的接口,里面提供三个参数,start、end、n,表示去掉n个块,范围是[start, end],start和end作为输出型参数,这样删除这段空间之后还能拿到这段空间以交给cc:
在这里插入图片描述

简单讲一下删除n块的逻辑:
在这里插入图片描述

那么首先搞到第一块,也就是_freeList,定义一个start和end都指向这块:
在这里插入图片描述

要3块空间,假设让end走三步:
在这里插入图片描述

这样如果要返回的话,还得要让第三块的next指向空,但是这里FreeList中实现的是单链表,不像SpanList是双链表,那就不要让end走三步,走两步:
在这里插入图片描述

这样让end->next指向空就好办了,不过指向空前要让_head->next = end->next,所以代码就应该这样写:
在这里插入图片描述

这样获得到的空间范围就是[start, end]。

然后在ListTooLong中调用这个函数就能拿到想要的空间:
在这里插入图片描述
此时将这块空间交给cc就可以了,那么再到cc中提供一个获取这块空间的接口:
在这里插入图片描述

也是先不给实现:
在这里插入图片描述
传size也是对齐之后的size,也是要根据映射关系找到对应span。

在ListTooLong中调用:
在这里插入图片描述
这里不需要传end,因为PopRange能保证end后面就是空,所以只需要判断一下next为不为空就能判断出是不是end了。

那么tc将需要回收的空间[start, end]传给cc,cc如何回收呢?

CentralCache回收空间

cc中回收tc空间的函数就是刚刚给出的:
在这里插入图片描述

注意是ToSpans,而不是ToSpan,因为tc传回来的这些块可能不仅仅是一个span中的,因为申请走的块空间返回时间是不能确定的,假设有两个span(span1和span2),假设某个线程申请空间的时候拿走了span1后半部分的空间和span2前半部分的空间,归还的那么块空间返回的时间是不能确定的。

通过块空间的地址算页号

返回给cc的块只能通过块大小size算出来其对应在cc中的那个index桶下挂着,不能确定在index桶中的哪个span,但是没有关系,块空间是有地址的,我们可以通过(块空间的地址 >> 13)来确定这个块在哪个页中。如图,假设现在有两页空间:
在这里插入图片描述

我拿计算器算一下对应的地址,2000页的地址:
在这里插入图片描述

2001页的地址:
在这里插入图片描述

所以这里的地址就是这样的:
在这里插入图片描述

对于2000页中空间的地址,都是在[0x00FA0000, 0x00FA2000)这个范围内的。
对于2001页中空间的地址,都是在[0x00FA2000, 0x00FA4000)这个范围内的。

其中任意一个地址右移13位(除以8KB),得到的就是页号,因为一页8KB,也就是地址的低13位都是某个页中的页内偏移地址。

  • 我来举一个例子试试,如0x00FA10FF,转成二进制就是1111 1010 0001 0000 1111 1111,那么右移13位就是1111 1010 000,这个转成10进制就是2000,也就是说这个地址就是在2000页中,如果你还不太相信,我这里提供一个简单的测试,看一下对不对:
    在这里插入图片描述

那么如何根据页号来找到对应的span呢,更简单了,前面在定义span的时候内部不就有一个表示span所管理空间的起始页号_pageID么,而且还有一个span管理的页数_n,这两个合到一块就可以算出来span算管理空间的页号的范围,也就是[_pageID, _pageID + _n),比如说_pageID为2000,_n为1,那么范围就是[2000, 2001),也就是只有2000这一页。

那么这样就能找到小块空间对应的span了,只需要拿(块空间的地址>>13)和span的管理页的页号范围进行比较,在span管理的页范围中的就说明这个块在这个span中,那么直接将这个块插入到span的_freeList里面就行。

通过页号查找span的效率问题

假如说有n个span,tc还回来了m块空间,直接暴力对比的话效率太低了,拿一块空间就和n个span对比一下才能找到对应span,时间复杂度就变成 O ( m ? n ) O(m * n) O(m?n)了:
在这里插入图片描述

那么如何解决一下这里的效率问题呢?
可以再专门搞一个哈希映射,映射关系K-V就是 页号-span地址,这样将cc中的所有span的页号与span的映射全部添加到这个哈希表中,仅通过A>>13就可以再找的时候就以O(1)的时间找到对应span,非常高效。

那么如何实现这个哈希表呢?
我这里就直接用STL库中的unordered_map了,而且要定义在PageCache中,因为等会PageCache也会用到这个哈希表:
在这里插入图片描述

那么在什么时候添加cc中页号和span的映射关系呢?
就在将pc中的span分配给cc的时候记录一下就行也就是这里:
在这里插入图片描述
还有一个这里:
在这里插入图片描述

这两处记录一下分配出去的span所管理的所有页的页号对应该span的地址之间的映射:
在这里插入图片描述

和这里:
在这里插入图片描述

这样映射就搞好了。

有的同学可能会问,那这里在pc中的span不需要建立映射吗?
用,但不是现在,等会在合并span的时候再细说对pc中span建立映射的逻辑。这里讲有点太早了。

那么后面在归还小块内存的时候只需要先让小块内存的地址右移13位,得到小块地址所在页的页号,然后再根据哈希映射得到页号对应的span,再往span中插入这小块内存就OK了。

这里再到pc中定义一个接口,用来专门查找块地址对应的span:
在这里插入图片描述

然后就可写ReleaseToSpans了。

CentralCache::ReleaseToSpans

首先算出size对应下标:
在这里插入图片描述

然后对其中的桶进行操作,start是tc传来的一段空间,要不断遍历start中的各个块,当遇到空的时候就停止(前面在tc中已经保证了最后一块后面为空),根据块地址放到对应span的_freeList中,来个例子:
在这里插入图片描述

那么挨个遍历start插入就行:
在这里插入图片描述

将各个块插入span的_freeList的代码如下:
在这里插入图片描述

然后前面还讲过一个_useCount,这个是用来统计span中分配出去的块数的,因为这里还回来了很多块,所以要让对应的span的_useCount- -,所以再加上:
在这里插入图片描述

前面还说过,当useCoun减到0的时候就表示当前span中的所有空间都还回来了,所以此时就可以考虑让cc将已经归还的span交给pc来管理,以合并出更大的span,那么这里就要再加上:
在这里插入图片描述

如何交给pc管理呢?
直接在pc中提供一个接口:
在这里插入图片描述
还是先不给实现,等会相关逻辑。

那么就useCount减到0的时候就调用这个:
在这里插入图片描述

但是这样写还是有点问题的,因为这里的span还是在cc中,并没有离开cc,还是链在cc对应的哈希桶中,所以要将这个span在cc对应哈希桶中删掉:
在这里插入图片描述

这里说一下为啥要让span->_freeList指向空,首先要确定一点,span中归还回来的_freeList是乱序的,因为刚刚也说了,申请空间是按照_freeList一块一块申请的,但是归还回来的时间无法确定,所以可能现申请出去的空间后还回来,而这里还回来的时候是直接进行了头插,所以原先span中按照顺序排好的空间(所有页的空间在cc获取到的时候直接用链表传起来了)返回来之后的所有块的连接顺序大概率是被打乱了的,但整体上还是一个链表,只不过每一块的前void*个字节中的空间不一定是其原先内存中下一块的空间。

但是不影响归还,归还的时候只要是整个span中的空间都回来了,那就说明整个span所管理的页的空间都还回来了,那么此时就直接给pc就行,pc按照整个span进行回收,就算是同一个span再提供给cc的时候cc会重新对这些空间按照顺序向链表一样传起来。

所以说这里归还的时候_freeList已经没有用了,只需要保证归还的span中的_pageID和_n是正确的就行,所以_freeList给成空就行。

那么在归还span后,pc会对空间上相邻的span进行合并,和前面的span分裂一样,span的合并也需要加锁,而且加锁的位置也和前面span分裂一样,直接加在调用的ReleaseSpanToPageCache两边就行:
在这里插入图片描述

还有一件事,在归还span的时候需要将当期cc中的桶锁解掉,以便其他线程对该桶进行操作时不会阻塞,就和前面申请的那里一样:
在这里插入图片描述

那么这里cc回收tc中多个内存块的逻辑就完了,下面来说pc回收cc返回的span的逻辑。

PageCache回收空间逻辑

回顾一下pc的结构:
在这里插入图片描述

如果说返回了一个1page的span,那么收否应该不进行合并而直接挂在对应的1页桶的下面?也就是这样:
在这里插入图片描述

答案是不行,如果说申请走的都是小于10页的span,返回来的小页span就直接挂在pc对应的桶后面,那么当申请一个11页的span时这些小于10页的span就排不上用场了,相当于是浪费了,也就是所谓的外碎片问题。

所以这些小页的span可以尝试着拼接一下,管理的页如果相邻就拼到一个span中,这样就能拼成管理更大页的span,这样就能尽量避免外碎片的问题。

【注】内碎片问题在内存池中是无法避免的,只有我们在定义自定义类型的时候注意对齐问题才能解决,不过内碎片不算啥问题,反正提供的空间能回收回来,可以重复利用,但是外碎片不解决就会一直在那。

合并span逻辑

如何对span进行合并呢?

假设现在有一个span:
在这里插入图片描述

那么此时合并,就要找其管理页的相邻页都是啥,这里span管理页的首页ID是1000,一共管理了1页,那么此时span管理的就是[1000, 1001),那么相邻页就是左边的999,和右边的1001:
在这里插入图片描述

那么合并的时候先选出来一边,先按一个方向进行合并,假设这里就先往左开始合并,也就是先找一下页号为999的页所在的span,假设找到了对应span:
在这里插入图片描述

找到一个span,_pageID为996,管理页数为4,那么这个span的管理范围就是[996, 1000),所以这两个span管理页就是相邻的,那么将这两个span合并成一个span,得到的管理起始页_pageID就是996,管理页数就是4 + 1 = 5,那么新合并的span就是这样:
在这里插入图片描述

那么此时继续向左寻找相邻页:
在这里插入图片描述

然后假如说又找到了一个可以合并的span:
在这里插入图片描述
那么就再合并成一个_pageID为994,_n为7的span:
在这里插入图片描述

假设此时pageId为7的页没有被span管理,那么就是找不到了,此时停止左侧寻找,开始继续寻找右侧的span,原理是一样的,也是找到了就合并,找不到就停。

判断相邻span在cc还是pc

那么如何通过左右两侧的pageId来确定该页所在的span呢?
还是用刚刚的哈希,也就是PageCache中的_idSpanMap。

不过映射出来的span有已经由cc还给了pc,也有可能还在cc中,但只有挂在pc中的span才能合并,挂在cc中的span是不能进行合并的,因为cc中的span是正在使用的span,那么如何区分cc和pc中的span呢?

  • 有同学可能会说通过span中的useCount就可以,如果说useCount是0就表示是pc中的,如果不是0就表示是cc中的,这样听起来好像没什么问题,但是你听我讲讲就有问题了。
  • cc在申请到新的span之后会对新的span进行小块空间的切分,也就是这里的代码:
    在这里插入图片描述
    这里是在GetOneSpan中的,但是对于use_count的修改是在FetchRangeObj中的:
    在这里插入图片描述
    如果说cc获取到的span是由pc中拿来的,那么这个新的span在进行小块切分的时候useCount是0,那么此时如果合并的时候恰好碰到了这个正在切分的span,一看这个span的useCount是0,直接拿过去合并了,那这时候就出问题了。实际上这个span本应该是马上切分好了就会给tc提供一端空间,给tc之后就会修改useCount,但是直接被pc当成空闲span进行合并了,此时就会出现这个span既在pc中又在cc中的情况,而且其管理的一部分空间还给了tc,所以这里用useCount来判断是有问题的。

所以想要解决这个问题的话需要再在Span中添加一个成员变量:
在这里插入图片描述

false就表示这个span当前没有被使用,那么就是在pc中,true就是被使用了,那么就在cc中。默认值给成false,所有的span初始情况下都是在pc中的,所以给成false。

在GetOneSpan中cc获取到新Span后改一下:
在这里插入图片描述

向pc申请和回收空间的时候用的是span中的那把锁,所以是不会存在同时向pc申请span和向pc归还span的,那么pc锁内修改_isUse就没有问题。整个流程就只有这里需要把_isUse改为true,也就是span由pc到cc之后。

合并pc中的span

pc中合并span的时候是不管是不是从cc中还回来的span都会进行合并,也就是就算没有分配出去的span也会和还回来的span进行合并。

前面讲了pc中的span也是要映射进入_idSpanMap的,但是映射的时候不需要将pc中的span所有的页都映射成span地址,因为pc中的span是不会用到其中管理的空间的,只需要将pc中的span管理的最头上的一页和最末尾的一页映射进去即可。

因为在合并span的时候只要是在pc中的span就可以,而确定了一个span后往左右两边扩张时只会找这个span所管理空间的左右两页,也就是这样:
在这里插入图片描述

那么这样两边的页只会是某一个span的边缘页,意思就是只会是某个span的pageID或pageID + _n - 1,也就是这样:
在这里插入图片描述

那么这样在pc中的span只需要映射一下其所管理的页_pageID和_pageID + _n - 1就可以了。

前面cc中的span所有页都要映射是因为cc中的span所管理的页都是要用到的,在tc归还小块空间的时候要通过对应页来找到对应span才能归还,而pc中的span内部的页又用不到,也就不需要全部都映射一下。

那么在NewSpan中再加上:
在这里插入图片描述

那么下面就可以进行合并了,先来说说合并的注意事项:

  1. 某一边相邻页没有映射出对应的span后就停止这一边的合并(可能是这一页空间还没有开)。
  2. 相邻页所在span正在被cc使用时就不能合并。
  3. 相邻页所在span与当前页所在span合并起来后页数超过128页就不能合并,因为该项目中设定的span所管理的最大页数为128页,超过128页就不行了,span维护不下。

可能有的同学会问如果有两个空间连续的128页的span,用了一段时间之后前一个span后半部分和前一个span的前半部分合并到了一块会有问题吗?也就是下图所示:
在这里插入图片描述
这样是不会有问题的,反正都是在用,整个空间都是在不断动态被申请和回收的,就算合一块了也可能马上就被分开,不会出问题。

  1. 没有以上问题的就合并,而且合并要不断迭代进行。当前span与相邻span合并后要修改当前span的_n和_pageID,并将相邻span从pc原桶中删掉,然后delete掉相邻span对象的空间(span的对象是new出来的)。

那么下面就来写写ReleaseSpanToPageCache的代码。

PageCache::ReleaseSpanToPageCache

整体逻辑:
在这里插入图片描述

首先是向左合并的:
在这里插入图片描述

然后是向右合并的:
在这里插入图片描述

然后将当前span挂到对应的桶中,而且此时的span也要把其边缘页映射一下,以便后续对这个span进行合并:
在这里插入图片描述

至此,所有的释放逻辑已经写完,下面来进行一些基础的测试。

回收空间的基础测试

首先第一个测试就是申请的时候那5个,直接释放:
在这里插入图片描述

当然项目最终是不会在Free后面还要加上释放空间的大小的,后面会再完善,先来把这里的调通。

这里的这个测试,先来回顾一下前面申请的流程:
在这里插入图片描述

画图太麻烦了,我这里简单说一下流程,你调试的时候对照着走能对上就行,首先释放前两个的时候没啥说的,只是给tc释放块空间,释放第三个的时候,也就是这个:
在这里插入图片描述

此时将这个块空间push到了tc对应的桶中之后,那个桶的_size和MaxSize相等了,就会将桶里面的四个块交给cc,cc接收到之后span的useCount是2,因为还有ptr4和ptr5没有归还,所以不会走到向pc归还span的那一步。

但是再往后回收ptr4和ptr5两块空间之后,只会向tc归还一下,并不会向cc归还,因为还给tc之后FreeList的size是2,不等于MaxSize(4),所以这里的调试流程没有走到cc向pc归还的那一步。

不过再加上两个ptr就可以了:
在这里插入图片描述

这里会走到cc向pc释放空间的那里,如果你能走到这里:
在这里插入图片描述

那么恭喜你,你的代码在单线程的场景下应该是没啥问题。

下面再来一个多线程场景的测试,我就直接给代码段了:

void MultiThreadAlloc1()
{
	std::vector<void*> v;
	for (size_t i = 0; i < 7; ++i) // 申请7次,正好单个线程能走到pc回收cc中span的那一步
	{
		void* ptr = ConcurrentAlloc(6); // 申请的都是8B的块空间
		v.push_back(ptr);
	}

	for (auto e : v)
	{
		ConcurrentFree(e, 6);
	}
}

void MultiThreadAlloc2()
{
	std::vector<void*> v;
	for (size_t i = 0; i < 7; ++i)
	{
		void* ptr = ConcurrentAlloc(16); // 申请的都是16B的块空间
		v.push_back(ptr);
	}

	for (int i = 0; i < 7; ++i)
	{
		ConcurrentFree(v[i], 16);
	}
}

void TestMultiThread()
{
	std::thread t1(MultiThreadAlloc1);
	std::thread t2(MultiThreadAlloc2);

	t1.join();
	t2.join();
}

如果你这里多线程也能走到刚刚第二个场景中合并出来一个128页的span,那目前来说你的代码也是没有太大问题的。

一些小细节

如果你看到这里了,恭喜,整体的申请和释放空间的逻辑已经完毕,下面就剩下一些细枝末节的东西了,不过把这些细枝末节的东西搞清楚也是很重要的,废话不多说,开讲。

大于256KB的空间

前面所有单次申请和释放的空间都不会超过256KB,因为本项目中规定的tc中单次申请的空间是不会超过256KB的,但是实际上一定是会有单次申请超过256KB的,那么此时如何解决这些申请和释放的流程呢?

申请流程

既然没法直接从tc中拿,那么就往下层看。本项目整体可以分为3层,也就是tc、cc和pc,如果还可以加上一层的话,那就是最后的操作系统。如图:
在这里插入图片描述

回忆一下,pc中最大的span有多大?
按照一页8KB,最大的span有128页,那么最大的span管理的空间就是128 * 8KB = 1024KB,也就是最大的span可以管理1M的空间。

既然tc中单次申请空间不能超过256KB,那么超过256KB的空间就不要向tc申请了,可以直接向pc申请,只要单次申请的空间在(256KB, 1024KB]之间的,就可以直接向pc要,pc对于这些空间是可以管够的。

但是如果单次申请的空间超过了1M呢?
那就再往下层找,直接向os要,那么整个逻辑就可以串起来了。不过向os要的时候也要经过一下pc,不能说直接就要了,pc还要对其要到的空间做一下管理,至于如何管理那是后话,后面再说。

看图:
在这里插入图片描述

下面就来写写相关的代码。

首先,不管是申请多少页,都需要对齐到一个完整的块大小才能向下层申请,这里超过256KB的空间也是,那么就要修改一下前面的对齐中的规则,如果size大于了256KB,那就直接按照页大小来对齐。

比如说线程申请了一块257KB的空间,256KB按照一页8KB就是32页,257KB大于256KB,直接按照页来对齐,也就是直接按照整页来对齐,32页不够就再多给一整页,那么257KB对齐之后的空间就是264KB(256 + 8)。虽然这里会导致内存碎片很大,都有7KB了,但是用一会就还回来了,不是啥问题。大于128页的空间这个内存池就不起啥作用了,但是没关系,128页都1M了,一般情况下很难开出来这么大的空间。

对应代码如下:
在这里插入图片描述

在申请空间的时候直接对ConcurrentAlloc下手:
在这里插入图片描述

那么再来修改一下NewSpan的逻辑,NewSpan的参数是表示需要多少页,那么这里就是要补充一下当申请页数超过32页的时候的相关逻辑,实际上只是需要补充一下超过128页的,因为小于等于128页的都可以复用原先的代码:
在这里插入图片描述

好了,大于256KB的申请空间的逻辑就这么点,很简单吧,下面说说释放的逻辑。

释放流程

还是和前面申请的一样,不走tc,直接走pc。

那么还是直接从ConcurrentFree下手:
在这里插入图片描述

然后再来写ReleaseSpanToPageCache的逻辑,就是加上释放大于256KB的逻辑,但也还是和申请的NewSpan一样,不需要考虑释放256KB到1024KB的span,只需要释放大于128页的空间就可以了,也就是大于128页的时候直接还给os,但是前面没有写直接将空间还给os的逻辑,这里写一个:
在这里插入图片描述

然后在ReleaseSpanToPageCache中写一下相关逻辑:
在这里插入图片描述

这样整个代码也就串起来了,可以看到代码的本篇代码的复用性还是很强的。

下面来测试一下这些大空间的申请有问题没。

大空间申请测试

我就直接给例子吧,你看看你的测试对不对:
在这里插入图片描述

这里p1就是257KB的,对齐之后33页,你看你申请之后span是不是33页的,会走NewSpan中原先的逻辑,释放的时候走ReleaseSpanToPageCache中原先的逻辑,其他就没啥了。第二个p2就是129页的,也就是大于128页的,申请的时候走NewSpan中申请大于128页的逻辑,释放的时候走ReleaseSpanToPageCache中大于128页的逻辑,也就是都向os申请和释放。你看看你的大致流程是不是这,没有报错就应该没啥问题。报错了就慢慢调试吧。

使用定长内存池配合脱离使用new

这个项目做完以后如果要求没那么高的话是可以替代malloc的,说要求没那么高是因为这个项目也就一千多行代码,想要完全替代malloc是不太可能的,真想完全替代就用谷歌开源的tcmalloc,这里只是为了学习tcmalloc的核心才实现的一个简易版本。

替代malloc,那就不要在项目中使用malloc,因为Span对象的创建和TLSThreadCache都是用new来创建的,new的底层又是malloc,所以这里要把这些用到malloc的地方换一下,就换成前面的定长内存池。

先来换一下每个线程的TLSThreadCache:
在这里插入图片描述
就这一个地方。

这里pTLSThreadCache是不需要进行Delete的,因为整个流程一直在使用。

然后就是Span的创建,因为只有在PageCache中创建了Span,而且创建的地方还不少,所以直接在PageCache中搞一个定长内存池的成员:
在这里插入图片描述

然后就是把调用new的地方直接换成用这里的_spanPool,都在PageCache::NewSpan中,有这些:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

对应的,这些span是需要delete的,因为归还span的时候需要合并,都在PageCache::ReleaseSpanToPageCache中,有这么些:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

到这里完了吗?
并没有,因为这里如果进行多线程测试的话会出问题,因为pTLSThreadCache是每个线程独有的一个对象,但是为它申请空间的objPool就不是了:
在这里插入图片描述
这是一个静态的对象,整个进程中独一份,而对于静态的对象整个进程中的线程是共享的,那么如果多线程对其中进行操作就可能会出问题,可能看到这里你已经忘记了定长内存池的结构了,没关系,我带你回顾一下,里面有三个字段,分别是_memory、_freeList和_remanentBytes。

_memory指向的就是最初开出来的内存池,内存池中的某些固定大小的块空间还回来之后会直接挂在_freeList后面,_remanentBytes表示的是内存池中剩余空间的大小。如图:

在这里插入图片描述

初次申请空间的时候_memory和_freeList都是空的,_remanentBytes为零,此时会先向系统申请128KB的空间,然后_memory就指向这块空间,然后再从中分出来想要的空间大小,好了就这么点就够了,来看代码,下面的这一点就够了:
在这里插入图片描述
申请的时候假设有t1和t2两个线程:
在这里插入图片描述

初始的时候所有成员都是nullptr或者0,假设t1先走:
在这里插入图片描述

此时t1已经把_remanetBytes改成128KB了,此时t2走到if (_remanentBytes < sizeof(T))时一判断_remanetBytes是一定会大于sizeof(T)的,因为ThreadCache的大小只有两千多字节,那么就不会进入if中,直接走的是obj = (T*)_memory:
在这里插入图片描述

此时的_memory是nullptr,t2再走一步就直接获取到了一个空指针,而再往下走的代码是会调用定位new的:
在这里插入图片描述
给定位new传空指针会直接导致程序崩掉,所以这里是线程不安全的,需要加锁。

有的同学可能说怎么会这么巧?
就是这么巧,我测试了好多次,崩的概率还是挺大的,10次能有个两三次崩的情况。

那怎么加锁呢?
很简单,直接在ObjectPool类中加一个锁成员:
在这里插入图片描述

然后可以再New内部加锁,也可以直接在调用New的地方加锁,我这里选择在创建ThreadCache两边加锁:
在这里插入图片描述

解释一下为什么。这里的创建ThreadCache只会让每个线程执行一次,同时后面的span对象在创建的时候也会用这里的定长内存池,所以说这里如果加到了New中,后面的span在申请的时候也需要加上这里的_poolMtx,就会影响效率。

但为啥不给span创建的时候加锁呢?
因为后面的逻辑中保证了这一点,span对象的创建只会在pc中的NewSpan中,而NewSpan的调用一定是线程安全的,因为NewSpan调用前会加上pc的整体锁,也就是说NewSpan中只会有一个线程在执行,所以就不需要对span的创建加锁了。

那再来测试一下这里的代码。
还是用刚才的两个大空间申请的例子(257KB的和129页的)测试一下,还有这个测试一下:
在这里插入图片描述

还有那个多线程的测试。都调试着看一下,多能走通应该就没啥问题。

释放对象时优化为不传对象大小

这是老前面一留下来的小问题,就是ConcurrentFree的时候需要在第二个参数中传入一个size,表示要释放的空间大小,也就是这里:
在这里插入图片描述

如何做到不需要传这个参数呢?也就是如果直接根据ptr得到其所指空间的大小呢?
两种方法:

  1. 建立页号该页被切分成的块的大小之间的映射。
    ??一页所被切成的块的大小一定都是相等的,因为某一页是属于某个span的,而span在被cc切分时一定是都切成了相同的固定大小的块的,所以说某页的块大小都是相同的。那么在ConcurrentFree中就可以先通过指针右移PAGE_SHIFT位得到页号,再根据页号的映射找到该页的块大小,那么这个块大小就是ptr所指向的空间的大小。
    ??这个想要实现的话可以直接在PageCache中再添加一个映射,在获取到新span后,在cc中进行切分的同时将页号与块大小的映射关系搞好就行。

  2. 在span中加入一个成员_objSize,表示span所管理的页被切分成的块大小,这个相比于前者更好一点。
    ??这个也是在span被切分的时候统计一下_objSize,而且要比前一个简单的多,我这里就用这个第二中方式了:
    在这里插入图片描述
    然后在cc中加上:
    在这里插入图片描述
    归还span的时候可以不用将_objSize归零,因为pc中并不会对这个字段干什么事情,而且分配出去之后就在又这里重新设置了。还有一个NewSpan中申请大于128页的span也需要加上统计_objSize,可以在NewSpan中加,也可以直接在ConcurrentAlloc中加,在NewSpan中加的时候就直接给成页数左移PAGE_SHIFT位就行,在ConcurrentAlloc中加就直接给成申请的size就行,只要在Free的时候能通过span找到_objSize,知道_objSize是大于MAX_BYTES的就行。大于MAX_BYTES就走的不是前面正常的逻辑。

这样在ConcurrentFree的时候就可以先通过ptr左移PAGE_SHIFT位算出来页号,然后再根据页号与span*的映射关系找到对应的span,再根据span中的_objSize就可以得知ptr所指的空间的大小了。

那么实现一下:
在这里插入图片描述
就这么简单。

调用MapObjToSpan的时候加锁

在调用MapObjToSpan的时候加锁,因为增删加锁了,这个函数就是查操作,而STL不是线程安全的,PageCache中用的unordered_map不是线程安全的,如果增删的时候导致原先unordered_map的结构发生了改动(扩容导致原空间丢失什么的),此时查找的时候如果还在查找原先的结构,就可能会找出来一个野指针的span,所以说MapObjToSpan要加锁。

也只是有两种办法:

  1. 在调用这个函数的两边加锁。
  2. 在函数内部进行加锁。

因为MapObjToSpan并不像NewSpan那样是递归的,所以可以直接在MapObjToSpan函数内部加一个互斥锁,只需要加pc中的那一把锁就行。

我就直接用第二种方式了:
在这里插入图片描述

如果此时在MapObjToSpan内部加锁了之后就不要再在调用这个函数的两边加锁了,会导致死锁的。

这样就OK了。

性能测试

至此,所有的内容基本上完了,进行一下和malloc对比的性能测试。给一个benchmark.cpp的文件,专门用来做这里的测试:

/*这里测试的是让多线程申请ntimes*rounds次,比较malloc和刚写完的ConcurrentAlloc的效率*/

/*比较的时候分两种情况,
一种是申请ntimes*rounds次同一个块大小的空间,
一种是申请ntimes*rounds次不同的块大小的空间*/

/*下面的代码稍微过一眼就好*/

#include"ConcurrentAlloc.h"

// ntimes 一轮申请和释放内存的次数
// rounds 轮次
// nwors表示创建多少个线程
void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(malloc(16)); // 每一次申请同一个桶中的块
					v.push_back(malloc((16 + i) % 8192 + 1));// 每一次申请不同桶中的块
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次malloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次free %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发malloc&free %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}


// 								单轮次申请释放次数 线程数 轮次
void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;

	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);

			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					//v.push_back(ConcurrentAlloc(16));
					v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();

				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();

				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
			});
	}

	for (auto& t : vthread)
	{
		t.join();
	}

	printf("%u个线程并发执行%u轮次,每轮次concurrent alloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime.load());

	printf("%u个线程并发执行%u轮次,每轮次concurrent dealloc %u次: 花费:%u ms\n",
		nworks, rounds, ntimes, free_costtime.load());

	printf("%u个线程并发concurrent alloc&dealloc %u次,总计花费:%u ms\n",
		nworks, nworks * rounds * ntimes, malloc_costtime.load() + free_costtime.load());
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" << endl;
	// 这里表示4个线程,每个线程申请10万次,总共申请40万次
	BenchmarkConcurrentMalloc(n, 4, 10); 
	cout << endl << endl;
	
	// 这里表示4个线程,每个线程申请10万次,总共申请40万次
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" << endl;

	return 0;
}

那么测试一下,4个线程,每个线程申请10万次,每次申请不同桶下的块空间,结果如下:
在这里插入图片描述

好像这里写的代码效率比malloc还低了一点,不用担心,还没写完呢,后面还有一点。

4个线程,每个线程申请10万次,每次申请同一个桶下的块空间,结果如下:
在这里插入图片描述
哈哈,申请单个桶下的块更拉。

4个线程,每个线程申请1万次,每次申请同一个桶下的块空间,结果如下:
在这里插入图片描述

4个线程,每个线程申请1万次,每次申请不同桶下的块空间,结果如下:
在这里插入图片描述

讲过刚才的对比,写出来的ConcurrentAlloc整体的性能还是没有malloc高,尤其是free的时候,二者差了很多,那么就必须要做出一些优化。

对于项目性能的优化

对项目性能进行优化的时候不要凭着自己的感觉走,要用性能检测的工具,像vs下就有,其他的平台也是有的,甚至有的公司内部会用自己的测试性能的工具,但我这里就不介绍了,我就是在vs下写的,就直接用vs下测试性能的工具了。

vs下测试性能的工具

在哪里呢,在这里(我用的是vs2019,可能不同vs的版本的头上的那个图标不太一样,直接找性能探查器就行):
在这里插入图片描述

点开之后是这样的:
在这里插入图片描述

勾选那个检测,会自动帮你分析你的代码中哪些函数调用了多少次,占比时间等等。

开始之后是这个:
在这里插入图片描述

点击确定之后就开始帮你分析了,可能会比较慢,稍等一会:
在这里插入图片描述

分析完之后是这样子:
在这里插入图片描述

这个表格的下面有两个东西,一个叫热路径,一个叫执行单个工作最多的函数:
在这里插入图片描述

这两个东西中,哪个函数占比最多,就需要对相应的函数做优化。

这里热路径中第一个是std::thread::invoke,std::thread::_invoke 是一个非公共成员函数,用于在 C++ 的 std::thread 对象内部执行线程函数。这个函数实际上是启动线程并执行线程函数的底层机制。

第二个std::invoke是 C++17 引入的一个函数模板,用于调用给定的可调用对象(callable object),例如函数、lambda 表达式、函数指针、成员函数指针、以及可调用对象包装器(如 std::function)。

第三个是lambda底层形成的仿函数。

可以看到std::thread::invoke和std::invoke不在一条直线上,意思就是std::thread::invoke中调用了std::invoke,同样的再往下就像栈帧一样,直接将每个函数中耗时最多的函数显示出来,如果你点击一下函数名,就会转入这个函数进行分析:
在这里插入图片描述
可以直接点击那个红框,就会接着分析std::invoke:
在这里插入图片描述

我把整个都给出来:
在这里插入图片描述

这样就很清晰。

执行单个工作最多的函数是lock,看来加锁解的开销还是很大的。能不能不加锁呢?
可以的,tcmalloc中使用了基数树来解决这里性能方面的问题。

那么本项目的最后一步:

针对性能瓶颈使用基数树进行优化

上面性能问题一方面是数据量大了之后unordered_map查找消耗比较大,还有一个就是锁竞争的消耗比较大,而如果说查的越慢那么锁竞争就会越激烈,所以说上面的实现中,随着数据量的增大,会导致性能越来越不行。

那么tcmalloc中使用了基数树来替代上面代码中的哈希表,这里就来简单介绍一下基数树。

tcmalloc源码中基数树给了3棵,你可以把基数树理解成一个多叉树,3棵树,每棵的层数是不一样的。分别是1层、2层、3层。基数树并没有那么复杂,我简单过一下就行。

先来说最简单的,一层的基数树。

单层基数树

单层的基数树最简单了,就是一个数组,严格的来说就是一个哈希表,一个用直接定址法来映射的哈希表,其中的 K-V 关系就是 页号-span*。

页号就是一个数组,那么在页号位数没有那么大的情况下,把数组的大小开到最大的页号,假设数组就是arr,那么直接arr[页号]就能找到该页号对应的页,这就是单层基数树,如图:
在这里插入图片描述

大概就是这样的,不过基数树本身内部不是span*,而是void*,但也没有关系,都是地址,能找到对应地址就行。

那么来看一下单层基数树的代码(简单过一眼,不要细看):

// Single-level array
template <int BITS>
class TCMalloc_PageMap1 {
private:
	static const int LENGTH = 1 << BITS; // 数组要开的长度
	void** array_; // 底层存放指针的数组

public:
	typedef uintptr_t Number;

	explicit TCMalloc_PageMap1() {// 开空间
		size_t size = sizeof(void*) << BITS;
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT);
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT);
		memset(array_, 0, sizeof(void*) << BITS);
	}

	// Return the current value for KEY.  Returns NULL if not yet set,
	// or if k is out of range.
	void* get(Number k) const { // 通过k来获取对应的指针
		if ((k >> BITS) > 0) {
			return NULL;
		}
		return array_[k];
	}

	// REQUIRES "k" is in range "[0,2^BITS-1]".
	// REQUIRES "k" has been ensured before.
	//
	// Sets the value 'v' for key 'k'.
	void set(Number k, void* v) { // 将v设置到k下标
		array_[k] = v;
	}
};

来说一下非类型模版参数BITS是啥,BITS表示存储所有的页号至少需要多少比特位。

这就是依据平台而定了,比如说32位下,按一页8KB来算,页内偏移就是13位(8KB = 2^13),所以页号能占32 - 13 = 19位,那么就是32 - PAGE_SHFIT,上面PAGE_SHFIT定义的是13,所以这里BITS就是19。也就是说数组要开 2 19 2^{19} 219个span*大小的空间,也就是 2 19 ? 4 ? 2 21 2^{19} * 4 ? 2^{21} 219?4?221,也就是2M,所以说这里一个数组要开2M的空间,没有那么大,完全是在接受范围内的。

如果是64位呢?64 - PAGE_SHFIT ? 51,64位下一个指针8B,那总共就要开 2 51 ? 8 ? 2 54 2^{51} * 8 ? 2^{54} 251?8?254,要知道1TB也才 2 40 2^{40} 240,这里都干到 2 14 2^{14} 214TB了,这对于我们普通人的内存来说都是一个天文数字了,就算把内存榨干也掏不出来这么多。所以说64位下是不能用这里的一层基数树的,实际上两层也不行,得用三层的。

所以说32位下用一层基数树就可以实现效率上面质的提升,64位后面说。

里面还有两个接口,一个get,一个set,分别是获取某个页号对应的span*,和将某个span*放到对应的数组下标处。详细的逻辑就不说了。

这就是一层的基数树,下面来说两层的,等三个都说完,再来说为啥基数树不需要加锁。

两层基数树

和一层的差不多,只不过是多了一层数组。

不知道你学过页表没有,就像一级页表、二级页表那样。

先挑出来页号的前几位来决定第一层数组的大小,然后后几位来决定第二层数组的大小。

这里32位下,页号有19位,那么挑出来前5位来作为第一层数组的直接定址,然后后面的14位用来第二层的直接定址,看图:
在这里插入图片描述

前五位,也就是2的五次方,需要用长度为32的数组,也就是该基数树的第一层:
在这里插入图片描述

后14位,需要长度为 2 14 2^{14} 214的数组,也就是该基数树的第二层(叶子):
在这里插入图片描述

那么第一层中每个元素指向的都是一个数组,也就是后14位的数组:
在这里插入图片描述

这里第一层中每个元素是一个数组指针,而第二层中每个元素就是一个span*:
在这里插入图片描述

那么这里就是先通过页号的前五位来定位出一个第二层中的数组,然后再通过后14位找到对应的span*。

就这么简单,那这里第二层和第一层也没有啥大区别,32位下,这里最后总共的span*也照样会开到2M的空间,但是比第一点好的是,这里在前期可以稍微节省一点空间,因为如果前5位中如果有一个数还没有映射的时候就可以先不开其对应14位的二层数组,比如前五位为0x00000还没有映射到第一层时,那就不把第一层0号下标对应的二层数组开出来。当需要映射的时候再开。

但是这里32位就直接全部开好得了,64位的也用不了这里的第二层,不够的。

这里把二层的基数树给出来:

// Two-level radix tree
template <int BITS>
class TCMalloc_PageMap2 {
private:
	// Put 32 entries in the root and (2^BITS)/32 entries in each leaf.
	static const int ROOT_BITS = 5; // 32位下前5位搞一个第一层的数组
	static const int ROOT_LENGTH = 1 << ROOT_BITS;

	static const int LEAF_BITS = BITS - ROOT_BITS; // 32位下后14位搞成第二层的数组
	static const int LEAF_LENGTH = 1 << LEAF_BITS;

	// Leaf node
	struct Leaf { // 叶子就是后14位的数组
		void* values[LEAF_LENGTH];
	};

	Leaf* root_[ROOT_LENGTH];             // 根就是前5位的数组
public:
	typedef uintptr_t Number;

	//explicit TCMalloc_PageMap2(void* (*allocator)(size_t)) {
	explicit TCMalloc_PageMap2() { // 直接把所有的空间都开好
		memset(root_, 0, sizeof(root_));
		PreallocateMoreMemory(); // 直接开2M的span*全开出来
	}

	void* get(Number k) const {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		if ((k >> BITS) > 0 || root_[i1] == NULL) {
			return NULL;
		}
		return root_[i1]->values[i2];
	}

	void set(Number k, void* v) {
		const Number i1 = k >> LEAF_BITS;
		const Number i2 = k & (LEAF_LENGTH - 1);
		ASSERT(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v;
	}

	// 确保从start开始往后的n页空间开好了
	bool Ensure(Number start, size_t n) {
		for (Number key = start; key <= start + n - 1;) {
			const Number i1 = key >> LEAF_BITS;

			// Check for overflow
			if (i1 >= ROOT_LENGTH)
				return false;

			// 如果没开好就开空间
			if (root_[i1] == NULL) {
				static ObjectPool<Leaf>	leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();

				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}

			// Advance key past whatever is covered by this leaf node
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS;
		}
		return true;
	}

	// 提前开好空间,这里就把2M的直接开好
	void PreallocateMoreMemory() {
		// Allocate enough to keep track of all possible pages
		Ensure(0, 1 << BITS);
	}
};

里面有一个Ensure函数,就是为了确保某些页号对应的空间开好了,如果没开好就直接在这个函数中开。

三层基数树

同样的逻辑,再多第一层就行,这个主要是给64位下用的,对应的结构就是这样:
在这里插入图片描述

其中第三层也就是绿色的部分,前两层中的数组都是存放下一层中每个数组的指针,也就是说前两层放的都是数组指针,最后一层放的是span*。

同理,这样的结构可以不需要把所有的空间在初始的情况下都开好,这样就能保证需要的页都能映射到对应的span,而且一个进程是不可能将所有的页全部映射的,这样把不需要映射的页对应的数组就不开了,这样就能节省空间,从而再64位下就能用了。

那么下面就来用基数树对前面的代码进行优化。

优化代码

强调一点,我这里就不搞64位的了,光搞一下32位的,所以就只用一下一层的基数树来对代码进行优化,感兴趣的同学可以自行实现一下64位下的代码。

首先直接将PageCache中的unordered_map替换成一层的基数树:
在这里插入图片描述

然后把相关原先用unordered_map中的获取span*换成调用get方法,设置span*的换成调用set方法。

由于修改的地方过于分散,我就不把所有详细的都给出来了,我就直接说一下咋改。

都在PageCache中:

  • 对于原先位_idSpanMap[页号] = span的都改为_idSpanMap(页号, span);

  • 对于原先位_idSpanMap.find(页号)的都改为_idSpanMap.get(页号);而且此时的返回值就是Span,不是原先的迭代器了,所以用到ret = _idSpanMap.find(页号)的地方,如果有Span* span = ret->seconde的都直接变成Span* span = _idSpanMap.get(页号)即可。然后判断中有的地方是ret != _idSpanMap.end(), 直接改成span != nullptr就行。

此时我们就可以直接将MapIdToSpan中的锁去掉了:
在这里插入图片描述

说一下为啥:
这里是在对基数树进行读操作,进行读操作的地方只有两个,一个是在ConcurrentFree里:
在这里插入图片描述
一个是在ReleaseListToSpans中:
在这里插入图片描述
二者都存在于回收的逻辑当中。

而对于基数树的写操作也是只有两个函数会走到,一个是NewSpan:
在这里插入图片描述

一个是ReleaseSpanToPageCache:
在这里插入图片描述

ok,对基数树就只有读写操作,那么我来问如下问题,你来回答一下:

  1. 写操作是在对基数树做什么?读操作是在对基数树做什么?
  2. 对于一个页号的映射,会不会出现一个线程读某个页号,另一个线程写同一个页号?
  3. 基数树的结构在整个流程中会变化吗?

那么我来逐一回答:

  1. 写操作,就是对于数组中的某一个元素进行写入,直接写入一个span*的指针。
    读操作,就是通过下标找到对应span*,从而得到这个span*。

  2. 不会出现问题中的情况,我们的逻辑中是能保证这一点的。
    ??调用MapIdToSpan读一个页号,那么这个页号一定是不会在pc中的,一定是在cc中的,因为MapIdToSpan的调用都是存在于回收的逻辑当中的,通过指针找的span,这个span是要还回去的,一定不在pc中。而对于写操作,一个NewSpan是在将Span从pc中拿出去之前就设立了映射,也就是对基数树进行写操作,一个ReleaseSpanToPageCache是在Span归还到pc中之后才设立映射。所以读写操作是一定不会同时对一个span进行的。

  3. 这一点是最重要的。基数树的结构在整个流程中不会变化。
    ??因为数组的空间都是提前开好了的,或者是遇到了之后在原先的基础之上再开空间,不会修改原先的空间。
    ??而unordered_map遇到容量不够的情况时会出现扩容等情况,假如说有一个线程t1在扩容前的结构上进行查找,而另一个线程t2进行了扩容,此时t1线程可能会找到一个野指针,但t1并不知情,此时t1解引用其找到的Span*就会崩掉,所以STL中的unordered_map在整个流程中是可能修改其本身的结构的。
    ??既然基数树的结构不会修改,那么整个流程中查找到的就一定是一棵树不会改变的树,在这一点的基础上配合第二点,就不需要加锁了。

下面再来测试。

优化后的代码测试

我就直接在Release下测试了。

4个线程,每个申请10000次,申请不同桶中的块的结果:
在这里插入图片描述

4个线程,每个申请10000次,申请相同桶中的块的结果:
在这里插入图片描述

4个线程,每个申请100000次,申请不同桶中的块的结果:
在这里插入图片描述
4个线程,每个申请100000次,申请相同桶中的块的结果:
在这里插入图片描述

可以看到是比malloc快了不少的。

这个项目到此就完成了,再强调一遍,这个项目只是为了学习大佬的成果,从而提升自己,并不是为了完全复刻tcmalloc的源码,不要想着你自己一个人复刻一个tcmalloc出来,源码中的tcmalloc得要好几万行的,我这里最终才写了不到两千行的代码,只是把其中特别核心的内容拿出来讲的,如果你真的对源码感兴趣可以去Gitee上看。

最后的总结

整体框架回顾

最后再对本篇中三个结构进行简单总结,首先是ThreadCache层,其中有如下重要的函数与对象:
在这里插入图片描述

其中底层用到的结构是哈希,每个哈希桶用FreeList来表示,FreeList用来存放和管理相同大小的块空间。

第二层的CentralCache,其中有如下重要的函数与对象:
在这里插入图片描述

底层用到的结构也是哈希,每个哈希桶用SpanList来表示,SpanList是用来管理一个一个Span的带头双向循环链表,每个其中的Span是用来管理和分配页空间的。每个桶都有桶锁,以解决多执行流向同一个桶申请空间时的问题。

CentralCache还用到了单例模式,用饿汉创建单例,以便ThreadCache层和PageCache层能够调用到其中的接口。

第三层的PageCache,其中有如下重要的函数与对象:
在这里插入图片描述

其中存放数据用到的结构也是哈希,每个哈希桶也是用SpanList来表示,每个SpanList用来管理一个一个的span,pc中的每个span是用来存放和管理页空间的。

pc也用到了单例模式,也是用饿汉创建单例对象,以便CentralCache层能调用到其中的接口。

  • 三层中都用到了哈希映射,第一层和第二层的映射规则相同,第三层的映射规则是按照页大小来映射。

  • ThreadCache每个线程都有一个,CentralCache和PageCache在整个流程中都只有一个。

流程梳理

申请的流程

在这里插入图片描述

释放的流程

在这里插入图片描述

关于哈希

项目中三层都用到了哈希,但是除了管理空间之外还有其他的哈希。

  • 优化前,使用unordered_map存放 页号-span* 的映射关系,以便快速查找span来归还空间。

  • 优化后,使用基数树存放 页号-span* 的映射关系,查找效率更高。

关于锁

tc中没有锁,因为每个线程独有一个ThreadCache。

cc中有桶锁,为了尽量保证效率和安全,采用桶锁,以供多线程并发向单个桶申请空间块时安全又高效。

pc中有整体锁,因为多线程并发申请span时会导致span的分裂和合并,只能保证安全。但经过三层的缓存,效率上并不会有太大影响。

定长内存池中需要一个锁,为了防止多线程并发创建ThreadCache时会导致某个线程获取到一个空指针,从而使得定位new时程序崩溃。

结语

实际中我们测试了,当前实现的并发内存池比malloc/free是更加高效的,那么我们能否替换到系统调用malloc呢?

  • 实际上是可以的。但是不同平台替换方式不同。

基于unix的系统上的glibc,使用了weak alias的方式替换。具体来说是因为这些入口函数都被定义成了weak symbols,再加上gcc支持 alias attribute,所以替换就变成了这种通用形式:

void* malloc(size_t size) THROW attribute__ ((alias (tc_malloc)))

这样所有malloc的调用就都跳转到了tc_malloc的实现

想要了解的同学可以看这篇:GCC attribute 之weak,alias属性

有些平台不支持这样的东西,需要使用hook的钩子技术来做。关于hook,可以看这篇:Hook技术

一般而言,像本片中实现出来的东西可以做成动静态库,这样后续可以直接引相应的头文件直接用,想要生成动静态库也很简单,修改一下生成文件就行:
在这里插入图片描述

然后会蹦出来这个窗口:
在这里插入图片描述

选好之后编译一下就能生成对应的动态库或者静态库,而且生成的路径就是你的工作路径下面,我这里选择动态库,完事编译之后就是这个:
在这里插入图片描述
对应生成的动态库文件:
在这里插入图片描述

静态库是同样的做法。

该讲的都讲了,就这么多。

本篇代码仓库

Gitee链接:内存池项目

到此结束。。。

文章来源:https://blog.csdn.net/m0_62782700/article/details/135443352
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。