91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

C++高并發內存池如何實現

發布時間:2022-08-18 17:42:58 來源:億速云 閱讀:278 作者:iii 欄目:開發技術

這篇文章主要講解了“C++高并發內存池如何實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“C++高并發內存池如何實現”吧!

內存池介紹

池化技術

在說內存池之前,我們得先了解一下“池化技術”。所謂“池化技術”,就是程序先向系統申請過量的資源,然后自己進行管理,以備不時之需。

之所以要申請過量的資源,是因為申請和釋放資源都有較大的開銷,不如提前申請一些資源放入“池”中,當需要資源時直接從“池”中獲取,不需要時就將該資源重新放回“池”中即可。這樣使用時就會變得非常快捷,可以大大提高程序的運行效率。

在計算機中,有很多使用“池”這種技術的地方,除了內存池之外,還有連接池、線程池、對象池等。以服務器上的線程池為例,它的主要思想就是:先啟動若干數量的線程,讓它們處于睡眠狀態,當接收到客戶端的請求時,喚醒池中某個睡眠的線程,讓它來處理客戶端的請求,當處理完這個請求后,線程又進入睡眠狀態。

內存池

內存池是指程序預先向操作系統申請一塊足夠大的內存,此后,當程序中需要申請內存的時候,不是直接向操作系統申請,而是直接從內存池中獲取;同理,當釋放內存的時候,并不是真正將內存返回給操作系統,而是將內存返回給內存池。當程序退出時(或某個特定時間),內存池才將之前申請的內存真正釋放。

內存池主要解決的問題

內存池主要解決的就是效率的問題,它能夠避免讓程序頻繁的向系統申請和釋放內存。其次,內存池作為系統的內存分配器,還需要嘗試解決內存碎片的問題。

內存碎片分為內部碎片和外部碎片:

  • 外部碎片是一些空閑的小塊內存區域,由于這些內存空間不連續,以至于合計的內存足夠,但是不能滿足一些內存分配申請需求。

  • 內部碎片是由于一些對齊的需求,導致分配出去的空間中一些內存無法被利用。

注意: 內存池嘗試解決的是外部碎片的問題,同時也盡可能的減少內部碎片的產生。

malloc

C/C++中我們要動態申請內存并不是直接去堆申請的,而是通過malloc函數去申請的,包括C++中的new實際上也是封裝了malloc函數的。

我們申請內存塊時是先調用malloc,malloc再去向操作系統申請內存。malloc實際就是一個內存池,malloc相當于向操作系統“批發”了一塊較大的內存空間,然后“零售”給程序用,當全部“售完”或程序有大量的內存需求時,再根據實際需求向操作系統“進貨”。

C++高并發內存池如何實現

malloc的實現方式有很多種,一般不同編譯器平臺用的都是不同的。比如Windows的VS系列中的malloc就是微軟自行實現的,而Linux下的gcc用的是glibc中的ptmalloc。

定長內存池的實現

malloc其實就是一個通用的內存池,在什么場景下都可以使用,但這也意味著malloc在什么場景下都不會有很高的性能,因為malloc并不是針對某種場景專門設計的。

定長內存池就是針對固定大小內存塊的申請和釋放的內存池,由于定長內存池只需要支持固定大小內存塊的申請和釋放,因此我們可以將其性能做到極致,并且在實現定長內存池時不需要考慮內存碎片等問題,因為我們申請/釋放的都是固定大小的內存塊。

我們可以通過實現定長內存池來熟悉一下對簡單內存池的控制,其次,這個定長內存池后面會作為高并發內存池的一個基礎組件。

如何實現定長?

在實現定長內存池時要做到“定長”有很多種方法,比如我們可以使用非類型模板參數,使得在該內存池中申請到的對象的大小都是N。

template<size_t N>
class ObjectPool
{};

此外,定長內存池也叫做對象池,在創建對象池時,對象池可以根據傳入的對象類型的大小來實現“定長”,因此我們可以通過使用模板參數來實現“定長”,比如創建定長內存池時傳入的對象類型是int,那么該內存池就只支持4字節大小內存的申請和釋放。

template<class T>
class ObjectPool
{};

如何直接向堆申請空間?

既然是內存池,那么我們首先得向系統申請一塊內存空間,然后對其進行管理。要想直接向堆申請內存空間,在Windows下,可以調用VirtualAlloc函數;在Linux下,可以調用brk或mmap函數。

#ifdef _WIN32
	#include <Windows.h>
#else
	//...
#endif

//直接去堆上申請按頁申請空間
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32
	void* ptr = VirtualAlloc(0, kpage<<13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else
	// linux下brk mmap等
#endif
	if (ptr == nullptr)
		throw std::bad_alloc();
	return ptr;
}

這里我們可以通過條件編譯將對應平臺下向堆申請內存的函數進行封裝,此后我們就不必再關心當前所在平臺,當我們需要直接向堆申請內存時直接調用我們封裝后的SystemAlloc函數即可。

定長內存池中應該包含哪些成員變量?

對于向堆申請到的大塊內存,我們可以用一個指針來對其進行管理,但僅用一個指針肯定是不夠的,我們還需要用一個變量來記錄這塊內存的長度。

由于此后我們需要將這塊內存進行切分,為了方便切分操作,指向這塊內存的指針最好是字符指針,因為指針的類型決定了指針向前或向后走一步有多大距離,對于字符指針來說,當我們需要向后移動n個字節時,直接對字符指針進行加n操作即可。

C++高并發內存池如何實現

其次,釋放回來的定長內存塊也需要被管理,我們可以將這些釋放回來的定長內存塊鏈接成一個鏈表,這里我們將管理釋放回來的內存塊的鏈表叫做自由鏈表,為了能找到這個自由鏈表,我們還需要一個指向自由鏈表的指針。

C++高并發內存池如何實現

因此,定長內存池當中包含三個成員變量:

  • _memory:指向大塊內存的指針。

  • _remainBytes:大塊內存切分過程中剩余字節數。

  • _freeList:還回來過程中鏈接的自由鏈表的頭指針。

內存池如何管理釋放的對象?

對于還回來的定長內存塊,我們可以用自由鏈表將其鏈接起來,但我們并不需要為其專門定義鏈式結構,我們可以讓內存塊的前4個字節(32位平臺)或8個字節(64位平臺)作為指針,存儲后面內存塊的起始地址即可。

因此在向自由鏈表插入被釋放的內存塊時,先讓該內存塊的前4個字節或8個字節存儲自由鏈表中第一個內存塊的地址,然后再讓_freeList指向該內存塊即可,也就是一個簡單的鏈表頭插操作。

C++高并發內存池如何實現

這里有一個有趣問題:如何讓一個指針在32位平臺下解引用后能向后訪問4個字節,在64位平臺下解引用后能向后訪問8個字節?

首先我們得知道,32位平臺下指針的大小是4個字節,64位平臺下指針的大小是8個字節。而指針指向數據的類型,決定了指針解引用后能向后訪問的空間大小,因此我們這里需要的是一個指向指針的指針,這里使用二級指針就行了。

當我們需要訪問一個內存塊的前4/8個字節時,我們就可以先該內存塊的地址先強轉為二級指針,由于二級指針存儲的是一級指針的地址,二級指針解引用能向后訪問一個指針的大小,因此在32位平臺下訪問的就是4個字節,在64位平臺下訪問的就是8個字節,此時我們訪問到了該內存塊的前4/8個字節。

void*& NextObj(void* ptr)
{
	return (*(void**)ptr);
}

需要注意的是,在釋放對象時,我們應該顯示調用該對象的析構函數清理該對象,因為該對象可能還管理著其他某些資源,如果不對其進行清理那么這些資源將無法被釋放,就會導致內存泄漏。

//釋放對象
void Delete(T* obj)
{
	//顯示調用T的析構函數清理對象
	obj->~T();

	//將釋放的對象頭插到自由鏈表
	NextObj(obj) = _freeList;
	_freeList = obj;
}

內存池如何為我們申請對象?

當我們申請對象時,內存池應該優先把還回來的內存塊對象再次重復利用,因此如果自由鏈表當中有內存塊的話,就直接從自由鏈表頭刪一個內存塊進行返回即可。

C++高并發內存池如何實現

如果自由鏈表當中沒有內存塊,那么我們就在大塊內存中切出定長的內存塊進行返回,當內存塊切出后及時更新_memory指針的指向,以及_remainBytes的值即可。

C++高并發內存池如何實現

需要特別注意的是,由于當內存塊釋放時我們需要將內存塊鏈接到自由鏈表當中,因此我們必須保證切出來的對象至少能夠存儲得下一個地址,所以當對象的大小小于當前所在平臺指針的大小時,需要按指針的大小進行內存塊的切分。

此外,當大塊內存已經不足以切分出一個對象時,我們就應該調用我們封裝的SystemAlloc函數,再次向堆申請一塊內存空間,此時也要注意及時更新_memory指針的指向,以及_remainBytes的值。

//申請對象
T* New()
{
	T* obj = nullptr;

	//優先把還回來的內存塊對象,再次重復利用
	if (_freeList != nullptr)
	{
		//從自由鏈表頭刪一個對象
		obj = (T*)_freeList;
		_freeList = NextObj(_freeList);
	}
	else
	{
		//保證對象能夠存儲得下地址
		size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
		//剩余內存不夠一個對象大小時,則重新開大塊空間
		if (_remainBytes < objSize)
		{
			_remainBytes = 128 * 1024;
			_memory = (char*)SystemAlloc(_remainBytes >> 13);
			if (_memory == nullptr)
			{
				throw std::bad_alloc();
			}
		}
		//從大塊內存中切出objSize字節的內存
		obj = (T*)_memory;
		_memory += objSize;
		_remainBytes -= objSize;
	}
	//定位new,顯示調用T的構造函數初始化
	new(obj)T;

	return obj;
}

需要注意的是,與釋放對象時需要顯示調用該對象的析構函數一樣,當內存塊切分出來后,我們也應該使用定位new,顯示調用該對象的構造函數對其進行初始化。

定長內存池整體代碼如下:

//定長內存池
template<class T>
class ObjectPool
{
public:
	//申請對象
	T* New()
	{
		T* obj = nullptr;

		//優先把還回來的內存塊對象,再次重復利用
		if (_freeList != nullptr)
		{
			//從自由鏈表頭刪一個對象
			obj = (T*)_freeList;
			_freeList = NextObj(_freeList);
		}
		else
		{
			//保證對象能夠存儲得下地址
			size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T);
			//剩余內存不夠一個對象大小時,則重新開大塊空間
			if (_remainBytes < objSize)
			{
				_remainBytes = 128 * 1024;
				//_memory = (char*)malloc(_remainBytes);
				_memory = (char*)SystemAlloc(_remainBytes >> 13);
				if (_memory == nullptr)
				{
					throw std::bad_alloc();
				}
			}
			//從大塊內存中切出objSize字節的內存
			obj = (T*)_memory;
			_memory += objSize;
			_remainBytes -= objSize;
		}
		//定位new,顯示調用T的構造函數初始化
		new(obj)T;

		return obj;
	}
	//釋放對象
	void Delete(T* obj)
	{
		//顯示調用T的析構函數清理對象
		obj->~T();

		//將釋放的對象頭插到自由鏈表
		NextObj(obj) = _freeList;
		_freeList = obj;
	}
private:
	char* _memory = nullptr;     //指向大塊內存的指針
	size_t _remainBytes = 0;     //大塊內存在切分過程中剩余字節數

	void* _freeList = nullptr;   //還回來過程中鏈接的自由鏈表的頭指針
};

性能對比

下面我們將實現的定長內存池和malloc/free進行性能對比,測試代碼如下:

struct TreeNode
{
	int _val;
	TreeNode* _left;
	TreeNode* _right;
	TreeNode()
		:_val(0)
		, _left(nullptr)
		, _right(nullptr)
	{}
};

void TestObjectPool()
{
	// 申請釋放的輪次
	const size_t Rounds = 3;
	// 每輪申請釋放多少次
	const size_t N = 1000000;
	std::vector<TreeNode*> v1;
	v1.reserve(N);

	//malloc和free
	size_t begin1 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v1.push_back(new TreeNode);
		}
		for (int i = 0; i < N; ++i)
		{
			delete v1[i];
		}
		v1.clear();
	}
	size_t end1 = clock();

	//定長內存池
	ObjectPool<TreeNode> TNPool;
	std::vector<TreeNode*> v2;
	v2.reserve(N);
	size_t begin2 = clock();
	for (size_t j = 0; j < Rounds; ++j)
	{
		for (int i = 0; i < N; ++i)
		{
			v2.push_back(TNPool.New());
		}
		for (int i = 0; i < N; ++i)
		{
			TNPool.Delete(v2[i]);
		}
		v2.clear();
	}
	size_t end2 = clock();

	cout << "new cost time:" << end1 - begin1 << endl;
	cout << "object pool cost time:" << end2 - begin2 << endl;
}

在代碼中,我們先用new申請若干個TreeNode對象,然后再用delete將這些對象再釋放,通過clock函數得到整個過程消耗的時間。(new和delete底層就是封裝的malloc和free)

然后再重復該過程,只不過將其中的new和delete替換為定長內存池當中的New和Delete,此時再通過clock函數得到該過程消耗的時間。

C++高并發內存池如何實現

可以看到在這個過程中,定長內存池消耗的時間比malloc/free消耗的時間要短。這就是因為malloc是一個通用的內存池,而定長內存池是專門針對申請定長對象而設計的,因此在這種特殊場景下定長內存池的效率更高,正所謂“尺有所短,寸有所長”。

高并發內存池整體框架設計

該項目解決的是什么問題?

現代很多的開發環境都是多核多線程,因此在申請內存的時,必然存在激烈的鎖競爭問題。malloc本身其實已經很優秀了,但是在并發場景下可能會因為頻繁的加鎖和解鎖導致效率有所降低,而該項目的原型tcmalloc實現的就是一種在多線程高并發場景下更勝一籌的內存池。

在實現內存池時我們一般需要考慮到效率問題和內存碎片的問題,但對于高并發內存池來說,我們還需要考慮在多線程環境下的鎖競爭問題。

高并發內存池整體框架設計

C++高并發內存池如何實現

高并發內存池主要由以下三個部分構成:

  • thread cache: 線程緩存是每個線程獨有的,用于小于等于256KB的內存分配,每個線程獨享一個thread cache。

  • central cache: 中心緩存是所有線程所共享的,當thread cache需要內存時會按需從central cache中獲取內存,而當thread cache中的內存滿足一定條件時,central cache也會在合適的時機對其進行回收。

  • page cache: 頁緩存中存儲的內存是以頁為單位進行存儲及分配的,當central cache需要內存時,page cache會分配出一定數量的頁分配給central cache,而當central cache中的內存滿足一定條件時,page cache也會在合適的時機對其進行回收,并將回收的內存盡可能的進行合并,組成更大的連續內存塊,緩解內存碎片的問題。

進一步說明:

每個線程都有一個屬于自己的thread cache,也就意味著線程在thread cache申請內存時是不需要加鎖的,而一次性申請大于256KB內存的情況是很少的,因此大部分情況下申請內存時都是無鎖的,這也就是這個高并發內存池高效的地方。

每個線程的thread cache會根據自己的情況向central cache申請或歸還內存,這就避免了出現單個線程的thread cache占用太多內存,而其余thread cache出現內存吃緊的問題。

多線程的thread cache可能會同時找central cache申請內存,此時就會涉及線程安全的問題,因此在訪問central cache時是需要加鎖的,但central cache實際上是一個哈希桶的結構,只有當多個線程同時訪問同一個桶時才需要加鎖,所以這里的鎖競爭也不會很激烈。

各個部分的主要作用

