91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

web開發中怎樣優雅地實現并發編排任務

發布時間:2021-09-17 09:37:55 來源:億速云 閱讀:158 作者:柒染 欄目:web開發

今天就跟大家聊聊有關web開發中怎樣優雅地實現并發編排任務,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。

業務場景

在做任務開發的時候,你們一定會碰到以下場景:

場景1:調用第三方接口的時候, 一個需求你需要調用不同的接口,做數據組裝。

場景2:一個應用首頁可能依托于很多服務。那就涉及到在加載頁面時需要同時請求多個服務的接口。這一步往往是由后端統一調用組裝數據再返回給前端,也就是所謂的  BFF(Backend For Frontend) 層。

針對以上兩種場景,假設在沒有強依賴關系下,選擇串行調用,那么總耗時即:

time=s1+s2+....sn

按照當代秒入百萬的有為青年,這么長時間早就把你祖宗十八代問候了一遍。

為了偉大的KPI,我們往往會選擇并發地調用這些依賴接口。那么總耗時就是:

time=max(s1,s2,s3.....,sn)

當然開始堆業務的時候可以先串行化,等到上面的人著急的時候,亮出絕招。

這樣,年底 PPT 就可以加上濃重的一筆流水賬:為業務某個接口提高百分之XXX性能,間接產生XXX價值。

當然這一切的前提是,做老板不懂技術,做技術”懂”你。

言歸正傳,如果修改成并發調用,你可能會這么寫,

