您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關怎么理解C++11 中的線程及鎖和條件變量,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
線程
類std::thread代表一個可執行線程,使用時必須包含頭文件<thread>。std::thread可以和普通函數,匿名函數和仿函數(一個實現了operator()函數的類)一同使用。另外,它允許向線程函數傳遞任意數量的參數。
#include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; }
上例中,t 是一個線程對象,函數func()運行于該線程中。對join()函數的調用將使調用線程(本例是指主線程)一直處于阻塞狀態,直到正在執行的線程t執行結束。如果線程函數返回某個值,該值也將被忽略。不過,該函數可以接收任意數量的參數。
void func(int i, double d, const std::string& s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std::thread t(func, 1, 12.50, "sample"); t.join(); return 0; }
盡管可以向線程函數傳遞任意數量的參數,但是所有的參數應當按值傳遞。如果需要將參數按引用傳遞,那要向下例所示那樣,必須將參數用std::ref 或者std::cref進行封裝。
void func(int& a) { a++; } int main() { int a = 42; std::thread t(func, std::ref(a)); t.join(); std::cout << a << std::endl; return 0; }
該程序打印結果為43,但是如果不用std::ref把參數a進行封裝的話,輸出結果將為42.
除了join方法外,該線程類還提供了另外兩個方法:
swap:交換兩個線程對象的底層句柄。
Detach: 允許執行該方法的線程脫離其線程對象而繼續獨立執行。脫離后的線程不再是可結合線程(你不能等待它們執行結束)。
int main() { std::thread t(funct); t.detach(); return 0; }
有一點非常重要,如果線程函數拋出異常,使用常規的try-catch語句是捕獲不到該異常的。換句話說,以下的做法是不可行的:
try { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); } catch(const std::exception& ex) { std::cout << ex.what() << std::endl; }
要在線程間傳遞異常,你需要在線程函數中捕獲他們,將其存儲在合適的地方,比便于另外的線程可以隨后獲取到這些異常。
std::mutex g_mutex; std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception("something wrong happened"); } void func() { try { throw_function(); } catch(...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std::thread t(func); t.join(); for(auto& e : g_exceptions) { try { if(e != nullptr) { std::rethrow_exception(e); } } catch(const std::exception& e) { std::cout << e.what() << std::endl; } } return 0; }
想要知道更多的關于捕獲和傳遞異常的知識,可以閱讀這兩本書在主線程中處理輔助線程拋出的C++異常和怎樣在線程間傳遞異常。
在深入學習之前,有一點需要注意 <thread>頭文件在命名空間std::this_thread中提供了一些幫助函數:
get_id: 返回當前線程的id.
yield:在處于等待狀態時,可以讓調度器先運行其他可用的線程。
sleep_for:阻塞當前線程,時間不少于其參數指定的時間。
sleep_util:在參數指定的時間到達之前,使當前線程一直處于阻塞狀態。
鎖
在上面的例子中,我需要對vector g_exceptions進行同步訪問,以確保在同一時間只能有一個線程向其中添加新元素。為此,我使用了互斥量,并對該互斥進行加鎖。互斥量是一個核心 同步原語,C++ 11的<mutex>頭文件里包含了四種不同的互斥量。
Mutex: 提供了核心函數 lock() 和 unlock(),以及非阻塞方法的try_lock()方法,一旦互斥量不可用,該方法會立即返回。
Recursive_mutex:允許在同一個線程中對一個互斥量的多次請求。
Timed_mutex:同上面的mutex類似,但它還有另外兩個方法 try_lock_for() 和 try_lock_until(),分別用于在某個時間段里或者某個時刻到達之間獲取該互斥量。
Recursive_timed_mutex: 結合了timed_mutex 和recuseive_mutex的使用。
下面是一個使用了std::mutex的例子(注意前面提到過的幫助函數get_id()和sleep_for()的用法)。
#include <iostream> #include <thread> #include <mutex> #include <chrono> std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(rand() % 10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand((unsigned int)time(0)); std::thread t1(func); std::thread t2(func); std::thread t3(func); t1.join(); t2.join(); t3.join(); return 0; }
輸出結果如下所示:
entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424
lock()和unlock()這兩個方法應該一目了然,***個方法用來對互斥量加鎖,如果互斥量不可用,便處于阻塞狀態。后者則用來對互斥量解鎖。
下面這個例子展示了一個簡單的線程安全容器(內部使用std::vector).這個容器帶有添加單個元素的add()方法和添加多個元素的addrange()方法,addrange()方法內部僅僅調用了add()方法。
注意:就像下面的評論里所指出的一樣,由于某些原因,包括使用了va_args,這不是一個標準的線程安全容器。而且,dump()方法也不是容器 的方法,從真正的實現上來說,它只是一個幫助(獨立的)函數。這個例子僅僅用來告訴大家一些有關互斥量的概念,而不是實現一個完全成熟的,無任何錯誤的線 程安全容器。
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
運行該程序時,會進入死鎖狀態。原因是該容器試圖多次去獲取同一個互斥量,卻一直沒有釋放它,這樣是不可行的。
在這里,使用std::recursive_mutex就可以很好地解決這個問題,它允許同一個線程多次獲取同一個互斥量,可獲取的互斥量的***次數并沒有具體說明。但是一旦超過***次數,再對lock進行調用就會拋出std::system_error錯誤異常。
要想修改上述代碼中的問題(除了修改addrange()方法的實現,使它不去調用lock()和unlock()),還可以將互斥量std::mutex改為std::recursive_mutex
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
修改后,就會得到下面的輸出結果。
6334 18467 41 6334 18467 41 6334 18467 41
聰明的讀者會注意到每次調用func()都會產生相同的數字序列。這是因為種子數是線程本地化的,僅僅在主線程中調用了srand()對種子進行了初始化,在其他工作線程中并沒用進行初始化,所以每次都得到相同的數字序列。
顯式的加鎖和解鎖會導致一些問題,比如忘記解鎖或者請求加鎖的順序不正確,進而產生死鎖。該標準提供了一些類和函數幫助解決此類問題。這些封裝類保證了在RAII風格上互斥量使用的一致性,可以在給定的代碼范圍內自動加鎖和解鎖。封裝類包括:
Lock_guard:在構造對象時,它試圖去獲取互斥量的所有權(通過調用lock()),在析構對象時,自動釋放互斥量(通過調用unlock()).這是一個***的類。
Unique_lock:這個一通用的互斥量封裝類,不同于lock_guard,它還支持延遲加鎖,時間加鎖和遞歸加鎖以及鎖所有權的轉移和條件變量的使用。這也是一個***的類,但它是可移動類。
有了這些封裝類,我們可以像下面這樣改寫容器類:
template <typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public: void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add(va_arg(arguments, T)); } va_end(arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
有人也許會問,既然dump()方法并沒有對容器的狀態做任何修改,是不是應該定義為const方法呢?但是你如果將它定義為const,編譯器會報出下面的錯誤:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)’ : cannot convert parameter 1 from ‘const std::recursive_mutex’ to ‘std::recursive_mutex &’
一個互斥量(不管使用的哪一種實現)必須要獲取和釋放,這就意味著要調用非const的lock()和unlock()方法。所以從邏輯上來 講,lock_guard的參數不能使const(因為如果該方法為const,互斥量也必需是const).解決這個問題的辦法就是將互斥量定義為可變 的mutable,Mutable允許在常函數中修改狀態。
不過,這種方法只能用于隱藏或者元狀態(就像對計算結果或查詢的數據進行緩存,以便下次調用時可以直接使用,不需要進行多次計算和查詢。再或者,對在一個對象的實際狀態起輔助作用的互斥量進行位的修改)。
template <typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public: void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
這些封裝類的構造函數可以重載,接受一個參數用來指明加鎖策略。可用的策略如下:
defer_lock of type defer_lock_t:不獲取互斥量的擁有權
try_to_lock of type try_to_lock_t:在不阻塞的情況下試圖獲取互斥量的擁有權
adopte_lock of type adopt_lock_t:假設調用線程已經擁有互斥量的所有權
這些策略的聲明如下:
struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了這些互斥量的封裝類,該標準還提供了兩個方法,用于對一個或多個互斥量進行加鎖。
lock:使用一種可以避免死鎖的算法對互斥量加鎖(通過調用lock(),try_lock()和unlock()).
try_lock():按照互斥量被指定的順序,試著通過調用try_lock()來對多個互斥量加鎖。
這是一個發生死鎖的例子:有一個用來存儲元素的容器和一個函數exchange(),該函數用來交換兩個容器中的元素。要成為線程安全函數,該函數通過獲取每個容器的互斥量,來對兩個容器的訪問進行同步操作。
template <typename T> class container { public: std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove(T element) { _elements.erase(element); } }; void exchange(container<int>& cont1, container<int>& cont2, int value) { cont1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock cont2._lock.lock(); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
假設這個函數是由兩個不同的線程進行調用的,***個線程中,一個元素從容器1中移除,添加到容器2中。第二個線程中,該元素又從容器2移除添加到容器1中。這種做法會導致發生死鎖(如果在獲取***個鎖后,線程上下文剛好從一個線程切換到另一個線程,導致發生死鎖)。
int main() { srand((unsigned int)time(NULL)); container<int> cont1; cont1.add(1); cont1.add(2); cont1.add(3); container<int> cont2; cont2.add(4); cont2.add(5); cont2.add(6); std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3); std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6) t1.join(); t2.join(); return 0; }
要解決這個問題,可以使用std::lock來確保以避免發生死鎖的方式來獲取鎖。
void exchange(container<int>& cont1, container<int>& cont2, int value) { std::lock(cont1._lock, cont2._lock); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
條件變量C++11 還提供了另外一種同步原語,就是條件變量,它能使一個或多個線程進入阻塞狀態,直到接到另一個線程的通知,或者發生超時或虛假喚醒時,才退出阻塞.在頭文件<condition_variable> 里對條件變量有兩種實現:
condition_variable:要求任何在等待該條件變量的線程必須先獲取std::unique_lock鎖。
Condition_variable_any:是一種更加通用的實現,可以用于任意滿足鎖的基本條件的類型(該實現只要提供了lock()和 unlock()方法即可)。因為使用它花費的代價比較高(從性能和操作系統資源的角度來講),所以只有在提供了必不可少的額外的靈活性的條件下才提倡使 用它。
下面來講講條件變量的工作原理: 至少有一個線程在等待某個條件變為true。等待的線程必須先獲取unique_lock 鎖。該鎖被傳遞給wait()方法,wait()方法會釋放互斥量,并將線程掛起,直到條件變量接收到信號。收到信號后,線程會被喚醒,同時該鎖也會被重 新獲取。
至少有一個線程發送信號使某個條件變為true。可以使用notify_one()來發送信號,同時喚醒一個正在等待該條件收到信號的處于阻塞狀態的線程,或者用notify_all()來喚醒在等待該條件的所有線程。
在多處理器系統中,因為一些復雜情況,要想完全預測到條件被喚醒并不容易,還會出現虛假喚醒的情況。就是說,在沒人給條件變量發送信號的情況下,線程也可能會被喚醒。所以線程被喚醒后,還需要檢測條件是否為true。因為可能會多次發生虛假喚醒,所以需要進行循環檢測。
下面代碼是一個使用條件變量來同步線程的例子:幾個工作線程運行時可能會產生錯誤并將錯誤代碼放到隊列里。記錄線程會從隊列里取出錯誤代碼并輸出它 們來處理這些錯誤。發生錯誤的時候,工作線程會給記錄線程發信號。記錄線程一直在等待條件變量接收信號。為了避免發生虛假喚醒,該等待過程在循環檢測條件 的布爾值。
#include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std::thread loggerthread(loggerfunc); // start the working threads std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) { threads.push_back(std::thread(workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for(auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true; loggerthread.join(); return 0; }
運行上述代碼,輸出結果如下(注意每次運行,輸出結果都不一樣;因為每個工作線程運行時都有一個隨機的休眠時間)。
[logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401
上面看到的wait()方法有兩個重載:
***個重載帶有鎖unique_lock;這個重載方法可以釋放鎖,阻塞線程,并把線程添加到正在等待這一條件變量的線程隊列里面。當該條件變量收到信號或者發生虛假喚醒時,線程就會被喚醒。它們其中任何一個發生時,鎖都會被重新獲取,函數返回。
第二個重載除了帶有鎖unique_lock外,還帶有循環判定直到返回false值;這個重載是用來避免發生虛假喚醒。它基本上等價于下面的語句:
while(!predicate()) wait(lock);
因此在上面的例子中,通過使用重載的wait()方法以及驗證隊列狀態的判斷(空或不空),就可以避免使用布爾變量g_notified了。
void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } }
除了這個重載的wait()方法,還有另外兩個類似的重載方法,也帶有避免虛假喚醒的判定。
Wait_for: 在條件變量收到信號或者指定的超時發生前,線程一直處于阻塞狀態;
Wait_until:在條件變量收到信號或者指定的時刻到達之前,線程一直處于阻塞狀態。
這兩個函數的不帶有判定的重載返回cv_status狀態,用來表明發生超時或者線程被喚醒是因為條件變量收到信號或者發生虛假喚醒。
該標準還提供了一個函數notify_all_at_thread_exit,它實現了一個機制,通知其他線程給定線程已經運行結束,并銷毀所有的 thread_local對象。該函數的引進是因為在使用了thread_local后,采用除join()之外的其他機制來等待線程會導致不正確甚至致 命的行為發生。
因為thread_local的析構函數會在等待中的線程恢復執行和可能執行結束的情況下被調用(可參考N3070和N2880得知更多信息)。
通常情況下,對這個函數的調用必須在線程生成之前。下面的例子描述了如何使用notify_all_at_thread_exit和condition_variable共同完成對兩個線程的同步操作:
std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; }
如果工作線程在主線程執行結束之前結束,輸出結果將如下:
main running... worker running... main crunching... worker finished... main waiting for worker... main finished...
如果主線程比工作線程更早結束,輸出結果將如下:
main running... worker running... main crunching... main waiting for worker... worker finished... main finished...
C++11標準可以讓C++開發者以一種標準的,獨立平臺的方式來編寫多線程。這篇文章大概講述了該標準所支持的線程和同步機制。頭文 件<thread>提供了thread類(和一些幫助函數),表明thread類是一個可執行線程。頭文件<mutex>提供了 幾種互斥量的實現和對線程進行同步訪問的封裝類。頭文件<condition_variable>提供了條件變量的兩種實現,這些實現使一個 或多個線程一直處于阻塞狀態,直到接收到其他線程的通知,或發生超時或者有虛假喚醒發生時才會被喚醒。推薦讀者朋友可以閱讀其他資料來獲取更多的詳細信 息。
看完上述內容,你們對怎么理解C++11 中的線程及鎖和條件變量有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。