您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Kafka分組消費的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
從kafka消費消息,kafka客戶端提供兩種模式: 分區消費,分組消費。
分區消費對應的就是我們的DirectKafkaInputDStream
分組消費對應的就是我們的KafkaInputDStream
消費者數目跟分區數目的關系:
1),一個消費者可以消費一個到全部分區數據
2),分組消費,同一個分組內所有消費者消費一份完整的數據,此時一個分區數據只能被一個消費者消費,而一個消費者可以消費多個分區數據
3),同一個消費組內,消費者數目大于分區數目后,消費者會有空余=分區數-消費者數
當一個group中,有consumer加入或者離開時,會觸發partitions均衡partition.assignment.strategy,決定了partition分配給消費者的分配策略,有兩種分配策略:
1,org.apache.kafka.clients.consumer.RangeAssignor
默認采用的是這種再平衡方式,這種方式分配只是針對消費者訂閱的topic的單個topic所有分區再分配,Consumer Rebalance的算法如下:
1),將目標Topic下的所有Partirtion排序,存于TP
2),對某Consumer Group下所有Consumer按照名字根據字典排序,存于CG,第i個Consumer記為Ci
3),N=size(TP)/size(CG)
4),R=size(TP)%size(CG)
5),Ci獲取的分區起始位置=N*i+min(i,R)
6),Ci獲取的分區總數=N+(if (i+ 1 > R) 0 else 1)
2,org.apache.kafka.clients.consumer.RoundRobinAssignor
這種分配策略是針對消費者消費的所有topic的所有分區進行分配。當有新的消費者加入或者有消費者退出,就會觸發rebalance。這種方式有兩點要求
A),在實例化每個消費者時給每個topic指定相同的流數
B),每個消費者實例訂閱的topic必須相同
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();topicCountMap.put(topic, new Integer(1));Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
其中,topic對應的value就是流數目。對應的kafka源碼是在
在kafka.consumer.ZookeeperConsumerConnector的consume方法里,根據這個參數構建了相同數目的KafkaStream。
這種策略的具體分配步驟:
1),對所有topic的所有分區按照topic+partition轉string之后的hash進行排序
2),對消費者按字典進行排序
3),然后輪訓的方式將分區分配給消費者
3,舉例對比
舉個例子,比如有兩個消費者(c0,c1),兩個topic(t0,t1),每個topic有三個分區p(0-2),
那么采用RangeAssignor,結果為:
* C0: [t0p0, t0p1, t1p0, t1p1]
* C1: [t0p2, t1p2]
采用RoundRobinAssignor,結果為:
* C0: [t0p0, t0p2, t1p1]
* C1: [t0p1, t1p0, t1p2]
分組消費有一個比較好的功能就是自動檢測失敗的消費者并將其踢出分組,然后重新進行分區分配。那么kafka是如何檢測失敗的消費者的呢。我們就拿0.10.x為例進行講解說明。
消費著訂閱了一組的topic后,會在調用poll(long)函數的時候加入分組,分組內新增消費者就會進行再平衡。Poll 函數的設計目標就是來保證消費者存活的。只要持續不斷的調用poll函數,消費者就會留在分組里,連續的從分配給他的分區里消費消息。消費者也會使用一個后臺線程發送周期性的心跳給broker。如果消費者掛掉或者無法在session.timeout.ms時間范圍內發送心跳,消費者會被視為死亡,它的分區就會被重新分配。session.timeout.ms默認是10000ms。該值要在group.max.session.timeout.ms=300000ms和group.min.session.timeout.ms=6000ms之間。
由于心跳是后臺線程周期性發送的,那么會存在消費者心跳正常發送,但是不消費消息的情況。為了避免這種消費者無限期的占用分配給他的分區這種情況,kafka提供了一種存活檢測機制,使用max.poll.interval.ms配置。根本上來說,兩次調用poll函數的間隔大于該值,消費者就會離開分組,然后它的分區會被其它消費著消費。當發生這種情況時,你會收到一個offset提交失敗的異常。這種機制確保了只有活躍的消費者才能提交offset。
消費者有兩個配置來控制poll函數的行為:
max.poll.interval.ms:增加兩次調用poll的間隔,實際上就是增加消費者處理上次poll所拉取消息的時間。當然,弊端是增加該值會增加消費者組再平衡的時間,因為僅僅在調用poll的過程中消費者才能參與再平衡。要注意一點,request.timeout.ms=305000,默認值要修改比max.poll.interval.ms大,也即是大于5min。該值是當消費者進行再平衡時,JoinGroup請求在server端的阻塞時間。
max.poll.records:限制每次調用poll返回消息的最大數。有了該參數我們就可以預估兩次
有些情況下,數據處理時間不可預期,上面的兩個參數并不難滿足需求。這種情況下,推薦將消息處理放到其它后臺線程中執行,這樣消費者就可以持續的調用poll函數了。但是這中情況下,要處理好offset提交的問題。典型做法就是禁止掉自動提交offset,改為手動再消息處理結束后提交offset。這種情況下,需要對消費的分區調用pause函數,這樣在調用poll函數的時候就不會接受新的數據,然后處理完之后調用resume(Collection)即可恢復消費。
關于Kafka分組消費的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。