This article walks through the output phase of MapReduce by tracing the ReduceTask source code: how the task initializes, how the RecordWriter is created, and how the reduce results finally reach the output file.
```java
//-------------------------- ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical)
        throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }

    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    // ReduceTask initialization
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass
                ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf)
                : null;
        Class<? extends ShuffleConsumerPlugin> clazz = job.getClass(
                "mapreduce.job.reduce.shuffle.consumer.plugin.class",
                Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job),
                umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector,
                this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter,
                this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter,
                this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile,
                this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        // start running the reduce task
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }

        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }
}
```
Much like MapTask, the two key calls here are this.initialize() and this.runNewReducer(): the first performs initialization, the second actually runs the task.
```java
//---------------------------------------- ReduceTask.java
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi)
        throws IOException, ClassNotFoundException, InterruptedException {
    // create the context objects
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    // mark the reduce task as running
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }
        // Obtain the OutputFormat class object via reflection.
        // getOutputFormatClass() is defined in JobContextImpl
        // and defaults to TextOutputFormat.class.
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(
                this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    // get the output path
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf,
                    ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass(
            "mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
            (String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }
}
```
This mainly initializes the context objects and obtains the OutputFormat object.
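To see where that OutputFormat comes from, here is a minimal driver sketch. It is illustrative only: the class name, job name, and paths are assumptions, and if setOutputFormatClass() is omitted the job falls back to the default TextOutputFormat noted above.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OutputFormatDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "output-format-demo");
        // If this call were omitted, getOutputFormatClass() would still
        // return TextOutputFormat.class -- the default that initialize() reflects.
        job.setOutputFormatClass(TextOutputFormat.class);
        // Identity Mapper/Reducer defaults apply since none are set; with
        // TextInputFormat they emit (LongWritable offset, Text line) pairs.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/tmp/demo-input"));
        // This is the path that FileOutputFormat.getOutputPath() returns
        // inside initialize().
        FileOutputFormat.setOutputPath(job, new Path("/tmp/demo-output"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Next, runNewReducer():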
```java
//----------------------------------------------- ReduceTask.java
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf job,
        TaskUmbilicalProtocol umbilical, final TaskReporter reporter,
        final RawKeyValueIterator rIter, RawComparator<INKEY> comparator,
        Class<INKEY> keyClass, Class<INVALUE> valueClass)
        throws IOException, InterruptedException, ClassNotFoundException {
    // anonymous inner class used to build the key/value iterator
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rIter.close();
        }

        public DataInputBuffer getKey() throws IOException {
            return rIter.getKey();
        }

        public Progress getProgress() {
            return rIter.getProgress();
        }

        public DataInputBuffer getValue() throws IOException {
            return rIter.getValue();
        }

        public boolean next() throws IOException {
            boolean ret = rIter.next();
            reporter.setProgress(rIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    // obtain the Reducer object via reflection
    org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer =
            (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(
                    taskContext.getReducerClass(), job);
    // obtain the RecordWriter object used to write the results to a file
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> trackedRW =
            new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    // create the reduceContext object used by the reduce task
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(
            reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter,
            this.reduceInputValueCounter, trackedRW, this.committer, reporter,
            comparator, keyClass, valueClass);

    // start running reduce
    try {
        reducer.run(reducerContext);
    } finally {
        // close the output stream
        trackedRW.close(reducerContext);
    }
}
```
As you can see, it mainly does the following:

1) Obtains the reducer object, which is used to run run(), i.e. to execute the reduce method

2) Creates the RecordWriter object

3) Creates the reduceContext

4) Runs the reducer's run() method
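Before drilling into each step, here is a minimal reducer of the kind that step 4 ends up running: runNewReducer() instantiates it by reflection and invokes its run(), which calls reduce() once per key. The word-count logic and class name are purely illustrative, not taken from the source above.

```java
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        // context.write() ultimately delegates to the RecordWriter created
        // in step 2 (a LineRecordWriter by default, as shown below).
        context.write(key, result);
    }
}
```

Back to the walkthrough: step 2 is handled by NewTrackingRecordWriter, whose constructor is shown next.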
```java
//-------------------------------------- NewTrackingRecordWriter.java
static class NewTrackingRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
    private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List<Statistics> fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext)
            throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List<Statistics> matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext),
                    taskContext.getConfiguration());
        }

        this.fsStats = matchedStats;
        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        // create the underlying RecordWriter through the OutputFormat
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }

    .....................
}
```
The key point is that the underlying RecordWriter object is created through outputFormat.getRecordWriter().
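NewTrackingRecordWriter is essentially a decorator: it forwards each write to the real writer and maintains counters around it. Here is a simplified sketch of that idea, assuming nothing beyond the public new-API RecordWriter contract; the class name and the printed statistic are illustrative, not Hadoop's.

```java
import java.io.IOException;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class CountingRecordWriter<K, V> extends RecordWriter<K, V> {
    private final RecordWriter<K, V> real;
    private long records;

    public CountingRecordWriter(RecordWriter<K, V> real) {
        this.real = real;
    }

    @Override
    public void write(K key, V value) throws IOException, InterruptedException {
        real.write(key, value); // delegate the actual output
        records++;              // then update the statistic
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        real.close(context);
        System.out.println("records written: " + records);
    }
}
```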
As mentioned above, the default OutputFormat is TextOutputFormat, so let's look at TextOutputFormat.getRecordWriter():
```java
public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public TextOutputFormat() {
    }

    // As you can see, this returns the static inner class TextOutputFormat.LineRecordWriter
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name,
            Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        // separator between key and value, "\t" by default
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        // two branches: compressed and uncompressed output
        if (!isCompressed) {
            // get the output path
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            // create the output stream
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            // return the LineRecordWriter object
            return new TextOutputFormat.LineRecordWriter(
                    new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    // here is the LineRecordWriter class itself
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text)o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }
        }

        // write the KV pair out
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                // first write the key
                if (!nullKey) {
                    this.writeObject(key);
                }

                // then the separator between key and value
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }

                // then the value
                if (!nullValue) {
                    this.writeObject(value);
                }

                // and finally a newline
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}
```
As you can see, the RecordWriter that is ultimately returned is a LineRecordWriter, which writes each record as key, separator, value, newline.
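Both knobs that getRecordWriter() reads, the separator and the compression settings, can be set from the driver. A small configuration sketch, shown here with the new (mapreduce) API and assuming a Job instance; the comma separator and the Gzip codec are just example choices:

```java
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TextOutputConfig {
    // Sets the two knobs that getRecordWriter() reads above.
    static void configureTextOutput(Job job) {
        // The default separator is "\t"; switch to a comma for CSV-like lines.
        job.getConfiguration().set("mapreduce.output.textoutputformat.separator", ",");
        // Take the isCompressed == true branch of getRecordWriter().
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    }
}
```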
Now back to step 3 above: let's look at the class behind the reduceContext object.
```java
protected static <INKEY, INVALUE, OUTKEY, OUTVALUE> Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context createReduceContext(
        Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer, Configuration job,
        org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter,
        org.apache.hadoop.mapreduce.Counter inputKeyCounter,
        org.apache.hadoop.mapreduce.Counter inputValueCounter,
        RecordWriter<OUTKEY, OUTVALUE> output, OutputCommitter committer,
        StatusReporter reporter, RawComparator<INKEY> comparator,
        Class<INKEY> keyClass, Class<INVALUE> valueClass)
        throws IOException, InterruptedException {
    ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl(
            job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer,
            reporter, comparator, keyClass, valueClass);
    Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context reducerContext =
            (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}
```
As you can see, the underlying reduceContext is a ReduceContextImpl object, which WrappedReducer then wraps into the Reducer.Context handed to user code. Next, look at the constructor of ReduceContextImpl:
```java
//--------------------------------- ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input,
        Counter inputKeyCounter, Counter inputValueCounter, RecordWriter<KEYOUT, VALUEOUT> output,
        OutputCommitter committer, StatusReporter reporter, RawComparator<KEYIN> comparator,
        Class<KEYIN> keyClass, Class<VALUEIN> valueClass)
        throws InterruptedException, IOException {
    // the parent class is TaskInputOutputContextImpl; the RecordWriter (output) is passed up to it
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}
```
Here it goes on to invoke the parent class constructor, passing along the RecordWriter object. Continuing with the parent class TaskInputOutputContextImpl:
```java
public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid,
        RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter) {
    // note that output here is the RecordWriter object
    super(conf, taskid, reporter);
    this.output = output;
    this.committer = committer;
}

// The logic here is to read the next KV pair into this.key and this.value,
// returning false when no KV pairs remain and true otherwise.
public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

// Calls the RecordWriter's write method to emit the KV pair;
// by default this is the LineRecordWriter class.
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);
}
```
As you can see, there are three abstract methods (their logic is implemented in the subclass ReduceContextImpl and has nothing to do with the RecordWriter), plus the concrete write method: the former fetch the KV pairs, the latter writes out the result KV. The write method simply calls the RecordWriter's write, i.e. the write method of the LineRecordWriter object created earlier, to emit the KV pair.
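Because write() simply delegates to whatever RecordWriter the context was built with, a custom writer can be plugged in through a custom OutputFormat. Here is a minimal sketch of the new-API RecordWriter contract; this JSON-lines writer is hypothetical and not part of Hadoop.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class JsonLineRecordWriter<K, V> extends RecordWriter<K, V> {
    private final DataOutputStream out;

    public JsonLineRecordWriter(DataOutputStream out) {
        this.out = out;
    }

    @Override
    public void write(K key, V value) throws IOException {
        // Mirrors LineRecordWriter's structure: key, separator, value, newline.
        out.write(("{\"key\":\"" + key + "\",\"value\":\"" + value + "\"}\n")
                .getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        out.close();
    }
}
```

Finally, here is Reducer.run(), which drives the whole loop: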
```java
public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
        throws IOException, InterruptedException {
    this.setup(context);

    try {
        while (context.nextKey()) {
            this.reduce(context.getCurrentKey(), context.getValues(), context);
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if (iter instanceof ValueIterator) {
                ((ValueIterator)iter).resetBackupStore();
            }
        }
    } finally {
        this.cleanup(context);
    }
}
```
As you can see, run() calls the methods of the reduceContext created earlier to fetch the KV pairs. And inside the reduce method, we emit the result KV with context.write(key, value), which in the end is just the write method of the LineRecordWriter object.
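To make the end result concrete, here is a tiny standalone sketch that reproduces LineRecordWriter's byte layout (key, separator, value, newline), showing what lands in a part-r-00000 file. The local path and the sample records are illustrative; in a real task the stream comes from fs.create() as shown earlier.

```java
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class LineFormatDemo {
    // Reproduces the write order used by LineRecordWriter.write().
    static void writeKeyValue(DataOutputStream out, String key, String value) throws IOException {
        out.write(key.getBytes(StandardCharsets.UTF_8));
        out.write("\t".getBytes(StandardCharsets.UTF_8)); // default keyValueSeparator
        out.write(value.getBytes(StandardCharsets.UTF_8));
        out.write("\n".getBytes(StandardCharsets.UTF_8)); // NEWLINE
    }

    public static void main(String[] args) throws IOException {
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream("/tmp/part-r-00000"))) {
            writeKeyValue(out, "hadoop", "2"); // file line: "hadoop\t2"
            writeKeyValue(out, "spark", "1");  // file line: "spark\t1"
        }
    }
}
```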