您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關hadoop2.2.0如何定制mapreduce輸出到數據庫,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
這里以redis數據庫為例。
這里的例子是,我想統計日志文件中的某天各個小時的訪問量,日志格式為:
2014-02-10 04:52:34 127.0.0.1 xxx
我們知道在寫mapreduce job時,要配置輸入輸出,然后編寫mapper和reducer類,hadoop默認輸出是到hdfs的文件中,例如:
job.setOutputFormatClass(FileOutputFormat.class);
現在我們想要將任務計算結果輸出到數據庫(redis)中,怎么做呢?可以繼承FileOutputFormat類,定制自己的類,看代碼:
public class LoginLogOutputFormat<K, V> extends FileOutputFormat<K, V> { /** * 重點也是定制一個RecordWriter類,每一條reduce處理后的記錄,我們便可將該記錄輸出到數據庫中 */ protected static class RedisRecordWriter<K, V> extends RecordWriter<K, V>{ private Jedis jedis; //redis的client實例 public RedisRecordWriter(Jedis jedis){ this.jedis = jedis; } @Override public void write(K key, V value) throws IOException, InterruptedException { boolean nullKey = key == null; boolean nullValue = value == null; if (nullKey || nullValue) return; String[] sKey = key.toString().split("-"); String outKey = sKey[0]+"-"+sKey[1]+"-"+sKey[2]+"_login_stat"; //zset key為yyyy-MM-dd_login_stat jedis.zadd(outKey.getBytes("UTF-8"), -1, (sKey[3]+":"+value).getBytes("UTF-8")); //zadd, 其值格式為: 時刻:訪問量 } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if (jedis != null) jedis.disconnect(); //關閉鏈接 } } @Override public RecordWriter<K, V> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { Jedis jedis = RedisClient.newJedis(); //構建一個redis,這里你可以自己根據實際情況來構建數據庫連接對象 //System.out.println("構建RedisRecordWriter"); return new RedisRecordWriter<K, V>(jedis); } }
下面就是整個job實現:
public class LoginLogStatTask extends Configured implements Tool { public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { if (value == null || "".equals(value)) return; // 解析value,如: 2014-02-10 04:52:34 127.0.0.1 xxx String[] fields = value.toString().split(" "); String date = fields[0]; String time = fields[1]; String hour = time.split(":")[0]; String outKey = date+"-"+hour; context.write(new Text(outKey), new IntWritable(1)); } } public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; while (values.iterator().hasNext()){ //統計數量 count ++; values.iterator().next(); } context.write(key, new IntWritable(count)); } } @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); List<Path> inputs = new ArrayList<>(); String inputPath = args[0]; if (inputPath.endsWith("/")){ //如果是目錄 inputs.addAll(HdfsUtil.listFiles(inputPath, conf)); } else{ //如果是文件 inputs.add(new Path(inputPath)); } long ts = System.currentTimeMillis(); String jobName = "login_logs_stat_job_" + ts; Job job = Job.getInstance(conf, jobName); job.setJarByClass(LoginLogStatTask.class); //添加輸入文件路徑 for (Path p : inputs){ FileInputFormat.addInputPath(job, p); } //設置輸出路徑 Path out = new Path(jobName + ".out"); //以jobName.out作為輸出 FileOutputFormat.setOutputPath(job, out); //設置mapper job.setMapperClass(MyMapper.class); //設置reducer job.setReducerClass(MyReducer.class); //設置輸入格式 job.setInputFormatClass(TextInputFormat.class); //設置輸出格式 job.setOutputFormatClass(LoginLogOutputFormat.class); //設置輸出key類型 job.setOutputKeyClass(Text.class); //設置輸出value類型 job.setOutputValueClass(IntWritable.class); job.waitForCompletion(true); return job.isSuccessful()?0:1; } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); int res = ToolRunner.run(conf, new LoginLogStatTask(), args); System.exit(res); }
運行job后,就會在redis數據庫中有對應的key:
看完上述內容,你們對hadoop2.2.0如何定制mapreduce輸出到數據庫有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。