91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Golang?channel如何應用

發布時間:2022-10-26 09:53:55 來源:億速云 閱讀:120 作者:iii 欄目:開發技術

這篇文章主要介紹“Golang channel如何應用”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“Golang channel如何應用”文章能幫助大家解決問題。

前言

channel是用于 goroutine 之間的同步、通信的數據結構

channel 的底層是通過 mutex 來控制并發的,但它為程序員提供了更高一層次的抽象,封裝了更多的功能,這樣并發編程變得更加容易和安全,得以讓程序員把注意力留到業務上去,提升開發效率

channel的用途包括但不限于以下幾點:

  • 協程間通信,同步

  • 定時任務:和timer結合

  • 解耦生產方和消費方,實現阻塞隊列

  • 控制并發數

整體結構

Go channel的數據結構如下所示:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters
    lock mutex
}

qcount:已經存儲了多少個元素

dataqsie:最多存儲多少個元素,即緩沖區容量

buf:指向緩沖區的位置,實際上是一個數組

elemsize:每個元素占多大空間

closed:channel能夠關閉,這里記錄其關閉狀態

elemtype:保存數據的類型信息,用于go運行時使用

sendx,recvx:

  • 記錄下一個要發送到的位置,下一次從哪里還是接收

  • 這里用數組模擬隊列,這兩個變量即表示隊列的隊頭,隊尾

  • 因此channel的緩沖也被稱為環形緩沖區

recvq,sendq:

當發送個接收不能立即完成時,需要讓協程在channel上等待,所以有兩個等待隊列,分別針對接收和發送

lock:channel支持協程間并發訪問,因此需要一把鎖來保護

創建

創建channel會被編譯器編譯為調用makechan函數

// 無緩沖通道
ch2 := make(chan int)
// 有緩沖通道
ch3 := make(chan int, 10)

會根據創建的是帶緩存,還是無緩沖,決定第二個參數size的值

可以看出,創建出來的是hchan指針,這樣就能在函數間直接傳遞 channel,而不用傳遞 channel 的指針

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
    
   // mem:緩沖區大小 
   mem, overflow := math.MulUintptr(elem.size, uintptr(size))
   if overflow || mem > maxAlloc-hchanSize || size < 0 {
      panic(plainError( "makechan: size out of range" ))
   }

   var c *hchan
   switch {
   // 緩沖區大小為空,只申請hchanSize大小的內存
   case mem == 0:
       c = (*hchan)(mallocgc(hchanSize, nil, true))
       c.buf = c.raceaddr()
   // 元素類型不包含指針,一次性分配hchanSize+mem大小的內存
   case elem.ptrdata == 0:
       c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
       c.buf = add(unsafe.Pointer(c), hchanSize)
   // 否則就是帶緩存,且有指針,分配兩次內存
   default:
      // Elements contain pointers.
       c = new(hchan)
       c.buf = mallocgc(mem, elem, true)
   }
    
   // 保存元素類型,元素大小,容量
   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)
   
   return c
}

發送

執行以下代碼時:

ch <- 3

編譯器會轉化為對chansend的調用

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   // 如果channel是空
   if c == nil {
      // 非阻塞,直接返回
      if !block {
         return  false
      }
      // 否則阻塞當前協程
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,沒有關閉,且容量滿了,無法發送,直接返回
   if !block && c.closed == 0 && full(c) {
      return  false
   }

   // 加鎖
   lock(&c.lock)

   // 如果已經關閉,無法發送,直接panic
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "send on closed channel" ))
   }

   // 從接收隊列彈出一個協程的包裝結構sudog
   if sg := c.recvq.dequeue(); sg != nil {
      // 如果能彈出,即有等到接收的協程,說明:
      // 該channel要么是無緩沖,要么緩沖區為空,不然不可能有協程在等待
      // 將要發送的數據拷貝到該協程的接收指針上
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true
}

   // 緩沖區還有空間
   if c.qcount < c.dataqsiz {
      // qp:計算要發送到的位置的地址
      qp := chanbuf(c, c.sendx)
      // 將數據從ep拷貝到qp
      typedmemmove(c.elemtype, qp, ep)
      // 待發送位置移動
      c.sendx++
      // 由于是數組模擬隊列,sendx到頂了需要歸零
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      // 緩沖區數量++
      c.qcount++
      unlock(&c.lock)
      return  true
}

   // 往下就是緩沖區無數據,也沒有等到接收協程的情況了
   
   // 如果是非阻塞模式,直接返回
   if !block {
      unlock(&c.lock)
      return  false
    }

   // 將當前協程包裝成sudog,阻塞到channel上
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
  
   mysg.elem = ep
   mysg.waitlink = nil
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.waiting = mysg
   gp.param = nil
   
   // 當前協程進入發送等待隊列
   c.sendq.enqueue(mysg)
   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
   
 // 被喚醒后從這里開始執行
   
   KeepAlive(ep)

   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   closed := !mysg.success
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg)
   // 被喚醒后發現channel關閉了,panic
   if closed {
      if c.closed == 0 {
         throw( "chansend: spurious wakeup" )
      }
      panic(plainError( "send on closed channel" ))
   }
   return  true
}

