您好,登錄后才能下訂單哦!
Golang中怎么實現百萬級高并發,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
package main import ( "fmt" "runtime" "time" ) func main(){ //這里我們假設數據是int類型,緩存格式設為100 dataChan:=make(chan int,100) go func(){ for{ select{ case data:=<-dataChan: fmt.Println("data:",data) time.Sleep(1 * time.Second)//這里延遲是模擬處理數據的耗時 } } }() //填充數據 for i:=0;i<100;i++{ dataChan<-i } //這里循環打印查看協程個數 for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
這里打印出來的協程個數時2,為什么? 因為main方法獨占一個主協程,我們又起了一個協程,所以是兩個。
首先我們要抽象出幾個概念:
Job: type Job interface { Do() } // 一個數據接口,所有的數據都要實現該接口,才能被傳遞進來 //實現Job接口的一個數據實例,需要實現一個Do()方法,對數據的處理就在這個Do()方法中。 Job通道: 這里有兩個Job通道: 1、WorkerPool的Job channel,用于調用者把具體的數據寫入到這里,WorkerPool讀取。 2、Worker的Job channel,當WorkerPool讀取到Job,并拿到可用的Worker的時候,會將Job實例寫入該Worker的Job channel,用來直接執行Do()方法。 Worker: type Worker struct { JobQueue chan Job //Worker的Job通道 } //每一個被初始化的worker都會在后期單獨占用一個協程 //初始化的時候會先把自己的JobQueue傳遞到Worker通道中, //然后阻塞讀取自己的JobQueue,讀到一個Job就執行Job對象的Do()方法。 工作池(WorkerPool): type WorkerPool struct { workerlen int //WorkerPool中同時 存在Worker的個數 JobQueue chan Job // WorkerPool的Job通道 WorkerQueue chan chan Job } //初始化時會按照傳入的num,啟動num個后臺協程,然后循環讀取Job通道里面的數據, //讀到一個數據時,再獲取一個可用的Worker,并將Job對象傳遞到該Worker的chan通道
整個過程中 每個Worker都會被運行在一個協程中,在整個WorkerPool中就會有num可空閑的Worker,當來一條數據的時候,就會在工作池中去一個空閑的Worker去執行該Job,當工作池中沒有可用的worker時,就會阻塞等待一個空閑的worker。
這是一個粗糙最簡單的版本,只是為了演示效果,具體使用需要根據實際情況加一些特殊的處理。
當數據無限多的時候func (wp *WorkerPool) Run() 會無限創建協程,這里需要做一些處理,這里是為了讓所有的請求不等待,并且體現一下最大峰值時的協程數。具體因項目而異。
代碼地址:https://github.com/wangzhen0625/gonote/tree/master/7goroutune
main.go
package main import ( "fmt" "runtime" "time" ) type Score struct { Num int } func (s *Score) Do() { fmt.Println("num:", s.Num) time.Sleep(1 * 1 * time.Second) } func main() { num := 100 * 100 * 20 // debug.SetMaxThreads(num + 1000) //設置最大線程數 // 注冊工作池,傳入任務 // 參數1 worker并發個數 p := NewWorkerPool(num) p.Run() datanum := 100 * 100 * 100 * 100 go func() { for i := 1; i <= datanum; i++ { sc := &Score{Num: i} p.JobQueue <- sc } }() for { fmt.Println("runtime.NumGoroutine() :", runtime.NumGoroutine()) time.Sleep(2 * time.Second) } }
job.go
package main type Job interface { Do() }
worker.go
package main type Worker struct { JobQueue chan Job } func NewWorker() Worker { return Worker{JobQueue: make(chan Job)} } func (w Worker) Run(wq chan chan Job) { go func() { for { wq <- w.JobQueue select { case job := <-w.JobQueue: job.Do() } } }() }
workerpool.go
package main import "fmt" type WorkerPool struct { workerlen int JobQueue chan Job WorkerQueue chan chan Job } func NewWorkerPool(workerlen int) *WorkerPool { return &WorkerPool{ workerlen: workerlen, JobQueue: make(chan Job), WorkerQueue: make(chan chan Job, workerlen), } } func (wp *WorkerPool) Run() { fmt.Println("初始化worker") //初始化worker for i := 0; i < wp.workerlen; i++ { worker := NewWorker() worker.Run(wp.WorkerQueue) } // 循環獲取可用的worker,往worker中寫job go func() { for { select { case job := <-wp.JobQueue: worker := <-wp.WorkerQueue worker <- job } } }() }
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。