thread cache主要解決鎖競爭的問題,每個線程獨享自己的thread cache,當自己的thread cache中有內存時該線程不會去和其他線程進行競爭,每個線程只要在自己的thread cache申請內存就行了。

central cache主要起到一個居中調度的作用,每個線程的thread cache需要內存時從central cache獲取,而當thread cache的內存多了就會將內存還給central cache,其作用類似于一個中樞,因此取名為中心緩存。

page cache就負責提供以頁為單位的大塊內存,當central cache需要內存時就會去向page cache申請,而當page cache沒有內存了就會直接去找系統,也就是直接去堆上按頁申請內存塊。

threadcache

threadcache整體設計

定長內存池只支持固定大小內存塊的申請釋放,因此定長內存池中只需要一個自由鏈表管理釋放回來的內存塊。現在我們要支持申請和釋放不同大小的內存塊,那么我們就需要多個自由鏈表來管理釋放回來的內存塊,因此thread cache實際上一個哈希桶結構,每個桶中存放的都是一個自由鏈表。

thread cache支持小于等于256KB內存的申請,如果我們將每種字節數的內存塊都用一個自由鏈表進行管理的話,那么此時我們就需要20多萬個自由鏈表,光是存儲這些自由鏈表的頭指針就需要消耗大量內存,這顯然是得不償失的。

這時我們可以選擇做一些平衡的犧牲,讓這些字節數按照某種規則進行對齊,例如我們讓這些字節數都按照8字節進行向上對齊,那么thread cache的結構就是下面這樣的,此時當線程申請1~8字節的內存時會直接給出8字節,而當線程申請9~16字節的內存時會直接給出16字節,以此類推。

C++高并發內存池如何實現

因此當線程要申請某一大小的內存塊時,就需要經過某種計算得到對齊后的字節數,進而找到對應的哈希桶,如果該哈希桶中的自由鏈表中有內存塊,那就從自由鏈表中頭刪一個內存塊進行返回;如果該自由鏈表已經為空了,那么就需要向下一層的central cache進行獲取了。

但此時由于對齊的原因,就可能會產生一些碎片化的內存無法被利用,比如線程只申請了6字節的內存,而thread cache卻直接給了8字節的內存,這多給出的2字節就無法被利用,導致了一定程度的空間浪費,這些因為某些對齊原因導致無法被利用的內存,就是內存碎片中的內部碎片。

鑒于當前項目比較復雜,我們最好對自由鏈表這個結構進行封裝,目前我們就提供Push和Pop兩個成員函數,對應的操作分別是將對象插入到自由鏈表(頭插)和從自由鏈表獲取一個對象(頭刪),后面在需要時還會添加對應的成員函數。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//將釋放的對象頭插到自由鏈表
	void Push(void* obj)
	{
		assert(obj);

		//頭插
		NextObj(obj) = _freeList;
		_freeList = obj;
	}

	//從自由鏈表頭部獲取一個對象
	void* Pop()
	{
		assert(_freeList);

		//頭刪
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		return obj;
	}

private:
	void* _freeList = nullptr; //自由鏈表
};

因此thread cache實際就是一個數組,數組中存儲的就是一個個的自由鏈表,至于這個數組中到底存儲了多少個自由鏈表,就需要看我們在進行字節數對齊時具體用的是什么映射對齊規則了。

threadcache哈希桶映射對齊規則

如何進行對齊?

上面已經說了,不是每個字節數都對應一個自由鏈表,這樣開銷太大了,因此我們需要制定一個合適的映射對齊規則。

首先,這些內存塊是會被鏈接到自由鏈表上的,因此一開始肯定是按8字節進行對齊是最合適的,因為我們必須保證這些內存塊,無論是在32位平臺下還是64位平臺下,都至少能夠存儲得下一個指針。

但如果所有的字節數都按照8字節進行對齊的話,那么我們就需要建立 256 &times; 1024 &divide; 8 = 32768 256\times1024\div8=32768 256&times;1024&divide;8=32768個桶,這個數量還是比較多的,實際上我們可以讓不同范圍的字節數按照不同的對齊數進行對齊,具體對齊方式如下:

字節數對齊數哈希桶下標
[ [ [ 1 , 128 1,128 1,128 ] ] ]8 8 8[ [ [ 0 , 16 ) 0,16) 0,16)
[ [ [ 128 + 1 , 1024 128+1,1024 128+1,1024 ] ] ]16 16 16[ [ [ 16 , 72 ) 16,72) 16,72)
[ [ [ 1024 + 1 , 8 &times; 1024 1024+1,8\times1024 1024+1,8&times;1024 ] ] ]128 128 128[ [ [ 72 , 128 ) 72,128) 72,128)
[ [ [ 8 &times; 1024 + 1 , 64 &times; 1024 8\times1024+1,64\times1024 8&times;1024+1,64&times;1024 ] ] ]1024 1024 1024[ [ [ 128 , 184 ) 128,184) 128,184)
[ [ [ 64 &times; 1024 + 1 , 256 &times; 1024 64\times1024+1,256\times1024 64&times;1024+1,256&times;1024 ] ] ]8 &times; 1024 8\times1024 8&times;1024[ [ [ 184 , 208 ) 184,208) 184,208)

空間浪費率

雖然對齊產生的內碎片會引起一定程度的空間浪費,但按照上面的對齊規則,我們可以將浪費率控制到百分之十左右。需要說明的是,1~128這個區間我們不做討論,因為1字節就算是對齊到2字節也有百分之五十的浪費率,這里我們就從第二個區間開始進行計算。

C++高并發內存池如何實現

根據上面的公式,我們要得到某個區間的最大浪費率,就應該讓分子取到最大,讓分母取到最小。比如129~1024這個區間,該區域的對齊數是16,那么最大浪費的字節數就是15,而最小對齊后的字節數就是這個區間內的前16個數所對齊到的字節數,也就是144,那么該區間的最大浪費率也就是 15 &divide; 144 &asymp; 10.42 % 15\div144\approx10.42\% 15&divide;144&asymp;10.42%。同樣的道理,后面兩個區間的最大浪費率分別是 127 &divide; 1152 &asymp; 11.02 % 127\div1152\approx11.02\% 127&divide;1152&asymp;11.02%和 1023 &divide; 9216 &asymp; 11.10 % 1023\div9216\approx11.10\% 1023&divide;9216&asymp;11.10%。

對齊和映射相關函數的編寫

此時有了字節數的對齊規則后,我們就需要提供兩個對應的函數,分別用于獲取某一字節數對齊后的字節數,以及該字節數對應的哈希桶下標。關于處理對齊和映射的函數,我們可以將其封裝到一個類當中。

//管理對齊和映射等關系
class SizeClass
{
public:
	//獲取向上對齊后的字節數
	static inline size_t RoundUp(size_t bytes);
	//獲取對應哈希桶的下標
	static inline size_t Index(size_t bytes);
};

需要注意的是,SizeClass類當中的成員函數最好設置為靜態成員函數,否則我們在調用這些函數時就需要通過對象去調用,并且對于這些可能會頻繁調用的函數,可以考慮將其設置為內聯函數。

在獲取某一字節數向上對齊后的字節數時,可以先判斷該字節數屬于哪一個區間,然后再通過調用一個子函數進行進一步處理。

//獲取向上對齊后的字節數
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		assert(false);
		return -1;
	}
}

此時我們就需要編寫一個子函數,該子函數需要通過對齊數計算出某一字節數對齊后的字節數,最容易想到的就是下面這種寫法。

//一般寫法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	size_t alignSize = 0;
	if (bytes%alignNum != 0)
	{
		alignSize = (bytes / alignNum + 1)*alignNum;
	}
	else
	{
		alignSize = bytes;
	}
	return alignSize;
}

除了上述寫法,我們還可以通過位運算的方式來進行計算,雖然位運算可能并沒有上面的寫法容易理解,但計算機執行位運算的速度是比執行乘法和除法更快的。

//位運算寫法
static inline size_t _RoundUp(size_t bytes, size_t alignNum)
{
	return ((bytes + alignNum - 1)&~(alignNum - 1));
}

對于上述位運算,我們以10字節按8字節對齊為例進行分析。 8 &minus; 1 = 7 8-1=7 8&minus;1=7,7就是一個低三位為1其余位為0的二進制序列,我們將10與7相加,相當于將10字節當中不夠8字節的剩余字節數補上了。

C++高并發內存池如何實現

然后我們再將該值與7按位取反后的值進行與運算,而7按位取反后是一個低三位為0其余位為1的二進制序列,該操作進行后相當于屏蔽了該值的低三位而該值的其余位保持不變,此時得到的值就是10字節按8字節對齊后的值,即16字節。

C++高并發內存池如何實現

在獲取某一字節數對應的哈希桶下標時,也是先判斷該字節數屬于哪一個區間,然后再通過調用一個子函數進行進一步處理。

//獲取對應哈希桶的下標
static inline size_t Index(size_t bytes)
{
	//每個區間有多少個自由鏈表
	static size_t groupArray[4] = { 16, 56, 56, 56 };
	if (bytes <= 128)
	{
		return _Index(bytes, 3);
	}
	else if (bytes <= 1024)
	{
		return _Index(bytes - 128, 4) + groupArray[0];
	}
	else if (bytes <= 8 * 1024)
	{
		return _Index(bytes - 1024, 7) + groupArray[0] + groupArray[1];
	}
	else if (bytes <= 64 * 1024)
	{
		return _Index(bytes - 8 * 1024, 10) + groupArray[0] + groupArray[1] + groupArray[2];
	}
	else if (bytes <= 256 * 1024)
	{
		return _Index(bytes - 64 * 1024, 13) + groupArray[0] + groupArray[1] + groupArray[2] + groupArray[3];
	}
	else
	{
		assert(false);
		return -1;
	}
}

此時我們需要編寫一個子函數來繼續進行處理,容易想到的就是根據對齊數來計算某一字節數對應的下標。

//一般寫法
static inline size_t _Index(size_t bytes, size_t alignNum)
{
	size_t index = 0;
	if (bytes%alignNum != 0)
	{
		index = bytes / alignNum;
	}
	else
	{
		index = bytes / alignNum - 1;
	}
	return index;
}

當然,為了提高效率下面也提供了一個用位運算來解決的方法,需要注意的是,此時我們并不是傳入該字節數的對齊數,而是將對齊數寫成2的n次方的形式后,將這個n值進行傳入。比如對齊數是8,傳入的就是3。

//位運算寫法
static inline size_t _Index(size_t bytes, size_t alignShift)
{
	return ((bytes + (1 << alignShift) - 1) >> alignShift) - 1;
}

這里我們還是以10字節按8字節對齊為例進行分析。此時傳入的alignShift就是3,將1左移3位后得到的實際上就是對齊數8, 8 &minus; 1 = 7 8-1=7 8&minus;1=7,相當于我們還是讓10與7相加。

C++高并發內存池如何實現

之后我們再將該值向右移3位,實際上就是讓這個值除以8,此時我們也是相當于屏蔽了該值二進制的低三位,因為除以8得到的值與其二進制的低三位無關,所以我們可以說是將10對齊后的字節數除以了8,此時得到了2,而最后還需要減一是因為數組的下標是從0開始的。

ThreadCache類

按照上述的對齊規則,thread cache中桶的個數,也就是自由鏈表的個數是208,以及thread cache允許申請的最大內存大小256KB,我們可以將這些數據按照如下方式進行定義。

//小于等于MAX_BYTES,就找thread cache申請
//大于MAX_BYTES,就直接找page cache或者系統堆申請
static const size_t MAX_BYTES = 256 * 1024;
//thread cache和central cache自由鏈表哈希桶的表大小
static const size_t NFREELISTS = 208;

現在就可以對ThreadCache類進行定義了,thread cache就是一個存儲208個自由鏈表的數組,目前thread cache就先提供一個Allocate函數用于申請對象就行了,后面需要時再進行增加。

class ThreadCache
{
public:
	//申請內存對象
	void* Allocate(size_t size);

private:
	FreeList _freeLists[NFREELISTS]; //哈希桶
};

在thread cache申請對象時,通過所給字節數計算出對應的哈希桶下標,如果桶中自由鏈表不為空,則從該自由鏈表中取出一個對象進行返回即可;但如果此時自由鏈表為空,那么我們就需要從central cache進行獲取了,這里的FetchFromCentralCache函數也是thread cache類中的一個成員函數,在后面再進行具體實現。

//申請內存對象
void* ThreadCache::Allocate(size_t size)
{
	assert(size <= MAX_BYTES);
	size_t alignSize = SizeClass::RoundUp(size);
	size_t index = SizeClass::Index(size);
	if (!_freeLists[index].Empty())
	{
		return _freeLists[index].Pop();
	}
	else
	{
		return FetchFromCentralCache(index, alignSize);
	}
}

threadcacheTLS無鎖訪問

每個線程都有一個自己獨享的thread cache,那應該如何創建這個thread cache呢?我們不能將這個thread cache創建為全局的,因為全局變量是所有線程共享的,這樣就不可避免的需要鎖來控制,增加了控制成本和代碼復雜度。

要實現每個線程無鎖的訪問屬于自己的thread cache,我們需要用到線程局部存儲TLS(Thread Local Storage),這是一種變量的存儲方法,使用該存儲方法的變量在它所在的線程是全局可訪問的,但是不能被其他線程訪問到,這樣就保持了數據的線程獨立性。

//TLS - Thread Local Storage
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

但不是每個線程被創建時就立馬有了屬于自己的thread cache,而是當該線程調用相關申請內存的接口時才會創建自己的thread cache,因此在申請內存的函數中會包含以下邏輯。

//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
if (pTLSThreadCache == nullptr)
{
	pTLSThreadCache = new ThreadCache;
}

centralcache

centralcache整體設計

當線程申請某一大小的內存時,如果thread cache中對應的自由鏈表不為空,那么直接取出一個內存塊進行返回即可,但如果此時該自由鏈表為空,那么這時thread cache就需要向central cache申請內存了。

central cache的結構與thread cache是一樣的,它們都是哈希桶的結構,并且它們遵循的對齊映射規則都是一樣的。這樣做的好處就是,當thread cache的某個桶中沒有內存了,就可以直接到central cache中對應的哈希桶里去取內存就行了。

central cache與thread cache的不同之處

central cache與thread cache有兩個明顯不同的地方,首先,thread cache是每個線程獨享的,而central cache是所有線程共享的,因為每個線程的thread cache沒有內存了都會去找central cache,因此在訪問central cache時是需要加鎖的。

但central cache在加鎖時并不是將整個central cache全部鎖上了,central cache在加鎖時用的是桶鎖,也就是說每個桶都有一個鎖。此時只有當多個線程同時訪問central cache的同一個桶時才會存在鎖競爭,如果是多個線程同時訪問central cache的不同桶就不會存在鎖競爭。

central cache與thread cache的第二個不同之處就是,thread cache的每個桶中掛的是一個個切好的內存塊,而central cache的每個桶中掛的是一個個的span。

C++高并發內存池如何實現

每個span管理的都是一個以頁為單位的大塊內存,每個桶里面的若干span是按照雙鏈表的形式鏈接起來的,并且每個span里面還有一個自由鏈表,這個自由鏈表里面掛的就是一個個切好了的內存塊,根據其所在的哈希桶這些內存塊被切成了對應的大小。

centralcache結構設計

頁號的類型?

每個程序運行起來后都有自己的進程地址空間,在32位平臺下,進程地址空間的大小是232;而在64位平臺下,進程地址空間的大小就是264。

頁的大小一般是4K或者8K,我們以8K為例。在32位平臺下,進程地址空間就可以被分成 2 32 &divide; 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232&divide;213=219個頁;在64位平臺下,進程地址空間就可以被分成 2 64 &divide; 2 13 = 2 51 2^{64}\div2^{13}=2^{51} 264&divide;213=251個頁。頁號本質與地址是一樣的,它們都是一個編號,只不過地址是以一個字節為一個單位,而頁是以多個字節為一個單位。

