91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapReduce的典型編程場景3

發布時間:2020-07-03 04:28:31 來源:網絡 閱讀:218 作者:原生zzy 欄目:大數據

1. 自定義InputFormat –數據分類輸出

  需求:小文件的合并

  分析:

     - 在數據采集的時候,就將小文件或小批數據合成大文件再上傳 HDFS
     - 在業務處理之前,在 HDFS 上使用 MapReduce 程序對小文件進行合并
     - 在 MapReduce 處理時,可采用 CombineFileInputFormat 提高效率

  實現思路:

     - 編寫自定義的InoputFormat
     - 改寫 RecordReader,實現一次 maptask 讀取一個小文件的完整內容封裝到一個 KV 對
     - 在Driver 類中一定要設置使用自定義的 InputFormat: job.setInputFormatClass(WholeFileInputFormat.class)


代碼實現

public class MergeDriver {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            //設置自定義輸入的類
            job.setInputFormatClass(MyMyFileInputForamt.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");

            //這里使用自定義得我FileInputForamt去格式化input
            MyMyFileInputForamt.addInputPath(job,input);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    static private class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        /*
            這里的map方法就是每讀取一個文件調用一次
         */
        @Override
        protected void map(NullWritable key, Text value, Mapper<NullWritable, Text, NullWritable, Text>.Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values,
                              Reducer<NullWritable, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }
    //RecordReader ,這種這個兩個泛型,是map端輸入的key和value的類型
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {

        // 輸出的value對象
        Text map_value = new Text();

        // 文件系統對象,用于獲取文件的輸入流
        FileSystem fs;

        // 判斷當前文件是否已經讀完
        Boolean isReader = false;

        //文件的切片信息
        FileSplit fileSplit;

        //初始化方法,類似于Mapper中的setup,整個類最開始運行
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            //初始化文件系統對象
            fs = FileSystem.get(context.getConfiguration());
            //獲取文件路徑
            fileSplit = (FileSplit) split;
        }

        //這個方法,在每次調用map中傳入的K-V中,就是在這個方法中給K-V賦值的
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            //先讀取一次
            if (!isReader) {
                FSDataInputStream input = fs.open(fileSplit.getPath());
                //一次性將整個小文件內容都讀取出來
                byte flush[] = new byte[(int) fileSplit.getLength()];
                //將文件內容讀取到這個byte數組中
                /**
                 * 參數一:讀取的字節數組
                 * 參數二:開始讀取的偏移量
                 * 參數三:讀取的長度
                 */
                input.readFully(flush, 0, (int) fileSplit.getLength());
                isReader = true;
                map_value.set(flush);  //將讀取的內容,放置在map的value中
                //保證能正好讀一次,nextKeyValue()第一次返回true正好可以調用一次map,第二次返回false
                return isReader;
            }
            return false;
        }
        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }
        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return map_value;
        }
        @Override
        public float getProgress() throws IOException, InterruptedException {
            return 0;
        }
        @Override
        public void close() throws IOException {
            fs.close();
        }
    }
    //FileInputFormat
    private static class MyMyFileInputForamt extends FileInputFormat<NullWritable, Text> {
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
            MyRecordReader mr = new MyRecordReader();
            //先調用初始化方法
            mr.initialize(split, context);
            return mr;
        }
    }
}

2. 自定義OutputFormat

  需求:一些原始日志需要做增強解析處理,流程

     - 從原始日志文件中讀取數據
     - 根據業務獲取業務數據庫中的數據
     - 根據某個連接條件獲取相應的連接結果

  分析:

     - 在 MapReduce 中訪問外部資源
     - 在業務處理之前,在 HDFS 上使用 MapReduce 程序對小文件進行合并
     - 自定義 OutputFormat,改寫其中的 RecordWriter,改寫具體輸出數據的方法 write() CombineFileInputFormat 提高效率


代碼實現
//這里以一個簡單的案例為例,將文件按照不同的等級輸出的不同的文件中

 public class Score_DiffDic {
    //job
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            //設置自定義輸出類型
            job.setOutputFormatClass(MyOutputFormat.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job,input);
            Path output = new Path("/hadoop/output/merge_output1");
            //這是自定義輸出類型
            MyOutputFormat.setOutputPath(job,output);
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    //Mapper
    private static class MyMapper extends Mapper<LongWritable,Text,Text,DoubleWritable>{
        Text mk=new Text();
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
             String[] fields = value.toString().split("\\s+");
            //computer,huangxiaoming,85
            if(fields.length==3){
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }
    //Reducer
    private static class MyReducer extends Reducer<Text,DoubleWritable,Text,DoubleWritable>{
        DoubleWritable mv=new DoubleWritable();
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double  sum=0;
            int count=0;
            for(DoubleWritable value:values){
                sum+=value.get();
                count++;
            }
            mv.set(sum/count);
            context.write(key,mv);
        }
    }

    //FileOutputFormat
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            FileSystem fs =FileSystem.get(job.getConfiguration());
            return new MyRecordWrite(fs);
        }
    }

    //RecordWriter,這里的兩個泛型是Reudcer輸出K-V的類型
    private static class MyRecordWrite extends RecordWriter<Text, DoubleWritable> {
        FileSystem fs;
        //輸出的文件的路徑
        Path path2 = new Path("/hadoop/output/score_out1");
        Path path3 = new Path("/hadoop/output/score_out2");
        FSDataOutputStream output1;
        FSDataOutputStream output2;

        public MyRecordWrite() {

        }
        //初始化參數
        public MyRecordWrite(FileSystem fs) {
            this.fs = fs;
            try {
                output1=fs.create(path2);
                output2=fs.create(path3);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            //業務邏輯操作,平均分數大于80的在path2中,其他的在path3中
            if(value.get()>80){
                output1.write((key.toString()+":"+value.get()+"\n").getBytes());
            }else{
                output2.write((key.toString()+":"+value.get()+"\n").getBytes());
            }
        }
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            fs.close();
            output1.close();
            output2.close();
        }
    }
}
向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

仙居县| 汤阴县| 罗山县| 吐鲁番市| 中卫市| 白水县| 田东县| 尉氏县| 宁武县| 洛南县| 北辰区| 宣威市| 城市| 宝山区| 章丘市| 青龙| 奎屯市| 灯塔市| 永修县| 宾阳县| 曲松县| 三亚市| 乌鲁木齐县| 麟游县| 武山县| 芒康县| 高陵县| 恩施市| 瑞金市| 石柱| 亳州市| 时尚| 璧山县| 冀州市| 易门县| 本溪市| 黔江区| 富平县| 大丰市| 高州市| 玛多县|