您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關如何剖析具體實現,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
一、概述
這里我們從源碼角度剖析BypassMergeSortShuffleWriter實現策略的原理和具體的實現細節。
BypassMergeSortShuffleWriter具體的實現都在對應類的write()函數中,我們直接看源碼進行剖析
1.先看構造函數初始化
BypassMergeSortShuffleWriter( BlockManager blockManager, IndexShuffleBlockResolver shuffleBlockResolver, BypassMergeSortShuffleHandle<K, V> handle, int mapId, TaskContext taskContext, SparkConf conf) { // 獲取spark.shuffle.file.buffer參數值,默認32k,這里是一個比較重要的條有參數, // 該參數用于設置shuffle write task的BufferedOutputStream的buffer緩沖大小。 // 將數據寫到磁盤文件之前,會先寫入buffer緩沖中,待緩沖寫滿之后,才會溢寫到磁盤 //如果作業可用的內存資源較為充足的話,可以適當增加這個參數的大小(比如64k),從而減少shuffle write過程中溢寫磁盤文件的次數, // 也就可以減少磁盤IO次數,進而提升性能 this.fileBufferSize = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024; // 是否采用NIO的從文件到文件流的復制方式,默認值是true 一般不用修改 this.transferToEnabled = conf.getBoolean("spark.file.transferTo", true); this.blockManager = blockManager; // 獲取shufflehandle中的ShuffleDependency對象,通過該對象得到分區器和分區個數等數據。 final ShuffleDependency<K, V, V> dep = handle.dependency(); this.mapId = mapId; this.shuffleId = dep.shuffleId(); this.partitioner = dep.partitioner(); this.numPartitions = partitioner.numPartitions(); this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics(); //設置序列化工具對象,和shuffleBlockResolver對象, // 該對象用來創建和維護shuffle的數據的邏輯塊和物理文件位置之間的映射的對象 this.serializer = dep.serializer(); this.shuffleBlockResolver = shuffleBlockResolver; }
2.再看write()函數,源碼如下:
//這里大體意思是 為每個分區在磁盤創建臨時文件 并給每一個writer
上面代碼的大體思路如下:
a.確定分區數,然后為每個分區創建DiskBlockObjectWriter和臨時文件
b.循環將record通過Partitioner進行分區,并寫入對應分區臨時文件
c. 將分區數據刷到磁盤
d.根據shuffleId和mapId,構建ShuffleDataBlockId,創建合并文件data和合并文件的臨時文件,文件格式為:
shuffle_{shuffleId}_{mapId}_{reduceId}.data
e.將分區文件合并到一個總的臨時文件,合并后會重命名為最終輸出文件名,并返回一個對應分區文件長度的數組
f.創建索引文件index和索引臨時文件,每一個分區的長度和offset寫入索引文件等;并且重命名臨時data文件和臨時index文件
g.將一些信息封裝到MapStatus返回
存在問題:
這種Writer會為每個分區創建一個臨時文件,如果分區過多時,會創建很多的output輸出流和臨時文件對象,占用資源過多,性能會下降。
重點關注:
參數:spark.shuffle.file.buffer 默認值32k
默認情況下,shuffle的map task,輸出到磁盤文件的時候,統一都會先寫入到每個task自己關聯的一個內存緩沖區,每一次當內存緩沖區滿溢后,然后才會進行溢寫到磁盤中。如果內存沖突可適當調大這個參數,從而減少shuffle write過程中溢寫磁盤文件的次數,也就可以減少磁盤IO次數,進而提升性能。在實踐中發現,合理調節該參數,性能會有1%~5%的提升。
關于如何剖析具體實現就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。