您好,登錄后才能下訂單哦!
這篇文章主要介紹了Java大數據開發Hadoop MapReduce的優缺點是什么的相關知識,內容詳細易懂,操作簡單快捷,具有一定借鑒價值,相信大家閱讀完這篇Java大數據開發Hadoop MapReduce的優缺點是什么文章都會有所收獲,下面我們一起來看看吧。
MapReduce是一個進行分布式運算的編程框架,使用戶開發基于hadoop進行數據分析的核心框架。 MapReduce 核心功能就是將用戶編寫的業務邏輯代碼和自帶的默認組件整合成一個完整的 分布式運算程序,并發運行在一個 Hadoop 集群上。
MapReduce的思想核心是分而治之,適用于大規模數據處理場景。
map負責分,將復雜的任務拆解成可以并行計算的若干個任務來處理
reduce負責合,對map階段的結果進行全局匯總
比如說:老師作業留的有點多,一個人寫太費勁了,就可以用MapReduce這種分而治之的思想,將作業進行map處理,分給不同的人,最后所有寫完的部分發到群里進行reduce匯總,復雜的作業簡簡單單。
易于編程
MapReduce將做什么和怎么做分開了,提供了一些接口,程序員只需關注應用層上的問題。具體如何實現并行計算任務則被隱藏了起來。
擴展性
當計算資源不足時,可以增加機器來提高擴展能力
高容錯
一臺機器掛了,可以將計算任務轉移到另一臺節點上進行
適合PB級海量數據的離線處理
不擅長實時計算
無法做到在毫秒級別返回結果
不擅長流式計算
MapReduce處理的數據源只能是靜態的,不能動態變化
不擅長DAG(有向無環圖)計算
每個MR作業處理結束,結果都會寫入到磁盤,造成大量的磁盤IO,導致性能低下
一個MapReduce程序在分布式運行時有三類的實例進程
MrAppMaster : 負責整個程序的過程調度及狀態協調
MapTask : 負責Map階段的數據處理流程
ReduceTask : 負責Reduce階段的數據處理流程
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
以上代碼實現了兩個類:TokenizerMapper和IntSumReducer,它們分別實現了Map和Reduce功能。
Map函數將輸入的每一行文本進行分詞,并將每個單詞映射為一個鍵值對,其中鍵為單詞,值為1,然后將這些鍵值對輸出給Reduce函數。
Reduce函數將相同鍵的值相加,并將最終結果輸出。
在這個例子中,CombinerClass被設置為相同的Reducer類,用于在Map任務結束后本地合并中間結果,以減少網絡傳輸。
最后,將輸入文件和輸出文件的路徑作為命令行參數傳遞給main函數,并啟動MapReduce作業。
job的講解
在Hadoop MapReduce程序中,Job對象是用來定義和運行一個MapReduce作業的。
Job對象的主要功能是封裝了整個MapReduce作業的配置和運行信息,包括輸入數據和輸出數據的路徑、Mapper類和Reducer類的設置、中間結果的輸出類型和格式、作業的提交方式等。
在main函數中,我們創建一個Job對象并設置它的相關屬性。
Job.getInstance()方法返回一個新的Job實例,其中的Configuration對象用來指定作業的一些配置信息。
setJarByClass()方法用來設置作業的jar包,它的參數是定義MapReduce作業的主類。
setMapperClass()、setCombinerClass()和setReducerClass()方法用來指定Mapper、Combiner和Reducer的實現類。
setOutputKeyClass()和setOutputValueClass()方法分別用來設置MapReduce作業的輸出鍵和輸出值的類型。
FileInputFormat.addInputPath()和FileOutputFormat.setOutputPath()方法用來指定輸入文件和輸出文件的路徑。
最后,我們調用job.waitForCompletion()方法來提交并運行作業,并等待作業完成。
如果作業成功完成,waitForCompletion()方法將返回true,否則返回false。
如果作業失敗,我們可以通過job.getJobState()方法來獲取作業的狀態信息,或者查看作業的日志信息來進行排錯和調試。
序列化就是將內存中對象轉換成字節序列,便于存儲到磁盤和網絡傳輸
反序列化時將字節序列或磁盤中的持久化數據轉換成內存中的對象
一般來說,對象只能在本地進程中使用,不能通過網絡發送到另一臺計算機
序列化可以存儲對象,可以將對象發送到遠程計算機
Hadoop序列化和Java序列化都是將對象轉換為字節序列以便于在網絡上傳輸或者存儲到磁盤等持久化存儲介質中。它們的主要區別在于以下幾點:
序列化速度和效率不同:Hadoop序列化比Java序列化更快,因為它采用的是二進制格式,而Java序列化采用的是基于文本的XML或JSON格式。
支持的數據類型不同:Hadoop序列化支持的數據類型比Java序列化更多,包括基本類型、數組、集合、映射、枚舉、自定義類等。
序列化后的數據大小不同:Hadoop序列化生成的字節流比Java序列化生成的字節流更小,因為它使用更緊湊的二進制格式,這對于在網絡上傳輸和存儲到磁盤等介質中非常重要。
可移植性不同:Java序列化生成的字節流只能被Java程序讀取,而Hadoop序列化生成的字節流可以被任何語言的程序讀取,因為它使用了通用的二進制格式。
總的來說,Hadoop序列化更適合用于大規模數據的處理和分布式計算,而Java序列化更適合用于小規模數據的傳輸和存儲。
數據輸入:MapReduce從Hadoop分布式文件系統(HDFS)中讀取輸入數據,并將其分成固定大小的數據塊,每個數據塊大小通常為64MB。
Map階段:在Map階段,MapReduce將每個數據塊分發給一組可擴展的計算節點,每個計算節點運行Map函數來處理它們分配的數據塊。Map函數將輸入數據轉換為一組鍵值對(Key-Value Pairs)的形式,這些鍵值對可以被后續的Reduce函數處理。
Shuffle階段:在Map函數處理完數據之后,MapReduce框架將所有的鍵值對按照它們的鍵進行排序,并將相同鍵的值合并在一起。這個過程通常被稱為“Shuffle”。
Reduce階段:在Reduce階段,MapReduce框架將合并后的鍵值對發送到一組可擴展的計算節點。每個節點運行Reduce函數來處理它們收到的所有鍵值對,并生成最終的輸出結果。
數據輸出:在Reduce函數處理完數據之后,MapReduce將輸出結果寫入HDFS中。
這些步驟中,Map和Reduce函數是由開發者自行編寫的,它們實現了具體的業務邏輯。MapReduce框架提供了分布式計算的基礎設施,負責管理計算節點、任務分配、故障處理等任務,以保證整個計算過程的可靠性和高效性。
總的來說,MapReduce框架的原理是將大數據集劃分成多個小數據塊,然后將這些數據塊分發給多個計算節點并行處理,最后將處理結果合并為一個最終結果。它通過這種方式來充分利用集群中的計算資源,提高計算效率和數據處理能力。
數據塊:Block時HDFS在物理上對數據進行切塊,是HDFS存儲數據的單位
數據切片:數據切片是在邏輯上對輸入進行切片。切片是MR程序計算輸入數據的單位,一個切片會啟動一個MapTask
客戶端提交job時的切片數決定了map階段的并行度
默認情況下,切片大小為BlockSize
切片不會考慮數據整體,是對每個文件進行單獨切片
在MapReduce中,FileInputFormat是一個抽象類,用于定義如何將文件分割成輸入數據塊并生成適合Mapper處理的RecordReader。它是MapReduce中的輸入格式類之一,用于讀取Hadoop分布式文件系統(HDFS)或本地文件系統中的數據。
FileInputFormat包括兩個關鍵方法:getSplits()和createRecordReader()。
getSplits()方法將輸入文件劃分成適合Map任務的數據塊,每個數據塊對應一個Map任務。該方法返回一個InputSplit對象的數組,其中每個InputSplit表示一個文件數據塊。
createRecordReader()方法創建一個RecordReader對象,用于讀取InputSplit中的數據塊。RecordReader負責讀取一個數據塊中的所有記錄,并將它們轉換成key-value對。
FileInputFormat還提供了一些其他的方法,如isSplitable()用于判斷一個文件是否可以被劃分成多個數據塊。
Hadoop提供了一些預定義的FileInputFormat類,如TextInputFormat用于讀取文本文件,SequenceFileInputFormat用于讀取SequenceFile格式的文件等,用戶也可以通過繼承FileInputFormat自定義輸入格式類。
TextInputFormat是FileInputFormat默認的實現類,按行讀取每條記錄
key為該行的起始字節偏移量,為LongWritable類型
value 為這一行的內容,不包括終止符,為Text類型
TextInputFormat是按文件進行規劃分片,不管文件有多小,都是是一個單獨的切片,這樣會產生大量的MapTask,效率低下
CombineTextInputFormat用于小文件過多的場景,可以將多個小文件在邏輯上劃分到一個切片
決定哪些塊放入同一個分片時,CombineTextInputFormat會考慮到節點和機架的因素,所以在MR作業處理輸入的速度不會下降
CombineTextInputFormat不僅可以很好的處理小文件,在處理大文件時也有好處,因為它在每個節點生成了一個分片,分片可能又多個塊組成,CombineTextInputFormat使map操作中處理的數據量和HDFS中文件塊的大小的耦合度降低了
讀取輸入數據:MapTask通過InputFormat獲得RecordReader,從輸入InputSplit中解析出KV
Map階段:將解析出的KV交給map()函數處理,產生一系列新的KV
Collect收集:數據處理完成之后,會調用OutputCollector.collect()輸出結果。在該函數的內部,會生成KV分區,寫入環形緩沖區中
Spill階段:環形緩沖區滿了之后,MR會將數據寫到本地磁盤,形成一個臨時文件,在寫入之前,會對數據進行一次排序
merge階段:所有數據處理完畢之后,MapTask會對所有臨時文件進行一次合并,確保只生成一個數據文件
Spill階段詳情:
通過快速排序對環形緩沖區內的數據進行排序,先按照partition(后面會介紹)編號進行排序,然后再按照K進行排序。排序過后,數據以分區為單位聚集,分區內的所有數按照K有序
按照分區編號由小到大將分區數據寫入工作目錄下的臨時文件 output/spillN.out(N表示當前溢寫的次數),如果設置了combiner(后面會介紹),則寫入文件之前,還會將分區中的數據進行一次聚集操作
將分區數據的元數據寫入到內存索引數據結構SpillRecord中,每個分區的元數據包括臨時文件的偏移量、壓縮前后的數據大小,如果內存索引大于1MB,會將內存索引寫到文件 output.spillN.out.index中
在MapReduce計算模型中,Map任務會將生成的鍵值對按照鍵進行排序,并將其劃分到不同的分區中。分區的數量通常等于Reduce任務的數量。具體來說,Map任務會按照Partitioner函數定義的分區規則對鍵值對進行劃分。Partitioner函數將每個鍵值對映射到一個分區編號,然后Map任務將其輸出到對應的分區中。
Partitioner函數通常是由用戶自定義實現的,其作用是將鍵值對映射到一個特定的分區。Hadoop提供了默認的Partitioner實現,即HashPartitioner,它將鍵哈希后取模得到分區編號,從而實現對鍵值對的劃分。在實際應用中,用戶可以根據自己的需求自定義Partitioner函數,以便將鍵值對劃分到特定的分區中。
如果ReduceTask數量 > Partition數量,會產生多個空的輸出文件
如果ReduceTask數量 < Partition數量,會導致有分區的數據無處安放,會Exception
如果ReduceTask數量 = 1,則不管有多少個分區文件,最終都會只產生一個文件
Combiner在每個mapTask所在的節點運行
Combiner對每個MapTask的輸出進行局部匯總,減少reduce階段的負擔
Combiner使用的前提是不能影響業務邏輯
Copy:拉取數據,Reduce進程啟動copy進程(Fetcher),通過HTTP的方式請求maptask獲取自己的文件,map task分區表示每個map task屬于哪個reduce task
Merge:ReduceTask啟動兩個線程對內存和磁盤中的文件進行合并,防止文件過多。當內存中的數據達到一定閾值時,就會啟動內存到磁盤的merge,與map端的相似。直到沒有map端的數據才結束
合并排序:將數據合并成一個大數據,并進行排序
對排序后的數據調用reduce方法:對鍵相同的鍵值對調用reduce方法,每次調用會產生零個或多個鍵值對,最后將輸出的鍵值對存入HDFS。
Shuffle階段的過程可以分為三個階段:
Map端的輸出:Map任務將生成的鍵值對按照鍵排序,并將其劃分到不同的分區中。如果Map任務的輸出緩存區已滿,則需要將其溢出到本地磁盤的臨時文件中。
數據傳輸:在Shuffle階段中,Map任務的輸出需要傳輸到Reduce任務所在的節點,以便Reduce任務可以從中提取和合并數據。數據傳輸是Shuffle階段的關鍵步驟,其速度和效率直接影響整個MapReduce作業的性能。
Reduce端的輸入:Reduce任務需要從本地磁盤讀取屬于自己的分區的臨時文件,并對同一個分區中的鍵值對進行合并和排序。Reduce任務將合并后的結果輸出到最終的輸出文件中。
Shuffle階段是MapReduce計算模型中非常重要的一個階段,它的性能和效率對整個作業的執行時間和性能影響非常大。因此,優化Shuffle階段的性能和效率是MapReduce應用程序優化的一個關鍵方向。
MapTask和ReduceTask對key進行排序是為了方便后續的數據處理和計算。
具體來說,對于MapTask而言,對輸出的key進行排序可以將具有相同key值的記錄聚合在一起,方便ReduceTask進行處理。
而對于ReduceTask而言,對輸入的key進行排序可以讓具有相同key值的記錄相鄰排列,方便進行聚合和計算。
一般來說,在Map任務中,對鍵值對進行快速排序的次數是一次,即將數據寫入環形緩沖區之前對其中的鍵值對進行排序。這是因為,對于同一個Map任務的輸出,在Map輸出的環形緩沖區中進行快速排序即可滿足Reduce任務在Shuffle階段的需求,而不需要進行額外的排序。
在Shuffle階段,如果存在多個環形緩沖區需要合并,Reduce任務會對它們進行歸并排序。這是因為,不同Map任務的輸出在Shuffle階段需要合并,而這些輸出之間的順序是無序的,因此需要進行排序以便進行合并。這次排序是對整個數據集進行的,而不是對單個Map任務的輸出進行的。
當Reduce任務接收到來自多個Map任務的中間結果時,它會對同一個分區內的所有鍵值對進行排序。這里采用的排序算法一般也是歸并排序,因為歸并排序的時間復雜度是O(nlogn),且適合對大量數據進行排序。這次排序也是對整個數據集進行的,而不是對單個Map任務的輸出進行的。
因此,總體來說,Shuffle階段需要進行多次排序,具體排序的次數可能因具體實現而有所不同。但無論是哪種具體實現,Shuffle階段都需要對整個數據集進行排序以便后續的計算和處理。
關于“Java大數據開發Hadoop MapReduce的優缺點是什么”這篇文章的內容就介紹到這里,感謝各位的閱讀!相信大家對“Java大數據開發Hadoop MapReduce的優缺點是什么”知識都有一定的了解,大家如果還想學習更多知識,歡迎關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。