package main  import (     "fmt"     "sync"     "time" )  func main() {     var wg sync.WaitGroup     wg.Add(2)      var userInfo *User     var productList []Product      go func() {         defer wg.Done()         userInfo, _ = getUser()     }()      go func() {         defer wg.Done()         productList, _ = getProductList()     }()     wg.Wait()     fmt.Printf("用戶信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) }   /********用戶服務**********/  type User struct {     Name string     Age uint8 }  func getUser() (*User, error) {     time.Sleep(500 * time.Millisecond)     var u User     u.Name = "wuqinqiang"     u.Age = 18     return &u, nil }  /********商品服務**********/  type Product struct {     Title string     Price uint32 }  func getProductList() ([]Product, error) {     time.Sleep(400 * time.Millisecond)     var list []Product     list = append(list, Product{         Title: "SHib",         Price: 10,     })     return list, nil }

從實現上來說,需要多少服務,會開多少個 G,利用 sync.WaitGroup 的特性,

實現并發編排任務的效果。

好像,問題不大。

但是隨著代號 996 業務場景的增加,你會發現,好多模塊都有相似的功能,只是對應的業務場景不同而已。

那么我們能不能抽像出一套針對此業務場景的工具,而把具體業務實現交給業務方。

使用

本著不重復造輪子的原則,去搜了下開源項目,最終看上了 go-zero 里面的一個工具 mapreduce。

可以自行 Google 這個名詞。

使用很簡單。我們通過它改造一下上面的代碼:

package main  import (     "fmt"     "github.com/tal-tech/go-zero/core/mr"     "time" )  func main() {     var userInfo *User     var productList []Product     _ = mr.Finish(func() (err error) {         userInfo, err = getUser()         return err     }, func() (err error) {         productList, err = getProductList()         return err     })     fmt.Printf("用戶信息:%+v\n", userInfo)     fmt.Printf("商品信息:%+v\n", productList) } //打印 用戶信息:&{Name:wuqinqiang Age:18} 商品信息:[{Title:SHib Price:10}]

是不是舒服多了。

但是這里還需要注意一點,假設你調用的其中一個服務錯誤,并且你 return err 對應的錯誤,那么其他調用的服務會被取消。

比如我們修改 getProductList 直接響應錯誤。

func getProductList() ([]Product, error) {     return nil, errors.New("test error") } //打印 // 用戶信息:<nil> // 商品信息:[]

那么最終打印的時候連用戶信息都會為空,因為出現一個服務錯誤,用戶服務請求被取消了。

一般情況下,在請求服務錯誤的時候我們會有保底操作,一個服務錯誤不能影響其他請求的結果。

所以在使用的時候具體處理取決于業務場景。

源碼

既然用了,那么就追下源碼吧。

func Finish(fns ...func() error) error {     if len(fns) == 0 {         return nil     }      return MapReduceVoid(func(source chan<- interface{}) {         for _, fn := range fns {             source <- fn         }     }, func(item interface{}, writer Writer, cancel func(error)) {         fn := item.(func() error)         if err := fn(); err != nil {             cancel(err)         }     }, func(pipe <-chan interface{}, cancel func(error)) {         drain(pipe)     }, WithWorkers(len(fns))) }
func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

對于 MapReduceVoid函數,主要查看三個閉包參數。

  • 第一個 GenerateFunc 用于生產數據。

  • MapperFunc 讀取生產出的數據,進行處理。

  • VoidReducerFunc 這里表示不對 mapper 后的數據做聚合返回。所以這個閉包在此操作幾乎0作用。

func MapReduce(generate GenerateFunc, mapper MapperFunc, reducer ReducerFunc, opts ...Option) (interface{}, error) {     source := buildSource(generate)      return MapReduceWithSource(source, mapper, reducer, opts...) }  func buildSource(generate GenerateFunc) chan interface{} {     source := make(chan interface{})// 創建無緩沖通道     threading.GoSafe(func() {         defer close(source)         generate(source) //開始生產數據     })      return source //返回無緩沖通道 }

buildSource函數中,返回一個無緩沖的通道。并開啟一個 G 運行  generate(source),往無緩沖通道塞數據。這個generate(source) 不就是一開始 Finish 傳遞的第一個閉包參數。

return MapReduceVoid(func(source chan<- interface{}) {     // 就這個         for _, fn := range fns {             source <- fn         }     })

然后查看 MapReduceWithSource 函數,

func MapReduceWithSource(source <-chan interface{}, mapper MapperFunc, reducer ReducerFunc,     opts ...Option) (interface{}, error) {     options := buildOptions(opts...)     //任務執行結束通知信號     output := make(chan interface{})     //將mapper處理完的數據寫入collector     collector := make(chan interface{}, options.workers)     // 取消操作信號     done := syncx.NewDoneChan()     writer := newGuardedWriter(output, done.Done())     var closeOnce sync.Once     var retErr errorx.AtomicError     finish := func() {         closeOnce.Do(func() {             done.Close()             close(output)         })     }     cancel := once(func(err error) {         if err != nil {             retErr.Set(err)         } else {             retErr.Set(ErrCancelWithNil)         }          drain(source)         finish()     })      go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()     // 真正從生成器通道取數據執行Mapper     go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)      value, ok := <-output     if err := retErr.Load(); err != nil {         return nil, err     } else if ok {         return value, nil     } else {         return nil, ErrReduceNoOutput     } }

這段代碼挺長的,我們說下核心的點。這里使用一個G 調用 executeMappers 方法。

go executeMappers(func(item interface{}, w Writer) {         mapper(item, w, cancel)     }, source, collector, done.Done(), options.workers)
func executeMappers(mapper MapFunc, input <-chan interface{}, collector chan<- interface{},     done <-chan lang.PlaceholderType, workers int) {     var wg sync.WaitGroup     defer func() {         // 等待所有任務全部執行完畢         wg.Wait()         // 關閉通道         close(collector)     }()    //根據指定數量創建 worker池     pool := make(chan lang.PlaceholderType, workers)      writer := newGuardedWriter(collector, done)     for {         select {         case <-done:             return         case pool <- lang.Placeholder:             // 從buildSource() 返回的無緩沖通道取數據             item, ok := <-input              // 當通道關閉,結束             if !ok {                 <-pool                 return             }              wg.Add(1)             // better to safely run caller defined method             threading.GoSafe(func() {                 defer func() {                     wg.Done()                     <-pool                 }()                 //真正運行閉包函數的地方                // func(item interface{}, w Writer) {                // mapper(item, w, cancel)                // }                 mapper(item, writer)             })         }     } }

具體的邏輯已備注,代碼很容易懂。

一旦 executeMappers 函數返回,關閉 collector 通道,那么執行 reducer 不再阻塞。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         //這里         drain(collector)     }()

這里的 reducer(collector, writer, cancel) 其實就是從 MapReduceVoid 傳遞的第三個閉包函數。

func MapReduceVoid(generator GenerateFunc, mapper MapperFunc, reducer VoidReducerFunc, opts ...Option) error {     _, err := MapReduce(generator, mapper, func(input <-chan interface{}, writer Writer, cancel func(error)) {         reducer(input, cancel)         //這里         drain(input)         // We need to write a placeholder to let MapReduce to continue on reducer done,         // otherwise, all goroutines are waiting. The placeholder will be discarded by MapReduce.         writer.Write(lang.Placeholder)     }, opts...)     return err }

然后這個閉包函數又執行了 reducer(input, cancel),這里的 reducer 就是我們一開始解釋過的 VoidReducerFunc,從  Finish() 而來。

web開發中怎樣優雅地實現并發編排任務

等等,看到上面三個地方的 drain(input)了嗎?

// drain drains the channel. func drain(channel <-chan interface{}) {     // drain the channel     for range channel {     } }

其實就是一個排空 channel 的操作,但是三個地方都對同一個 channel做同樣的操作,也是讓我費解。

還有更重要的一點。

go func() {         defer func() {             if r := recover(); r != nil {                 cancel(fmt.Errorf("%v", r))             } else {                 finish()             }         }()         reducer(collector, writer, cancel)         drain(collector)     }()

上面的代碼,假如執行 reducer,writer 寫入引發 panic,那么drain(collector) 將沒有機會執行。

不過作者已經修復了這個問題,直接把 drain(collector) 放入到 defer。

web開發中怎樣優雅地實現并發編排任務

具體 issues[1]。

到這里,關于 Finish 的源碼也就結束了。感興趣的可以看看其他源碼。

很喜歡 go-zero 里的一些工具,但是工具往往并不獨立,依賴于其他文件包,導致明明只想使用其中一個工具卻需要安裝整個包。

所以最終的結果就是扒源碼,創建無依賴庫工具集,遵循 MIT 即可。

看完上述內容,你們對web開發中怎樣優雅地實現并發編排任務有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

田东县| 乐安县| 阜阳市| 封丘县| 璧山县| 韩城市| 景洪市| 宜宾县| 邵阳市| 城口县| 阳高县| 兴义市| 从化市| 翁牛特旗| 廊坊市| 芦山县| 延寿县| 蒲城县| 德州市| 兰坪| 鹤岗市| 宝丰县| 万州区| 定日县| 葵青区| 吴忠市| 华坪县| 萨嘎县| 镇安县| 永靖县| 阜新| 岚皋县| 吉木萨尔县| 六枝特区| 壤塘县| 义乌市| 泗洪县| 西吉县| 马山县| 太湖县| 祥云县|