91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

發布時間:2021-12-07 11:05:20 來源:億速云 閱讀:120 作者:柒染 欄目:大數據

本篇文章給大家分享的是有關SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。

前言

當我在測試SparkStreaming的狀態操作mapWithState算子時,當我們設置timeout(3s)的時候,3s過后數據還是不會過期,不對此key進行操作,等到30s左右才會清除過期的數據。

百度了很久,關于timeout的資料很少,更沒有解決這個問題的文章,所以說,百度也不是萬能的,有時候還是需要靠自己。

所以我就在周末研究了一下,然后將結果整理了出來,希望能幫助大家更全面的理解Spark狀態計算。

mapWithState

按理說Spark Streaming實時處理,數據就像流水,每個批次之間的數據都是獨立的,處理完就處理完了,不留下任何狀態。但是免不了一些有狀態的操作,例如統計從流啟動到現在,某個單詞出現了多少次,所以狀態操作就出現了。

狀態操作分為updateStateByKey和mapWithState,兩者有著很大的區別。簡單的來說,前者每次輸出的都是全量狀態,后者輸出的是增量狀態。

過期原理

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

過期這一塊估計很多人開始都理解錯了,我剛開始理解就是數據從出現,經過多少秒之后就會過期。其實不是,這里的過期指的是空閑時間。

注釋大概是這個意思:timeout()傳入一個時間間隔參數,如果一個key在大于此間隔沒有此key的數據流入,則被認為是空閑的,就會單獨調用一次mapWithState中的func來清除這些空閑數據狀態。

先寫結論

使用了timeout()之后,需要使用以下代碼來在間隔內清除失效key。

stream.checkpoint(Seconds(6))

checkpoint的時候,會開啟全面掃描,才會對state中的失效key進行清理。

測試

val conf = new SparkConf().setMaster("local[2]").setAppName("state")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("./tmp")val streams: DStream[(String, Int)] = ssc.socketTextStream("localhost", 9999)  .map(x => (x, 1))val result = streams.mapWithState(StateSpec.function((k: String, v: Option[Int], state: State[Int]) => {   val count = state.getOption().getOrElse(0)println(k)println(v)var sum = 0if (!state.isTimingOut()) {     sum = count + v.get
          state.update(sum)} else {     println("timeout")}Option(sum)  })  .timeout(Seconds(3)))// 這行代碼是觸發清除機制的關鍵// result.checkpoint(Seconds(6))result.print()ssc.start()ssc.awaitTermination()

使用上面的代碼進行測試,設置過期時間為3s。但是3s過后發現key并沒有過期,也不會被清除,大概30S之后被清除。

在9999端口輸入一個tom后,不再進行任何操作。測試結果如下:

tom
Some(1)
-------------------------------------------
Time: 1618228587000 ms
-------------------------------------------
Some(1)


tom
None
timeout
-------------------------------------------
Time: 1618228614000 ms
-------------------------------------------
Some(0)

從測試結果可以看出,從輸入到清除大概是27s。

我們現在將注釋的代碼放開,每6s進行checkpoint一次,輸入tom:

tom
Some(1)
-------------------------------------------
Time: 1618228497000 ms
-------------------------------------------
Some(1)

tom
None
timeout
-------------------------------------------
Time: 1618228506000 ms
-------------------------------------------
Some(0)

從生成到清除用了9秒,正好是過期時間 + 下一個窗口時間,觸發了checkpoint。

猜想

第一次學狀態操作的時候,就考慮如何去掉一些過期的key,通過timeout()的方法沒有完成自己想法,從網上也沒有找到解決方案,所以就暫且擱置在一邊了。后來又回過頭來考慮這個問題,然后根據自己的想法去猜想、去驗證。

1. 我先看的是mapWithState()的返回值

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

2. MapWithStateDStreamImpl

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

每個Dstream的計算邏輯都在compute()中,這里是調用了internalStream的getOrCompute(),根據繼承關系,調用的是父類Dstream的此方法:

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

getOrCompute()主要功能為:計算、緩存、checkpoint。這里只需要記住幾個地方:checkpointDuration,即checkpoint間隔,和調用了checkpoint()。其實真正的計算還是調用了compute(),接著去看compute()

3. InternalMapWithStateDStream

SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

compute()里面也調用了getOrCompute()方法,其實和上面調用的一樣,都是Dstream的,這里主要看的是使用createFromRDD()生成的StateRDD。

4. MapWithStateRDD

這個StateRDD就是參與狀態計算的數據集合,首先看它是如何生成的:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

再看看StateRDD的compute()是如何計算的:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

從compute()看出,當doFullScan為true的時候,才會觸發過期key的清除,updateRecordWithData()負責全面掃描清除過期key

這不,思路就來了,我們只要找到開啟FullScan的方法,不就可以自行觸發清除機制了嗎!

那么,我們先看看doFullScan的默認值:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

默認是沒開啟的,接著通過快捷鍵看看哪些地方使用了doFullScan:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

從圖中看出,有兩處代碼修改了doFullScan,我們找到這兩處代碼:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

第一個基本上排除,那么就剩下第二個:checkpoint(),我們要知道的是,狀態操作必須要checkpoint

還記得在2中的getOrCompute()嗎,當checkpointDuration不為null的時候,調用checkpoint()。
我們來看3中InternalMapWithStateDStream是如何定義這個duration的:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

如圖,sideDuration是窗口時間,乘以系數10就是默認的checkpoint時長,所以當我設置窗口為3s時,checkpoint周期就是30s,30s才會清理一次過期key。

而通過checkpoint(interval)可以設置checkpoint的間隔,所以覆蓋了上面程序中默認的30s。
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

5.MapWithStateRDDRecord

最后提一提,FullScan是在這個類中開啟的,所以先看看這個Record的注釋介紹:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

意思就是負責存儲StateRDD的狀態KV,updateRecordWithData()負責清除過期的Record,我們來看看這個方法的實現:
SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決

removeTimedoutData就是是否開啟全面掃描,即doFullScan的值。

以上就是SparkStreaming使用mapWithState時設置timeout()無法生效問題該怎么解決,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

新沂市| 通山县| 杭锦后旗| 梁山县| 南江县| 石城县| 宁河县| 丰都县| 赞皇县| 鄱阳县| 安康市| 集贤县| 巫山县| 洛南县| 肃宁县| 昆山市| 娄底市| 麻城市| 廉江市| 景德镇市| 郑州市| 文昌市| 临武县| 元谋县| 屏山县| 郁南县| 衢州市| 龙井市| 全南县| 新民市| 临洮县| 英德市| 临安市| 谢通门县| 罗平县| 建湖县| 枣强县| 汝城县| 天津市| 罗山县| 水富县|