您好,登錄后才能下訂單哦!
這期內容當中小編將會給大家帶來有關如何進行Spark Shuffle 原理分析,文章內容豐富且以專業的角度為大家分析和敘述,閱讀完這篇文章希望大家可以有所收獲。
Shuffle就是對數據進行重組,由于分布式計算的特性和要求,在實現細節上更加繁瑣和復雜。 在MapReduce框架,Shuffle是連接Map和Reduce之間的橋梁,Map階段通過shuffle讀取數據并輸出到對應的Reduce;而Reduce階段負責從Map端拉取數據并進行計算。在整個shuffle過程中,往往伴隨著大量的磁盤和網絡I/O。所以shuffle性能的高低也直接決定了整個程序的性能高低。Spark也會有自己的shuffle實現過程。
在DAG調度的過程中,Stage階段的劃分是根據是否有shuffle過程,也就是存在wide Dependency寬依賴的時候,需要進行shuffle,這時候會將作業job劃分成多個Stage,每一個stage內部有很多可以并行運行的task。 stage與stage之間的過程就是shuffle階段,在Spark的中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨著Spark的發展有兩種實現的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。 該ShuffleManager-HashShuffleManager有著一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。 SortShuffleManager相較于HashShuffleManager來說,有了一定的改進。主要就在于每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合并(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
Hash shuffle
一種是普通運行機制
另一種是合并的運行機制。
HashShuffleManager的運行機制主要分成兩種
合并機制主要是通過復用buffer來優化Shuffle過程中產生的小文件的數量。
Hash shuffle是不具有排序的Shuffle。
這里我們先明確一個假設前提:每個Executor只有1個CPU core,也就是說,無論這個Executor上分配多少個task線程,同一時間都只能執行一個task線程。 圖中有3個ReduceTask,從ShuffleMapTask 開始那邊各自把自己進行 Hash 計算(分區器:hash/numreduce取模),分類出3個不同的類別,每個 ShuffleMapTask 都分成3種類別的數據,想把不同的數據匯聚然后計算出最終的結果,所以ReduceTask 會在屬于自己類別的數據收集過來,匯聚成一個同類別的大集合,每1個 ShuffleMapTask 輸出3份本地文件,這里有4個 ShuffleMapTask,所以總共輸出了4 x 3個分類文件 = 12個本地小文件。
主要就是在一個stage結束計算之后,為了下一個stage可以執行shuffle類的算子(比如reduceByKey,groupByKey),而將每個task處理的數據按key進行“分區”。所謂“分區”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于reduce端的stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。 那么每個執行shuffle write的task,要為下一個stage創建多少個磁盤文件呢? 很簡單,下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那么當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個Task,那么每個Executor上總共就要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。
shuffle read,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行key的聚合或連接等操作。由于shuffle write的過程中,task給Reduce端的stage的每個task都創建了一個磁盤文件,因此shuffle read的過程中,每個task只要從上游stage的所有task所在節點上,拉取屬于自己的那一個磁盤文件即可。 shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數據,然后通過內存中的一個Map進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到buffer緩沖中進行聚合操作。以此類推,直到最后將所有數據到拉取完,并得到最終的結果。
(1)buffer起到的是緩存作用,緩存能夠加速寫磁盤,提高計算的效率,buffer的默認大小32k。 (2)分區器:根據hash/numRedcue取模決定數據由幾個Reduce處理,也決定了寫入幾個buffer中 (3)block file:磁盤小文件,從圖中我們可以知道磁盤小文件的個數計算公式: block file=M*R (4) M為map task的數量,R為Reduce的數量,一般Reduce的數量等于buffer的數量,都是由分區器決定的
(1).Shuffle階段在磁盤上會產生海量的小文件,建立通信和拉取數據的次數變多,此時會產生大量耗時低效的 IO 操作 (因為產生過多的小文件) (2).可能導致OOM,大量耗時低效的 IO 操作 ,導致寫磁盤時的對象過多,讀磁盤時候的對象也過多,這些對象存儲在堆內存中,會導致堆內存不足,相應會導致頻繁的GC,GC會導致OOM。由于內存中需要保存海量文件操作句柄和臨時信息,如果數據處理的規模比較龐大的話,內存不可承受,會出現 OOM 等問題
合并機制就是復用buffer緩沖區,開啟合并機制的配置是spark.shuffle.consolidateFiles。該參數默認值為false,將其設置為true即可開啟優化機制。通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。
這里有6個這里有6個shuffleMapTask,數據類別還是分成3種類型,因為Hash算法會根據你的 Key 進行分類,在同一個進程中,無論是有多少過Task,都會把同樣的Key放在同一個Buffer里,然后把Buffer中的數據寫入以Core數量為單位的本地文件中,(一個Core只有一種類型的Key的數據),每1個Task所在的進程中,分別寫入共同進程中的3份本地文件,這里有6個shuffleMapTasks,所以總共輸出是 2個Cores x 3個分類文件 = 6個本地小文件。
(1).啟動HashShuffle的合并機制ConsolidatedShuffle的配置 spark.shuffle.consolidateFiles=true(2).block file=Core*R Core為CPU的核數,R為Reduce的數量
如果 Reducer 端的并行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。
SortShuffleManager的運行機制主要分成兩種,
一種是普通運行機制
另一種是bypass運行機制
在該模式下,數據會先寫入一個數據結構,聚合算子寫入Map,一邊通過Map局部聚合,一邊寫入內存。Join算子寫入ArrayList直接寫入內存中。然后需要判斷是否達到閾值(5M),如果達到就會將內存數據結構的數據寫入到磁盤,清空內存數據結構。 在溢寫磁盤前,先根據key進行排序,排序過后的數據,會分批寫入到磁盤文件中。默認批次為10000條,數據會以每批一萬條寫入到磁盤文件。寫入磁盤文件通過緩沖區溢寫的方式,每次溢寫都會產生一個磁盤文件,也就是說一個task過程會產生多個臨時文件 。 最后在每個task中,將所有的臨時文件合并,這就是merge過程,此過程將所有臨時文件讀取出來,一次寫入到最終文件。意味著一個task的所有數據都在這一個文件中。同時單獨寫一份索引文件,標識下游各個task的數據在文件中的索引start offset和end offset。 這樣算來如果第一個stage 50個task,每個Executor執行一個task,那么無論下游有幾個task,就需要50*2=100個磁盤文件。
1. 小文件明顯變少了,一個task只生成一個file文件 2. file文件整體有序,加上索引文件的輔助,查找變快,雖然排序浪費一些性能,但是查找變快很多
shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值
不是聚合類的shuffle算子(比如reduceByKey)
該機制與sortshuffle的普通機制相比,在shuffleMapTask不多的情況下,首先寫的機制是不同,其次不會進行排序。這樣就可以節約一部分性能開銷。
在shuffleMapTask數量小于默認值200時,啟用bypass模式的sortShuffle(原因是數據量本身比較少,沒必要進行sort全排序,因為數據量少本身查詢速度就快,正好省了sort的那部分性能開銷。) 該機制與普通SortShuffleManager運行機制的不同在于: 第一: 磁盤寫機制不同; 第二: 不會進行sort排序;
碰到ShuffleDenpendency就進行stage的劃分,ShuffleMapStage: 為shuffle提供數據的中間stage,ResultStage: 為一個action操作計算結果的stage。
解決的一個問題是resut task如何知道從哪個Executor去拉取Shuffle data
ShuffleWriter
(1)HashShuffleWriter
特點:根據Hash分區,分區數是m * n 個。
val counts: RDD[(String, Int)] = wordCount.reduceByKey(new HashPartitioner(2), (x, y) => x + y)
(2)SortShuffleWriter
特點:
a、文件數量為m
b、如果需要排序或者需要combine,那么每一個partition數據排序要自己實現。(SortShuffleWriter里的sort指的是對partition的分區號進行排序)
c、數據先放在內存,內存不夠則寫到磁盤中,最后再全部寫到磁盤。
(3)BypassMergeSortShuffleWriter
這種模式同時具有HashShuffleWriter和SortShuffleter的特點。因為其實HashShufflerWriter的性能不錯,但是如果task數太多的話,性能話下降,所以Spark在task數較少的時候自動使用了這種模式,一開始還是像HashShufflerWriter那種生成多個文件,但是最后會把多個文件合并成一個文件。然后下游來讀取文件。默認map的分區需要小于spark.shuffle.sort.bypassMergeThreshold(默認是200),因為如何分區數太多,產生的小文件就會很多性能就會下降。
默認值:32k
參數說明:該參數用于設置shuffle write task的BufferedOutputStream的buffer緩沖大小。將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。
調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
默認值:48m
參數說明:該參數用于設置shuffle read task的buffer緩沖大小,而這個buffer緩沖決定了每次能夠拉取多少數據。
調優建議:如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如96m),從而減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
默認值:3
參數說明:shuffle read task從shuffle write task所在節點拉取屬于自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的。該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗。
調優建議:對于那些包含了特別耗時的shuffle操作的作業,建議增加重試最大次數(比如60次),以避免由于JVM的full gc或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~上百億)的shuffle過程,調節該參數可以大幅度提升穩定性。
默認值:5s
參數說明:具體解釋同上,該參數代表了每次重試拉取數據的等待間隔,默認是5s。
調優建議:建議加大間隔時長(比如60s),以增加shuffle操作的穩定性。
(Spark1.6是這個參數,1.6以后參數變了,具體參考上一講Spark內存模型知識)
默認值:0.2
參數說明:該參數代表了Executor內存中,分配給shuffle read task進行聚合操作的內存比例,默認是20%。
調優建議:在資源參數調優中講解過這個參數。如果內存充足,而且很少使用持久化操作,建議調高這個比例,給shuffle read的聚合操作更多內存,以避免由于內存不足導致聚合過程中頻繁讀寫磁盤。在實踐中發現,合理調節該參數可以將性能提升10%左右。
默認值:sort
參數說明:該參數用于設置ShuffleManager的類型。Spark 1.5以后,有三個可選項:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默認選項,但是Spark 1.2以及之后的版本默認都是SortShuffleManager了。Spark1.6以后把hash方式給移除了,tungsten-sort與sort類似,但是使用了tungsten計劃中的堆外內存管理機制,內存使用效率更高。
調優建議:由于SortShuffleManager默認會對數據進行排序,因此如果你的業務邏輯中需要該排序機制的話,則使用默認的SortShuffleManager就可以;而如果你的業務邏輯不需要對數據進行排序,那么建議參考后面的幾個參數調優,通過bypass機制或優化的HashShuffleManager來避免排序操作,同時提供較好的磁盤讀寫性能。這里要注意的是,tungsten-sort要慎用,因為之前發現了一些相應的bug。
默認值:200
參數說明:當ShuffleManager為SortShuffleManager時,如果shuffle read task的數量小于這個閾值(默認是200),則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。
調優建議:當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于shuffle read task的數量。那么此時就會自動啟用bypass機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。
上述就是小編為大家分享的如何進行Spark Shuffle 原理分析了,如果剛好有類似的疑惑,不妨參照上述分析進行理解。如果想知道更多相關知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。