您好,登錄后才能下訂單哦!
這篇文章主要介紹“C++11中線程、鎖和條件變量的介紹”,在日常操作中,相信很多人在C++11中線程、鎖和條件變量的介紹問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”C++11中線程、鎖和條件變量的介紹”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
std::thread類代表了一個可執行的線程,它來自頭文件<thread>。與其它創建線程的API(比如 Windows API中的CreateThread)不同的是, 它可以使用普通函數、lambda函數以及仿函數(實現了operator()函數的類)。另外,它還允許向線程函數傳遞任意數量的參數。
#include <thread> void func() { // do some work } int main() { std::thread t(func); t.join(); return 0; }
在上面的例子中,t是一個線程對象,函數func()運行于該線程之中。調用join函數后,該調用線程(本例中指的就是主線程)就會在join進來進行執行的線程t結束執行之前,一直處于阻塞狀態。如果該線程函數執行結束后返回了一個值,該值也將被忽略。不過,該函數可以接受任意數量的參數。
void func(int i, double d, const std::string& s) { std::cout << i << ", " << d << ", " << s << std::endl; } int main() { std::thread t(func, 1, 12.50, "sample"); t.join(); return 0; }
盡管我們可以向線程函數傳遞任意數量的參數,但是,所有的參數都是按值傳遞的。如果需要將參數按引用進行傳遞,那么就一定要象下例所示一樣,把該參數封裝到 std::ref或者std::cref之中。
void func(int& a) { a++; } int main() { int a = 42; std::thread t(func, std::ref(a)); t.join(); std::cout << a << std::endl; return 0; }
上面程序打印結果為43,但要不是將a封裝到std::ref之中的話,輸出的將是42。
除join方法之外,這個線程類還提供了另外幾個方法:
swap: 將兩個線程對象的底層句柄進行交換
detatch: 允許執行該方法的線程獨立于本線程對象的執行而繼續執行。脫離后的線程就再也不能執行join了(你不能等待到它執行結束了)
<span style="font-family:'Courier New', Arial;font-size:9pt;line-height:1.5;">int</span><span style="font-family:'Courier New', Arial;font-size:9pt;line-height:1.5;"> main()</span> { std::thread t(funct); t.detach(); return 0; }
有一點非常重要,值得注意:線程函數中要是拋出了異常的話,使用通常的try-catch方式是捕獲不到該異常的。換句話說,下面這種做法行不通:
try { std::thread t1(func); std::thread t2(func); t1.join(); t2.join(); } catch(const std::exception& ex) { std::cout << ex.what() << std::endl; }
要在線程間傳遞異常,你可以先在線程函數中捕獲它們,然后再將它們保存到一個合適的地方,隨后再讓另外一個線程從這個地方取得這些異常。
std::vector<std::exception_ptr> g_exceptions; void throw_function() { throw std::exception("something wrong happened"); } void func() { try { throw_function(); } catch(...) { std::lock_guard<std::mutex> lock(g_mutex); g_exceptions.push_back(std::current_exception()); } } int main() { g_exceptions.clear(); std::thread t(func); t.join(); for(auto& e : g_exceptions) { try { if(e != nullptr) { std::rethrow_exception(e); } } catch(const std::exception& e) { std::cout << e.what() << std::endl; } } return 0; }
要獲得更多關于捕獲并傳遞異常的知識,你可以閱讀在主線程中處理工作線程拋出的C++異常以及怎樣才能在線程間傳遞異常?。
在深入討論之前還有一點值得注意,頭文件<thread>里還在命名空間std::this_thread中提供了一些輔助函數:
get_id: 返回膽怯線程的id
yield: 讓調度器先運行其它的線程,這在忙于等待狀態時很有用
sleep_for: 將當前線程置于阻塞狀態,時間不少于參數所指定的時間段
sleep_util: 在指定的時刻來臨前,一直將當前的線程置于阻塞狀態
鎖
在上一個例子中,我需要對g_exceptions這個vector進行同步訪問,以確保同一個時刻只能有一個線程向其中壓入新元素。為了實現同步,我使用了一個互斥量,并在該互斥量上進行了鎖定。互斥量是一個核心的同步原語,C++11的<mutex>頭文件中包含了四種不同的互斥量。
mutex: 提供了核心的lock()函數和unlock()函數,以及非阻塞式的try_lock()方法,該方法在互斥量不可用時會立即返回。
recursive_mutex: 運行在同一線程中,多次獲得同一個互斥量。
timed_mutex: 同第一條中的mutex類似,但它還帶來了另外兩個方法try_lock_for()和try_lock_until(),分別用于在某個時間段內或在某個時刻到來之前獲得該互斥量。
recursive_timed_mutex: 結合了第二條的timed_mutex和第三條的recusive_mutex。
以下所列就是一個使用std::mutex(注意其中get_id()和sleep_for()這兩個前文所述的輔助函數的用法)的例子。
#include <iostream> #include <thread> #include <mutex> #include <chrono> std::mutex g_lock; void func() { g_lock.lock(); std::cout << "entered thread " << std::this_thread::get_id() << std::endl; std::this_thread::sleep_for(std::chrono::seconds(rand() % 10)); std::cout << "leaving thread " << std::this_thread::get_id() << std::endl; g_lock.unlock(); } int main() { srand((unsigned int)time(0)); std::thread t1(func); std::thread t2(func); std::thread t3(func); t1.join(); t2.join(); t3.join(); return 0; }
其輸出將類似如下所示:
entered thread 10144 leaving thread 10144 entered thread 4188 leaving thread 4188 entered thread 3424 leaving thread 3424
lock()和unlock()這兩個方法顧名思義,頭一個方法用來對互斥量進行加鎖,如果互斥量不可得便會處于阻塞狀態;第二個方法用來對互斥量進行解鎖。
接下來的這個例子演示的是一個簡單的線程安全的容器(內部使用的是std::vector)。這個容器具有添加單個元素的add()方法以及添加一批元素的addrange()方法,addrange()方法內只是簡單的調用了add()方法。
template <typename T> class container { std::mutex _lock; std::vector<T> _elements; public: void add(T element) { _lock.lock(); _elements.push_back(element); _lock.unlock(); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { _lock.lock(); add(va_arg(arguments, T)); _lock.unlock(); } va_end(arguments); } void dump() { _lock.lock(); for(auto e : _elements) std::cout << e << std::endl; _lock.unlock(); } }; void func(container<int>& cont) { cont.addrange(3, rand(), rand(), rand()); } int main() { srand((unsigned int)time(0)); container<int> cont; std::thread t1(func, std::ref(cont)); std::thread t2(func, std::ref(cont)); std::thread t3(func, std::ref(cont)); t1.join(); t2.join(); t3.join(); cont.dump(); return 0; }
這個程序執行起來會進入死鎖狀態。其原因在于,該容器多次嘗試獲取同一個互斥量而之前卻并沒有釋放該互斥量,這么做是行不通的。這正是std::recursive_mutex的用武之地,它允許同一個線程多次獲得同一個互斥量,可重復獲得的最大次數并未具體說明,但一旦查過一定次數,再對lock進行調用就會拋出std::system錯誤。為了修復上面所列代碼的死鎖問題(不通過修改addrange方法的實現,讓它不對lock和unlock方法進行調用),我們可以將互斥量改為std::recursive_mutex。
template <typename T> class container { std::recursive_mutex _lock; // ... };
經過修改之后,該程序的輸出會同如下所示類似:
6334 18467 41 6334 18467 41 6334 18467 41
明眼的讀者可能已經發現了,每次調用func()所產生的數字序列都完全相同。這是因為對srad的初始化是要分線程進行的,對srand()的調用只是在主線程中進行了初始化。在其它的工作線程中,srand并沒有得到初始化,所以每次產生的數字序列就是完全相同的了。
顯式的加鎖和解鎖可能會導致一定的問題,比如忘了解鎖或者加鎖的順序不對都有可能導致死鎖。本標準提供了幾個類和函數用于幫助解決這類問題。使用這些封裝類就能夠以相互一致的、RAII風格的方式使用互斥量了,它們可以在相應的代碼塊的范圍內進行自動的加鎖和解鎖動作。這些封裝類包括:
lock_guard: 該類的對象在構造之時會試圖獲得互斥量的擁有權(通過調用lock()實現),而在析構之時會自動釋放它所獲得的互斥量(通過調用unlock()實現)。這是一個不可復制的類。
unique_lock: 是一個通用的互斥量封裝類。與lock_quard不同,它還支持延遲加鎖、時間鎖、遞歸鎖、鎖所有權的轉移并且還支持使用條件變量。這也是一個不可復制的類,但它是可以移動的類。
使用這些封裝類,我們可以象這樣來改寫我們的容器:
template <typename T> class container { std::recursive_mutex _lock; std::vector<T> _elements; public: void add(T element) { std::lock_guard<std::recursive_mutex> locker(_lock); _elements.push_back(element); } void addrange(int num, ...) { va_list arguments; va_start(arguments, num); for (int i = 0; i < num; i++) { std::lock_guard<std::recursive_mutex> locker(_lock); add(va_arg(arguments, T)); } va_end(arguments); } void dump() { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
有人會說,既然dump()方法并不會對容器的狀態做出任何修改,所以它應該定義為congst的方法。但要是你真的這么改了之后,編譯器就會報告出如下的錯誤:
‘std::lock_guard<_Mutex>::lock_guard(_Mutex &)' : cannot convert parameter 1 from ‘const std::recursive_mutex' to ‘std::recursive_mutex &'
互斥量(無論使用的是哪一種實現)必須要獲得和釋放,這就意味著要調用非常量型的lock()和unlock()方法。所以,從邏輯上講,lock_guard不能在定義中添加const(因為該方法定義為const的話,互斥量也就必需是const的了)這個問題有個解決辦法,可以讓 mutex變為mutable的。成為 mutable之后就可以在const函數中對狀態進行修改了。不過,這種用法應該只用于隱藏的或者“元”狀態(比如,對計算結果或者查詢到的數據進行緩存,以供下次調用時直接使用而無需再次計算或查詢;再比如,對 只是對對象的實際狀態起著輔助作用的互斥量中的位進行修改)。
template <typename T> class container { mutable std::recursive_mutex _lock; std::vector<T> _elements; public: void dump() const { std::lock_guard<std::recursive_mutex> locker(_lock); for(auto e : _elements) std::cout << e << std::endl; } };
這些封裝類都具有可以接受一個用來指導加鎖策略的參數的構造器,可用的加鎖策略有:
defer_lockof typedefer_lock_t: 不要取得互斥量的擁有權
try_to_lockof typetry_to_lock_t: 在不會被阻塞的情況下嘗試獲得互斥量的擁有權
adopt_lockof typeadopt_lock_t: 假設調用線程已經獲得了互斥量的擁有權
這些策略的定義如下所示:
struct defer_lock_t { }; struct try_to_lock_t { }; struct adopt_lock_t { }; constexpr std::defer_lock_t defer_lock = std::defer_lock_t(); constexpr std::try_to_lock_t try_to_lock = std::try_to_lock_t(); constexpr std::adopt_lock_t adopt_lock = std::adopt_lock_t();
除了這些互斥量的封裝類,本標準還提供了幾個用來對一個或多個互斥量進行加鎖的方法。
lock: 使用一種可避免死鎖的算法對互斥量進行加鎖(通過調用tolock()、try_lock()以及unlock())。
try_lock: 通過調用try_lock()i按照參數里指定的互斥量的順序對多個互斥量進行加鎖。
這里舉一個造成死鎖的例子:我們有一個保存元素的容器,還有一個叫做exchange()的方法,用來將一個元素從一個容器中取出來放入另外一個容器。為了成為線程安全的函數,這個函數通過獲得每個容器的互斥量,對兩個容器的訪問進行了同步處理。
template <typename T> class container { public: std::mutex _lock; std::set<T> _elements; void add(T element) { _elements.insert(element); } void remove(T element) { _elements.erase(element); } }; void exchange(container<int>& cont1, container<int>& cont2, int value) { cont1._lock.lock(); std::this_thread::sleep_for(std::chrono::seconds(1)); // <-- forces context switch to simulate the deadlock cont2._lock.lock(); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
假設這個函數是從兩個不同的線程中進行調用的,在第一個線程中有一個元素從第一個容器中取出來,放到了第二個容器中,在第二個線程中該元素又從第二個容器中取出來放回到了第一個容器中。這樣會導致死鎖(如果線程上下文正好在獲得第一個鎖的時候從一個線程切換到了另一個線程的時候就會發生死鎖)。
int main() { srand((unsigned int)time(NULL)); container<int> cont1; cont1.add(1); cont1.add(2); cont1.add(3); container<int> cont2; cont2.add(4); cont2.add(5); cont2.add(6); std::thread t1(exchange, std::ref(cont1), std::ref(cont2), 3); std::thread t2(exchange, std::ref(cont2), std::ref(cont1), 6); t1.join(); t2.join(); return 0; }
要解決該問題,你可以使用以能夠避免死鎖的方式獲得鎖的std::lock:
void exchange(container<int>& cont1, container<int>& cont2, int value) { std::lock(cont1._lock, cont2._lock); cont1.remove(value); cont2.add(value); cont1._lock.unlock(); cont2._lock.unlock(); }
條件變量
C++11還提供了對另外一個同步原語的支持,這個原語就是條件變量。使用條件變量可以將一個或多個線程進入阻塞狀態,直到收到另外一個線程的通知,或者超時或者發生了虛假喚醒,才能退出阻塞狀態。頭文件<condition_variable>中包含的條件變量有兩種實現:
condition_variable: 要求任何想等待該條件變量的線程必需先獲得std::unique_lock鎖。
condition_variable_any: 該實現更加通用,它可以用于任何滿足基本條件的鎖(只要實現了lock()和unlock()方法即可)。因為它使用起來代價要更高一些(從性能和操作系統的字樣的角度講),所以,應該在只有它所提供的額外的靈活性是必不可少的情況下才會選用它。
下面說說條件變量的工作原理:
必須至少要有一個等待條件變為true的線程。等待中的線程必須首先獲得一個unique_lock鎖。 該鎖將會傳遞給wait()方法,然后wait()方法會釋放互斥量并將該線程暫停,直到條件變量得到相應的信號。當接受到信號,線程被喚醒后,該鎖就又被重新獲得了。
必須至少要有一個線程發送信號使得條件變為true。信號可以通過調用notify_one()來發送,發用這個方法發送后就會將處于阻塞狀態的等待該條件獲得信號的線程中的某一個線程(任意一個線程)恢復執行;還可以通過調用notify_all()將等待該條件的所以線程喚醒。
因為在多處理器的環境下,要讓條件喚醒成為完全可預測會有一些復雜情況難以克服,所以就會出現一些虛假喚醒。也就是說,線程甚至在沒有人向條件變量發送信號的情況下就有可能會被喚醒。因此,在線程喚醒后,仍然需要檢測條件是不是還為true。而且因為虛假喚醒可能會多次發生,所以該檢測必須用一個循環來進行。
以下代碼給出了一個利用狀態變量來同步線程的例子:幾個工作線程可能在他們運行的時候產生錯誤并且他們把這些錯誤放到隊列里面。一個記錄線程會通過從隊列得到并輸出錯誤來處理這些錯誤代碼。當有錯誤發生的時候,工作線程會發信號給記錄線程。記錄線程一直在等待著狀態變量接收信號。為了防止虛假的喚醒,所以記錄線程的等待是發生在一個以檢測布爾值(boolean)的循環之中的。
#include <thread> #include <mutex> #include <condition_variable> #include <iostream> #include <queue> #include <random> std::mutex g_lockprint; std::mutex g_lockqueue; std::condition_variable g_queuecheck; std::queue<int> g_codes; bool g_done; bool g_notified; void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_notified = true; g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); while(!g_notified) // used to avoid spurious wakeups { g_queuecheck.wait(locker); } // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } g_notified = false; } } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); // start the logger std::thread loggerthread(loggerfunc); // start the working threads std::vector<std::thread> threads; for(int i = 0; i < 5; ++i) { threads.push_back(std::thread(workerfunc, i+1, std::ref(generator))); } // work for the workers to finish for(auto& t : threads) t.join(); // notify the logger to finish and wait for it g_done = true; loggerthread.join(); return 0; } Running this code produces an output that looks like this (notice this output is different with each run because each worker thread works, i.e. sleeps, for a random interval): [logger] running... [worker 1] running... [worker 2] running... [worker 3] running... [worker 4] running... [worker 5] running... [worker 1] an error occurred: 101 [worker 2] an error occurred: 201 [logger] processing error: 101 [logger] processing error: 201 [worker 5] an error occurred: 501 [logger] processing error: 501 [worker 3] an error occurred: 301 [worker 4] an error occurred: 401 [logger] processing error: 301 [logger] processing error: 401
如上所示的wait()方法有兩個重載:
1.一個是只有一個唯一鎖;這個重載釋放鎖,封鎖線程和把線程加入都是等待這一個狀態變量的線程隊列里面;當狀態變量被信號通知后或者是一個假喚醒發生,這些線程就會被喚醒。但他們中任何一個發生時,鎖就被重新獲得然后函數返回。
2.另外一個是對于唯一鎖的添加,它也是使用一個循環的謂語直到它返回false;這個重載可以用來防止假式喚醒。它基本上是與以下是等價的:
while(!predicate()) wait(lock);
因此在上例中,通過使用重載的wait函數以及一個驗證隊列狀態(空或不空)的斷言,就可以避免使用布爾變量g_notified了:
void workerfunc(int id, std::mt19937& generator) { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\trunning..." << std::endl; } // simulate work std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); // simulate error int errorcode = id*100+1; { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[worker " << id << "]\tan error occurred: " << errorcode << std::endl; } // notify error to be logged { std::unique_lock<std::mutex> locker(g_lockqueue); g_codes.push(errorcode); g_queuecheck.notify_one(); } } void loggerfunc() { // print a starting message { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\trunning..." << std::endl; } // loop until end is signaled while(!g_done) { std::unique_lock<std::mutex> locker(g_lockqueue); g_queuecheck.wait(locker, [&](){return !g_codes.empty();}); // if there are error codes in the queue process them while(!g_codes.empty()) { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "[logger]\tprocessing error: " << g_codes.front() << std::endl; g_codes.pop(); } } }
除了這個wait()重載方法,還有另外兩個進行類似重載的等待方法,都有用了一個用來避免虛假喚醒的斷言:
wait_for: 在條件變量收到信號或者指定的超時發生前,一直都將線程置于阻塞狀態。
wait_until: 在條件變量收到信號或者指定的時刻到來前,一直都將線程處于阻塞狀態。
這兩個函數不帶斷言的重載函數會返回一個cv_status狀態,該狀態用來表明線程被喚醒了到底是因為發生了超時還是因為條件變量收到了信號抑或是發生了虛假喚醒。
本標準還提供了一個叫做notified_all_at_thread_exit的函數,它實現了一種機制,在該機制下,我們可以通知其它線程,某個給定的線程執行結束了,并銷毀了所有的thread_local對象。之所以引入該函數,是因為如果使用了thread_local后,采用join()之外的機制等待線程可能會導致不正確甚至是致命的行為,出現這樣的問題是因為 thread_local的析構函數甚至可能會在原本處于等待中的線程繼續執行后被執行了而且還可能已經執行完成了。(有關這方面更多的情況可參見N3070和N2880)。 一般情況下,notified_all_at_thread_exitTypically必須正好在線程生成前調用。下面給出一個例子,演示一下 notify_all_at_thread_exit是如何同condition_variable一起使用來對兩個線程進行同步處理的:
std::mutex g_lockprint; std::mutex g_lock; std::condition_variable g_signal; bool g_done; void workerfunc(std::mt19937& generator) { { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker running..." << std::endl; } std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "worker finished..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); g_done = true; std::notify_all_at_thread_exit(g_signal, std::move(lock)); } int main() { // initialize a random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::cout << "main running..." << std::endl; std::thread worker(workerfunc, std::ref(generator)); worker.detach(); std::cout << "main crunching..." << std::endl; std::this_thread::sleep_for(std::chrono::seconds(1 + generator() % 5)); { std::unique_lock<std::mutex> locker(g_lockprint); std::cout << "main waiting for worker..." << std::endl; } std::unique_lock<std::mutex> lock(g_lock); while(!g_done) // avoid spurious wake-ups g_signal.wait(lock); std::cout << "main finished..." << std::endl; return 0; }
如果工作線程是在主線程結束之前結束的,輸出將會是如下所示:
main running... worker running... main crunching... worker finished... main waiting for worker... main finished...
如果是主線程在工作線程結束之前結束的,輸出將會是如下所示:
main running... worker running... main crunching... main waiting for worker... worker finished... main finished...
結束語
C++11標準使得C++開發人員能夠以一種標準的和平臺獨立的方式來編寫多線程代碼。本文一一講述了標準所支持的線程和同步機制。<thread>頭文件提供了名為thread的類(另外還包含了一些輔助類或方法),該類代表了一個執行線程。頭文件<mutex>提供了幾種互斥量的實現,以及對線程進行同步訪問的封裝類。頭文件<condition_variable>為條件變量提供了兩種實現,利用這些實現可以讓一個或多個線程進入阻塞狀態,直到從收到來自另外一個或多個線程的通知、或者發生超時或虛假喚醒為止才會被喚醒。推薦在這方面再閱讀一些別的資料來獲得更詳細的信息。
到此,關于“C++11中線程、鎖和條件變量的介紹”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。