您好,登錄后才能下訂單哦!
如何進行Spark底層原理的解析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
Apache Spark是用于大規模數據處理的統一分析引擎,基于內存計算,提高了在大數據環境下數據處理的實時性,同時保證了高容錯性和高可伸縮性,允許用戶將Spark部署在大量硬件之上,形成集群。
Spark源碼從1.x的40w行發展到現在的超過100w行,有1400多位大牛貢獻了代碼。整個Spark框架源碼是一個巨大的工程。下面我們一起來看下spark的底層執行原理。
Spark運行流程
具體運行流程如下:
SparkContext 向資源管理器注冊并向資源管理器申請運行Executor
資源管理器分配Executor,然后資源管理器啟動Executor
Executor 發送心跳至資源管理器
SparkContext 構建DAG有向無環圖
將DAG分解成Stage(TaskSet)
把Stage發送給TaskScheduler
Executor 向 SparkContext 申請 Task
TaskScheduler 將 Task 發送給 Executor 運行
同時 SparkContext 將應用程序代碼發放給 Executor
Task 在 Executor 上運行,運行完畢釋放所有資源
Val lines1 = sc.textFile(inputPath2).map(...).map(...) Val lines2 = sc.textFile(inputPath3).map(...) Val lines3 = sc.textFile(inputPath4) Val dtinone1 = lines2.union(lines3) Val dtinone = lines1.join(dtinone1) dtinone.saveAsTextFile(...) dtinone.filter(...).foreach(...)
構建DAG圖
Spark內核會在需要計算發生的時刻繪制一張關于計算路徑的有向無環圖,也就是如上圖所示的DAG。
Spark 的計算發生在RDD的Action操作,而對Action之前的所有Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。
一個Application可以有多個job多個Stage:
Spark Application中可以因為不同的Action觸發眾多的job,一個Application中可以有很多的job,每個job是由一個或者多個Stage構成的,后面的Stage依賴于前面的Stage,也就是說只有前面依賴的Stage計算完畢后,后面的Stage才會運行。
劃分依據:
Stage劃分的依據就是寬依賴,像reduceByKey,groupByKey等算子,會導致寬依賴的產生。
回顧下寬窄依賴的劃分原則:
窄依賴:父RDD的一個分區只會被子RDD的一個分區依賴。即一對一或者多對一的關系,可理解為獨生子女。 常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned)等。
寬依賴:父RDD的一個分區會被子RDD的多個分區依賴(涉及到shuffle)。即一對多的關系,可理解為超生。 常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned)等。
核心算法:回溯算法
從后往前回溯/反向解析,遇到窄依賴加入本Stage,遇見寬依賴進行Stage切分。
Spark內核會從觸發Action操作的那個RDD開始從后往前推,首先會為最后一個RDD創建一個Stage,然后繼續倒推,如果發現對某個RDD是寬依賴,那么就會將寬依賴的那個RDD創建一個新的Stage,那個RDD就是新的Stage的最后一個RDD。
然后依次類推,繼續倒推,根據窄依賴或者寬依賴進行Stage的劃分,直到所有的RDD全部遍歷完成為止。
DAG劃分Stage
一個Spark程序可以有多個DAG(有幾個Action,就有幾個DAG,上圖最后只有一個Action(圖中未表現),那么就是一個DAG)。
一個DAG可以有多個Stage(根據寬依賴/shuffle進行劃分)。
同一個Stage可以有多個Task并行執行(task數=分區數,如上圖,Stage1 中有三個分區P1、P2、P3,對應的也有三個 Task)。
可以看到這個DAG中只reduceByKey操作是一個寬依賴,Spark內核會以此為邊界將其前后劃分成不同的Stage。
同時我們可以注意到,在圖中Stage1中,從textFile到flatMap到map都是窄依賴,這幾步操作可以形成一個流水線操作,通過flatMap操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行map操作,這樣大大提高了計算的效率。
調度階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler通過TaskScheduler接口提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的實例來管理這個任務集的生命周期,對于DAGScheduler來說,提交調度階段的工作到此就完成了。
而TaskScheduler的具體實現則會在得到計算資源的時候,進一步通過TaskSetManager調度具體的任務到對應的Executor節點上進行運算。
任務總體調度
每個Application獲取專屬的Executor進程,該進程在Application期間一直駐留,并以多線程方式運行Tasks。
Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。如圖所示:
支持多種資源管理器
提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack(機架)里,因為Spark Application運行過程中SparkContext和Executor之間有大量的信息交換;
如果想在遠程集群中運行,最好使用RPC將SparkContext提交給集群,不要遠離Worker運行SparkContext。
移動程序而非移動數據的原則執行,Task采用了數據本地性和推測執行的優化機制。
關鍵方法:taskIdToLocations、getPreferedLocations。
看完上述內容,你們掌握如何進行Spark底層原理的解析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。