您好,登錄后才能下訂單哦!
這篇文章主要講解了“Hadoop Mapreduce二次排序過程是怎樣的”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop Mapreduce二次排序過程是怎樣的”吧!
1、MapReduce中數據流動
(1)最簡單的過程: map - reduce
(2)定制了partitioner以將map的結果送往指定reducer的過程: map - partition - reduce
(3)增加了在本地先進性一次reduce(優化)過程: map - combin(本地reduce) - partition -reduce
2、Mapreduce中Partition的概念以及使用。
(1)Partition的原理和作用
得到map給的記錄后,他們該分配給哪些reducer來處理呢?hadoop采用的默認的派發方式是根據散列值來派發的,但是實際中,這并不能很高效或者按照我們要求的去執行任務。例如,經過partition處理后,一個節點的reducer分配到了20條記錄,另一個卻分配道了10W萬條,試想,這種情況效率如何。又或者,我們想要處理后得到的文件按照一定的規律進行輸出,假設有兩個reducer,我們想要最終結果中part-00000中存儲的是"h"開頭的記錄的結果,part-00001中存儲其他開頭的結果,這些默認的partitioner是做不到的。所以需要我們自己定制partition來根據自己的要求,選擇記錄的reducer。自定義partitioner很簡單,只要自定義一個類,并且繼承Partitioner類,重寫其getPartition方法就好了,在使用的時候通過調用Job的setPartitionerClass指定一下即可
Map的結果,會通過partition分發到Reducer上。Mapper的結果,可能送到Combiner做合并,Combiner在系統中并沒有自己的基類,而是用Reducer作為Combiner的基類,他們對外的功能是一樣的,只是使用的位置和使用時的上下文不太一樣而已。Mapper最終處理的鍵值對<key, value>,是需要送到Reducer去合并的,合并的時候,有相同key的鍵/值對會送到同一個Reducer那。哪個key到哪個Reducer的分配過程,是由Partitioner規定的。它只有一個方法,
getPartition(Text key, Text value, int numPartitions)
輸入是Map的結果對<key, value>和Reducer的數目,輸出則是分配的Reducer(整數編號)。就是指定Mappr輸出的鍵值對到哪一個reducer上去。系統缺省的Partitioner是HashPartitioner,它以key的Hash值對Reducer的數目取模,得到對應的Reducer。這樣保證如果有相同的key值,肯定被分配到同一個reducre上。如果有N個reducer,編號就為0,1,2,3……(N-1)。
(2)Partition的使用
分區出現的必要性,如何使用Hadoop產生一個全局排序的文件?最簡單的方法就是使用一個分區,但是該方法在處理大型文件時效率極低,因為一臺機器必須處理所有輸出文件,從而完全喪失了MapReduce所提供的并行架構的優勢。事實上我們可以這樣做,首先創建一系列排好序的文件;其次,串聯這些文件(類似于歸并排序);最后得到一個全局有序的文件。主要的思路是使用一個partitioner來描述全局排序的輸出。比方說我們有1000個1-10000的數據,跑10個ruduce任務, 如果我們運行進行partition的時候,能夠將在1-1000中數據的分配到第一個reduce中,1001-2000的數據分配到第二個reduce中,以此類推。即第n個reduce所分配到的數據全部大于第n-1個reduce中的數據。這樣,每個reduce出來之后都是有序的了,我們只要cat所有的輸出文件,變成一個大的文件,就都是有序的了
基本思路就是這樣,但是現在有一個問題,就是數據的區間如何劃分,在數據量大,還有我們并不清楚數據分布的情況下。一個比較簡單的方法就是采樣,假如有一億的數據,我們可以對數據進行采樣,如取10000個數據采樣,然后對采樣數據分區間。在Hadoop中,patition我們可以用TotalOrderPartitioner替換默認的分區。然后將采樣的結果傳給他,就可以實現我們想要的分區。在采樣時,我們可以使用hadoop的幾種采樣工具,RandomSampler,InputSampler,IntervalSampler。
這樣,我們就可以對利用分布式文件系統進行大數據量的排序了,我們也可以重寫Partitioner類中的compare函數,來定義比較的規則,從而可以實現字符串或其他非數字類型的排序,也可以實現二次排序乃至多次排序。
2、MapReduce中分組的概念和使用
分區的目的是根據Key值決定Mapper的輸出記錄被送到哪一個Reducer上去處理。而分組的就比較好理解了。筆者認為,分組就是與記錄的Key相關。在同一個分區里面,具有相同Key值的記錄是屬于同一個分組的。
3、MapReduce中Combiner的使用
很多MapReduce程序受限于集群上可用的帶寬,所以它會盡力最小化需要在map和reduce任務之間傳輸的中間數據。Hadoop允許用戶聲明一個combiner function來處理map的輸出,同時把自己對map的處理結果作為reduce的輸入。因為combiner function本身只是一種優化,hadoop并不保證對于某個map輸出,這個方法會被調用多少次。換句話說,不管combiner function被調用多少次,對應的reduce輸出結果都應該是一樣的。
下面我們以《權威指南》的例子來加以說明,假設1950年的天氣數據讀取是由兩個map完成的,其中第一個map的輸出如下:
(1950, 0)
(1950, 20)
(1950, 10)
第二個map的輸出為:
(1950, 25)
(1950, 15)
而reduce得到的輸入為:(1950, [0, 20, 10, 25, 15]), 輸出為:(1950, 25)
由于25是集合中的最大值,我們可以使用一個類似于reduce function的combiner function來找出每個map輸出中的最大值,這樣的話,reduce的輸入就變成了:
(1950, [20, 25])
各個funciton 對溫度值的處理過程可以表示如下:max(0, 20, 10, 25, 15) =max(max(0, 20, 10), max(25, 15)) = max(20, 25) = 25
注意:并不是所有的函數都擁有這個屬性的(有這個屬性的函數我們稱之為commutative和associative),例如,如果我們要計算平均溫度,就不能這樣使用combiner function,因為mean(0, 20, 10, 25, 15) =14,而mean(mean(0, 20, 10),mean(25, 15)) = mean(10, 20) = 15
combiner function并不能取代reduce function(因為仍然需要reduce function處理來自不同map的帶有相同key的記錄)。但是他可以幫助減少需要在map和reduce之間傳輸的數據,就為這一點combiner function就值得考慮使用。
4、Shuffle階段排序流程詳解
我們首先看一下MapReduce中的排序的總體流程。
MapReduce框架會確保每一個Reducer的輸入都是按Key進行排序的。一般,將排序以及Map的輸出傳輸到Reduce的過程稱為混洗(shuffle)。每一個Map都包含一個環形的緩存,默認100M,Map首先將輸出寫到緩存當中。當緩存的內容達到“閾值”時(閾值默認的大小是緩存的80%),一個后臺線程負責將結果寫到硬盤,這個過程稱為“spill”。Spill過程中,Map仍可以向緩存寫入結果,如果緩存已經寫滿,那么Map進行等待。
Spill的具體過程如下:首先,后臺線程根據Reducer的個數將輸出結果進行分組,每一個分組對應一個Reducer。其次,對于每一個分組后臺線程對輸出結果的Key進行排序。在排序過程中,如果有Combiner函數,則對排序結果進行Combiner函數進行調用。每一次spill都會在硬盤產生一個spill文件。因此,一個Map task有可能會產生多個spill文件,當Map寫出最后一個輸出時,會將所有的spill文件進行合并與排序,輸出最終的結果文件。在這個過程中Combiner函數仍然會被調用。從整個過程來看,Combiner函數的調用次數是不確定的。下面我們重點分析下Shuffle階段的排序過程:
Shuffle階段的排序可以理解成兩部分,一個是對spill進行分區時,由于一個分區包含多個key值,所以要對分區內的<key,value>按照key進行排序,即key值相同的一串<key,value>存放在一起,這樣一個partition內按照key值整體有序了。
第二部分并不是排序,而是進行merge,merge有兩次,一次是map端將多個spill 按照分區和分區內的key進行merge,形成一個大的文件。第二次merge是在reduce端,進入同一個reduce的多個map的輸出 merge在一起,該merge理解起來有點復雜,最終不是形成一個大文件,而且期間數據在內存和磁盤上都有。所以shuffle階段的merge并不是嚴格的排序意義,只是將多個整體有序的文件merge成一個大的文件,由于不同的task執行map的輸出會有所不同,所以merge后的結果不是每次都相同,不過還是嚴格要求按照分區劃分,同時每個分區內的具有相同key的<key,value>對挨在一起。
Shuffle排序綜述:如果只定義了map函數,沒有定義reduce函數,那么輸入數據經過shuffle的排序后,結果為key值相同的輸出挨在一起,且key值小的一定在前面,這樣整體來看key值有序(宏觀意義的,不一定是按從大到小,因為如果采用默認的HashPartitioner,則key 的hash值相等的在一個分區,如果key為IntWritable的話,每個分區內的key會排序好的),而每個key對應的value不是有序的。
5、MapReduce中輔助排序的原理與實現
(1)任務
我們需要把內容如下的sample.txt文件處理為下面文件:
源文件:Sample.txt
bbb 654
ccc 534
ddd 423
aaa 754
bbb 842
ccc 120
ddd 219
aaa 344
bbb 214
ccc 547
ddd 654
aaa 122
bbb 102
ccc 479
ddd 742
aaa 146
目標:part-r-00000
aaa 122
bbb 102
ccc 120
ddd 219
(2)工作原理
過程導引:
1、定義包含記錄值和自然值的組合鍵,本例中為MyPariWritable.
2、自定義鍵的比較器(comparator)來根據組合鍵對記錄進行排序,即同時利用自然鍵和自然值進行排序。(aaa 122組合為一個鍵)。
3、針對組合鍵的Partitioner(本示例使用默認的hashPartitioner)和分組comparator在進行分區和分組時均只考慮自然鍵。
詳細過程:
首先在map階段,使用job.setInputFormatClass定義的InputFormat將輸入的數據集分割成小數據塊splites,同時InputFormat提供一個RecordReder的實現。本例子中使用的是TextInputFormat,他提供的RecordReder會將文本的一行的行號作為key,這一行的文本作為value。這就是自定義Map的輸入是<LongWritable, Text>的原因。然后調用自定義Map的map方法,將一個個<LongWritable, Text>對輸入給Map的map方法。注意輸出應該符合自定義Map中定義的輸出< MyPariWritable, NullWritable>。最終是生成一個List< MyPariWritable, NullWritable>。在map階段的最后,會先調用job.setPartitionerClass對這個List進行分區,每個分區映射到一個reducer。每個分區內又調用job.setSortComparatorClass設置的key比較函數類排序。可以看到,這本身就是一個二次排序。在reduce階段,reducer接收到所有映射到這個reducer的map輸出后,也是會調用job.setSortComparatorClass設置的key比較函數類對所有數據對排序。然后開始構造一個key對應的value迭代器。這時就要用到分組,使用jobjob.setGroupingComparatorClass設置的分組函數類。只要這個比較器比較的兩個key相同,他們就屬于同一個組(本例中由于要求得每一個分區內的最小值,因此比較MyPariWritable類型的Key時,只需要比較自然鍵,這樣就能保證只要兩個MyPariWritable的自然鍵相同,則它們被送到Reduce端時候的Key就認為在相同的分組,由于該分組的Key只取分組中的第一個,而這些數據已經按照自定義MyPariWritable比較器排好序,則第一個Key正好包含了每一個自然鍵對應的最小值),它們的value放在一個value迭代器,而這個迭代器的key使用屬于同一個組的所有key的第一個key。最后就是進入Reducer的reduce方法,reduce方法的輸入是所有的key和它的value迭代器。同樣注意輸入與輸出的類型必須與自定義的Reducer中聲明的一致。
感謝各位的閱讀,以上就是“Hadoop Mapreduce二次排序過程是怎樣的”的內容了,經過本文的學習后,相信大家對Hadoop Mapreduce二次排序過程是怎樣的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。