您好,登錄后才能下訂單哦!
即使再仔細的檢查,仍然可能在并發上犯錯。Go 的 runtime 提供了動態分析工具:競態檢測器(race detectotr)。
在下一節的示例中會用到競態檢測器,所以在用之前,先了解一下這個工具。
簡單地把 -race 命令行參數加到 go build、go run、go test 命令里即可使用該功能。它會讓編譯器為你的應用或測試構建一個修改后的版本,這個版本有額外的手法可以高效記錄在執行時對共享變量的所有訪問,以及讀寫這些變量的 goroutine 標識。除此之外,還會記錄所有的同步事件、包括 go 語句、通道操作、鎖的調用等。(完整的同步事件集合可以在語言規范中的 “The Go Memory Model” 文檔中找到。)
競態檢測器會研究事件流,找到那些有問題的案例,即一個 goroutine 寫入一個變量后,中間沒有任何同步的操作,就有另外一個 goroutine 讀寫了該變量。這種情況表明有對共享變量的并發訪問,即數據競態。工具會輸出一份報告,包括變量的標識以及讀寫 goroutine 當時的調用棧。通常情況下這些信息足以定位問題了,下一章的示例會應用到實戰中。
競態檢測器報告所有實際運行了的數據競態。但只能檢測到那些在運行時發生的競態,無法用來保證肯定不發生競態。所以為了保證效果,需要全部測試包含了并發調用的場景。
由于存在額外的記錄工作,帶競態檢測功能的程序在執行時需要更長的時間和更多的內存,但即使對于生成環境的任務,這種額外開支也是可以接受的。對于那些偶發的競態條件,使用競態檢測器可以節省很多調試的時間。
創建一個并發非阻塞的緩存系統,它能解決函數記憶(memoizing)的問題,即緩存函數的結果,達到多次調用但只須計算一次結果。這個問題在并發實戰中很常見但已有的庫不能很好地解決這個問題。這里的解決方案將會是并發安全的,并且要避免簡單地對整個緩存使用單個鎖而帶來的鎖爭奪問題。
在做系統之前,先準備一個將要被測試的函數。這里將使用下面的 httpGetBody 函數作為示例來演示函數記憶。調用 HTTP 請求相當昂貴,所以我希望只在第一次請求的時候去發起請求,而之后都可以在緩存中找到結果直接返回:
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
先保證能緩存這個函數的執行結果,之后再使用更多個函數來測試和驗證功能。
這是一個并發不安全的版本,不過把基本功能先實現,并發安全的問題之后再進行優化:
// memo包提供了一個對類型 Func 并發不安全的函數記憶功能
package memo
// Memo 緩存了調用 Func 的結果
type Memo struct {
f Func
cache map[string]result
}
// Func 是用于記憶的函數類型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]result)}
}
// 注意:并發不安全
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
Memo 實例包含了被記憶的函數 f (類型為Func),以及緩存,類型為一個 key 為字符串,value 為 result 的 map。每個 result 都是調用 f 產生的結果:一個值和一個錯誤,在設計的推進過程中會展示 Memo 的幾種變體,但所有變體都會遵守這些基本概念。
串行測試
下面的例子展示了如何使用 Memo。下面是完整的測試源碼文件,包括上一小節寫的被測試的函數,以及一串 URL。每個 URL 會發起兩次請求。對于每個 URL,首先調用 Get,打印延時和返回的數據長度:
package memo
import (
"io/ioutil"
"log"
"net/http"
"sync"
"testing"
"time"
)
func httpGetBody(url string) (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
var urls = []string{
"http://docscn.studygolang.com/",
"https://studygolang.com/",
"https://studygolang.com/pkgdoc",
"https://github.com/adonovan/gopl.io/tree/master/ch9",
}
func TestSequential(t *testing.T) { // 串行
m := New(httpGetBody)
urls = append(urls, urls...) // 每個 URL 請求兩次
for _, url := range urls {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}
}
func TestConcurrent(t *testing.T) { // 并行
m := New(httpGetBody)
var n sync.WaitGroup
urls = append(urls, urls...) // 每個 URL 請求兩次
n.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
}
這里使用 testing 包系統的測試效果。上面有兩個測試函數,先只用 TestSequential 進行測試,串行的發起請求。從下面的測試結果看,每一個 URL 第一次調用都會消耗一定的時間,但對 URL 第二次的請求會立刻返回結果:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestSequential -v
=== RUN TestSequential
http://docscn.studygolang.com/, 87.1978ms, 6612 bytes
https://studygolang.com/, 203.3312ms, 81819 bytes
https://studygolang.com/pkgdoc, 33.0053ms, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.4428937s, 61185 bytes
http://docscn.studygolang.com/, 0s, 6612 bytes
https://studygolang.com/, 0s, 81819 bytes
https://studygolang.com/pkgdoc, 0s, 1261 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 0s, 61185 bytes
--- PASS: TestSequential (1.81s)
PASS
ok gopl/output/memo/memo1 2.063s
PS H:\Go\src\gopl\output\memo\memo1>
默認在測試成功的時候不打印這類日志,不過可以加上 -v 參數在成功時也打印測試日志。
并行測試
這次測試中所有的 Get 都是串行的。因為 HTTP 請求通過并發來改善的空間很大,所以這次使用 TestConcurrent 進行測試,讓所有的請求并發進行。這個測試要使用 sync.WaitGroup 等待所有的請求完成后再返回結果。
這次的測試結果基本上都是緩存無效的情況,不過偶爾還會出現無法正常運行的情況。除了緩存無效,可能還會有緩存命中后返回錯誤結果,甚至崩潰:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v
=== RUN TestConcurrent
http://docscn.studygolang.com/, 92.9972ms, 6612 bytes
http://docscn.studygolang.com/, 98.9889ms, 6612 bytes
https://studygolang.com/pkgdoc, 204.8383ms, 1261 bytes
https://studygolang.com/pkgdoc, 205.8387ms, 1261 bytes
https://studygolang.com/, 234.1566ms, 81819 bytes
https://studygolang.com/, 235.1749ms, 81819 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 1.5041445s, 61184 bytes
https://github.com/adonovan/gopl.io/tree/master/ch9, 2.1051443s, 61184 bytes
--- PASS: TestConcurrent (2.11s)
PASS
ok gopl/output/memo/memo1 2.346s
PS H:\Go\src\gopl\output\memo\memo1>
加上競態檢測器進行并行測試
更糟糕的是,多數時候這樣都能正常運行,所以甚至很難注意到這樣并發調用是有問題的。但是如果加上 -race 標志后再運行,那么競態檢測器就會輸出如下的報告:
PS H:\Go\src\gopl\output\memo\memo1> go test -run=TestConcurrent -v -race
=== RUN TestConcurrent
==================
WARNING: DATA RACE
Write at 0x00c000062cf0 by goroutine 11:
runtime.mapassign_faststr()
D:/Go/src/runtime/map_faststr.go:190 +0x0
gopl/output/memo/memo1.(*Memo).Get()
H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
gopl/output/memo/memo1.TestConcurrent.func1()
H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0
Previous write at 0x00c000062cf0 by goroutine 7:
runtime.mapassign_faststr()
D:/Go/src/runtime/map_faststr.go:190 +0x0
gopl/output/memo/memo1.(*Memo).Get()
H:/Go/src/gopl/output/memo/memo1/memo.go:27 +0x1d8
gopl/output/memo/memo1.TestConcurrent.func1()
H:/Go/src/gopl/output/memo/memo1/memo_test.go:57 +0xc0
...
FAIL gopl/output/memo/memo1 2.883s
這里就是因為兩個 goroutine 在沒使用同步的情況下更新了 Memo.cache 這個 map。因為整個 Get 并不是并發安全的,它存在數據競態:
// 注意:并發不安全
func (memo *Memo) Get(key string) (interface{}, error) {
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
return res.value, res.err
}
所以,接下來就是要改進,實現并發安全。
讓緩存并發安全最簡單的方法就是用一個基于監控的同步機制。需要給 Memo 加一個互斥量,并在 Get 開始就獲取互斥鎖,在返回前釋放互斥鎖,這樣就可以讓 cache 相關的操作發生在臨界區域內了:
// Memo 緩存了調用 Func 的結果
type Memo struct {
f Func
mu sync.Mutex // 保護 cache
cache map[string]result
}
// Get 是并發安全的
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
res, ok := memo.cache[key]
if !ok {
res.value, res.err = memo.f(key)
memo.cache[key] = res
}
memo.mu.Unlock()
return res.value, res.err
}
加上鎖之后,再運行并發測試函數,競態檢測器不報警了。但是這次的修改后,之前對性能的優化就失效了。由于每次調用 Memo.f 時都上鎖,所以現在的 Get 方法運行的使用實際又是串行的了。這里需要一個非阻塞的緩存,一個不會把他需要記憶的函數串行運行的緩存。
調用 Get 是不需要鎖保護的。調用 Get 的判斷依據是之前的獲取 map 的 key,這個操作需要加鎖。調用 Get 返回后,需要把返回結果更新到 map 中去,這個操作也需要加鎖。在 map 查詢結束后,先釋放鎖。不加鎖的情況下調用 Get。等到結果返回需要更新 map 的時候,再加鎖更新 map。具體修改如下:
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
res, ok := memo.cache[key]
memo.mu.Unlock()
if !ok {
res.value, res.err = memo.f(key)
memo.mu.Lock()
memo.cache[key] = res
memo.mu.Unlock()
}
return res.value, res.err
}
現在,可以安全的并行運行了,但是緩存又失效了。某些URL被獲取了兩次。修改一下測試源碼文件的被測試函數 httpGetBody,在開頭輸出一行日志,可以觀察到每個URL被調用的次數:
func httpGetBody(url string) (interface{}, error) {
log.Printf("httpGetBody: %s", url) // 輸出哪些 url 被函數調用了,從緩存獲取結果時不會有這個輸出
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
修改之后,可以用最初的串行版本再測試一下。那個版本是一定用到緩存的效果的。而現在的版本,在并發的情況下無法用上緩存。
在幾個 goroutine 幾乎同時調用的 Get 來獲取同一個 URL 時,每個 goroutine 都首先查詢緩存,發現緩存中沒有需要的數據,然后就都去執行 Get 來獲取結果,最后又都用獲得的結果來更新 map,其中一個結果會被另外一個覆蓋。
在理想的情況下,應該要避免這種額外的處理。這個功能有時稱為重復抑制(duplicate suppression)。
下面這個版本,map 的每個元素是一個指向 entry 結構的指針。除了與之前一樣包含一個已經記住的函數 f 調用結果之外,每個 entry 還新加一個通道 ready。在設置了 entry 和 result 字段后,通道會關閉,正在等待的 goroutine 會收到廣播,然后就可以從 entry 字段讀取結果了:
// memo包提供了一個對類型 Func 并發安全的函數記憶功能
// 并發、重復抑制、非阻塞的緩存
package memo
import "sync"
// Func 是用于記憶的函數類型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準備好之后會被關閉
}
func New(f Func) *Memo {
return &Memo{f: f, cache: make(map[string]*entry)}
}
type Memo struct {
f Func
mu sync.Mutex // 保護 cache
cache map[string]*entry
}
// Get 是并發安全的
func (memo *Memo) Get(key string) (interface{}, error) {
memo.mu.Lock()
e := memo.cache[key]
if e == nil {
// 對 key 的第一次訪問,這個 goroutine 負責獲取數據和廣播數據準備好了的消息
e = &entry{ready: make(chan struct{})}
memo.cache[key] = e
memo.mu.Unlock()
e.res.value, e.res.err = memo.f(key)
close(e.ready) // 廣播數據已經準備好的消息
} else {
// 對這個 key 的重復訪問
memo.mu.Unlock()
<-e.ready // 等待數據準備完畢
}
return e.res.value, e.res.err
}
關于這里的 map 是否包含某個元素的判斷,之前都是返回兩個值,通過ok來判斷。之前的示例中,map的元素是結構體,由于結構體類型的零值不是nil,通過ok來判斷比較好。這里的元素類型是結構體指針,當然可以繼續使用ok來判斷。不過現在是指針類型了,零值是nil也不會和非零值的情況搞混,所以也可以直接通過nil來判斷。
現在調用 Get 會獲取鎖,然后去 map 中查詢,如果沒有找到,就直接分配并插入一個新的值,然后釋放鎖。之后其他 goroutine 來查詢的時候,會發現值存在,那么就直接獲取到 map 的值,然后釋放鎖。
map 里的值并不是 Get 返回的數據,而是數據是否準備好的通道,和存放數據的字段。此時數據可能還沒準備好,數據是否準備好,可以從 ready 通道進行判斷。對 ready 通道的讀取操作,會在數據沒有準備好的時候一直阻塞。一旦數據準備好了,就會關閉 ready 通道,所有從 ready 通道的讀取操作就會立刻返回。這是利用通道進行廣播的方式。所以查詢 map 后獲取值的步驟就是先讀取 ready 通道等待,一旦通道的讀取返回,就表示數據已經準備好了,此時就可以去讀取字段 res 里的內容并返回。
注意,entry 中的變量 e.res.value 和 e.res.err 被多個 goroutine 共享。創建 entry 的 goroutine 會對這兩個變量的值進行設置,其他 goroutine 在收到數據準備完畢的廣播后才會開始讀取這兩個變量。盡管被多個 goroutine 訪問,但是此處不需要加鎖。ready 通道的關閉先于其他 goroutine 收到廣播事件,所以第一個 goroutine 對變量的寫入也先于后續多個 goroutine 的讀取事件。這種情況下數據競態不存在。
到此,并發、重復抑制、非阻塞緩存就完成了。
上面的示例是使用一個互斥量來保護 map 變量的并發安全。下面是另一種設計,讓 map 變量限制在一個監控 goroutine 中。
首先是類型聲明,New 函數在創建實例并返回的同時,還會啟動一個 server 方法。該方法會集中處理所有的 Get 調用。我們在獲取實例后,依然是調用 Get 來獲取結果:
// memo包提供了一個對類型 Func 并發安全的函數記憶功能
// 并發、重復抑制、非阻塞的緩存
// 通過監控 goroutine 來實現并發安全
package memo
// Func 是用于記憶的函數類型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準備好之后會被關閉
}
// Func、result、entry 的聲明和之前一致
// request 是一條請求消息
type request struct {
key string // 需要 Func 運行的參數
response chan<- result // 每個客戶端接收結果的通道
}
type Memo struct{ requests chan request }
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)} // 創建實例
go memo.server(f) // 啟動服務端 goroutine
return memo // 返回實例,供客戶端調用
}
可以先往后看客戶端和服務端的處理邏輯,在回過來看這里聲明的數據類型已經通道的作用。
客戶端
現在 Get 就需要要給監控 goroutine 的通道發送請求和一個接收返回結果的通道。服務端會在收到處理請求后進行處理,之后再通過客戶端發來的通道返回結果。而客戶端發送請求之后,只需要從自己創建的這個通道中接收,直到接收到數據后,再返回即可:
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <- response
return res.value, res.err
}
func (memo *Memo) Close() { close(memo.requests) }
客戶端使用完之后,可以調用 Close 方法關閉發送請求的通道。
服務端
上面的 Get 相當于一個客戶端,還需要一個服務端來處理 Get 發來的請求:
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests { // 一次處理收到的請求
e := cache[req.key]
if e == nil {
// 對這個 key 的第一次請求
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // 調用 f(key)
}
// 無論是否第一次請求,最后要回復結果,都有等待 ready 通道返回后,再去讀取結果
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// 執行函數
e.res.value, e.res.err = f(key)
// 發送廣播通知,數據已經準備好了
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// 等待數據準備完畢
<-e.ready
// 向客戶端發送結果
response <- e.res
}
變量 cache 被限制在監控 goroutine 中,就是上面的 server 方法。監控 goroutine 從 requests 的通道中讀取請求,直到這個通道被關閉。對于每個請求,先查詢緩存,如果沒有找到就插入一個新的 entry。
這里 call 和 deliver 方法需要在獨立的 goroutine 中運行,以確保監控 goroutine 內持續處理新請求。
完整示例代碼
下面貼上這個實現方式的完整代碼:
// memo包提供了一個對類型 Func 并發安全的函數記憶功能
// 并發、重復抑制、非阻塞的緩存
// 通過監控 goroutine 來實現并發安全
package memo
// Func 是用于記憶的函數類型
type Func func(key string) (interface{}, error)
type result struct {
value interface{}
err error
}
type entry struct {
res result
ready chan struct{} // res 準備好之后會被關閉
}
// Func、result、entry 的聲明和之前一致
// request 是一條請求消息
type request struct {
key string // 需要 Func 運行的參數
response chan<- result // 每個客戶端接收結果的通道
}
type Memo struct{ requests chan request }
func New(f Func) *Memo {
memo := &Memo{requests: make(chan request)} // 創建實例
go memo.server(f) // 啟動服務端 goroutine
return memo // 返回實例,供客戶端調用
}
func (memo *Memo) Close() { close(memo.requests) }
func (memo *Memo) Get(key string) (interface{}, error) {
response := make(chan result)
memo.requests <- request{key, response}
res := <-response
return res.value, res.err
}
func (memo *Memo) server(f Func) {
cache := make(map[string]*entry)
for req := range memo.requests { // 一次處理收到的請求
e := cache[req.key]
if e == nil {
// 對這個 key 的第一次請求
e = &entry{ready: make(chan struct{})}
cache[req.key] = e
go e.call(f, req.key) // 調用 f(key)
}
// 無論是否第一次請求,最后要回復結果,都有等待 ready 通道返回后,再去讀取結果
go e.deliver(req.response)
}
}
func (e *entry) call(f Func, key string) {
// 執行函數
e.res.value, e.res.err = f(key)
// 發送廣播通知,數據已經準備好了
close(e.ready)
}
func (e *entry) deliver(response chan<- result) {
// 等待數據準備完畢
<-e.ready
// 向客戶端發送結果
response <- e.res
}
針對上面的 memo 包的測試代碼:
package memo
import (
"io/ioutil"
"log"
"net/http"
"sync"
"testing"
"time"
)
func httpGetBody(url string) (interface{}, error) {
log.Printf("httpGetBody: %s", url)
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return ioutil.ReadAll(resp.Body)
}
var urls = []string{ // 換一批慢一些的網站,加載時間1s左右的國外資源
"https://github.com/adonovan/gopl.io/tree/master/ch9",
"https://www.djangoproject.com/",
"https://getbootstrap.com/",
"https://www.python.org/",
}
func TestSequential(t *testing.T) { // 串行
m := New(httpGetBody)
defer m.Close()
urls = append(urls, urls...) // 每個 URL 請求兩次
for _, url := range urls {
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}
}
func TestConcurrent(t *testing.T) { // 并行
m := New(httpGetBody)
defer m.Close()
var n sync.WaitGroup
urls = append(urls, urls...) // 每個 URL 請求兩次
n.Add(len(urls))
for _, url := range urls {
go func(url string) {
defer n.Done()
start := time.Now()
value, err := m.Get(url)
if err != nil {
log.Print(err)
}
t.Logf("%s, %s, %d bytes\n", url, time.Since(start), len(value.([]byte)))
}(url)
}
n.Wait()
}
這里的例子展示了可以使用兩種方案來構建并發結構:
第一種是大家普遍認知的,也是Java或者C++等語言中的多線程開發。
第二種是 Go 語言特有的,也是 Go 語言推薦的。下面是一句推薦的原話:
Do not communicate by sharing memory; instead, share memory by communicating.
Go 箴言:“不要通過共享內存來通信,而應該通過通信來共享內存”。
在給定的情況下也許很難判定哪種方案更好,不過了解他們還是有價值的。有時候從一種方案切換到另外一種方案能讓代碼更簡單。
CSP并發模型
CSP 是 Communicating Sequential Process 的簡稱,中文可以叫做通信順序進程,是一種并發編程模型。
CSP 模型由并發執行的實體(線程或者進程)所組成,實體之間通過發送消息進行通信,這里發送消息時使用的就是通道(channel)。CSP 模型的關鍵是關注 channel,而不關注發送消息的實體。Go 語言就是借用 CSP 模型的一些概念為之實現并發進行理論支持。Go 語言并沒有完全實現 CSP 模型的所有理論,僅僅是借用了 process 和 channel 這兩個概念。process 在 Go 語言上的表現就是 goroutine 是實際并發執行的實體,每個實體之間通過 channel 通訊來實現數據共享。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。