整體流程為:

如果當前操作為非阻塞,channel沒有關閉,且容量滿了,無法發送,直接返回

從接收隊列彈出一個協程的包裝結構sudog,如果能彈出,即有等到接收的協程,說明:

  • 該channel要么是無緩沖,要么緩沖區為空,不然不可能有協程在等待

  • 將要發送的數據拷貝到該協程的接收指針上,返回

  • 這里直接從發送者拷貝到接收者的內存,而不是先把數據拷貝到緩沖區,再從緩沖區拷貝到接收者,節約了一次內存拷貝

否則看看緩沖區還有空間,如果有,將數據拷貝到緩沖區上,也返回

接下來就是既沒有接收者等待,緩沖區也為空的情況,就需要將當前協程包裝成sudog,阻塞到channel上

將協程阻塞到channel的等待隊列時,將其包裝成了sudog結構:

type sudog struct {
   // 協程
   g *g
   // 前一個,后一個指針
   next *sudog
   prev *sudog
   // 等到發送的數據在哪,等待從哪個位置接收數據
   elem unsafe.Pointer
   acquiretime int64
   releasetime int64
   ticket      uint32
   isSelect bool
   success bool

   parent   *sudog // semaRoot binary tree
   waitlink *sudog // g.waiting list or semaRoot
   waittail *sudog // semaRoot
   // 在哪個channel上等待
   c        *hchan // channel
}

其目的是:

  • g本身沒有存儲前一個,后一個指針,需要用sudog結構包裝才能加入隊列

  • elem字段存儲等到發送的數據在哪,等待從哪個位置接收數據,用于從數據能從協程到協程的直接拷貝

來看看一些子函數:

1.判斷channel是否是滿的

func full(c *hchan) bool {
   // 無緩沖
   if c.dataqsiz == 0 {
      // 并且沒有其他協程在等待
      return c.recvq.first == nil
   }
   // 有緩沖,但容量裝滿了
   return c.qcount == c.dataqsiz
}

2.send方法:

/**
c:要操作的channel
sg:彈出的接收者協程
ep:要發送的數據在的位置
*/
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果接收者指針不為空,直接把數據從ep拷貝到sg.elem
   if sg.elem != nil {
      sendDirect(c.elemtype, sg, ep)
      sg.elem = nil
   }
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 喚醒該接收者協程
   goready(gp, skip+1)
}

接收

從channel中接收數據有幾種寫法:

  • 帶不帶ok

  • 接不接收返回值

根據帶不帶ok,決定用下面哪個方法

func chanrecv1(c *hchan, elem unsafe.Pointer) {
        chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
        _, received = chanrecv(c, elem, true)
        return
}

根據接不接收返回值,決定elem是不是nil

