您好,登錄后才能下訂單哦!
并發:同一時間段內執行多個任務。
并行:同一時刻執行多個任務。
Go語言的并發通過goroutine實現。goroutine類似于線程,屬于用戶態的線程,我們可以根據需要創建成千上萬個goroutine并發工作。goroutine是由go語言的運行調度完成的,而線程是由操作系統調度完成的。
Go語言還提供了channel在多個goroutine間進行通信。goroutine和channel是go語言秉承的CSP(Communicating Sequential Process)并發模式的重要實現基礎。
在Java/c++中我們要實現并發編程的時候,我們通常要自己維護一個線程池,并且需要自己去包裝一個又一個的任務和然后自己去調度線程執行任務并維護上線文的切換,這一切通常會耗費程序員的大量心智。能不能有一種機制,程序員只需要定義很多個任務,讓系統去幫忙我們把這些任務分配到CPU上實現并發執行呢? Go語言中的goroutine就是這樣一種機制,Go語言之所以能被稱為現代化的編程語言,就是因為它在語言層面已經內置了調度和上下文切換的機制。
Go程序中使用go關鍵字為一個函數創建一個goroutine。一個函數可以被創建多個goroutine,一個goroutine必定對應一個函數。
啟動goroutine的方式非常簡單,只需要在調用的函數(普通函數和匿名函數)前面加一個go 關鍵字。
舉個例子:
package main
import (
"fmt"
)
func hello() {
fmt.Println("hello goroutine")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
這個示例中hello函數和下面的語句是串行的,執行的結果是打印完hello goroutine 后打印main goroutine done!
接下來我們在調用hello 函數前面加上go關鍵字,也就是啟動一個goroutine區執行hello這個函數。
func main() {
go hello()
fmt.Println("main goroutine done!")
}
這一次的執行結果只打印了 main goroutine done! , 并沒有打印 hello goroutine,為什么呢?
在程序啟動時,Go程序就會為main()函數創建一個默認的goroutine。當main函數返回的時候該goroutine就結束了,所有在main()函數中啟動的goroutine 會一同結束,main函數所在的goroutine就像是權利的游戲中的夜王,其他的goroutine就像是異鬼,夜王一死它轉化的那些異鬼也就全部GG了。
所以我們要想辦法讓main函數等一等hello函數,最簡單粗暴的方式就是sleep了。
func main(){
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
執行上面的代碼你會發現,這一次先打印main goroutine done! ,然后緊接著打印Hello Goroutine!
首先為什么會打印main goroutine done! 是因為我們在創建新的goroutine 的時候需要花費一些時間,而此時main函數所在的goroutine是繼續執行的。
在代碼中生硬的使用time.sleep肯定是不合適的,Go語言中可以使用sync.WaitGroup來實現并發任務的同步。sync.WaitGroup有一下幾個方法:
方法名 功能
(wg WaitGroup)Add(delta int) 計數器+delta
(wg WaitGroup)Done() 計算器-1
(wg *WaitGroup)Wait() 阻塞直到計數器變為0
sync.WaitGroup內部維護著一個計數器,計數器的值可以增加和減少。例如當我們啟動了N個并發任務時,就將計數器的值增加N, 每個任務完成時通過調用Done()方法將計數器減1,通過調用Wait()來等待并發任務執行完,當計數器值為0時,表示所有并發任務已經完成。
我們利用sync.WaitGroup將上面的代碼優化一下:
var wg sync.WaitGroup
func hello() {
defer wg.Done()
fmt.Println("hello 1")
}
func main() {
wg.Add(1)
go hello()
fmt.Println("main 2")
wg.Wait()
}
需要注意的是sync.WaitGroup是一個結構體,傳遞的時候需要傳遞指針。
在go語言中實現并發就是這么簡單,我們還可以啟動多個goroutine。讓我們再來一個例子:
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done()
fmt.Println("hello ,", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1)
go hello(i)
}
wg.Wait()
}
多次執行上面的代碼,會發現每次打印的數字的順序都不一樣,這是因為10個goroutine是并發執行的,而goroutine的調度室隨機的。
OS線程一般都有固定的棧內存(通常為2MB),一個goroutine的棧在其生命周期開始時只有很小的棧(典型情況下2KB),goroutine的棧不是固定的,他可以按需增大和縮小,goroutine的棧大小限制可以達到1GB,雖然極少會用到這么大。所以在go語言中一次創建十萬左右的goroutine也是可以的。
os線程是由os內核來調度的,goroutine則是由go運行時自己的調度器來調度的,這個調度器使用一個稱為m:n調度的技術(復用/調度m個goroutine到n個OS線程上)。goroutine的調度不需要切換內核語境,所以調度一個goroutine比調度一個線程成本低很多。
go運行時的調度器使用GOMAXPROCS 參數來確定需要使用多少個OS線程來同時執行GO代碼,默認值是機器上的CPU的核心數,例如在一個8核的機器上,調度器會把go代碼同時調度到8個OS線程上。go語言中可以通過runtime.GOMAXPROCS()函數設置當前程序并發時占用的CPU邏輯核心數。
go1.5版本之前,默認使用的是單核心執行,Go1.5版本之后,默認使用全部的CPU邏輯核心數。
GO語言中的操作系統線程和goroutine的關系:
1.一個操作系統線程對應用戶態多個goroutine。
2.go程序可以同時多個操作系統線程。
3.goroutine 和OS線程是多對多的關系,即m:n
單純的將函數并發執行是沒有意義的,函數與函數間需要交換數據才能體現并發執行函數的意義。
雖然可以使用共享內存進行數據交換,但是共享內存存在不同的goroutine中容易發生的競態問題。為了保證數據交換的正確性,必須使用互斥量對內存進行加鎖,這種做法必然導致性能問題。
go語言的并發模型是CSP,提倡通過通信共享內存,而不是通過共享內存實現通信。
如果說goroutine是Go程序并發的執行體,channel就是它們之間的連接,channel是可以讓一個goroutine發送特定值到另一個goroutine的通信機制。
go語言中的通道channel是一種特殊的類型,通道像一個傳送帶或者隊列,總是遵循先入先出的規則,保證收發數據的順序,每個通道都是一個具體類型的導管,也就是聲明channel的時候需要為其指定元素類型。
聲明通道類型的格式如下:
var 變量 chan 元素類型
舉幾個例子:
var ch2 chan int
var ch3 chan bool
var ch4 chan []int
通道是引用類型,通道類型的空值是nil。
var ch chan int
fmt.Println(ch) //<nil>
聲明的通道后需要使用make函數初始化之后才能使用。創建channel的格式如下:
make(chan 元素類型, [緩沖大小])
緩沖大小是可選的。
舉幾個例子:
cha4 := make(chan int)
cha5 := make(chan bool)
cha6 := make(chan []int)
通道有發送(send)、接收(receive)和關閉(close)三種操作。發送和接收都使用<- 符號。
現在我們先使用以下語句定義一個通道:
ch := make(chan int)
將一個值發送到通道中。
ch <- 10 //把10發送到ch中
從一個通道中接收值。
x := <- ch //從ch中接收值并賦值給變量x
<- ch // 從ch中接收值,忽略結果
我們通過調用內置的close函數來關閉通道。
close(ch)
關于關閉通道需要注意的事情是,只有在通知接收方goroutine所有的數據都發送完畢的時候才需要關閉通道。通道是可以被垃圾回收機制回收的,他和關閉文件不一樣,在結束操作之后關閉文件是必須做的,但是關閉通道不是必須的。
關閉后的通道有以下特點:
無緩沖的通道又稱為阻塞的通道。我們來看一下下面的代碼:
func main(){
ch := make(chan int)
ch <- 10
fmt.Println("發送成功了")
}
上面的代碼能夠通過編譯,但是執行的時候會出現以下錯誤:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan send]:
main.main()
/Users/huoshuaibing/gowork/src/github.com/studygolang/day12/main.go:9 +0x54
exit status 2
為什么會出現deadlock錯誤呢?
因為我們使用ch := make(chan int)創建的是無緩沖的通道,無緩沖的通道只有在有人接收值的時候才能發送值。就像你住的小區沒有代收點和快遞柜,快遞員給你打電話必須把這個物品送到你的手中,簡單來說就是無緩沖的通道必須有接收才能發送。
上面的代碼會阻塞在 ch <- 10 這一行代碼形成死鎖,那么如何解決這個問題呢?
一種方法是啟用一個goroutine去接收值,例如:
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
ch := make(chan int)
go recv(ch)
ch <- 10
fmt.Println("發送成功")
}
無緩沖的通道上的發送操作會阻塞,直到另一個goroutine在該通道上執行接收操作,這時值才能發送成功,兩個goroutine將繼續執行;相反,如果接收操作先執行,接收方的goroutine將阻塞,直到另一個goroiutine在該通道上發送一個值。
使用無緩沖的通道進行通信將導致發送和接收的goroutine同步化,因此,無緩沖的通道也被稱為同步通道。
解決上面的問題的方法還有一種就是使用有緩沖區的通道。我們可以使用make函數初始通道的時候為其指定通道的容量,例如:
func main() {
ch := make(chan int, 1)
ch <- 10
fmt.Println("發送成功")
}
只要通道的容量大于零,那么該通道就是有緩沖的通道,通道的容量表示通道中能存放元素的數量。就像你小區的快遞柜只有那么多個格子,格子滿了就裝不下了,就阻塞了,等到別人取走一個快遞員才能往里面放一個。
我們可以使用內置的len函數獲取通道內元素的數量,使用cap函數獲取通道的容量。
當通過通道發送有限的數據時,我們可以通過close函數關閉通道來告知從該通道接收值的goroutine停止等待。當通道關閉時,往該通道發送值會引發panic,從該通道里接收的值一直都是類型零值。那如何判斷一個通道是否被關閉了呢?
我們來看下面的例子:
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
go func() {
for i := 0; i < 100; i++ {
ch2 <- i
}
close(ch2)
}()
go func() {
for {
i, ok := <-ch2
if !ok {
break
}
ch3 <- i * i
}
close(ch3)
}()
for i := range ch3 {
fmt.Println(i)
}
}
從上面的例子中我們看到兩種方式在接收值得時候判斷通道是否被關閉,我們通常使用的是for range 的方式。
有時候我們會將通道作為參數在多個任務函數間傳遞,很多時候我們在不同的任務函數中使用通道都會對其進行限制,比如只能發送或接收。Go語言中提供了單向通道來處理這種情況。例如,我們把上面的例子改造如下:
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch2 := make(chan int)
ch3 := make(chan int)
go counter(ch2)
go squarer(ch3, ch2)
printer(ch3)
}
其中,chan<- int 是一個只能發送的通道,可以發送但是不能接收; <-chan int 是一個只能接收的通道,可以接收但是不能發送。在函數傳參及任何賦值操作中將雙向通道轉換為單向通道是可以的,但是反過來不可以的。
在某些場景下我們需要同時從多個通道接收數據。通道在接收數據時,如果沒有數據可以接收將會發生阻塞。你也許會寫出如下代碼使用遍歷的方式來實現:
for {
data,ok := <- ch2
data,ok := <- ch3
...
}
這種方式雖然可以實現從多個通道接收值的需求,但是運行性能會差很多。為了應對這種場景,GO內置了select關鍵字,可以同時響應多個通道的操作。select的使用類似于switch語句,它有一些列case分支和一個默認分支,每個case會對應一個通道的通信過程。select會一直等待,直到某個case的通信操作完成,就會執行case分支對應的語句。具體格式如下:
select {
case <- ch2:
...
case <-ch3
...
default:
默認操作
}
舉個小例子來演示一下select 的使用:
package main
import (
"fmt"
)
func main() {
ch := make(chan int, 1)
for i := 0; i < 10; i++ {
select {
case x := <-ch:
fmt.Println(x)
case ch <- i:
}
}
}
使用select語句能提高代碼的可讀性。如果多個case同時滿足,那么select會隨機挑選一個。對于沒有case的select{}會一直等待。
有時候在Go代碼中可能會存在多個goroutine同時操作一個資源(臨界區),這種情況會發生競態問題。類比生活中的例子有十字路口被各個方向的汽車競爭;還有火車上的衛生間被車廂里面的人競爭。舉個例子:
var x int64
var wg sync.WaitGroup
func add() {
for i := 0; i < 100; i++ {
x = x + 1
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
上面的代碼我們開啟了兩個goroutine 去累加變量x的值,這兩個goroutine在訪問和修改x變量的時候就會存在數據競爭,導致最后的結果與期待的不符。
互斥鎖是一種常用的控制共享資源訪問的方法,他們能夠保證同時只有一個goroutine可以訪問共享資源。Go語言中使用sync包的Mutex來實現互斥鎖。使用互斥鎖來修復上面代碼的問題:
var x int64
var wg sync.WaitGroup
var lock sync.Mutex
func add() {
for i := 0; i < 100; i++ {
lock.Lock()
x = x + 1
lock.Unlock()
}
wg.Done()
}
func main() {
wg.Add(2)
go add()
go add()
wg.Wait()
fmt.Println(x)
}
使用互斥鎖能夠保證同一時間有且只有一個goroutine進入臨界區,其他的goroutine 則在等待鎖;當互斥鎖釋放后,等待goroutine才可以獲得鎖進入臨界區,多個goroutine同時等待一個鎖時,喚醒的策略是隨機的。
互斥鎖時完全互斥的,實際情況是很多情景下是讀多寫少的,當我們并發的去讀一個資源不涉及資源修改的時候是沒有必要加鎖的,這種場景下使用讀寫鎖是更好的一種選擇。讀寫鎖在Go語言中使用sync包的RWMutex 類型。
讀寫鎖分兩種:讀鎖和寫鎖。當一個goroutine獲取讀鎖之后,其他的goroutine如果是獲取讀鎖會繼續獲得鎖,如果獲取的是寫鎖就會等待,當一個goroutine獲取寫鎖之后,其他的goroutine無論是獲取讀鎖還是寫鎖都會等待。
讀寫鎖示例:
var (
x int64
wg sync.WaitGroup
lock sync.Mutex
rwlock sync.RWMutex
)
func write() {
// lock.Lock() // 加互斥鎖
rwlock.Lock() // 加寫鎖
x = x + 1
time.Sleep(10 * time.Millisecond) // 假設讀操作耗時10毫秒
rwlock.Unlock() // 解寫鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func read() {
// lock.Lock() // 加互斥鎖
rwlock.RLock() // 加讀鎖
time.Sleep(time.Millisecond) // 假設讀操作耗時1毫秒
rwlock.RUnlock() // 解讀鎖
// lock.Unlock() // 解互斥鎖
wg.Done()
}
func main() {
start := time.Now()
for i := 0; i < 10; i++ {
wg.Add(1)
go write()
}
for i := 0; i < 1000; i++ {
wg.Add(1)
go read()
}
wg.Wait()
end := time.Now()
fmt.Println(end.Sub(start))
}
說在前面:這是一個進階知識點。延遲一個開銷很大的初始化操作到真正用到它的時候再執行是一個很好的實踐。因為預先初始化一個變量(比如在init函數中完成初始化)會增加程序啟動延時,而且有可能實際執行過程中這個變量沒有用上,那么這個初始化操作就不是必須要的。我們來看一個例子:
sync.Once其實內部包含一個互斥鎖和一個布爾值,互斥鎖保證布爾值和數據的安全,而布爾值用來記錄初始化是否完成。這樣設計就能保證初始化操作的時候是并發安全的并且初始化操作也不會執行多次。
Go語言中內置的map不是并發安全的。請看下面的示例:
var m = make(map[string]int)
func get(key string) int {
return m[key]
}
func set(key string, value int) {
m[key] = value
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 20; i++ {
wg.Add(1)
go func(n int) {
key := strconv.Itoa(n)
set(key, n)
fmt.Println("k=:%v, v:=%v\n", key, get(key))
wg.Done()
}(i)
}
wg.Wait()
}
上面的代碼開啟少量幾個goroutine 的時候沒有問題,當并發對了之后,執行就會報fatal error: concurrent map writes錯誤。
像這種場景下就需要為map加鎖來保證并發的安全性,go語言的sync包中提供了一個開箱即用的并發安全版map-sync.Map。同時sync.Map 內置了諸如Store、Load、LoadOrStore、Delete、Range等操作方法。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。