您好,登錄后才能下訂單哦!
Go 有兩種并發編程風格:
這章講第一種 goroutine 和通道。
如果說 goroutine 是 Go 程序并發的執行體,通道就是它們之間的連接。每一個通道是一個具體類型的導管,叫作通道的元素類型。
像 map 一樣,通道是一個使用 make 創建的數據結構的應用。當復制或者作為參數傳遞到一個函數時,復制的是引用,這樣調用者和被調用者都引用了同一份數據結構。和其他引用類型一樣,通道的零值是 nil。
通道有兩個主要操作:發送(send)和接收(receive),兩者統稱為通信。通道還支持第三個操作:關閉(close),它設置一個標志位來指示值當前已經發送完畢。
使用簡單的 make 調用創建的通道叫無緩沖(unbuffered)通道,但 make 還可以接受第二個可選參數,一個表示通道容量的整數。如果容量是0,創建的也是無緩沖通道。
使用無緩沖通道進行的通信導致發送和接收 goroutine 同步化。因此無緩沖通道也稱為同步通道。
通過通道發送消息有兩個重要的方面需要考慮:
當事件沒有攜帶額外的信息時,它單純的目的是進行同步。和 map 實現的集合一樣,可以使用一個 struct{} 元素類型的通道來強調它,盡管通常使用 bool 或 int 類型的通道來做相同的事情。因為done <- 1
更簡短。書上講集合的時候,使用的是 bool類型,這里講事件同步,使用的是空結構體。
通道可以用來連接 goroutine,這樣一個的輸出是另一個的輸入,這個叫管道(pipline)。
關閉通道
如果發送方知道沒有更多的數據要發送,告訴接收者所在的 goroutine 可以停止等待是很有用的。這可以通過調用內置的 Close 函數來關閉通道:
ch2 := make(chan bool) // 創建通道 ch2
// 下面是關閉通道
close(ch2)
在通道關閉后,任何后續的發送操作將會導致應用崩潰。當關閉的通道被讀完(就是最后一個發送的值被接收)后,所有后續的接收操作都會立即返回,返回值是對應類型的零值。
關閉通道還可以作為一個廣播機制,后面的章節會具體講。
檢查通道的關閉
沒有一個直接的方式來判斷是否通道已經關閉,不過可以接收返回兩個參數:接收到的元素,以及一個布爾值(通常是ok),返回 true 表示接收成功,返回 false 表示當前的接收操作在一個關閉的并且讀完的通道上。這個方法檢查的也不是通道是否關閉了,而是通道里的值是否已經取完了。只有關閉的通道,才能保證不會有新值進入,把里面的值都取完后,會返回 false 表示這次取到的是通道關閉后的零值,而不是原本就是一個值為零的數據。
另外,還提供了一個 range 循環語法可以在通道上迭代。這個語法更為方便接收在通道上所有發送的值,接收完最后一個值后結束循環。
垃圾回收
結束時,關閉沒一個通道不是必需的。只有在通知接收方 goroutine 所有的數據都發送完畢的時候才需要關閉通道。通道也可以通過垃圾回收器根據它是否可以訪問來決定是否回收它,而不是根據它是否關閉。
Go 還提供了單向通道類型,僅僅導出發送或接收操作。類型chan<- int
是一個只能發送的通道,允許發送單不允許接收。反之,類型<-chan int
是一個只能接收的通道,允許接收但是不能發送。這里像箭頭一樣的操作符相對于 chan 關鍵字的位置是一個幫助記憶的點。如果違反這里的接收或發送的原則,在編譯時會被檢查出來。
在函數定義時,指定了單向通道的類型。在函數調用時,依然是把正常定義的雙向通道類型傳值給函數的參數。函數的調用會隱式地將普通的通道類型轉化為要求的單向通道的類型。在任何賦值操作中將雙向通道轉換為單向通道都是允許的,但是反過來是不行的。一旦有一個單向通道,是沒有辦法通過它獲取到引用同一個數據結構的雙向通道的類型的。
緩沖通道有一個元素隊列,隊列的最大長度在創建的時候通過 make 的容量參數來設置:
ch2 := make(chan string, 3)
通過調用內置的 cap 函數,可以獲取通道緩沖區的容量。這種需求不太常見。
通過調用內置的 len 函數,可以獲取通道內的元素個數。不過在并發程序中這個信息會隨著檢索操作很快過時,所以它的價值很低,但是它在錯誤診斷和性能優化的時候很有用。
這不是隊列
發送和接收操作可以在同一個 goroutine 中,但在真實的程序中通常由不同的 goroutine 執行。因為語法簡單,新手有時候粗暴地將緩沖通道作為隊列在單個 goroutine 中使用,但是這是個錯誤的用法。通道和 goroutine 的調度深度關聯,如果沒有另一個 goroutine 從通道進行接收,發送者(也許是整個程序)有被永久阻塞的風險。如果僅僅需要一個簡單的隊列,使用切片創建一個就好了。
示例:并發請求最快的鏡像資源
下面的例子展示一個使用緩沖通道的應用。它并發地向三個鏡像地址發請求,鏡像指相同但分布在不同地理區域的服務器。它將它們的響應通過一個緩沖通道進行發送,然后只接收第一個返回的響應,因為它是最早到達的。所以 mirroredQuery 函數甚至在兩個比較慢的服務器還沒有響應之前返回了一個結果。(偶然情況下,會出現像這個例子中的幾個 goroutine 同時在一個通道上并發發送,或者同時從一個通道接收的情況。):
func mirroredQuery() string {
responses := make(chan string, 3) // 有幾個鏡像,就要多大的容量,不能少
go func () { responses <- request("asia.gopl.io") }()
go func () { responses <- request("europe.gopl.io") }()
go func () { responses <- request("americas.gopl.io") }()
return <- responses // 返回最快一個獲取到的請求結果
}
func request(hostname string) (response string) { return "省略獲取返回的代碼" }
goroutine 泄露
在上面的示例中,如果使用的是無緩沖通道,兩個比較慢的 goroutine 將被卡住,因為在它們發送響應結果到通道的時候沒有 goroutine 來接收。這個情況叫做 goroutine 泄漏。它屬于一個 bug。不像回收變量,泄漏的 goroutine 不會自動回收,所以要確保 goroutine 在不再需要的時候可以自動結束。
無緩沖和緩沖通道的選擇,緩沖通道容量大小的選擇,都會對程序的正確性產生影響。無緩沖通道提供強同步保障,因為每一次發送都需要和一次對應的接收同步;對于緩沖通道,這些操作則是解耦的。如果知道要發送的值數量的上限,通常會創建一個容量是使用上限的緩沖通道,在接收第一個值前就完成所有的發送。在內存無法提供緩沖容量的情況下,可能導致程序死鎖。
有時候需要在多個通道上接收,不能只從一個通道上接收,因為任何一個操作都會在完成前阻塞。所以需要多路復用那些操作過程,為了實現這個目的,需要一個 select 語句:
select {
case <-ch2:
// ...
case x := <-ch3:
// ...use x...
case ch4 <- y:
// ...
default:
// ...
}
上面展示的是 select 語句的通用形式。像 switch 語句一樣,它有一系列的情況和一個可選的默認分支。每一個情況指定一次通信(在一些通道上進行發送或接收操作)和關聯的一段代碼塊。接收表達式操作可能出現在它本身上,像第一個情況,或者在一個短變量聲明中,像第二個情況。第二種形式可以讓你引用所接收的值。
select 一直等待,直到一次通信來告知有一些情況可以執行。然后,它進行這次通信,執行此情況所對應的語句,其他的通信將不會發生。
下面是一個微妙的例子。通道 ch 的緩沖區大小為 1,它要么是空的,要么是滿的,因此只有在其中一個狀況下可以執行,要么在 i 是偶數時發送,要么在 i 是奇數時接收。它總是輸出 0 2 4 6 8:
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
如果多個情況同時滿足,select 隨機選擇一個,這樣保證每一個通道有相同的機會被選中。在前一個例子中增加緩沖區的容量,會使輸出變得不可確定,因為當緩沖既不空也不滿的情況,相當于 select 語句在隨機做選擇。
有時候我們試圖在一個通道上發送或接收,但是不想在通道沒有準備好的情況下被阻塞,非阻塞通信。這使用 select 語句也可以做到。select 可以有一個默認情況,它用來指定在沒有其他的通信發生時可以立即執行的動作。
下面的 select 語句嘗試從 abort 通道中接收一個值,如果沒有值,它什么也不做。這是一個非阻塞的接收操作。重復這個動作稱為對通道輪詢:
select {
case <-abort:
fmt.Println("Launch aborted!")
return
default:
// 不執行任何操作
}
通道的零值是 nil。令人驚訝的是,nil 通道有時候很有用。因為在 nil 通道上發送和接收將永遠阻塞。對于 select 語句中的情況,如果其通道是 nil,它將永遠不會被選擇。可以用 nil 來開啟或禁用特性所對應的情況,比如超時處理或者取消操作,響應其他的輸入事件或者發送事件。
最后來一個示例的實戰。這里要構建一個程序,根據命令行指定的輸入,報告一個或多個目錄的磁盤使用情況,類似 UNIX 的 du 命令。
大多數的工作由下面的 walkDir 函數完成,它使用 dirents 輔助函數來枚舉目錄中的條目:
// walkDir 遞歸地遍歷以 dir 為根目錄的整個文件樹
// 并在 fileSizes 上發送每個已找到的文件的大小
func walkDir(dir string, fileSizes chan<- int64) {
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
ioutil.ReadDir 函數返回一個 os.FileInfo 類型的切片,針對單個文件同樣的信息可以通過調用 os.Stat 函數來返回。對每一個子目錄,walkDir 遞歸調用它自己,對于每一個文件,walkDir 發送一條消息到 fileSizes 通道。消息是文件所占用的字節數。
下面的 main 函數使用兩個 goroutine。后臺 goroutine 調用 walkDir 遍歷命令行上指定的每一個目錄,最后關閉 fileSizes 通道。主 goroutine 計算從通道中接收的文件的大小的和,最后輸出總數:
func main() {
// 確定初始目錄
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 遍歷文件樹
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// 輸出結果
var nfiles, nbytes int64
for size := range fileSizes {
nfiles++
nbytes += size
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.2f GB\n", nfiles, float64(nbytes)/(1<<30)) // 1<<30 就是 2**30 就是 1024*1024*1024
}
現在程序可以正常的工作。
如果程序可以匯報進度的話,會更加友好。如果僅僅只是把 printDiskUsage 調用移動到循環內部,會有非常多的輸出。
下面的示例,修改了主 goroutine 中記錄結果的部分。不是在每次迭代中輸出,而是加了一個定時器,通過 select 定期輸出一次結果。另外還加上了 -v 參數來控制,可以選擇性的開啟這個功能。如果不開啟功能,那么 tick 通道的值就是 nil,它對應的分支在select 中就永遠是阻塞的。相當于沒有開啟這個選項,很直觀的理解:
var verbose = flag.Bool("v", false, "周期性的輸出進度")
func main() {
// 確定初始目錄,沒變化
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 遍歷文件樹,沒變化
fileSizes := make(chan int64)
go func() {
for _, root := range roots {
walkDir(root, fileSizes)
}
close(fileSizes)
}()
// 定期輸出結果
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes 關閉,則退出,相當于原來的遍歷結束
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
因為這個版本有兩個通道需要接收 size、tick,所以無法使用 range 循環了。所以第一個 select 的分支需要通過第二個參數 ok 來判斷通道是否已經關閉。這個的 break 退出使用了標簽,因為沒有標簽的 break 只能跳出當前的 select 這層,而這里是需要跳出外層的 for 循環。
這里的 flag 的解析也值得借鑒,非常簡單。首先是解析指定的參數,這里是 -v 參數。多余的參數會通過 flag.Args() 返回一個字符串切片。調用的時候,必須把解析的參數放在前面:
PS H:\Go\src\gopl\ch8\du2> go run main.go -v E:\BaiduNetdiskDownload E:\XMPCache E:\Downloads
4 files 0.02 GB
41 files 2.16 GB
177 files 6.99 GB
567 files 46.66 GB
605 files 50.26 GB
PS H:\Go\src\gopl\ch8\du2>
還可以進一步提高效率,這里的 walkDir 也是可以并發調用從而充分利用磁盤系統的并行機制。這個版本使用了 sycn.WaitGroup 來為并發調用的 walkDir 計數。當計數器為減為 0 的時候,關閉 fileSizes 通道:
var verbose = flag.Bool("v", false, "周期性的輸出進度")
func main() {
// 確定初始目錄,沒變化
flag.Parse()
roots := flag.Args()
if len(roots) == 0 {
roots = []string{"."}
}
// 并行遍歷每一個文檔樹
fileSizes := make(chan int64)
var n sync.WaitGroup
for _, root := range roots {
n.Add(1)
go walkDir(root, &n, fileSizes) // 注意,多傳了一個參數
}
go func() {
n.Wait()
close(fileSizes)
}()
// 定期輸出結果,沒變化
var tick <-chan time.Time
if *verbose {
tick = time.Tick(500 * time.Millisecond)
}
var nfiles, nbytes int64
loop:
for {
select {
case size, ok := <-fileSizes:
if !ok {
break loop // fileSizes 關閉,則退出,相當于原來的遍歷結束
}
nfiles++
nbytes += size
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
printDiskUsage(nfiles, nbytes)
}
func printDiskUsage(nfiles, nbytes int64) {
fmt.Printf("%d files %.2f GB\n", nfiles, float64(nbytes)/(1<<30)) // 1<<30 就是 2**30 就是 1024*1024*1024
}
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) { // 注意,多了個參數
defer n.Done() // 記得退出時計數器要減1
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, n, fileSizes) // 注意,多了個參數
} else {
fileSizes <- entry.Size()
}
}
}
還需要限制一下并發數,這里要修改一下 dirents 函數來使用計數信號量進行限制,防止同時打開太多的文件:
// 用于限制目錄并發數的計數信號量
var sema = make(chan struct{}, 20)
// dirents 返回 dir 目錄中的條目
func dirents(dir string) []os.FileInfo {
sema <- struct{}{} // 獲取令牌
defer func() { <-sema }() // 釋放令牌
entries, err := ioutil.ReadDir(dir) // 這個打開文件的操作需要限制并發,在這句之前加上計數信號量,非常合適
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
現在這個版本的是最好的了。不過下面還會再增加一個取消的操作,這里的取消會用到廣播的機制。
一個 goroutine 無法直接終止另一個,因為這樣會讓所有的共享變量狀態處于不確定狀態。正確的做法是使用通道來傳遞一個信號,當 goroutine 接收到信號時,就終止自己。這里要討論的是如何同時取消多個 goroutine。
一個可選的做法是,給通道發送你要取消的 goroutine 同樣多的信號。但是如果一些 goroutine 已經自己終止了,這樣計數就多了,就會在發送過程中卡住。如果某些 goroutine 還會自我繁殖,那么信號的數量又會太少。通常,任何時刻都很難知道有多少個 goroutine 正在工作。對于取消操作,這里需要一個可靠的機制在一個通道上廣播一個事件,這樣所有的 goroutine 就都能收到信號,而不用關心具體有多少個 goroutine。
當一個通道關閉且已經取完所有發送的值后,接下來的接收操作都會立刻返回,得到零值。就可以利用這個特性來創建一個廣播機制。第一步,創建一個取消通道,在它上面不發送任何的值,但是它的關閉表明程序需要停止它正在做的事情。
這節,先講解取消廣播的實現。然后把這個功能加到上面的例子中去。
還要定義一個工具函數 cancelled,在它被調用的時候檢測或輪詢取消狀態:
var done = make(chan struct{})
func cancelled() bool {
select {
case <-done:
return true
default:
return false
}
}
接下來,創建一個讀取標準輸入的 goroutine,它通常連接到終端,當用戶按回車后,這個 goroutine 通過關閉 done 通道來廣播取消事件:
// 當檢測到輸入時,廣播取消
go func() {
os.Stdin.Read(make([]byte, 1)) // 讀一個字節
close(done)
}()
現在要讓所有的 goroutine 來響應這個取消操作。在主 goroutine 中的 select 中,嘗試從 done 接收。如果接收到了,就需要進行取消操作,但是在結束之前,它必須耗盡 fileSizes 通道,丟棄它所有的值,知道通道關閉。這么做是為了保證所有的 walkDir 調用可以執行完,不會卡在向 fileSizes 通道發送消息上:
for {
select {
case <-done:
// 耗盡 fileSizes,讓已經創建的 goroutine 結束
for range fileSizes {
// 什么也不做
}
return
case siez, ok := <-fileSizes:
if !ok {
break loop
}
nfiles++
nbytes += siez
case <-tick:
printDiskUsage(nfiles, nbytes)
}
}
walkDir 的 goroutine 在開始的時候輪詢取消狀態。如果是取消的狀態,就什么都不做立即返回。這樣在取消后創建的 goroutine 就會什么都不做而是立刻返回:
func walkDir(dir string, n *sync.WaitGroup, fileSizes chan<- int64) {
defer n.Done()
if cancelled() {
return
}
for _, entry := range dirents(dir) {
if entry.IsDir() {
subdir := filepath.Join(dir, entry.Name())
walkDir(subdir, n, fileSizes)
} else {
fileSizes <- entry.Size()
}
}
}
現在基本就避免了在取消后創建新的 goroutine。但是其他已經創建的 goroutine 則會等待他們執行完畢。要想更快的響應,就需要對程序邏輯進行侵入式的修改。要確保在取消事件之后沒有更多昂貴的操作發生。這就需要更新更多的代碼,但是通常可以通過在少量重要的地方檢察取消狀態來達到目的。在 dirents 中獲取信號量令牌的操作也可需要快速結束:
func dirents(dir string) []os.FileInfo {
select {
case sema <- struct{}{}: // 獲取令牌
case <-done:
return nil // 取消
}
defer func() { <-sema }() // 釋放令牌
entries, err := ioutil.ReadDir(dir)
if err != nil {
fmt.Fprintf(os.Stderr, "du1: %v\n", err)
return nil
}
return entries
}
現在,當取消事件發生時,已經進入 dirents 函數的調用,如果已經獲取到了令牌,則會執行完畢,但是返回后,在地遞歸調用 walkDir 的時候就會快速退出。那些還沒獲取令牌的調用,此時在 select 中會因為從 done 通道中接收到取消的廣播而直接返回 nil。
期望的情況是,當然是當取消事件到來時 main 函數可以返回,然后程序隨之退出。如果發現在取消事件到來的時候 main 函數沒有返回,可以執行一個 panic 調用。從崩潰的轉存儲信息中通常含有足夠的信息來幫助我們分析,發現哪些 goroutine 還沒有合適的取消。也可能是已經取消了,但是需要的時間比較長。總之,使用 panic 可以幫助查找原因。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。