91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Go調度器學習之goroutine調度怎么創建

發布時間:2023-05-09 17:55:46 來源:億速云 閱讀:152 作者:iii 欄目:開發技術

今天小編給大家分享一下Go調度器學習之goroutine調度怎么創建的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。

    1. 協程調度發生的時機

    在以下情形中,goroutine可能會發生調度:

    情形說明
    go func(){}使用go關鍵字創建一個新的goroutine,調度器會考慮調度
    GC由于GC也需要在系統線程M上執行,且其中需要所有的goroutine都停止運行,所以也會發生調度
    系統調用發生系統的調用時,會阻塞M,所以它會被調度走,同時新的goroutine也會被調度上來
    同步內存訪問mutex、channel等操作會使得goroutine阻塞,因此會被調度走,等條件滿足后,還會被調度上來繼續運行

    2. 創建協程時的調度

    其中,使用go關鍵字創建協程時的調度分析,上篇博客做了初步的分析,特別是有關調度循環的分析,但是我們沒有具體分析,當創建協程時,系統是怎么發生調度的。

    func newproc(fn *funcval) {
       gp := getg()
       pc := getcallerpc()
       systemstack(func() {
          newg := newproc1(fn, gp, pc)
    
          _p_ := getg().m.p.ptr()
          runqput(_p_, newg, true)
    
          if mainStarted {
             wakep()
          }
       })
    }

    我們還記得,go關鍵字在創建協程時,Go的編譯器會將其轉換為runtime.newproc函數,上篇我們詳細分析了main goroutine的創建過程,在runtime.main函數中,全局變量mainStarted會被置為true,之后普通協程的創建,則會調用runtime.wakep函數嘗試喚醒空閑的P。

    func wakep() {
       if atomic.Load(&sched.npidle) == 0 {
          return
       }
       // be conservative about spinning threads
       if atomic.Load(&sched.nmspinning) != 0 || !atomic.Cas(&sched.nmspinning, 0, 1) {
          return
       }
       startm(nil, true)
    }

    wakep函數首先確認是否有其他線程正在處于spinning狀態,即M是否在找工作,如果沒有的話,則調用startm函數創建一個新的、或者喚醒一個處于睡眠狀態的工作線程出來工作。

    func startm(_p_ *p, spinning bool) {
       // Disable preemption.
       //
       // Every owned P must have an owner that will eventually stop it in the
       // event of a GC stop request. startm takes transient ownership of a P
       // (either from argument or pidleget below) and transfers ownership to
       // a started M, which will be responsible for performing the stop.
       //
       // Preemption must be disabled during this transient ownership,
       // otherwise the P this is running on may enter GC stop while still
       // holding the transient P, leaving that P in limbo and deadlocking the
       // STW.
       //
       // Callers passing a non-nil P must already be in non-preemptible
       // context, otherwise such preemption could occur on function entry to
       // startm. Callers passing a nil P may be preemptible, so we must
       // disable preemption before acquiring a P from pidleget below.
       mp := acquirem()  // 保證在此期間不會發生棧擴展
       lock(&sched.lock)
       if _p_ == nil {   // 沒有指定p,那么需要從空閑隊列中取一個p
          _p_ = pidleget()
          if _p_ == nil {// 如果沒有空閑的p,直接返回
             unlock(&sched.lock)
             if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                   throw("startm: negative nmspinning")
                }
             }
             releasem(mp)
             return
          }
       }
       nmp := mget()  // 如果有空閑的p,那么取出一個空閑的m
       if nmp == nil {// 如果沒有空閑的m,那么調用newm創建一個,然后返回
          // No M is available, we must drop sched.lock and call newm.
          // However, we already own a P to assign to the M.
          //
          // Once sched.lock is released, another G (e.g., in a syscall),
          // could find no idle P while checkdead finds a runnable G but
          // no running M's because this new M hasn't started yet, thus
          // throwing in an apparent deadlock.
          //
          // Avoid this situation by pre-allocating the ID for the new M,
          // thus marking it as 'running' before we drop sched.lock. This
          // new M will eventually run the scheduler to execute any
          // queued G's.
          id := mReserveID()
          unlock(&sched.lock)
    
          var fn func()
          if spinning {
             // The caller incremented nmspinning, so set m.spinning in the new M.
             fn = mspinning
          }
          newm(fn, _p_, id)
          // Ownership transfer of _p_ committed by start in newm.
          // Preemption is now safe.
          releasem(mp)
          return
       }
       unlock(&sched.lock)
       if nmp.spinning {
          throw("startm: m is spinning")
       }
       if nmp.nextp != 0 {
          throw("startm: m has p")
       }
       if spinning && !runqempty(_p_) {
          throw("startm: p has runnable gs")
       }
       // The caller incremented nmspinning, so set m.spinning in the new M.
       nmp.spinning = spinning
       nmp.nextp.set(_p_)
       notewakeup(&nmp.park) // 如果有空閑的m,則喚醒這個m
       // Ownership transfer of _p_ committed by wakeup. Preemption is now
       // safe.
       releasem(mp)
    }

    startm函數首先判斷是否有空閑的P,如果沒有則直接返回;如果有,則判斷是否有空閑的M,如果沒有,則新建一個;如果有空閑的M,則喚醒這個M。說白了,wakep函數就是為了更大程度的利用P,利用CPU資源。

    說到這里,我們就需要重溫一下上篇博客講到的,調度中獲取goroutine的規則是:

    • 每調度61次就需要從全局隊列中獲取goroutine

    • 其次優先從本P所在隊列中獲取goroutine

    • 如果還沒有獲取到,則從其他P的運行隊列中竊取goroutine

    其中,從其他P隊列中竊取goroutine,調用的是findrunnable函數,這個函數很長,為了簡化說明,我們刪除一些不是很重要的代碼:

    func findrunnable() (gp *g, inheritTime bool) {
       _g_ := getg()
    
    top:
       _p_ := _g_.m.p.ptr()
       ...
    
       // local runq
       // 再從本地隊列找找
       if gp, inheritTime := runqget(_p_); gp != nil {
          return gp, inheritTime
       }
    
       // global runq
       // 再看看全局隊列
       if sched.runqsize != 0 {
          lock(&sched.lock)
          gp := globrunqget(_p_, 0)
          unlock(&sched.lock)
          if gp != nil {
             return gp, false
          }
       }
    
       ...
    
       // Spinning Ms: steal work from other Ps.
       //
       // Limit the number of spinning Ms to half the number of busy Ps.
       // This is necessary to prevent excessive CPU consumption when
       // GOMAXPROCS>>1 but the program parallelism is low.
       procs := uint32(gomaxprocs)
       if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
          if !_g_.m.spinning {
             _g_.m.spinning = true
             atomic.Xadd(&sched.nmspinning, 1)
          }
    
          gp, inheritTime, tnow, w, newWork := stealWork(now) // 調用stealWork盜取goroutine
          now = tnow
          if gp != nil {
             // Successfully stole.
             return gp, inheritTime
          }
          if newWork {
             // There may be new timer or GC work; restart to
             // discover.
             goto top
          }
          if w != 0 && (pollUntil == 0 || w < pollUntil) {
             // Earlier timer to wait for.
             pollUntil = w
          }
       }
    
       ...
    
       // return P and block
       // 上面的竊取沒有成功,那么解除m和p的綁定,摒棄娥江p放到空閑隊列,然后去休眠
       lock(&sched.lock)
       if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
          unlock(&sched.lock)
          goto top
       }
       if sched.runqsize != 0 {
          gp := globrunqget(_p_, 0)
          unlock(&sched.lock)
          return gp, false
       }
       if releasep() != _p_ {
          throw("findrunnable: wrong p")
       }
       pidleput(_p_)
       unlock(&sched.lock)
    
       ...
          _g_.m.spinning = false // m即將睡眠,狀態不再是spinning
          if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
             throw("findrunnable: negative nmspinning")
          }
    
       ...
       stopm() // 休眠
       goto top
    }

    從上面的代碼可以看出,工作線程會反復嘗試尋找運行的goroutine,實在找不到的情況下才會進入到睡眠。需要注意的是,工作線程M從其他P的本地隊列中盜取goroutine時的狀態稱之為自旋(spinning)狀態,而前面講到wakep調用startm函數,也是優先從自旋狀態的M中選取,實在沒有才去喚醒休眠的M,再沒有就創建新的M。

    竊取算法stealWork我們就不分析了,有興趣的同學可以看看。下面具體分析下stopm是怎么實現線程睡眠的。

    func stopm() {
       _g_ := getg()
    
       if _g_.m.locks != 0 {
          throw("stopm holding locks")
       }
       if _g_.m.p != 0 {
          throw("stopm holding p")
       }
       if _g_.m.spinning {
          throw("stopm spinning")
       }
    
       lock(&sched.lock)
       mput(_g_.m)         // 把m放到sched.midle空閑隊列
       unlock(&sched.lock)
       mPark()
       acquirep(_g_.m.nextp.ptr()) // 綁定這個m和其下一個p,這里沒有看懂為啥這么操作
       _g_.m.nextp = 0
    }
    
    
    func mPark() {
       gp := getg()
       notesleep(&gp.m.park) // 進入睡眠狀態
       noteclear(&gp.m.park)
    }

    可以看出,stopm主要是將m對象放到調度器的空閑線程隊列,然后通過notesleep進入睡眠狀態。notego runtime實現的一次性睡眠和喚醒機制,通過notesleep進入睡眠狀態,然后另一個線程可以通過notewakeup喚醒這個線程。

    小結

    上面巴拉巴拉講了那么多,看的人有點頭暈,我們接下來講一個很小的例子梳理一下以上的邏輯(主線程的創建和執行在上一篇博客中詳細敘述過,這里不再贅述),主線程創建了一個goroutine,這時候會觸發wakep,接下來可能會喚醒空閑的工作線程(如果是第一個非main goroutine,就沒有空閑的工作線程),或者創建一個新的工作線程,或者什么都不做。

    如果是創建一個新的工作線程,那么其開啟執行的點也是mstart函數(注意區分mstartstartm),然后在schedule函數中會嘗試去獲取goroutine,如果全局和本地的goroutine隊列都沒有,則會去其他的P上竊取goroutine,如果竊取不成功,則會休眠。

    如果是去喚醒工作協程,喚醒后會在休眠的地方開始,重新進行竊取。

    竊取到工作協程后,就會去執行,然后就會因為各種原因重新開始調度循環。

    Go調度器學習之goroutine調度怎么創建

    3. 主動掛起

    Go中,有很多種情形會導致goroutine阻塞,即其主動掛起,然后被調度走,等滿足其運行條件時,還會被調度上來繼續運行。比如channel的讀寫,我們以通道的阻塞讀為例,來介紹goroutine的主動掛起的調度方式。

    3.1 協程掛起

    和前面介紹的Map一樣,channel的讀也有以下兩種讀取方式:

    v := <- ch
    v, ok := <- ch

    分別對應以下chanrecv1chanrecv2函數:

    //go:nosplit
    func chanrecv1(c *hchan, elem unsafe.Pointer) {
       chanrecv(c, elem, true)
    }
    
    //go:nosplit
    func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
       _, received = chanrecv(c, elem, true)
       return
    }

    無論是哪個函數,最終調用的都是chanrecv函數:

    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
       ...
       
       c.recvq.enqueue(mysg) // 將這個goroutine放到channel的recv的queue中
       
       atomic.Store8(&gp.parkingOnChan, 1)
       // 掛起這個goroutine
       gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
    
       ...
    }

    chanrecv會先判斷channel是否有數據可讀,如果有則直接讀取并返回,如果沒有則將這個goroutine放到channelrecvqueue中,然后調用gopark函數將當前goroutine掛起并阻塞。

    func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
       if reason != waitReasonSleep {
          checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
       }
       mp := acquirem()
       gp := mp.curg
       status := readgstatus(gp)
       if status != _Grunning && status != _Gscanrunning {
          throw("gopark: bad g status")
       }
       mp.waitlock = lock
       mp.waitunlockf = unlockf
       gp.waitreason = reason
       mp.waittraceev = traceEv
       mp.waittraceskip = traceskip
       releasem(mp)
       // can't do anything that might move the G between Ms here.
       mcall(park_m)
    }

    gopark函數則使用mcall函數(前面分析過,主要作用是保存當前goroutine現場,然后切換到g0棧去調用作為參數傳入的函數)取執行park_m函數:

    // park continuation on g0.
    func park_m(gp *g) {
       _g_ := getg()
    
       if trace.enabled {
          traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
       }
    
       casgstatus(gp, _Grunning, _Gwaiting)
       dropg()
    
       if fn := _g_.m.waitunlockf; fn != nil {
          ok := fn(gp, _g_.m.waitlock)
          _g_.m.waitunlockf = nil
          _g_.m.waitlock = nil
          if !ok {
             if trace.enabled {
                traceGoUnpark(gp, 2)
             }
             casgstatus(gp, _Gwaiting, _Grunnable)
             execute(gp, true) // Schedule it back, never returns.
          }
       }
       schedule()
    }

    park_m首先把當前goroutine的狀態設置為_Gwaiting(因為它正在等待其它goroutinechannel里面寫數據),然后調用dropg函數解除gm之間的關系,最后通過調用schedule函數進入調度循環。

    至此,一個goroutine就被主動掛起了。

    3.2 協程喚醒

    我們繼續以上例子,當另一個goroutine對這個channel發送數據的時候

    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
       ...
    
       if sg := c.recvq.dequeue(); sg != nil {
          // Found a waiting receiver. We pass the value we want to send
          // directly to the receiver, bypassing the channel buffer (if any).
          send(c, sg, ep, func() { unlock(&c.lock) }, 3)
          return true
       }
    
       ...
    }
    
    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
       ...
       goready(gp, skip+1)
    }

    channel的發送流程和讀取類似,當檢查到接收隊列中有等待著時,會調用send函數然后調用goready喚醒協程:

    func goready(gp *g, traceskip int) {
       systemstack(func() {
          ready(gp, traceskip, true)
       })
    }
    
    func ready(gp *g, traceskip int, next bool) {
       if trace.enabled {
          traceGoUnpark(gp, traceskip)
       }
    
       status := readgstatus(gp)
    
       // Mark runnable.
       _g_ := getg()
       mp := acquirem() // disable preemption because it can be holding p in a local var
       if status&^_Gscan != _Gwaiting {
          dumpgstatus(gp)
          throw("bad g->status in ready")
       }
    
       // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
       casgstatus(gp, _Gwaiting, _Grunnable)
       runqput(_g_.m.p.ptr(), gp, next)
       wakep()
       releasem(mp)
    }

    這里發現,ready函數和創建協程時一樣,會觸發wakep來檢查是否需要喚醒空閑P來執行。而在此之前,這個被喚醒的goroutine會放到P的本地隊列的下一個執行goroutine,以提升時效性。

    到這里,一個被掛起的協程也就被喚醒了。

    以上就是“Go調度器學習之goroutine調度怎么創建”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。

    向AI問一下細節

    免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

    AI

    龙口市| 大余县| 大石桥市| 德格县| 东城区| 江山市| 庆元县| 伊宁市| 舞阳县| 上虞市| 安宁市| 香河县| 穆棱市| 百色市| 边坝县| 黄骅市| 彭泽县| 郴州市| 蒙山县| 凌云县| 榆林市| 泸定县| 凤冈县| 兴国县| 德清县| 黎城县| 商南县| 锦屏县| 西平县| 通河县| 沙湾县| 昌黎县| 易门县| 进贤县| 阿勒泰市| 恭城| 新化县| 深州市| 遂川县| 丽江市| 高尔夫|