您好,登錄后才能下訂單哦!
今天小編給大家分享一下Go調度器學習之goroutine調度怎么創建的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
在以下情形中,goroutine
可能會發生調度:
情形 | 說明 |
---|---|
go func(){} | 使用go關鍵字創建一個新的goroutine,調度器會考慮調度 |
GC | 由于GC也需要在系統線程M上執行,且其中需要所有的goroutine都停止運行,所以也會發生調度 |
系統調用 | 發生系統的調用時,會阻塞M,所以它會被調度走,同時新的goroutine也會被調度上來 |
同步內存訪問 | mutex、channel等操作會使得goroutine阻塞,因此會被調度走,等條件滿足后,還會被調度上來繼續運行 |
其中,使用go
關鍵字創建協程時的調度分析,上篇博客做了初步的分析,特別是有關調度循環的分析,但是我們沒有具體分析,當創建協程時,系統是怎么發生調度的。
func newproc(fn *funcval) { gp := getg() pc := getcallerpc() systemstack(func() { newg := newproc1(fn, gp, pc) _p_ := getg().m.p.ptr() runqput(_p_, newg, true) if mainStarted { wakep() } }) }
我們還記得,go
關鍵字在創建協程時,Go
的編譯器會將其轉換為runtime.newproc
函數,上篇我們詳細分析了main goroutine
的創建過程,在runtime.main
函數中,全局變量mainStarted
會被置為true
,之后普通協程的創建,則會調用runtime.wakep
函數嘗試喚醒空閑的P。
func wakep() { if atomic.Load(&sched.npidle) == 0 { return } // be conservative about spinning threads if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) }
wakep
函數首先確認是否有其他線程正在處于spinning
狀態,即M是否在找工作,如果沒有的話,則調用startm
函數創建一個新的、或者喚醒一個處于睡眠狀態的工作線程出來工作。
func startm(_p_ *p, spinning bool) { // Disable preemption. // // Every owned P must have an owner that will eventually stop it in the // event of a GC stop request. startm takes transient ownership of a P // (either from argument or pidleget below) and transfers ownership to // a started M, which will be responsible for performing the stop. // // Preemption must be disabled during this transient ownership, // otherwise the P this is running on may enter GC stop while still // holding the transient P, leaving that P in limbo and deadlocking the // STW. // // Callers passing a non-nil P must already be in non-preemptible // context, otherwise such preemption could occur on function entry to // startm. Callers passing a nil P may be preemptible, so we must // disable preemption before acquiring a P from pidleget below. mp := acquirem() // 保證在此期間不會發生棧擴展 lock(&sched.lock) if _p_ == nil { // 沒有指定p,那么需要從空閑隊列中取一個p _p_ = pidleget() if _p_ == nil {// 如果沒有空閑的p,直接返回 unlock(&sched.lock) if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } releasem(mp) return } } nmp := mget() // 如果有空閑的p,那么取出一個空閑的m if nmp == nil {// 如果沒有空閑的m,那么調用newm創建一個,然后返回 // No M is available, we must drop sched.lock and call newm. // However, we already own a P to assign to the M. // // Once sched.lock is released, another G (e.g., in a syscall), // could find no idle P while checkdead finds a runnable G but // no running M's because this new M hasn't started yet, thus // throwing in an apparent deadlock. // // Avoid this situation by pre-allocating the ID for the new M, // thus marking it as 'running' before we drop sched.lock. This // new M will eventually run the scheduler to execute any // queued G's. id := mReserveID() unlock(&sched.lock) var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_, id) // Ownership transfer of _p_ committed by start in newm. // Preemption is now safe. releasem(mp) return } unlock(&sched.lock) if nmp.spinning { throw("startm: m is spinning") } if nmp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. nmp.spinning = spinning nmp.nextp.set(_p_) notewakeup(&nmp.park) // 如果有空閑的m,則喚醒這個m // Ownership transfer of _p_ committed by wakeup. Preemption is now // safe. releasem(mp) }
startm
函數首先判斷是否有空閑的P,如果沒有則直接返回;如果有,則判斷是否有空閑的M,如果沒有,則新建一個;如果有空閑的M,則喚醒這個M。說白了,wakep
函數就是為了更大程度的利用P,利用CPU資源。
說到這里,我們就需要重溫一下上篇博客講到的,調度中獲取goroutine
的規則是:
每調度61次就需要從全局隊列中獲取goroutine
;
其次優先從本P所在隊列中獲取goroutine
;
如果還沒有獲取到,則從其他P的運行隊列中竊取goroutine
;
其中,從其他P隊列中竊取goroutine
,調用的是findrunnable
函數,這個函數很長,為了簡化說明,我們刪除一些不是很重要的代碼:
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: _p_ := _g_.m.p.ptr() ... // local runq // 再從本地隊列找找 if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 再看看全局隊列 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } ... // Spinning Ms: steal work from other Ps. // // Limit the number of spinning Ms to half the number of busy Ps. // This is necessary to prevent excessive CPU consumption when // GOMAXPROCS>>1 but the program parallelism is low. procs := uint32(gomaxprocs) if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) { if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } gp, inheritTime, tnow, w, newWork := stealWork(now) // 調用stealWork盜取goroutine now = tnow if gp != nil { // Successfully stole. return gp, inheritTime } if newWork { // There may be new timer or GC work; restart to // discover. goto top } if w != 0 && (pollUntil == 0 || w < pollUntil) { // Earlier timer to wait for. pollUntil = w } } ... // return P and block // 上面的竊取沒有成功,那么解除m和p的綁定,摒棄娥江p放到空閑隊列,然后去休眠 lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) unlock(&sched.lock) ... _g_.m.spinning = false // m即將睡眠,狀態不再是spinning if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } ... stopm() // 休眠 goto top }
從上面的代碼可以看出,工作線程會反復嘗試尋找運行的goroutine
,實在找不到的情況下才會進入到睡眠。需要注意的是,工作線程M從其他P的本地隊列中盜取goroutine時的狀態稱之為自旋(spinning)狀態,而前面講到wakep
調用startm
函數,也是優先從自旋狀態的M中選取,實在沒有才去喚醒休眠的M,再沒有就創建新的M。
竊取算法stealWork
我們就不分析了,有興趣的同學可以看看。下面具體分析下stopm
是怎么實現線程睡眠的。
func stopm() { _g_ := getg() if _g_.m.locks != 0 { throw("stopm holding locks") } if _g_.m.p != 0 { throw("stopm holding p") } if _g_.m.spinning { throw("stopm spinning") } lock(&sched.lock) mput(_g_.m) // 把m放到sched.midle空閑隊列 unlock(&sched.lock) mPark() acquirep(_g_.m.nextp.ptr()) // 綁定這個m和其下一個p,這里沒有看懂為啥這么操作 _g_.m.nextp = 0 } func mPark() { gp := getg() notesleep(&gp.m.park) // 進入睡眠狀態 noteclear(&gp.m.park) }
可以看出,stopm
主要是將m對象放到調度器的空閑線程隊列,然后通過notesleep
進入睡眠狀態。note
是go runtime
實現的一次性睡眠和喚醒機制,通過notesleep
進入睡眠狀態,然后另一個線程可以通過notewakeup
喚醒這個線程。
小結
上面巴拉巴拉講了那么多,看的人有點頭暈,我們接下來講一個很小的例子梳理一下以上的邏輯(主線程的創建和執行在上一篇博客中詳細敘述過,這里不再贅述),主線程創建了一個goroutine
,這時候會觸發wakep
,接下來可能會喚醒空閑的工作線程(如果是第一個非main goroutine
,就沒有空閑的工作線程),或者創建一個新的工作線程,或者什么都不做。
如果是創建一個新的工作線程,那么其開啟執行的點也是mstart
函數(注意區分mstart
和startm
),然后在schedule
函數中會嘗試去獲取goroutine
,如果全局和本地的goroutine
隊列都沒有,則會去其他的P上竊取goroutine
,如果竊取不成功,則會休眠。
如果是去喚醒工作協程,喚醒后會在休眠的地方開始,重新進行竊取。
竊取到工作協程后,就會去執行,然后就會因為各種原因重新開始調度循環。
在Go
中,有很多種情形會導致goroutine
阻塞,即其主動掛起,然后被調度走,等滿足其運行條件時,還會被調度上來繼續運行。比如channel
的讀寫,我們以通道的阻塞讀為例,來介紹goroutine
的主動掛起的調度方式。
和前面介紹的Map一樣,channel
的讀也有以下兩種讀取方式:
v := <- ch v, ok := <- ch
分別對應以下chanrecv1
和chanrecv2
函數:
//go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) } //go:nosplit func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { _, received = chanrecv(c, elem, true) return }
無論是哪個函數,最終調用的都是chanrecv
函數:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... c.recvq.enqueue(mysg) // 將這個goroutine放到channel的recv的queue中 atomic.Store8(&gp.parkingOnChan, 1) // 掛起這個goroutine gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }
chanrecv
會先判斷channel是否有數據可讀,如果有則直接讀取并返回,如果沒有則將這個goroutine
放到channel
的recv
的queue
中,然后調用gopark
函數將當前goroutine
掛起并阻塞。
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = unlockf gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
gopark
函數則使用mcall
函數(前面分析過,主要作用是保存當前goroutine
現場,然后切換到g0
棧去調用作為參數傳入的函數)取執行park_m
函數:
// park continuation on g0. func park_m(gp *g) { _g_ := getg() if trace.enabled { traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip) } casgstatus(gp, _Grunning, _Gwaiting) dropg() if fn := _g_.m.waitunlockf; fn != nil { ok := fn(gp, _g_.m.waitlock) _g_.m.waitunlockf = nil _g_.m.waitlock = nil if !ok { if trace.enabled { traceGoUnpark(gp, 2) } casgstatus(gp, _Gwaiting, _Grunnable) execute(gp, true) // Schedule it back, never returns. } } schedule() }
park_m
首先把當前goroutine
的狀態設置為_Gwaiting
(因為它正在等待其它goroutine
往channel
里面寫數據),然后調用dropg
函數解除g
和m
之間的關系,最后通過調用schedule
函數進入調度循環。
至此,一個goroutine
就被主動掛起了。
我們繼續以上例子,當另一個goroutine
對這個channel
發送數據的時候
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... } func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... goready(gp, skip+1) }
channel
的發送流程和讀取類似,當檢查到接收隊列中有等待著時,會調用send
函數然后調用goready
喚醒協程:
func goready(gp *g, traceskip int) { systemstack(func() { ready(gp, traceskip, true) }) } func ready(gp *g, traceskip int, next bool) { if trace.enabled { traceGoUnpark(gp, traceskip) } status := readgstatus(gp) // Mark runnable. _g_ := getg() mp := acquirem() // disable preemption because it can be holding p in a local var if status&^_Gscan != _Gwaiting { dumpgstatus(gp) throw("bad g->status in ready") } // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) wakep() releasem(mp) }
這里發現,ready
函數和創建協程時一樣,會觸發wakep
來檢查是否需要喚醒空閑P來執行。而在此之前,這個被喚醒的goroutine
會放到P的本地隊列的下一個執行goroutine
,以提升時效性。
到這里,一個被掛起的協程也就被喚醒了。
以上就是“Go調度器學習之goroutine調度怎么創建”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。