由于頁號在64位平臺下的取值范圍是 [ [ [ 0 , 2 51 ) 0,2^{51}) 0,251),因此我們不能簡單的用一個無符號整型來存儲頁號,這時我們需要借助條件編譯來解決這個問題。

#ifdef _WIN64
	typedef unsigned long long PAGE_ID;
#elif _WIN32
	typedef size_t PAGE_ID;
#else
	//linux
#endif

需要注意的是,在32位下,_WIN32有定義,_WIN64沒有定義;而在64位下,_WIN32和_WIN64都有定義。因此在條件編譯時,我們應該先判斷_WIN64是否有定義,再判斷_WIN32是否有定義。

span的結構

central cache的每個桶里掛的是一個個的span,span是一個管理以頁為單位的大塊內存,span的結構如下:

//管理以頁為單位的大塊內存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內存起始頁的頁號
	size_t _n = 0;              //頁的數量

	Span* _next = nullptr;      //雙鏈表結構
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小塊內存,被分配給thread cache的計數
	void* _freeList = nullptr;  //切好的小塊內存的自由鏈表
};

對于span管理的以頁為單位的大塊內存,我們需要知道這塊內存具體在哪一個位置,便于之后page cache進行前后頁的合并,因此span結構當中會記錄所管理大塊內存起始頁的頁號。

至于每一個span管理的到底是多少個頁,這并不是固定的,需要根據多方面的因素來控制,因此span結構當中有一個_n成員,該成員就代表著該span管理的頁的數量。

此外,每個span管理的大塊內存,都會被切成相應大小的內存塊掛到當前span的自由鏈表中,比如8Byte哈希桶中的span,會被切成一個個8Byte大小的內存塊掛到當前span的自由鏈表中,因此span結構中需要存儲切好的小塊內存的自由鏈表。

span結構當中的_useCount成員記錄的就是,當前span中切好的小塊內存,被分配給thread cache的計數,當某個span的_useCount計數變為0時,代表當前span切出去的內存塊對象全部還回來了,此時central cache就可以將這個span再還給page cache。

每個桶當中的span是以雙鏈表的形式組織起來的,當我們需要將某個span歸還給page cache時,就可以很方便的將該span從雙鏈表結構中移出。如果用單鏈表結構的話就比較麻煩了,因為單鏈表在刪除時,需要知道當前結點的前一個結點。

雙鏈表結構

根據上面的描述,central cache的每個哈希桶里面存儲的都是一個雙鏈表結構,對于該雙鏈表結構我們可以對其進行封裝。

//帶頭雙向循環鏈表
class SpanList
{
public:
	SpanList()
	{
		_head = new Span;
		_head->_next = _head;
		_head->_prev = _head;
	}
	void Insert(Span* pos, Span* newSpan)
	{
		assert(pos);
		assert(newSpan);

		Span* prev = pos->_prev;

		prev->_next = newSpan;
		newSpan->_prev = prev;

		newSpan->_next = pos;
		pos->_prev = newSpan;
	}
	void Erase(Span* pos)
	{
		assert(pos);
		assert(pos != _head); //不能刪除哨兵位的頭結點

		Span* prev = pos->_prev;
		Span* next = pos->_next;

		prev->_next = next;
		next->_prev = prev;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

需要注意的是,從雙鏈表刪除的span會還給下一層的page cache,相當于只是把這個span從雙鏈表中移除,因此不需要對刪除的span進行delete操作。

central cache的結構

central cache的映射規則和thread cache是一樣的,因此central cache里面哈希桶的個數也是208,但central cache每個哈希桶中存儲就是我們上面定義的雙鏈表結構。

class CentralCache
{
public:
	//...
private:
	SpanList _spanLists[NFREELISTS];
};

central cache和thread cache的映射規則一樣,有一個好處就是,當thread cache的某個桶沒有內存了,就可以直接去central cache對應的哈希桶進行申請就行了。

centralcache核心實現

central cache的實現方式

每個線程都有一個屬于自己的thread cache,我們是用TLS來實現每個線程無鎖的訪問屬于自己的thread cache的。而central cache和page cache在整個進程中只有一個,對于這種只能創建一個對象的類,我們可以將其設置為單例模式。

單例模式可以保證系統中該類只有一個實例,并提供一個訪問它的全局訪問點,該實例被所有程序模塊共享。單例模式又分為餓漢模式和懶漢模式,懶漢模式相對較復雜,我們這里使用餓漢模式就足夠了。

//單例模式
class CentralCache
{
public:
	//提供一個全局訪問點
	static CentralCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NFREELISTS];
private:
	CentralCache() //構造函數私有
	{}
	CentralCache(const CentralCache&) = delete; //防拷貝

	static CentralCache _sInst;
};

為了保證CentralCache類只能創建一個對象,我們需要將central cache的構造函數和拷貝構造函數設置為私有,或者在C++11中也可以在函數聲明的后面加上=delete進行修飾。

CentralCache類當中還需要有一個CentralCache類型的靜態的成員變量,當程序運行起來后我們就立馬創建該對象,在此后的程序中就只有這一個單例了。

CentralCache CentralCache::_sInst;

最后central cache還需要提供一個公有的成員函數,用于獲取該對象,此時在整個進程中就只會有一個central cache對象了。

慢開始反饋調節算法

當thread cache向central cache申請內存時,central cache應該給出多少個對象呢?這是一個值得思考的問題,如果central cache給的太少,那么thread cache在短時間內用完了又會來申請;但如果一次性給的太多了,可能thread cache用不完也就浪費了。

鑒于此,我們這里采用了一個慢開始反饋調節算法。當thread cache向central cache申請內存時,如果申請的是較小的對象,那么可以多給一點,但如果申請的是較大的對象,就可以少給一點。

通過下面這個函數,我們就可以根據所需申請的對象的大小計算出具體給出的對象個數,并且可以將給出的對象個數控制到2~512個之間。也就是說,就算thread cache要申請的對象再小,我最多一次性給出512個對象;就算thread cache要申請的對象再大,我至少一次性給出2個對象。

//管理對齊和映射等關系
class SizeClass
{
public:
	//thread cache一次從central cache獲取對象的上限
	static size_t NumMoveSize(size_t size)
	{
		assert(size > 0);
	
		//對象越小,計算出的上限越高
		//對象越大,計算出的上限越低
		int num = MAX_BYTES / size;
		if (num < 2)
			num = 2;
		if (num > 512)
			num = 512;
	
		return num;
	}
};

但就算申請的是小對象,一次性給出512個也是比較多的,基于這個原因,我們可以在FreeList結構中增加一個叫做_maxSize的成員變量,該變量的初始值設置為1,并且提供一個公有成員函數用于獲取這個變量。也就是說,現在thread cache中的每個自由鏈表都會有一個自己的_maxSize。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	size_t& MaxSize()
	{
		return _maxSize;
	}

private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
};

此時當thread cache申請對象時,我們會比較_maxSize和計算得出的值,取出其中的較小值作為本次申請對象的個數。此外,如果本次采用的是_maxSize的值,那么還會將thread cache中該自由鏈表的_maxSize的值進行加一。

因此,thread cache第一次向central cache申請某大小的對象時,申請到的都是一個,但下一次thread cache再向central cache申請同樣大小的對象時,因為該自由鏈表中的_maxSize增加了,最終就會申請到兩個。直到該自由鏈表中_maxSize的值,增長到超過計算出的值后就不會繼續增長了,此后申請到的對象個數就是計算出的個數。(這有點像網絡中擁塞控制的機制)

從中心緩存獲取對象

每次thread cache向central cache申請對象時,我們先通過慢開始反饋調節算法計算出本次應該申請的對象的個數,然后再向central cache進行申請。

如果thread cache最終申請到對象的個數就是一個,那么直接將該對象返回即可。為什么需要返回一個申請到的對象呢?因為thread cache要向central cache申請對象,其實由于某個線程向thread cache申請對象但thread cache當中沒有,這才導致thread cache要向central cache申請對象。因此central cache將對象返回給thread cache后,thread cache會再將該對象返回給申請對象的線程。

但如果thread cache最終申請到的是多個對象,那么除了將第一個對象返回之外,還需要將剩下的對象掛到thread cache對應的哈希桶當中。

//從中心緩存獲取對象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size)
{
	//慢開始反饋調節算法
	//1、最開始不會一次向central cache一次批量要太多,因為要太多了可能用不完
	//2、如果你不斷有size大小的內存需求,那么batchNum就會不斷增長,直到上限
	size_t batchNum = std::min(_freeLists[index].MaxSize(), SizeClass::NumMoveSize(size));
	if (batchNum == _freeLists[index].MaxSize())
	{
		_freeLists[index].MaxSize() += 1;
	}
	void* start = nullptr;
	void* end = nullptr;
	size_t actualNum = CentralCache::GetInstance()->FetchRangeObj(start, end, batchNum, size);
	assert(actualNum >= 1); //至少有一個

	if (actualNum == 1) //申請到對象的個數是一個,則直接將這一個對象返回即可
	{
		assert(start == end);
		return start;
	}
	else //申請到對象的個數是多個,還需要將剩下的對象掛到thread cache中對應的哈希桶中
	{
		_freeLists[index].PushRange(NextObj(start), end);
		return start;
	}
}

從中心緩存獲取一定數量的對象

這里我們要從central cache獲取n個指定大小的對象,這些對象肯定都是從central cache對應哈希桶的某個span中取出來的,因此取出來的這n個對象是鏈接在一起的,我們只需要得到這段鏈表的頭和尾即可,這里可以采用輸出型參數進行獲取。

//從central cache獲取一定數量的對象給thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t n, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加鎖
	
	//在對應哈希桶中獲取一個非空的span
	Span* span = GetOneSpan(_spanLists[index], size);
	assert(span); //span不為空
	assert(span->_freeList); //span當中的自由鏈表也不為空

	//從span中獲取n個對象
	//如果不夠n個,有多少拿多少
	start = span->_freeList;
	end = span->_freeList;
	size_t actualNum = 1;
	while (NextObj(end)&&n - 1)
	{
		end = NextObj(end);
		actualNum++;
		n--;
	}
	span->_freeList = NextObj(end); //取完后剩下的對象繼續放到自由鏈表
	NextObj(end) = nullptr; //取出的一段鏈表的表尾置空
	span->_useCount += actualNum; //更新被分配給thread cache的計數

	_spanLists[index]._mtx.unlock(); //解鎖
	return actualNum;
}

由于central cache是所有線程共享的,所以我們在訪問central cache中的哈希桶時,需要先給對應的哈希桶加上桶鎖,在獲取到對象后再將桶鎖解掉。

在向central cache獲取對象時,先是在central cache對應的哈希桶中獲取到一個非空的span,然后從這個span的自由鏈表中取出n個對象即可,但可能這個非空的span的自由鏈表當中對象的個數不足n個,這時該自由鏈表當中有多少個對象就給多少就行了。

也就是說,thread cache實際從central cache獲得的對象的個數可能與我們傳入的n值是不一樣的,因此我們需要統計本次申請過程中,實際thread cache獲取到的對象個數,然后根據該值及時更新這個span中的小對象被分配給thread cache的計數。

需要注意的是,雖然我們實際申請到對象的個數可能比n要小,但這并不會產生任何影響。因為thread cache的本意就是向central cache申請一個對象,我們之所以要一次多申請一些對象,是因為這樣一來下次線程再申請相同大小的對象時就可以直接在thread cache里面獲取了,而不用再向central cache申請對象。

插入一段范圍的對象到自由鏈表

此外,如果thread cache最終從central cache獲取到的對象個數是大于一的,那么我們還需要將剩下的對象插入到thread cache中對應的哈希桶中,為了能讓自由鏈表支持插入一段范圍的對象,我們還需要在FreeList類中增加一個對應的成員函數。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//插入一段范圍的對象到自由鏈表
	void PushRange(void* start, void* end)
	{
		assert(start);
		assert(end);

		//頭插
		NextObj(end) = _freeList;
		_freeList = start;
	}
private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
};

pagecache

pagecache整體設計

page cache與central cache結構的相同之處

page cache與central cache一樣,它們都是哈希桶的結構,并且page cache的每個哈希桶中里掛的也是一個個的span,這些span也是按照雙鏈表的結構鏈接起來的。

page cache與central cache結構的不同之處

首先,central cache的映射規則與thread cache保持一致,而page cache的映射規則與它們都不相同。page cache的哈希桶映射規則采用的是直接定址法,比如1號桶掛的都是1頁的span,2號桶掛的都是2頁的span,以此類推。

其次,central cache每個桶中的span被切成了一個個對應大小的對象,以供thread cache申請。而page cache當中的span是沒有被進一步切小的,因為page cache服務的是central cache,當central cache沒有span時,向page cache申請的是某一固定頁數的span,而如何切分申請到的這個span就應該由central cache自己來決定。

C++高并發內存池如何實現

至于page cache當中究竟有多少個桶,這就要看你最大想掛幾頁的span了,這里我們就最大掛128頁的span,為了讓桶號與頁號對應起來,我們可以將第0號桶空出來不用,因此我們需要將哈希桶的個數設置為129。

//page cache中哈希桶的個數
static const size_t NPAGES = 129;

為什么這里最大掛128頁的span呢?因為線程申請單個對象最大是256KB,而128頁可以被切成4個256KB的對象,因此是足夠的。當然,如果你想在page cache中掛更大的span也是可以的,根據具體的需求進行設置就行了。

在page cache獲取一個n頁的span的過程

如果central cache要獲取一個n頁的span,那我們就可以在page cache的第n號桶中取出一個span返回給central cache即可,但如果第n號桶中沒有span了,這時我們并不是直接轉而向堆申請一個n頁的span,而是要繼續在后面的桶當中尋找span。

直接向堆申請以頁為單位的內存時,我們應該盡量申請大塊一點的內存塊,因為此時申請到的內存是連續的,當線程需要內存時我們可以將其切小后分配給線程,而當線程將內存釋放后我們又可以將其合并成大塊的連續內存。如果我們向堆申請內存時是小塊小塊的申請的,那么我們申請到的內存就不一定是連續的了。

因此,當第n號桶中沒有span時,我們可以繼續找第n+1號桶,因為我們可以將n+1頁的span切分成一個n頁的span和一個1頁的span,這時我們就可以將n頁的span返回,而將切分后1頁的span掛到1號桶中。但如果后面的桶當中都沒有span,這時我們就只能向堆申請一個128頁的內存塊,并將其用一個span結構管理起來,然后將128頁的span切分成n頁的span和128-n頁的span,其中n頁的span返回給central cache,而128-n頁的span就掛到第128-n號桶中。

也就是說,我們每次向堆申請的都是128頁大小的內存塊,central cache要的這些span實際都是由128頁的span切分出來的。

page cache的實現方式

當每個線程的thread cache沒有內存時都會向central cache申請,此時多個線程的thread cache如果訪問的不是central cache的同一個桶,那么這些線程是可以同時進行訪問的。這時central cache的多個桶就可能同時向page cache申請內存的,所以page cache也是存在線程安全問題的,因此在訪問page cache時也必須要加鎖。

但是在page cache這里我們不能使用桶鎖,因為當central cache向page cache申請內存時,page cache可能會將其他桶當中大頁的span切小后再給central cache。此外,當central cache將某個span歸還給page cache時,page cache也會嘗試將該span與其他桶當中的span進行合并。

也就是說,在訪問page cache時,我們可能需要訪問page cache中的多個桶,如果page cache用桶鎖就會出現大量頻繁的加鎖和解鎖,導致程序的效率低下。因此我們在訪問page cache時使用沒有使用桶鎖,而是用一個大鎖將整個page cache給鎖住。

