How the MapReduce output process works

Published: 2021-12-31 16:20:49 Source: 億速云 Author: iii Category: Big Data

This article explains how the MapReduce output process works by following the reduce side of the Hadoop source code, from the task entry point down to the class that writes each key/value pair.

1. Start with the entry point, ReduceTask.run()

//--------------------------ReduceTask.java
public void run(JobConf job, TaskUmbilicalProtocol umbilical) throws IOException, InterruptedException, ClassNotFoundException {
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    if (this.isMapOrReduce()) {
        this.copyPhase = this.getProgress().addPhase("copy");
        this.sortPhase = this.getProgress().addPhase("sort");
        this.reducePhase = this.getProgress().addPhase("reduce");
    }

    TaskReporter reporter = this.startReporter(umbilical);
    boolean useNewApi = job.getUseNewReducer();
    // Initialize the reduce task
    this.initialize(job, this.getJobID(), reporter, useNewApi);
    if (this.jobCleanup) {
        this.runJobCleanupTask(umbilical, reporter);
    } else if (this.jobSetup) {
        this.runJobSetupTask(umbilical, reporter);
    } else if (this.taskCleanup) {
        this.runTaskCleanupTask(umbilical, reporter);
    } else {
        this.codec = this.initCodec();
        RawKeyValueIterator rIter = null;
        ShuffleConsumerPlugin shuffleConsumerPlugin = null;
        Class combinerClass = this.conf.getCombinerClass();
        CombineOutputCollector combineCollector = null != combinerClass ? new CombineOutputCollector(this.reduceCombineOutputCounter, reporter, this.conf) : null;
        Class<? extends ShuffleConsumerPlugin> clazz = job.getClass("mapreduce.job.reduce.shuffle.consumer.plugin.class", Shuffle.class, ShuffleConsumerPlugin.class);
        shuffleConsumerPlugin = (ShuffleConsumerPlugin)ReflectionUtils.newInstance(clazz, job);
        LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
        Context shuffleContext = new Context(this.getTaskID(), job, FileSystem.getLocal(job), umbilical, super.lDirAlloc, reporter, this.codec, combinerClass, combineCollector, this.spilledRecordsCounter, this.reduceCombineInputCounter, this.shuffledMapsCounter, this.reduceShuffleBytes, this.failedShuffleCounter, this.mergedMapOutputsCounter, this.taskStatus, this.copyPhase, this.sortPhase, this, this.mapOutputFile, this.localMapFiles);
        shuffleConsumerPlugin.init(shuffleContext);
        rIter = shuffleConsumerPlugin.run();
        this.mapOutputFilesOnDisk.clear();
        this.sortPhase.complete();
        this.setPhase(Phase.REDUCE);
        this.statusUpdate(umbilical);
        Class keyClass = job.getMapOutputKeyClass();
        Class valueClass = job.getMapOutputValueClass();
        RawComparator comparator = job.getOutputValueGroupingComparator();
        // Start running the reduce task
        if (useNewApi) {
            this.runNewReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        } else {
            this.runOldReducer(job, umbilical, reporter, rIter, comparator, keyClass, valueClass);
        }

        shuffleConsumerPlugin.close();
        this.done(umbilical, reporter);
    }
}

As with MapTask, the two methods that matter are this.initialize(), which performs the initialization, and this.runNewReducer(), which starts the task when the new API is in use.
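The control flow of run() can be summarized in a small stand-alone model. This is only a sketch of the branching shown in the listing above: the class `ReduceRunSketch`, the enum `TaskKind`, and the step names are made up for illustration and are not Hadoop code.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the branching in ReduceTask.run(); hypothetical names, not Hadoop code.
class ReduceRunSketch {
    enum TaskKind { JOB_CLEANUP, JOB_SETUP, TASK_CLEANUP, REDUCE }

    // Returns the steps that would execute, in order, for the given kind of task.
    static List<String> run(TaskKind kind, boolean useNewApi) {
        List<String> steps = new ArrayList<>();
        steps.add("initialize");                       // always runs first
        switch (kind) {
            case JOB_CLEANUP:  steps.add("runJobCleanupTask");  break;
            case JOB_SETUP:    steps.add("runJobSetupTask");    break;
            case TASK_CLEANUP: steps.add("runTaskCleanupTask"); break;
            default:
                steps.add("shuffle");                  // shuffleConsumerPlugin.run(): copy and sort
                steps.add(useNewApi ? "runNewReducer" : "runOldReducer");
                steps.add("done");
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(run(TaskKind.REDUCE, true));
    }
}
```

Only the last branch reaches the reducer, which is why the rest of the walk-through concentrates on initialize() and runNewReducer().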

2. this.initialize()

//----------------------------------------Task.java (parent class of ReduceTask)
public void initialize(JobConf job, JobID id, Reporter reporter, boolean useNewApi) throws IOException, ClassNotFoundException, InterruptedException {

    // Create the context objects
    this.jobContext = new JobContextImpl(job, id, reporter);
    this.taskContext = new TaskAttemptContextImpl(job, this.taskId, reporter);
    // Mark the reduce task as running
    if (this.getState() == org.apache.hadoop.mapred.TaskStatus.State.UNASSIGNED) {
        this.setState(org.apache.hadoop.mapred.TaskStatus.State.RUNNING);
    }

    if (useNewApi) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("using new api for output committer");
        }

        // Instantiate the OutputFormat by reflection. getOutputFormatClass() is implemented in JobContextImpl.
        // The default is TextOutputFormat.class
        this.outputFormat = (OutputFormat)ReflectionUtils.newInstance(this.taskContext.getOutputFormatClass(), job);
        this.committer = this.outputFormat.getOutputCommitter(this.taskContext);
    } else {
        this.committer = this.conf.getOutputCommitter();
    }

    // Get the output path
    Path outputPath = FileOutputFormat.getOutputPath(this.conf);
    if (outputPath != null) {
        if (this.committer instanceof FileOutputCommitter) {
            FileOutputFormat.setWorkOutputPath(this.conf, ((FileOutputCommitter)this.committer).getTaskAttemptPath(this.taskContext));
        } else {
            FileOutputFormat.setWorkOutputPath(this.conf, outputPath);
        }
    }

    this.committer.setupTask(this.taskContext);
    Class<? extends ResourceCalculatorProcessTree> clazz = this.conf.getClass("mapreduce.job.process-tree.class", (Class)null, ResourceCalculatorProcessTree.class);
    this.pTree = ResourceCalculatorProcessTree.getResourceCalculatorProcessTree((String)System.getenv().get("JVM_PID"), clazz, this.conf);
    LOG.info(" Using ResourceCalculatorProcessTree : " + this.pTree);
    if (this.pTree != null) {
        this.pTree.updateProcessTree();
        this.initCpuCumulativeTime = this.pTree.getCumulativeCpuTime();
    }

}

In short, this method initializes the context objects, obtains the OutputFormat object, and asks it for the OutputCommitter.
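The "look up a class in the configuration, fall back to a default, instantiate it by reflection" pattern used for the OutputFormat can be sketched without Hadoop. Everything below is a stand-in under stated assumptions: `SketchOutputFormat`, its two implementations, and the key `sketch.outputformat.class` are hypothetical, and the configuration is modeled as a plain `Map`.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Stand-in types for this sketch; they are not the Hadoop classes.
interface SketchOutputFormat { String name(); }

class SketchTextOutputFormat implements SketchOutputFormat {
    public String name() { return "text"; }
}

class SketchSequenceOutputFormat implements SketchOutputFormat {
    public String name() { return "sequence"; }
}

class OutputFormatLookupSketch {
    // Hypothetical configuration key, used only in this sketch.
    static final String KEY = "sketch.outputformat.class";

    // Resolve the configured class, falling back to the text format,
    // then instantiate it through its no-argument constructor.
    static SketchOutputFormat create(Map<String, Class<? extends SketchOutputFormat>> conf) {
        Class<? extends SketchOutputFormat> clazz = conf.getOrDefault(KEY, SketchTextOutputFormat.class);
        try {
            Constructor<? extends SketchOutputFormat> ctor = clazz.getDeclaredConstructor();
            ctor.setAccessible(true);                  // allow non-public constructors
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate " + clazz.getName(), e);
        }
    }

    // Convenience: name of the format chosen when 'configured' is set (or null for the default).
    static String nameFor(Class<? extends SketchOutputFormat> configured) {
        Map<String, Class<? extends SketchOutputFormat>> conf = new HashMap<>();
        if (configured != null) {
            conf.put(KEY, configured);
        }
        return create(conf).name();
    }

    public static void main(String[] args) {
        System.out.println(nameFor(null));
        System.out.println(nameFor(SketchSequenceOutputFormat.class));
    }
}
```

With nothing configured the default class is chosen, which mirrors why a job that never sets an output format ends up with TextOutputFormat.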

3. this.runNewReducer()

//-----------------------------------------------ReduceTask.java
private <INKEY, INVALUE, OUTKEY, OUTVALUE> void runNewReducer(JobConf job, TaskUmbilicalProtocol umbilical, final TaskReporter reporter, RawKeyValueIterator rIter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException, ClassNotFoundException {
    // Keep a reference to the original iterator so the wrapper below can delegate to it
    final RawKeyValueIterator rawIter = rIter;
    // Anonymous inner class that wraps the key/value iterator so that each next() also reports progress
    rIter = new RawKeyValueIterator() {
        public void close() throws IOException {
            rawIter.close();
        }

        public DataInputBuffer getKey() throws IOException {
            return rawIter.getKey();
        }

        public Progress getProgress() {
            return rawIter.getProgress();
        }

        public DataInputBuffer getValue() throws IOException {
            return rawIter.getValue();
        }

        public boolean next() throws IOException {
            boolean ret = rawIter.next();
            reporter.setProgress(rawIter.getProgress().getProgress());
            return ret;
        }
    };
    TaskAttemptContext taskContext = new TaskAttemptContextImpl(job, this.getTaskID(), reporter);
    // Instantiate the Reducer by reflection
    org.apache.hadoop.mapreduce.Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer = (org.apache.hadoop.mapreduce.Reducer)ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
    // Create the RecordWriter that writes the results to the output file
    org.apache.hadoop.mapreduce.RecordWriter<OUTKEY, OUTVALUE> trackedRW = new ReduceTask.NewTrackingRecordWriter(this, taskContext);
    job.setBoolean("mapred.skip.on", this.isSkipping());
    job.setBoolean("mapreduce.job.skiprecords", this.isSkipping());
    // Create the reduce context used by the reduce task
    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = createReduceContext(reducer, job, this.getTaskID(), rIter, this.reduceInputKeyCounter, this.reduceInputValueCounter, trackedRW, this.committer, reporter, comparator, keyClass, valueClass);

    // Run the reducer
    try {
        reducer.run(reducerContext);
    } finally {
        // Close the output stream
        trackedRW.close(reducerContext);
    }

}

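As the code shows, the method mainly does the following:
1) Gets the Reducer object, whose run() method drives the calls to reduce()
2) Creates the RecordWriter object
3) Creates the reduce context
4) Calls the reducer's run() method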
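Before those four steps, runNewReducer() wraps the raw iterator so that every call to next() also reports progress. A minimal sketch of that decorator idea, assuming an in-memory list as the input and a `DoubleConsumer` as the reporter (both are assumptions for illustration, as are the class names):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.DoubleConsumer;

// Decorator that forwards to an underlying iterator and reports progress on every next().
class ProgressReportingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    private final int total;
    private final DoubleConsumer reporter;
    private int consumed = 0;

    ProgressReportingIterator(List<T> records, DoubleConsumer reporter) {
        this.delegate = records.iterator();
        this.total = records.size();
        this.reporter = reporter;
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    @Override
    public T next() {
        T record = delegate.next();
        consumed++;
        reporter.accept((double) consumed / total);    // fraction of records consumed so far
        return record;
    }
}

class ProgressSketch {
    // Drains the iterator and returns the last progress value that was reported.
    static double drain(List<String> records) {
        double[] last = {0.0};
        Iterator<String> it = new ProgressReportingIterator<>(records, p -> last[0] = p);
        while (it.hasNext()) {
            it.next();
        }
        return last[0];
    }

    public static void main(String[] args) {
        System.out.println(drain(Arrays.asList("a", "b", "c", "d")));
    }
}
```

The wrapper must hold a reference to the original iterator under a different name than the variable it is assigned to; otherwise its methods would call themselves.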

4. ReduceTask.NewTrackingRecordWriter()

//--------------------------------------ReduceTask.java (nested class NewTrackingRecordWriter)
static class NewTrackingRecordWriter<K, V> extends org.apache.hadoop.mapreduce.RecordWriter<K, V> {
    private final org.apache.hadoop.mapreduce.RecordWriter<K, V> real;
    private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
    private final org.apache.hadoop.mapreduce.Counter fileOutputByteCounter;
    private final List<Statistics> fsStats;

    NewTrackingRecordWriter(ReduceTask reduce, TaskAttemptContext taskContext) throws InterruptedException, IOException {
        this.outputRecordCounter = reduce.reduceOutputCounter;
        this.fileOutputByteCounter = reduce.fileOutputByteCounter;
        List<Statistics> matchedStats = null;
        if (reduce.outputFormat instanceof FileOutputFormat) {
            matchedStats = Task.getFsStatistics(FileOutputFormat.getOutputPath(taskContext), taskContext.getConfiguration());
        }

        this.fsStats = matchedStats;
        long bytesOutPrev = this.getOutputBytes(this.fsStats);
        // Create the real RecordWriter through the OutputFormat
        this.real = reduce.outputFormat.getRecordWriter(taskContext);
        long bytesOutCurr = this.getOutputBytes(this.fsStats);
        this.fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
    }
    .....................
}

The key step is the call to outputFormat.getRecordWriter(), which creates the real RecordWriter object that NewTrackingRecordWriter wraps.
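The wrapping itself is a plain decorator: every write goes to the real writer, and the wrapper updates counters around the call. The write path is elided in the listing above, so the sketch below is an assumption modeled on what the constructor does with the byte counter; `SketchWriter`, `BufferWriter`, and `TrackingWriter` are hypothetical names.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Minimal writer interface for this sketch (not Hadoop's RecordWriter).
interface SketchWriter {
    void write(String key, String value);
}

// "Real" writer: appends key<TAB>value<NEWLINE> to an in-memory buffer.
class BufferWriter implements SketchWriter {
    final ByteArrayOutputStream out = new ByteArrayOutputStream();

    public void write(String key, String value) {
        byte[] line = (key + "\t" + value + "\n").getBytes(StandardCharsets.UTF_8);
        out.write(line, 0, line.length);
    }
}

// Decorator: delegates every write to the real writer and keeps two counters.
class TrackingWriter implements SketchWriter {
    private final BufferWriter real;
    long records = 0;
    long bytes = 0;

    TrackingWriter(BufferWriter real) {
        this.real = real;
    }

    public void write(String key, String value) {
        long before = real.out.size();
        real.write(key, value);
        bytes += real.out.size() - before;             // bytes produced by this write
        records++;
    }

    public static void main(String[] args) {
        TrackingWriter w = new TrackingWriter(new BufferWriter());
        w.write("a", "1");
        w.write("bb", "22");
        System.out.println(w.records + " records, " + w.bytes + " bytes");
    }
}
```

Because the caller only sees the writer interface, the reducer never needs to know whether it is talking to the tracking wrapper or to the real writer.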
As noted above, the default outputFormat is TextOutputFormat, so the next step is to look at TextOutputFormat.getRecordWriter().

5. TextOutputFormat.getRecordWriter()

public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
    public TextOutputFormat() {
    }

    // The method returns an instance of the static nested class TextOutputFormat.LineRecordWriter
    public RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job, String name, Progressable progress) throws IOException {
        boolean isCompressed = getCompressOutput(job);
        // Separator between key and value; the default is \t
        String keyValueSeparator = job.get("mapreduce.output.textoutputformat.separator", "\t");
        // Output is either uncompressed or compressed
        if (!isCompressed) {
            // Get the output path
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
            FileSystem fs = file.getFileSystem(job);
            // Create the output stream
            FSDataOutputStream fileOut = fs.create(file, progress);
            return new TextOutputFormat.LineRecordWriter(fileOut, keyValueSeparator);
        } else {
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job, GzipCodec.class);
            CompressionCodec codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, job);
            Path file = FileOutputFormat.getTaskOutputPath(job, name + codec.getDefaultExtension());
            FileSystem fs = file.getFileSystem(job);
            FSDataOutputStream fileOut = fs.create(file, progress);
            // Return the LineRecordWriter object
            return new TextOutputFormat.LineRecordWriter(new DataOutputStream(codec.createOutputStream(fileOut)), keyValueSeparator);
        }
    }

    // This is the LineRecordWriter class
    protected static class LineRecordWriter<K, V> implements RecordWriter<K, V> {
        private static final byte[] NEWLINE;
        protected DataOutputStream out;
        private final byte[] keyValueSeparator;

        public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
            this.out = out;
            this.keyValueSeparator = keyValueSeparator.getBytes(StandardCharsets.UTF_8);
        }

        public LineRecordWriter(DataOutputStream out) {
            this(out, "\t");
        }

        private void writeObject(Object o) throws IOException {
            if (o instanceof Text) {
                Text to = (Text)o;
                this.out.write(to.getBytes(), 0, to.getLength());
            } else {
                this.out.write(o.toString().getBytes(StandardCharsets.UTF_8));
            }

        }

        // Write one key/value pair
        public synchronized void write(K key, V value) throws IOException {
            boolean nullKey = key == null || key instanceof NullWritable;
            boolean nullValue = value == null || value instanceof NullWritable;
            if (!nullKey || !nullValue) {
                // Write the key first
                if (!nullKey) {
                    this.writeObject(key);
                }

                // Then write the separator between key and value
                if (!nullKey && !nullValue) {
                    this.out.write(this.keyValueSeparator);
                }

                // Finally write the value
                if (!nullValue) {
                    this.writeObject(value);
                }

                // Then end the line with a newline
                this.out.write(NEWLINE);
            }
        }

        public synchronized void close(Reporter reporter) throws IOException {
            this.out.close();
        }

        static {
            NEWLINE = "\n".getBytes(StandardCharsets.UTF_8);
        }
    }
}

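The RecordWriter that is finally returned is therefore a LineRecordWriter. Note that the listing above is the old-API class org.apache.hadoop.mapred.TextOutputFormat; the new-API class org.apache.hadoop.mapreduce.lib.output.TextOutputFormat, which is the one reached from runNewReducer(), takes a TaskAttemptContext instead but writes records with the same logic.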
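The write() rules are easy to check in isolation: a missing key or value is skipped, the separator appears only when both are present, and a pair with neither produces no line at all. The sketch below is a stand-alone model of that logic, not the Hadoop class; in it a plain `null` plays the role of both `null` and `NullWritable`, and `LineWriterSketch` is a hypothetical name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Stand-alone model of the LineRecordWriter.write() logic.
class LineWriterSketch {
    private final ByteArrayOutputStream out = new ByteArrayOutputStream();
    private final byte[] separator;

    LineWriterSketch(String separator) {
        this.separator = separator.getBytes(StandardCharsets.UTF_8);
    }

    private void writeObject(Object o) {
        byte[] bytes = o.toString().getBytes(StandardCharsets.UTF_8);
        out.write(bytes, 0, bytes.length);
    }

    void write(Object key, Object value) {
        boolean nullKey = key == null;
        boolean nullValue = value == null;
        if (nullKey && nullValue) {
            return;                                    // nothing to write, not even a newline
        }
        if (!nullKey) {
            writeObject(key);
        }
        if (!nullKey && !nullValue) {
            out.write(separator, 0, separator.length); // separator only between key and value
        }
        if (!nullValue) {
            writeObject(value);
        }
        out.write('\n');
    }

    String contents() {
        return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }

    // Writes a few representative records and returns the resulting text.
    static String demo(String separator) {
        LineWriterSketch w = new LineWriterSketch(separator);
        w.write("hadoop", 3);
        w.write(null, "value-only");
        w.write("key-only", null);
        w.write(null, null);
        return w.contents();
    }

    public static void main(String[] args) {
        System.out.print(demo("\t"));
    }
}
```

In a real job the separator comes from the configuration key shown in the listing, mapreduce.output.textoutputformat.separator, so setting that key in the job configuration changes the character between key and value.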
Now go back to step 3 and look at the class behind the reduceContext object.

6. reduceContext = ReduceTask.createReduceContext()

protected static <INKEY, INVALUE, OUTKEY, OUTVALUE> Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context createReduceContext(Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE> reducer, Configuration job, org.apache.hadoop.mapreduce.TaskAttemptID taskId, RawKeyValueIterator rIter, org.apache.hadoop.mapreduce.Counter inputKeyCounter, org.apache.hadoop.mapreduce.Counter inputValueCounter, RecordWriter<OUTKEY, OUTVALUE> output, OutputCommitter committer, StatusReporter reporter, RawComparator<INKEY> comparator, Class<INKEY> keyClass, Class<INVALUE> valueClass) throws IOException, InterruptedException {
    ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE> reduceContext = new ReduceContextImpl(job, taskId, rIter, inputKeyCounter, inputValueCounter, output, committer, reporter, comparator, keyClass, valueClass);
    Reducer<INKEY, INVALUE, OUTKEY, OUTVALUE>.Context reducerContext = (new WrappedReducer()).getReducerContext(reduceContext);
    return reducerContext;
}

As the code shows, the returned reducerContext is a WrappedReducer context that wraps a ReduceContextImpl object.
Next, look at the constructor of ReduceContextImpl.

//---------------------------------ReduceContextImpl.java
public ReduceContextImpl(Configuration conf, TaskAttemptID taskid, RawKeyValueIterator input, Counter inputKeyCounter, Counter inputValueCounter, RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter, RawComparator<KEYIN> comparator, Class<KEYIN> keyClass, Class<VALUEIN> valueClass) throws InterruptedException, IOException {
    // The parent class is TaskInputOutputContextImpl; the RecordWriter (output) is passed up to it
    super(conf, taskid, output, committer, reporter);
    this.input = input;
    this.inputKeyCounter = inputKeyCounter;
    this.inputValueCounter = inputValueCounter;
    this.comparator = comparator;
    this.serializationFactory = new SerializationFactory(conf);
    this.keyDeserializer = this.serializationFactory.getDeserializer(keyClass);
    this.keyDeserializer.open(this.buffer);
    this.valueDeserializer = this.serializationFactory.getDeserializer(valueClass);
    this.valueDeserializer.open(this.buffer);
    this.hasMore = input.next();
    this.keyClass = keyClass;
    this.valueClass = valueClass;
    this.conf = conf;
    this.taskid = taskid;
}

The constructor calls the parent-class constructor and passes the RecordWriter object (output) to it.
Next, look at the parent class, TaskInputOutputContextImpl.

public TaskInputOutputContextImpl(Configuration conf, TaskAttemptID taskid, RecordWriter<KEYOUT, VALUEOUT> output, OutputCommitter committer, StatusReporter reporter) {
    // The output parameter here is the RecordWriter object
    super(conf, taskid, reporter);
    this.output = output;
    this.committer = committer;
}

// Reads the next key/value pair into this.key and this.value; returns false if there is none left, otherwise true
public abstract boolean nextKeyValue() throws IOException, InterruptedException;

public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

// Calls the RecordWriter's write method to output the key/value pair; by default this ends up in LineRecordWriter
public void write(KEYOUT key, VALUEOUT value) throws IOException, InterruptedException {
    this.output.write(key, value);
}

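As shown, the class has three abstract methods (their logic is implemented in the subclass ReduceContextImpl and has nothing to do with the RecordWriter) and one concrete method, write. The abstract methods fetch the input key/value pairs, and write outputs a result pair. write simply calls the RecordWriter's write method, which by default is the write method of the LineRecordWriter object created in step 5.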
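That split between abstract reading and delegated writing can be sketched as follows. The types are simplified stand-ins under stated assumptions: `PairWriter`, `InputOutputContextSketch`, and `ListContextSketch` are hypothetical, and the input is an in-memory list rather than a shuffled, sorted stream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for a record writer.
interface PairWriter<K, V> {
    void write(K key, V value);
}

// Base context: reading is left abstract, writing is delegated to the writer.
abstract class InputOutputContextSketch<KIN, VIN, KOUT, VOUT> {
    private final PairWriter<KOUT, VOUT> output;

    InputOutputContextSketch(PairWriter<KOUT, VOUT> output) {
        this.output = output;
    }

    abstract boolean nextKeyValue();
    abstract KIN getCurrentKey();
    abstract VIN getCurrentValue();

    void write(KOUT key, VOUT value) {
        output.write(key, value);                      // the context itself does no I/O
    }
}

// Concrete context over an in-memory list; the key is the element's index.
class ListContextSketch extends InputOutputContextSketch<Integer, String, String, Integer> {
    private final List<String> input;
    private int position = -1;

    ListContextSketch(List<String> input, PairWriter<String, Integer> output) {
        super(output);
        this.input = input;
    }

    boolean nextKeyValue() { return ++position < input.size(); }
    Integer getCurrentKey() { return position; }
    String getCurrentValue() { return input.get(position); }

    // Reads every input value and writes (value, length) through the context.
    static List<String> lengths(List<String> input) {
        List<String> lines = new ArrayList<>();
        ListContextSketch ctx = new ListContextSketch(input, (k, v) -> lines.add(k + "\t" + v));
        while (ctx.nextKeyValue()) {
            ctx.write(ctx.getCurrentValue(), ctx.getCurrentValue().length());
        }
        return lines;
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("ab", "cde")));
    }
}
```

Swapping the writer passed to the constructor changes where the output goes without touching the code that calls write, which is the same reason a different OutputFormat needs no change to the reducer.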

7. reducer.run()

public void run(Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
        this.setup(context);

        try {
            while(context.nextKey()) {
                this.reduce(context.getCurrentKey(), context.getValues(), context);
                Iterator<VALUEIN> iter = context.getValues().iterator();
                if (iter instanceof ValueIterator) {
                    ((ValueIterator)iter).resetBackupStore();
                }
            }
        } finally {
            this.cleanup(context);
        }

    }

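Here run() calls the methods of the reduce context created in step 6 to fetch the key/value pairs. Inside reduce(), user code calls context.write(key, value) to emit each result pair, and that call ultimately reaches the write method of the LineRecordWriter object.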
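The shape of run() is a template method: setup once, one reduce() call per group of equal keys, cleanup in a finally block. The sketch below models that loop over an already sorted in-memory list; `ReducerSketch` and `SumReducerSketch` are hypothetical names, and adding to the `output` list stands in for context.write(key, value).

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Template-method model of Reducer.run(): setup, one reduce() call per key group, cleanup.
abstract class ReducerSketch {
    final List<String> output = new ArrayList<>();

    void setup() {}
    void cleanup() {}
    abstract void reduce(String key, List<Integer> values);

    // The input must already be sorted by key, as it is after the sort phase.
    final void run(List<Map.Entry<String, Integer>> sorted) {
        setup();
        try {
            int i = 0;
            while (i < sorted.size()) {                // plays the role of context.nextKey()
                String key = sorted.get(i).getKey();
                List<Integer> values = new ArrayList<>();
                while (i < sorted.size() && sorted.get(i).getKey().equals(key)) {
                    values.add(sorted.get(i).getValue());
                    i++;
                }
                reduce(key, values);
            }
        } finally {
            cleanup();                                 // runs even if reduce() throws
        }
    }
}

// Word-count style reducer: sums the values of each key.
class SumReducerSketch extends ReducerSketch {
    void reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        output.add(key + "\t" + sum);                  // stands in for context.write(key, value)
    }

    static List<String> demo() {
        List<Map.Entry<String, Integer>> sorted = new ArrayList<>();
        sorted.add(new SimpleEntry<>("a", 1));
        sorted.add(new SimpleEntry<>("a", 2));
        sorted.add(new SimpleEntry<>("b", 5));
        SumReducerSketch reducer = new SumReducerSketch();
        reducer.run(sorted);
        return reducer.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```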

That completes the walk-through of the MapReduce output process; tracing the same path in a real job is a good way to confirm it.
