您好,登錄后才能下訂單哦!
本篇文章主要從二個方面展開:
一、Exactly Once
二、輸出不重復
事務:
銀行轉帳為例,A用戶轉賬給B用戶,B用戶可能收到多筆錢,如何保證事務的一致性,也就是說事務輸出,能夠輸出且只會輸出一次,即A只轉一次,B只收一次。
從事務視角解密SparkStreaming架構:
SparkStreaming應用程序啟動,會分配資源,除非整個集群硬件資源奔潰,一般情況下都不會有問題。SparkStreaming程序分成而部分,一部分是Driver,另外一部分是Executor。Receiver接收到數據后不斷發送元數據給Driver,Driver接收到元數據信息后進行CheckPoint處理。其中CheckPoint包括:Configuration(含有Spark Conf、Spark Streaming等配置信息)、Block MetaData、DStreamGraph、未處理完和等待中的Job。當然Receiver可以在多個Executor節點的上執行Job,Job的執行完全基于SparkCore的調度模式進行的。
Executor只有函數處理邏輯和數據,外部InputStream流入到Receiver中通過BlockManager寫入磁盤、內存、WAL進行容錯。WAL先寫入磁盤然后寫入Executor中,失敗可能性不大。如果1G數據要處理,Executor一條一條接收,Receiver接收數據是積累到一定記錄后才會寫入WAL,如果Receiver線程失敗時,數據有可能會丟失。
Driver處理元數據前會進行CheckPoint,SparkStreaming獲取數據、產生作業,但沒有解決執行的問題,執行一定要經過SparkContext。Dirver級別的數據修復需從Driver CheckPoint中把元數據讀入,在其內部會重新構建SparkContext、StreamingContext、SparkJob,再提交到Spark集群運行,Receiver重新恢復時會通過WAL從磁盤中恢復過來。
SparkStreaming和Kafka結合不會出現WAL數據丟失的問題,SparkStreaming必須考慮外部流水線的方式處理。
怎么能完成完整的語義、事務的一致性,保證數據的零丟失,Exactly Once的事務處理:
怎么保證數據零丟失?
必須要有可靠的數據來源和可靠的Receiver、整個應用程序的MetaData必須進行CheckPoint、通過WAL來保證數據安全(生產環境下Receiver接收Kafka的數據,默認情況下會在Executor中存在二份數據,且默認情況下必須二份數據備份后才進行計算;如果Receiver接收數據時奔潰,沒有Copy副本,此時會重新從Kafka中進行Copy,Copy的依據是zookeeper元數據)。
大家可以將Kafka看作是一個簡單的文件存儲系統,在Executor中Receiver確定受到Kafka的每一條記錄后進行Replication到其他Executor成功后會通過ack向Kafka發送確認收到的信息并繼續從Kafka中讀取下一條信息。
再次思考數據在哪些地方可能丟失?
數據丟失的主要場景如下:
在Receiver收到數據且通過Driver的調度,Executor開始計算數據的時候如果Driver突然奔潰(導致Executor會被Kill掉),此時Executor會被Kill掉,那么Executor中的數據就會丟失,此時就必須通過例如WAL機制讓所有的數據通過類似HDFS的方式進行安全性容錯處理,從而解決Executor被Kill掉后導致數據丟失可以通過WAL機制恢復回來。
下面需要考慮二個很重要的場景:
數據的處理怎么保證有且僅有被處理一次?
數據零丟失并不能保證Exactly Once,如果Receiver接收且保存起來后沒來得及更新updateOffsets時,就會導致數據被重復處理。
更詳細的說明數據重復讀取的場景:
在Receiver收到數據且保存到了hdfs時Receiver奔潰,此時持久化引擎沒有來得及進行updateOffset,Receiver重新啟動后就會從管理Kafka的ZooKeeper中再次讀取元數據從而導致重復讀取元數據;從SparkStreaming來看是成功的,但是Kafka認為是失敗的(因為Receiver奔潰時沒有及時更新offsets到ZooKeeper中)重新恢復時會重新消費一次,此時會導致數據重新消費的情況。
性能補充:
通過WAL方式保證數據不丟失,但弊端是通過WAL方式會極大的損傷SparkStreaming中的Receiver接收數據的性能(現網生產環境通常會Kafka direct api直接處理)。
需要注意到是:如果通過Kafka作為數據來源的話,Kafka中有數據,然后Receiver接受數據的時候又會有數據副本,這個時候其實是存儲資源的浪費。(重復讀取數據解決辦法,讀取數據時可以將元數據信息放入內存數據庫中,再次計算時檢查元數據是否被計算過)。
Spark1.3的時候為了避免WAL的性能損失和實現Exactly Once而提供了Kafka direct api,把Kafka作為文件存儲系統,此時Kafka兼具有流的優勢和文件系統的優勢,至此,Spark Streaming+Kafka就構建了完美的流處理世界!
數據不需要copy副本,不需要WAL性能損耗,不需要Receiver,而直接通過kafka direct api直接消費數據,所有的Executors通過kafka api直接消費數據,直接管理offset,所以也不會重復消費數據;事務一致性就實現了!
最后一個問題,關于Spark Streaming數據輸出多次重寫及解決方案:
為什么會有這個問題,因為SparkStreaming在計算的時候基于SparkCore,SparkCore天生會做以下事情導致SparkStreaming的結果(部分)重復輸出:
1.Task重試;
2.慢任務推測;
3.Stage重復;
4.Job重試;
會導致數據的丟失。
對應的解決方案:
1.一個任務失敗就是job 失敗,設置spark.task.maxFailures次數為1;
2.設置spark.speculation為關閉狀態(因為慢任務推測其實非常消耗性能,所以關閉后可以顯著的提高Spark Streaming處理性能)
3.Spark streaming on kafka的話,假如job失敗后可以設置kafka的auto.offset.reset為largest的方式會自動恢復job的執行。
最后再次強調:
可以通過transform和foreachRDD基于業務邏輯代碼進行邏輯控制來實現數據不重復消費和輸出不重復!這二個方法類似于spark streaming的后門,可以做任意想象的控制操作!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。