91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

十五、MapReduce--自定義output輸出

發布時間:2020-06-15 07:36:43 來源:網絡 閱讀:288 作者:隔壁小白 欄目:大數據

我們要自定義輸出時,首先繼承兩個抽象類,一個是 OutputFormat,一個是 RecordWriter
。前者是主要是創建RecordWriter,后者就是主要實現 write方法來將kv寫入文件。

1、需求
將reduce輸出的KV中,如果key中包含特定字符串,則將其輸出到一個文件中,剩下的KV則輸出到另外的文件中。

2、源碼
源數據

http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.itstar.com
http://www.itstar1.com
http://www.itstar2.com
http://www.itstar3.com
http://www.baidu.com
http://www.sin2a.com
http://www.sin2a.comw.google.com
http://www.sin2desa.com
http://www.sin2desa.comw.google.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

outputFormat

public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new MyRecordWriter(taskAttemptContext);
    }
}

RecordWriter

public class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    private FSDataOutputStream startOut;
    private FSDataOutputStream otherOut;

    public MyRecordWriter(TaskAttemptContext job) {
        try {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            startOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\startout.log"));
            otherOut = fs.create(new Path("G:\\test\\date\\A\\itstarlog\\logdir\\otherout.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String line = key.toString();

        //如果key中包含itstar就寫入到另外一個文件中
        if (line.contains("itstar")) {
            this.startOut.writeUTF(line);
        } else {
            this.otherOut.writeUTF(line);
        }
    }

    @Override
    public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        this.startOut.close();
        this.otherOut.close();
    }
}

mapper

public class MyOutputMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        context.write(value, NullWritable.get());
    }
}

reducer

public class MyOutputReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
    Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        String line = key.toString();
        line = line + "\r\n";
        k.set(line);

        context.write(k, NullWritable.get());
    }
}

driver

ublic class MyDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        args = new String[]{"G:\\test\\date\\A\\itstarlog\\A\\other.log", "G:\\test\\date\\A\\itstarlog\\logresult\\"};

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(MyDriver.class);
        job.setMapperClass(MyOutputMapper.class);
        job.setReducerClass(MyOutputReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //自定義輸出的實現子類,也是繼承FileOutputFormat
        job.setOutputFormatClass(MyOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //這個路徑輸出的是job的執行成功successs文件的輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

郸城县| 长宁县| 石阡县| 枣庄市| 三门县| 蓬安县| 金昌市| 巴林右旗| 长汀县| 鹰潭市| 富裕县| 福海县| 平武县| 长岭县| 新沂市| 甘洛县| 吉安市| 宣汉县| 旅游| 蕉岭县| 石景山区| 延长县| 安达市| 奉新县| 宣威市| 营山县| 三穗县| 车险| 秦皇岛市| 德江县| 白河县| 贵定县| 岳池县| 长兴县| 屯留县| 弥渡县| 连江县| 江西省| 镇赉县| 塔河县| 赫章县|