您好,登錄后才能下訂單哦!
本篇內容介紹了“Golang Mutex互斥鎖實例代碼分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Mutex
就是一把互斥鎖,可以想象成一個令牌,有且只有這一個令牌,只有持有令牌的 goroutine
才能進入房間(臨界區),在房間內執行完任務后,走出房間并把令牌交出來,如果還有其余的 goroutine
等著獲取這個令牌,讓他們再去搶這個令牌,搶到的重復上述過程,沒搶到的繼續等。
上述是從宏觀角度來看待互斥鎖的,但是在 Mutex
內部,有著非常復雜的搶鎖邏輯,Mutex
的發展也經歷了幾個版本,我們可以用拿令牌進餐廳吃飯來形象比喻下幾個主要版本的變化。
前提:餐廳一次只能進入一個人,餐廳有一個令牌,只有持有這個令牌的人才能進去;從餐廳出來后,需要把這個令牌歸還
版本一
餐廳在門外設置了一個隊伍,如果令牌空閑,拿著令牌去餐廳用餐;如果令牌不是空閑的,新來的人就要去隊伍后面排隊等待叫號。(不是空閑包含兩種情況:持有令牌的人在餐廳里面,隊伍是空的;隊伍有人排隊。)
此版本的問題就是:只要令牌不是空閑的,新來的人必須直接去排隊,沒有商量的余地。這樣看起來很公平,遵循先來后到的原則,但是對于餐廳來說,營業效率就會有所降低,即單位時間內接待顧客的數量(IO)會減少。為什么這樣說呢,舉個例子,有個顧客從餐廳出來歸還令牌后,需要去等待隊列去叫號,被叫到號的這個人需要花費時間走到餐廳(獲取到CPU),這中間就浪費了不少時間。
版本二
為了提高營業效率,允許剛到門口的顧客和被叫到號的顧客一起去搶令牌,而不是直接去排隊,這樣就給了新人機會。舉個例子:當持有令牌的人從餐廳出來歸還令牌后,去等待隊列叫個號,如果此時有顧客剛到門口,被叫到號的和新到的顧客一起搶令牌,搶到的就可以直接進入餐廳,搶不到的接著去排隊,由于剛到的顧客離門口近(正在占據CPU),被叫到號的顧客離得遠(需要等CPU),而且剛到的顧客可能不只一個,所以被叫到號的顧客很大概率搶不到令牌,可能還沒走到門口(還沒獲取到CPU)就被新來的顧客搶走了。不管怎么樣,這樣提高了餐廳的效率,可以在單位時間內接待更多的客戶。
版本三
餐廳發現有些人用餐很快,如果讓搶不到令牌的先別直接去排隊,而是在門口轉悠會(當然不能一直轉悠,有條件限制,到了限制還是要去排隊),這種方式類似樂觀鎖,那么有顧客從餐廳出來后,就不用去叫號了,直接讓門口的這些顧客繼續搶就行了,這樣就進一步提高了餐廳的運行效率,畢竟叫號真的太浪費時間了。
版本四
經過了多個版本的優化,餐廳的運營效率是越來越高了,但是有些人可要準備要罵娘了,這些人是誰呢,當然是已經在隊伍里等待的那些人。由于給了新人機會,如果持續有新顧客來,那么已經在隊伍里的那些人永遠也拿不到令牌,可真的要餓死了。
Mutex
在這個版本只為三件事:公平、公平、還是tm的公平!堅持讓每一個人都不餓肚子的原則,餐廳搞出了一個新的模式:饑餓模式。如果有顧客等的時間超過了閾值(1ms),餐廳變為饑餓模式,在該模式下,所有新來的顧客直接去排隊,然后按照先來先到的順序,依次將令牌給等待隊列隊首的顧客。
那么什么時候由饑餓模式變為正常模式呢?當拿到令牌的顧客發現自己從等待到拿到令牌的時間小于閾值(1ms)了,或者等待隊伍沒人等了,此時餐廳就變為正常模式,畢竟上述兩個條件都說明當前餐廳競爭不是很激烈了。
同時這個版本修復了以前的一個問題:之前從等待隊列喚醒的顧客如果沒有搶到令牌,再回到隊列后是插到隊尾,這樣對已經排到第一位的顧客太不友好了。在這個版本中修復了該問題,喚醒的顧客如果沒有搶到令牌,直接插入到隊首,下次叫號還是他。
特性總結
經過了多次迭代,目前的版本有了如下特性:
給新人機會:讓剛來的顧客和從隊列喚醒的顧客一起去搶令牌,喚醒也是按照先來先到的原則喚醒;
保持樂觀態度:沒搶到不是直接去排隊,而是可以在門口轉悠會,說不定里面的人馬上就出來了;
正常模式和饑餓模式的切換:為了公平起見,正常模式下給了新人機會,一起去搶令牌;饑餓模式下照顧老人,所有人老老實實排隊,按照先來先到的順序拿令牌。整個餐廳既保持了公平,又提高了運行效率,一切井然有序起來了。
回歸正題
讓我們從餐廳回到 Go
中來,Mutex
有兩種模式:正常模式和饑餓模式:
正常模式下,如果當前鎖正在被持有,搶不到鎖的就會進入一個先進先出的等待隊列。當持有鎖的 goroutine
釋放鎖之后,按照從前到后的順序喚醒等待隊列的第一個等待者,但是不會直接給被喚醒者鎖,還是需要他去搶,即在喚醒等待隊列等待者這個時間,同時也會有正在運行且還未進入等待隊列的 goroutine
正在搶鎖 (數量可能還很多),這些都會和剛喚醒的等待者一起去搶,剛喚醒的可能還沒有分到 CPU
,而正在運行的正在占據了CPU
,所以正在運行的更有可能獲取到鎖,被喚醒的等待者可能搶鎖失敗。如果等待者搶鎖失敗,他會被放到等待隊列的隊首,如果超過 1ms
都沒搶到鎖,就會從 正常模式 切換到 饑餓模式。
饑餓模式下,要釋放鎖的 goroutine
會將鎖直接交給等待隊列的第一個等待者,不需要去搶了,而且新來的 goroutine
也不會嘗試去搶鎖,直接加入到等待隊列的尾部。那么什么時候會從饑餓模式切換到正常模式呢:
(1)如果當前被喚醒的等待者獲得到鎖后,發現自己是隊列中的最后一個,隊列中沒有其他等待者了,此時會切換到正常模式
(2)如果當前被喚醒的等待者獲得到鎖后,發現自己總共的等待時間不超過 1ms
,就獲得到鎖了,此時也會切換到正常模式
正常模式會帶來更高的吞吐量:一個 goroutine
要釋放鎖,更大可能會被正在運行的 goroutine
搶到,這就避免了協程的上下文切換,運行更多的 goroutine
,但是有可能造成一個問題,就是鎖始終被新來的 goroutine
搶走,在等待隊列中的等待者始終搶不到鎖,這就會導致饑餓問題。饑餓模式就是為了解決這個問題出現的,保證了每個 goroutine
都有運行的機會,防止等待時間過長。
// 互斥鎖 type Mutex struct { state int32 // 狀態 sema uint32 // 信號量 } const ( mutexLocked = 1 << iota // 1 mutexWoken // 2 mutexStarving // 4 mutexWaiterShift = iota // 3 starvationThresholdNs = 1e6 // 判斷是否要進入饑餓狀態的閾值 )
信號量 sema 就相當于我們說的令牌,state 是 int32 類型,一共 32位,通過每個位記錄了當前的狀態:
state字段
mutexLocked:當前是否已經上鎖,state & mutexLocked = 1
表示已經上鎖;
mutexWoken:標記當前是否有喚醒的 goroutine,state & mutexWoken = 1
表示有喚醒的goroutine;
mutexStarving:當前是否為饑餓狀態,state & mutexWoken = 1
表示處于饑餓狀態;
mutexWaiterShift:29位,state >> mutexWaiterShift
得到等待者的數量;
Lock()
加鎖方法分為兩部分,第一部分是 fast path
,可以理解為快捷通道,如果當前鎖沒被占用,直接獲得鎖返回;否則需要進入 slow path
,判斷各種條件去競爭鎖,主要邏輯都在此處。
了解過原子操作的同學,對 CompareAndSwap(CAS)
應該不陌生,CompareAndSwapInt32(addr *int32, old, new int32)
有三個參數,如果地址 addr
指向的值與 old
相等,則將 addr
的值改為 new
,否則不變,也就是說在我們修改前,如果有人修改了 addr
指向的值,本次修改就會失敗。
// 上鎖 func (m *Mutex) Lock() { // fastpath:期望當前鎖沒有被占用,可以快速獲取到鎖, CAS 修改 state 最后一位的值為1(標記鎖是否被占用) if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } // Slow path : 單獨抽出來放到一個函數里,方便 fast path 被內聯 m.lockSlow() }
func (m *Mutex) lockSlow() { var waitStartTime int64 // // 記錄等待時間 starving := false // 當前的 goroutine 是否已經饑餓了(如果已經饑餓,就會將 state 的饑餓狀態置為 1) awoke := false // 當前的 goroutine 是否被喚醒的 iter := 0 // 自旋次數 old := m.state // 保存當前的 state 狀態 for { /* 自旋:如果滿足如下條件,就會進入 if 語句,然后 continue,不斷自旋: 1. 鎖被占用,且不處于饑餓模式(饑餓狀態直接去排隊,不允許嘗試獲取鎖) 2. 基于當前自旋的次數,再次自旋有意義 runtime_canSpin(iter) 那么退出自旋的條件也就是: 1. 鎖被釋放了,當前處于沒被占用狀態(說明等到了,該goroutine就會立即去獲取鎖) 2. mutex進入了饑餓模式,不自旋了,沒意義(饑餓狀態會直接把鎖交給等待隊列隊首的goroutine) 3. 不符合自旋狀態(自旋次數太多了,自旋失去了意義) 如下代碼是位操作: mutexLocked|mutexStarving = 00000...101 mutexLocked = 00000...001 如果要滿足 old & 00000...101 = 00000...001,需要 old = ...0*1,即狀態為:鎖被占用,且不處于饑餓狀態 runtime_canSpin(iter) 會根據自旋次數,判斷是否可以繼續自旋 */ if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { /* 如果 1. 當前goroutine不是被喚醒的 (awoke=false) 2. 鎖狀態喚醒標志位為0(old&mutexWoken == 0) 3. 等待者數量不為0 (old>>mutexWaiterShift != 0 右移三位得到的就是等待者數量) 那么利用CAS,將 state 的喚醒標記置為1,標記自己是被喚醒的 (將state的喚醒標記置為1,說明外面有喚醒著的goroutine,那么在釋放鎖的時候,就不去等待隊列叫號了,畢竟已經有喚醒的了) 如果有其他 goroutine 已經設置了 state 的喚醒標記位,那么本次就會失敗 */ if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() // 迭代次數加一 iter++ // 獲取最新的狀態 old = m.state // 想再次自旋,看看鎖釋放了沒 continue } // 到這里,說明退出了自旋,當前鎖沒被占用 或者 系統處于饑餓模式 或者 自旋次數太多導致不符合自旋條件 // new 代表當前goroutine 基于當前狀態要設置的新狀態 new := old // 只要不是饑餓狀態,就需要獲取鎖(饑餓狀態直接去排隊,不能搶鎖) if old&mutexStarving == 0 { new |= mutexLocked } // 鎖被占用 或者 處于饑餓模式下,新增一個等待者 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 當前 goroutine 已經進入饑餓了,且鎖還沒有釋放,需要把 Mutex 的狀態改為饑餓狀態 if starving && old&mutexLocked != 0 { new |= mutexStarving } // 如果是被喚醒的,把喚醒標志位置0,表示外面沒有被喚醒的goroutine了(搶到就獲得鎖、搶不到就睡眠,把喚醒標志置0) if awoke { // 由于是被喚醒的,new 里面的 喚醒標記位一定是 1 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // a &^ b 的意思就是 清零a中,ab都為1的位,即清除喚醒標記 new &^= mutexWoken } /* 利用CAS,將狀態設置為新的 1. 如果是饑餓狀態,只增加一個等待者數量 2. 正常狀態,加鎖標記置為 1,如果鎖已被占用增加一個等待者數量 3. 如果當前 goroutine 已經饑餓了,將 饑餓標記 置為 1 4. 如果是被喚醒的,清除喚醒標記 */ if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果改狀態之前,鎖未被占用 且 處于正常模式,那么就相當于獲取到鎖了 if old&(mutexLocked|mutexStarving) == 0 { break } // 到這里說明:1. 之前鎖被占用 或者 2.之前是處于饑餓狀態 // 判斷之前是否等待過(是否從隊列里喚醒的),之前等待過,再次排隊放在隊首 queueLifo := waitStartTime != 0 // 如果之前沒等過(新來的),設置等待起始時間當前時間 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // 之前排過隊的老人,放到等待隊列隊首;新人放到隊尾,然后等待獲取信號量 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 鎖被釋放,goroutine 被喚醒 // 設置當前 goroutine 饑餓狀態,如果之前已經饑餓,或者距離等待開始時間超過了 1ms,也變饑餓 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 獲取最新的狀態 old = m.state // 如果 state 饑餓標記為1,說明當前在饑餓模式,饑餓模式下被喚醒,已經獲取到鎖了; // 饑餓狀態下,釋放鎖沒有更新等待者數量和饑餓標記,需要獲得鎖的goroutine去更新狀態 if old&mutexStarving != 0 { // 正確性校驗: // 1. 鎖還是鎖住的狀態(鎖已經釋放給當前goroutine了,不應該被鎖住) // 2. 或者有被喚醒的goroutine(饑餓模式下不應該有醒著的goroutine,都應該去乖乖等著) // 3. 或者當前goroutine 的等待者數量為0(當前goroutine就是等待者) // 這三種情況不應該出現,與預期狀態不符 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 加鎖,減去一個等待者 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果當前的 goroutine 非饑餓,或者等待者只有一個(也就是只有當前goroutine,等待隊列空了),可以取消饑餓狀態,進入正常狀態 if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } // 修改狀態: // 加鎖,減去一個等待者:m.state + mutexLocked - 1<<mutexWaiterShift : // 滿足非饑餓條件,加鎖,減去一個等待者,取消饑餓狀態: // m.state + mutexLocked - 1<<mutexWaiterShift - mutexStarving: atomic.AddInt32(&m.state, delta) // 饑餓模式下被喚醒,相當于獲得鎖了,可以結束 break } // 之前是處于鎖被占用且非饑餓狀態,被喚醒,去繼續搶鎖 awoke = true // 新喚醒的,自旋數量置0 iter = 0 } else { // 修改新狀態失敗,狀態有更新,需要重試 old = m.state } } }
加鎖的這部分代碼,新來的 goroutine
或者從隊列里面喚醒的 goroutine
都會進入如下邏輯,相當于給新人機會
:
1.樂觀態度的自旋:判斷是否可以自旋,如果可以自旋,就自旋等待;如果有可能,把喚醒標記位置為1,標記外面有喚醒的 goroutine
,釋放鎖的時候就不會去隊列里面喚醒了,畢竟已經有人在等待了;
2.修改系統狀態:跳出自旋后,每個 goroutine
根據當前系統狀態修改系統狀態:
非饑餓狀態,想要加鎖(如果本來就是加鎖狀態,將加鎖位 設置為 1 相當于不變)
鎖被占用 或者 處于饑餓模式下,新增一個等待者
當前 goroutine
已經進入饑餓了,且鎖還沒有釋放,需要把 Mutex
的狀態改為饑餓狀態
如果當前 goroutine
是被喚醒的,清除系統喚醒標記
3.利用 CAS
修改系統狀態,同一時刻只有一個 goroutine
能夠設置成功,但是設置成功并不代表獲取到鎖了:
之前是非上鎖的正常狀態,設置成功說明本次搶鎖成功
,可以返回去操作臨界區了;
之前是上鎖狀態或者饑餓狀態,本次只是新增了一個等待者,然后根據是否是新來的,去隊列隊尾或者隊首排隊,等待叫號;
4.從隊列中被叫號喚醒,不一定是獲取到鎖了:
當前是饑餓狀態,那么一定是獲取到鎖了,因為饑餓狀態只把鎖給隊列的第一個 goroutine
非饑餓狀態,將自己狀態置為喚醒,再去搶鎖,重復上述過程
問:系統會不會同時存在 喚醒標志和饑餓標志都為1 的情況呢?
答:不會。只有等待時間大于 1ms
的才會去設置饑餓標記,也就是只有從隊列喚醒的才會去設置,那么從隊列中喚醒的 goroutine
,自身的 awoke=true
,每當去設置饑餓標記的時候會把喚醒標記清除。
Unlock()
解鎖方法也分為兩部分,第一部分是 fast path
,可以理解為快捷通道,直接把鎖狀態位清除,如果此時系統狀態恢復到初始狀態,說明沒有 goroutine 在搶鎖等鎖,直接返回,否則進入 slow path
;
slow path
會根據是否為饑餓狀態,做出不一樣的反應:
正常狀態:喚醒一個 goroutine
去搶鎖,等待者數量減一,并將喚醒狀態置為 1
;
饑餓狀態:直接喚醒等待隊列隊首的 goroutine
,鎖的所有權直接移交(修改等待者數量、是否取消饑餓標記,由喚醒的 goroutine
去處理)。
func (m *Mutex) Unlock() { // Fast path: 把鎖標記清除 new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // 清除完鎖標記,發現還有其他狀態,比如等待隊列不為空,需要喚醒其他 goroutine m.unlockSlow(new) } }
func (m *Mutex) unlockSlow(new int32) { /* 狀態正確性校驗: 1. 如果解鎖一個上鎖狀態的鎖,最后一位則為1,fast path 中 new 已經減去了1, 此時 new 最后一位應當為0 2. 如果解鎖一個未上鎖狀態的鎖,最后一位則為0,fast path 中 new 已經減去了1, 此時 new 最后一位應當為1 如果 (new+mutexLocked)&mutexLocked == 0,說明 new 當前最后一位是1,那么就是解鎖了一個沒有上鎖的鎖,狀態有誤 */ if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 正常模式,非饑餓,可能需要喚醒隊列中的 goroutine,饑餓狀態直接移交鎖 if new&mutexStarving == 0 { old := new for { /* 系統運轉正常,鎖可以正確交接,可以直接返回了: 1. 沒有等待者了 (沒有等鎖的了,去喚醒誰?) 2. 有喚醒狀態的 goroutine (自旋狀態的 goroutine,將喚醒狀態置為1) 3. 有 goroutine 已經獲取了鎖 (Unlock方法已經將鎖標記置為了0,可能自旋的此時已經搶到了鎖) */ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 沒有喚醒狀態的 goroutine,喚醒一個去搶鎖 // 減去一個等待者,并且將 喚醒標記 置為 1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 第二個參數為false, 喚醒隊首的 goroutine 去搶鎖(不一定能搶到) runtime_Semrelease(&m.sema, false, 1) return } // 上面 CAS 失敗,可能由于新增了一個等待者,for 循環重試 old = m.state } } else { /* 1. 第二個參數為 true,直接將鎖的所有權,交給等待隊列的第一個等待者 2. 注意,此時沒有設置 mutexLocked =1 ,被喚醒的 goroutine 會設置 3. 雖然沒有設置 mutexLocked ,但是饑餓模式下, Mutex 始終被認為是鎖住的,都會直接排隊等待移交鎖 */ runtime_Semrelease(&m.sema, true, 1) } }
“Golang Mutex互斥鎖實例代碼分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。