您好,登錄后才能下訂單哦!
這篇文章給大家介紹Golang中怎么應付百萬級請求,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
幼稚地使用Go runtines
最開始的時候我們非常天真地實現一個POST的鉤子方法如下,只是簡單地將每個請求體的上傳動作放到Go rutinues中讓他們并行執行:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
在中等規模的負載情況下,這種方法對大部分人都是沒有問題的,但在應對更大規模的請求量時候,我們很快就招架不住了。當我們把這個版本的代碼部署到生產環境以后,我們期待能有大量的請求進來但實際還不能達到百萬級別的數量級。我們完全低估了這個系統要處理的流量數。
但不管怎么說上面的方法都是欠妥的。因為它沒有任何方法讓我們去控制Go runtinues啟動的數量。所以當我們的系統在面對每分鐘百萬級POST請求的時候很快就垮掉了。
再戰
我們需要找到另外的方法。在一開始我們就在討論如何讓我們的請求處理程序的生命周期盡可能地縮短以及上傳到S3的操作能在后臺或者異步運行。當然,在Ruby on Rails里面你必須這么做,否則你將會阻塞到所有其他的網絡請求處理程序。無論您使用的是美洲獅,獨角獸還是過路人(請不要參與JRuby討論)。然后我們想到使用消息隊列這種比較常見的方法來處理來達到我們的目的,例如Resque, Sidekiq, SQS等等,還有數不清的工具因為實在有太多方法來實現這個功能。
所以在第二次迭代的時候,我們需要創建一個緩沖隊列,我們會將任務放入隊列里面然后再一個個地上傳到S3上,但由于我們希望達到能夠控制這個隊列的最大容量的目的,并且我們有足夠的RAM來允許我們將請求體儲存到內存當中,所以我們認為直接使用了Go提供的channel,然后將我們的請求直接入隊到channel中處理就可以了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }
我們會從channel中獲取任務并且執行他們的上傳操作
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } }
但說句老實話,我并不知道這是在干嘛。肯定是因為那時已經太晚還有我們已經喝了太多的紅牛。
這個改動并沒有讓我們的困境得到任何改善,我們將并發任務放到了隊列中執行僅僅是看上去好像解決了問題。但是我們的異步程序一次只會上傳一個請求體到S3上面,但是我們的請求數此時遠遠大于我們上傳到S3的數量,可想而知我們的緩沖隊列很快就到達了他的極限爆滿了,然后它阻擋了其他網絡請求的入隊操作。
相當于我們僅僅回避了問題,并且讓我們的系統的崩潰時間進入了倒數。我們這個缺陷的版本發布以后,整個系統的延遲率在持續性地每分鐘在上漲。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
接下來修改我們網絡請求的鉤子函數,負責創建一個Job的結構體的實例然后將其放入JobQueue channel中等待worker來獲取執行。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) }
在我們網絡服務初始化的時候創建一個Dispather并且調用Run()創建一個裝有一定數量worker的線程池,用來接收和處理來自JobQueue的Job
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
下面是我們Dispather的實現
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }
當我們將這個版本發布到生產環境以后我們的延遲率馬上有明顯的下降,我們處理請求的能力有一個質的飛躍。
關于Golang中怎么應付百萬級請求就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。