您好,登錄后才能下訂單哦!
這篇文章主要介紹了Golang鎖原理如何實現的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Golang鎖原理如何實現文章都會有所收獲,下面我們一起來看看吧。
鎖的本質,就是一種資源,是由操作系統維護的一種專門用于同步的資源
比如說互斥鎖,說白了就是一種互斥的資源。只能有一個進程(線程)占有。當一個進程(線程)通過競爭獲得鎖的時候,其他進程(或線程)將得不到這把鎖。這是內核代碼決定的
如果我們希望某種資源在多個進程(線程/協程)之間共享,但是某一時刻最多有一個進程占有,這不就是互斥鎖的概念嗎,也就是說,我們希望自己的資源也變成一種鎖
最簡單的辦法就是將自己的資源和操作系統定義好的鎖綁定到一起。也就是說,進程要獲取我的資源之前,必須要獲得操作系統的鎖。進一步說,得鎖得資源,失鎖失資源。這樣的話,我們的資源也變成了一把鎖
并發編程中保證數據一致性和安全性的
Golang的提供的同步機制有sync模塊下的Mutex、WaitGroup以及語言自身提供的chan等。 這些同步的方法都是以runtime中實現的底層同步機制(cas、atomic、spinlock、sem)為基礎的
cas(Compare And Swap)和原子運算是其他同步機制的基礎
原子操作:指那些不能夠被打斷的操作被稱為原子操作,當有一個CPU在訪問這塊內容addr時,其他CPU就不能訪問
CAS:比較及交換,其實也屬于原子操作,但它是非阻塞的,所以在被操作值被頻繁變更的情況下,CAS操作并不那么容易成功,不得不利用for循環以進行多次嘗試
自旋鎖是指當一個線程在獲取鎖的時候,如果鎖已經被其他線程獲取,那么該線程將循環等待,然后不斷地判斷是否能夠被成功獲取,知直到獲取到鎖才會退出循環。獲取鎖的線程一直處于活躍狀態
Golang中的自旋鎖用來實現其他類型的鎖,與互斥鎖類似,不同點在于,它不是通過休眠來使進程阻塞,而是在獲得鎖之前一直處于活躍狀態(自旋)
實現休眠和喚醒協程的一種方式
信號量有兩個操作P和V
P(S):分配一個資源
1. 資源數減1:S=S-1
2. 進行以下判斷
如果S<0,進入阻塞隊列等待被釋放
如果S>=0,直接返回V(S):釋放一個資源
1. 資源數加1:S=S+1
2. 進行如下判斷
如果S>0,直接返回
如果S<=0,表示還有進程在請求資源,釋放阻塞隊列中的第一個等待進程
golang中信號量操作:runtime/sema.go
P操作:runtime_Semacquire
V操作:runtime_Semrelease
mutex的使用
package main import ( "fmt" "sync" ) var num int var mtx sync.Mutex var wg sync.WaitGroup func add() { mtx.Lock() //mutex實例無需實例化,聲明即可使用 defer mtx.Unlock() defer wg.Done() num += 1 } func main() { for i := 0; i < 100; i++ { wg.Add(1) go add() } wg.Wait() fmt.Println("num:", num) }
mutex的必要性
鎖在高度競爭時會不斷掛起恢復線程從而讓出cpu資源,原子變量在高度競爭時會一直占用cpu;原子操作時線程級別的,不支持協程
type Mutex struct { state int32 sema uint32 } const ( mutexLocked = 1 << iota mutexWoken mutexWaiterShift = iota //根據 mutex.state >> mutexWaiterShift 得到當前等待的 goroutine 數目 )
state表示當前鎖的狀態,是一個共用變量
state: |32|31|....|3|2|1|
\__________/ | |
| | |
| | 當前mutex是否加鎖
| |
| 當前mutex是否被喚醒
|
等待隊列的goroutine協程數
Lock 方法申請對 mutex 加鎖的時候分兩種情況
無沖突 通過 CAS 操作把當前狀態設置為加鎖狀態
有沖突 通過調用 semacquire 函數來讓當前 goroutine 進入休眠狀態,等待其他協程釋放鎖的時候喚醒
//如果已經加鎖,那么當前協程進入休眠阻塞,等待喚醒 func (m *Mutex) Lock() { // 快速加鎖:CAS更新state為locked if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } awoke := false //當前goroutine是否被喚醒 for { old := m.state // 保存當前state的狀態 new := old | mutexLocked // 新值locked位設置為1 // 如果當前處于加鎖狀態,新到來的goroutine進入等待隊列 if old&mutexLocked != 0 { new = old + 1<<mutexWaiterShift } if awoke { //如果被喚醒,新值需要重置woken位為 0 new &^= mutexWoken } // 兩種情況會走到這里:1.休眠中被喚醒 2.加鎖失敗進入等待隊列 // CAS 更新,如果更新失敗,說明有別的協程搶先一步,那么重新發起競爭。 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 如果更新成功,有兩種情況 // 1.如果為 1,說明當前 CAS 是為了更新 waiter 計數 // 2.如果為 0,說明是搶鎖成功,那么直接 break 退出。 if old&mutexLocked == 0 { break } runtime_Semacquire(&m.sema) // 此時如果 sema <= 0 那么阻塞在這里等待喚醒,也就是 park 住。走到這里都是要休眠了。 awoke = true // 有人釋放了鎖,然后當前 goroutine 被 runtime 喚醒了,設置 awoke true } } if raceenabled { raceAcquire(unsafe.Pointer(m)) } }
UnLock 解鎖分兩步
解鎖,通過CAS操作把當前狀態設置為解鎖狀態
喚醒休眠協程,CAS操作把當前狀態的waiter數減1,然后喚醒休眠goroutine
//鎖沒有和某個特定的協程關聯,可以由一個協程lock,另一個協程unlock func (m *Mutex) Unlock() { if raceenabled { _ = m.state raceRelease(unsafe.Pointer(m)) } // CAS更新state的狀態為locked 注意:解鎖的瞬間可能會有新的協程到來并搶到鎖 new := atomic.AddInt32(&m.state, -mutexLocked) // 釋放了一個沒上鎖的鎖會panic:原先的lock位為0 if (new+mutexLocked)&mutexLocked == 0 { panic("sync: unlock of unlocked mutex") } //判斷是否需要釋放資源 old := new for { /** * 不需要喚醒的情況 * 1.等待隊列為0 * 2.已經有協程搶到鎖(上面的瞬間搶鎖) * 3.已經有協程被喚醒 */ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 { return } //將waiter計數位減一,并設置state為woken(喚醒) //問:會同時有多個被喚醒的協程存在嗎 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema) // cas成功后,再做sema release操作,喚醒休眠的 goroutine return } old = m.state } }
知識點
使用&來判斷位值,使用|來設置位值,使用&^來清空位置(內存對齊)
一代互斥鎖的問題
處于休眠中的goroutine優先級低于當前活躍的,unlock解鎖的瞬間最新的goroutine會搶到鎖
大多數果鎖的時間很短,所有的goroutine都要休眠,增加runtime調度開銷
Lock 方法申請對 mutex 加鎖的時候分三種情況
無沖突 通過 CAS 操作把當前狀態設置為加鎖狀態
有沖突 開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內釋放了該鎖,直接獲得該鎖;如果沒有釋放,進入3
有沖突 通過調用 semacquire 函數來讓當前 goroutine 進入等待狀態,等待其他協程釋放鎖的時候喚醒
func (m *Mutex) Lock() { //快速加鎖,邏輯不變 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } awoke := false iter := 0 for { old := m.state new := old | mutexLocked if old&mutexLocked != 0 { // 如果當前己經上鎖,那么判斷是否可以自旋 //短暫的自旋過后如果無果,就只能通過信號量讓當前goroutine進入休眠等待了 if runtime_canSpin(iter) { // Active spinning makes sense. /** * 自旋的操作:設置state為woken,這樣在unlock的時候就不會喚醒其他協程. * 自旋的條件: * 1.當前協程未被喚醒 !awoke * 2.其他協程未被喚醒 old&mutexWoken == 0 * 3.等待隊列大于0 */ if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } //進行自旋操作 runtime_doSpin() iter++ continue } new = old + 1<<mutexWaiterShit } if awoke { //todo 為什么加這個判斷 if new&mutexWoken == 0 { panic("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&mutexLocked == 0 { break } runtime_Semacquire(&m.sema) awoke = true iter = 0 } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
path: runtime/proc.go
const ( mutex_unlocked = 0 mutex_locked = 1 mutex_sleeping = 2 active_spin = 4 active_spin_cnt = 30 passive_spin = 1 ) /** * 有四種情況會返回false * 1.已經執行了很多次 iter >= active_spin 默認為4。避免長時間自旋浪費CPU * 2.是單核CPU ncpu <= 1 || GOMAXPROCS < 1 保證除了當前運行的Goroutine之外,還有其他的Goroutine在運行 * 3.沒有其他正在運行的p * 4 當前P的G隊列為空 避免自旋鎖等待的條件是由當前p的其他G來觸發,這樣會導致再自旋變得沒有意義,因為條件永遠無法觸發 */ func sync_runtime_canSpin(i int) bool { // sync.Mutex is cooperative, so we are conservative with spinning. // Spin only few times and only if running on a multicore machine and // GOMAXPROCS>1 and there is at least one other running P and local runq is empty. // As opposed to runtime mutex we don't do passive spinning here, // because there can be work on global runq or on other Ps. if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true } // 自旋邏輯 // procyeld函數內部循環調用PAUSE指令,PAUSE指令什么都不做,但是會消耗CPU時間 // 在這里會執行30次PAUSE指令消耗CPU時間等待鎖的釋放; func sync_runtime_doSpin() { procyield(active_spin_cnt) } TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AX again: PAUSE SUBL $1, AX JNZ again RET
問題:
還是沒有解決休眠進程優先級低的問題
基本邏輯
Mutex 兩種工作模式,normal 正常模式,starvation 饑餓模式。normal 情況下鎖的邏輯與老版相似,休眠的 goroutine 以 FIFO 鏈表形式保存在 sudog 中,被喚醒的 goroutine 與新到來活躍的 goroutine 競解,但是很可能會失敗。如果一個 goroutine 等待超過 1ms,那么 Mutex 進入饑餓模式
饑餓模式下,解鎖后,鎖直接交給 waiter FIFO 鏈表的第一個,新來的活躍 goroutine 不參與競爭,并放到 FIFO 隊尾
如果當前獲得鎖的 goroutine 是 FIFO 隊尾,或是等待時長小于 1ms,那么退出饑餓模式
normal 模式下性能是比較好的,但是 starvation 模式能減小長尾 latency
LOCK流程:
無沖突 通過 CAS 操作把當前狀態設置為加鎖狀態
有沖突 開始自旋 如果是饑餓模式禁止自旋,開始自旋,并等待鎖釋放,如果其他 goroutine 在這段時間內釋放了該鎖,直接獲得該鎖;如果沒有釋放,進入3
有沖突,且已經過了自旋階段 通過調用 semacquire 函數來讓當前 goroutine 進入等待狀態,等待其他協程釋放鎖的時候喚醒,休眠前:如果是饑餓模式,把當前協程放到隊列最前面;喚醒后:如果是饑餓模式喚醒的,直接獲得鎖
type Mutex struct { state int32 sema **uint32** } // A Locker represents an object that can be locked and unlocked. type Locker interface { Lock() Unlock() } //為什么使用位掩碼表達式 //第3位到第32位表示等待在mutex上協程數量 const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving //新增饑餓狀態 mutexWaiterShift = iota starvationThresholdNs = 1e6 //饑餓狀態的閾值:等待時間超過1ms就會進入饑餓狀態 ) func (m *Mutex) Lock() { //快速加鎖:邏輯不變 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } var waitStartTime int64 //等待時間 starving := false //饑餓標記 awoke := false //喚醒標記 iter := 0 //循環計數器 old := m.state //保存當前鎖狀態 for { // 自旋的時候增加了一個判斷:如果處于饑餓狀態就不進入自旋,因為饑餓模式下,釋放的鎖會直接給等待隊列的第一個,當前協程直接進入等待隊列 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } runtime_doSpin() iter++ old = m.state continue } new := old // 當mutex不處于饑餓狀態的時候,將new值設置為locked,也就是說如果是饑餓狀態,新到來的goroutine直接排隊 if old&mutexStarving == 0 { new |= mutexLocked } // 當mutex處于加鎖鎖或者饑餓狀態時,新到來的goroutine進入等待隊列 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 當等待時間超過閾值,當前goroutine切換mutex為饑餓模式,如果未加鎖,就不需要切換 if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { // mutex 處于未加鎖,正常模式下,當前 goroutine 獲得鎖 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 如果已經在排隊了,就排到隊伍的最前面 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } // queueLifo 為真的時候,當前goroutine會被放到隊頭, // 也就是說被喚醒卻沒搶到鎖的goroutine放到最前面 runtime_SemacquireMutex(&m.sema, queueLifo) // 當前goroutine等待時間超過閾值,切換為饑餓模式,starving設置為true starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state //如果當前是饑餓模式 if old&mutexStarving != 0 { if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // 如果切換為饑餓模式,等待隊列計數減1 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果等待時間小于1ms或者自己是最后一個被喚醒的,退出饑餓模式 if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } }
UnLock 解鎖分兩步
解鎖,通過CAS操作把當前狀態設置為解鎖狀態
喚醒休眠協程,CAS操作把當前狀態的waiter數減1,然后喚醒休眠goroutine,如果是饑餓模式的話,喚醒等待隊列的第一個
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { // 正常模式 old := new for { /** * 不需要喚醒的情況 * 1.等待隊列為0 * 2.已經有協程搶到鎖(上面的瞬間搶鎖) * 3.已經有協程被喚醒 * 4.處于饑餓模式 在饑餓模式獲取到鎖的協程仍然處于饑餓狀態,新的goroutine無法獲取到鎖 */ if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 饑餓模式 runtime_Semrelease(&m.sema, true) } }
關于“Golang鎖原理如何實現”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Golang鎖原理如何實現”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。