11. MapReduce: Custom Input

Published: 2020-06-11 11:55:03 | Source: Internet | Views: 224 | Author: 隔壁小白 | Category: Big Data

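As described in "MapReduce: How Input Works", defining a custom input comes down to extending InputFormat and RecordReader and implementing their methods. The example below walks through the process.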
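In Hadoop's new API the framework, not user code, drives the reader: the map task obtains a reader from InputFormat.createRecordReader(), calls initialize() with the split, and Mapper.run() then loops on nextKeyValue(), passing getCurrentKey() and getCurrentValue() to map(), before the reader is closed. The minimal sketch below models that call order in plain Java. SimpleRecordReader, SimpleInputFormat and ReaderLifecycle are hypothetical stand-ins that only borrow Hadoop's method names; they are not Hadoop classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RecordReader: only the method names mirror Hadoop.
abstract class SimpleRecordReader<K, V> {
    abstract void initialize(String splitPath);
    abstract boolean nextKeyValue();
    abstract K getCurrentKey();
    abstract V getCurrentValue();
    void close() {}
}

// Hypothetical stand-in for InputFormat: here it only hands out readers.
interface SimpleInputFormat<K, V> {
    SimpleRecordReader<K, V> createRecordReader();
}

public class ReaderLifecycle {
    // Framework-side call order: create, initialize, loop, close.
    static <K, V> List<String> drive(SimpleInputFormat<K, V> format, String split) {
        List<String> seen = new ArrayList<>();
        SimpleRecordReader<K, V> reader = format.createRecordReader();
        reader.initialize(split);
        try {
            // This loop is what Mapper.run() does around each map() call
            while (reader.nextKeyValue()) {
                seen.add(reader.getCurrentKey() + "=" + reader.getCurrentValue());
            }
        } finally {
            reader.close();
        }
        return seen;
    }

    // A toy format whose reader yields three numbered records per split.
    static List<String> demo() {
        SimpleInputFormat<Integer, String> format = () -> new SimpleRecordReader<Integer, String>() {
            private String path;
            private int pos = -1;

            @Override void initialize(String splitPath) { path = splitPath; }
            @Override boolean nextKeyValue() { return ++pos < 3; }
            @Override Integer getCurrentKey() { return pos; }
            @Override String getCurrentValue() { return path + "#" + pos; }
        };
        return drive(format, "a.txt");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [0=a.txt#0, 1=a.txt#1, 2=a.txt#2]
    }
}
```

A custom reader only has to get nextKeyValue() right; everything around it is the framework's responsibility.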

1. Requirement

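Merge multiple small files into one large file (somewhat like CombineFileInputFormat) and write it out. The large file records the path of each small file together with that file's contents.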
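Conceptually the job does what the plain-Java sketch below does on a local disk: every small file becomes one record (its path plus its full contents), and all records end up in a single container. The record layout used here (a UTF-encoded path, an int length, then the raw bytes) and the class name SmallFileMerger are assumptions for illustration only; the actual job writes a Hadoop SequenceFile instead.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

public class SmallFileMerger {
    // Packs every regular file directly under dir into one byte stream.
    // Hypothetical record layout (NOT the SequenceFile format): UTF path, int length, bytes.
    static byte[] merge(Path dir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (Stream<Path> s = Files.list(dir)) {
            s.filter(Files::isRegularFile).sorted().forEach(files::add);
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (Path f : files) {
                byte[] content = Files.readAllBytes(f); // whole file, like the custom RecordReader
                out.writeUTF(f.toString());
                out.writeInt(content.length);
                out.write(content);
            }
        }
        return bytes.toByteArray();
    }

    // Reads the records back into path -> content, preserving order.
    static Map<String, String> unpack(byte[] merged) throws IOException {
        Map<String, String> result = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(merged))) {
            while (in.available() > 0) {
                String path = in.readUTF();
                byte[] content = new byte[in.readInt()];
                in.readFully(content);
                result.put(path, new String(content, StandardCharsets.UTF_8));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("small-files");
        Files.write(dir.resolve("a.txt"), "hello".getBytes(StandardCharsets.UTF_8));
        Files.write(dir.resolve("b.txt"), "world".getBytes(StandardCharsets.UTF_8));
        unpack(merge(dir)).forEach((p, c) -> System.out.println(p + " -> " + c));
    }
}
```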

2. Source Code

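InputFormat:

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
    /**
     * Whether a file may be split. Returning false keeps every small file in a
     * single split, so one record reader sees the whole file.
     * @param context
     * @param filename
     * @return always false
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Returns the reader that turns a split into key/value pairs.
     * The framework calls initialize() on the returned reader before the
     * first nextKeyValue(), so it does not need to be called here.
     * @param inputSplit
     * @param taskAttemptContext
     * @return a reader that yields one record per file
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new SRecordReader();
    }
}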
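Returning false from isSplitable() is what makes the whole-file reader safe. As a simplified sketch, assuming the default behavior of FileInputFormat.getSplits() (split size = max(minSize, min(maxSize, blockSize)), and a 1.1 slop factor that lets the last split be up to 10% larger), the code below counts how many splits one file would produce with and without splitting. It ignores block locations and host lists, and SplitSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Slop factor: stop cutting once the remainder is at most 1.1 x splitSize.
    static final double SPLIT_SLOP = 1.1;

    static long computeSplitSize(long blockSize, long minSize, long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
    }

    // Returns the length of every split produced for one file.
    static List<Long> splitLengths(long fileLength, boolean splitable, long splitSize) {
        List<Long> splits = new ArrayList<>();
        if (fileLength == 0 || !splitable) {
            // Empty or non-splittable file: exactly one split covering all of it.
            splits.add(fileLength);
            return splits;
        }
        long remaining = fileLength;
        while ((double) remaining / splitSize > SPLIT_SLOP) {
            splits.add(splitSize);
            remaining -= splitSize;
        }
        if (remaining != 0) {
            splits.add(remaining); // last, possibly shorter (or slightly longer) split
        }
        return splits;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        long splitSize = computeSplitSize(128 * mb, 1, Long.MAX_VALUE);
        System.out.println(splitLengths(300 * mb, true, splitSize).size());  // 3
        System.out.println(splitLengths(300 * mb, false, splitSize).size()); // 1
        System.out.println(splitLengths(140 * mb, true, splitSize).size());  // 1
    }
}
```

With splitting disabled, split.getLength() always equals the file length, which is exactly what SRecordReader relies on when it sizes its buffer.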

RecordReader:

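import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private Configuration conf;
    private FileSplit split;
    // Whether the current split has already been read
    private boolean process = false;
    private BytesWritable value = new BytesWritable();

    /**
     * Initialization: keep the split and the job configuration.
     * @param inputSplit
     * @param taskAttemptContext
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        split = (FileSplit) inputSplit;
        conf = taskAttemptContext.getConfiguration();
    }

    /**
     * Reads the next key/value pair from the split. The split holds a whole
     * file, so the first call loads the entire file and later calls return false.
     * @return true if a record was read
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!process) {
            // The whole file goes into one array, so this only suits small files (< 2 GB)
            byte[] buffer = new byte[(int) split.getLength()];

            // Get the file system that owns the path
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);

            FSDataInputStream fis = null;
            try {
                // Open an input stream and read the file into the buffer
                fis = fs.open(path);
                IOUtils.readFully(fis, buffer, 0, buffer.length);

                // Load the bytes into value
                value.set(buffer, 0, buffer.length);
            } finally {
                // Close the stream even if the read fails
                IOUtils.closeStream(fis);
            }

            // Mark the split as read
            process = true;
            return true;
        }

        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return process ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // Nothing to release: the stream is already closed in nextKeyValue()
    }
}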
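The control flow of SRecordReader can be checked without a cluster. OneShotReader below is a hypothetical, Hadoop-free mirror of the process flag: a byte array stands in for the file, the first nextKeyValue() hands out the whole content, every later call returns false, and getProgress() jumps from 0 to 1.

```java
import java.nio.charset.StandardCharsets;

// Hadoop-free mirror of SRecordReader's control flow (hypothetical class).
public class OneShotReader {
    private final byte[] fileBytes; // stands in for the whole file behind the split
    private boolean processed = false;
    private byte[] value;

    OneShotReader(byte[] fileBytes) { this.fileBytes = fileBytes; }

    boolean nextKeyValue() {
        if (!processed) {
            value = fileBytes.clone(); // the real reader reads the file here
            processed = true;
            return true;
        }
        return false;
    }

    byte[] getCurrentValue() { return value; }

    float getProgress() { return processed ? 1.0f : 0.0f; }

    // Counts how many records a Mapper-style loop would receive.
    static int countRecords(byte[] fileBytes) {
        OneShotReader r = new OneShotReader(fileBytes);
        int n = 0;
        while (r.nextKeyValue()) n++;
        return n;
    }

    public static void main(String[] args) {
        OneShotReader r = new OneShotReader("abc".getBytes(StandardCharsets.UTF_8));
        System.out.println(r.getProgress());            // 0.0
        System.out.println(r.nextKeyValue());           // true
        System.out.println(r.getCurrentValue().length); // 3
        System.out.println(r.nextKeyValue());           // false
        System.out.println(r.getProgress());            // 1.0
    }
}
```

An empty file still yields one record with an empty value, which matches the original reader: a zero-length buffer is read and true is returned once.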

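Mapper:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private final Text k = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // Each split is one whole file, so its path is fetched once per map task
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        String name = inputSplit.getPath().toString();
        k.set(name);
    }

    @Override
    protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
        // Key: the file path; value: the file's full contents
        context.write(k, value);
    }
}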

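Reducer:

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
        // Each path is emitted exactly once, so the group holds a single value
        context.write(key, values.iterator().next());
    }
}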
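With the default single reducer, the shuffle groups map output by key and feeds the groups to reduce() in key order, so the job's one output file holds the files sorted by path. The sketch below imitates that grouping with a TreeMap and then keeps the first value per key, as SFileReducer does. ShuffleSketch is hypothetical, and String ordering only approximates Text's byte-wise ordering (the two agree for ASCII paths).

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleSketch {
    // Simulated shuffle + sort: group map outputs by key, visit keys in order.
    static Map<String, List<byte[]>> shuffle(List<Map.Entry<String, byte[]>> mapOutput) {
        Map<String, List<byte[]>> groups = new TreeMap<>();
        for (Map.Entry<String, byte[]> e : mapOutput) {
            groups.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return groups;
    }

    // Same choice as SFileReducer: keep only the first value of each group.
    static Map<String, byte[]> reduceFirst(Map<String, List<byte[]>> groups) {
        Map<String, byte[]> out = new LinkedHashMap<>();
        groups.forEach((k, vs) -> out.put(k, vs.iterator().next()));
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, byte[]>> mapOutput = new ArrayList<>();
        mapOutput.add(new AbstractMap.SimpleEntry<>("file:/in/b.txt", "world".getBytes(StandardCharsets.UTF_8)));
        mapOutput.add(new AbstractMap.SimpleEntry<>("file:/in/a.txt", "hello".getBytes(StandardCharsets.UTF_8)));
        System.out.println(reduceFirst(shuffle(mapOutput)).keySet()); // [file:/in/a.txt, file:/in/b.txt]
    }
}
```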

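Driver:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SFileDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Local test paths from the original example, used only when no paths are passed in
        if (args.length < 2) {
            args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(SFileDriver.class);
        job.setMapperClass(SFileMapper.class);
        job.setReducerClass(SFileReducer.class);

        // Set the input and output formats; the default input format is TextInputFormat
        job.setInputFormatClass(SFileInputFormat.class);
        // Write the (path, bytes) pairs into a binary SequenceFile
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(BytesWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(BytesWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean ok = job.waitForCompletion(true);
        System.exit(ok ? 0 : 1);
    }
}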

A custom InputFormat has to be registered on the job with job.setInputFormatClass().
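job.setInputFormatClass() stores a Class object rather than an instance; Hadoop instantiates the format later through reflection (ReflectionUtils.newInstance), which is why a custom InputFormat needs a no-argument constructor. The sketch below imitates that pattern in plain Java; FormatRegistry, InputFormatLike, DefaultFormat and CustomFormat are all hypothetical.

```java
// Hypothetical stand-in for a job configuration that stores a class, not an instance.
public class FormatRegistry {
    interface InputFormatLike { String name(); }

    public static class DefaultFormat implements InputFormatLike {
        public String name() { return "default"; }
    }

    public static class CustomFormat implements InputFormatLike {
        public String name() { return "custom"; }
    }

    // Plays the role of the default input format when nothing is registered
    private Class<? extends InputFormatLike> formatClass = DefaultFormat.class;

    void setInputFormatClass(Class<? extends InputFormatLike> cls) { formatClass = cls; }

    // Like Hadoop's reflective instantiation, this requires a no-arg constructor.
    InputFormatLike newInputFormat() throws ReflectiveOperationException {
        return formatClass.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        FormatRegistry job = new FormatRegistry();
        System.out.println(job.newInputFormat().name()); // default
        job.setInputFormatClass(CustomFormat.class);
        System.out.println(job.newInputFormat().name()); // custom
    }
}
```

One consequence: if SFileInputFormat were written as a non-static inner class of the driver, it would have no no-argument constructor and instantiation would fail at run time.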
