您好,登錄后才能下訂單哦!
本篇內容介紹了“如何解決Kafka問題”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
寫在前面
估計運維年前沒有祭拜服務器,Nginx的問題修復了,Kafka又不行了。今天,本來想再睡會,結果,電話又響了。還是運營,“喂,冰河,到公司了嗎?趕緊看看服務器吧,又出問題了“。“在路上了,運維那哥們兒還沒上班嗎”?“還在休假。。。”, 我:“。。。”。哎,這哥們兒是跑路了嗎?先不管他,問題還是要解決。
問題重現
到公司后,放下我專用的雙肩包,拿出我的利器——筆記本電腦,打開后迅速登錄監控系統,發現主要業務系統沒啥問題。一個非核心服務發出了告警,并且監控系統中顯示這個服務頻繁的拋出如下異常。
2021-02-28 22:03:05 131 pool-7-thread-3 ERROR [] - commit failed org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na] at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na] at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]
從上面輸出的異常信息,大概可以判斷出系統出現的問題:Kafka消費者在處理完一批poll消息后,在同步提交偏移量給broker時報錯了。大概就是因為當前消費者線程的分區被broker給回收了,因為Kafka認為這個消費者掛掉了,我們可以從下面的輸出信息中可以看出這一點。
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Kafka內部觸發了Rebalance機制,明確了問題,接下來,我們就開始分析問題了。
分析問題
既然Kafka觸發了Rebalance機制,那我就來說說Kafka觸發Rebalance的時機。
什么是Rebalance
舉個具體點的例子,比如某個分組下有10個Consumer實例,這個分組訂閱了一個50個分區的主題。正常情況下,Kafka會為每個消費者分配5個分區。這個分配的過程就是Rebalance。
觸發Rebalance的時機
當Kafka中滿足如下條件時,會觸發Rebalance:
組內成員的個數發生了變化,比如有新的消費者加入消費組,或者離開消費組。組成員離開消費組包含組成員崩潰或者主動離開消費組。
訂閱的主題個數發生了變化。
訂閱的主題分區數發生了變化。
后面兩種情況我們可以人為的避免,在實際工作過程中,對于Kafka發生Rebalance最常見的原因是消費組成員的變化。
消費者成員正常的添加和停掉導致Rebalance,這種情況無法避免,但是時在某些情況下,Consumer 實例會被 Coordinator 錯誤地認為 “已停止” 從而被“踢出”Group,導致Rebalance。
當 Consumer Group 完成 Rebalance 之后,每個 Consumer 實例都會定期地向 Coordinator 發送心跳請求,表明它還存活著。如果某個 Consumer 實例不能及時地發送這些心跳請求,Coordinator 就會認為該 Consumer 已經 “死” 了,從而將其從 Group 中移除,然后開啟新一輪 Rebalance。這個時間可以通過Consumer 端的參數 session.timeout.ms 進行配置。默認值是 10 秒。
除了這個參數,Consumer 還提供了一個控制發送心跳請求頻率的參數,就是 heartbeat.interval.ms。這個值設置得越小,Consumer 實例發送心跳請求的頻率就越高。頻繁地發送心跳請求會額外消耗帶寬資源,但好處是能夠更加快速地知曉當前是否開啟 Rebalance,因為,目前 Coordinator 通知各個 Consumer 實例開啟 Rebalance 的方法,就是將 REBALANCE_NEEDED 標志封裝進心跳請求的響應體中。
除了以上兩個參數,Consumer 端還有一個參數,用于控制 Consumer 實際消費能力對 Rebalance 的影響,即 max.poll.interval.ms 參數。它限定了 Consumer 端應用程序兩次調用 poll 方法的最大時間間隔。它的默認值是 5 分鐘,表示 Consumer 程序如果在 5 分鐘之內無法消費完 poll 方法返回的消息,那么 Consumer 會主動發起 “離開組” 的請求,Coordinator 也會開啟新一輪 Rebalance。
通過上面的分析,我們可以看一下那些rebalance是可以避免的:
第一類非必要 Rebalance 是因為未能及時發送心跳,導致 Consumer 被 “踢出”Group 而引發的。這種情況下我們可以設置 session.timeout.ms 和 heartbeat.interval.ms 的值,來盡量避免rebalance的出現。(以下的配置是在網上找到的最佳實踐,暫時還沒測試過)
設置 session.timeout.ms = 6s。
設置 heartbeat.interval.ms = 2s。
要保證 Consumer 實例在被判定為 “dead” 之前,能夠發送至少 3 輪的心跳請求,即 session.timeout.ms >= 3 * heartbeat.interval.ms。
將 session.timeout.ms 設置成 6s 主要是為了讓 Coordinator 能夠更快地定位已經掛掉的 Consumer,早日把它們踢出 Group。
第二類非必要 Rebalance 是 Consumer 消費時間過長導致的。此時,max.poll.interval.ms 參數值的設置顯得尤為關鍵。如果要避免非預期的 Rebalance,最好將該參數值設置得大一點,比下游最大處理時間稍長一點。
總之,要為業務處理邏輯留下充足的時間。這樣,Consumer 就不會因為處理這些消息的時間太長而引發 Rebalance 。
拉取偏移量與提交偏移量
kafka的偏移量(offset)是由消費者進行管理的,偏移量有兩種,拉取偏移量(position)與提交偏移量(committed)。拉取偏移量代表當前消費者分區消費進度。每次消息消費后,需要提交偏移量。在提交偏移量時,kafka會使用拉取偏移量的值作為分區的提交偏移量發送給協調者。
如果沒有提交偏移量,下一次消費者重新與broker連接后,會從當前消費者group已提交到broker的偏移量處開始消費。
所以,問題就在這里,當我們處理消息時間太長時,已經被broker剔除,提交偏移量又會報錯。所以拉取偏移量沒有提交到broker,分區又rebalance。下一次重新分配分區時,消費者會從最新的已提交偏移量處開始消費。這里就出現了重復消費的問題。
異常日志提示的方案
其實,說了這么多,Kafka消費者輸出的異常日志中也給出了相應的解決方案。
接下來,我們說說Kafka中的拉取偏移量和提交偏移量。
其實,從輸出的日志信息中,也大概給出了解決問題的方式,簡單點來說,就是可以通過增加 max.poll.interval.ms 時長和 session.timeout.ms時長,減少 max.poll.records的配置值,并且消費端在處理完消息時要及時提交偏移量。
問題解決
通過之前的分析,我們應該知道如何解決這個問題了。這里需要說一下的是,我在集成Kafka的時候,使用的是SpringBoot和Kafka消費監聽器,消費端的主要代碼結構如下所示。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){ logger.info("topic is {}, offset is {}, value is {} n", record.topic(), record.offset(), record.value()); try { Object value = record.value(); logger.info(value.toString()); ack.acknowledge(); } catch (Exception e) { logger.error("日志消費端異常: {}", e); } }
上述代碼邏輯比較簡單,就是獲取到Kafka中的消息后直接打印輸出到日志文件中。
嘗試解決
這里,我先根據異常日志的提示信息進行配置,所以,我在SpringBoot的application.yml文件中新增了如下配置信息。
spring: kafka: consumer: properties: max.poll.interval.ms: 3600000 max.poll.records: 50 session.timeout.ms: 60000 heartbeat.interval.ms: 3000
配置完成后,再次測試消費者邏輯,發現還是拋出Rebalance異常。
最終解決
我們從另一個角度來看下Kafka消費者所產生的問題:一個Consumer在生產消息,另一個Consumer在消費它的消息,它們不能在同一個groupId 下面,更改其中一個的groupId 即可。
這里,我們的業務項目是分模塊和子系統進行開發的,例如模塊A在生產消息,模塊B消費模塊A生產的消息。此時,修改配置參數,例如 session.timeout.ms: 60000,根本不起作用,還是拋出Rebalance異常。
此時,我嘗試修改下消費者分組的groupId,將下面的代碼
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
修改為如下所示的代碼。
@KafkaListener(topicPartitions = {@TopicPartition(topic = KafkaConstants.TOPIC_LOGS, partitions = { "0" }) }, groupId = "kafka-consumer-logs", containerFactory = "kafkaListenerContainerFactory") public void consumerReceive (ConsumerRecord<?, ?> record, Acknowledgment ack){
“如何解決Kafka問題”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。