您好,登錄后才能下訂單哦!
今天小編給大家分享一下go語言同步機制是什么及怎么實現的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
go同步機制有:1、channel,著重并發問題中的數據流動,把流動的數據放到channel中,就能使用channel解決這個并發;2、Sync.Mutex,擁有Lock、Unlock兩個方法,主要實現思想體現在Lock函數中;3、Sync.waitGroup;4、Sync.Once;5、Sync.context;6、Sync.pool;7、atomic包,針對變量進行操作。
Golang的提供的同步機制有sync模塊下的Mutex、WaitGroup以及語言自身提供的chan等。
概述
Golang以如此明顯的方式告訴我們:。
優點:channel的核心是數據流動,關注到并發問題中的數據流動,把流動的數據放到channel中,就能使用channel解決這個并發
問題,而且使用channel是線程安全的并且不會有數據沖突,比鎖好用多了
缺點:不太適應同步太復雜的場景,比如多協程的同步等待問題,而且存在死鎖問題
分類
channel類型:無緩沖和緩沖類型
channel有兩種形式的,一種是無緩沖的,一個線程向這個channel發送了消息后,會阻塞當前的這個線程,知道其他線程去接收這個channel的消息。無緩沖的形式如下:
intChan := make(chan int)
帶緩沖的channel,是可以指定緩沖的消息數量,當消息數量小于指定值時,不會出現阻塞,超過之后才會阻塞,需要等待其他線程去接收channel處理,帶緩沖的形式如下:
//3為緩沖數量
intChan := make(chan int, 3)
舉例
type Person struct {
Name string
Age uint8
Address Addr
}
type Addr struct {
city string
district string
}
/*
測試channel傳輸復雜的Struct數據
*/
func testTranslateStruct() {
personChan := make(chan Person, 1)
person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
personChan <- person
person.Address = Addr{"guangzhou", "huadu"}
fmt.Printf("src person : %+v \n", person)
newPerson := <-personChan
fmt.Printf("new person : %+v \n", newPerson)
}
在實際應用過程中,等待channel 結束信號的過程可能不是無期限的,一般會伴隨一個timer,超時時間如下面所示:
/*
檢查channel讀寫超時,并做超時的處理
*/
func testTimeout() {
g := make(chan int)
quit := make(chan bool)
go func() {
for {
select {
case v := <-g:
fmt.Println(v)
case <-time.After(time.Second * time.Duration(3)):
quit <- true
fmt.Println("超時,通知主線程退出")
return
}
}
}()
for i := 0; i < 3; i++ {
g <- i
}
<-quit
fmt.Println("收到退出通知,主線程退出")
}
Mutex擁有Lock、Unlock兩個方法,主要的實現思想都體現在Lock函數中。
Lock執行時,分三種情況:
無沖突 通過CAS操作把當前狀態設置為加鎖狀態;
有沖突 開始自旋,并等待鎖釋放,如果其他Goroutine在這段時間內釋放了該鎖, 直接獲得該鎖;如果沒有釋放,進入3;
有沖突,且已經過了自旋階段 通過調用semacquire函數來讓當前Goroutine進入等待狀態。
無沖突時是最簡單的情況;有沖突時,首先進行自旋,是從效率方面考慮的, 因為大多數的Mutex保護的代碼段都很短,經過短暫的自旋就可以獲得;如果自旋等待無果,就只好通過信號量來讓當前 Goroutine進入等待了。
Channel在某些同步場景下,使用略顯復雜,不管是使用多個channel還是使用channel數組,如下:
func coordinateWithChan() {
sign := make(chan struct{}, 2)
num := int32(0)
fmt.Printf("The number: %d [with chan struct{}]\n", num)
max := int32(10)
go addNum(&num, 1, max, func() {
sign <- struct{}{}
})
go addNum(&num, 2, max, func() {
sign <- struct{}{}
})
<-sign
<-sign
}
所以Sync.waitGroup 就顯得更為優雅,Sync.waitGroup 用來等待一組goroutines的結束,在主Goroutine里聲明,并且設置要等待的goroutine的個數,每個goroutine執行完成之后調用 Done,最后在主Goroutines 里Wait即可。類似于JAVA中的CountDownLatch或者循環屏障,并且Sync.waitGroup可以被重復使用,提供了如下API:
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
但是Sync.waitGroup的使用需要遵循一些規則,避免拋出Panic:
a. 錯誤調用Done方法, 導致waitGroup內部計數值出現負數的情況
b. 錯誤的調用Add方法,在waitGroup內部計數值到達0的時候,Add方法被調用,導致應該被喚起的goroutine沒有被喚起,就開始了新的一輪計數周期
所以在調用的時候,就要遵循一下原則:
先統一Add,再并發Done,最后Wait
Sync.once實現方式是內部包含一個int32位的標志,用來判斷方式是否被執行過,標志值更改的時機為方法執行完之后,當有多個goroutine進行調用的時候,使用double-check方式進行驗證,首先在在沒有同步方式的情況下,進行標志值的判定,為0則競爭獲取mutex鎖,進入臨界區內,此時會在此進行標志值的判斷,確保方法真的被執行一次。double-check第一次是為了更快的進行判斷,但是存在錯誤的情況,第二次check是為了正確的確定標志值此時的狀態。
使用:
func main() {
var once sync.Once
onceBody := func() {
time.Sleep(3e9)
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
j := i
go func(int) {
once.Do(onceBody)
fmt.Println(j)
done <- true
}(j)
}
//給一部分時間保證能夠輸出完整【方法一】
//for i := 0; i < 10; i++ {
// <-done
//}
//給一部分時間保證能夠輸出完整【方法二】
<-done
time.Sleep(3e9)
}
場景
當需要進行多批次的計算任務同步,或者需要一對多的協作流程的時候
使用舉例
func coordinateWithContext() {
total := 12
var num int32
fmt.Printf("The number: %d [with context.Context]\n", num)
cxt, cancelFunc := context.WithCancel(context.Background())
for i := 1; i <= total; i++ {
go addNum(&num, i, func() {
if atomic.LoadInt32(&num) == int32(total) {
cancelFunc()
}
})
}
<-cxt.Done()
fmt.Println("End.")
}
注意事項
a.如何生成自己的context
通過WithCancel、WithDeadline、WithTimeout和WithValue四個方法從context.Background中派生出自己的子context
注意context.background這個上下文根節點僅僅是一個最基本的支點,它不提供任何額外的功能,也就是說,它既不可以被撤銷(cancel),也不能攜帶任何數據,在使用是必須通過以上4種方法派生出自己的context
b.子context是會繼承父context的值
c.撤銷消息的傳播
撤銷消息會按照深度遍歷的方式傳播給子context(注意因為多routine調用的原因,最終的撤銷順序可能不會是深度遍歷的順序)
,在遍歷的過程中,通過WithCancel、WithDeadline、WithTimeout派生的context會被撤銷,但是通過WithValue方法派生的context不會被撤銷
我們調用sync/atomic中的幾個函數可以對幾種簡單的類型進行原子操作。這些類型包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer,共6個。這些函數的原子操作共有5種:增或減,比較并交換、載入、存儲和交換它們提供了不同的功能,切使用的場景也有區別。
增或減
顧名思義,原子增或減即可實現對被操作值的增大或減少。因此該操作只能操作數值類型。
被用于進行增或減的原子操作都是以“Add”為前綴,并后面跟針對具體類型的名稱。
//方法源碼
func AddUint32(addr *uint32, delta uint32) (new uint32)
增
栗子:(在原來的基礎上加n)
atomic.AddUint32(&addr,n)
減
栗子:(在原來的基礎上加n(n為負數))
atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))
比較并交換
比較并交換----Compare And Swap 簡稱CAS
他是假設被操作的值未曾被改變(即與舊值相等),并一旦確定這個假設的真實性就立即進行值替換
如果想安全的并發一些類型的值,我們總是應該優先使用CAS
//方法源碼
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
栗子:(如果addr和old相同,就用new代替addr)
ok:=atomic.CompareAndSwapInt32(&addr,old,new)
載入
如果一個寫操作未完成,有一個讀操作就已經發生了,這樣讀操作使很糟糕的。
為了原子的讀取某個值sync/atomic代碼包同樣為我們提供了一系列的函數。這些函數都以"Load"為前綴,意為載入。
//方法源碼
func LoadInt32(addr *int32) (val int32)
栗子
fun addValue(delta int32){
for{
v:=atomic.LoadInt32(&addr)
if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
break;
}
}
}
存儲
與讀操作對應的是寫入操作,sync/atomic也提供了與原子的值載入函數相對應的原子的值存儲函數。這些函數的名稱均以“Store”為前綴
在原子的存儲某個值的過程中,任何cpu都不會進行針對進行同一個值的讀或寫操作。如果我們把所有針對此值的寫操作都改為原子操作,那么就不會出現針對此值的讀操作讀操作因被并發的進行而讀到修改了一半的情況。
原子操作總會成功,因為他不必關心被操作值的舊值是什么。
//方法源碼
func StoreInt32(addr *int32, val int32)
栗子
atomic.StoreInt32(被操作值的指針,新值)
atomic.StoreInt32(&value,newaddr)
交換
原子交換操作,這類函數的名稱都以“Swap”為前綴。
與CAS不同,交換操作直接賦予新值,不管舊值。
會返回舊值
//方法源碼
func SwapInt32(addr *int32, new int32) (old int32)
栗子
atomic.SwapInt32(被操作值的指針,新值)(返回舊值)
oldval:=atomic.StoreInt32(&value,newaddr)
1. 什么是Sync包?
Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.
Values containing the types defined in this package should not be copied.
這句話大意是說:
Sync包同步提供基本的同步原語,如互斥鎖。 除了Once和WaitGroup類型之外,大多數類型都是供低級庫例程使用的。 通過Channel和溝通可以更好地完成更高級別的同步。并且此包中的值在使用過后不要拷貝。
從描述中可以看到的是,golang 并不推薦這個包中的大多數并發控制方法,但還是提供了相關方法,主要原因是golang中提倡以共享內存的方式來通信:
不要以共享內存的方式來通信,作為替代,我們應該以通信的手段來共享內存
共享內存的方式使得多線程中的通信變得簡單,但是在并發的安全性控制上將變得異常繁瑣。
正確性不是我們唯一想要的,我們想要的還有系統的可伸縮性,以及可理解性,我覺得這點非常重要,比如現在廣泛使用的Raft算法。
2. 包中的Type
包中主要有: Locker, Cond, Map, Mutex, Once, Pool,
RWMutex, WaitGroup
type Locker interface {
Lock()
Unlock()
}
type Cond struct {
// L is held while observing or changing the condition
L Locker
}
3. 什么是鎖,為什么需要鎖?
鎖是sync包中的核心,他主要有兩個方法,加鎖和解鎖。
在單線程運行的時候程序是順序執行的,程序對數據的訪問也是:
讀取 => 一頓操作(加減乘除之類的) => 寫回原地址
但是一旦程序中進行了并發編程,也就是說,某一個函數可能同時被不同的線程執行的時候,以時間為維度會發生以下情況:
可以看到的是,A地址的數字被執行了兩次自增,若A=5,我們在執行完成后預期的A值是7,但是在這種情況下我們得到的A卻是6,bug了~
還有很多類似的并發錯誤,所以才有鎖的引入。若是我們在線程2讀取A的值的時候對A進行加鎖,讓線程2等待,線程1執行完成之后在執行線程2,這樣就能夠保證數據的正確性。但是正確性不是我們唯一想要的。
4 寫更優雅的代碼
在很多語言中我們經常為了保證數據安全正確,會在并發的時候對數據加鎖
Lock()
doSomething()
Unlock()
Golang在此包中也提供了相關的鎖,但是標明了"most are intended for use by low-level library routines" 所以我這里只對 Once and WaitGroup types做簡述。
5.Once 對象
Once 是一個可以被多次調用但是只執行一次,若每次調用Do時傳入參數f不同,但是只有第一個才會被執行。
func (o *Once) Do(f func())
var once sync.Once
onceBody := func() {
fmt.Println("Only once")
}
done := make(chan bool)
for i := 0; i < 10; i++ {
go func() {
once.Do(onceBody)
done <- true
}()
}
for i := 0; i < 10; i++ {
<-done
}
如果你執行這段代碼會發現,雖然調用了10次,但是只執行了1次。BTW:這個東西可以用來寫單例。
6. WaitGroup
下面是個官方的例子:
var wg sync.WaitGroup
var urls = []string{
"http://www.golang.org/",
"http://www.google.com/",
"http://www.somestupidname.com/",
}
for _, url := range urls {
// Increment the WaitGroup counter.
wg.Add(1)
// Launch a goroutine to fetch the URL.
go func(url string) {
// Decrement the counter when the goroutine completes.
defer wg.Done()
// Fetch the URL.
http.Get(url)
}(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
7. 簡述
Golang中高級的并發可以通過channel來實現,這是golang所倡導的,但是go也提供了鎖等先關操作。
以上就是“go語言同步機制是什么及怎么實現”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。