您好,登錄后才能下訂單哦!
本篇內容介紹了“如何編寫MapReudce程序”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
Combiner實質上就是不同上下文的Reducer的功能是差不多的.所以說它本質上就是一個Reducer.每一個map可能會產生大量的輸出,combiner的作用就是在map端對輸出先做一次合并,以減少傳輸到reducer的數據量。combiner最基本是實現本地key的歸并,combiner具有類似本地的reduce功能。如果不用combiner,那么,所有的結果都是reduce完成,效率會相對低下(會消耗較多的網絡IO)。使用combiner,先完成的map會在本地聚合,提升速度.
|
案例3:在wordcount的基礎上,實現Combiner編程
package cn.itcast.yun10; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class WordcountCombiner extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { // accept // the same as reduce String word = key.toString(); long count = 0L; for (LongWritable v : values) { count += v.get(); } context.write(new Text(word), new LongWritable(count)); } } 2.指定Combiner
|
使用Combiner編程的兩點注意:
a.不要以為在寫MapReduce程序時設置了Combiner就認為Combiner一定會起作用,實際情況是這樣的嗎?答案是否定的。hadoop文檔中也有說明Combiner可能被執行也可能不被執行。那么在什么情況下不執行呢?如果當前集群在很繁忙的情況下job就是設置了也不會執行Combiner.
b.Combiner的輸出是Reducer的輸入,所以添加Combiner絕不能改變最終的計算結果。所以Combiner只應該用于那種Reduce的輸入key/value與輸出key/value類型完全一致,且不影響最終結果的場景。比如累加,最大值等。但是并不適用于求平均值類似的操作.
至于Combiner的執行時機,待分析完Shuffle之后再來說...?????
MapReduce確保每個Reducer的輸入都按鍵排序.系統執行排序的過程在map輸出之后,而在reducer輸入之前完成。稱為Shuffle---洗牌.觀察shuffle如何工作的,有助于我們理解工作機制例如(優化MapReduce程序).shuffle屬于不斷被優化和改進的代碼庫的一部分.它會隨著版本的不同而隨時改變.在Hadoop里有這么一句話,說shuffle是MapReduce的心臟,是奇跡發生的地方.
Map端:map函數之后.
map函數開始產生輸出時,并不是簡單地將它寫到磁盤中。這個過程是很復雜的。它會利用緩沖區的方式寫到內存。而且處于效率會考慮進行預先排序.
每個map任務都有一個環形內存緩沖區,用于存儲任務的輸出。默認的情況下,緩沖區的大小為100MB,可以通過io.sort.mb的屬性來指定。一旦緩沖區達到閥值(io.sort.spill.percent,默認情況下是80%),就有一個后臺線程開始把內容寫到spill磁盤中。在寫磁盤過程中,map輸出繼續被寫到緩沖區,但如果在此期間緩沖區被填滿,map輸出就會被阻塞直到寫磁盤過程完成。而寫磁盤將按輪詢方式寫到 mapred.local.dir 屬性指定的作業特定子目錄中的目錄中.在這個目錄下新建一個溢出寫文件。
在寫磁盤之前,要partition,sort(數據先分區,然后再排序)。如果有combiner,combiner排序后數據。combiner待榷商。
在寫磁盤的時候會采用壓縮格式。Hadoop中的壓縮庫由 mapred.map.output.compression.codec指定.以后會做詳細的說明.
等最后記錄寫完,合并全部溢出寫文件為一個分區且排序的文件.配置屬性 io.sort.factor控制著一次最多能合并多少流,默認大小為10.這就是merge合并了.
實際上,Conbiner函數的執行時機可能會在map的merge操作完成之前,也可能在merge之后執行,這個時機由配置參數min.num.spill.for.combine(該值默認為3),也就是說在map端產生的spill溢出文件最少有min.num.spill.for.combine的時候,Conbiner函數會在(merge操作合并最終的本機結果文件之前)執行,否則在merge之后執行。通過這種方式,就可以在spill文件很多并且需要做conbiner的時候,減少寫入本地磁盤的數據量,同樣也減少了對磁盤的讀寫頻率,可以起到優化作業的目的。--------也就是說spill出的而文件個數達到三,就可以執行Combiner函數.然后再meger.
reducer會通過HTTP方式得到上述執行的結果(輸出文件的分區) (map中),用于文件分區的工作線程的數量由任務的tracker.http.threads屬性控制.默認值是40.
Reducer端:reduce函數之前
在運行reduce任務之前,需要集群中多個map任務的輸出作為分區材料。但是每個map任務的完成時間很有可能是不同的。所以只要有個map任務完成,reduce就會復制COPY它的輸出。這就是復制階段。在reduce端會開啟幾個復制的線程去COPY。該數字有mapred.reduce.parallel.copies屬性決定。默認值為5.
復制到reduce的話,是有可能到內存,也有可能到磁盤上.這是內存緩沖區大小有mapred.job.shuffle.input.buffer.percenet屬性控制。占堆空間的百分比。一旦緩沖區達到閥值的大小,則會合并后溢出到磁盤。隨著磁盤文件復制文件越來越多。就會合并更大的文件。
然后進入排序階段。準確來說是合并階段,因為排序在map端已經完成。合并時循環進行的。這個合并也是比較復雜的。
最后將得到的數據輸入reduce函數.最后合并可能來自內存也有可能來自磁盤.最后來幾個圖吧。
案例4:存在兩個文件a.txt,b.txt.兩者里面的內容如下:
a.txt文件 -------------------------------- hello world hello tom how are you how do you do ----------------------------------- b.txt文件 hello is fool i say hi how do you think --------------------------------------- c.txt文件 you are all handsome i am the superman how do you think --------------------------------------- 在上述文件中建立倒排索引,就像如下格式: hello --> a.txt,2 b.txt,1 how --->a.txt,2 b.txt,1 c.txt,1 思路如下:通過兩次MapReduce執行出想要的結果. 代碼省略..... 實驗結果: |
單詞計數,數據去重,排序,Top K,選擇,投影,分組,多表連接,單表關聯.都可以通過MapReduce完成。熟悉這些的話,對于后面的Hive學習有很大的用處.
在這里就拿多表連接來做一個案例.
案例5:存在兩個表A,B.兩表之間存在關系。假設兩個表都是以文本文件的形式存儲,SQL語句:select a.id,b.name from a,b where a.id = b.id,得到結果輸出到文件.
思路如下:
代碼省略.
split的作用就是決定mapper的數量,hadoop將mapreduce的輸入數據劃分成等長的小數據塊。稱為輸入分片(input split).在前面的mapreduce輸入類InputFormat中有講到過.這些小數據塊稱為分片。一個分片對應著一個map任務.關于分片的大小,經驗來說,趨向于一個HDFS的默認塊大小.
這樣的話,就可以獲取分片的大小啦......
“如何編寫MapReudce程序”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。