91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

怎么讓Kafka達到最佳吞吐量

發布時間:2021-10-15 14:54:30 來源:億速云 閱讀:192 作者:iii 欄目:編程語言

本篇內容介紹了“怎么讓Kafka達到最佳吞吐量”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

上手使用

func main() {
  // 1. 初始化
	pusher := kq.NewPusher([]string{
		"127.0.0.1:19092",
		"127.0.0.1:19092",
		"127.0.0.1:19092",
	}, "kq")

	ticker := time.NewTicker(time.Millisecond)
	for round := 0; round < 3; round++ {
		select {
		case <-ticker.C:
			count := rand.Intn(100)
			m := message{
				Key:     strconv.FormatInt(time.Now().UnixNano(), 10),
				Value:   fmt.Sprintf("%d,%d", round, count),
				Payload: fmt.Sprintf("%d,%d", round, count),
			}
			body, err := json.Marshal(m)
			if err != nil {
				log.Fatal(err)
			}

			fmt.Println(string(body))
      // 2. 寫入
			if err := pusher.Push(string(body)); err != nil {
				log.Fatal(err)
			}
		}
	}
}

kafka cluster 配置以及 topic 傳入,你就得到一個操作 kafkapush operator

至于寫入消息,簡單的調用 pusher.Push(msg) 就行。是的,就這么簡單!

> 當然,目前只支持單個 msg 寫入。可能有人會疑惑,那就繼續往下看,為什么只能一條一條寫入?

初始化

一起看看 pusher 初始化哪些步驟:

NewPusher(clusterAddrs, topic, opts...)
	|- kafka.NewWriter(kfConfig)								// 與 kf 之前的連接
	|- executor = executors.NewChunkExecutor()  // 設置內部寫入的executor為字節數定量寫入
  1. 建立與 kafka cluster 的連接。此處肯定就要傳入 kafka config

  2. 設置內部暫存區的寫入函數以及刷新規則。

使用 chunkExecutor 作用不言而喻:將隨機寫 -> 批量寫,減少 I/O 消耗;同時保證單次寫入不能超過默認的 1M 或者自己設定的最大寫入字節數。

其實再往 chunkExecutor 內部看,其實每次觸發插入有兩個指標:

  • maxChunkSize:單次最大寫入字節數

  • flushInterval:刷新暫存消息插入的間隔時間

在觸發寫入,只要滿足任意一個指標都會執行寫入。同時在 executors 都有設置插入間隔時間,以防暫存區寫入阻塞而暫存區內消息一直不被刷新清空。

> 更多關于 executors 可以參看以下:https://zeromicro.github.io/go-zero/executors.html

生產者插入

根據上述初始化對 executors 介紹,插入過程中也少不了它的配合:

func (p *Pusher) Push(v string) error {
  // 1. 將 msg -> kafka 內部的 Message
	msg := kafka.Message{
		Key:   []byte(strconv.FormatInt(time.Now().UnixNano(), 10)),
		Value: []byte(v),
	}
  
  // 使用 executor.Add() 插入內部的 container
  // 當 executor 初始化失敗或者是內部發生錯誤,也會將 Message 直接插入 kafka
	if p.executor != nil {
		return p.executor.Add(msg, len(v))
	} else {
		return p.produer.WriteMessages(context.Background(), msg)
	}
}

過程其實很簡單。那 executors.Add(msg, len(msg)) 是怎么把 msg 插入到 kafka 呢?

插入的邏輯其實在初始化中就聲明了:

pusher.executor = executors.NewChunkExecutor(func(tasks []interface{}) {
		chunk := make([]kafka.Message, len(tasks))
  	// 1
		for i := range tasks {
			chunk[i] = tasks[i].(kafka.Message)
		}
  	// 2
		if err := pusher.produer.WriteMessages(context.Background(), chunk...); err != nil {
			logx.Error(err)
		}
	}, newOptions(opts)...)
  1. 觸發插入時,將暫存區中存儲的 []msg 依次拿出,作為最終插入消息集合;

  2. 將上一步的消息集合,作為一個批次插入 kafkatopic

這樣 pusher -> chunkExecutor -> kafka 一個鏈路就出現了。

“怎么讓Kafka達到最佳吞吐量”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

凯里市| 东城区| 岑溪市| 洞口县| 吐鲁番市| 和平县| 屏东县| 德格县| 北海市| 濮阳市| 高邮市| 亳州市| 逊克县| 娄底市| 衡水市| 南康市| 邵东县| 大理市| 普兰店市| 青川县| 兴山县| 宜黄县| 新邵县| 惠来县| 利辛县| 苍溪县| 嵊州市| 郎溪县| 永宁县| 永寿县| 黔南| 太白县| 禹城市| 湘乡市| 澜沧| 双牌县| 平顺县| 河南省| 澄城县| 莱西市| 台南市|