您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關如何進行sparkcore離線性能調優,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
Spark 性能調優的第一步,就是為任務分配更多的資源,在一定范圍內,增加資源的分配與性能的提升是成正比的,實現了最優的資源配置后,在此基礎上再考慮進行后面論述的性能調優策略。
資源的分配在使用腳本提交Spark任務時進行指定,標準的Spark任務提交腳本如代碼清單:
/usr/opt/modules/spark/bin/spark-submit
--class com.atguigu.spark.Analysis
--num-executors 80
--driver-memory 6g
--executor-memory 6g
--executor-cores 3
/usr/opt/modules/spark/jar/spark.jar \
*名稱* | *說明* |
---|---|
*–num-executors* | 配置Executor的數量 |
*–driver-memory* | 配置Driver內存(影響不大) |
*–executor-memory* | 配置每個Executor的內存大小 |
*–executor-cores* | 配置每個Executor的CPU core數量 |
調節原則:盡量將任務分配的資源調節到可以使用的資源的最大限度。
對于具體資源的分配,我們分別討論 Spark 的兩種 Cluste 運行模式:
? 第一種是Spark Standalone模式,你在提交任務前,一定知道或者可以從運維部門獲取到你可以使用的資源情況,在編寫submit腳本的時候,就根據可用的資源情況進行資源的分配,比如說集群有15臺機器,每臺機器為8G內存,2個CPU core,那么就指定15個Executor,每個Executor分配8G內存,2個CPU core。
? 第二種是Spark Yarn模式,由于Yarn使用資源隊列進行資源的分配和調度,在表寫 submit 腳本的時候,就根據Spark作業要提交到的資源隊列,進行資源的分配,比如資源隊列有400G內存,100個CPU core,那么指定50個Executor,每個Executor分配8G內存,2個CPU core。
資源調節后的性能提升
*名稱* | *解析* |
---|---|
*增加Executor·個數* | 在資源允許的情況下,增加Executor的個數可以提高執行task的并行度。比如有4個Executor,每個Executor有2個CPU core,那么可以并行執行8個task,如果將Executor的個數增加到8個(資源允許的情況下),那么可以并行執行16個task,此時的并行能力提升了一倍。 |
*增加每個Executor的CPU core個數* | 在資源允許的情況下,增加每個Executor的Cpu core個數,可以提高執行task的并行度。比如有4個Executor,每個Executor有2個CPU core,那么可以并行執行8個task,如果將每個Executor的CPU core個數增加到4個(資源允許的情況下),那么可以并行執行16個task,此時的并行能力提升了一倍。 |
*增加每個Executor的內存量* | 在資源允許的情況下,增加每個Executor的內存量以后,對性能的提升有三點: 1. 可以緩存更多的數據(即對RDD進行cache),寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO; 2. 可以為shuffle操作提供更多內存,即有更多空間來存放reduce端拉取的數據,寫入磁盤的數據相應減少,甚至可以不寫入磁盤,減少了可能的磁盤IO; 3. 可以為task的執行提供更多內存,在task的執行過程中可能創建很多對象,內存較小時會引發頻繁的GC,增加內存后,可以避免頻繁的GC,提升整體性能。 |
在對RDD進行算子時,要避免相同的算子和計算邏輯之下對 RDD 進行重復的計算
在Spark中,當多次對同一個 RDD 執行算子操作時,每一次都會對這個 RDD 的祖先 RDD 重新計算一次,這種情況是必須要避免的,對同一個RDD的重復計算是對資源的極大浪費,因此,必須對多次使用的RDD進行持久化,通過持久化將公共RDD的數據緩存到內存/磁盤中,之后對于公共RDD的計算都會從內存/磁盤中直接獲取RDD數據。 對于RDD的持久化,有兩點需要說明: 1. ,RDD的持久化是可以進行序列化的,當內存無法將RDD的數據完整的進行存放的時候,可以考慮使用序列化的方式減小數據體積,將數據完整存儲在內存中。
如果對于數據的可靠性要求很高,并且內存充足,可以使用副本機制,對RDD數據進行持久化。當持久化啟用了復本機制時,對于持久化的每個數據單元都存儲一個副本,放在其他節點上面,由此實現數據的容錯,一旦一個副本數據丟失,不需要重新計算,還可以使用另外一個副本。
獲取到初始RDD后,應該考慮盡早地過濾掉不需要的數據,進而減少對內存的占用,從而提升Spark作業的運行效率。
Spark作業中的并行度指各個stage 的 task 的數量。
如果并行度設置不合理而導致并行度過低,會導致資源的極大浪費,例如,20個 Executor,每個 Executor 分配 3 個CPU core,而Spark作業有 40 個task,這樣每個Executor分配到的task個數是2個,這就使得每個Executor有一個CPU core空閑,導致資源的浪費。
理想的并行度設置,應該是讓并行度與資源相匹配,簡單來說就是在資源允許的前提下,并行度要設置的盡可能大,達到可以充分利用集群資源。合理的設置并行度,可以提升整個 Spark 作業的性能和運行速度。
Spark官方推薦,task數量應該設置為Spark作業總CPU core數量的2~3倍。之所以沒有推薦task數量與CPU core總數相等,是因為task的執行時間不同,有的task執行速度快而有的task執行速度慢,如果task數量與CPU core總數相等,那么執行快的task執行完成后,會出現CPU core空閑的情況。*如果task數量設置為CPU core總數的**2~3**倍,那么一個task執行完畢后,CPU core會立刻執行下一個task,降低了資源的浪費,同時提升了Spark作業運行的效率。*
Spark作業并行度的設置如代碼:
new SparkConf() .set("spark.default.parallelism", "500")
默認情況下,task 中的算子中如果使用了外部的變量,每個 task 都會獲取一份變量的復本,這就造成了內存的極大消耗。 - 一方面,如果后續對 RDD 進行持久化,可能就無法將 RDD 數據存入內存,只能寫入磁盤,磁盤IO將會嚴重消耗性能; - 另一方面,task在創建對象的時候,也許會發現堆內存無法存放新創建的對象,這就會導致頻繁的GC,GC會導致工作線程停止,進而導致Spark暫停工作一段時間,嚴重影響Spark性能。
假設當前任務配置了20個Executor,指定500個task,有一個20M的變量被所有task共用,此時會在500個task中產生500個副本,耗費集群10G的內存,如果使用了廣播變量, 那么每個Executor保存一個副本,一共消耗400M內存,內存消耗減少了5倍。
廣播變量在每個Executor保存一個副本,此Executor的所有task共用此廣播變量,這讓變量產生的副本數量大大減少。
默認情況下,Spark 使用 Java 的序列化機制。Java的序列化機制使用方便,不需要額外的配置,在算子中使用的變量實現Serializable接口即可,但是,Java 序列化機制的效率不高,序列化速度慢并且序列化后的數據所占用的空間依然較大。
Kryo序列化機制比Java序列化機制性能提高10倍左右,Spark之所以沒有默認使用Kryo作為序列化類庫,是因為它不支持所有對象的序列化,同時Kryo需要用戶在使用前注冊需要序列化的類型,不夠方便,但從Spark 2.0.0版本開始,簡單類型、簡單類型數組、字符串類型的Shuffling RDDs 已經默認使用Kryo序列化方式了。
public class MyKryoRegistrator implements KryoRegistrator{ @Override public void registerClasses(Kryo kryo){ kryo.register(StartupReportLogs.class); } } //創建SparkConf對象 val conf = new SparkConf().setMaster(…).setAppName(…) //使用Kryo序列化庫,如果要使用Java序列化庫,需要把該行屏蔽掉 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //在Kryo序列化庫中注冊自定義的類集合,如果要使用Java序列化庫,需要把該行屏蔽掉 conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
Spark 作業運行過程中,Driver 會對每一個 stage 的 task 進行分配。根據 Spark 的 task 分配算法,Spark希望task能夠運行在它要計算的數據所在的節點(數據本地化思想),這樣就可以避免數據的網絡傳輸。
通常來說,task可能不會被分配到它處理的數據所在的節點,因為這些節點可用的資源可能已經用盡,此時,Spark會等待一段時間,默認3s,如果等待指定時間后仍然無法在指定節點運行,那么會自動降級,嘗試將task分配到比較差的本地化級別所對應的節點上,比如將task分配到離它要計算的數據比較近的一個節點,然后進行計算,如果當前級別仍然不行,那么繼續降級。
當task要處理的數據不在task所在節點上時,會發生數據的傳輸。task會通過所在節點的BlockManager獲取數據,BlockManager發現數據不在本地時,會通過網絡傳輸組件從數據所在節點的BlockManager處獲取數據。
網絡傳輸數據的情況是我們不愿意看到的,大量的網絡傳輸會嚴重影響性能,因此,我們希望通過調節本地化等待時長,如果在等待時長這段時間內,目標節點處理完成了一部分task,那么當前的task將有機會得到執行,這樣就能夠改善Spark作業的整體性能。
表2-3 Spark本地化等級
*名稱* | *解析* |
---|---|
*PROCESS_LOCAL* | 進程本地化,task和數據在同一個Executor中,性能最好。 |
*NODE_LOCAL* | 節點本地化,task和數據在同一個節點中,但是task和數據不在同一個Executor中,數據需要在進程間進行傳輸。 |
*RACK_LOCAL* | 機架本地化,task和數據在同一個機架的兩個節點上,數據需要通過網絡在節點之間進行傳輸。 |
*NO_PREF* | 對于task來說,從哪里獲取都一樣,沒有好壞之分。 |
*ANY* | task和數據可以在集群的任何地方,而且不在一個機架中,性能最差。 |
在Spark項目開發階段,可以使用client模式對程序進行測試,此時,可以在本地看到比較全的日志信息,日志信息中有明確的task數據本地化的級別,如果大部分都是PROCESS_LOCAL,那么就無需進行調節,但是如果發現很多的級別都是NODE_LOCAL、ANY,那么需要對本地化的等待時長進行調節,通過延長本地化等待時長,看看task的本地化級別有沒有提升,并觀察Spark作業的運行時間有沒有縮短。 注意,過猶不及,不要將本地化等待時長延長地過長,導致因為大量的等待時長,使得Spark作業的運行時間反而增加了。
val conf = new SparkConf() .set("spark.locality.wait", "6")//配置這一個,就都有了相當于
普通的 map 算子對 RDD 中的每一個元素進行操作,而 mapPartitions 算子對 RDD 中每一個分區進行操作。
使用了foreachPartition算子后,可以獲得以下的性能提升:
1.對于我們寫的function函數,一次處理一整個分區的數據;
2.對于一個分區內的數據,創建唯一的數據庫連接;
3.只需要向數據庫發送一次SQL語句和多組參數;
在生產環境中,全部都會使用foreachPartition算子完成數據庫操作。foreachPartition算子存在一個問題,與mapPartitions算子類似,如果一個分區的數據量特別大,可能會造成OOM,即內存溢出。
在Spark任務中我們經常會使用filter算子完成RDD中數據的過濾,在任務初始階段,從各個分區中加載到的數據量是相近的,但是一旦進過filter過濾后,每個分區的數據量有可能會存在較大差異
repartition與coalesce都可以用來進行重分區,其中repartition只是coalesce接口中shuffle為true的簡易實現,coalesce默認情況下不進行shuffle,但是可以通過參數進行設置。
在第一節的常規性能調優中我們講解了并行度的調節策略,但是,并行度的設置對于Spark SQL是不生效的,用戶設置的并行度只對于Spark SQL以外的所有Spark的stage生效。
Spark SQL的并行度不允許用戶自己指定,Spark SQL自己會默認根據 hive 表對應的 HDFS 文件的 split 個數自動設置 Spark SQL 所在的那個 stage 的并行度,用戶自己通spark.default.parallelism參數指定的并行度,只會在沒Spark SQL的stage中生效。
由于Spark SQL所在stage的并行度無法手動設置,如果數據量較大,并且此stage中后續的transformation操作有著復雜的業務邏輯,而Spark SQL自動設置的task數量很少,這就意味著每個task要處理為數不少的數據量,然后還要執行非常復雜的處理邏輯,這就可能表現為第一個有 Spark SQL 的 stage 速度很慢,而后續的沒有 Spark SQL 的 stage 運行速度非常快。
為了解決Spark SQL無法設置并行度和 task 數量的問題,我們可以使用repartition算子。
類似于mapreduce的combiner,可以實現本地預聚合,降低shuffle傳輸的數據量,提升性能。
在 Spark 任務運行過程中,如果 shuffle 的map端處理的數據量比較大,但是map端緩沖的大小是固定的,可能會出現map端緩沖數據頻繁spill溢寫到磁盤文件中的情況,使得性能非常低下,通過調節map端緩沖的大小,可以避免頻繁的磁盤 IO 操作,進而提升 Spark 任務的整體性能。
map端緩沖的默認配置是32KB,如果每個task處理640KB的數據,那么會發生640/32 = 20次溢寫,如果每個task處理64000KB的數據,機會發生64000/32=2000此溢寫,這對于性能的影響是非常嚴重的。
val conf = new SparkConf() .set("spark.shuffle.file.buffer", "64")
Spark Shuffle 過程中,shuffle reduce task 的 buffer緩沖區大小決定了reduce task 每次能夠緩沖的數據量,也就是每次能夠拉取的數據量,如果內存資源較為充足,適當增加拉取數據緩沖區的大小,可以減少拉取數據的次數,也就可以減少網絡傳輸的次數,進而提升性能。
reduce端數據拉取緩沖區的大小可以通過spark.reducer.maxSizeInFlight參數進行設置,默認為48MB,
val conf = new SparkConf() .set("spark.reducer.maxSizeInFlight", "96")
Spark Shuffle 過程中,reduce task 拉取屬于自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試。對于那些包含了特別耗時的 shuffle 操作的作業,建議增加重試最大次數(比如60次),以避免由于 JVM 的full gc 或者網絡不穩定等因素導致的數據拉取失敗。在實踐中發現,對于針對超大數據量(數十億~上百億)的shuffle 過程,調節該參數可以大幅度提升穩定性。
reduce 端拉取數據重試次數可以通過spark.shuffle.io.maxRetries參數進行設置,該參數就代表了可以重試的最大次數。如果在指定次數之內拉取還是沒有成功,就可能會導致作業執行失敗,默認為3,
val conf = new SparkConf() .set("spark.shuffle.io.maxRetries", "6")
Spark Shuffle 過程中,reduce task 拉取屬于自己的數據時,如果因為網絡異常等原因導致失敗會自動進行重試,在一次失敗后,會等待一定的時間間隔再進行重試,可以通過加大間隔時長(比如60s),以增加shuffle操作的穩定性。
reduce端拉取數據等待間隔可以通過spark.shuffle.io.retryWait參數進行設置,默認值為5s,
val conf = new SparkConf() .set("spark.shuffle.io.retryWait", "60s")
對于SortShuffleManager,如果shuffle reduce task的數量小于某一閾值則shuffle write過程中不會進行排序操作,而是直接按照未經優化的HashShuffleManager的方式去寫數據,但是最后會將每個task產生的所有臨時磁盤文件都合并成一個文件,并會創建單獨的索引文件。
當你使用SortShuffleManager時,如果的確不需要排序操作,那么建議將這個參數調大一些,大于shuffle read task的數量,那么此時map-side就不會進行排序了,減少了排序的性能開銷,但是這種方式下,依然會產生大量的磁盤文件,因此shuffle write性能有待提高。 SortShuffleManager排序操作閾值的設置可以通過spark.shuffle.sort. bypassMergeThreshold這一參數進行設置,默認值為200,
val conf = new SparkConf() .set("spark.shuffle.sort.bypassMergeThreshold", "400")
根據 Spark 靜態內存管理機制,堆內存被劃分為了兩塊,Storage 和 Execution。
Storage 主要用于緩存 RDD數據和 broadcast 數據,Execution主要用于緩存在shuffle過程中產生的中間數據,Storage占系統內存的60%,Execution占系統內存的20%,并且兩者完全獨立。 在一般情況下,Storage的內存都提供給了cache操作,但是如果在某些情況下cache操作內存不是很緊張,而task的算子中創建的對象很多,Execution內存又相對較小,這回導致頻繁的minor gc,甚至于頻繁的full gc,進而導致Spark頻繁的停止工作,性能影響會很大。 在Spark UI中可以查看每個stage的運行情況,包括每個task的運行時間、gc時間等等,如果發現gc太頻繁,時間太長,就可以考慮調節Storage的內存占比,讓task執行算子函數式,有更多的內存可以使用。 Storage內存區域可以通過spark.storage.memoryFraction參數進行指定,默認為0.6,即60%,可以逐級向下遞減,
val conf = new SparkConf() .set("spark.storage.memoryFraction", "0.4")
根據Spark統一內存管理機制,堆內存被劃分為了兩塊,Storage 和 Execution。Storage 主要用于緩存數據,Execution 主要用于緩存在 shuffle 過程中產生的中間數據,兩者所組成的內存部分稱為統一內存,Storage和Execution各占統一內存的50%,由于動態占用機制的實現,shuffle 過程需要的內存過大時,會自動占用Storage 的內存區域,因此無需手動進行調節。
Executor 的堆外內存主要用于程序的共享庫、Perm Space、 線程Stack和一些Memory mapping等, 或者類C方式allocate object。
有時,如果你的Spark作業處理的數據量非常大,達到幾億的數據量,此時運行 Spark 作業會時不時地報錯,例如shuffle output file cannot find,executor lost,task lost,out of memory等,這可能是Executor的堆外內存不太夠用,導致 Executor 在運行的過程中內存溢出。
stage 的 task 在運行的時候,可能要從一些 Executor 中去拉取 shuffle map output 文件,但是 Executor 可能已經由于內存溢出掛掉了,其關聯的 BlockManager 也沒有了,這就可能會報出 shuffle output file cannot find,executor lost,task lost,out of memory等錯誤,此時,就可以考慮調節一下Executor的堆外內存,也就可以避免報錯,與此同時,堆外內存調節的比較大的時候,對于性能來講,也會帶來一定的提升。
默認情況下,Executor 堆外內存上限大概為300多MB,在實際的生產環境下,對海量數據進行處理的時候,這里都會出現問題,導致Spark作業反復崩潰,無法運行,此時就會去調節這個參數,到至少1G,甚至于2G、4G。
Executor堆外內存的配置需要在spark-submit腳本里配置,
--conf spark.executor.memoryOverhead=2048
以上參數配置完成后,會避免掉某些JVM OOM的異常問題,同時,可以提升整體 Spark 作業的性能。
在 Spark 作業運行過程中,Executor 優先從自己本地關聯的 BlockManager 中獲取某份數據,如果本地BlockManager沒有的話,會通過TransferService遠程連接其他節點上Executor的BlockManager來獲取數據。
如果 task 在運行過程中創建大量對象或者創建的對象較大,會占用大量的內存,這會導致頻繁的垃圾回收,但是垃圾回收會導致工作現場全部停止,也就是說,垃圾回收一旦執行,Spark 的 Executor 進程就會停止工作,無法提供相應,此時,由于沒有響應,無法建立網絡連接,會導致網絡連接超時。
在生產環境下,有時會遇到file not found、file lost這類錯誤,在這種情況下,很有可能是Executor的BlockManager在拉取數據的時候,無法建立連接,然后超過默認的連接等待時長120s后,宣告數據拉取失敗,如果反復嘗試都拉取不到數據,可能會導致 Spark 作業的崩潰。這種情況也可能會導致 DAGScheduler 反復提交幾次 stage,TaskScheduler 返回提交幾次 task,大大延長了我們的 Spark 作業的運行時間。
此時,可以考慮調節連接的超時時長,連接等待時長需要在spark-submit腳本中進行設置
--conf spark.core.connection.ack.wait.timeout=300
調節連接等待時長后,通常可以避免部分的XX文件拉取失敗、XX文件lost等報錯。
以上就是如何進行sparkcore離線性能調優,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。