在Go語言中,可以使用WaitGroup來實現高并發的數據處理流水線。
WaitGroup是一個計數信號量,用于等待一組并發操作完成。通過Add方法可以增加計數器的值,Done方法可以減少計數器的值,Wait方法可以阻塞直到計數器變為0。
下面是一個使用WaitGroup的高并發數據處理流水線的示例:
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
numWorkers := 10
dataChan := make(chan int, 100)
// 第一階段,生成數據
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
dataChan <- i
}
close(dataChan)
}()
// 第二階段,處理數據
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for data := range dataChan {
result := processData(data)
fmt.Println(result)
}
}()
}
wg.Wait()
}
func processData(data int) int {
// 此處模擬數據處理過程
return data * 2
}
在這個示例中,首先創建了一個WaitGroup對象wg和一個緩沖通道dataChan,用于數據在各個階段之間的傳遞。
然后,在第一階段中,啟動一個goroutine來生成數據,并向dataChan通道中發送數據。發送完數據后,通過調用close(dataChan)來關閉通道。
在第二階段中,通過循環啟動多個goroutine來處理數據。每個goroutine從dataChan中接收數據,然后調用processData函數來處理數據,并打印處理結果。
最后,通過調用wg.Wait()來等待所有goroutine完成。
這樣,就可以實現一個高并發的數據處理流水線。在數據生成階段和數據處理階段之間使用通道進行數據傳遞,通過WaitGroup來等待所有goroutine完成。