您好,登錄后才能下訂單哦!
這篇文章主要介紹了kafka的編程模型有哪些,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
1.kafka消費者編程模型
分區消費模型
組(group)消費模型
1.1.1.分區消費架構圖,每個分區對應一個消費者。
1.1.2.分區消費模型偽代碼描述
指定偏移量,用于從上次消費的地方開始消費.
提交offset ,java客戶端會自動提交的集群,所以這一步可選。
1.2.1.組消費模型架構圖
每個組都消費該topic的全量數據,一條消息會發給groupA和groupB.
1.2.2.組消費模型偽代碼:
流數N:表示一個consumer組里面有幾個consumer 實例,上例中組A創建2個流,組B創建4個流。
1.2.3.consumer分配算法
當kafka的分區個數大于組A里consumer實例個數時,怎么去分配,以下為分配步驟:
Partition消費模型更加靈活但是:
(1)需要自己處理各種異常情況;
(2)需要自己管理offset(以實現消息傳遞的其他語義);
Group消費模型更加簡單,但是不靈活:
(1)不需要自己處理異常情況,不需要自己管理offset;
(2)只能實現kafka默認的最少一次消息傳遞語義;
知識補充:消息傳遞的3中語義:
至少一次,(消息不會丟,消息者至少得到一次,但有可能會重復,生產者向消費者發送之后,會等待消費者確認,沒收到確認會再發) (kafka 默認實現的語義)。
至多一次,(消息會丟)
有且只有一次。
fetchSize: 從服務器獲取單包大小;
bufferSize: kafka客戶端緩沖區大小;
group.id: 分組消費時分組名 (指定的每個組將獲得全量的數據)
同步生產模型
異步生產模型
至少成功一次 , 發送給kafka消費者
打包發送給kafka broker。
main()
創建到kafka broker的連接:KafkaClient(host,port)
選擇或者自定義生產者負載均衡算法 partitioner (算法有:hash,輪詢,隨機)
設置生產者參數 (緩存隊列長度,發送時間,同步/異步參數設置)
根據負載均衡算法和設置的生產者參數構造Producer對象
while True
getMessage:從上游獲得一條消息
按照kafka要求的消息格式構造kafka消息
根據分區算法得到分區
發送消息
處理異常
同步生產模型:
(1)低消息丟失率;
(2)高消息重復率(由于網絡原因,回復確認未收到);
(3)高延遲 (每發一條消息需要確認)
(使用在不丟消息場景)
異步生產模型:
(1)低延遲;
(2)高發送性能;(每秒一個分區發50萬條)
(3)高消息丟失率(無確認機制,發送端隊列滿了,消息會丟掉;整個隊列發送給)
(使用在允許丟消息場景,偶爾丟一條)
//同步配置參數:
默認的序列化方式:字節序列化。
設定分區算法:默認是對key進行hash分區算法,可以自定義分區算法。
確認機制 request.require.acks: 合理設置為1; 0: 絕不等確認 1: leader的一個副本收到這條消息,并發回確認 -1: leader的所有副本都收到這條消息,并發回確認
消息是以key-value的形式發送的,key必須要設置。
message.send.max.retries: 發送失敗重試次數;
retry.backoff.ms :未接到確認,認為發送失敗的時間;
producer.type: 同步發送或者異步發送;
batch.num.messages: 異步發送時,累計最大消息數;
queue.buffering.max.ms:異步發送時,累計最大時間;
感謝你能夠認真閱讀完這篇文章,希望小編分享的“kafka的編程模型有哪些”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。