91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Checkpoint與state的關系以及Checkpoint的執行機制

發布時間:2021-09-04 14:34:14 來源:億速云 閱讀:211 作者:chen 欄目:編程語言

這篇文章主要講解了“Checkpoint與state的關系以及Checkpoint的執行機制”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Checkpoint與state的關系以及Checkpoint的執行機制”吧!

大家好,今天我將跟大家分享一下 Flink 里面的 Checkpoint,共分為四個部分。首先講一下 Checkpoint 與 state 的關系,然后介紹什么是 state,第三部分介紹如何在 Flink 中使用state,第四部分則介紹 Checkpoint 的執行機制。

Checkpoint 與 state 的關系

Checkpoint 是從 source 觸發到下游所有節點完成的一次全局操作。下圖可以有一個對 Checkpoint 的直觀感受,紅框里面可以看到一共觸發了 569K 次 Checkpoint,然后全部都成功完成,沒有 fail 的。

state 其實就是 Checkpoint 所做的主要持久化備份的主要數據,看下圖的具體數據統計,其 state 也就 9kb 大小 。

什么是 state

我們接下來看什么是 state。先看一個非常經典的 word count 代碼,這段代碼會去監控本地的 9000 端口的數據并對網絡端口輸入進行詞頻統計,我們本地行動 netcat,然后在終端輸入 hello world,執行程序會輸出什么?

答案很明顯,(hello, 1) 和 (word,1)

那么問題來了,如果再次在終端輸入 hello world,程序會輸入什么?

答案其實也很明顯,(hello, 2) 和 (world, 2)。為什么 Flink 知道之前已經處理過一次 hello world,這就是 state 發揮作用了,這里是被稱為 keyed state 存儲了之前需要統計的數據,所以幫助 Flink 知道 hello 和 world 分別出現過一次。

回顧一下剛才這段 word count 代碼。keyby 接口的調用會創建 keyed stream 對 key 進行劃分,這是使用 keyed state 的前提。在此之后,sum 方法會調用內置的 StreamGroupedReduce 實現。

什么是 keyed state

對于 keyed state,有兩個特點:

  • 只能應用于 KeyedStream 的函數與操作中,例如 Keyed UDF, window state

  • keyed state 是已經分區/劃分好的,每一個 key 只能屬于某一個 keyed state

對于如何理解已經分區的概念,我們需要看一下 keyby 的語義,大家可以看到下圖左邊有三個并發,右邊也是三個并發,左邊的詞進來之后,通過 keyby 會進行相應的分發。例如對于 hello word,hello 這個詞通過 hash 運算永遠只會到右下方并發的 task 上面去。

什么是operator state

  • 又稱為 non-keyed state,每一個 operator state 都僅與一個 operator 的實例綁定。

  • 常見的 operator state 是 source state,例如記錄當前 source 的 offset

再看一段使用 operator state 的 word count 代碼:

這里的fromElements會調用FromElementsFunction的類,其中就使用了類型為 list state 的 operator state。根據 state 類型做一個分類如下圖:

除了從這種分類的角度,還有一種分類的角度是從 Flink 是否直接接管:

  • Managed State:由 Flink 管理的 state,剛才舉例的所有 state 均是 managed state

  • Raw State:Flink 僅提供 stream 可以進行存儲數據,對 Flink 而言 raw state 只是一些 bytes

在實際生產中,都只推薦使用 managed state,本文將圍繞該話題進行討論。

如何在 Flink 中使用 state

下圖就前文 word count 的 sum 所使用的StreamGroupedReduce類為例講解了如何在代碼中使用 keyed state:

下圖則對 word count 示例中的FromElementsFunction類進行詳解并分享如何在代碼中使用 operator state:

Checkpoint 的執行機制

在介紹 Checkpoint 的執行機制前,我們需要了解一下 state 的存儲,因為 state 是 Checkpoint 進行持久化備份的主要角色。

Statebackend 的分類

