您好,登錄后才能下訂單哦!
本篇內容介紹了“Go中的并發方法實例代碼分析”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
代碼很簡單——單個通道,單個goroutine,單次寫入,單次讀取。
package main func main() { // 創建一個int類型的通道 ch := make(chan int) // 開啟一個匿名 goroutine go func() { // 向通道發送數字42 ch <- 42 }() // 從通道中讀取 <-ch }
轉到交互式 WebGL 動畫
藍色線代表隨時間運行的goroutine. 連接‘main’和‘go #19’的藍色細線用來標記goroutine的開始和結束同時展示了父子關系,最后,紅線代表發送/接收動作. 雖然這是兩個獨立的動作,我還是嘗試用“從 A 發送到 B”的動畫將他們表示成一個動作. goroutine 名稱中的“#19” 是 goroutine 真實的內部ID, 其獲取方法參考了 Scott Mansfield 的 “Goroutine IDs” 這篇文章。
實際上,你可以通過以下方法構建一個簡單的計時器——創建一個通道, 開啟一個 goroutine 讓其在指定的時間間隔后向通道中寫入數據,然后將這個通道返回給調用者。于是調用函數就會在讀取通道時阻塞,直到之前設定的時間間隔過去。接下來我們調用24次計時器然后嘗試具象化調用過程。
package main import "time" func timer(d time.Duration) <-chan int { c := make(chan int) go func() { time.Sleep(d) c <- 1 }() return c } func main() { for i := 0; i < 24; i++ { c := timer(1 * time.Second) <-c } }
轉到交互式 WebGL 動畫
很整潔,對嗎? 我們繼續。
這個并發例子取自谷歌員工 Sameer Ajmani “Advanced Go Concurrency Patterns” 演講。當然,這個模式不算非常高級,但是對于那些只熟悉Go的并發機制的人來說它看起來可能非常新鮮有趣。
這里我們用一個通道代表乒乓球臺. 一個整型變量代表球, 然后用兩個goroutine代表玩家,玩家通過增加整型變量的值(點擊計數器)模擬擊球動作。
package main import "time" func main() { var Ball int table := make(chan int) go player(table) go player(table) table <- Ball time.Sleep(1 * time.Second) <-table } func player(table chan int) { for { ball := <-table ball++ time.Sleep(100 * time.Millisecond) table <- ball } }
轉到交互式 WebGL 動畫
這里我建議你點擊 鏈接 進入交互式 WebGL 動畫操作一下. 你可以放慢或者加速動畫,從不同的角度觀察。
現在,我們添加三個玩家看看。
go player(table) go player(table) go player(table)
轉到交互式 WebGL 動畫 我們可以看到每個玩家都按照次序輪流操作,你可能會想為什么會這樣。為什么多個玩家(goroutine)會按照嚴格的順序接到“球”呢。
答案是 Go 運行時環境維護了一個 接收者 FIFO 隊列 (存儲需要從某一通道上接收數據的goroutine),在我們的例子里,每個玩家在剛發出球后就做好了接球準備。我們來看一下更復雜的情況,加入100個玩家。
for i := 0; i < 100; i++ { go player(table) }
轉到交互式 WebGL 動畫
先進先出順序很明顯了,是吧? 我們可以創建一百萬個goroutine,因為它們很輕量,但是對于實現我們的目的來說沒有必要。我們來想想其他可以玩的。 例如, 常見的消息傳遞模式。
并發世界中流行的模式之一是所謂的 fan-in 模式。這與 fan-out 模式相反,稍后我們將介紹。簡而言之,fan-in 是一項功能,可以從多個輸入中讀取數據并將其全部多路復用到單個通道中。
舉例來說:
package main import ( "fmt" "time" ) func producer(ch chan int, d time.Duration) { var i int for { ch <- i i++ time.Sleep(d) } } func reader(out chan int) { for x := range out { fmt.Println(x) } } func main() { ch := make(chan int) out := make(chan int) go producer(ch, 100*time.Millisecond) go producer(ch, 250*time.Millisecond) go reader(out) for i := range ch { out <- i } }
Go to interactive WebGL animation
如我們所見,第一個 producer 每100毫秒生成一次值,第二個每250毫秒生成一次值,但是 reader 會立即從這兩個生產者那里接受值。實際上,多路復用發生在 main 的range循環中。
與 fan-in 相反的模式是 fan-out 或者worker 模式。多個 goroutine 可以從單個通道讀取,從而在CPU內核之間分配大量的工作量,因此是 worker 的名稱。在Go中,此模式易于實現-只需以通道為參數啟動多個goroutine,然后將值發送至該通道-Go運行時會自動地進行分配和復用 :)
package main import ( "fmt" "sync" "time" ) func worker(tasksCh <-chan int, wg *sync.WaitGroup) { defer wg.Done() for { task, ok := <-tasksCh if !ok { return } d := time.Duration(task) * time.Millisecond time.Sleep(d) fmt.Println("processing task", task) } } func pool(wg *sync.WaitGroup, workers, tasks int) { tasksCh := make(chan int) for i := 0; i < workers; i++ { go worker(tasksCh, wg) } for i := 0; i < tasks; i++ { tasksCh <- i } close(tasksCh) } func main() { var wg sync.WaitGroup wg.Add(36) go pool(&wg, 36, 50) wg.Wait() }
這里值得一提的是:并行性。如您所見,所有goroutine并行’運行‘,等待通道給予它們’工作‘。鑒于上面的動畫,很容易發現goroutine幾乎立即接連地收到它們的工作。不幸的是,該動畫在goroutine確實在處理工作還是僅僅是在等待輸入的地方沒有用顏色顯示出來,但是此動畫是在GOMAXPROCS=4的情況下錄制的,因此只有4個goroutine有效地并行運行。我們將很快討論這個主題。
現在,讓我們做一些更復雜的事情,并啟動一些有自己workers(subworkers)的workers。
package main import ( "fmt" "sync" "time" ) const ( WORKERS = 5 SUBWORKERS = 3 TASKS = 20 SUBTASKS = 10 ) func subworker(subtasks chan int) { for { task, ok := <-subtasks if !ok { return } time.Sleep(time.Duration(task) * time.Millisecond) fmt.Println(task) } } func worker(tasks <-chan int, wg *sync.WaitGroup) { defer wg.Done() for { task, ok := <-tasks if !ok { return } subtasks := make(chan int) for i := 0; i < SUBWORKERS; i++ { go subworker(subtasks) } for i := 0; i < SUBTASKS; i++ { task1 := task * i subtasks <- task1 } close(subtasks) } } func main() { var wg sync.WaitGroup wg.Add(WORKERS) tasks := make(chan int) for i := 0; i < WORKERS; i++ { go worker(tasks, &wg) } for i := 0; i < TASKS; i++ { tasks <- i } close(tasks) wg.Wait() }
Go to interactive WebGL animation 很好。當然,我們可以將worker和subworker的數量設置為更高的值,但是我試圖使動畫清晰易懂。
更酷的 fan-out 模式確實存在,例如動態數量的worker/subworker,通過通道發送通道,但是 fan-out 的想法現在應該很清楚了。
下一個常見的模式類似于扇出,但是會在很短的時間內生成goroutine,只是為了完成某些任務。它通常用于實現服務器-創建偵聽器,循環運行accept()并為每個接受的連接啟動goroutine。它非常具有表現力,可以實現盡可能簡單的服務器處理程序。看一個簡單的例子:
package main import "net" func handler(c net.Conn) { c.Write([]byte("ok")) c.Close() } func main() { l, err := net.Listen("tcp", ":5000") if err != nil { panic(err) } for { c, err := l.Accept() if err != nil { continue } go handler(c) } }
Go to 交互式WebGL動畫
這不是很有趣-似乎并發方面沒有發生任何事情。當然,在引擎蓋下有很多復雜性,這是我們特意隱藏的。 “簡單性很復雜”.
但是,讓我們回到并發性并向我們的服務器添加一些交互。假設每個處理程序都希望異步寫入記錄器。在我們的示例中,記錄器本身是一個單獨的goroutine
,它可以完成此任務。
package main import ( "fmt" "net" "time" ) func handler(c net.Conn, ch chan string) { ch <- c.RemoteAddr().String() c.Write([]byte("ok")) c.Close() } func logger(ch chan string) { for { fmt.Println(<-ch) } } func server(l net.Listener, ch chan string) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } } func main() { l, err := net.Listen("tcp", ":5000") if err != nil { panic(err) } ch := make(chan string) go logger(ch) go server(l, ch) time.Sleep(10 * time.Second) }
Go to 交互式WebGL動畫
不是嗎?但是很容易看到,如果請求數量增加并且日志記錄操作花費一些時間(例如,準備和編碼數據),我們的* logger * goroutine很快就會成為瓶頸。我們可以使用一個已知的扇出模式。我們開始做吧。
帶工作程序的服務器示例是記錄器的高級版本。它不僅可以完成一些工作,而且還可以通過* results *通道將其工作結果發送回池中。沒什么大不了的,但是它將我們的記錄器示例擴展到了更實際的示例。
讓我們看一下代碼和動畫:
package main import ( "net" "time" ) func handler(c net.Conn, ch chan string) { addr := c.RemoteAddr().String() ch <- addr time.Sleep(100 * time.Millisecond) c.Write([]byte("ok")) c.Close() } func logger(wch chan int, results chan int) { for { data := <-wch data++ results <- data } } func parse(results chan int) { for { <-results } } func pool(ch chan string, n int) { wch := make(chan int) results := make(chan int) for i := 0; i < n; i++ { go logger(wch, results) } go parse(results) for { addr := <-ch l := len(addr) wch <- l } } func server(l net.Listener, ch chan string) { for { c, err := l.Accept() if err != nil { continue } go handler(c, ch) } } func main() { l, err := net.Listen("tcp", ":5000") if err != nil { panic(err) } ch := make(chan string) go pool(ch, 4) go server(l, ch) time.Sleep(10 * time.Second) }
Go to 交互式WebGL動畫 我們在4個goroutine之間分配了工作,有效地提高了記錄器的吞吐量,但是從此動畫中,我們可以看到記錄器仍然可能是問題的根源。成千上萬的連接在分配之前會匯聚在一個通道中,這可能導致記錄器再次成為瓶頸。但是,當然,它會在更高的負載下發生。
足夠的扇入/扇出樂趣。讓我們看看更復雜的并發算法。我最喜歡的例子之一是Concurrent Prime Sieve,可以在[Go Concurrency Patterns]對話中找到。素數篩,或[Eratosthenes篩)是一種古老的算法,用于查找達到給定限制的素數。它通過按順序消除所有質數的倍數來工作。天真的算法并不是真正有效的算法,尤其是在多核計算機上。
該算法的并發變體使用goroutine過濾數字-每個發現的素數一個goroutine,以及用于將數字從生成器發送到過濾器的通道。找到質數后,它將通過通道發送到* main *以進行輸出。當然,該算法也不是很有效,特別是如果您想找到大質數并尋找最低的Big O復雜度,但是我發現它非常優雅。
// 并發的主篩 package main import "fmt" // 將序列2、3、4,...發送到頻道“ ch”。 func Generate(ch chan<- int) { for i := 2; ; i++ { ch <- i // Send 'i' to channel 'ch'. } } //將值從通道“ in”復制到通道“ out”, //刪除可被“素數”整除的那些。 func Filter(in <-chan int, out chan<- int, prime int) { for { i := <-in // Receive value from 'in'. if i%prime != 0 { out <- i // Send 'i' to 'out'. } } } //主篩:菊花鏈過濾器過程。 func main() { ch := make(chan int) // Create a new channel. go Generate(ch) // Launch Generate goroutine. for i := 0; i < 10; i++ { prime := <-ch fmt.Println(prime) ch2 := make(chan int) go Filter(ch, ch2, prime) ch = ch2 } }
轉到交互式WebGL動畫
,請以交互模式隨意播放此動畫。我喜歡它的說明性-它確實可以幫助您更好地理解該算法。 * generate * goroutine發出從2開始的每個整數,每個新的goroutine僅過濾特定的質數倍數-2、3、5、7 …,將第一個找到的質數發送給* main *。如果旋轉它從頂部看,您會看到從goroutine發送到main的所有數字都是質數。漂亮的算法,尤其是在3D中。
現在,讓我們回到我們的工作人員示例。還記得我告訴過它以GOMAXPROCS = 4運行嗎?那是因為所有這些動畫都不是藝術品,它們是真實程序的真實痕跡。
讓我們回顧一下GOMAXPROCS是什么。
GOMAXPROCS設置可以同時執行的最大CPU數量。
當然,CPU是指邏輯CPU。我修改了一些示例,以使他們真正地工作(而不僅僅是睡覺)并使用實際的CPU時間。然后,我運行了代碼,沒有進行任何修改,只是設置了不同的GOMAXPROCS值。 Linux機頂盒有2個CPU,每個CPU具有12個內核,因此有24個內核。
因此,第一次運行演示了該程序在1個內核上運行,而第二次-使用了所有24個內核的功能。
WebGL動畫-1| WebGL動畫-24GOMAXPROCS1
這些動畫中的時間速度是不同的(我希望所有動畫都適合同一時間/ height),因此區別很明顯。當GOMAXPROCS = 1時,下一個工作人員只有在上一個工作完成后才能開始實際工作。在GOMAXPROCS = 24的情況下,加速非常大,而復用的開銷可以忽略不計。
不過,重要的是要了解,增加GOMAXPROCS并不總是可以提高性能,在某些情況下實際上會使它變得更糟。
我們可以從Go中的并發時間中證明什么呢?我想到的一件事情是goroutine泄漏。例如,如果您啟動goroutine,但超出范圍,可能會發生泄漏。或者,您只是忘記添加結束條件,而運行了for{}循環。
第一次在代碼中遇到goroutine泄漏時,我的腦海中出現了可怕的圖像,并且在下個周末我寫了 expvarmon。現在,我可以使用WebGL可視化該恐怖圖像。
看一看:
僅僅是看到此,我都會感到痛苦:) 所有這些行都浪費了資源,并且是您程序的定時炸彈。
我要說明的最后一件事是并行性與并發性之間的區別。這個話題涵蓋了 很多 ,Rob Pike在這個話題上做了一個精彩的演講。確實是#必須觀看的視頻之一。
簡而言之,
并行是簡單的并行運行事物。
并發是一種構造程序的方法。
因此,并發程序可能是并行的,也可能不是并行的,這些概念在某種程度上是正交的。我們在演示 GOMAXPROCS 設置效果時已經看到了這一點。
我可以重復所有這些鏈接的文章和談話,但是一張圖片相當于說了一千個字。我在這里能做的是可視化這個差異。因此,這是并行。許多事情并行運行。
轉到交互式WebGL動畫
這也是并行性:
轉到交互式WebGL動畫
但這是并發的:
還有這個:
這也是并發的:
為了創建這些動畫,我編寫了兩個程序:gotracer 和 gothree.js 庫。首先,gotracer執行以下操作:
解析Go程序的AST樹(Abstract Syntax Tree,抽象語法樹),并在與并發相關的事件上插入帶有輸出的特殊命令-啟動/停止goroutine,創建通道,向/從通道發送/接收。
運行生成的程序
分析此特殊輸出,并生成帶有事件和時間戳描述的JSON。
生成的JSON示例:
“Go中的并發方法實例代碼分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。