您好,登錄后才能下訂單哦!
這篇文章主要介紹“spark shuffle調優的方法是什么”,在日常操作中,相信很多人在spark shuffle調優的方法是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”spark shuffle調優的方法是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
什么情況下會發生shuffle,然后shuffle的原理是什么?
在spark中,主要是以下幾個算子:groupByKey、reduceByKey、countByKey、join,等等。
groupByKey,要把分布在集群各個節點上的數據中的同一個key,對應的values,都給集中到一塊兒,集中到集群中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。然后呢,集中一個key對應的values之后,才能交給我們來進行處理,<key, Iterable<value>>;reduceByKey,算子函數去對values集合進行reduce操作,最后變成一個value;countByKey,需要在一個task中,獲取到一個key對應的所有的value,然后進行計數,統計總共有多少個value;join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給我們進行處理。
問題在于,同一個單詞,比如說(hello, 1),可能散落在不同的節點上;對每個單詞進行累加計數,就必須讓所有單詞都跑到同一個節點的一個task中,給一個task來進行處理;
每一個shuffle的前半部分stage的task,每個task都會創建下一個stage的task數量相同的文件,比如下一個stage會有100個task,那么當前stage每個task都會創建100份文件;會將同一個key對應的values,一定是寫入同一個文件中的;
shuffle的后半部分stage的task,每個task都會從各個節點上的task寫的屬于自己的那一份文件中,拉取key, value對;然后task會有一個內存緩沖區,然后會用HashMap,進行key, values的匯聚;(key ,values);
task會用我們自己定義的聚合函數,比如reduceByKey(_+_),把所有values進行一對一的累加,聚合出來最終的值。就完成了shuffle;
shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
reduceByKey(_+_),在某個action觸發job的時候,DAGScheduler,會負責劃分job為多個stage。劃分的依據,就是,如果發現有會觸發shuffle操作的算子,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage;shuffle操作的后半部分,以及后面的,直到action為止的RDD和transformation操作,劃分為另外一個stage;
shuffle前半部分的task在寫入數據到磁盤文件之前,都會先寫入一個一個的內存緩沖,內存緩沖滿溢之后,再spill溢寫到磁盤文件中。
如果不合并map端輸出文件的話,會怎么樣?
減少網絡傳輸、disk io、減少reduce端內存緩沖
實際生產環境的條件:
100個節點(每個節點一個executor):100個executor,每個executor:2個cpu core,總共1000個task:每個executor平均10個task,上游1000個task,下游1000個task,每個節點,10個task,每個節點或者說每一個executor會輸出多少份map端文件?10 * 1000=1萬個文件(M*R)
總共有多少份map端輸出文件?100 * 10000 = 100萬。
問題來了:默認的這種shuffle行為,對性能有什么樣的惡劣影響呢?
shuffle中的寫磁盤的操作,基本上就是shuffle中性能消耗最為嚴重的部分。
通過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁盤100萬個文件。
磁盤IO對性能和spark作業執行速度的影響,是極其驚人和嚇人的。
基本上,spark作業的性能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出文件這一個部分,但是這里也是非常大的一個性能消耗點。
new SparkConf().set("spark.shuffle.consolidateFiles", "true")
開啟shuffle map端輸出文件合并的機制;默認情況下,是不開啟的,就是會發生如上所述的大量map端輸出文件的操作,嚴重影響性能。
開啟了map端輸出文件的合并機制之后:
第一個stage,同時就運行cpu core個task,比如cpu core是2個,并行運行2個task;
每個task都創建下一個stage的task數量個文件;
第一個stage,并行運行的2個task執行完以后,就會執行另外兩個task;
另外2個task不會再重新創建輸出文件;而是復用之前的task創建的map端輸出文件,將數據寫入上一批task的輸出文件中;
第二個stage,task在拉取數據的時候,就不會去拉取上一個stage每一個task為自己創建的那份輸出文件了;
提醒一下(map端輸出文件合并):
只有并行執行的task會去創建新的輸出文件;
下一批并行執行的task,就會去復用之前已有的輸出文件;
但是有一個例外,比如2個task并行在執行,但是此時又啟動要執行2個task(不是同一批次);
那么這個時候的話,就無法去復用剛才的2個task創建的輸出文件了;
而是還是只能去創建新的輸出文件。
要實現輸出文件的合并的效果,必須是一批task先執行,然后下一批task再執行,
才能復用之前的輸出文件;負責多批task同時起來執行,還是做不到復用的。
開啟了map端輸出文件合并機制之后,生產環境上的例子,會有什么樣的變化?
實際生產環境的條件:
100個節點(每個節點一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
上游1000個task,下游1000個task
每個節點,2個cpu core,有多少份輸出文件呢?2 * 1000 = 2000個(C*R)
總共100個節點,總共創建多少份輸出文件呢?100 * 2000 = 20萬個文件
相比較開啟合并機制之前的情況,100萬個
map端輸出文件,在生產環境中,立減5倍!
合并map端輸出文件,對咱們的spark的性能有哪些方面的影響呢?
map task寫入磁盤文件的IO,減少:100萬文件 -> 20萬文件
第二個stage,原本要拉取第一個stage的task數量份文件,1000個task,第二個stage的每個task,都要拉取1000份文件,走網絡傳輸;合并以后,100個節點,每個節點2個cpu core,第二個stage的每個task,主要拉取1000 * 2 = 2000個文件即可;網絡傳輸的性能消耗是不是也大大減少分享一下,實際在生產環境中,使用了spark.shuffle.consolidateFiles機制以后,實際的性能調優的效果:對于上述的這種生產環境的配置,性能的提升,還是相當的客觀的。
spark作業,5個小時 -> 2~3個小時。
大家不要小看這個map端輸出文件合并機制。實際上,在數據量比較大,你自己本身做了前面的性能調優,
executor上去->cpu core上去->并行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了;
大量的map端輸出文件的產生。對性能有比較惡劣的影響。
這個時候,去開啟這個機制,可以很有效的提升性能。
spark.shuffle.manager hash M*R 個小文件
spark.shuffle.manager sort C*R 個小文件 (默認的shuffle管理機制)
spark.shuffle.file.buffer,默認32k
spark.shuffle.memoryFraction,0.2
默認情況下,shuffle的map task,輸出到磁盤文件的時候,統一都會先寫入每個task自己關聯的一個內存緩沖區。這個緩沖區大小,默認是32kb。每一次,當內存緩沖區滿溢之后,才會進行spill操作,溢寫操作,溢寫到磁盤文件中去reduce端task,在拉取到數據之后,會用hashmap的數據格式,來對各個key對應的values進行匯聚。針對每個key對應的values,執行我們自定義的聚合函數的代碼,比如_ + _(把所有values累加起來)reduce task,在進行匯聚、聚合等操作的時候,實際上,使用的就是自己對應的executor的內存,executor(jvm進程,堆),默認executor內存中劃分給reduce task進行聚合的比例,是0.2。問題來了,因為比例是0.2,所以,理論上,很有可能會出現,拉取過來的數據很多,那么在內存中,放不下;這個時候,默認的行為,就是說,將在內存放不下的數據,都spill(溢寫)到磁盤文件中去。
原理說完之后,來看一下,默認情況下,不調優,可能會出現什么樣的問題?
默認,map端內存緩沖是每個task,32kb。
默認,reduce端聚合內存比例,是0.2,也就是20%。
如果map端的task,處理的數據量比較大,但是呢,你的內存緩沖大小是固定的。
可能會出現什么樣的情況?
每個task就處理320kb,32kb,總共會向磁盤溢寫320 / 32 = 10次。
每個task處理32000kb,32kb,總共會向磁盤溢寫32000 / 32 = 1000次。
在map task處理的數據量比較大的情況下,而你的task的內存緩沖默認是比較小的,32kb。可能會造成多次的map端往磁盤文件的spill溢寫操作,發生大量的磁盤IO,從而降低性能。
reduce端聚合內存,占比。默認是0.2。如果數據量比較大,reduce task拉取過來的數據很多,那么就會頻繁發生reduce端聚合內存不夠用,頻繁發生spill操作,溢寫到磁盤上去。而且最要命的是,磁盤上溢寫的數據量越大,后面在進行聚合操作的時候,很可能會多次讀取磁盤中的數據,進行聚合。
默認不調優,在數據量比較大的情況下,可能頻繁地發生reduce端的磁盤文件的讀寫。
這兩個點之所以放在一起講,是因為他們倆是有關聯的。數據量變大,map端肯定會出點問題;
reduce端肯定也會出點問題;出的問題是一樣的,都是磁盤IO頻繁,變多,影響性能。
調優:
調節map task內存緩沖:spark.shuffle.file.buffer,默認32k(spark 1.3.x不是這個參數,
后面還有一個后綴,kb;spark 1.5.x以后,變了,就是現在這個參數)
調節reduce端聚合內存占比:spark.shuffle.memoryFraction,0.2
在實際生產環境中,我們在什么時候來調節兩個參數?
看Spark UI,如果你的公司是決定采用standalone模式,那么很簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的端口,進去看,依次點擊進去,可以看到,你的每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁盤和內存,讀寫的數據量;如果是用的yarn模式來提交,課程最前面,從yarn的界面進去,點擊對應的application,進入Spark UI,查看詳情。
如果發現shuffle 磁盤的write和read,很大,可以調節這兩個參數
調節上面說的那兩個參數。調節的時候的原則。spark.shuffle.file.buffer,每次擴大一倍,然后看看效果,64,128;spark.shuffle.memoryFraction,每次提高0.1,看看效果。不能調節的太大,太大了以后過猶不及,因為內存資源是有限的,你這里調節的太大了,其他環節的內存使用就會有問題了。
調節了以后,效果?map task內存緩沖變大了,減少spill到磁盤文件的次數;reduce端聚合內存變大了,
減少spill到磁盤的次數,而且減少了后面聚合讀取磁盤文件的數量。
到此,關于“spark shuffle調優的方法是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。