您好,登錄后才能下訂單哦!
這篇文章主要講解了“Flink的Checkpoint機制是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Flink的Checkpoint機制是什么”吧!
一、Checkpoint概念
Flink本身為了保證其高可用的特性,以及保證作用的Exactly Once的快速恢復,進而提供了一套強大的Checkpoint機制。
Checkpoint機制是Flink可靠性的基石,可以保證Flink集群在某個算子因為某些原因(如 異常退出)出現故障時,能夠將整個應用流圖的狀態恢復到故障之前的某一狀態,保 證應用流圖狀態的一致性。Flink的Checkpoint機制原理來自“Chandy-Lamport algorithm”算法(分布式快照)。
二、Checkpoint核心元素Barriers
Flink分布式快照中的核心元素是 stream barriers。這些barriers將注入到數據流中,并與記錄一起作為數據流的一部分流動。barriers從不超越記錄,它們嚴格按照順序排列。barriers將數據流中的記錄分為進入當前快照的記錄集和進入下一個快照的記錄集。每個barriers都包含快照的ID,快照的記錄已推送到快照的前面。屏障不會中斷流的流動,因此非常輕便。來自不同快照的多個barriers可以同時出現在流中,這意味著各種快照可能同時發生。
stream barriers被注入到流數據源的并行數據流中。快照n的barriers被注入的位置(我們稱之為Sn)是流數據源中快照覆蓋數據的位置。例如,在Apache Kafka中,這個位置是分區中最后一條記錄的偏移量。這個位置的Sn被報告給checkpoint coordinator (Flink的JobManager)。
這些barriers隨后會順流而下。當中間操作符從它的所有輸入流接收到快照n的barriers時,它將快照n的barriers發送到它的所有輸出流。一旦接收操作符(流DAG的末端)從其所有輸入流接收到barrier n,它就向checkpoint coordinator 確認快照n。在所有接收確認了快照之后,就認為完成了快照。
Barriers對齊
接收多個輸入流的Operators需要在快照屏障上對齊輸入流,下面是Flink官網的Barrier對齊流程圖:
a.Operators一旦從傳入流中接收到快照barriers n,就無法處理該流中的任何其他記錄,直到它也從其他輸入接收到barriers n為止。否則,它將混合屬于快照n的記錄和屬于快照n + 1的記錄。
b.報告barriers n的流被暫時擱置。從這些流接收的記錄不會被處理,而是放入輸入緩沖區中。
c.一旦最后一個流接收到barriers n,Operators將發出所有未決的傳出記錄,然后自身發出barriers n屏障。
d.之后,它將恢復處理所有輸入流中的記錄,處理輸入緩沖中的記錄,然后再處理流中的記錄。
當一個operator接收到所有上游發送的 checkpoint n barrier 向下游發送之前,會對狀態進行一次快照,將offset state 等值保存起來,默認情況下是保存在JobManager的內存中,由于可能會比較大,可以存在狀態后端中,生成中建議放hdfs;
Operators在從輸入流接收到所有快照barriers的時間點,以及向輸出流發出barriers之前,對其狀態進行快照。屆時,將對進行barriers 之前的記錄進行狀態的所有更新,并且不應用依賴于應用barriers 后的記錄進行的任何更新。由于快照的狀態可能很大,因此將其存儲在可配置state backend中。默認情況下,這是JobManager的內存,但對于生產用途,應配置分布式可靠存儲(例如HDFS)。在存儲狀態之后,操作員確認檢查點,將快照barriers發送到輸出流中,然后繼續。
三、Checkpoint設置
1.代碼中一些相關配置
默認checkpoint功能是disabled的,想要使用的時候需要先啟用checkpoint開啟之后,默認的checkPointMode是Exactly-once。下面是官網一些默認配置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
1).Checkpoint默認是是disabled,通過enableCheckpointing方法來開啟;兩種函數實現:
enableCheckpointing(long interval),
enableCheckpointing(long interval, CheckpointingMode mode);
interval用于指定checkpoint的觸發間隔(單位milliseconds);
CheckpointingMode默認是CheckpointingMode.EXACTLY_ONCE,也可以指定為CheckpointingMode.AT_LEAST_ONCE。
2).也可以通過setCheckpointingMode方法設置CheckpointingMode;
3).checkpointTimeout 指定checkpoint執行的超時時間(單位milliseconds
),超時沒完成就會被abort掉;
4).minPauseBetweenCheckpoints 指定checkpoint coordinator上一個checkpoint完成之后最小等多久可以出發另一個checkpoint,當指定這個參數時,maxConcurrentCheckpoints的值為1;
5).maxConcurrentCheckpoints 指定運行中的checkpoint最多可以有多少個,當4)指定minPauseBetweenCheckpoints后,則其就不起作用了,需要將其設置為1;
6).enableExternalizedCheckpoints 指定開啟checkpoints的外部持久化,但是在job失敗的時候不會自動清理,需要自己手工清理state;
7).ExternalizedCheckpointCleanup 指定當job canceled的時候externalized checkpoint該如何清理,DELETE_ON_CANCELLATION的話,在job canceled的時候會自動刪除externalized state,但是如果是FAILED的狀態則會保留;RETAIN_ON_CANCELLATION則在job canceled的時候會保留externalized checkpoint state;
8).failOnCheckpointingErrors 指定在checkpoint發生異常的時候,是否應該fail該task,默認為true,如果設置為false,則task會拒絕checkpoint然后繼續運行;
2.flink-conf.yaml中一些相關配置
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
# state.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
#
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend).
#
# state.backend.incremental: false
1).state.backend用于指定checkpoint state存儲后端,默認為none,state會保存在taskmanager的內存中,checkpoint會存儲在JobManager的內存中;
state.backend的值可以是下面幾種:
jobmanager(MemoryStateBackend),
filesystem(FsStateBackend),
rocksdb(RocksDBStateBackend)
2).state.backend.async用于指定backend是否使用異步snapshot(默認為true),有些不支持async或者只支持async的state backend可能會忽略這個參數;
3).state.backend.fs.memory-threshold,默認為1024,用于指定存儲于files的state大小閾值,如果小于該值則會存儲在root checkpoint metadata file;
4).state.backend.incremental,默認為false,用于指定是否采用增量checkpoint,有些不支持增量checkpoint的backend會忽略該配置;
5).state.backend.local-recovery,默認為false,此選項配置此狀態后端的本地恢復。默認情況下,本地恢復是不可用的。本地恢復目前只有鍵狀態后端可用。目前,memorystateback不支持本地恢復,并忽略此選項;
6).state.checkpoints.dir,默認為none,用于指定checkpoint的data files和meta data存儲的目錄,該目錄必須對所有參與的TaskManagers及JobManagers可見;
7).state.checkpoints.num-retained,默認為1,用于指定保留的已完成的checkpoints個數;
8).state.savepoints.dir,默認為none,保存點的默認目錄。用于將保存點寫入文件系統的狀態后端(memorystate后端、fsstate后端、rocksdbstate后端);
9).taskmanager.state.local.root-dirs,默認為none;配置參數定義根目錄,用于存儲本地恢復的基于文件的狀態。本地恢復目前只覆蓋鍵狀態后端。目前,memorystateback不支持本地恢復,并忽略此選項;
感謝各位的閱讀,以上就是“Flink的Checkpoint機制是什么”的內容了,經過本文的學習后,相信大家對Flink的Checkpoint機制是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。