您好,登錄后才能下訂單哦!
前置知識:
1、事務的特征:1)、處理且僅被處理一次;2)、輸出且只被輸出一次
2、SparkStreaming進行事務處理有沒有可能處理完全失敗?
這個可能性不大,因為Spark是批處理的方式來進行流處理,在SparkStreaming應用程序啟動的時候,已經為應用程序分配了相關的資源,而且在調度的過程中可以動態的分配資源,所以除非整個集群所有的硬件都奔潰了,否則一般情況下都會被處理的。
3、SparkStreaming寫程序的時候是基于Driver和Executor兩部分
SparkStreaming架構流程:
1、SparkStreaming基本架構流程:
1)、Receiver(不斷的)接收到數據后匯報(把元數據)給Driver,2)、Driver在收到數據之后為了數據的安全性會進行CheckPoint,3)、Job的執行(在Executor中):完全基于SparkCore的調度模式
SparkStreaming基本架構流程圖:
WAL(write ahead log)的機制:寫數據的時候,先通過WAL機制寫入文件系統中,然后存儲到Executor,Executor在存儲到磁盤或者內存中(這個是根據StorageLevel的設置) ,如果前面沒有寫成功的話,后面一定不會存儲到Executor中,而不存儲到Executor中的話,就不會匯報給Driver,數據就不會被處理了
Receiver接收的數據達到一定程度才會把數據存儲到內存或者磁盤,當還沒有積累到一定程度的時候,Executor或者Receiver奔潰了,這時數據就會丟失一點,
SparkStreaming:1、獲取數據;2、產生作業,執行必須透過SparkContext
當出現奔潰的時候數據恢復的過程:
1)、Driver級別的恢復是直接從Driver進行checkpoint的文件系統中把數據讀進來,而在內部是重新啟動SparkContext(還有SparkContext),恢復出元數據再次產生RDD(恢復是基于上一次的job執行的),提交給集群
2)、Receiver的恢復是在以前數據的基礎上接著去接收數據,曾經接收到的數據也會通過WAL機制從磁盤上恢復回來
Exactly Once的事務處理:
1)、數據零丟失:必須有可靠的數據來源和可靠的Receiver,且整個應用程序的metadata必須進行CheckPoint,且通過WAL來保證數據安全;(我們以數據來自Kafka為例,運行在Executor上的Receiver在接收到來自Kafka的數據時會向Kafka發送ACK確認收到信息并讀取下一條信息,kafka會updateOffset來記錄Receiver接收到的偏移,這種方式保證了在Executor數據零丟失。)
2)、Spark在1.3的時候為了避免WAL的性能損失和實現Exactly Once而提供了Kafka Direct API,把Kafka作為文件存儲系統。此時兼具有流的優勢和文件系統的優勢,至此Spark Streaming+Kafka就構建了完美的流處理世界(1,數據不需要拷貝副本;2,不需要WAL對性能的損耗;3,Kafka使用ZeroCopy比HDFS更高效)。所有的Executors通過Kafka API直接消息數據,直接管理Offset,所以也不會重復消費數據
數據丟失及其具體解決方式:
在Receiver收到數據且通過Driver的調度Executor開始計算數據的時候,如果Driver突然奔潰,則此時Executor會被kill掉,那么Executor中的數據就會丟失(如果沒有進行WAL的操作)。
解決方式:此時就必須通過例如WAL的方式,讓所有的數據都通過例如HDFS的方式首先進行安全性容錯處理。此時如果Executor中的數據丟失的話,就可以通過WAL恢復回來(這種方式的弊端是通過WAL的方式會極大額損傷SparkStreaming中Receivers接收數據的性能)
數據重復讀取的情況:
基于Kafka的情況下,Receiver收到數據且保存到了HDFS等持久化引擎但是沒有來得及進行updateOffsets,此時Receiver崩潰后重新啟動就會通過管理Kafka的ZooKeeper中元數據再次重復讀取數據,但是此時SparkStreaming認為是成功的,但是Kafka認為是失敗的(因為沒有更新offset到ZooKeeper中),此時就會導致數據重新消費的情況。
解決方式:以Receiver基于ZooKeeper的方式,當讀取數據時去訪問Kafka的元數據信息,在處理代碼中例如foreachRDD或transform時,將信息寫入到內存數據庫中(memorySet),在計算時讀取內存數據庫信息,判斷是否已處理過,如果以處理過則跳過計算。這些元數據信息可以保存到內存數據結構或者memsql,sqllite中(如果通過Kafka作為數據來源的話,Kafka中有數據,然后Receiver接收的時候又會有數據副本,這個時候其實是存儲資源的浪費)
數據輸出多次重寫
為什么會有這個問題,因為Spark Streaming在計算的時候基于Spark Core,Spark Core天生會做以下事情導致Spark Streaming的部分結果重復輸出(例如數據輸出后,該Task的后續程序發生錯誤,而任務發生錯誤,Spark Core會進入如下程序):
Task重試;慢任務推測(兩個相同任務可能會同時執行),Stage重復;Job重試;
解決方式:
設置spark.task.maxFailures次數為1;
設置spark.speculation為關閉狀態(因為慢任務推測其實非常消耗性能,所以關閉后可以顯著提高Spark Streaming處理性能)
Spark Streaming on Kafka的話,Job失敗后可以設置auto.offset.reset為“largest”的方式;
最后再次強調可以通過transform和foreachRDD基于業務邏輯代碼進行邏輯控制來實現數據不重復消費和輸出不重復!這兩個方式類似于Spark Streaming的后門,可以做任意想象的控制操作!
備注:
1、DT大數據夢工廠微信公眾號DT_Spark
2、IMF晚8點大數據實戰YY直播頻道號:68917580
3、新浪微博: http://www.weibo.com/ilovepains
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。