Golang中的協程池可以通過使用goroutine
和channel
實現。下面是一個簡單的示例,展示了如何使用協程池來處理任務:
package main
import (
"fmt"
"sync"
)
type Worker struct {
ID int
Task chan int
Done chan bool
WorkerPool chan chan int
}
func NewWorker(id int, workerPool chan chan int) *Worker {
return &Worker{
ID: id,
Task: make(chan int),
Done: make(chan bool),
WorkerPool: workerPool,
}
}
func (w *Worker) Start() {
go func() {
for {
// 把自己的任務通道注冊到工作池
w.WorkerPool <- w.Task
select {
case task := <-w.Task:
// 處理任務
fmt.Printf("Worker %d processing task %d\n", w.ID, task)
case <-w.Done:
// 任務完成
fmt.Printf("Worker %d stopping\n", w.ID)
return
}
}
}()
}
func (w *Worker) Stop() {
go func() {
w.Done <- true
}()
}
type Pool struct {
WorkerPool chan chan int
Tasks chan int
MaxWorkers int
WaitGroup sync.WaitGroup
}
func NewPool(maxWorkers, maxTasks int) *Pool {
return &Pool{
WorkerPool: make(chan chan int, maxWorkers),
Tasks: make(chan int, maxTasks),
MaxWorkers: maxWorkers,
}
}
func (p *Pool) Start() {
// 啟動協程池中的工作協程
for i := 0; i < p.MaxWorkers; i++ {
worker := NewWorker(i, p.WorkerPool)
worker.Start()
}
go p.dispatch()
}
func (p *Pool) dispatch() {
for {
select {
case task := <-p.Tasks:
workerTask := <-p.WorkerPool
// 分發任務給空閑的工作協程
workerTask <- task
}
}
}
func main() {
pool := NewPool(3, 10)
pool.Start()
// 添加任務到任務隊列
for i := 0; i < 10; i++ {
pool.Tasks <- i
}
pool.WaitGroup.Wait()
}
在上面的示例中,我們定義了一個Worker
結構體,其中包含了一個任務通道Task
和一個完成通道Done
。當Worker啟動時,它會把自己的任務通道注冊到工作池中,并等待任務的到來。在任務到來時,它會從任務通道中接收任務并處理。當任務完成時,它會通過完成通道通知主線程。
Pool
結構體包含了一個工作協程池和一個任務通道。在Start
方法中,我們創建了maxWorkers
個工作協程,并啟動它們。同時,我們還啟動了一個dispatch
協程,該協程用于從任務通道中接收任務,并將其分發給空閑的工作協程。
在main
函數中,我們創建了一個協程池,并向任務通道中添加了10個任務。最后,我們使用WaitGroup
等待所有任務完成。
這就是一個簡單的Golang協程池的實現和應用。你可以根據自己的需求進行擴展和修改。