您好,登錄后才能下訂單哦!
本篇文章和大家了解一下Golang sync包中errgroup的如何使用。有一定的參考價值,有需要的朋友可以參考一下,希望對大家有所幫助。
WaitGroup 主要用于控制任務組下的并發子任務。它的具體做法就是,子任務 goroutine 執行前通過 Add 方法添加任務數目,子任務 goroutine 結束時調用 Done 標記已完成任務數,主任務 goroutine 通過 Wait 方法等待所有的任務完成后才能執行后續邏輯。
package main import ( "net/http" "sync" ) func main() { var wg sync.WaitGroup var urls = []string{ "http://www.golang.org/", "http://www.baidu.com/", "http://www.bokeyuan12111.com/", } for _, url := range urls { wg.Add(1) go func(url string) { defer wg.Done() resp, err := http.Get(url) if err != nil { return } resp.Body.Close() }(url) } wg.Wait() }
在以上示例代碼中,我們通過三個 goroutine 去并發的請求 url,直到所有的子任務 goroutine 均完成訪問,主任務 goroutine 下的 wg.Wait 才會停止阻塞。
但在實際的項目代碼中,子任務 goroutine 的執行并不總是順風順水,它們也許會產生 error。而 WaitGroup 并沒有告訴我們在子 goroutine 發生錯誤時,如何將其拋給主任務 groutine。
這個時候可以考慮使用 errgroup
package main import ( "fmt" "net/http" "golang.org/x/sync/errgroup" ) func main() { var urls = []string{ "http://www.golang.org/", "http://www.baidu.com/", "http://www.bokeyuan12111.com/", } g := new(errgroup.Group) for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err != nil { fmt.Println(err) return err } fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode) return resp.Body.Close() }) } if err := g.Wait(); err != nil { fmt.Println(err) } else { fmt.Println("All success!") } }
結果如下:
get [http://www.baidu.com/] success: [200]
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
Get "http://www.golang.org/": dial tcp 142.251.42.241:80: i/o timeout
Get "http://www.bokeyuan12111.com/": dial tcp: lookup www.bokeyuan12111.com: no such host
可以看到,執行獲取www.bokeyuan12111.com和www.golang.org兩個 url 的子 groutine 均發生了錯誤,在主任務 goroutine 中成功捕獲到了第一個錯誤信息。
除了 擁有 WaitGroup 的控制能力 和 錯誤傳播 的功能之外,errgroup 還有最重要的 context 反向傳播機制,我們來看一下它的設計。
errgroup 的設計非常精練,全部代碼如下
type Group struct { cancel func() wg sync.WaitGroup errOnce sync.Once err error } func WithContext(ctx context.Context) (*Group, context.Context) { ctx, cancel := context.WithCancel(ctx) return &Group{cancel: cancel}, ctx } func (g *Group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() } return g.err } func (g *Group) Go(f func() error) { g.wg.Add(1) go func() { defer g.wg.Done() if err := f(); err != nil { g.errOnce.Do(func() { g.err = err if g.cancel != nil { g.cancel() } }) } }() }
可以看到,errgroup 的實現依靠于結構體 Group,它通過封裝 sync.WaitGroup,繼承了 WaitGroup 的特性,在 Go() 方法中新起一個子任務 goroutine,并在 Wait() 方法中通過 sync.WaitGroup 的 Wait 進行阻塞等待。
同時 Group 利用 sync.Once 保證了它有且僅會保留第一個子 goroutine 錯誤。
最后,Group 通過嵌入 context.WithCancel 方法產生的 cancel 函數,能夠在子 goroutine 發生錯誤時,及時通過調用 cancle 函數,將 Context 的取消信號及時傳播出去。當然,這一特性需要用戶代碼的配合。
在 errgroup 的文檔(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文檔的 pipeline( https://blog.golang.org/pipelines) ,實現了一個任務組 goroutine 中上下文取消(Context cancelation)演示的示例。但該 Demo 的前提知識略多,本文這里基于其思想,提供一個易于理解的使用示例。
package main import ( "context" "fmt" "golang.org/x/sync/errgroup" ) func main() { g, ctx := errgroup.WithContext(context.Background()) dataChan := make(chan int, 20) // 數據生產端任務子 goroutine g.Go(func() error { defer close(dataChan) for i := 1; ; i++ { if i == 10 { return fmt.Errorf("data 10 is wrong") } dataChan <- i fmt.Println(fmt.Sprintf("sending %d", i)) } }) // 數據消費端任務子 goroutine for i := 0; i < 3; i++ { g.Go(func() error { for j := 1; ; j++ { select { case <-ctx.Done(): return ctx.Err() case number := <-dataChan: fmt.Println(fmt.Sprintf("receiving %d", number)) } } }) } // 主任務 goroutine 等待 pipeline 結束數據流 err := g.Wait() if err != nil { fmt.Println(err) } fmt.Println("main goroutine done!") }
在以上示例中,我們模擬了一個數據傳送管道。在數據的生產與消費任務集中,有四個子任務 goroutine:一個生產數據的 goroutine,三個消費數據的 goroutine。當數據生產方存在錯誤數據時(數據等于 10 ),我們停止數據的生產與消費,并將錯誤拋出,回到 main goroutine 的執行邏輯中。
可以看到,因為 errgroup 中的 Context cancle 函數的嵌入,我們在子任務 goroutine 中也能反向控制任務上下文。
程序的某一次運行,輸出結果如下:
sending 1
sending 2
sending 3
sending 4
sending 5
sending 6
sending 7
sending 8
sending 9
receiving 1
receiving 3
receiving 2
receiving 4
data 10 is wrong
main goroutine done!
errgroup 是 Go 官方的并發原語補充庫,相對于標準庫中提供的原語而言,顯得沒那么核心。這里總結一下 errgroup 的特性。
繼承了 WaitGroup 的功能
錯誤傳播:能夠返回任務組中發生的第一個錯誤,但有且僅能返回該錯誤
context 信號傳播:如果子任務 goroutine 中有循環邏輯,則可以添加 ctx.Done 邏輯,此時通過 context 的取消信號,提前結束子任務執行。
以上就是Golang sync包中errgroup的如何使用的簡略介紹,當然詳細使用上面的不同還得要大家自己使用過才領會。如果想了解更多,歡迎關注億速云行業資訊頻道哦!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。