您好,登錄后才能下訂單哦!
ConsumerConfig.scala
儲存Consumer的配置
按照我的理解,0.10的Kafka沒有專門的SimpleConsumer,仍然是沿用0.8版本的。
消費的規則如下:
一個partition只能被同一個ConsumersGroup的一個線程所消費.
線程數小于partition數,某些線程會消費多個partition.
線程數等于partition數,一個線程正好消費一個線程.
當添加消費者線程時,會觸發rebalance,partition的分配發送變化.
同一個partition的offset保證消費有序,不同的partition消費不保證順序.
Consumers編程的用法:
private final KafkaConsumer<Long, String> consumer; // 與Kafka進行通信的consumer... consumer = new KafkaConsumer<Long, String>(props); consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Long, String> records = consumer.poll(512); ...
consumer,是一個純粹的單線程程序,后面所講的所有機制(包括coordinator,rebalance, heartbeat等),都是在這個單線程的poll函數里面完成的。也因此,在consumer的代碼內部,沒有鎖的出現。
從KafkaConsumer的構造函數可以看出,KafkaConsumer有以下幾個核心部件:
Metadata: 存儲Topic/Partion與broker的映射關系
NetworkClient:網絡層 A network client for asynchronous request/response network i/o.
ConsumerNetworkClient: Higher level consumer access to the network layer //對NetworkClient的封裝,非線程安全
ConsumerCoordinator:只是client端的類,只是和服務端的GroupCoordinator通信的介質。(broker端的Coordinator 負責reblance、Offset提交、心跳)
SubscriptionState: consumer的Topic、Partition的offset狀態維護
Fetcher: manage the fetching process with the brokers. //獲取消息
后面會分組件講解Consumers的工作流程
在consumer啟動時或者coordinator節點故障轉移時,consumer發送ConsumerMetadataRequest給任意一個brokers。在ConsumerMetadataResponse中,它接收對應的Consumer Group所屬的Coordinator的位置信息。
Consumer連接Coordinator節點,并發送HeartbeatRequest。如果返回的HeartbeatResponse中返回IllegalGeneration錯誤碼,說明協調節點已經在初始化平衡。消費者就會停止抓取數據,提交offsets,發送JoinGroupRequest給協調節點。在JoinGroupResponse,它接收消費者應該擁有的topic-partitions列表以及當前Consumer Group的新的generation編號。這個時候Consumer Group管理已經完成,Consumer就可以開始fetch數據,并為它擁有的partitions提交offsets。
如果HeartbeatResponse沒有錯誤返回,Consumer會從它上次擁有的partitions列表繼續抓取數據,這個過程是不會被中斷的。
見Producer里面的分析。
補充一下,KafkaConsumer、KafkaProducer都是在構造函數中獲取metadata信息,通過調用metadata.update
方法來獲取信息。
在0.9以前的client api中,consumer是要依賴Zookeeper的。因為同一個consumer group中的所有consumer需要進行協同,新航道雅思培訓這與后面要講的rebalance有關。(ConsumerConnector、KafkaStream、ConsumerIterator
) -- package kafka.consumer
0.9之后新的consumer不依賴與Zookeeper,一個consumerGroup內的consumer由Coordinator管理.(KafkaConsumer
) -- package org.apache.kafka.clients.consumer
為什么?后面講
提問:為什么在一個group內部,1個parition只能被1個consumer擁有?
給定一個topic,有4個partition: p0, p1, p2, p3, 一個group有3個consumer: c0, c1, c2。
那么,如果按RangeAssignor
策略,分配結果是:
c0: p0, c1: p1, c2: p2, p3
如果按RoundRobinAssignor
策略:
c0: p1, p3, c1: p1, c2: p2
partition.assignment.strategy=RangeAssignor,默認值
(到底是哪種分配狀態呢)
那這整個分配過程是如何進行的呢?見下圖所示:
1. 步驟1:對于每1個consumer group,Kafka集群為其從broker集群中選擇一個broker作為其coordinator。因此,第1步就是找到這個coordinator。(1個consumer group對應一個coordinattor)
GroupCoordinatorRequest: GCR,由ConsumerNetworkClient發送請求去尋找coordinator。
2. 步驟2:找到coordinator之后,發送JoinGroup請求
consumer在這里會被劃分leader、follower(無責任的說:選擇第一個consumer)
leader作用:perform the leader synchronization and send back the assignment for the group(負責發送partition分配的結果)
follower作用:send follower's sync group with an empty assignment
3. 步驟3:JoinGroup返回之后,發送SyncGroup,得到自己所分配到的partition
SyncGroupRequest
consumer leader發送 SyncGroupRequest給Coordinator,Coordinator回給它null
follower發送 null的 SyncGroupRequest 給Coordinator,Coordinator回給它partition分配的結果。
注意,在上面3步中,有一個關鍵點:
partition的分配策略和分配結果其實是由client決定的,而不是由coordinator決定的。什么意思呢?在第2步,所有consumer都往coordinator發送JoinGroup消息之后,coordinator會指定其中一個consumer作為leader,其他consumer作為follower。
然后由這個leader進行partition分配。
然后在第3步,leader通過SyncGroup消息,把分配結果發給coordinator,其他consumer也發送SyncGroup消息,獲得這個分配結果。
接下來就到Fetcher
拉取數據了
四個步驟
步驟0:獲取consumer的offset
步驟1:生成FetchRequest,并放入發送隊列
步驟2:網絡poll
步驟3:獲取結果
當consumer初次啟動的時候,面臨的一個首要問題就是:從offset為多少的位置開始消費。
poll之前,給集群發送請求,讓集群告知客戶端,當前該TopicPartition的offset是多少。通過SubscriptionState
來實現, 通過ConsumerCoordinator
if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions());
核心是:向Coordinator發了一個OffsetFetchRequest,并且是同步調用,直到獲取到初始的offset,再開始接下來的poll.(也就是說Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)
consumer的每個TopicPartition都有了初始的offset,接下來就可以進行不斷循環取消息了,這也就是Fetch的過程:
fetcher.initFetches(cluster)
核心就是生成FetchRequest: 假設一個consumer訂閱了3個topic: t0, t1, t2,為其分配的partition分別是: t0: p0; t1: p1, p2; t2: p2
即總共4個TopicPartition,即t0p0, t0p1, t1p1, t2p2。這4個TopicPartition可能分布在2臺機器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2
則會分別針對每臺機器生成一個FetchRequest,即Map<Node, FetchRequest>
。所以會有一個方法把所有屬于同一個Node的TopicPartition放在一起,生成一個FetchRequest。
調用ConsumerNetworkClient.poll
發送網絡請求。向服務器發 送響應請求和獲取服務器的響應。(默認值:executeDelayedTasks=true)
fetcher.fetchedRecords()
獲取Broker返回的Response,里面包含了List<ConsumerRecord> records
是否自動消費確認:由參數auto.xxx.commit=true
控制
手動消費:用于自定義Consumers的消費控制
下面從自動消費確認來分析,Offset自動確認是由ConsumerCoordinator
的AutoCommitTask
來實現的。
其調用在ConsumerNetworkClient
的 DelayedTaskQueue delayedTasks
里面,然后被周期性的調用。 周期性的發送確認消息,類似HeartBeat,其實現機制也就是前面所講的DelayedQueue + DelayedTask
.
poll
函數中的注釋:
// execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
可以這樣理解:第二次poll調用的時候,提交上一次poll的offset和心跳發送。
先提交offset,再去拉取record。那么這次Offset其實是上一次poll的Record的offset。
因此,當你把按照下面的邏輯寫程序的時候,可能會導致Consumer與Coordinator的心跳超時。
while(true) { consumer.poll();do process message // 假如這個耗時過長,那么這個consumer就無法發送心跳給coordinator,導致它錯誤認為這個consumer失去聯系了,引起不必要的rebalance。槽糕的情況下,會丟重復消費數據。}
因此,有必要把offset的提交單獨拿出來做一個線程。
到這里,就把整個Consumer的流程走完了。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。