而thread cache在訪問central cache時,只需要訪問central cache中對應的哈希桶就行了,因為central cache的每個哈希桶中的span都被切分成了對應大小,thread cache只需要根據自己所需對象的大小訪問central cache中對應的哈希桶即可,不會訪問其他哈希桶,因此central cache可以用桶鎖。

此外,page cache在整個進程中也是只能存在一個的,因此我們也需要將其設置為單例模式。

//單例模式
class PageCache
{
public:
	//提供一個全局訪問點
	static PageCache* GetInstance()
	{
		return &_sInst;
	}
private:
	SpanList _spanLists[NPAGES];
	std::mutex _pageMtx; //大鎖
private:
	PageCache() //構造函數私有
	{}
	PageCache(const PageCache&) = delete; //防拷貝

	static PageCache _sInst;
};

當程序運行起來后我們就立馬創建該對象即可。

PageCache PageCache::_sInst;

pagecache中獲取Span

獲取一個非空的span

thread cache向central cache申請對象時,central cache需要先從對應的哈希桶中獲取到一個非空的span,然后從這個非空的span中取出若干對象返回給thread cache。那central cache到底是如何從對應的哈希桶中,獲取到一個非空的span的呢?

首先當然是先遍歷central cache對應哈希桶當中的雙鏈表,如果該雙鏈表中有非空的span,那么直接將該span進行返回即可。為了方便遍歷這個雙鏈表,我們可以模擬迭代器的方式,給SpanList類提供Begin和End成員函數,分別用于獲取雙鏈表中的第一個span和最后一個span的下一個位置,也就是頭結點。

//帶頭雙向循環鏈表
class SpanList
{
public:
	Span* Begin()
	{
		return _head->_next;
	}
	Span* End()
	{
		return _head;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

但如果遍歷雙鏈表后發現雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請內存塊了。

那具體是向page cache申請多大的內存塊呢?我們可以根據具體所需對象的大小來決定,就像之前我們根據對象的大小計算出,thread cache一次向central cache申請對象的個數上限,現在我們是根據對象的大小計算出,central cache一次應該向page cache申請幾頁的內存塊。

我們可以先根據對象的大小計算出,thread cache一次向central cache申請對象的個數上限,然后將這個上限值乘以單個對象的大小,就算出了具體需要多少字節,最后再將這個算出來的字節數轉換為頁數,如果轉換后不夠一頁,那么我們就申請一頁,否則轉換出來是幾頁就申請幾頁。也就是說,central cache向page cache申請內存時,要求申請到的內存盡量能夠滿足thread cache向central cache申請時的上限。

//管理對齊和映射等關系
class SizeClass
{
public:
	//central cache一次向page cache獲取多少頁
	static size_t NumMovePage(size_t size)
	{
		size_t num = NumMoveSize(size); //計算出thread cache一次向central cache申請對象的個數上限
		size_t nPage = num*size; //num個size大小的對象所需的字節數

		nPage >>= PAGE_SHIFT; //將字節數轉換為頁數
		if (nPage == 0) //至少給一頁
			nPage = 1;

		return nPage;
	}
};

代碼中的PAGE_SHIFT代表頁大小轉換偏移,我們這里以頁的大小為8K為例,PAGE_SHIFT的值就是13。

//頁大小轉換偏移,即一頁定義為2^13,也就是8KB
static const size_t PAGE_SHIFT = 13;

需要注意的是,當central cache申請到若干頁的span后,還需要將這個span切成一個個對應大小的對象掛到該span的自由鏈表當中。

如何找到一個span所管理的內存塊呢?首先需要計算出該span的起始地址,我們可以用這個span的起始頁號乘以一頁的大小即可得到這個span的起始地址,然后用這個span的頁數乘以一頁的大小就可以得到這個span所管理的內存塊的大小,用起始地址加上內存塊的大小即可得到這塊內存塊的結束位置。

明確了這塊內存的起始和結束位置后,我們就可以進行切分了。根據所需對象的大小,每次從大塊內存切出一塊固定大小的內存塊尾插到span的自由鏈表中即可。

為什么是尾插呢?因為我們如果是將切好的對象尾插到自由鏈表,這些對象看起來是按照鏈式結構鏈接起來的,而實際它們在物理上是連續的,這時當我們把這些連續內存分配給某個線程使用時,可以提高該線程的CPU緩存利用率。

//獲取一個非空的span
Span* CentralCache::GetOneSpan(SpanList& spanList, size_t size)
{
	//1、先在spanList中尋找非空的span
	Span* it = spanList.Begin();
	while (it != spanList.End())
	{
		if (it->_freeList != nullptr)
		{
			return it;
		}
		else
		{
			it = it->_next;
		}
	}

	//2、spanList中沒有非空的span,只能向page cache申請
	Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
	//計算span的大塊內存的起始地址和大塊內存的大小(字節數)
	char* start = (char*)(span->_pageId << PAGE_SHIFT);
	size_t bytes = span->_n << PAGE_SHIFT;

	//把大塊內存切成size大小的對象鏈接起來
	char* end = start + bytes;
	//先切一塊下來去做尾,方便尾插
	span->_freeList = start;
	start += size;
	void* tail = span->_freeList;
	//尾插
	while (start < end)
	{
		NextObj(tail) = start;
		tail = NextObj(tail);
		start += size;
	}
	NextObj(tail) = nullptr; //尾的指向置空
	
	//將切好的span頭插到spanList
	spanList.PushFront(span);

	return span;
}

需要注意的是,當我們把span切好后,需要將這個切好的span掛到central cache的對應哈希桶中。因此SpanList類還需要提供一個接口,用于將一個span插入到該雙鏈表中。這里我們選擇的是頭插,這樣當central cache下一次從該雙鏈表中獲取非空span時,一來就能找到。

由于SpanList類之前實現了Insert和Begin函數,這里實現雙鏈表頭插就非常簡單,直接在雙鏈表的Begin位置進行Insert即可。

//帶頭雙向循環鏈表
class SpanList
{
public:
	void PushFront(Span* span)
	{
		Insert(Begin(), span);
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

獲取一個k頁的span

當我們調用上述的GetOneSpan從central cache的某個哈希桶獲取一個非空的span時,如果遍歷哈希桶中的雙鏈表后發現雙鏈表中沒有span,或該雙鏈表中的span都為空,那么此時central cache就需要向page cache申請若干頁的span了,下面我們就來說說如何從page cache獲取一個k頁的span。

因為page cache是直接按照頁數進行映射的,因此我們要從page cache獲取一個k頁的span,就應該直接先去找page cache的第k號桶,如果第k號桶中有span,那我們直接頭刪一個span返回給central cache就行了。所以我們這里需要再給SpanList類添加對應的Empty和PopFront函數。

//帶頭雙向循環鏈表
class SpanList
{
public:
	bool Empty()
	{
		return _head == _head->_next;
	}
	Span* PopFront()
	{
		Span* front = _head->_next;
		Erase(front);
		return front;
	}
private:
	Span* _head;
public:
	std::mutex _mtx; //桶鎖
};

如果page cache的第k號桶中沒有span,我們就應該繼續找后面的桶,只要后面任意一個桶中有一個n頁span,我們就可以將其切分成一個k頁的span和一個n-k頁的span,然后將切出來k頁的span返回給central cache,再將n-k頁的span掛到page cache的第n-k號桶即可。

但如果后面的桶中也都沒有span,此時我們就需要向堆申請一個128頁的span了,在向堆申請內存時,直接調用我們封裝的SystemAlloc函數即可。

需要注意的是,向堆申請內存后得到的是這塊內存的起始地址,此時我們需要將該地址轉換為頁號。由于我們向堆申請內存時都是按頁進行申請的,因此我們直接將該地址除以一頁的大小即可得到對應的頁號。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		return _spanLists[k].PopFront();
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復,遞歸調用自己
	return NewSpan(k);
}

這里說明一下,當我們向堆申請到128頁的span后,需要將其切分成k頁的span和128-k頁的span,但是為了盡量避免出現重復的代碼,我們最好不要再編寫對應的切分代碼。我們可以先將申請到的128頁的span掛到page cache對應的哈希桶中,然后再遞歸調用該函數就行了,此時在往后找span時就一定會在第128號桶中找到該span,然后進行切分。

這里其實有一個問題:當central cache向page cache申請內存時,central cache對應的哈希桶是處于加鎖的狀態的,那在訪問page cache之前我們應不應該把central cache對應的桶鎖解掉呢?

這里建議在訪問page cache前,先把central cache對應的桶鎖解掉。雖然此時central cache的這個桶當中是沒有內存供其他thread cache申請的,但thread cache除了申請內存還會釋放內存,如果在訪問page cache前將central cache對應的桶鎖解掉,那么此時當其他thread cache想要歸還內存到central cache的這個桶時就不會被阻塞。

因此在調用NewSpan函數之前,我們需要先將central cache對應的桶鎖解掉,然后再將page cache的大鎖加上,當申請到k頁的span后,我們需要將page cache的大鎖解掉,但此時我們不需要立刻獲取到central cache中對應的桶鎖。因為central cache拿到k頁的span后還會對其進行切分操作,因此我們可以在span切好后需要將其掛到central cache對應的桶上時,再獲取對應的桶鎖。

這里為了讓代碼清晰一點,只寫出了加鎖和解鎖的邏輯,我們只需要將這些邏輯添加到之前實現的GetOneSpan函數的對應位置即可。

spanList._mtx.unlock(); //解桶鎖
PageCache::GetInstance()->_pageMtx.lock(); //加大鎖

//從page cache申請k頁的span

PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖

//進行span的切分...

spanList._mtx.lock(); //加桶鎖

//將span掛到central cache對應的哈希桶

申請內存過程聯調

ConcurrentAlloc函數

在將thread cache、central cache以及page cache的申請流程寫通了之后,我們就可以向外提供一個ConcurrentAlloc函數,用于申請內存塊。每個線程第一次調用該函數時會通過TLS獲取到自己專屬的thread cache對象,然后每個線程就可以通過自己對應的thread cache申請對象了。

static void* ConcurrentAlloc(size_t size)
{
	//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
	if (pTLSThreadCache == nullptr)
	{
		pTLSThreadCache = new ThreadCache;
	}
	return pTLSThreadCache->Allocate(size);
}

這里說一下編譯時會出現的問題,在C++的algorithm頭文件中有一個min函數,這是一個函數模板,而在Windows.h頭文件中也有一個min,這是一個宏。由于調用函數模板時需要進行參數類型的推演,因此當我們調用min函數時,編譯器會優先匹配Windows.h當中以宏的形式實現的min,此時當我們以std::min的形式調用min函數時就會產生報錯,這就是沒有用命名空間進行封裝的壞處,這時我們只能選擇將std::去掉,讓編譯器調用Windows.h當中的min。

申請內存過程聯調測試一

由于在多線程場景下調試觀察起來非常麻煩,這里就先不考慮多線程場景,看看在單線程場景下代碼的執行邏輯是否符合我們的預期,其次,我們這里就只簡單觀察在一個桶當中的內存申請就行了。

下面該線程進行了三次內存申請,這三次內存申請的字節數最終都對齊到了8,此時當線程申請內存時就只會訪問到thread cache的第0號桶。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

當線程第一次申請內存時,該線程需要通過TLS獲取到自己專屬的thread cache對象,然后通過這個thread cache對象進行內存申請。

C++高并發內存池如何實現

在申請內存時通過計算索引到了thread cache的第0號桶,但此時thread cache的第0號桶中是沒有對象的,因此thread cache需要向central cache申請內存塊。

C++高并發內存池如何實現

在向central cache申請內存塊前,首先通過NumMoveSize函數計算得出,thread cache一次最多可向central cache申請8字節大小對象的個數是512,但由于我們采用的是慢開始算法,因此還需要將上限值與對應自由鏈表的_maxSize的值進行比較,而此時對應自由鏈表_maxSize的值是1,所以最終得出本次thread cache向central cache申請8字節對象的個數是1個。

并且在此之后會將該自由鏈表中_maxSize的值進行自增,下一次thread cache再向central cache申請8字節對象時最終申請對象的個數就會是2個了。

C++高并發內存池如何實現

在thread cache向central cache申請對象之前,需要先將central cache的0號桶的鎖加上,然后再從該桶獲取一個非空的span。

C++高并發內存池如何實現

在central cache的第0號桶獲取非空span時,先遍歷對應的span雙鏈表,看看有沒有非空的span,但此時肯定是沒有的,因此在這個過程中我們無法找到一個非空的span。

C++高并發內存池如何實現

那么此時central cache就需要向page cache申請內存了,但在此之前需要先把central cache第0號桶的鎖解掉,然后再將page cache的大鎖給加上,之后才能向page cache申請內存。

C++高并發內存池如何實現

在向page cache申請內存時,由于central cache一次給thread cache8字節對象的上限是512,對應就需要4096字節,所需字節數不足一頁就按一頁算,所以這里central cache就需要向page cache申請一頁的內存塊。

C++高并發內存池如何實現

但此時page cache的第1個桶以及之后的桶當中都是沒有span的,因此page cache需要直接向堆申請一個128頁的span。

C++高并發內存池如何實現

這里通過監視窗口可以看到,用于管理申請到的128頁內存的span信息。

C++高并發內存池如何實現

我們可以順便驗證一下,按頁向堆申請的內存塊的起始地址和頁號之間是可以相互轉換的。

C++高并發內存池如何實現

現在將申請到的128頁的span插入到page cache的第128號桶當中,然后再調用一次NewSpan,在這次調用的時候,雖然在1號桶當中沒有span,但是在往后找的過程中就一定會在第128號桶找到一個span。

C++高并發內存池如何實現

此時我們就可以把這個128頁的span拿出來,切分成1頁的span和127頁的span,將1頁的span返回給central cache,而把127頁的span掛到page cache的第127號桶即可。

C++高并發內存池如何實現

從page cache返回后,就可以把page cache的大鎖解掉了,但緊接著還要將獲取到的1頁的span進行切分,因此這里沒有立刻重新加上central cache對應的桶鎖。

C++高并發內存池如何實現

在進行切分的時候,先通過該span的起始頁號得到該span的起始地址,然后通過該span的頁數得到該span所管理內存塊的總的字節數。

C++高并發內存池如何實現

在確定內存塊的開始和結束后,就可以將其切分成一個個8字節大小的對象掛到該span的自由鏈表中了。在調試過程中通過內存監視窗口可以看到,切分出來的每個8字節大小的對象的前四個字節存儲的都是下一個8字節對象的起始地址。

C++高并發內存池如何實現

當切分結束后再獲取central cache第0號桶的桶鎖,然后將這個切好的span插入到central cache的第0號桶中,最后再將這個非空的span返回,此時就獲取到了一個非空的span。

C++高并發內存池如何實現

由于thread cache只向central cache申請了一個對象,因此拿到這個非空的span后,直接從這個span里面取出一個對象即可,此時該span的_useCount也由0變成了1。

C++高并發內存池如何實現

由于此時thread cache實際只向central cache申請到了一個對象,因此直接將這個對象返回給線程即可。

C++高并發內存池如何實現

當線程第二次申請內存塊時就不會再創建thread cache了,因為第一次申請時就已經創建好了,此時該線程直接獲取到對應的thread cache進行內存塊申請即可。

C++高并發內存池如何實現

當該線程第二次申請8字節大小的對象時,此時thread cache的0號桶中還是沒有對象的,因為第一次thread cache只向central cache申請了一個8字節對象,因此這次申請時還需要再向central cache申請對象。

C++高并發內存池如何實現

這時thread cache向central cache申請對象時,thread cache第0號桶中自由鏈表的_maxSize已經慢增長到2了,所以這次在向central cache申請對象時就會申請2個。如果下一次thread cache再向central cache申請8字節大小的對象,那么central cache會一次性給thread cache3個,這就是所謂的慢增長。

C++高并發內存池如何實現

但由于第一次central cache向page cache申請了一頁的內存塊,并將其切成了1024個8字節大小的對象,因此這次thread cache向central cache申請2兩個8字節的對象時,central cache的第0號桶當中是有對象的,直接返回兩個給thread cache即可,而不用再向page cache申請內存了。

但線程實際申請的只是一個8字節對象,因此thread cache除了將一個對象返回之外,還需要將剩下的一個對象掛到thread cache的第0號桶當中。

C++高并發內存池如何實現

這樣一來,當線程第三次申請1字節的內存時,由于1字節對齊后也是8字節,此時thread cache也就不需要再向central cache申請內存塊了,直接將第0號桶當中之前剩下的一個8字節對象返回即可。

C++高并發內存池如何實現

申請內存過程聯調測試二

為了進一步測試代碼的正確性,我們可以做這樣一個測試:讓線程申請1024次8字節的對象,然后通過調試觀察在第1025次申請時,central cache是否會再向page cache申請內存塊。

for (size_t i = 0; i < 1024; i++)
{
	void* p1 = ConcurrentAlloc(6);
}
void* p2 = ConcurrentAlloc(6);

因為central cache第一次就是向page cache申請的一頁內存,這一頁內存被切成了1024個8字節大小的對象,當這1024個對象全部被申請之后,再申請8字節大小的對象時central cache當中就沒有對象了,此時就應該向page cache申請內存塊。

通過調試我們可以看到,第1025次申請8字節大小的對象時,central cache第0號桶中的這個span的_useCount已經增加到了1024,也就是說這1024個對象都已經被線程申請了,此時central cache就需要再向page cache申請一頁的span來進行切分了。

C++高并發內存池如何實現

而這次central cache在向page cache申請一頁的內存時,page cache就是將127頁span切分成了1頁的span和126頁的span了,然后central cache拿到這1頁的span后,又會將其切分成1024塊8字節大小的內存塊以供thread cache申請。

C++高并發內存池如何實現

threadcache回收內存

當某個線程申請的對象不用了,可以將其釋放給thread cache,然后thread cache將該對象插入到對應哈希桶的自由鏈表當中即可。

但是隨著線程不斷的釋放,對應自由鏈表的長度也會越來越長,這些內存堆積在一個thread cache中就是一種浪費,我們應該將這些內存還給central cache,這樣一來,這些內存對其他線程來說也是可申請的,因此當thread cache某個桶當中的自由鏈表太長時我們可以進行一些處理。

如果thread cache某個桶當中自由鏈表的長度超過它一次批量向central cache申請的對象個數,那么此時我們就要把該自由鏈表當中的這些對象還給central cache。

//釋放內存對象
void ThreadCache::Deallocate(void* ptr, size_t size)
{
	assert(ptr);
	assert(size <= MAX_BYTES);

	//找出對應的自由鏈表桶將對象插入
	size_t index = SizeClass::Index(size);
	_freeLists[index].Push(ptr);

	//當自由鏈表長度大于一次批量申請的對象個數時就開始還一段list給central cache
	if (_freeLists[index].Size() >= _freeLists[index].MaxSize())
	{
		ListTooLong(_freeLists[index], size);
	}
}

當自由鏈表的長度大于一次批量申請的對象時,我們具體的做法就是,從該自由鏈表中取出一次批量個數的對象,然后將取出的這些對象還給central cache中對應的span即可。

//釋放對象導致鏈表過長,回收內存到中心緩存
void ThreadCache::ListTooLong(FreeList& list, size_t size)
{
	void* start = nullptr;
	void* end = nullptr;
	//從list中取出一次批量個數的對象
	list.PopRange(start, end, list.MaxSize());
	
	//將取出的對象還給central cache中對應的span
	CentralCache::GetInstance()->ReleaseListToSpans(start, size);
}

從上述代碼可以看出,FreeList類需要支持用Size函數獲取自由鏈表中對象的個數,還需要支持用PopRange函數從自由鏈表中取出指定個數的對象。因此我們需要給FreeList類增加一個對應的PopRange函數,然后再增加一個_size成員變量,該成員變量用于記錄當前自由鏈表中對象的個數,當我們向自由鏈表插入或刪除對象時,都應該更新_size的值。

//管理切分好的小對象的自由鏈表
class FreeList
{
public:
	//將釋放的對象頭插到自由鏈表
	void Push(void* obj)
	{
		assert(obj);

		//頭插
		NextObj(obj) = _freeList;
		_freeList = obj;
		_size++;
	}
	//從自由鏈表頭部獲取一個對象
	void* Pop()
	{
		assert(_freeList);

		//頭刪
		void* obj = _freeList;
		_freeList = NextObj(_freeList);
		_size--;

		return obj;
	}
	//插入一段范圍的對象到自由鏈表
	void PushRange(void* start, void* end, size_t n)
	{
		assert(start);
		assert(end);

		//頭插
		NextObj(end) = _freeList;
		_freeList = start;
		_size += n;
	}
	//從自由鏈表獲取一段范圍的對象
	void PopRange(void*& start, void*& end, size_t n)
	{
		assert(n <= _size);

		//頭刪
		start = _freeList;
		end = start;
		for (size_t i = 0; i < n - 1;i++)
		{
			end = NextObj(end);
		}
		_freeList = NextObj(end); //自由鏈表指向end的下一個對象
		NextObj(end) = nullptr; //取出的一段鏈表的表尾置空
		_size -= n;
	}
	bool Empty()
	{
		return _freeList == nullptr;
	}
	size_t& MaxSize()
	{
		return _maxSize;
	}
	size_t Size()
	{
		return _size;
	}
private:
	void* _freeList = nullptr; //自由鏈表
	size_t _maxSize = 1;
	size_t _size = 0;
};

而對于FreeList類當中的PushRange成員函數,我們最好也像PopRange一樣給它增加一個參數,表示插入對象的個數,不然我們這時還需要通過遍歷統計插入對象的個數。

因此之前在調用PushRange的地方就需要修改一下,而我們實際就在一個地方調用過PushRange函數,并且此時插入對象的個數也是很容易知道的。當時thread cache從central cache獲取了actualNum個對象,將其中的一個返回給了申請對象的線程,剩下的actualNum-1個掛到了thread cache對應的桶當中,所以這里插入對象的個數就是actualNum-1。

_freeLists[index].PushRange(NextObj(start), end, actualNum - 1);

說明一下:
當thread cache的某個自由鏈表過長時,我們實際就是把這個自由鏈表當中全部的對象都還給central cache了,但這里在設計PopRange接口時還是設計的是取出指定個數的對象,因為在某些情況下當自由鏈表過長時,我們可能并不一定想把鏈表中全部的對象都取出來還給central cache,這樣設計就是為了增加代碼的可修改性。

其次,當我們判斷thread cache是否應該還對象給central cache時,還可以綜合考慮每個thread cache整體的大小。比如當某個thread cache的總占用大小超過一定閾值時,我們就將該thread cache當中的對象還一些給central cache,這樣就盡量避免了某個線程的thread cache占用太多的內存。對于這一點,在tcmalloc當中就是考慮到了的。

centralcache回收內存

當thread cache中某個自由鏈表太長時,會將自由鏈表當中的這些對象還給central cache中的span。

但是需要注意的是,還給central cache的這些對象不一定都是屬于同一個span的。central cache中的每個哈希桶當中可能都不止一個span,因此當我們計算出還回來的對象應該還給central cache的哪一個桶后,還需要知道這些對象到底應該還給這個桶當中的哪一個span。

如何根據對象的地址得到對象所在的頁號?

首先我們必須理解的是,某個頁當中的所有地址除以頁的大小都等該頁的頁號。比如我們這里假設一頁的大小是100,那么地址0~99都屬于第0頁,它們除以100都等于0,而地址100~199都屬于第1頁,它們除以100都等于1。

如何找到一個對象對應的span?

雖然我們現在可以通過對象的地址得到其所在的頁號,但是我們還是不能知道這個對象到底屬于哪一個span。因為一個span管理的可能是多個頁。

為了解決這個問題,我們可以建立頁號和span之間的映射。由于這個映射關系在page cache進行span的合并時也需要用到,因此我們直接將其存放到page cache里面。這時我們就需要在PageCache類當中添加一個映射關系了,這里可以用C++當中的unordered_map進行實現,并且添加一個函數接口,用于讓central cache獲取這里的映射關系。(下面代碼中只展示了PageCache類當中新增的成員)

//單例模式class PageCache{<!--{C}%3C!%2D%2D%20%2D%2D%3E-->public://獲取從對象到span的映射Span* MapObjectToSpan(void* obj);private:std::unordered_map<PAGE_ID, Span*> _idSpanMap;};

每當page cache分配span給central cache時,都需要記錄一下頁號和span之間的映射關系。此后當thread cache還對象給central cache時,才知道應該具體還給哪一個span。

因此當central cache在調用NewSpan接口向page cache申請k頁的span時,page cache在返回這個k頁的span給central cache之前,應該建立這k個頁號與該span之間的映射關系。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);

			//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復,遞歸調用自己
	return NewSpan(k);
}

此時我們就可以通過對象的地址找到該對象對應的span了,直接將該對象的地址除以頁的大小得到頁號,然后在unordered_map當中找到其對應的span即可。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

注意一下,當我們要通過某個頁號查找其對應的span時,該頁號與其span之間的映射一定是建立過的,如果此時我們沒有在unordered_map當中找到,則說明我們之前的代碼邏輯有問題,因此當沒有找到對應的span時可以直接用斷言結束程序,以表明程序邏輯出錯。

central cache回收內存

這時當thread cache還對象給central cache時,就可以依次遍歷這些對象,將這些對象插入到其對應span的自由鏈表當中,并且及時更新該span的_usseCount計數即可。

在thread cache還對象給central cache的過程中,如果central cache中某個span的_useCount減到0時,說明這個span分配出去的對象全部都還回來了,那么此時就可以將這個span再進一步還給page cache。

//將一定數量的對象還給對應的span
void CentralCache::ReleaseListToSpans(void* start, size_t size)
{
	size_t index = SizeClass::Index(size);
	_spanLists[index]._mtx.lock(); //加鎖
	while (start)
	{
		void* next = NextObj(start); //記錄下一個
		Span* span = PageCache::GetInstance()->MapObjectToSpan(start);
		//將對象頭插到span的自由鏈表
		NextObj(start) = span->_freeList;
		span->_freeList = start;

		span->_useCount--; //更新被分配給thread cache的計數
		if (span->_useCount == 0) //說明這個span分配出去的對象全部都回來了
		{
			//此時這個span就可以再回收給page cache,page cache可以再嘗試去做前后頁的合并
			_spanLists[index].Erase(span);
			span->_freeList = nullptr; //自由鏈表置空
			span->_next = nullptr;
			span->_prev = nullptr;

			//釋放span給page cache時,使用page cache的鎖就可以了,這時把桶鎖解掉
			_spanLists[index]._mtx.unlock(); //解桶鎖
			PageCache::GetInstance()->_pageMtx.lock(); //加大鎖
			PageCache::GetInstance()->ReleaseSpanToPageCache(span);
			PageCache::GetInstance()->_pageMtx.unlock(); //解大鎖
			_spanLists[index]._mtx.lock(); //加桶鎖
		}

		start = next;
	}

	_spanLists[index]._mtx.unlock(); //解鎖
}

需要注意,如果要把某個span還給page cache,我們需要先將這個span從central cache對應的雙鏈表中移除,然后再將該span的自由鏈表置空,因為page cache中的span是不需要切分成一個個的小對象的,以及該span的前后指針也都應該置空,因為之后要將其插入到page cache對應的雙鏈表中。但span當中記錄的起始頁號以及它管理的頁數是不能清除的,否則對應內存塊就找不到了。

并且在central cache還span給page cache時也存在鎖的問題,此時需要先將central cache中對應的桶鎖解掉,然后再加上page cache的大鎖之后才能進入page cache進行相關操作,當處理完畢回到central cache時,除了將page cache的大鎖解掉,還需要立刻獲得central cache對應的桶鎖,然后將還未還完對象繼續還給central cache中對應的span。

pagecache回收內存

如果central cache中有某個span的_useCount減到0了,那么central cache就需要將這個span還給page cache了。

這個過程看似是非常簡單的,page cache只需將還回來的span掛到對應的哈希桶上就行了。但實際為了緩解內存碎片的問題,page cache還需要嘗試將還回來的span與其他空閑的span進行合并。

page cache進行前后頁的合并

合并的過程可以分為向前合并和向后合并。如果還回來的span的起始頁號是num,該span所管理的頁數是n。那么在向前合并時,就需要判斷第num-1頁對應span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向前嘗試進行合并,直到不能進行合并為止。而在向后合并時,就需要判斷第num+n頁對應的span是否空閑,如果空閑則可以將其進行合并,并且合并后還需要繼續向后嘗試進行合并,直到不能進行合并為止。

因此page cache在合并span時,是需要通過頁號獲取到對應的span的,這就是我們要把頁號與span之間的映射關系存儲到page cache的原因。

但需要注意的是,當我們通過頁號找到其對應的span時,這個span此時可能掛在page cache,也可能掛在central cache。而在合并時我們只能合并掛在page cache的span,因為掛在central cache的span當中的對象正在被其他線程使用。

可是我們不能通過span結構當中的_useCount成員,來判斷某個span到底是在central cache還是在page cache。因為當central cache剛向page cache申請到一個span時,這個span的_useCount就是等于0的,這時可能當我們正在對該span進行切分的時候,page cache就把這個span拿去進行合并了,這顯然是不合理的。

鑒于此,我們可以在span結構中再增加一個_isUse成員,用于標記這個span是否正在被使用,而當一個span結構被創建時我們默認該span是沒有被使用的。

//管理以頁為單位的大塊內存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內存起始頁的頁號
	size_t _n = 0;              //頁的數量