最終都會調用chanrecv方法:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 如果channel為nil,根據參數中是否阻塞來決定是否阻塞
   if c == nil {
      if !block {
         return
   }
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw( "unreachable" )
   }

   // 非阻塞,并且channel為空
   if !block && empty(c) {
      // 如果還沒關閉,直接返回
   if atomic.Load(&c.closed) == 0 {
      return
   }
      // 否則已經關閉,
      // 如果為空,返回該類型的零值
   if empty(c) {
     if ep != nil {
        typedmemclr(c.elemtype, ep)
     }
     return  true, false
       }
   }

   lock(&c.lock)
   
   // 同樣,如果channel已經關閉,且緩沖區沒有元素,返回該類型零值
   if c.closed != 0 && c.qcount == 0 {
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return  true, false
}
    
   // 如果有發送者正在阻塞,說明:
   // 1.無緩沖
   // 2.有緩沖,但緩沖區滿了。因為只有緩沖區滿了,才可能有發送者在等待
   if sg := c.sendq.dequeue(); sg != nil {
      // 將數據從緩沖區拷貝到ep,再將sg的數據拷貝到緩沖區,該函數詳細流程可看下文
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return  true, true
}
    
   // 如果緩存區有數據, 
   if c.qcount > 0 {
      // qp為緩沖區中下一次接收的位置
      qp := chanbuf(c, c.recvx)
      // 將數據從qp拷貝到ep
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return  true, true
}

   // 接下來就是既沒有發送者在等待,也緩沖區也沒數據
   if !block {
      unlock(&c.lock)
      return  false, false
}

   // 將當前協程包裝成sudog,阻塞到channel中
   gp := getg()
   mysg := acquireSudog()
   mysg.releasetime = 0
   if t0 != 0 {
      mysg.releasetime = -1
   }
   // 記錄接收地址
   mysg.elem = ep
   mysg.waitlink = nil
   gp.waiting = mysg
   mysg.g = gp
   mysg.isSelect = false
   mysg.c = c
   gp.param = nil
   c.recvq.enqueue(mysg)

   atomic.Store8(&gp.parkingOnChan, 1)
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive,        traceEvGoBlockRecv, 2)

   // 從這里喚醒
   if mysg != gp.waiting {
      throw( "G waiting list is corrupted" )
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   success := mysg.success
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg)
   return  true, success
}

接收流程如為:

如果channel為nil,根據參數中是否阻塞來決定是否阻塞

如果channel已經關閉,且緩沖區沒有元素,返回該類型零值

如果有發送者正在阻塞,說明:

  • 要么是無緩沖

  • 有緩沖,但緩沖區滿了。因為只有緩沖區滿了,才可能有發送者在等待

  • 將數據從緩沖區拷貝到ep,再將發送者的數據拷貝到緩沖區,并喚該發送者

如果緩存區有數據, 則從緩沖區將數據復制到ep,返回

接下來就是既沒有發送者在等待,也緩沖區也沒數據的情況:

將當前協程包裝成sudog,阻塞到channel中

來看其中的子函數recv():

/**
c:操作的channel
sg:阻塞的發送協程
ep:接收者接收數據的地址
*/
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   // 如果是無緩沖channel,直接將數據從發送者sg拷貝到ep
   if c.dataqsiz == 0 {
      if ep != nil {
         recvDirect(c.elemtype, sg, ep)
      }
   // 接下來是有緩沖,且緩沖區滿的情況   
   } else {
      // qp為channel緩沖區中,接收者下一次接收的地址
   qp := chanbuf(c, c.recvx)
      // 將數據從qp拷貝到ep
   if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
    }
    // 將發送者的數據從sg.elem拷貝到qp
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
       c.recvx = 0
    }
    // 由于一接收已發送,緩沖區還是滿的,因此 c.sendx = c.recvx
    c.sendx = c.recvx 
}
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   // 喚醒發送者
   goready(gp, skip+1)
}

關閉

func closechan(c *hchan) {
   // 不能關閉空channel
   if c == nil {
      panic(plainError( "close of nil channel" ))
   }

   lock(&c.lock)
   // 不能重復關閉
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError( "close of closed channel" ))
   }

   // 修改關閉狀態
   c.closed = 1

   var glist gList

   // 釋放所有的接收者協程,并為它們賦予零值
 for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }

   // 釋放所有的發送者協程
 for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
     }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = unsafe.Pointer(sg)
      sg.success = false
      glist.push(gp)
   }
   unlock(&c.lock)

   // 執行喚醒操作
 for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}

關閉的流程比較簡單,可以看出:

不能關閉空channel,不能重復關閉channel

先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 鏈表,再解鎖。最后,再將所有的 sudog 全都喚醒:

接收者:會收到該類型的零值

這里返回零值沒有問題,因為之所以這些接收者會阻塞,就是因為緩沖區沒有數據,因此channel關閉后該接收者收到零值也符合邏輯

發送者:會被喚醒,然后panic

因此不能在有多個sender的時候貿然關閉channel

關于“Golang channel如何應用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

镇原县| 镇坪县| 上虞市| 灵璧县| 天祝| 阿瓦提县| 大连市| 洞头县| 二连浩特市| 会理县| 枝江市| 界首市| 肥乡县| 深泽县| 治多县| 玛多县| 洛浦县| 三门峡市| 额尔古纳市| 龙川县| 天水市| 正定县| 建平县| 遵义市| 曲水县| 太仆寺旗| 惠安县| 浦北县| 中方县| 马山县| 宁波市| 新干县| 吐鲁番市| 罗甸县| 齐河县| 梁山县| 准格尔旗| 抚松县| 巫溪县| 四会市| 赞皇县|