您好,登錄后才能下訂單哦!
怎么在Go中實現并發控制context?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
1. 前言
Golang context是Golang應用開發常用的并發控制技術,它與WaitGroup最大的不同點是context對于派生goroutine有更強的控制力,它可以控制多級的goroutine。
context翻譯成中文是"上下文",即它可以控制一組呈樹狀結構的goroutine,每個goroutine擁有相同的上下文。
典型的使用場景如下圖所示:
上圖中由于goroutine派生出子goroutine,而子goroutine又繼續派生新的goroutine,這種情況下使用WaitGroup就不太容易,因為子goroutine個數不容易確定。而使用context就可以很容易實現。
2. Context實現原理
context實際上只定義了接口,凡是實現該接口的類都可稱為是一種context,官方包中實現了幾個常用的context,分別可用于不同的場景。
2.1 接口定義
源碼包中src/context/context.go:Context
定義了該接口:
type Context interface { Deadline() (deadline time.Time, ok bool) Done() <-chan struct{} Err() error Value(key interface{}) interface{} }
基礎的context接口只定義了4個方法,下面分別簡要說明一下:
2.1.1 Deadline()
該方法返回一個deadline和標識是否已設置deadline的bool值,如果沒有設置deadline,則ok == false,此時deadline為一個初始值的time.Time值
2.1.2 Done()
該方法返回一個channel,需要在select-case語句中使用,如"case <-context.Done():"。
當context關閉后,Done()返回一個被關閉的管道,關閉的管理仍然是可讀的,據此goroutine可以收到關閉請求;當context還未關閉時,Done()返回nil。
2.1.3 Err()
該方法描述context關閉的原因。關閉原因由context實現控制,不需要用戶設置。比如Deadline context,關閉原因可能是因為deadline,也可能提前被主動關閉,那么關閉原因就會不同:
因deadline關閉:“context deadline exceeded”;
因主動關閉: "context canceled"。
當context關閉后,Err()返回context的關閉原因;當context還未關閉時,Err()返回nil;
2.1.3 Value()
有一種context,它不是用于控制呈樹狀分布的goroutine,而是用于在樹狀分布的goroutine間傳遞信息。
Value()方法就是用于此種類型的context,該方法根據key值查詢map中的value。具體使用后面示例說明。
2.2 空context
context包中定義了一個空的context, 名為emptyCtx,用于context的根節點,空的context只是簡單的實現了Context,本身不包含任何值,僅用于其他context的父節點。
emptyCtx類型定義如下代碼所示:
type emptyCtx int func (*emptyCtx) Deadline() (deadline time.Time, ok bool) { return } func (*emptyCtx) Done() <-chan struct{} { return nil } func (*emptyCtx) Err() error { return nil } func (*emptyCtx) Value(key interface{}) interface{} { return nil }
context包中定義了一個公用的emptCtx全局變量,名為background,可以使用context.Background()獲取它,實現代碼如下所示:
var background = new(emptyCtx) func Background() Context { return background }
context包提供了4個方法創建不同類型的context,使用這四個方法時如果沒有父context,都需要傳入backgroud,即backgroud作為其父節點:
WithCancel()
WithDeadline()
WithTimeout()
WithValue()
context包中實現Context接口的struct,除了emptyCtx外,還有cancelCtx、timerCtx和valueCtx三種,正是基于這三種context實例,實現了上述4種類型的context。
context包中各context類型之間的關系,如下圖所示:
struct cancelCtx、valueCtx、valueCtx都繼承于Context,下面分別介紹這三個struct。
2.3 cancelCtx
源碼包中src/context/context.go:cancelCtx
定義了該類型context:
type cancelCtx struct { Context mu sync.Mutex // protects following fields done chan struct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error // set to non-nil by the first cancel call }
children中記錄了由此context派生的所有child,此context被cancle時會把其中的所有child都cancle掉。
cancelCtx與deadline和value無關,所以只需要實現Done()和Err()接口外露接口即可。
2.3.1 Done()接口實現
按照Context定義,Done()接口只需要返回一個channel即可,對于cancelCtx來說只需要返回成員變量done即可。
這里直接看下源碼,非常簡單:
func (c *cancelCtx) Done() <-chan struct{} { c.mu.Lock() if c.done == nil { c.done = make(chan struct{}) } d := c.done c.mu.Unlock() return d }
由于cancelCtx沒有指定初始化函數,所以cancelCtx.done可能還未分配,所以需要考慮初始化。
cancelCtx.done會在context被cancel時關閉,所以cancelCtx.done的值一般經歷如三個階段:nil --> chan struct{} --> closed chan。
2.3.2 Err()接口實現
按照Context定義,Err()只需要返回一個error告知context被關閉的原因。對于cancelCtx來說只需要返回成員變量err即可。
還是直接看下源碼:
func (c *cancelCtx) Err() error { c.mu.Lock() err := c.err c.mu.Unlock() return err }
cancelCtx.err默認是nil,在context被cancel時指定一個error變量: var Canceled = errors.New("context canceled")
。
2.3.3 cancel()接口實現
cancel()內部方法是理解cancelCtx的最關鍵的方法,其作用是關閉自己和其后代,其后代存儲在cancelCtx.children的map中,其中key值即后代對象,value值并沒有意義,這里使用map只是為了方便查詢而已。
cancel方法實現偽代碼如下所示:
func (c *cancelCtx) cancel(removeFromParent bool, err error) { c.mu.Lock() c.err = err //設置一個error,說明關閉原因 close(c.done) //將channel關閉,以此通知派生的context for child := range c.children { //遍歷所有children,逐個調用cancel方法 child.cancel(false, err) } c.children = nil c.mu.Unlock() if removeFromParent { //正常情況下,需要將自己從parent刪除 removeChild(c.Context, c) } }
實際上,WithCancel()返回的第二個用于cancel context的方法正是此cancel()。
2.3.4 WithCancel()方法實現
WithCancel()方法作了三件事:
初始化一個cancelCtx實例
將cancelCtx實例添加到其父節點的children中(如果父節點也可以被cancel的話)
返回cancelCtx實例和cancel()方法
其實現源碼如下所示:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) { c := newCancelCtx(parent) propagateCancel(parent, &c) //將自身添加到父節點 return &c, func() { c.cancel(true, Canceled) } }
這里將自身添加到父節點的過程有必要簡單說明一下:
如果父節點也支持cancel,也就是說其父節點肯定有children成員,那么把新context添加到children里即可;
如果父節點不支持cancel,就繼續向上查詢,直到找到一個支持cancel的節點,把新context添加到children里;
如果所有的父節點均不支持cancel,則啟動一個協程等待父節點結束,然后再把當前context結束。
2.3.5 典型使用案例
一個典型的使用cancel context的例子如下所示:
package main import ( "fmt" "time" "context" ) func HandelRequest(ctx context.Context) { go WriteRedis(ctx) go WriteDatabase(ctx) for { select { case <-ctx.Done(): fmt.Println("HandelRequest Done.") return default: fmt.Println("HandelRequest running") time.Sleep(2 * time.Second) } } } func WriteRedis(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("WriteRedis Done.") return default: fmt.Println("WriteRedis running") time.Sleep(2 * time.Second) } } } func WriteDatabase(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("WriteDatabase Done.") return default: fmt.Println("WriteDatabase running") time.Sleep(2 * time.Second) } } } func main() { ctx, cancel := context.WithCancel(context.Background()) go HandelRequest(ctx) time.Sleep(5 * time.Second) fmt.Println("It's time to stop all sub goroutines!") cancel() //Just for test whether sub goroutines exit or not time.Sleep(5 * time.Second) }
上面代碼中協程HandelRequest()用于處理某個請求,其又會創建兩個協程:WriteRedis()、WriteDatabase(),main協程創建創建context,并把context在各子協程間傳遞,main協程在適當的時機可以cancel掉所有子協程。
程序輸出如下所示:
HandelRequest running WriteDatabase running WriteRedis running HandelRequest running WriteDatabase running WriteRedis running HandelRequest running WriteDatabase running WriteRedis running It's time to stop all sub goroutines! WriteDatabase Done. HandelRequest Done. WriteRedis Done.
2.4 timerCtx
源碼包中src/context/context.go:timerCtx
定義了該類型context:
type timerCtx struct { cancelCtx timer *time.Timer // Under cancelCtx.mu. deadline time.Time }
timerCtx在cancelCtx基礎上增加了deadline用于標示自動cancel的最終時間,而timer就是一個觸發自動cancel的定時器。
由此,衍生出WithDeadline()和WithTimeout()。實現上這兩種類型實現原理一樣,只不過使用語境不一樣:
deadline: 指定最后期限,比如context將2018.10.20 00:00:00之時自動結束
timeout: 指定最長存活時間,比如context將在30s后結束。
對于接口來說,timerCtx在cancelCtx基礎上還需要實現Deadline()和cancel()方法,其中cancel()方法是重寫的。
2.4.1 Deadline()接口實現
Deadline()方法僅僅是返回timerCtx.deadline而矣。而timerCtx.deadline是WithDeadline()或WithTimeout()方法設置的。
2.4.2 cancel()接口實現
cancel()方法基本繼承cancelCtx,只需要額外把timer關閉。
timerCtx被關閉后,timerCtx.cancelCtx.err將會存儲關閉原因:
如果deadline到來之前手動關閉,則關閉原因與cancelCtx顯示一致;
如果deadline到來時自動關閉,則原因為:"context deadline exceeded"
2.4.3 WithDeadline()方法實現
WithDeadline()方法實現步驟如下:
初始化一個timerCtx實例
將timerCtx實例添加到其父節點的children中(如果父節點也可以被cancel的話)
啟動定時器,定時器到期后會自動cancel本context
返回timerCtx實例和cancel()方法
也就是說,timerCtx類型的context不僅支持手動cancel,也會在定時器到來后自動cancel。
2.4.4 WithTimeout()方法實現
WithTimeout()實際調用了WithDeadline,二者實現原理一致。
看代碼會非常清晰:
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) { return WithDeadline(parent, time.Now().Add(timeout)) }
2.4.5 典型使用案例
下面例子中使用WithTimeout()獲得一個context并在其了協程中傳遞:
package main import ( "fmt" "time" "context" ) func HandelRequest(ctx context.Context) { go WriteRedis(ctx) go WriteDatabase(ctx) for { select { case <-ctx.Done(): fmt.Println("HandelRequest Done.") return default: fmt.Println("HandelRequest running") time.Sleep(2 * time.Second) } } } func WriteRedis(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("WriteRedis Done.") return default: fmt.Println("WriteRedis running") time.Sleep(2 * time.Second) } } } func WriteDatabase(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("WriteDatabase Done.") return default: fmt.Println("WriteDatabase running") time.Sleep(2 * time.Second) } } } func main() { ctx, _ := context.WithTimeout(context.Background(), 5 * time.Second) go HandelRequest(ctx) time.Sleep(10 * time.Second) }
主協程中創建一個10s超時的context,并將其傳遞給子協程,10s自動關閉context。程序輸出如下:
HandelRequest running WriteRedis running WriteDatabase running HandelRequest running WriteRedis running WriteDatabase running HandelRequest running WriteRedis running WriteDatabase running HandelRequest Done. WriteDatabase Done. WriteRedis Done.
2.5 valueCtx
源碼包中src/context/context.go:valueCtx
定義了該類型context:
type valueCtx struct { Context key, val interface{} }
valueCtx只是在Context基礎上增加了一個key-value對,用于在各級協程間傳遞一些數據。
由于valueCtx既不需要cancel,也不需要deadline,那么只需要實現Value()接口即可。
2.5.1 Value()接口實現
由valueCtx數據結構定義可見,valueCtx.key和valueCtx.val分別代表其key和value值。 實現也很簡單:
func (c *valueCtx) Value(key interface{}) interface{} { if c.key == key { return c.val } return c.Context.Value(key) }
這里有個細節需要關注一下,即當前context查找不到key時,會向父節點查找,如果查詢不到則最終返回interface{}。也就是說,可以通過子context查詢到父的value值。
2.5.2 WithValue()方法實現
WithValue()實現也是非常的簡單, 偽代碼如下:
func WithValue(parent Context, key, val interface{}) Context { if key == nil { panic("nil key") } return &valueCtx{parent, key, val} }
2.5.3 典型使用案例
下面示例程序展示valueCtx的用法:
package main import ( "fmt" "time" "context" ) func HandelRequest(ctx context.Context) { for { select { case <-ctx.Done(): fmt.Println("HandelRequest Done.") return default: fmt.Println("HandelRequest running, parameter: ", ctx.Value("parameter")) time.Sleep(2 * time.Second) } } } func main() { ctx := context.WithValue(context.Background(), "parameter", "1") go HandelRequest(ctx) time.Sleep(10 * time.Second) }
上例main()中通過WithValue()方法獲得一個context,需要指定一個父context、key和value。然后通將該context傳遞給子協程HandelRequest,子協程可以讀取到context的key-value。
注意:本例中子協程無法自動結束,因為context是不支持cancle的,也就是說<-ctx.Done()永遠無法返回。如果需要返回,需要在創建context時指定一個可以cancel的context作為父節點,使用父節點的cancel()在適當的時機結束整個context。
總結
Context僅僅是一個接口定義,跟據實現的不同,可以衍生出不同的context類型;
cancelCtx實現了Context接口,通過WithCancel()創建cancelCtx實例;
timerCtx實現了Context接口,通過WithDeadline()和WithTimeout()創建timerCtx實例;
valueCtx實現了Context接口,通過WithValue()創建valueCtx實例;
三種context實例可互為父節點,從而可以組合成不同的應用形式;
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。