	Span* _next = nullptr;      //雙鏈表結構
	Span* _prev = nullptr;

	size_t _useCount = 0;       //切好的小塊內存,被分配給thread cache的計數
	void* _freeList = nullptr;  //切好的小塊內存的自由鏈表

	bool _isUse = false;        //是否在被使用
};

因此當central cache向page cache申請到一個span時,需要立即將該span的_isUse改為true。

span->_isUse = true;

而當central cache將某個span還給page cache時,也就需要將該span的_isUse改成false。

span->_isUse = false;

由于在合并page cache當中的span時,需要通過頁號找到其對應的span,而一個span是在被分配給central cache時,才建立的各個頁號與span之間的映射關系,因此page cache當中的span也需要建立頁號與span之間的映射關系。

與central cache中的span不同的是,在page cache中,只需建立一個span的首尾頁號與該span之間的映射關系。因為當一個span在嘗試進行合并時,如果是往前合并,那么只需要通過一個span的尾頁找到這個span,如果是向后合并,那么只需要通過一個span的首頁找到這個span。也就是說,在進行合并時我們只需要用到span與其首尾頁之間的映射關系就夠了。

因此當我們申請k頁的span時,如果是將n頁的span切成了一個k頁的span和一個n-k頁的span,我們除了需要建立k頁span中每個頁與該span之間的映射關系之外,還需要建立剩下的n-k頁的span與其首尾頁之間的映射關系。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0 && k < NPAGES);
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復,遞歸調用自己
	return NewSpan(k);
}

此時page cache當中的span就都與其首尾頁之間建立了映射關系,現在我們就可以進行span的合并了,其合并邏輯如下:

//釋放空閑的span回到PageCache,并合并相鄰的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	//對span的前后頁,嘗試進行合并,緩解內存碎片問題
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的頁號沒有(還未向系統申請),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的頁號對應的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//將prevSpan從對應的雙鏈表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的頁號沒有(還未向系統申請),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的頁號對應的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向后合并
		span->_n += nextSpan->_n;

		//將nextSpan從對應的雙鏈表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//將合并后的span掛到對應的雙鏈表當中
	_spanLists[span->_n].PushFront(span);
	//建立該span與其首尾頁的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//將該span設置為未被使用的狀態
	span->_isUse = false;
}

需要注意的是,在向前或向后進行合并的過程中:

  • 如果沒有通過頁號獲取到其對應的span,說明對應到該頁的內存塊還未申請,此時需要停止合并。

  • 如果通過頁號獲取到了其對應的span,但該span處于被使用的狀態,那我們也必須停止合并。

  • 如果合并后大于128頁則不能進行本次合并,因為page cache無法對大于128頁的span進行管理。

在合并span時,由于這個span是在page cache的某個哈希桶的雙鏈表當中的,因此在合并后需要將其從對應的雙鏈表中移除,然后再將這個被合并了的span結構進行delete。

除此之外,在合并結束后,除了將合并后的span掛到page cache對應哈希桶的雙鏈表當中,還需要建立該span與其首位頁之間的映射關系,便于此后合并出更大的span。

釋放內存過程聯調

ConcurrentFree函數

至此我們將thread cache、central cache以及page cache的釋放流程也都寫完了,此時我們就可以向外提供一個ConcurrentFree函數,用于釋放內存塊,釋放內存塊時每個線程通過自己的thread cache對象,調用thread cache中釋放內存對象的接口即可。

static void ConcurrentFree(void* ptr, size_t size/*暫時*/)
{
	assert(pTLSThreadCache);

	pTLSThreadCache->Deallocate(ptr, size);
}

釋放內存過程聯調測試

之前我們在測試申請流程時,讓單個線程進行了三次內存申請,現在我們再將這三個對象再進行釋放,看看這其中的釋放流程是如何進行的。

void* p1 = ConcurrentAlloc(6);
void* p2 = ConcurrentAlloc(8);
void* p3 = ConcurrentAlloc(1);

ConcurrentFree(p1, 6);
ConcurrentFree(p2, 8);
ConcurrentFree(p3, 1);

首先,這三次申請和釋放的對象大小進行對齊后都是8字節,因此對應操作的就是thread cache和central cache的第0號桶,以及page cache的第1號桶。

由于第三次對象申請時,剛好將thread cache第0號桶當中僅剩的一個對象拿走了,因此在三次對象申請后thread cache的第0號桶當中是沒有對象的。

通過監視窗口可以看到,此時thread cache第0號桶中自由鏈表的_maxSize已經慢增長到了3,而當我們釋放完第一個對象后,該自由鏈表當中對象的個數只有一個,因此不會將該自由鏈表當中的對象進一步還給central cache。

C++高并發內存池如何實現

當第二個對象釋放給thread cache的第0號桶后,該桶對應自由鏈表當中對象的個數變成了2,也是不會進行ListTooLong操作的。

C++高并發內存池如何實現

直到第三個對象釋放給thread cache的第0號桶時,此時該自由鏈表的_size的值變為3,與_maxSize的值相等,現在thread cache就需要將對象給central cache了。

C++高并發內存池如何實現

thread cache先是將第0號桶當中的對象彈出MaxSize個,在這里實際上就是全部彈出,此時該自由鏈表_size的值變為0,然后繼續調用central cache當中的ReleaseListToSpans函數,將這三個對象還給central cache當中對應的span。

C++高并發內存池如何實現

在進入central cache的第0號桶還對象之前,先把第0號桶對應的桶鎖加上,然后通過查page cache中的映射表找到其對應的span,最后將這個對象頭插到該span的自由鏈表中,并將該span的_useCount進行--。當第一個對象還給其對應的span時,可以看到該span的_useCount減到了2。

C++高并發內存池如何實現

而由于我們只進行了三次對象申請,并且這些對象大小對齊后大小都是8字節,因此我們申請的這三個對象實際都是同一個span切分出來的。當我們將這三個對象都還給這個span時,該span的_useCount就減為了0。

C++高并發內存池如何實現

現在central cache就需要將這個span進一步還給page cache,而在將該span交給page cache之前,會將該span的自由鏈表以及前后指針都置空。并且在進入page cache之前會先將central cache第0號桶的桶鎖解掉,然后再加上page cache的大鎖,之后才能進入page cache進行相關操作。

C++高并發內存池如何實現

由于這個一頁的span是從128頁的span的頭部切下來的,在向前合并時由于前面的頁還未向系統申請,因此在查映射關系時是無法找到的,此時直接停止了向前合并。

(說明一下:由于下面是重新另外進行的一次調試,因此監視窗口顯示的span的起始頁號與之前的不同,實際應該與上面一致)

C++高并發內存池如何實現

而在向后合并時,由于page cache沒有將該頁后面的頁分配給central cache,因此在向后合并時肯定能夠找到一個127頁的span進行合并。合并后就變成了一個128頁的span,這時我們將原來127頁的span從第127號桶刪除,然后還需要將該127頁的span結構進行delete,因為它管理的127頁已經與1頁的span進行合并了,不再需要它來管理了。

C++高并發內存池如何實現

緊接著將這個128頁的span插入到第128號桶,然后建立該span與其首尾頁的映射,便于下次被用于合并,最后再將該span的狀態設置為未被使用的狀態即可。

C++高并發內存池如何實現

當從page cache回來后,除了將page cache的大鎖解掉,還需要立刻加上central cache中對應的桶鎖,然后繼續將對象還給central cache中的span,但此時實際上是還完了,因此再將central cache的桶鎖解掉就行了。

C++高并發內存池如何實現

至此我們便完成了這三個對象的申請和釋放流程。

大于256KB的大塊內存申請問題

申請過程

之前說到,每個線程的thread cache是用于申請小于等于256KB的內存的,而對于大于256KB的內存,我們可以考慮直接向page cache申請,但page cache中最大的頁也就只有128頁,因此如果是大于128頁的內存申請,就只能直接向堆申請了。

申請內存的大小申請方式
x &le; 256 K B ( 32 頁 ) x \leq 256KB(32頁) x&le;256KB(32頁)向thread cache申請
32 頁 < x &le; 128 頁 32 頁< x \leq 128頁 32頁<x&le;128頁向page cache申請
x &ge; 128 頁 x \geq 128頁 x&ge;128頁向堆申請
&emsp;&emsp;當申請的內存大于256KB時,雖然不是從thread cache進行獲取,但在分配內存時也是需要進行向上對齊的,對于大于256KB的內存我們可以直接按頁進行對齊。 

而我們之前實現RoundUp函數時,對傳入字節數大于256KB的情況直接做了斷言處理,因此這里需要對RoundUp函數稍作修改。

//獲取向上對齊后的字節數
static inline size_t RoundUp(size_t bytes)
{
	if (bytes <= 128)
	{
		return _RoundUp(bytes, 8);
	}
	else if (bytes <= 1024)
	{
		return _RoundUp(bytes, 16);
	}
	else if (bytes <= 8 * 1024)
	{
		return _RoundUp(bytes, 128);
	}
	else if (bytes <= 64 * 1024)
	{
		return _RoundUp(bytes, 1024);
	}
	else if (bytes <= 256 * 1024)
	{
		return _RoundUp(bytes, 8 * 1024);
	}
	else
	{
		//大于256KB的按頁對齊
		return _RoundUp(bytes, 1 << PAGE_SHIFT);
	}
}

現在對于之前的申請邏輯就需要進行修改了,當申請對象的大小大于256KB時,就不用向thread cache申請了,這時先計算出按頁對齊后實際需要申請的頁數,然后通過調用NewSpan申請指定頁數的span即可。

static void* ConcurrentAlloc(size_t size)
{
	if (size > MAX_BYTES) //大于256KB的內存申請
	{
		//計算出對齊后需要申請的頁數
		size_t alignSize = SizeClass::RoundUp(size);
		size_t kPage = alignSize >> PAGE_SHIFT;

		//向page cache申請kPage頁的span
		PageCache::GetInstance()->_pageMtx.lock();
		Span* span = PageCache::GetInstance()->NewSpan(kPage);
		PageCache::GetInstance()->_pageMtx.unlock();

		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		return ptr;
	}
	else
	{
		//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
		if (pTLSThreadCache == nullptr)
		{
			pTLSThreadCache = new ThreadCache;
		}
		cout << std::this_thread::get_id() << ":" << pTLSThreadCache << endl;

		return pTLSThreadCache->Allocate(size);
	}
}

也就是說,申請大于256KB的內存時,會直接調用page cache當中的NewSpan函數進行申請,因此這里我們需要再對NewSpan函數進行改造,當需要申請的內存頁數大于128頁時,就直接向堆申請對應頁數的內存塊。而如果申請的內存頁數是小于128頁的,那就在page cache中進行申請,因此當申請大于256KB的內存調用NewSpan函數時也是需要加鎖的,因為我們可能是在page cache中進行申請的。

//獲取一個k頁的span
Span* PageCache::NewSpan(size_t k)
{
	assert(k > 0);
	if (k > NPAGES - 1) //大于128頁直接找堆申請
	{
		void* ptr = SystemAlloc(k);
		Span* span = new Span;
		span->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
		span->_n = k;
		//建立頁號與span之間的映射
		_idSpanMap[span->_pageId] = span;
		return span;
	}
	//先檢查第k個桶里面有沒有span
	if (!_spanLists[k].Empty())
	{
		Span* kSpan = _spanLists[k].PopFront();

		//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
		for (PAGE_ID i = 0; i < kSpan->_n; i++)
		{
			_idSpanMap[kSpan->_pageId + i] = kSpan;
		}

		return kSpan;
	}
	//檢查一下后面的桶里面有沒有span,如果有可以將其進行切分
	for (size_t i = k + 1; i < NPAGES; i++)
	{
		if (!_spanLists[i].Empty())
		{
			Span* nSpan = _spanLists[i].PopFront();
			Span* kSpan = new Span;
			//在nSpan的頭部切k頁下來
			kSpan->_pageId = nSpan->_pageId;
			kSpan->_n = k;

			nSpan->_pageId += k;
			nSpan->_n -= k;
			//將剩下的掛到對應映射的位置
			_spanLists[nSpan->_n].PushFront(nSpan);
			//存儲nSpan的首尾頁號與nSpan之間的映射,方便page cache合并span時進行前后頁的查找
			_idSpanMap[nSpan->_pageId] = nSpan;
			_idSpanMap[nSpan->_pageId + nSpan->_n - 1] = nSpan;

			//建立頁號與span的映射,方便central cache回收小塊內存時查找對應的span
			for (PAGE_ID i = 0; i < kSpan->_n; i++)
			{
				_idSpanMap[kSpan->_pageId + i] = kSpan;
			}

			return kSpan;
		}
	}
	//走到這里說明后面沒有大頁的span了,這時就向堆申請一個128頁的span
	Span* bigSpan = new Span;
	void* ptr = SystemAlloc(NPAGES - 1);
	bigSpan->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT;
	bigSpan->_n = NPAGES - 1;

	_spanLists[bigSpan->_n].PushFront(bigSpan);

	//盡量避免代碼重復,遞歸調用自己
	return NewSpan(k);
}

釋放過程

當釋放對象時,我們需要判斷釋放對象的大小:

釋放內存的大小釋放方式
x &le; 256 K B ( 32 頁 ) x \leq 256KB(32頁) x&le;256KB(32頁)釋放給thread cache
32 頁 < x &le; 128 頁 32 頁< x \leq 128頁 32頁<x&le;128頁釋放給page cache
x &ge; 128 頁 x \geq 128頁 x&ge;128頁釋放給堆

因此當釋放對象時,我們需要先找到該對象對應的span,但是在釋放對象時我們只知道該對象的起始地址。這也就是我們在申請大于256KB的內存時,也要給申請到的內存建立span結構,并建立起始頁號與該span之間的映射關系的原因。此時我們就可以通過釋放對象的起始地址計算出起始頁號,進而通過頁號找到該對象對應的span。