下圖闡釋了目前 Flink 內置的三類 state backend,其中MemoryStateBackend和FsStateBackend在運行時都是存儲在 java heap 中的,只有在執行 Checkpoint 時,FsStateBackend才會將數據以文件格式持久化到遠程存儲上。而RocksDBStateBackend則借用了 RocksDB(內存磁盤混合的 LSM DB)對 state 進行存儲。

對于HeapKeyedStateBackend,有兩種實現:

  • 支持異步 Checkpoint(默認):存儲格式 CopyOnWriteStateMap

  • 僅支持同步 Checkpoint:存儲格式 NestedStateMap

特別在 MemoryStateBackend 內使用HeapKeyedStateBackend時,Checkpoint 序列化數據階段默認有最大 5 MB數據的限制

對于RocksDBKeyedStateBackend,每個 state 都存儲在一個單獨的 column family 內,其中 keyGroup,Key 和 Namespace 進行序列化存儲在 DB 作為 key。

Checkpoint 執行機制詳解

本小節將對 Checkpoint 的執行流程逐步拆解進行講解,下圖左側是 Checkpoint Coordinator,是整個 Checkpoint 的發起者,中間是由兩個 source,一個 sink 組成的 Flink 作業,最右側的是持久化存儲,在大部分用戶場景中對應 HDFS。

  1. 第一步,Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint;。

  1. 第二步,source 節點向下游廣播 barrier,這個 barrier 就是實現 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才會執行相應的 Checkpoint。

  1. 第三步,當 task 完成 state 備份后,會將備份數據的地址(state handle)通知給 Checkpoint coordinator。

  1. 第四步,下游的 sink 節點收集齊上游兩個 input 的 barrier 之后,會執行本地快照,這里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 會全量刷數據到磁盤上(紅色大三角表示),然后 Flink 框架會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。

  1. 同樣的,sink 節點在完成自己的 Checkpoint 之后,會將 state handle 返回通知 Coordinator。

  1. 最后,當 Checkpoint coordinator 收集齊所有 task 的 state handle,就認為這一次的 Checkpoint 全局完成了,向持久化存儲中再備份一個 Checkpoint meta 文件。

Checkpoint 的 EXACTLY_ONCE 語義

為了實現 EXACTLY ONCE 語義,Flink 通過一個 input buffer 將在對齊階段收到的數據緩存起來,等對齊完成之后再進行處理。而對于 AT LEAST ONCE 語義,無需緩存收集到的數據,會對后續直接處理,所以導致 restore 時,數據可能會被多次處理。下圖是官網文檔里面就 Checkpoint align 的示意圖:

需要特別注意的是,Flink 的 Checkpoint 機制只能保證 Flink 的計算過程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。

Savepoint 與 Checkpoint 的區別

作業恢復時,二者均可以使用,主要區別如下:

SavepointExternalized Checkpoint用戶通過命令觸發,由用戶管理其創建與刪除Checkpoint 完成時,在用戶給定的外部持久化存儲保存標準化格式存儲,允許作業升級或者配置變更當作業 FAILED(或者CANCELED)時,外部存儲的 Checkpoint 會保留下來用戶在恢復時需要提供用于恢復作業狀態的 savepoint 路徑用戶在恢復時需要提供用于恢復的作業狀態的 Checkpoint 路徑

感謝各位的閱讀,以上就是“Checkpoint與state的關系以及Checkpoint的執行機制”的內容了,經過本文的學習后,相信大家對Checkpoint與state的關系以及Checkpoint的執行機制這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

河津市| 平谷区| 太仆寺旗| 江孜县| 黑山县| 鄂托克旗| 高雄市| 定安县| 南丹县| 奇台县| 大邑县| 收藏| 桦甸市| 济南市| 通城县| 商洛市| 图们市| 彭泽县| 上林县| 唐河县| 三门峡市| 南丰县| 无极县| 务川| 南华县| 建始县| 永安市| 芷江| 康定县| 京山县| 灵寿县| 蓝田县| 宁都县| 肃南| 东乡| 信阳市| 海兴县| 富顺县| 安多县| 土默特左旗| 郁南县|