您好,登錄后才能下訂單哦!
這篇文章主要介紹“Go語言中的并發是什么”,在日常操作中,相信很多人在Go語言中的并發是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Go語言中的并發是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Go語言直接支持內置支持并發。當一個函數創建為goroutine時,Go會將其視為一個獨立的工作單元。這個單元會被調度到可用的邏輯處理器上執行。
Go語言運行時的調度器是一個復雜的軟件,這個調度器在操作系統之上。操作系統的線程與語言運行時的邏輯處理器綁定,并在邏輯處理器上運行goroutine。
Go語言的并發同步邏輯來自一個叫做通信順訊進程(CSP)的范型。CSP是一種消息傳遞模型,通過在goroutine之間傳遞數據來傳遞消息,而不是通過對數據進行加鎖來實現同步訪問。這種數據的類型叫做通道(channel) 。
在操作系統中,一個應用程序就可以看作一個進程,而每個進程至少包含一個線程。每個進程的初始線程被稱為主線程。
操作系統會在物理處理器(CPU)上調度線程來運行,而Go語言會在邏輯處理器來調度goroutine來運行。1.5版本之上,Go語言的運行時默認會為每個可用的物理處理器分配一個邏輯處理器。1.5之前,默認給整個應用程序只分配一個邏輯處理器。
如下圖,在運行時把goroutine調度到邏輯處理器上運行,邏輯處理器綁定到唯一的操作系統線程。
當goroutine執行了一個阻塞的系統調用(就是一個非純CPU的任務)時,調度器會將這個線程與處理器分離,并創建一個新線程來運行這個處理器上提供的服務。
語言運行默認限制每個程序最多創建10000個線程。
注意并發≠并行!并行需要至少2個邏輯處理器。
以并發的形式分別顯示大寫和小寫的英文字母
package main
import (
"fmt"
"runtime"
"sync"
func main() {
// 分配一個邏輯處理器給調度器使用
runtime.GOMAXPROCS(1)
// wg用來等待程序完成
var wg sync.WaitGroup
// 計數器加2,表示要等待兩個goroutine
wg.Add(2)
fmt.Println("Start!")
// 聲明一個匿名函數,并創建一個goroutime
go func() {
// 通知main函數工作已經完成
defer wg.Done()
21: // 顯示字母表3次
22: for count:=0; count<3;count++ {
23: for char:='a';char<'a'+26;char++ {
24: fmt.Printf("%c ", char)
25: }
26: }
27: }()
28: // 同上
29: go func() {
30: // 通知main函數工作已經完成
31: defer wg.Done()
32: // 顯示字母表3次
33: for count:=0; count<3;count++ {
34: for char:='A';char<'A'+26;char++ {
35: fmt.Printf("%c ", char)
36: }
37: }
38: }()
39: // 等待goroutine結束
40: fmt.Println("Waiting!")
41: wg.Wait()
42: fmt.Println("\nFinish!")
43: }
運行結果后,可以看到先輸出的是所有的大寫字母,最后才是小寫字母。是因為第一個goroutine完成所有顯示需要花時間太短了,以至于在調度器切換到第二個goroutine之前,就完成了所有任務。
調度器為了防止某個goroutine長時間占用邏輯處理器,會停止當前正運行的goroutine,運行其他可運行的goroutine運行的機會。
創建兩個相同的長時間才能完成其工作的goroutine就可以看到,比如說顯示5000以內的素數值。
代碼結構如下
1: go printPrime("A")
2: go printPrime("B")
3:
4: func printPrime(prefix string) {
5: ...
6: }
結果類似
1: B:2
2: B:3
3: ...
4: B:4591
5: A:3
6: A:5
7: ...
8: A:4561
9: A:4567
10: B:4603
11: B:4621
12: ...
13: // Completed B
14: A:4457
15: ...
16: // Completed A
如何修改邏輯處理器的數量
1: runtime.GOMAXPROCS(runtime.NUMCPU())
稍微改動下上面的代碼,結果就會大不同
1: package main
2:
3: import (
4: "fmt"
5: "runtime"
6: "sync"
7:
8:
9: func main() {
10: // 分配兩個邏輯處理器給調度器使用
11: runtime.GOMAXPROCS(2)
12: // wg用來等待程序完成
13: var wg sync.WaitGroup
14: // 計數器加2,表示要等待兩個goroutine
15: wg.Add(2)
16: fmt.Println("Start!")
17: // 聲明一個匿名函數,并創建一個goroutime
18: go func() {
19: // 通知main函數工作已經完成
20: defer wg.Done()
21: // 顯示字母表3次
22: for count:=0; count<10;count++ {
23: for char:='a';char<'a'+26;char++ {
24: fmt.Printf("%c ", char)
25: }
26: }
27: }()
28: // 同上
29: go func() {
30: // 通知main函數工作已經完成
31: defer wg.Done()
32: // 顯示字母表3次
33: for count:=0; count<10;count++ {
34: for char:='A';char<'A'+26;char++ {
35: fmt.Printf("%c ", char)
36: }
37: }
38: }()
39: // 等待goroutine結束
40: fmt.Println("Waiting!")
41: wg.Wait()
42: fmt.Println("\nFinish!")
43: }
結果類似下面的(根據CPU單核的性能結果可能結果稍微不一樣)
1: Start!
2: Waiting!
3: a b c d e f g h i j k l m n o A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g
4: h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
5: a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s
6: t u v w x y z M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X
7: Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q
8: R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
9: Finish!
可以發現,goroutine是并行運行的。
只有在有多個邏輯處理器且可以同時讓每個goroutine運行在一個可用的物理處理器上的時候,goroutine才會并行運行。
如果兩個或者多個goroutine在沒有互相同步的情況下,訪問某個共享的資源,并且試圖同時讀和寫這個資源,就處于相互競爭的狀態。
在競爭狀態,每個goroutine都會覆蓋另一個goroutine的工作。這種覆蓋發生在goroutine發生切換的時候。
每個goroutien都會創造自己的共享變量副本。當切換到領另一個goroutine時,如果這個變量的值在上一個goroutine發生改變,這個goroutine再次運行時,雖然變量的值改變了,但是由于這個goroutine沒有更新自己的那個副本的值,而是繼續使用,并且將其存回變量的值,從而覆蓋上一個goroutine 的工作。
go build –race用來競爭檢測器標志來編譯程序
原子函數能夠以底層的枷鎖機制來同步訪問整型變量和指針。省略部分代碼如下:
1: var counter int64
2: go incCounter(1)
3: go incCounter(2)
4: func incCounter(id int) {
5: for count:=0;count<2;count++{
6: //安全地對counter加1
7: atomic.AddInt64(&counter, 1)
8: //當前goroutine從線程退出,并放回隊列
9: runtime.Gosched()
10: }
11: }
使用atmoi包的AddInt64函數。這些goroutine都會自動根據所引用的變量做同步處理。
另外兩個原子函數是LoadInt64和StoreInt64。用法如下:
1: // shutdown是通知正在執行的goroutine停止工作的標志
2: var shutdown int64
3: var wg sync.WaitGroup
4: // 該停止工作了,安全地設置shutdown標志
5: atomic.StoreInt64(&shutdown, 1)
6: // 等待goroutine結束
7: wg.Wait()
8: // 檢測是否停止工作,如果shutdown==1那么goroutine就會終止
9: if atomic.LoadInt64(&shutdown) == 1 {
10: break
11: }
12:
另一種同步訪問共享資源的方式是互斥鎖。主要代碼如下:
1: var (
2: // counter是所有goroutine都要增加其值的變量
3: counter int
4: wg sync.WaitGroup
5: // mutex用來定義一段代碼臨界區
6: mutex sync.Mutex
7: )
8: func main...
9: // 業務代碼
10: func incCounter(id int) {
11: defer wg.Done()
12: for i:=0;i<2;i++ {
13: //同一時期只允許一個goroutine進入
14: mutex.Lock()
15: //大括號并不是必須的
16: {
17: //捕獲counter的值
18: value := counter
19: //當前goroutine從線程退出,并返回到隊列
20: runtime.Gosched()
21: //增加本地value變量的值
22: value++
23: //將該值保存回counter
24: counter = value
25: }
26: // 釋放鎖,允許其他正在等待的goroutine
27: mutex.Unlock()
28: }
29: }
通道在goroutine之間架起了一個管道,并提供了確保同步交換數據的機制。聲明通道時,需要指定將要被共享的數據的類型。
可以通過通道共享內置類型,命名類型,結構類型和引用類型的值或者指針。
go語言需要使用make來創建一個通道,chan是關鍵字:
1: // 無緩沖的整型通道
2: unbuffered := make(chan int)
3: // 有緩沖的字符串通道
4: buffered := make(chan string, 10)
向通道發送值
1: buffered := make(chan string, 10)
2: // 通通道發送一個字符串
3: buffered <- "Gopher"
4: // 從通道接收一個字符串
5: value := <-buffered
無緩沖的通道是指在接收前沒有能力保存任何值的通道。發送goroutine和接收goroutine同時準備好,才能完成發送和接收操作。如果沒有準備好,通道會導致goroutine阻塞等待。所以無緩沖通道保證了goroutine之間同一時間進行數據交換。
1: // 四個goroutine間的接力比賽
2: package main
3:
4: import (
5: "fmt"
6: "sync"
7: "time"
8: )
9
10: var wg sync.WaitGroup
11:
12: func main() {
13: //創建一個無緩沖的通道
14: baton := make(chan int)
15: wg.Add(1)
16: // 第一步跑步者持有接力棒
17: go Runner(baton)
18: // 開始比賽
19: baon <- 1
20: // 等待比賽結束
21: wg.Wait()
22: }
23:
24: // Ruuner模擬接力比賽中的一位跑步者
25: func Runner(baton chan int) {
26: var newRunner int
27: // 等待接力棒
28: runner := <-baton
29: // 開始跑步
30: fmt.Printf("運動員%d帶著Baton跑\n", runner)
31: // 創建下一步跑步者
32: if runner != 4{
33: newRunner = runner + 1
34: fmt.Printf("運動員%d上線\n", newRunner)
35: go Runner(baton)
36: }
37: // 圍繞跑到跑
38: time.Sleep(100 * time.Millisecond)
39: // 比賽結束了嗎?
40: if runner == 4{
41: fmt.Printf("運動員%d完成,比賽結束\n", runner)
42: wg.Done()
43: return
44: }
45: // 將接力棒交給下一位跑步者
46: fmt.Printf("運動員%d與運動員%d交換\n", runner, newRunner)
47: baton <- newRunner
48: }
結果:
1: 運動員1帶著Baton跑
2: 運動員2上線
3: 運動員1與運動員2交換
4: 運動員2帶著Baton跑
5: 運動員3上線
6: 運動員2與運動員3交換
7: 運動員3帶著Baton跑
8: 運動員4上線
9: 運動員3與運動員4交換
10: 運動員4帶著Baton跑
11: 運動員4完成,比賽結束
有緩沖的通道則能在接收前能存儲一個或者多個值的通道。這種類型的通道并不強制要求goroutine之間必須同時完成發送和接收。只有在通道沒有可用緩沖區或者沒有要接收的值時,發送或者接收才會阻塞。
1: package main
2:
3: import (
4: "fmt"
5: "math/rand"
6: "sync"
7: "time"
8: )
9:
10: const (
11: // goroutine的數量
12: numberGoroutines = 4
13: // 工作的數量
14: taskLoad = 10
15: )
16:
17: var wg sync.WaitGroup
18:
19: // 初始化隨機數種子
20: func init() {
21: rand.Seed(time.Now().Unix())
22: }
23: func main() {
24: // 創建一個有緩沖的通道來管理工作
25: tasks := make(chan string, taskLoad)
26: wg.Add(numberGoroutines)
27: // 增加一組要完成的工作
28: for post:=1;post<taskLoad;post++ {
29: tasks <- fmt.Sprintf("Task:%d", post)
30: }
31: // 啟動goroutine來處理工作
32: for i:=1;i<numberGoroutines+1;i++ {
33: go worker(tasks, i)
34: }
35: // 有工作處理完時關閉通道
36: closetasks)
37:
38: wg.Wait()
39: fmt.Printf("all finished!")
40:
41: }
42:
43: func worker(tasks chan string, worker_id int) {
44: defer wg.Done()
45:
46: for {
47: //等待分配工作
48: task, ok := <-tasks
49: if !ok {
50: //通道變空
51: fmt.Printf("Worker%d shut down\n", worker_id)
52: return
53: }
54: // 開始工作
55: fmt.Printf("Worker%d start %s\n", worker_id, task)
56:
57: // 隨機等待一段時間
58: sleep := rand.Int63n(100)
59: time.Sleep(time.Duration(sleep)*time.Millisecond)
60: // 顯示完成了工作
61: fmt.Printf("Worker%d Completed %s\n", worker_id, task)
62: }
63:
輸出結果:
1: Worker4 start Task:1
2: Worker1 start Task:2
3: Worker2 start Task:3
4: Worker3 start Task:4
5: Worker3 Completed Task:4
6: Worker3 start Task:5
7: Worker4 Completed Task:1
8: Worker4 start Task:6
9: Worker2 Completed Task:3
10: Worker2 start Task:7
11: Worker3 Completed Task:5
12: Worker3 start Task:8
13: Worker Completed Task:7
14: Worker2 start Task:9
15: Worker3 Completed Task:8
16: Worker3 shut down
17: Worker4 Completed Task:6
18: Worker4 shut down
19: Worker1 Completed Task:2
20: Worker1 shut down
21: Worker2 Completed Task:9
22: Worker2 shut down
23: all inished!
由于程序和Go語言的調度器有隨機的成分,結果每次都會不一樣。不過總流程不會大變。
當通道關閉后,goroutine依舊從通道里的緩沖區獲取數據,但是不能再向通道里發送數據。從一個已經關閉且沒有數據的通道里獲取數據,總會立刻返回,兵返回一個通道類型的零值。
到此,關于“Go語言中的并發是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。