您好,登錄后才能下訂單哦!
本篇內容介紹了“hadoop多文件輸出新舊API的方法是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
一般來說Map/Reduce都是輸出一組文件,但是有些情況下需要我們輸出多組文件,比如我上面提到的需求,接下來我用新舊API分別說明如何實現多文件輸出
舊API:
MultipleTextOutputFormat 這個類很重要,我們其實只要寫個類繼承MultipleTextOutputFormat,并且重寫generateFileNameForKeyValue(Object key, Object value, String name)方法就好了。因為MultipleTextOutputFormat中有個write方法,即將記錄寫到hdfs上,在這個方法中,會調用generateFileNameForKeyValue。廢話不多說,上代碼:
public class MultiFileOutputFormat extends MultipleTextOutputFormat<Object, Object>{ @Override protected String generateFileNameForKeyValue(Object key, Object value, String name) { if(key instanceof OutputFileName){ return ((OutputFileName) key).getPath()+"/"+name; }else{ return super.generateFileNameForKeyValue(key, value, name); } } }
其中OutputFileName是我自己定義的枚舉類,便于管理而已,這里也可以return一個路徑,以下是OutputFileName的代碼
public enum OutputFileName { ERRORLOG("errorlog","logtype=errorlog"), APIREQUEST("apiRequest","logtype=apiRequest"), FIRSTINTOTIME("firstIntoTime","logtype=firstIntoTime"), TABFLUSHTIME("tabFlushTime","logtype=tabFlushTime"), PERFORMANCE("performance","logtype=performance"), FILEREQUEST("fileRequest","logtype=fileRequest"); private String name; private String path; private String tempPath; private OutputFileName(String name,String path){ this.name = name; this.path = path; } public String getName(){ return this.name; } public String getPath(){ if(!StringUtil.isEmpty(tempPath)){ String temp = this.tempPath; this.tempPath = null; return temp; }else{ return this.path; } } }
如何使用MultiFileOutputFormat這個自己寫的類呢?就這么用
//job所在類的main方法中 JobConf conf = new JobConf(config,XXX.class); conf.setOutputFormat(MultiFileOutputFormat.class); //map函數中 collector.collect(OutputFileName.ERRORLOG, new Text(log));
此示例做了以上 的操作就可以將數據寫到logtype=errorlog目錄下了,當然可以根據不同的日志去設置輸出目錄了
新API:
對于新的API,我沒發現MultipleTextOutputFormat這個類,很頭疼,我甚至看了源碼,仿照舊API自己寫了MultipleTextOutputFormat,這就需要做很多事情,必須寫個集成RecordWriter的類,重寫里面的方法,當時確實可以做到將數據寫到不同的路徑下,但是也有bug,數據很多的時候,路徑下的數據只有一部分保留,做了一下測試,確實把所有的記錄都寫了,但卻只是把最后寫的一部分保留在設定好的路徑下了,至今都沒發現原因,這里就不給代碼了,只能保留60多萬行的記錄
當然我還是有辦法的,經過百般折磨,終于在網上找到相關資料,使用這個類MultipleOutputs,查查API,還真有,只不過是在org.apache.hadoop.mapreduce.lib.output包下,這個類相當于把舊的API東西又重新整理了一遍,我們不用再去寫其他的類集成MultipleTextOutputFormat。具體使用方法看代碼吧
public static class MapperClass extends Mapper<Object, Text, Text, NullWritable> { private Text outkey = new Text(""); private MultipleOutputs<Text, NullWritable> mos; public void map(Object key, Text value, Context context) throws IOException,InterruptedExceptio{ String log = value.toString(); outkey.set(log); int begin = log.indexOf("@[#("); if(begin != -1){ String logForSplit = log.substring(begin+"@".length()); String [] split = logForSplit.split("#"); if(split != null && split.length >0){ String cType = split[0]; if(!StringUtil.isEmpty(cType)){ if("apiRequest".equals(cType)){ mos.write("apiRequest", outkey, NullWritable.get()); }else if("errlog".equals(cType)){ mos.write("errorlog", outkey, NullWritable.get()); } } } } } @Override protected void cleanup(Context context) throws IOException, InterruptedException { mos.close(); super.cleanup(context); } @Override protected void setup(Context context) throws IOException, InterruptedException { mos = new MultipleOutputs<Text, NullWritable>(context); super.setup(context); } }
public class TestJob { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = new Job(conf, "ss"); job.setInputFormatClass(TrackInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setJarByClass(TestJob.class); job.setMapperClass(TestJob.MapperClass.class); job.setNumReduceTasks(0); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); if(inputPaths.length > 0){ Path[] paths = new Path[inputPaths.length]; for(int i = 0 ; i < inputPaths.length ; i++){ paths[i] = new Path(inputPaths[i]); } FileInputFormat.setInputPaths(job, paths); }else{ FileInputFormat.setInputPaths(job, new Path(args[0])); } FileOutputFormat.setOutputPath(job, new Path(args[1])); MultipleOutputs.addNamedOutput(job, "errorlog", TextOutputFormat.class, Text.class, NullWritable.class); MultipleOutputs.addNamedOutput(job, "apiRequest", TextOutputFormat.class, Text.class, NullWritable.class); } }
OK,這就可以了,總結一下需要注意的問題,首先在我們的map類中一定要定義MultipleOutputs的對象,并且重寫cleanup和setup方法,分別用來關閉和創建MultipleOutputs對象,最重要的是在job所在的類中注冊我們的文件名,比如errorlog,apiRequest等
上述的兩個例子有點區別,第一個是將數據寫到不同的目錄下,而第二個是寫到同一個目錄下,但是會分成不同類型的文件,如我截取的記錄
-rw-r--r-- 2 hadoop supergroup 10569073 2014-06-06 11:50 /test/aa/fileRequest-m-00063.lzo
-rw-r--r-- 2 hadoop supergroup 10512656 2014-06-06 11:50 /test/aa/fileRequest-m-00064.lzo
-rw-r--r-- 2 hadoop supergroup 68780 2014-06-06 11:51 /test/aa/firstIntoTime-m-00000.lzo
-rw-r--r-- 2 hadoop supergroup 67901 2014-06-06 11:51 /test/aa/firstIntoTime-m-00001.lzo
至于怎么樣輸出到不同的目錄下,有待研究,這種方式有個不好的地方, 會產生很多的
-rw-r--r-- 2 hadoop supergroup 42 2014-06-06 11:50 /test/aa/part-m-00035.lzo 空文件
“hadoop多文件輸出新舊API的方法是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。