您好,登錄后才能下訂單哦!
這篇文章主要講解了“MapTask流程是怎樣的”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“MapTask流程是怎樣的”吧!
1、從job提交流程的24步,開始mapTask的流程分析,進入submitJob --LocalJobRunner.java中的788行 Job job = new Job(JobID.downgrade(jobid), jobSubmitDir); //創建一個可以真正執行的Job 該Job: LocalJobRunner$Job , 且是一個線程 $表示內部類
2、因為當前的Job對象是一個線程,所有執行線程要執行run方法,因此直接找到 LocalJobRunner的run方法進行查看 --定位到537行 TaskSplitMetaInfo[] taskSplitMetaInfos = SplitMetaInfoReader.readSplitMetaInfo(jobId, localFs, conf, systemJobDir); //讀取切片的metainfo信息,即提交job過程中在臨時目錄中生成的job.splitmetainfo文件
3、向下走斷點,定位到下方代碼 --547行 List<RunnableWithThrowable> mapRunnables = getMapTaskRunnables( taskSplitMetaInfos, jobId, mapOutputFiles); //根據切片的metainfo信息,可以得出有多少個切片,再生成對應個數的Runnable對象. 每個Runnable對象對應一個線程,每一個MapTask運行在一個線程中(基于本地模式的分析) Runnable : LocalJobRunnber$Job$MapTaskRunnable ---聯想到線程
4、ExecutorService mapService = createMapExecutor(); //創建線程池對象 runTasks(mapRunnables, mapService, "map");// 將所有的LocalJobRunnber$Job$MapTaskRunnable對象提交給 線程池執行,進入到runTasks方法內部。 --LocalJobRunner中的466行
5、//每個線程負責一個Runnable執行,定位到每個Runnable內部的run方法,查看具體執行(以內部類的方式嵌套) for (Runnable r : runnables) { service.submit(r); } LocalJobRunnber$Job$MapTaskRunnable交給每個線程執行時,會執行到 LocalJobRunnber$Job$MapTaskRunnable的run方法,因此接下來看 LocalJobRunnber$Job$MapTaskRunnable的run方法 --LocalJobRunner中的248行
6、進入到run方法內部,定位到254行 MapTask map = new MapTask(systemJobFile.toString(), mapId, taskId, info.getSplitIndex(), 1); //創建MapTask對象 --在每一個線程中都會執行,會創建一個mapTask對象
7、進入map.run(localConf, Job.this); --271行 //執行MapTask的run方法,關聯到MapTask方法中的run
進入到MapTask的run方法內 首先進行分區設置 partitions = jobContext.getNumReduceTasks(); if (partitions > 1) { partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>) ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job); } else { partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() { @Override public int getPartition(K key, V value, int numPartitions) { return partitions - 1; } }; } 8、定位到MapTask中run方法的347行,并進入runNewMapper()方法,提前判斷下是否使用新的api 進入runNewMapper()方法,定位到MapTask的745行開始讀源碼
9、--反射的方式創建Mapper對象. 例如: WordCountMapper org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper = (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>) ReflectionUtils.newInstance(taskContext.getMapperClass(), job); --反射的方式創建Inputformat對象, 例如: TextInputFormat(默認) org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat = (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>) ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job); --獲取當前MapTask所負責的切片信息 org.apache.hadoop.mapreduce.InputSplit split = null; split = getSplitDetails(new Path(splitIndex.getSplitLocation()), splitIndex.getStartOffset()); --獲取RecordReader對象 org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input = new NewTrackingRecordReader<INKEY,INVALUE> (split, inputFormat, reporter, taskContext);
10、向下讀取,定位到MapTask的782行 output = new NewOutputCollector(taskContext, job, umbilical, reporter);方法進入
11、定位到MapTask的710行 collector = createSortingCollector(job, reporter); //收集器對象,可以理解為緩沖區對象 12、進入到createSortingCollector方法, --MapTask中的388行 13、collector.init(context); --初始化緩沖區對象 collector: MapTask$MapOutputBuffer 14、進入到init方法中 --MapTask的968行
15、 ①:定位到init方法的980行 --//獲取溢寫百分比 80%,通過mapreduce.map.sort.spill.percent參數來配置 final float spillper = job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); --//獲取緩沖區大小 100M, 通過 mapreduce.task.io.sort.mb 參數來配置 final int sortmb = job.getInt(MRJobConfig.IO_SORT_MB, MRJobConfig.DEFAULT_IO_SORT_MB); --//獲取排序對象 QuickSort.class, 只排索引 sorter = ReflectionUtils.newInstance(job.getClass( MRJobConfig.MAP_SORT_CLASS, QuickSort.class, IndexedSorter.class), job); --//獲取key的比較器對象 comparator = job.getOutputKeyComparator(); --//獲取key的序列化對象 k/v serialization 獲取kv的序列化對象 --//獲取計數器對象 output counters --//compression 獲取編解碼器,進行壓縮操作 --//combiner 獲取Combiner對象,在溢寫及歸并可以使用combiner --//spillThread.start(); 啟動溢寫線程 ,只有達到溢寫百分比才會發生溢寫操作
16、mapper.run(mapperContext);執行到Mapper對象中的run方法,例如WordCountMapper中的run方法 進入到mapper.run()方法內 執行 setup(context); --143行 執行 map(context.getCurrentKey(), context.getCurrentValue(), context); --146行, 進入到wordCount中的map()方法,是一個循環執行的過程 context.wirte(outK,outV);將map方法中處理好的kv寫出 執行cleanup(context);
感謝各位的閱讀,以上就是“MapTask流程是怎樣的”的內容了,經過本文的學習后,相信大家對MapTask流程是怎樣的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。