您好,登錄后才能下訂單哦!
一、自旋鎖
自旋鎖是一種基礎的同步原語,用于保障對共享數據的互斥訪問。與互斥鎖的相比,在獲取鎖失敗的時候不會使得線程阻塞而是一直自旋嘗試獲取鎖。當線程等待自旋鎖的時候,CPU不能做其他事情,而是一直處于輪詢忙等的狀態。自旋鎖主要適用于被持有時間短,線程不希望在重新調度上花過多時間的情況。實際上許多其他類型的鎖在底層使用了自旋鎖實現,例如多數互斥鎖在試圖獲取鎖的時候會先自旋一小段時間,然后才會休眠。如果在持鎖時間很長的場景下使用自旋鎖,則會導致CPU在這個線程的時間片用盡之前一直消耗在無意義的忙等上,造成計算資源的浪費。
使用自旋鎖時要注意:
由于自旋時不釋放CPU,因而持有自旋鎖的線程應該盡快釋放自旋鎖,否則等待該自旋鎖的線程會一直在哪里自旋,這就會浪費CPU時間。
持有自旋鎖的線程在sleep之前應該釋放自旋鎖以便其他咸亨可以獲得該自旋鎖
二、CAS操作實現自旋鎖
CAS(Compare and Swap),即比較并替換,實現并發算法時常用到的一種技術,這種操作提供了硬件級別的原子操作(通過鎖總線的方式)。CAS操作的原型可以認為是:
bool CAS(V, A, B)
其中V代表內存中的變量,A代表期待的值,B表示新值。當V的值與A相等時,將V與B的值交換。邏輯上可以用下面的偽代碼表示:
bool CAS(V, A, B) { if (V == A) { swap(V, B); return true; } return false; }
需要強調的是上面的操作是原子的,要么不做,要么全部完成。
那么已經擁有CAS操作的情況下如何實現一個自旋鎖呢?首先回憶自旋鎖的用途,本質上我們是希望能夠讓一個線程在不滿足進入臨界區的條件時,不停的忙等輪詢,直到可以運行的時候再繼續(進入臨界區)執行。那么,我們可能自然的想到使用一個bool變量來表示是否可以進入臨界區,例如以下面的偽代碼的邏輯:
while(flag == true); flag = true; /* do something ... */ flag = false; ...
這樣做的直觀想法是當flag為true的時候表示已經有線程處于臨界區內,只有當flag為fasle時才能進入,而在進入的時候立即將flag置為true。但是這樣做明顯存在一個問題,判斷flag為false和設置flag為true并不是一個不可分割的整體,有可能出現類似下面這樣的時序, 假設最初flag為false:
step | thread 1 | thread 2 |
---|---|---|
1 | while(flag == true); | |
2 | while(flag == true); | |
3 | flag = true | |
4 | flag = true | |
5 | do something | do something |
6 | flag = false | |
7 | flag = false |
step是虛構的步驟,do something為一系列指令,這里寫在一起表示并發執行。這里可以看出由于thread1讀取判斷flag的值與修改flag的值是兩個獨立的操作,中間插入了thread2的判斷操作,最終使得有兩個線程同時進入了臨界區,這與我們的期望相悖。那么如何解決呢?如果能將讀取判斷與修改的操作合二為一,變成一個不可分割的整體,那么自然就不可能出現這種交錯的場景。對于這樣一個整體操作,我們希望它能讀取內存中變量的值,并且當其等于特定值的時候,修改它為我們需要的另一個值。嗯......沒錯,這樣我們就得到了CAS操作。
現在可以重新修改我們的同步方式,不停的進行期望flag為false的CAS操作 CAS(flag, flase, b) (這里b為true),直到其返回成功為止,再進行臨界區中的操作,離開臨界區時將flag置為false。
b = true; while(!CAS(flag, false, b)); //do something flag = false;
現在,判斷操作與寫入操作已經成為了一個整體,當一個線程的CAS操作成功的時候會阻止其他線程進入臨界區,到達互斥訪問的目的。
現在我們已經可以使用CAS操作來解決臨界區的互斥訪問的問題了,但是如果每次都這樣寫一遍實在太過麻煩,因此可以進行一些封裝使得使用更加方便,也就是說...可以封裝成自旋鎖。我們可以用一個類來表示,將一個bool值作為類的數據成員,同時將CAS操作和賦值操作作為其成員函數,CAS操作其實就是加鎖操作,而后面的賦值操作就是解鎖操作。
三、用C++原子量實現
按照上面的思路,接下來用 C++ 11 引入標準庫的原子量來實現一個自旋鎖并且進行測試。
首先,我們需要一個bool值來表示鎖的狀態,這里直接使用標準庫中的原子量 atomic<bool> (C++ 11的原子量可以參考:https://www.jb51.net/article/141896.htm ,在我的平臺(Cygwin64、GCC7.3)上 atomic<bool> 的成員函數is_lock_free()返回值為true,是無鎖的實現(如果內部使用了鎖來實現的話那還叫什么自旋鎖 = =)。實際上在大多數平臺上 atomic<bool> 都是無鎖的,如果不確定的話也可以使用C++標準規定必須為無鎖實現的atomic_flag。
接下來,我們需要兩個原子操作,CAS和賦值,C++11標準庫在原子量的成員函數中直接提供了這兩個操作。
//CAS std::atomic::compare_exchange_weak( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ), std::atomic::compare_exchange_strong( T& expected, T desired, std::memory_order order = std::memory_order_seq_cst ) //賦值 void store( T desired, std::memory_order order = std::memory_order_seq_cst )
compare_exchange_weak 與 compare_exchange_strong 主要的區別在于內存中的值與expected相等的時候,CAS操作是否一定能成功,compare_exchange_weak有概率會返回失敗,而compare_exchange_strong則一定會成功。因此,compare_exchange_weak必須與循環搭配使用來保證在失敗的時候重試CAS操作。得到的好處是在某些平臺上compare_exchange_weak性能更好。按照上面的模型,我們本來就要和while搭配使用,可以使用compare_exchange_weak。最后內存序的選擇沒有特殊需求直接使用默認的std::memory_order_seq_cst。而賦值操作非常簡單直接,這個調用一定會成功(只是賦值而已 = =),沒有返回值。
實現代碼非常短,下面是源代碼:
#include <atomic> class SpinLock { public: SpinLock() : flag_(false) {} void lock() { bool expect = false; while (!flag_.compare_exchange_weak(expect, true)) { //這里一定要將expect復原,執行失敗時expect結果是未定的 expect = false; } } void unlock() { flag_.store(false); } private: std::atomic<bool> flag_; };
如上面所說,lock操作不停的嘗試CAS操作直到成功為止,unlock操作則將bool標志位復原。使用方式如下:
SpinLock myLock; myLock.lock(); //do something myLock.unlock();
接下來,我們進行正確性測試,以經典的i++ 問題為例:
#include <atomic> #include <thread> #include <vector> //自旋鎖類定義 class SpinLock { public: SpinLock() : flag_(false) {} void lock() { bool expect = false; while (!flag_.compare_exchange_weak(expect, true)) { expect = false; } } void unlock() { flag_.store(false); } private: std::atomic<bool> flag_; }; //每個線程自增次數 const int kIncNum = 1000000; //線程數 const int kWorkerNum = 10; //自增計數器 int count = 0; //自旋鎖 SpinLock spinLock; //每個線程的工作函數 void IncCounter() { for (int i = 0; i < kIncNum; ++i) { spinLock.lock(); count++; spinLock.unlock(); } } int main() { std::vector<std::thread> workers; std::cout << "SpinLock inc MyTest start" << std::endl; count = 0; std::cout << "start " << kWorkerNum << " workers_" << "every worker inc " << kIncNum << std::endl; std::cout << "count_: " << count << std::endl; //創建10個工作線程進行自增操作 for (int i = 0; i < kWorkerNum; ++i) workers.push_back(std::move(std::thread(IncCounter))); for (auto it = workers.begin(); it != workers.end(); it++) it->join(); std::cout << "workers_ end" << std::endl; std::cout << "count_: " << count << std::endl; //驗證結果 if (count == kIncNum * kWorkerNum) { std::cout << "SpinLock inc MyTest passed" << std::endl; return true; } else { std::cout << "SpinLock inc MyTest failed" << std::endl; return false; } return 0; }
上面的代碼中創建了10個線程對共享的全局變量count分別進行一百萬次++操作,然后驗證結果是否正確,最終執行的輸出為:
SpinLock inc MyTest start start 10 workers_every worker inc 1000000 count_: 0 workers_ end count_: 10000000 SpinLock inc MyTest passed
從結果中可以看出我們實現的自旋鎖起到了保護臨界區(這里就是i++ )的作用,count最后的值等于每個線程執行自增的數目之和。作為對比,可以去掉IncCounter中的加鎖解鎖操作:
void IncCounter() { for (int i = 0; i < kIncNum; ++i) { //spinLock.lock(); count++; //spinLock.unlock(); } }
執行后的輸出為:
SpinLock inc MyTest start start 10 workers_every worker inc 1000000 count_: 0 workers_ end count_: 7254522 SpinLock inc MyTest failed
結果由于多個線程同時執行 i++ 造成結果錯誤。
到這里,我們就通過 C++ 11的原子量實現了一個簡單的自旋鎖。這里只是對C++原子量的一個小使用,無論是自旋鎖本身還是原子量都還有許多值得探究的地方。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。