您好,登錄后才能下訂單哦!
今天小編給大家分享一下Golang工作池如何使用的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
我們可以將工作池理解為線程池。線程池的創建和銷毀非常消耗資源,所以專門寫一個pool,每次用過的線程池再放回pool中而不是銷毀。不過在Go語言中不會使用系統的線程,而是使用goroutine。gorotine的創建和銷毀比系統線程的消耗要小的多,而且goroutine沒有標號。所以goroutine的pool就不再時線程池,而是work pool(工作池)。
雖然goroutine的系統消耗較小,但也不能隨意在編碼時使用go func(),如果程序頻繁啟動goroutine,會造成極其不可控性能問題。對于可以提前預知的大量異步處理的任務就要考慮使用工作池。
工作池的作用控制goroutine的規模,或者說是goroutine的數量。在Go語言中,控制goroutine的數量最好方式就是使用緩存通道。
下面是Go語言解決工作池的經典用法。
func worker(id int, jobs <-chan int, results chan<- int) { for job := range jobs { fmt.Printf("worker(%d) start to do job(%d)\n", id, job) time.Sleep(time.Second) fmt.Printf("worker(%d) finished job(%d)\n", id, job) results <- job } } func main() { // 為了使用我們的工作池,我們需要發送工作和接受工作的結果, // 這里我們定義兩個通道,一個jobs,一個results jobs := make(chan int, 100) results := make(chan int, 100) // 開啟3個goroutine for id := 1; id <= 3; id++ { go worker(id, jobs, results) } // 創建5個任務 for job := 1; job <= 5; job++ { jobs <- job } close(jobs) // 輸出結果 for i := 1; i <= 5; i++ { <-results } }
上述代碼工作池思想主要體現在jobs的通道上,因為定義了一個緩存長度為100的通道,所以在通道到100以后,新任務就會阻塞,只有等worker從通道取走一個工作以后才能繼續分配新工作。
本案例較為簡單,如果worker的數量較大,業務執行時間較長的話,我們需要在程序設計上將jobs和worker的模式進行優化,每個worker處理一項工作,工作池可以自定義最大數量的worker;這樣可以保證goroutine的最大數量,可程序更加可控,避免代碼消耗壓垮系統。
下面時改良之后代碼
1package main import ( "fmt" "reflect" "time" ) // Job 任務內容 type Job struct { ID int Name string } // Worker 工作 type Worker struct { id int // id WorkerPool chan chan Job // 工作者池(通道的通道),每個元素都是一個job通道, 公共的job JobChannel chan Job // 工作通道,每個元素是一個job,worker私有的job exit chan bool // 結束信號 } var ( MaxWorker = 5 // 最大worker數量 JobQueue = make(chan Job, 5) // 工作通道,模擬需處理的工作 ) // Scheduler 排程中心 type Scheduler struct { WorkerPool chan chan Job // 工作池 WorkerMaxNum int // 最大工作者數 Workers []*Worker // worker隊列 } // NewScheduler 創建排程中心 func NewScheduler(workerMaxNum int) *Scheduler { workerPool := make(chan chan Job, workerMaxNum) // 工作池 return &Scheduler{WorkerPool: workerPool, WorkerMaxNum: workerMaxNum} } // Start 工作池開始 func (s *Scheduler) Start() { Workers := make([]*Worker, s.WorkerMaxNum) for i := 0; i < s.WorkerMaxNum; i++ { worker := NewWorker(s.WorkerPool, i) worker.Start() Workers[i] = &worker } s.Workers = Workers go s.schedule() } // Stop 工作池的關閉 func (s *Scheduler) Stop() { Workers := s.Workers for _, w := range Workers { w.Stop() } time.Sleep(time.Second) close(s.WorkerPool) } func NewWorker(WorkerPool chan chan Job, id int) Worker { fmt.Printf("new a worker(%d)\n", id) return Worker{ id: id, WorkerPool: WorkerPool, JobChannel: make(chan Job), exit: make(chan bool), } } // Start 監聽任務和結束信號 func (w Worker) Start() { go func() { for { select { case job := <-w.JobChannel: // 收到任務 fmt.Println("get a job from private w.JobChannel") fmt.Println(job) case <-w.exit: // 收到結束信號 fmt.Println("worker exit", w) return } } }() } func (w Worker) Stop() { go func() { w.exit <- true }() } // 排程 func (s *Scheduler) schedule() { for { select { case job := <-JobQueue: fmt.Println("get a job from JobQueue") go func(job Job) { //從WorkerPool獲取jobChannel,忙時阻塞 jobChannel := <-s.WorkerPool fmt.Println("get a private jobChannel from public s.WorkerPool", reflect.TypeOf(jobChannel)) jobChannel <- job fmt.Println("worker's private jobChannel add one job") }(job) } } } func main() { scheduler := NewScheduler(MaxWorker) scheduler.Start() jobQueue() scheduler.Stop() } // 模擬Job任務 func jobQueue() { for i := 1; i <= 30; i++ { JobQueue <- Job{ID: i, Name: fmt.Sprintf("Job【%d】", i)} fmt.Printf("jobQueue add %d job\n", i) } }
定義了兩個結構體:Task任務和Job工作,Task并沒有實質性的內容,這里僅僅定義了一個整型變量;
定義兩個全局變量:MaxWorker是最大的worker數量;JobQueue是Job的通道。這兩個變量都用于后面的模擬,在真實場景中可以不設置這兩個變量。
定義了一個Worker結構體,與上一個簡單工作池的示例不同,本例的Worker不再是簡單的一個goroutine,而是一個結構體。結構體內定義了如下四個變量。?id:worker編號。?exit:這是一個bool類型的通道,當有數據寫入時worker結束運行。?JobChannel:Job類型的通道,該通道是專屬于當前worker的私有工作隊列。?WorkerPool:注意看,定義的時候使用了兩個Channel,每一個元素是一個Job通道,其實每一個元素是一個JobChannel。
NewWorker方法用于創建一個新的worker,要注意該方法的參數workerPool用于創建worker時傳入,這就說明每個worker與其他worker的WorkerPool是共享的,或者說多個worker使用一個WorkerPool。這一點很重要,這是本示例代碼在上一個簡單示例代碼基礎上的優化。而JobChannel和exit變量則是隨著Worker的新建而新建的。
Worker的Start方法,該方法用于監聽任務或者結束信號。Start方法一開始就用goroutine運行一個匿名函數,而函數內部是一個無限循環。在循環內部,首先是把當前的JobChannel注冊到WorkerPool里,一旦注冊進去也就說明該worker可以接收任務了。然后通過select判斷JobChannel是否可以讀取,也就是其中是否有Job,或者exit通道是否可以讀取。如果JobChannel可讀取,證明有Job,后續開始處理Job;而如果exit可讀,則結束當前的無限循環。所以,后面的代碼中要特別注意對WorkerPool的操作,Worker是從WorkerPool領取工作的。Worker的Stop方法,用于為exit通道寫入數據,在Start方法內Worker會讀取到寫入的數據,進而結束無限循環。
NewScheduler函數用于創建一個Scheduler,可以看到函數內部的WorkerPool是通過make函數新建的,NewWorker函數一樣靠參數傳入。注意WorkerPool是有緩存通道的,緩存長度是MaxWorkers。
Scheduler的Create方法,該方法根據MaxWorkers最大數創建Worker,并且把引用存入Workers切片。創建好Worker后,馬上調用Worker的Start方法,最后通過goroutine運行Schedule方法。Scheduler的Shutdown方法,用于關閉工作池,調用所有worker的Stop方法并且關閉WorkerPool工作池。
Scheduler的Schedule方法,該方法內也是一個無限循環,循環內部就是不停地讀取JobQueue,然后運行一個goroutine。在新運行的goroutine內從s.WorkerPool讀取一個JobChannel,注意,Worker注冊到WorkerPool以后此處才可以讀取到,如果WorkerPool的緩存通道內沒有JobChannel,則會阻塞,直到讀取到JobChannel,才把Job寫入。
以上就是“Golang工作池如何使用”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。