您好,登錄后才能下訂單哦!
本篇內容介紹了“怎么讓Kafka達到最佳吞吐量”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
func main() { // 1. 初始化 pusher := kq.NewPusher([]string{ "127.0.0.1:19092", "127.0.0.1:19092", "127.0.0.1:19092", }, "kq") ticker := time.NewTicker(time.Millisecond) for round := 0; round < 3; round++ { select { case <-ticker.C: count := rand.Intn(100) m := message{ Key: strconv.FormatInt(time.Now().UnixNano(), 10), Value: fmt.Sprintf("%d,%d", round, count), Payload: fmt.Sprintf("%d,%d", round, count), } body, err := json.Marshal(m) if err != nil { log.Fatal(err) } fmt.Println(string(body)) // 2. 寫入 if err := pusher.Push(string(body)); err != nil { log.Fatal(err) } } } }
將 kafka cluster
配置以及 topic
傳入,你就得到一個操作 kafka
的 push operator
。
至于寫入消息,簡單的調用 pusher.Push(msg)
就行。是的,就這么簡單!
> 當然,目前只支持單個 msg
寫入。可能有人會疑惑,那就繼續往下看,為什么只能一條一條寫入?
一起看看 pusher
初始化哪些步驟:
NewPusher(clusterAddrs, topic, opts...) |- kafka.NewWriter(kfConfig) // 與 kf 之前的連接 |- executor = executors.NewChunkExecutor() // 設置內部寫入的executor為字節數定量寫入
建立與 kafka cluster
的連接。此處肯定就要傳入 kafka config
;
設置內部暫存區的寫入函數以及刷新規則。
使用 chunkExecutor
作用不言而喻:將隨機寫 -> 批量寫,減少 I/O 消耗;同時保證單次寫入不能超過默認的 1M
或者自己設定的最大寫入字節數。
其實再往 chunkExecutor
內部看,其實每次觸發插入有兩個指標:
maxChunkSize
:單次最大寫入字節數
flushInterval
:刷新暫存消息插入的間隔時間
在觸發寫入,只要滿足任意一個指標都會執行寫入。同時在 executors
都有設置插入間隔時間,以防暫存區寫入阻塞而暫存區內消息一直不被刷新清空。
> 更多關于 executors
可以參看以下:https://zeromicro.github.io/go-zero/executors.html
根據上述初始化對 executors
介紹,插入過程中也少不了它的配合:
func (p *Pusher) Push(v string) error { // 1. 將 msg -> kafka 內部的 Message msg := kafka.Message{ Key: []byte(strconv.FormatInt(time.Now().UnixNano(), 10)), Value: []byte(v), } // 使用 executor.Add() 插入內部的 container // 當 executor 初始化失敗或者是內部發生錯誤,也會將 Message 直接插入 kafka if p.executor != nil { return p.executor.Add(msg, len(v)) } else { return p.produer.WriteMessages(context.Background(), msg) } }
過程其實很簡單。那 executors.Add(msg, len(msg))
是怎么把 msg
插入到 kafka
呢?
插入的邏輯其實在初始化中就聲明了:
pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) { chunk := make([]kafka.Message, len(tasks)) // 1 for i := range tasks { chunk[i] = tasks[i].(kafka.Message) } // 2 if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil { logx.Error(err) } }, newOptions(opts)...)
觸發插入時,將暫存區中存儲的 []msg
依次拿出,作為最終插入消息集合;
將上一步的消息集合,作為一個批次插入 kafka
的 topic
中
這樣 pusher -> chunkExecutor -> kafka
一個鏈路就出現了。
“怎么讓Kafka達到最佳吞吐量”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。