您好,登錄后才能下訂單哦!
這篇文章主要介紹“MapReduce的Shuffle機制是什么”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“MapReduce的Shuffle機制是什么”文章能幫助大家解決問題。
Shuffle過程,也稱Copy階段。reduce task從各個map task上遠程拷貝一片數據,并針對某一片數據,如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到內存中。
map函數開始產生輸出時,并不是簡單地將它寫到磁盤上。這個過程更復雜,它利用緩沖的方式寫到內存并出于效率的目的進行預排序。
每個map任務都有一個環形緩沖區用于存儲任務輸出。在默認情況下,緩沖區的大小為100MB,這個值可以通過mapreduce.task.io.sort.mb
屬性來調整。一旦緩沖內容達到閾值(mapreduce.map.sort.spill.percent
,默認為80%),一個后臺線程便開始把內容溢寫(spill)到磁盤,在溢寫到磁盤的過程中,map輸出繼續寫道緩沖區,但如果在此期間緩沖區被寫滿,map會被阻塞直到磁盤過程完成。溢寫過程按輪詢方式將緩沖區的內容寫到mapreduce.cluster.local.dir
屬性在作業特定子目錄下的指定的目錄中。在寫磁盤之前,線程首先根據數據最終要傳的reducer把數據劃分成相應的分區(partition,用戶也可自定義分區函數,但默認的partitioner通過哈希函數來分區,也很高效)。在每個分區中,后臺線程按鍵進行內存中排序,如果有一個combiner函數,它就在排序后的輸出上運行。運行combiner函數使得map輸出結果更緊湊,因此減少寫到磁盤的數據和傳遞給reducer的數據。
每次內存緩沖區達到溢出閾值時,就會新建一個溢出文件(spill file),因此,在map任務寫完其最后一個輸出記錄后,會有幾個溢寫文件。在任務完成之前,溢寫文件被合并成一個已分區且已排序的輸出文件。配置屬性是mapreduce.task.io.sort.factor
控制著一次最多能合并多少流,默認值是10.
如果至少存在3個溢寫文件(通過mapreduce.map.combine.minspills
屬性設置)時,則combiner就會在輸出文件寫到磁盤之前再次運行。combiner可以在輸入上反復運行,但并不影響最終結果。如果只有1個或者2個溢寫文件,那么由于map輸出規模減少,因此不值得調用combiner帶來的開銷,因此不會為該map輸出再次運行combiner。
在將壓縮map輸出寫到磁盤的過程中對他進行壓縮往往是一個很好的主意,因為這樣寫磁盤的速度更快,節約磁盤空間,并且減少傳給reducer的數據量。在默認情況下,輸出時不壓縮的,但只要將mapreduce.map.output.compress
設置為true,就可以輕松使用此功能。使用的壓縮庫由mapreduce.map.output.compress.codec
指定。
reducer通過HTTP得到輸出文件的分區。用于文件分區的工作線程的數量由任務的mapreduce.shuffle.max.threads
屬性控制,此設置針對的是每一個節點管理器,而不是針對每個map任務。默認值0將最大線程數設置為機器中處理器數量的兩倍。
現在轉到處理過程的reduce部分。map輸出文件位于運行map任務的tasktracker的本地磁盤(注意,盡管map輸出經常寫到map tasktracker 的本地磁盤,但reduce輸出并不這樣),現在,tasktracker需要為分區文件運行reduce任務。并且,reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。每個map任務的完成時間可能不同,因此在每個任務完成時,reduce任務就開始復制其輸出。這就是reduce任務的復制階段。reduce任務有少量復制線程,因此能夠并行取得map輸出。默認值是5個線程,但這個默認值可以修改設置mapreduce.reduce.shuffle.parallelcopies
屬性即可。
如果map輸出相當小,會被復制到reduce任務JVM的內存(緩沖區大小由mapreduce.reduce.shuffle.input.buffer.percent
屬性控制,指定用于此用途的堆空間的百分比),否則,map輸出被復制到磁盤。一旦內存緩沖區達到閾值大小(由mapreduce.reduce.shuffle.merge.percent
決定)或者達到map輸出閾值(由mapreduce.reduce.merge.inmen.threshold
控制),則合并后溢出寫到磁盤中。如果指定combiner,則在合并期間運行它以降低寫入硬盤的數據量。
隨著磁盤上副本增多,后臺線程會將它們合并為更大的、排好序的文件。這會為后面的合并節省一些時間。注意,為了合并,壓縮的map輸出(通過map任務)都必須在內存中被解壓縮。
復制完所有map輸出后,reduce任務進入排序階段(更恰當的說法是合并階段,因為排序是在map端進行的),這個階段將合并map輸出,維持其順序排序。這是循環進行的。比如,如果有50個map輸出,而合并因子是10(10為默認設置,由mapreduce.task.io.sort.factor
屬性設置,與map的合并類似),合并將進行5趟,每趟將10個文件合并成一個文件,因此最后有5個中間文件。
在最后階段,即reduce階段,直接把數據輸入reduce函數,從而省略了一次磁盤往返行程,并沒有將這5個文件合并成一個已排序的文件作為最后一趟。最后的合并可以來自內存和磁盤片段。
?
每趟合并的文件數實際上比事例中展示有所不同。目標是合并最少數量的文件以便滿足于最后一趟的合并系數。因此如果有40個文件,我們并不會在四趟中每趟合并10個文件從而得到4個文件。相反,第一趟只合并4個文件,隨后的三趟合并完整的10個文件。在最后一趟中,4個已合并的文件和余下的6個(未合并的)文件合計10個。
在reduce階段,對已排序輸出中的每個鍵都調用reduce函數。此階段的輸出直接寫到輸出文件系統,一般為HDFS(可自定義)。如果采用HDFS,由于節點管理器也運行數據節點,所以第一個塊的副本將被寫入到本地磁盤。
關于“MapReduce的Shuffle機制是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。