您好,登錄后才能下訂單哦!
相比于Go語言宣揚的“用通訊的方式共享數據”,通過共享數據的方式來傳遞信息和協調線程運行的做法其實更加主流。本篇就是討論一些與多線程、共享資源以及同步有關的知識。
sync包,就是一個與并發編程關系緊密的代碼包。這里“sync”的中文意思就是“同步”。
這里會講一些重要的并發編程概念:競態條件、臨界區、互斥量、死鎖。死鎖會在互斥鎖里引出。
一旦數據被多個線程共享,那么就很可能會產生爭用和沖突的情況。這種情況也被稱為競態條件(race condition),這往往會破幻共享數據的一致性。
概括來講,同步的用途有兩個:
由于這樣的數據塊和代碼塊的背后都隱含著一種或多種資源,可以把他們看作是共享資源。
同步就是在控制多個線程對共享資源的訪問。針對某個資源的訪問,同一時刻只能有一個線程訪問到該資源。那么可以說,多個并發進行的線程對這個共享資源的訪問是完全串行的。只要一個代碼片段需要實現對共享資源的串行化訪問,就可以被視為一個臨界區(critical section)。也就是說,要訪問到資源就必須進入到這個區域。如果針對一個共享資源,這樣的代碼片段有多個,那么它們就可以被稱為相關臨界區。
應對競態條件的問題,就需要施加一些保護的手段。方法之一就是使用實現了某種同步機制的工具,也稱為同步工具。在Go語言中,可供我們選擇的同步工具并不少。其中,最重要且最常用的同步工具當屬互斥量(mutual exclusion,簡稱 mutex)。sync包中的Mutex就是與其對應的類型,該類型的值可以被稱為互斥量或者互斥鎖。
雖然Go語言是以“用通訊的方式共享數據”為亮點,但是依然提供了一些易用的同步工具。而互斥鎖就是最常用到的一個。
一個互斥鎖可以被用來保護一個臨界區或者一組相關臨界區。保證同一時刻只有一個goroutine處于改臨界區之內。每當有goroutine想進入臨界區是,需要對它進行鎖定,并且在離開臨界區時進行解鎖。
使用互斥鎖時,鎖定操作可以通過調用互斥鎖的Lock方法實現,而解鎖是調用Unlock方法。示例如下:
package main
import (
"fmt"
"flag"
"os"
"sync"
"bytes"
"io"
)
var lock bool
func init() {
flag.BoolVar(&lock, "lock", false, "是否加鎖")
}
const (
max1 = 5 // 準備啟用多個goroutine
max2 = 10 // 每個goroutine里寫入這么多組數據
max3 = 10 // 每組數據就是重復寫入多個數字
)
func main () {
// 解析命令行參數
flag.Parse()
// bytes.Buffer是一個緩沖byte類型的緩沖器,存放的都是byte類型
var buffer bytes.Buffer
var mu sync.Mutex // 互斥鎖
done := make(chan struct{}) // 每當一個goroutine執行完畢了,就往這里發一個信號
for i := 0; i < max1; i++ {
go func(id int, writer io.Writer) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max2; j++ {
// 準備數據
header := fmt.Sprintf("\n[%d %d]", id, j)
data := fmt.Sprintf(" %d-%d", id, j)
// 加鎖
if lock {
mu.Lock()
}
// 寫入數據
_, err := writer.Write([]byte(header))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err)
}
for k := 0; k < max3; k++ {
_, err := writer.Write([]byte(data))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err)
}
}
// 解鎖
if lock {
mu.Unlock()
}
}
}(i, &buffer)
}
// 等待goroutine退出
for i := 0; i < max1; i++ {
<- done
}
// 打印結果
fmt.Println(buffer.String())
}
這個示例提供了一個命令行參數-lock,可以選擇加鎖或者不加鎖來運行這個程序。這樣可以方便的比較在代碼中加鎖的作用。
使用互斥鎖時的注意事項:
對一個已經被鎖定的互斥鎖進行鎖定,是會立即阻塞當前goroutine的。會一直等到該互斥鎖在別的goroutine里被解鎖,并且這里的鎖定操作完成為止。如果那邊解鎖后又被別的goroutine鎖定了,那就繼續等,一直到搶到鎖完成鎖定操作。
雖然沒有任何的強制規定,你是可以用同一個互斥鎖保護多個無關的臨界區的。但是這樣做,一定會使你的程序變的復雜,就是說不要這么做,需要的話,就多搞幾把鎖。如果真的把一個互斥鎖同時用在了多個地方,必然會有更多的goroutine征用這把鎖。這不但會使得程序變慢,還會打打增加死鎖(deadlock)的可能性。
死鎖
所謂死鎖,就是當前程序中的主goroutine,以及啟用的那個goroutine都已經被阻塞。這些goroutine可以被統稱為用戶級的goroutine。就是說整個程序都停滯不前了。
Go語言運行時,系統是不允許死鎖的情況出現的。只要發現所有的用戶級goroutine都處于等待狀態,就會自行拋出panic。隨便寫個函數,連續上2次鎖就死鎖了:
func main() {
var mu sync.Mutex
mu.Lock()
mu.Lock()
mu.Unlock()
mu.Unlock()
}
拋出的信息如下,主要就看第一行fatal error: all goroutines are asleep - deadlock!
:
PS H:\Go\src\Go36\article26\example02> go run main.go
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [semacquire]:
sync.runtime_SemacquireMutex(0xc042046004, 0x0)
D:/Go/src/runtime/sema.go:71 +0x44
sync.(*Mutex).Lock(0xc042046000)
D:/Go/src/sync/mutex.go:134 +0xf5
main.main()
H:/Go/src/Go36/article26/example02/main.go:8 +0x55
exit status 2
PS H:\Go\src\Go36\article26\example02>
這種在Go運行時系統自行拋出的panic都屬于致命錯誤,是無法被恢復的。調用recover函數也不起作用。就是說,一旦死鎖,程序必然崩潰。
要避免這種情況,最有效的做法就是,讓每一個互斥鎖只保護一個臨界區或一組相關的臨界區。
用defer語句解鎖
還要注意,對同一個goroutine而言,既不要重復鎖定一個互斥鎖,也不要忘記進行解鎖。這里不要忘記解鎖的一個很重要的原因就是為了避免重復鎖定。在很多時候,一個函數執行的流程并不是單一的,流程中間可能會有分叉、也可能會被中斷。最保險的做法就是使用defer語句來進行解鎖,并且這樣的defer語句應該緊跟在鎖定操作的后面。
上面的那個示例,沒有按這里說的來做,因為整個寫操作是在for循環里的。解鎖操作后還有其他語句要執行,這里是for循環里的其他迭代要處理。而defer語句是只有程序退出后才會執行的。不過這都不是借口,要按這里最保險的做法來做,只需要把for循環里的語句再寫一個函數或匿名函數就可以用defer了:
for i := 0; i < max1; i++ {
go func(id int, writer io.Writer) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < max2; j++ {
// 準備數據
header := fmt.Sprintf("\n[%d %d]", id, j)
data := fmt.Sprintf(" %d-%d", id, j)
func () {
// 加鎖
if lock {
mu.Lock()
defer func() {
// 解鎖
mu.Unlock()
}()
}
// 寫入數據
_, err := writer.Write([]byte(header))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write header in %d: %s\n", id, err)
}
for k := 0; k < max3; k++ {
_, err := writer.Write([]byte(data))
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR when write data in %d: %s\n", id, err)
}
}
}()
}
}(i, &buffer)
}
解鎖未鎖定的互斥鎖也會立即引發panic。并且與死鎖一樣,也是無法被恢復的。從這一定看,也是需要保證對于沒一個鎖定操作,都必須且只能由一個對應的解鎖操作。就是要讓他們成對出現,這也算是互斥鎖一個很重要的使用原則。而利用defer語句進行解鎖就可以很容易的做到這一點。
互斥鎖是結構體、值類型
Go語言中的互斥鎖時開箱即用的,就是一旦聲明了一個sync.Mutex類型的變量,就可以直接使用它。不過要注意,該類型是一個結構體,屬于值類型:
type Mutex struct {
state int32
sema uint32
}
對于值類型,把它傳遞給一個函數、將他從函數中返回、把它賦值給其他變量、讓它進入某個通道都會導致它的副本的產生。這里,原值和副本以及多個副本之間都是完全獨立的,是不同的互斥鎖。舉例說明,如果你把一個互斥鎖作為參數值傳給了一個函數,那么在這個函數中對傳入的鎖的所有操作,都不會對存在于該函數之外的那個原鎖產生任何影響。
這就是為什么“不要在多個函數之間直接傳遞互斥鎖”。避免歧義,即使你希望的是在這個函數中使用另外一個互斥鎖也不要這樣做。
學習了上面的注意事項和建議,就來看看如何更好的使用互斥鎖。下面是一個使用互斥鎖的示例:
package main
import (
"bytes"
"fmt"
"os"
"io"
"sync"
"time"
)
// 創建互斥鎖
var mu sync.Mutex
// singleHandler 代表單次處理函數的類型,讀和寫用的函數內容有些不同,但是簽名都是這樣的
type singleHandler func() (data string, n int, err error)
// 生成一個寫入當前時間的函數
func genWriter(writer io.Writer) singleHandler {
return func() (data string, n int, err error) {
// 準備數據
data = fmt.Sprintf("%s\t", time.Now().Format(time.StampNano))
// 寫入數據
mu.Lock()
defer mu.Unlock()
n, err = writer.Write([]byte(data))
return
}
}
// 生成一個讀取數據的函數
func genReader(reader io.Reader) singleHandler {
return func() (data string, n int, err error) {
// 類型斷言,把io.Reader接口轉成*bytes.Buffer類型
// 下面要調用*bytes.Buffer類型的ReadString方法
// 因為函數的參數要求是一個接口類型,但是后面的讀操作用的是*bytes.Buffer的ReadString方法
// 所以在調用方法前,必須要檢查接口的實際類型(動態類型)
// 實際在主函數里調用genReader函數是,傳入的就是*bytes.Buffer
// 類型斷言x.(T),這里x必須為一個接口類型,但并非必須是空接口
// 這里reader是個io.Reader接口。如果要對非接口類型的變量做類型斷言,就要先轉成空接口
buffer, ok := reader.(*bytes.Buffer)
if !ok {
err = fmt.Errorf("unsupported reader")
return
}
// 讀取數據
mu.Lock()
defer mu.Unlock()
data, err = buffer.ReadString('\t')
n = len(data)
return
}
}
// 處理流程配置的類型,這里把處理流程相關的信息全部寫在下面的結構體類型里
type handlerConfig struct {
handler singleHandler // 處理函數
goNum int // 要啟用的goroutine的數量
number int // 單個goroutine中處理的次數
interval time.Duration // 單個goroutine中,多次處理中間間隔的時間
counter int // 數據量計數器,字節數
counterMu sync.Mutex // 上面的數據量計數器專用的互斥鎖
}
// 增加數據量計數器的方法
func (hc *handlerConfig) count(skip int) int {
hc.counterMu.Lock()
defer hc.counterMu.Unlock()
hc.counter += skip
return hc.counter
}
func main() {
// 創建緩沖區由于下面的讀和寫
var buffer bytes.Buffer
// 寫入數據的配置,分6個goroutine分別寫入4次,一個24次
writingConfig := handlerConfig{
handler: genWriter(&buffer),
goNum: 6,
number: 4,
interval: time.Millisecond * 100,
}
// 讀取數據的配置,分8個goroutine分別讀取3次,一個也是24次
readingConfig := handlerConfig{
handler: genReader(&buffer),
goNum: 8,
number: 3,
interval: time.Millisecond * 100,
}
done := make(chan struct{})
// 啟用多個goroutine對緩沖區進行多次寫入
for i := 0; i < writingConfig.goNum; i++ {
go func(i int) {
defer func() {
done <- struct{}{}
}()
for j :=0; j < writingConfig.number; j++ {
// 進入迭代前等待,邏輯稍微簡單一點
// 如果寫在最后,那么因為err而退出時這次迭代就不會等待了
time.Sleep(writingConfig.interval)
data, n, err := writingConfig.handler()
if err != nil {
fmt.Fprintf(os.Stderr, "writer [%d-%d] ERROR: %s\n", i, j, err)
continue
}
total := writingConfig.count(n)
fmt.Printf("writer [%d-%d] Report: %s (total %d)\n", i, j, data, total)
}
}(i)
}
// 啟用多個goroutine對緩沖區進行多次讀取
for i := 0; i < readingConfig.goNum; i++ {
go func (i int) {
defer func() {
done <- struct{}{}
}()
for j := 0; j < readingConfig.number; j++ {
var (
data string
n int
err error
)
// 下面的無限for循環是一個代碼塊,里面的data、n、err如果用短變量賦值就是局部變量
// 所以上面在代碼塊外面,聲明了i的for循環內部的變量
for {
// 如果讀比寫快,被讀的是空的,一讀就到末尾了,就會返回EOF錯誤,
time.Sleep(readingConfig.interval)
data, n, err = readingConfig.handler()
// 這個判斷邏輯是讀取的EOF錯誤,就無限循環等待。讀到內容或其他錯誤就跳出循環
if err == nil || err != io.EOF {
break
}
}
if err != nil {
fmt.Fprintf(os.Stderr, "reader [%d-%d] ERROR: %s\n", i, j, err)
}
total := readingConfig.count(n)
fmt.Printf("reader [%d-%d] Report: %s (total %d)\n", i, j, data, total)
}
}(i)
}
// 等待所有goroutine結束
doneNum := writingConfig.goNum + readingConfig.goNum
for i := 0; i < doneNum; i++ {
<- done
}
}
這個示例中,分別有讀和寫的兩個處理函數。而處理函數里做的事情就是:加鎖、defer解鎖,完成讀或寫操作然后返回。這里就做到了加鎖和解鎖操作成對出現,并且把鎖和要保護的共享資源放在一起了。
示例中還有一個互斥鎖在handlerConfig結構體中,要保護的共享資源也是handlerConfig結構體中的counter字段。并且寫了一個方法count實現對counter字段的鎖定和修改。
讀寫鎖是讀/寫互斥鎖的簡稱。在Go語言中,讀寫鎖有sync.RWMutex類型的值代表。與sync.Mutex一樣,這個類型也是開箱即用的。開箱即用,應該就是指不用賦值,定義了之后直接就能用了。就是讓它的零值也具有意義。
讀寫鎖就是把共享資源的“讀操作”和“寫操作”區別對待了。為兩種操作施加了不同程度的保護。相比于互斥鎖,讀寫鎖可以實現更加細膩的訪問控制。
一個讀寫鎖中實際包含了兩個鎖,讀鎖和寫鎖:
對于同一個讀寫鎖,有如下的規則:
總結一下,就是可以有多個讀操作,讀鎖鎖定的情況下,別的goroutine也可以讀。其他的情況下要操作,只能等之前鎖定的操作完成釋放鎖,并且搶到鎖了。再換個角度說,就是多個讀操作可以同時進行,多個寫操作不能同時進行,讀和寫操作也不能同時進行。
讀寫鎖對寫操作之間的互斥,其實是通過它內含的一個互斥鎖實現的。因此,讀寫鎖是互斥鎖的一種擴展。所以無論是互斥鎖還是讀寫鎖,都不要試圖去解鎖未鎖定的鎖,因為這樣會引發不可恢復的panic。
之前互斥鎖的示例中,使用互斥鎖保護了對緩沖區的讀寫操作,而這里又講了讀寫鎖,不要被這里讀和寫的說法鎖迷惑。對緩沖區的讀操作是會把讀到的內容從緩沖區里去除的,所以是有類似寫的操作在里面的,使用互斥鎖時正確的做法,并且不能使用這里的讀寫鎖。
而這個示例中的讀操作,就僅僅只是去獲取到值而已了,在讀操作的時候加個讀鎖正合適:
package main
import(
"fmt"
"sync"
"time"
)
// 計數器
type counter struct {
num uint // 計數
mu sync.RWMutex //讀寫鎖
}
// 獲取num值的操作,加讀鎖
func (c *counter) number() uint {
c.mu.RLock()
defer c.mu.RUnlock()
return c.num
}
// 修改num值的操作,加寫鎖
func (c *counter) add (increment uint) uint {
c.mu.Lock()
defer c.mu.Unlock()
c.num += increment
return c.num
}
// 跑一下上面的兩個方法看看效果
func main() {
c := counter{}
done := make(chan struct{})
// 增加計數器
go func() {
defer func() {
done <- struct{}{}
}()
for i := 0; i < 10; i++ {
time.Sleep(time.Millisecond * 500)
c.add(1)
}
}()
go func() {
defer func() {
done <- struct{}{}
}()
for j := 0; j < 20; j++ {
time.Sleep(time.Millisecond * 200)
fmt.Printf("[%d-%02d] 讀數: %d\n", 1, j, c.number())
}
}()
go func() {
defer func() {
done <- struct{}{}
}()
for k := 0; k < 20; k++ {
time.Sleep(time.Millisecond * 300)
fmt.Printf("[%d-%02d] 讀數: %d\n", 2, k, c.number())
}
}()
<- done
<- done
<- done
}
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。