您好,登錄后才能下訂單哦!
關于Spark Streaming感知kafka動態分區的問題該怎么理解,針對這個問題,這篇文章詳細介紹了相對應的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
小編主要是講解Spark Streaming與kafka結合的新增分區檢測的問題。
讀閱前是需要了解Spark Streaming的原理和源碼結構基礎。
kafka 0.8版本
進入正題,之所以會有今天題目的疑惑,是由于在08版本kafka和Spark Streaming結合的DirectStream這種形式的API里面,是不支持kafka新增分區或者topic檢測的。而這個問題,對于很多業務增長比較明顯的公司都是會有碰到相應的問題。
比如,原來的公司業務增長比較明顯,那么kafka吞吐量,剛開始創建的topic數目和分區數目可能滿足不了并發需求,需要增加分區。新增加的分區會有生產者往里面寫數據,而Spark Streaming跟kafka 0.8版本結合的API是滿足不了動態發現kafka新增topic或者分區的需求的。
這么說有什么依據嗎?我們做項目不能人云亦云,所以我們可以從源碼入手驗證我們的想法。
我們在這里不會詳細講Spark Streaming源碼,但是我們可以在這里思考一下,Spark Streaming分區檢測是在哪做的?
很明顯對于批處理的Spark Streaming任務來說,分區檢測應該在每次job生成獲取kafkaRDD,來給kafkaRDD確定分區數并且每個分區賦值offset范圍的時候有牽扯,而這段代碼就在DirectKafkaInputDStream#compute方法中。(看過浪尖Spark Streaming源碼視頻教程的肯定會知道)
那么我們就貼出這塊源碼去驗證我們的想法,首先compute方法的第一行:
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
這里面獲取的是當前生成KafkaRDD每個分區消費的offset的最大值,那么我們需要進入latestLeaderOffsets進一步去看,可以發現下面一行代碼:
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
這個是根據currentOffsets信息來獲取最大的offset,由此此處繼續深入發現,由于它只是根據currentOffsets信息來獲取最大的offset,沒有去感知新增的分區,所以Spark Streaming與kafka 0.8結合是不能動態感知分區的。
kafka 0.10版本
相似的我們也可以直接去看kafka 0.10這塊的源碼去檢查,他是否會動態生成kafka分區。
進入DirectKafkaInputDStream的compute,看到的第一行代碼也是:
val untilOffsets = clamp(latestOffsets())
在latestOffsets里面,有了新的大陸:
關于關于Spark Streaming感知kafka動態分區的問題該怎么理解問題的解答就分享到這里了,希望以上內容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關注億速云行業資訊頻道了解更多相關知識。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。