您好,登錄后才能下訂單哦!
這篇文章主要介紹“MapReduce基本原理是什么”,在日常操作中,相信很多人在MapReduce基本原理是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”MapReduce基本原理是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
是一個分布式運算程序編程框架。核心功能是將用戶編寫的業務邏輯代碼和自帶的默認組件整合成一個完整的分布式程序,并發運行在一個hadoop集群上。
(1)優點
1>易于編程:以普通程序的編程方法加上使用MapReduce提供的接口,可以快速完成分布式程序的編寫。
2>良好的擴展性:計算資源得不到滿足時,可以通過簡單的增加計算機器來擴展計算能力
3>高容錯性:如果一個任務所在計算節點掛了,上面的計算任務可以自動轉移到另外的節點上執行,即故障自動轉移,這個過程是內部完成的,無需人工干預
4>適合PB級別以上數據的離線處理
(2)缺點
1>實時計算:無法像mysql一樣在毫秒級或者秒級返回計算結果
2>流式計算:流式計算的輸入數據是動態的,而MapReduce要求輸入數據是靜態的,已經持久化在存儲上的。
3>DAG(有向無環圖)計算:多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出,這種情況下,MapReduce的性能很低。因為MapReduce的每個階段的輸出結果都會先寫入到磁盤中,大量的磁盤IO會造成性能的急劇下降。
核心思想就是分為map和reduce兩個階段。
1)首先將輸出的數據進行切片處理,然后各個切片數據分給獨立的一個map task任務。map內部根據業務邏輯對數據進行統計處理。每個map task之間互不影響。
2)接著就是將所有map task 的輸出作為 reduce task的輸入(reduce task的數量與分區有關,后面細講),將各個map task的局部統計匯總成全局統計,最終完成結果輸出
3)MapReduce編程模型中只能由一個map和reduce階段,多個MapReduce程序只能串行運行,無法并行運行
基本概述:
1)當我們編寫完MR作業后,需要通過JobClient來提交一個job,提交的信息會發送到JobTracker模塊,這個模塊是第一代MapReduce計算框架的核心之一,它負責與集群中的其他節點維持心跳,為提交的作業分配資源,管理提交的作業的正常運作(失敗,重啟等)。
2)第一代MapReduce的另一個核心的功能是TaskTracker,在各個TaskTracker安裝節點上,它的主要功能是監控自己所在節點的資源使用情況。
3)TaskTracker監控當前節點的Tasks的運行情況,其中包含Map Task和Reduce Task,最后由Reduce Task到Reduce階段,將結果輸送到HDFS的文件系統中;其中的具體流程如圖中描述的1-7步驟。TaskTracker在監控期間,需要把這些信息通過心跳機制發送給JobTracker,JobTracker收集到這些信息后,給新提交的作業分配其他的資源,避免重復資源分配。
缺點:
1)JobTracker是第一代MapReduce的入口點,若是JobTracker服務宕機,整個服務將會癱瘓,存在單點問題。
2)JobTracker負責的事情太多,完成來太多的任務,占用過多的資源,當Job數非常多的時候,會消耗很多內存,容易出現性能瓶頸。
3)對TaskTracker而言,Task擔當的角色過于簡單,沒有考慮到CPU及內存的使用情況,若存在多個大內存的Task被集中調度,容易出現內存溢出。
4)另外,TaskTracker把資源強制分為map task slot和reduce task slot,若是MR任務中只存在其中一個(map或是reduce),會出現資源浪費的情況,資源利用率低。也就是說資源是靜態分配的
V2比起V1最大的不同就是增加了 yarn 這個組件。
架構重構的基本思想在于將JobTracker的兩個核心的功能單獨分離成獨立的組件了。分離后的組件分別為資源管理(Applications Manager)和任務調度器(Resource Scheduler)。新的資源管理器(Resource Manager)管理整個系統的資源分配,而每一個Node Manager下的App Master(Application Master)負責對應的調度和協調工作(每個MapReduce任務都有一個對應的app master),而在實際中,App Master從Resource Manager上獲得資源,讓Node Manager來協同工作和任務監控。
對比于MR V1中的Task的監控,重啟等內熱都交由App Master來處理,Resource Manager提供中心服務,負責資源的分配與調度。Node Manager負責維護Container的狀態,并將收集的信息上報給Resource Manager,以及負責和Resource Manager維持心跳。
優點:
1)減少資源消耗,讓監控每一個作業更加分布式了。
2)加入了yarn之后,支持更多的編程模型,比如spark等
3)將資源以內存量的概念來描述,比V1中的slot更加合理,而且資源都是動態分配
4)資源的調度和分配更加有層次化,RM負責總的資源管理和調度,每個節點上的appMaster負責當前節點的資源管理和調度
其中上面從第7步到16步稱為shuffle機制,
1)maptask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合并成大的溢出文件
4)在溢出過程中,及合并的過程中,都要調用partitioner進行分區和針對key進行排序
5)reducetask根據自己的分區號,去各個maptask機器上取相應的結果分區數據
6)reducetask會取到同一個分區的來自不同maptask的結果文件,reducetask會將這些文件再進行合并(歸并排序)
7)合并成大文件后,shuffle的過程也就結束了,后面進入reducetask的邏輯運算過程(從文件中取出一個一個的鍵值對group,調用用戶自定義的reduce()方法)
由MapReduce的工作流程可以知道,maptask的數量決定于切片的數量,所以我們看看切片的原理。
在MapReduce的工作流程中,在對數據進行map運算前,會先對數據進行切片處理,然后每一片交給一個獨立的map task進行處理。那么map task是如何獲取到切片實現類的呢?
首先 MapTask是以 run 方法為入口開始map任務的。
/* MapTask.java */ public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, ClassNotFoundException, InterruptedException { //此處省略好多代碼,直接看這個方法,其實就是新舊api的兼容 this.runNewMapper(job, this.splitMetaInfo, umbilical, reporter); } else { this.runOldMapper(job, this.splitMetaInfo, umbilical, reporter); } this.done(umbilical, reporter); } } //下面是 runNewMapper 方法 private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewMapper(JobConf job, TaskSplitIndex splitIndex, TaskUmbilicalProtocol umbilical, TaskReporter reporter) throws IOException, ClassNotFoundException, InterruptedException { ................ //這里就看到獲取 inputFormat 的實現類,關鍵就在于 taskContext這對象,它的類是 TaskAttemptContextImpl InputFormat<INKEY, INVALUE> inputFormat = (InputFormat)ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); } /* TaskAttemptContextImpl.java 繼承 JobContextImpl類 JobContextImpl 實現了 JobContext 接口,該接口定義很多set和get方法,用于配置job上下文對象的 */ public class JobContextImpl implements JobContext { public Class<? extends InputFormat<?, ?>> getInputFormatClass() throws ClassNotFoundException { //可以看到這里就是從conf對象中獲取inputformat.class,默認值就是TextInputFormat return this.conf.getClass("mapreduce.job.inputformat.class", TextInputFormat.class); } }
由此我們可以看到,默認處理輸入數據的類是 TextInputFormat,但是這個類并沒有實現切片方法,在它的父類 FileInputFormat中實現了切片方法:
/* FileInputFormat.java */ public List<InputSplit> getSplits(JobContext job) throws IOException { StopWatch sw = (new StopWatch()).start(); long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); //這個就是存儲切片信息的數組 List<InputSplit> splits = new ArrayList(); //獲取輸入路徑的所有文件 List<FileStatus> files = this.listStatus(job); Iterator i$ = files.iterator(); while(true) { while(true) { while(i$.hasNext()) { FileStatus file = (FileStatus)i$.next(); Path path = file.getPath(); long length = file.getLen(); if (length != 0L) { BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { //獲取文件塊信息 blkLocations = ((LocatedFileStatus)file).getBlockLocations(); } else { FileSystem fs = path.getFileSystem(job.getConfiguration()); blkLocations = fs.getFileBlockLocations(file, 0L, length); } //從這里開始就正式切片 if (this.isSplitable(job, path)) { long blockSize = file.getBlockSize(); //獲取切片大小 long splitSize = this.computeSplitSize(blockSize, minSize, maxSize); long bytesRemaining; int blkIndex; //循環對文件進行切片,可以看到這里是判斷文件剩余部分是否大于1.1倍的切片大小的 for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); //將文件,切片起始和終止位置,切片大小,切片的block所在主機等記錄到切片數組中作為切片信息。 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } //這里是將文件最后的內容作為最后一個切片添加到切片規劃中 if (bytesRemaining != 0L) { blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining); splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts())); } } else { splits.add(this.makeSplit(path, 0L, length, new String[0])); } } job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size()); sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits; } } } /* 這個方法是決定切片大小的,簡單說主要決定于 maxsize和blocksize 的大小, maxsize > blockSize, 則 splitsize = blockSize maxsize < blockSize, 則 splitsize = maxsize minSize>blockSize,則 splitsize = minSize minSize<blockSize,則 splitsize = blockSize 當然要注意的是,maxsize需要是永遠大于minSize的 */ protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return Math.max(minSize, Math.min(maxSize, blockSize)); }
上面的切片值是規劃而已,并沒有真正的切片,而是當job提交給yarn執行的之后,才會真正按照切片規劃進行數據讀取。上述切片的特點總結如下:
1)按照文件的內容的長度進行切片
2)切片是按照每個文件獨立進行切片,并不會將所有文件當做一個整體去切片,這樣有缺點(后面講)
3)切片大小:默認為blocksize,計算機制如上,這里不重復
FileInputFormat.setMaxInputSplitSize(); maxsize FileInputFormat.setMinIutputSplitSize(); minsize
可以通過設置兩個值來改變切片大小
4)切片的方式:根據源碼,每次切片時,都會判斷切完剩下的部分是否大于splitSize的1.1倍,如果不大于,那么此時切片就終止,并將剩下的部分作為最后一個切片。
我們從(2)中可以知道,TextInputFormat(FileInputFormat)切片時是按照文件進行切片的,也就是說一個文件至少是一個切片,無論文件的大小是多大。而如果有大量的小文件,那么就會生成很多個maptask,處理效率很低。對于這種情況,解決方案為:
1)從數據源頭上解決,將數據合并后再上傳至HDFS,不產生大量小文件
2)如果必須處理大量小文件,那么就采用CombineTextInputFormat來進行切片。
切片邏輯如下(源碼挺長的,下面直接說我研究源碼后的結果):
首先CombineTextInputFormat沒有實現 getSplit() 方法,而是由它的父類 CombineFileInputformat實現的,它會將一個目錄下的多個文件作為一個整體的數據源進行切片,切片的大小取決于 MaxSplitSize 設定的最大切片大小大小,單位是byte。切片邏輯為
totalSize<=1.5*MaxSplitSize 1片, splitSize=totalSize 1.5*MaxSplitSize<totalsize<2*MaxSplitSize 2片,splitSize=MaxSplitSize totalsize>2*MaxSplitSize n片,splitSize=MaxSplitSize 要注意的是: 如果總的數據大小遠大于MaxSplitSize時,切到最后一片的時候,會判斷切片后,剩下的部分是否大于2倍MaxSplitSize,如果不大于,就算作一片,如果大于就兩片
使用 CombineTextInputFormat 作為InpuFormat的操作類:
//設置 InputFormat的類為CombineTextInputFormat job.setInputFormatClass(CombineTextInputFormat.class); //分別設置切片最大值和最小值 CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
前面說到,map task的數量決定于切片的數量,那么reduce task的數量決定于什么呢?決定于分區的數量。
1)首先需要自定義一個分區類,并繼承 Partitioner<key,value>
2)重寫public Int getPartition() 方法。返回的是分區號
3)在job中設置自定義的類為分區類,否則默認的分區類就是HashPartitioner
job.setPartitionerClass(CustomPartitioner.class);
4)設置reduce task數量,一般和分區數相同 ,
job.setNumReduceTasks(N);
注意:分區數和reduce task數的聯系
如果reduceTask的數量> getPartition的結果數,則會多產生幾個空的輸出文件part-r-000xx;
如果1<reduceTask的數量<getPartition的結果數,則有一部分分區數據無處安放,會Exception;
如果reduceTask的數量=1,則不管mapTask端輸出多少個分區文件,最終結果都交給這一個reduceTask,最終也就只會產生一個結果文件 part-r-00000;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> { @Override public int getPartition(Text key, FlowBean value, int numPartitions) { // 1 獲取電話號碼的前三位 String preNum = key.toString().substring(0, 3); //默認分區號,如果都不符合下面條件,則KV劃分到這個分區 int partition = 4; // 2 判斷是哪個省 if ("136".equals(preNum)) { partition = 0; }else if ("137".equals(preNum)) { partition = 1; }else if ("138".equals(preNum)) { partition = 2; }else if ("139".equals(preNum)) { partition = 3; } return partition; } }
到此,關于“MapReduce基本原理是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。