static void ConcurrentFree(void* ptr, size_t size)
{
	if (size > MAX_BYTES) //大于256KB的內存釋放
	{
		Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);

		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

因此page cache在回收span時也需要進行判斷,如果該span的大小是小于等于128頁的,那么直接還給page cache進行了,page cache會嘗試對其進行合并。而如果該span的大小是大于128頁的,那么說明該span是直接向堆申請的,我們直接將這塊內存釋放給堆,然后將這個span結構進行delete就行了。

//釋放空閑的span回到PageCache,并合并相鄰的span
void PageCache::ReleaseSpanToPageCache(Span* span)
{
	if (span->_n > NPAGES - 1) //大于128頁直接釋放給堆
	{
		void* ptr = (void*)(span->_pageId << PAGE_SHIFT);
		SystemFree(ptr);
		delete span;
		return;
	}
	//對span的前后頁,嘗試進行合并,緩解內存碎片問題
	//1、向前合并
	while (1)
	{
		PAGE_ID prevId = span->_pageId - 1;
		auto ret = _idSpanMap.find(prevId);
		//前面的頁號沒有(還未向系統申請),停止向前合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//前面的頁號對應的span正在被使用,停止向前合并
		Span* prevSpan = ret->second;
		if (prevSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向前合并
		if (prevSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向前合并
		span->_pageId = prevSpan->_pageId;
		span->_n += prevSpan->_n;

		//將prevSpan從對應的雙鏈表中移除
		_spanLists[prevSpan->_n].Erase(prevSpan);

		delete prevSpan;
	}
	//2、向后合并
	while (1)
	{
		PAGE_ID nextId = span->_pageId + span->_n;
		auto ret = _idSpanMap.find(nextId);
		//后面的頁號沒有(還未向系統申請),停止向后合并
		if (ret == _idSpanMap.end())
		{
			break;
		}
		//后面的頁號對應的span正在被使用,停止向后合并
		Span* nextSpan = ret->second;
		if (nextSpan->_isUse == true)
		{
			break;
		}
		//合并出超過128頁的span無法進行管理,停止向后合并
		if (nextSpan->_n + span->_n > NPAGES - 1)
		{
			break;
		}
		//進行向后合并
		span->_n += nextSpan->_n;

		//將nextSpan從對應的雙鏈表中移除
		_spanLists[nextSpan->_n].Erase(nextSpan);

		delete nextSpan;
	}
	//將合并后的span掛到對應的雙鏈表當中
	_spanLists[span->_n].PushFront(span);
	//建立該span與其首尾頁的映射
	_idSpanMap[span->_pageId] = span;
	_idSpanMap[span->_pageId + span->_n - 1] = span;
	//將該span設置為未被使用的狀態
	span->_isUse = false;
}

說明一下,直接向堆申請內存時我們調用的接口是VirtualAlloc,與之對應的將內存釋放給堆的接口叫做VirtualFree,而Linux下的brk和mmap對應的釋放接口叫做sbrk和unmmap。此時我們也可以將這些釋放接口封裝成一個叫做SystemFree的接口,當我們需要將內存釋放給堆時直接調用SystemFree即可。

//直接將內存還給堆
inline static void SystemFree(void* ptr)
{
#ifdef _WIN32
	VirtualFree(ptr, 0, MEM_RELEASE);
#else
	//linux下sbrk unmmap等
#endif
}

簡單測試

下面我們對大于256KB的申請釋放流程進行簡單的測試:

//找page cache申請
void* p1 = ConcurrentAlloc(257 * 1024); //257KB
ConcurrentFree(p1, 257 * 1024);

//找堆申請
void* p2 = ConcurrentAlloc(129 * 8 * 1024); //129頁
ConcurrentFree(p2, 129 * 8 * 1024);

當申請257KB的內存時,由于257KB的內存按頁向上對齊后是33頁,并沒有大于128頁,因此不會直接向堆進行申請,會向page cache申請內存,但此時page cache當中實際是沒有內存的,最終page cache就會向堆申請一個128頁的span,將其切分成33頁的span和95頁的span,并將33頁的span進行返回。

C++高并發內存池如何實現

而在釋放內存時,由于該對象的大小大于了256KB,因此不會將其還給thread cache,而是直接調用的page cache當中的釋放接口。

C++高并發內存池如何實現

由于該對象的大小是33頁,不大于128頁,因此page cache也不會直接將該對象還給堆,而是嘗試對其進行合并,最終就會把這個33頁的span和之前剩下的95頁的span進行合并,最終將合并后的128頁的span掛到第128號桶中。

C++高并發內存池如何實現

當申請129頁的內存時,由于是大于256KB的,于是還是調用的page cache對應的申請接口,但此時申請的內存同時也大于128頁,因此會直接向堆申請。在申請后還會建立該span與其起始頁號之間的映射,便于釋放時可以通過頁號找到該span。

C++高并發內存池如何實現

在釋放內存時,通過對象的地址找到其對應的span,從span結構中得知釋放內存的大小大于128頁,于是會將該內存直接還給堆。

C++高并發內存池如何實現

使用定長內存池配合脫離使用new

tcmalloc是要在高并發場景下替代malloc進行內存申請的,因此tcmalloc在實現的時,其內部是不能調用malloc函數的,我們當前的代碼中存在通過new獲取到的內存,而new在底層實際上就是封裝了malloc。

為了完全脫離掉malloc函數,此時我們之前實現的定長內存池就起作用了,代碼中使用new時基本都是為Span結構的對象申請空間,而span對象基本都是在page cache層創建的,因此我們可以在PageCache類當中定義一個_spanPool,用于span對象的申請和釋放。

//單例模式
class PageCache
{
public:
	//...
private:
	ObjectPool<Span> _spanPool;
};

然后將代碼中使用new的地方替換為調用定長內存池當中的New函數,將代碼中使用delete的地方替換為調用定長內存池當中的Delete函數。

//申請span對象
Span* span = _spanPool.New();
//釋放span對象
_spanPool.Delete(span);

注意,當使用定長內存池當中的New函數申請Span對象時,New函數通過定位new也是對Span對象進行了初始化的。

此外,每個線程第一次申請內存時都會創建其專屬的thread cache,而這個thread cache目前也是new出來的,我們也需要對其進行替換。

//通過TLS,每個線程無鎖的獲取自己專屬的ThreadCache對象
if (pTLSThreadCache == nullptr)
{
	static std::mutex tcMtx;
	static ObjectPool<ThreadCache> tcPool;
	tcMtx.lock();
	pTLSThreadCache = tcPool.New();
	tcMtx.unlock();
}

這里我們將用于申請ThreadCache類對象的定長內存池定義為靜態的,保持全局只有一個,讓所有線程創建自己的thread cache時,都在個定長內存池中申請內存就行了。

但注意在從該定長內存池中申請內存時需要加鎖,防止多個線程同時申請自己的ThreadCache對象而導致線程安全問題。

最后在SpanList的構造函數中也用到了new,因為SpanList是帶頭循環雙向鏈表,所以在構造期間我們需要申請一個span對象作為雙鏈表的頭結點。

//帶頭雙向循環鏈表
class SpanList
{
public:
	SpanList()
	{
		_head = _spanPool.New();
		_head->_next = _head;
		_head->_prev = _head;
	}
private:
	Span* _head;
	static ObjectPool<Span> _spanPool;
};

由于每個span雙鏈表只需要一個頭結點,因此將這個定長內存池定義為靜態的,保持全局只有一個,讓所有span雙鏈表在申請頭結點時,都在一個定長內存池中申請內存就行了。

釋放對象時優化為不傳對象大小

當我們使用malloc函數申請內存時,需要指明申請內存的大小;而當我們使用free函數釋放內存時,只需要傳入指向這塊內存的指針即可。

而我們目前實現的內存池,在釋放對象時除了需要傳入指向該對象的指針,還需要傳入該對象的大小。

原因如下:

  • 如果釋放的是大于256KB的對象,需要根據對象的大小來判斷這塊內存到底應該還給page cache,還是應該直接還給堆。

  • 如果釋放的是小于等于256KB的對象,需要根據對象的大小計算出應該還給thread cache的哪一個哈希桶。

如果我們也想做到,在釋放對象時不用傳入對象的大小,那么我們就需要建立對象地址與對象大小之間的映射。由于現在可以通過對象的地址找到其對應的span,而span的自由鏈表中掛的都是相同大小的對象。

因此我們可以在Span結構中再增加一個_objSize成員,該成員代表著這個span管理的內存塊被切成的一個個對象的大小。

//管理以頁為單位的大塊內存
struct Span
{
	PAGE_ID _pageId = 0;        //大塊內存起始頁的頁號
	size_t _n = 0;              //頁的數量

	Span* _next = nullptr;      //雙鏈表結構
	Span* _prev = nullptr;

	size_t _objSize = 0;        //切好的小對象的大小
	size_t _useCount = 0;       //切好的小塊內存,被分配給thread cache的計數
	void* _freeList = nullptr;  //切好的小塊內存的自由鏈表

	bool _isUse = false;        //是否在被使用
};

而所有的span都是從page cache中拿出來的,因此每當我們調用NewSpan獲取到一個k頁的span時,就應該將這個span的_objSize保存下來。

Span* span = PageCache::GetInstance()->NewSpan(SizeClass::NumMovePage(size));
span->_objSize = size;

代碼中有兩處,一處是在central cache中獲取非空span時,如果central cache對應的桶中沒有非空的span,此時會調用NewSpan獲取一個k頁的span;另一處是當申請大于256KB內存時,會直接調用NewSpan獲取一個k頁的span。

此時當我們釋放對象時,就可以直接從對象的span中獲取到該對象的大小,準確來說獲取到的是對齊以后的大小。

static void ConcurrentFree(void* ptr)
{
	Span* span = PageCache::GetInstance()->MapObjectToSpan(ptr);
	size_t size = span->_objSize;
	if (size > MAX_BYTES) //大于256KB的內存釋放
	{
		PageCache::GetInstance()->_pageMtx.lock();
		PageCache::GetInstance()->ReleaseSpanToPageCache(span);
		PageCache::GetInstance()->_pageMtx.unlock();
	}
	else
	{
		assert(pTLSThreadCache);
		pTLSThreadCache->Deallocate(ptr, size);
	}
}

讀取映射關系時的加鎖問題

我們將頁號與span之間的映射關系是存儲在PageCache類當中的,當我們訪問這個映射關系時是需要加鎖的,因為STL容器是不保證線程安全的。

對于當前代碼來說,如果我們此時正在page cache進行相關操作,那么訪問這個映射關系是安全的,因為當進入page cache之前是需要加鎖的,因此可以保證此時只有一個線程在進行訪問。

但如果我們是在central cache訪問這個映射關系,或是在調用ConcurrentFree函數釋放內存時訪問這個映射關系,那么就存在線程安全的問題。因為此時可能其他線程正在page cache當中進行某些操作,并且該線程此時可能也在訪問這個映射關系,因此當我們在page cache外部訪問這個映射關系時是需要加鎖的。

實際就是在調用page cache對外提供訪問映射關系的函數時需要加鎖,這里我們可以考慮使用C++當中的unique_lock,當然你也可以用普通的鎖。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號

	std::unique_lock<std::mutex> lock(_pageMtx); //構造時加鎖,析構時自動解鎖
	auto ret = _idSpanMap.find(id);
	if (ret != _idSpanMap.end())
	{
		return ret->second;
	}
	else
	{
		assert(false);
		return nullptr;
	}
}

多線程環境下對比malloc測試

之前我們只是對代碼進行了一些基礎的單元測試,下面我們在多線程場景下對比malloc進行測試。

void BenchmarkMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&, k]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(malloc(16));
					//v.push_back(malloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					free(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u個線程并發執行%u輪次,每輪次malloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u個線程并發執行%u輪次,每輪次free %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u個線程并發malloc&free %u次,總計花費:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

void BenchmarkConcurrentMalloc(size_t ntimes, size_t nworks, size_t rounds)
{
	std::vector<std::thread> vthread(nworks);
	std::atomic<size_t> malloc_costtime = 0;
	std::atomic<size_t> free_costtime = 0;
	for (size_t k = 0; k < nworks; ++k)
	{
		vthread[k] = std::thread([&]() {
			std::vector<void*> v;
			v.reserve(ntimes);
			for (size_t j = 0; j < rounds; ++j)
			{
				size_t begin1 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					v.push_back(ConcurrentAlloc(16));
					//v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));
				}
				size_t end1 = clock();
				size_t begin2 = clock();
				for (size_t i = 0; i < ntimes; i++)
				{
					ConcurrentFree(v[i]);
				}
				size_t end2 = clock();
				v.clear();
				malloc_costtime += (end1 - begin1);
				free_costtime += (end2 - begin2);
			}
		});
	}
	for (auto& t : vthread)
	{
		t.join();
	}
	printf("%u個線程并發執行%u輪次,每輪次concurrent alloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, malloc_costtime);
	printf("%u個線程并發執行%u輪次,每輪次concurrent dealloc %u次: 花費:%u ms\n",
		nworks, rounds, ntimes, free_costtime);
	printf("%u個線程并發concurrent alloc&dealloc %u次,總計花費:%u ms\n",
		nworks, nworks*rounds*ntimes, malloc_costtime + free_costtime);
}

int main()
{
	size_t n = 10000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

其中測試函數各個參數的含義如下:

  • ntimes:單輪次申請和釋放內存的次數。

  • nworks:線程數。

  • rounds:輪次。

在測試函數中,我們通過clock函數分別獲取到每輪次申請和釋放所花費的時間,然后將其對應累加到malloc_costtime和free_costtime上。最后我們就得到了,nworks個線程跑rounds輪,每輪申請和釋放ntimes次,這個過程申請所消耗的時間、釋放所消耗的時間、申請和釋放總共消耗的時間。

注意,我們創建線程時讓線程執行的是lambda表達式,而我們這里在使用lambda表達式時,以值傳遞的方式捕捉了變量k,以引用傳遞的方式捕捉了其他父作用域中的變量,因此我們可以將各個線程消耗的時間累加到一起。

我們將所有線程申請內存消耗的時間都累加到malloc_costtime上, 將釋放內存消耗的時間都累加到free_costtime上,此時malloc_costtime和free_costtime可能被多個線程同時進行累加操作的,所以存在線程安全的問題。鑒于此,我們在定義這兩個變量時使用了atomic類模板,這時對它們的操作就是原子操作了。

固定大小內存的申請和釋放

我們先來測試一下固定大小內存的申請和釋放:

v.push_back(malloc(16));
v.push_back(ConcurrentAlloc(16));

此時4個線程執行10輪操作,每輪申請釋放10000次,總共申請釋放了40萬次,運行后可以看到,malloc的效率還是更高的。

C++高并發內存池如何實現

由于此時我們申請釋放的都是固定大小的對象,每個線程申請釋放時訪問的都是各自thread cache的同一個桶,當thread cache的這個桶中沒有對象或對象太多要歸還時,也都會訪問central cache的同一個桶。此時central cache中的桶鎖就不起作用了,因為我們讓central cache使用桶鎖的目的就是為了,讓多個thread cache可以同時訪問central cache的不同桶,而此時每個thread cache訪問的卻都是central cache中的同一個桶。

不同大小內存的申請和釋放

下面我們再來測試一下不同大小內存的申請和釋放:

v.push_back(malloc((16 + i) % 8192 + 1));
v.push_back(ConcurrentAlloc((16 + i) % 8192 + 1));

運行后可以看到,由于申請和釋放內存的大小是不同的,此時central cache當中的桶鎖就起作用了,ConcurrentAlloc的效率也有了較大增長,但相比malloc來說還是差一點點。

C++高并發內存池如何實現

復雜問題的調試技巧

多線程調試比單線程調試要復雜得多,調試時各個線程之間會相互切換,并且每次調試切換的時機也是不固定的,這就使得調試過程變得非常難以控制。

下面給出三個調試時的小技巧:

1、條件斷點

一般情況下我們可以直接運行程序,通過報錯來查找問題。如果此時報的是斷言錯誤,那么我們可以直接定位到報錯的位置,然后將此處的斷言改為與斷言條件相反的if判斷,在if語句里面打上一個斷點,但注意空語句是無法打斷點的,這時我們隨便在if里面加上一句代碼就可以打斷點了。

C++高并發內存池如何實現

此外,條件斷點也可以通過右擊普通斷點來進行設置。

C++高并發內存池如何實現

右擊后即可設置相應的條件,程序運行到此處時如果滿足該條件則會停下來。

C++高并發內存池如何實現

運行到條件斷點處后,我們就可以對當前程序進行進一步分析,找出斷言錯誤的被觸發原因。

2、查看函數棧幀

當程序運行到斷點處時,我們需要對當前位置進行分析,如果檢查后發現當前函數是沒有問題的,這時可能需要回到調用該函數的地方進行進一步分析,此時我們可以依次點擊“調試&rarr;窗口&rarr;調用堆棧”。

C++高并發內存池如何實現

此時我們就可以看到當前函數棧幀的調用情況,其中黃色箭頭指向的是當前所在的函數棧幀。

C++高并發內存池如何實現

雙擊函數棧幀中的其他函數,就可以跳轉到該函數對應的棧幀,此時淺灰色箭頭指向的就是當前跳轉到的函數棧幀。

C++高并發內存池如何實現

需要注意的是,監視窗口只能查看當前棧幀中的變量。如果要查看此時其他函數棧幀中變量的情況,就可以通過函數棧幀跳轉來查看。

3、疑似死循環時中斷程序

當你在某個地方設置斷點后,如果遲遲沒有運行到斷點處,而程序也沒有崩潰,這時有可能是程序進入到某個死循環了。

C++高并發內存池如何實現

這時我們可以依次點擊“調試&rarr;全部中斷”。

C++高并發內存池如何實現

這時程序就會在當前運行的地方停下來。

C++高并發內存池如何實現

性能瓶頸分析

經過前面的測試可以看到,我們的代碼此時與malloc之間還是有差距的,此時我們就應該分析分析我們當前項目的瓶頸在哪里,但這不能簡單的憑感覺,我們應該用性能分析的工具來進行分析。

VS編譯器下性能分析的操作步驟

VS編譯器中就帶有性能分析的工具的,我們可以依次點擊“調試&rarr;性能和診斷”進行性能分析,注意該操作要在Debug模式下進行。

C++高并發內存池如何實現

同時我們將代碼中n的值由10000調成了1000,否則該分析過程可能會花費較多時間,并且將malloc的測試代碼進行了屏蔽,因為我們要分析的是我們實現的高并發內存池。

int main()
{
	size_t n = 1000;
	cout << "==========================================================" <<
		endl;
	BenchmarkConcurrentMalloc(n, 4, 10);
	cout << endl << endl;
	//BenchmarkMalloc(n, 4, 10);
	cout << "==========================================================" <<
		endl;
	return 0;
}

在點擊了“調試&rarr;性能和診斷”后會彈出一個提示框,我們直接點擊“開始”進行了。

C++高并發內存池如何實現

然后會彈出一個選項框,這里我們選擇的是第二個,因為我們要分析的是各個函數的用時時間,然后點擊下一步。

C++高并發內存池如何實現

出現以下選項框繼續點擊下一步。

C++高并發內存池如何實現

最后點擊完成,就可以等待分析結果了。

C++高并發內存池如何實現

分析性能瓶頸

通過分析結果可以看到,光是Deallocate和MapObjectToSpan這兩個函數就占用了一半多的時間。

C++高并發內存池如何實現

而在Deallocate函數中,調用ListTooLong函數時消耗的時間是最多的。

C++高并發內存池如何實現

繼續往下看,在ListTooLong函數中,調用ReleaseListToSpans函數時消耗的時間是最多的。

C++高并發內存池如何實現

再進一步看,在ReleaseListToSpans函數中,調用MapObjectToSpan函數時消耗的時間是最多的。

C++高并發內存池如何實現

也就是說,最終消耗時間最多的實際就是MapObjectToSpan函數,我們這時再來看看為什么調用MapObjectToSpan函數會消耗這么多時間。通過觀察我們最終發現,調用該函數時會消耗這么多時間就是因為鎖的原因。

C++高并發內存池如何實現

因此當前項目的瓶頸點就在鎖競爭上面,需要解決調用MapObjectToSpan函數訪問映射關系時的加鎖問題。tcmalloc當中針對這一點使用了基數樹進行優化,使得在讀取這個映射關系時可以做到不加鎖。

針對性能瓶頸使用基數樹進行優化

基數樹實際上就是一個分層的哈希表,根據所分層數不同可分為單層基數樹、二層基數樹、三層基數樹等。

單層基數樹

單層基數樹實際采用的就是直接定址法,每一個頁號對應span的地址就存儲數組中在以該頁號為下標的位置。

C++高并發內存池如何實現

最壞的情況下我們需要建立所有頁號與其span之間的映射關系,因此這個數組中元素個數應該與頁號的數目相同,數組中每個位置存儲的就是對應span的指針。

//單層基數樹
template <int BITS>
class TCMalloc_PageMap1
{
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap1()
	{
		size_t size = sizeof(void*) << BITS; //需要開辟數組的大小
		size_t alignSize = SizeClass::_RoundUp(size, 1 << PAGE_SHIFT); //按頁對齊后的大小
		array_ = (void**)SystemAlloc(alignSize >> PAGE_SHIFT); //向堆申請空間
		memset(array_, 0, size); //對申請到的內存進行清理
	}
	void* get(Number k) const
	{
		if ((k >> BITS) > 0) //k的范圍不在[0, 2^BITS-1]
		{
			return NULL;
		}
		return array_[k]; //返回該頁號對應的span
	}
	void set(Number k, void* v)
	{
		assert((k >> BITS) == 0); //k的范圍必須在[0, 2^BITS-1]
		array_[k] = v; //建立映射
	}
private:
	void** array_; //存儲映射關系的數組
	static const int LENGTH = 1 << BITS; //頁的數目
};

此時當我們需要建立映射時就調用set函數,需要讀取映射關系時,就調用get函數就行了。

代碼中的非類型模板參數BITS表示存儲頁號最多需要比特位的個數。在32位下我們傳入的是32-PAGE_SHIFT,在64位下傳入的是64-PAGE_SHIFT。而其中的LENGTH成員代表的就是頁號的數目,即 2 B I T S 2^{BITS} 2BITS。

比如32位平臺下,以一頁大小為8K為例,此時頁的數目就是 2 32 &divide; 2 13 = 2 19 2^{32}\div2^{13}=2^{19} 232&divide;213=219,因此存儲頁號最多需要19個比特位,此時傳入非類型模板參數的值就是 32 &minus; 13 = 19 32-13=19 32&minus;13=19。由于32位平臺下指針的大小是4字節,因此該數組的大小就是 2 19 &times; 4 = 2 21 = 2 M 2^{19}\times4=2^{21}=2M 219&times;4=221=2M,內存消耗不大,是可行的。但如果是在64位平臺下,此時該數組的大小是 2 51 &times; 8 = 2 54 = 2 24 G 2^{51}\times8=2^{54}=2^{24}G 251&times;8=254=224G,這顯然是不可行的,實際上對于64位的平臺,我們需要使用三層基數樹。

二層基數樹

這里還是以32位平臺下,一頁的大小為8K為例來說明,此時存儲頁號最多需要19個比特位。而二層基數樹實際上就是把這19個比特位分為兩次進行映射。

比如用前5個比特位在基數樹的第一層進行映射,映射后得到對應的第二層,然后用剩下的比特位在基數樹的第二層進行映射,映射后最終得到該頁號對應的span指針。

C++高并發內存池如何實現

在二層基數樹中,第一層的數組占用 2 5 &times; 4 = 2 7 B y t e 2^{5}\times4=2^{7}Byte 25&times;4=27Byte空間,第二層的數組最多占用 2 5 &times; 2 14 &times; 4 = 2 21 = 2 M 2^{5}\times2^{14}\times4=2^{21}=2M 25&times;214&times;4=221=2M。二層基數樹相比一層基數樹的好處就是,一層基數樹必須一開始就把 2 M 2M 2M的數組開辟出來,而二層基數樹一開始時只需將第一層的數組開辟出來,當需要進行某一頁號映射時再開辟對應的第二層的數組就行了。

//二層基數樹
template <int BITS>
class TCMalloc_PageMap2
{
private:
	static const int ROOT_BITS = 5;                //第一層對應頁號的前5個比特位
	static const int ROOT_LENGTH = 1 << ROOT_BITS; //第一層存儲元素的個數
	static const int LEAF_BITS = BITS - ROOT_BITS; //第二層對應頁號的其余比特位
	static const int LEAF_LENGTH = 1 << LEAF_BITS; //第二層存儲元素的個數
	//第一層數組中存儲的元素類型
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Leaf* root_[ROOT_LENGTH]; //第一層數組
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap2()
	{
		memset(root_, 0, sizeof(root_)); //將第一層的空間進行清理
		PreallocateMoreMemory(); //直接將第二層全部開辟
	}
	void* get(Number k) const
	{
		const Number i1 = k >> LEAF_BITS;        //第一層對應的下標
		const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應的下標
		if ((k >> BITS) > 0 || root_[i1] == NULL) //頁號值不在范圍或沒有建立過映射
		{
			return NULL;
		}
		return root_[i1]->values[i2]; //返回該頁號對應span的指針
	}
	void set(Number k, void* v)
	{
		const Number i1 = k >> LEAF_BITS;        //第一層對應的下標
		const Number i2 = k & (LEAF_LENGTH - 1); //第二層對應的下標
		assert(i1 < ROOT_LENGTH);
		root_[i1]->values[i2] = v; //建立該頁號與對應span的映射
	}
	//確保映射[start,start_n-1]頁號的空間是開辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> LEAF_BITS;
			if (i1 >= ROOT_LENGTH) //頁號超出范圍
				return false;
			if (root_[i1] == NULL) //第一層i1下標指向的空間未開辟
			{
				//開辟對應空間
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = (Leaf*)leafPool.New();
				memset(leaf, 0, sizeof(*leaf));
				root_[i1] = leaf;
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續后續檢查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{
		Ensure(0, 1 << BITS); //將第二層的空間全部開辟好
	}
};

因此在二層基數樹中有一個Ensure函數,當需要建立某一頁號與其span之間的映射關系時,需要先調用該Ensure函數確保用于映射該頁號的空間是開辟了的,如果沒有開辟則會立即開辟。

而在32位平臺下,就算將二層基數樹第二層的數組全部開辟出來也就消耗了 2 M 2M 2M的空間,內存消耗也不算太多,因此我們可以在構造二層基數樹時就把第二層的數組全部開辟出來。

三層基數樹

上面一層基數樹和二層基數樹都適用于32位平臺,而對于64位的平臺就需要用三層基數樹了。三層基數樹與二層基數樹類似,三層基數樹實際上就是把存儲頁號的若干比特位分為三次進行映射。

C++高并發內存池如何實現

此時只有當要建立某一頁號的映射關系時,再開辟對應的數組空間,而沒有建立映射的頁號就可以不用開辟其對應的數組空間,此時就能在一定程度上節省內存空間。

//三層基數樹
template <int BITS>
class TCMalloc_PageMap3
{
private:
	static const int INTERIOR_BITS = (BITS + 2) / 3;       //第一、二層對應頁號的比特位個數
	static const int INTERIOR_LENGTH = 1 << INTERIOR_BITS; //第一、二層存儲元素的個數
	static const int LEAF_BITS = BITS - 2 * INTERIOR_BITS; //第三層對應頁號的比特位個數
	static const int LEAF_LENGTH = 1 << LEAF_BITS;         //第三層存儲元素的個數
	struct Node
	{
		Node* ptrs[INTERIOR_LENGTH];
	};
	struct Leaf
	{
		void* values[LEAF_LENGTH];
	};
	Node* NewNode()
	{
		static ObjectPool<Node> nodePool;
		Node* result = nodePool.New();
		if (result != NULL)
		{
			memset(result, 0, sizeof(*result));
		}
		return result;
	}
	Node* root_;
public:
	typedef uintptr_t Number;
	explicit TCMalloc_PageMap3()
	{
		root_ = NewNode();
	}
	void* get(Number k) const
	{
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應的下標
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應的下標
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三層對應的下標
		//頁號超出范圍,或映射該頁號的空間未開辟
		if ((k >> BITS) > 0 || root_->ptrs[i1] == NULL || root_->ptrs[i1]->ptrs[i2] == NULL)
		{
			return NULL;
		}
		return reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3]; //返回該頁號對應span的指針
	}
	void set(Number k, void* v)
	{
		assert(k >> BITS == 0);
		const Number i1 = k >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應的下標
		const Number i2 = (k >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應的下標
		const Number i3 = k & (LEAF_LENGTH - 1);                    //第三層對應的下標
		Ensure(k, 1); //確保映射第k頁頁號的空間是開辟好了的
		reinterpret_cast<Leaf*>(root_->ptrs[i1]->ptrs[i2])->values[i3] = v; //建立該頁號與對應span的映射
	}
	//確保映射[start,start+n-1]頁號的空間是開辟好了的
	bool Ensure(Number start, size_t n)
	{
		for (Number key = start; key <= start + n - 1;)
		{
			const Number i1 = key >> (LEAF_BITS + INTERIOR_BITS);         //第一層對應的下標
			const Number i2 = (key >> LEAF_BITS) & (INTERIOR_LENGTH - 1); //第二層對應的下標
			if (i1 >= INTERIOR_LENGTH || i2 >= INTERIOR_LENGTH) //下標值超出范圍
				return false;
			if (root_->ptrs[i1] == NULL) //第一層i1下標指向的空間未開辟
			{
				//開辟對應空間
				Node* n = NewNode();
				if (n == NULL) return false;
				root_->ptrs[i1] = n;
			}
			if (root_->ptrs[i1]->ptrs[i2] == NULL) //第二層i2下標指向的空間未開辟
			{
				//開辟對應空間
				static ObjectPool<Leaf> leafPool;
				Leaf* leaf = leafPool.New();
				if (leaf == NULL) return false;
				memset(leaf, 0, sizeof(*leaf));
				root_->ptrs[i1]->ptrs[i2] = reinterpret_cast<Node*>(leaf);
			}
			key = ((key >> LEAF_BITS) + 1) << LEAF_BITS; //繼續后續檢查
		}
		return true;
	}
	void PreallocateMoreMemory()
	{}
};

因此當我們要建立某一頁號的映射關系時,需要先確保存儲該頁映射的數組空間是開辟好了的,也就是調用代碼中的Ensure函數,如果對應數組空間未開辟則會立馬開辟對應的空間。

使用基數樹進行優化代碼實現

代碼更改

現在我們用基數樹對代碼進行優化,此時將PageCache類當中的unorder_map用基數樹進行替換即可,由于當前是32位平臺,因此這里隨便用幾層基數樹都可以。

//單例模式
class PageCache
{
public:
	//...
private:
	//std::unordered_map<PAGE_ID, Span*> _idSpanMap;
	TCMalloc_PageMap1<32 - PAGE_SHIFT> _idSpanMap;
};

此時當我們需要建立頁號與span的映射時,就調用基數樹當中的set函數。

_idSpanMap.set(span->_pageId, span);

而當我們需要讀取某一頁號對應的span時,就調用基數樹當中的get函數。

Span* ret = (Span*)_idSpanMap.get(id);

并且現在PageCache類向外提供的,用于讀取映射關系的MapObjectToSpan函數內部就不需要加鎖了。

//獲取從對象到span的映射
Span* PageCache::MapObjectToSpan(void* obj)
{
	PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT; //頁號
	Span* ret = (Span*)_idSpanMap.get(id);
	assert(ret != nullptr);
	return ret;
}

為什么讀取基數樹映射關系時不需要加鎖?

當某個線程在讀取映射關系時,可能另外一個線程正在建立其他頁號的映射關系,而此時無論我們用的是C++當中的map還是unordered_map,在讀取映射關系時都是需要加鎖的。

因為C++中map的底層數據結構是紅黑樹,unordered_map的底層數據結構是哈希表,而無論是紅黑樹還是哈希表,當我們在插入數據時其底層的結構都有可能會發生變化。比如紅黑樹在插入數據時可能會引起樹的旋轉,而哈希表在插入數據時可能會引起哈希表擴容。此時要避免出現數據不一致的問題,就不能讓插入操作和讀取操作同時進行,因此我們在讀取映射關系的時候是需要加鎖的。

而對于基數樹來說就不一樣了,基數樹的空間一旦開辟好了就不會發生變化,因此無論什么時候去讀取某個頁的映射,都是對應在一個固定的位置進行讀取的。并且我們不會同時對同一個頁進行讀取映射和建立映射的操作,因為我們只有在釋放對象時才需要讀取映射,而建立映射的操作都是在page cache進行的。也就是說,讀取映射時讀取的都是對應span的_useCount不等于0的頁,而建立映射時建立的都是對應span的_useCount等于0的頁,所以說我們不會同時對同一個頁進行讀取映射和建立映射的操作。

再次對比malloc進行測試

還是同樣的代碼,只不過我們用基數樹對代碼進行了優化,這時測試固定大小內存的申請和釋放的結果如下:

C++高并發內存池如何實現

可以看到,這時就算申請釋放的是固定大小的對象,其效率都是malloc的兩倍。下面在申請釋放不同大小的對象時,由于central cache的桶鎖起作用了,其效率更是變成了malloc的好幾倍。

C++高并發內存池如何實現

打包成動靜態庫

實際Google開源的tcmalloc是會直接用于替換malloc的,不同平臺替換的方式不同。比如基于Unix的系統上的glibc,使用了weak alias的方式替換;而對于某些其他平臺,需要使用hook的鉤子技術來做。

對于我們當前實現的項目,可以考慮將其打包成靜態庫或動態庫。我們先右擊解決方案資源管理器當中的項目名稱,然后選擇屬性。

C++高并發內存池如何實現

此時會彈出該選項卡,按照以下圖示就可以選擇將其打包成靜態庫或動態庫了。

C++高并發內存池如何實現

感謝各位的閱讀,以上就是“C++高并發內存池如何實現”的內容了,經過本文的學習后,相信大家對C++高并發內存池如何實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

c++
AI

攀枝花市| 纳雍县| 乾安县| 登封市| 奎屯市| 攀枝花市| 北碚区| 通许县| 凌源市| 东乌| 荔浦县| 汝州市| 临夏县| 大石桥市| 汉中市| 集贤县| 新邵县| 乌鲁木齐市| 阳泉市| 霍山县| 德格县| 楚雄市| 读书| 清苑县| 怀来县| 怀仁县| 鄂尔多斯市| 南澳县| 乌苏市| 项城市| 颍上县| 肃宁县| 镇雄县| 钟山县| 禹州市| 双城市| 台江县| 浦江县| 长沙市| 开江县| 贡觉县|