您好,登錄后才能下訂單哦!
這篇文章主要介紹了kafka-consumer-offset位移問題怎么解決的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇kafka-consumer-offset位移問題怎么解決文章都會有所收獲,下面我們一起來看看吧。
_consumer_offsets主題里面采用key和 value的方式存儲數據。
key是 group.id+topic+分區號,value 就是當前offset的值。
每隔一段時間,kafka 內部會對這個topic進行compact(壓縮),也就是每個group.id+topic+分區號就保留最新數據。
Kafka0.9版本之前,consumer黑認將offset保存在Zookeeper中。0.9版本開始,consumer默認將offset保存在Kafka一個內置的topic中,該topic為_consumer_offsets。
將offset信息存儲在zk中的不足:如果將offset信息存儲在zk中,那么所有的consumer都會訪問zk,會消耗大量的網絡資源,消費速度慢。
思想:_consumer_offsets為Kafka中的 topic,那就可以通過消費者進行消費。
在配置文件 config/consumer.properties中添加配置exclude.internal.topics = false,默認是 true,表示不能消費系統主題。為了查看該系統主題數據,所以該參數修改為false。修改以后執行分發命令:xsync consumer.properties。
采用命令行方式,創建一個新的topic。
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic atguigu --partitions 2 --replication-factor 2
啟動生產者往atguigu生產數據。
[atguigu@hadoop102 kafka] $ bin/kafka-console-producer.sh --topic atguigu --bootstrap-server hadoop102:9092
啟動消費者消費atguigu數據。
[atguigu@hadoop104 kafka]$ bin/kafka-console-consumer.sh bootstrap-server hadoop102:9092--topic atguigu --group test
注意:指定消費者組名稱,更好觀察數據存儲位置(key是 group.id+topic+分區號)。查看消費者消費主題_consumer_offsets。
[atguigu@hadoop102 kafka]$ bin/kafka-console-consumer.sh --topic _consumer_offsets --bootstrap-server hadoop102:9092 --consumer.config config/consumer.properties --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
為了使我們能夠專注于自己的業務邏輯,Kafka提供了自動提交offset的功能。自動提交offset的相關參數:
enable.auto.commit:是否開啟自動提交offset功能,默認是true
auto.commit.interval.ms:自動提交offset的時間間隔,默認是5s
消費者配置代碼:
//配置是否是自動提交 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true); //提交時間間隔,單位是ms properties.put(ConsumerConfig.AUTO_COMNIT_INTERVAL_NS_CONFI6,1000);
雖然自動提交offset十分簡單便利,但由于其是基于時間提交的,開發人員難以把握offset提交的時機。因此Kafka還提供了手動提交offset的API。
手動提交offset的方法有兩種:分別是commitSync(同步提交)和commitAsync(異步提交)。
兩者的相同點是,都會將本次提交的一批數據最高的偏移量提交;不同點是,同步提交阻塞當前線程,一直到提交成功,并且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而異步提交則沒有失敗重試機制,故有可能提交失敗。
commitSync
(同步提交):必須等待offset提交完畢,再去消費下一批數據。
commitAsync
(異步提交):發送完提交offset請求后,就開始消費下一批數據了
3.2.1 同步提交
//手動提交屬性配置 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false); //消費代碼邏輯 XXX XXX XXX //手動提交代碼(處理完數據以后,這里為了方便,只展示關鍵代碼) //手動提交offset kafkaConsumer.commitsync();
3.2.2 異步提交(生產常用)
//手動提交屬性配置 properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG ,false); //消費代碼邏輯 XXX XXX XXX //手動提交代碼(處理完數據以后,這里為了方便,只展示關鍵代碼) //手動提交offset kafkaConsumer.commitAsync();
auto.offset.reset = earliest | latest | none 默認是latest。
當Kafka 中沒有初始偏移量(消費者組第一次消費)或服務器上不再存在當前偏移量時(例如該數據已被刪除),該怎么辦?
earliest
:自動將偏移量重置為最早的偏移量,--from-beginning。
latest
(默認值):自動將偏移量重置為最新偏移量。
none
:如果未找到消費者組的先前偏移量,則向消費者拋出異常。
任意指定offset位移開始消費。
//1創建消費者 KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties); // 2訂閱主題 ArrayList<String> topics = new ArrayList<>(;topics.add( "first"); kafkaConsumer.subscribe(topics); //指定位置進行消費 set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區信息 //保證分區分配方案已經制定完畢,因為由于leader消費者制定分配方案會消耗一定時間,有可能此時獲取不到分區信息,所以加一層分區空間判斷 while (assignment.size() == 0){ //促使獲取的分區數量不為0 kafkaConsumer.poll(Duration.ofSeconds(1)); assignment = kafkaConsumer.assignment(); } //遍歷所有分區,指定消費的offset for (TopicPartition topicPartition : assignment) { kafkaConsumer.seek(topicPartition, 100); } // 3消費數據 while (true){
需求:在生產環境中,會遇到最近消費的幾個小時數據異常,想重新按照時間消費。
例如要求按照時間消費前一天的數據,怎么處理?
//1創建消費者 KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties); // 2訂閱主題 ArrayList<String> topics = new ArrayList<>(;topics.add( "first"); kafkaConsumer.subscribe(topics); //指定位置進行消費 set<TopicPartition> assignment = kafkaConsumer.assignment();//獲取所有分區信息 //保證分區分配方案已經制定完畢,因為由于leader消費者制定分配方案會消耗一定時間,有可能此時獲取不到分區信息,所以加一層分區空間判斷 while (assignment.size() == 0){ //促使獲取的分區數量不為0 kafkaConsumer.poll(Duration.ofSeconds(1)); assignment = kafkaConsumer.assignment(); } //希望把時間轉換為對應的offset HashMap<TopicPartition,Long> topicPartitionLongHashMap = new HashMap<>(); //封裝對應集合 for (TopicPartition topicPartition : assignment) { //希望獲取當前系統時間一天前的數據。 topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000); } Nap<TopicPartition,OffsetAnd imestamp> topioPartitionffsetAndrtimestampMep = karfiaConsumer.offsetsForTines(topicPartitionL ongHashiap); //遍歷所有分區,指定消費的offset //指定消費的offset for (TopicPartition topicPartition : assignment) { OffsetAndTimestamp offsetAndTimestamp = topicPartition0ffsetAndTimestampHap.get(topicPartition); kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset()); } // 3消費數據 while (true){
場景1:重復消費。自動提交offset引起。
場景1:漏消費。設置offset為手動提交,當offset被提交時,數據還在內存中未落盤,此時剛好消費者線程被kill掉,那么offset已經提交,但是數據未處理,導致這部分內存中的數據丟失。
如果想完成Consumer端的精準一次性消費,那么需要Kafka消費端將消費過程和提交offset過程做原子綁定。
此時我們需要將Kafka的offset保存到支持事務的自定義介質(比如MySQL)。這部分知識會在后續項目部分涉及。
方案1:如果是Kafka消費能力不足,則可以考慮增加Topic的分區數,并且同時提升消費組的消費者數量,消費者數=分區數。(兩者缺一不可)
方案2:如果是下游的數據處理不及時:提高每批次拉取的數量。批次拉取數據過少(拉取數據/處理時間<生產速度),使處理的數據小于生產的數據,也會造成數據積壓。
關于“kafka-consumer-offset位移問題怎么解決”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“kafka-consumer-offset位移問題怎么解決”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。