您好,登錄后才能下訂單哦!
一個StateBackEnd 包括以下幾個部分:
1.CheckPointStreamFactory 構造流用于寫出Checkpoint 數據
不同的StateBackEnd會有不同的實現,返回不同的CheckpointStateOutputStream實現,比如 FsStateBackEnd 就會構造文件流, 而MemoryStateBackEnd就會構造ByteArraOutputStream
CheckpointStateOutputStream 會作為IO代理包含在KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream內.
KeyedStateCheckpointOutputStream 和 OperatorStateCheckpointOutputStream 分別需要記錄額外的狀態. KeyedStateCheckpointOutputStream 需要記錄每個keyGroup起始在流中的位置, OperatorStateCheckpointOutputStream 需要記錄每個partition起始在流中的位置, 這些信息都會體現在對應的StreamStateHandle中.
CheckpointStateOutputStream 定義了 closeAndGetHandle 方法返回了一個 StreamStateHandle 的實現,這個句柄會被序列化傳遞給JobManager, JobManager 會將句柄作為快照的一部分集中保存,那么在恢復數據的時候就能夠通過句柄反向獲得InputStream讀取數據
具體參考 AbstractStreamOperator.snapshotState
InternalTimerServiceSerializationProxy.write -> HeapInternalTimerService.snapshotTimersForKeyGroup
KeyedStateBackEnd.snapshot OperatorStateBackEnd.snapshot
2.KeyedStateBackEnd
KeyedStateBackEnd 在創建StreamTask 的時候創建,所以一個Task 對應一個KeyedStateBackEnd.
KeyedStateBackEnd 定義了如何注冊和生成各種State 包括: ListState, MapState, ValueState, AggregatingState, FoldingState, ReducingState
KeyedStateBackEnd 目前有兩種實現: HeapKeyedStateBackend 和 RocksDBKeyedStateBackend. 其中HeapKeyedStateBackend 把狀態存儲在內部的一個StateTable中,每個State name 對應StateTable 中的一個Entry StateTable 包含三元信息:Key, Namespace, Value. Key和Value 很容易理解, Namespace 目前好像僅僅用于Window 算子,記錄了當前的Window 信息, 如果沒有Window 會給一個默認的namespace (VoidNamespace.INSTANCE). RocksDBKeyedStateBackend 會根據StateDescription 生成一個RocksDB column family, 然后在每種State get/set 的時候直接對Rocks DB 進行讀寫操作 *
異步State Snapshot: HeapKeyedStateBackend 和 RocksDBKeyedStateBackend 都支持異步Snapshot, 所謂異步Snapshot 就是起一根獨立線程向 CheckpointStateOutputStream 寫State 數據. 但是對數據結構有要求,因為在做snapshot 的過程中 state table 本身可能會繼續變化. 所以需要在snapshot 開始的時候對數據做一個快照. HeapKeyedStateBackend內部用了CopyOnWriteStateTable保證線程安全性,使數據快照的數據不會corrupt. RocksDBKeyedStateBackend 思路是類似的. snapshot 開始的時候調用RocksDB.snapshot, 然后再通過線程異步向 CheckpointStateOutputStream 寫State 數據.
增量 State Snapshot: RocksDBKeyedStateBackend 特有的特性. 具體的實現參考RocksDBIncrementalSnapshotOperation. 這里簡單比較一下RocksDBFullSnapshotOperation和RocksDBIncrementalSnapshotOperation. RocksDBFullSnapshotOperation 會完整地讀取Snapshot中所有的KV數據,然后向流中寫出所有的kvMetadata和kvData. 返回的StateHandle是KeyGroupsStateHandle, 和HeapKedStateBackend一致. 而RocksDBIncrementalSnapshotOperation則會遍歷RocksDB checkpoint目錄下的所有文件. 每次做Checkpoint的時候,RocksDBKeyedStateBackend會記錄當前checkPointId對應的RocksDB ssd文件.這樣在做一次新的Checkpoint的時候就可以比對文件獲取是否有新的數據文件.原有的數據文件不用再寫而是直接返回一個PlaceholderStreamStateHandle. Checkpoint不是逐條遍歷KV寫出,而是直接向流中寫出RocksDB數據文件的數據. 返回的StateHandle是IncrementalKeyedStateHandle其中包含了一組RocksDB數據文件的句柄.
數據恢復的過程也同樣需要區分full/incremental. 分別對應RocksDBFullRestoreOperation和RocksDBIncrementalRestoreOperation
3.OperatorStateBackEnd
主要管理OperatorState. 目前只有一種實現: DefaultOperatorStateBackend. 構造出一個 PartitionableListState (屬于ListState). 這是一個In Memory的實現. Add 操作追加到
內存的一個List中. Snapshot 的過程和KeyedStateBackEnd大同小異,這里就不再贅述.
StateBackend 的類結構:
State 恢復的過程:
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。