您好,登錄后才能下訂單哦!
在“MapReduce--input之輸入原理”中說到實現定義輸入的方法,其實就是繼承InputFormat以及 RecordReader實現其中的方法。下面例子講解操作。
將多個文件合并成一個大文件(有點類似于combineInputFormat),并輸出。大文件中包括小文件所在的路徑,以及小文件的內容。
inputFormat
public class SFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {
/**
* 是否切片
* @param context
* @param filename
* @return
*/
@Override
protected boolean isSplitable(JobContext context, Path filename) {
return false;
}
/**
* 返回讀取文件內容的讀取器
* @param inputSplit
* @param taskAttemptContext
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
SRecordReader sRecordReader = new SRecordReader();
sRecordReader.initialize(inputSplit, taskAttemptContext);
return sRecordReader;
}
}
RecordReader:
public class SRecordReader extends RecordReader<NullWritable, BytesWritable> {
private Configuration conf;
private FileSplit split;
//當前分片是否已讀取的標志位
private boolean process = false;
private BytesWritable value = new BytesWritable();
/**
* 初始化
* @param inputSplit
* @param taskAttemptContext
* @throws IOException
* @throws InterruptedException
*/
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
split = (FileSplit)inputSplit;
conf = taskAttemptContext.getConfiguration();
}
/**
* 從分片中讀取下一個KV
* @return
* @throws IOException
* @throws InterruptedException
*/
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (!process) {
byte[] buffer = new byte[(int) split.getLength()];
//獲取文件系統
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
//創建輸入流
FSDataInputStream fis = fs.open(path);
//流對接,將數據讀取緩沖區
IOUtils.readFully(fis, buffer, 0, buffer.length);
//將數據裝載入value
value.set(buffer, 0, buffer.length);
//關閉流
IOUtils.closeStream(fis);
//讀完就標志位設置為true,表示已讀
process = true;
return true;
}
return false;
}
@Override
public NullWritable getCurrentKey() throws IOException, InterruptedException {
return NullWritable.get();
}
@Override
public BytesWritable getCurrentValue() throws IOException, InterruptedException {
return this.value;
}
@Override
public float getProgress() throws IOException, InterruptedException {
return process? 1 : 0;
}
@Override
public void close() throws IOException {
}
}
mapper:
public class SFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
Text k = new Text();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit)context.getInputSplit();
String name = inputSplit.getPath().toString();
k.set(name);
}
@Override
protected void map(NullWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
context.write(k, value);
}
}
reducer:
public class SFileReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
@Override
protected void reduce(Text key, Iterable<BytesWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, values.iterator().next());
}
}
driver:
public class SFileDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\test\\date\\A\\order\\", "G:\\test\\date\\A\\order2\\"};
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SFileDriver.class);
job.setMapperClass(SFileMapper.class);
job.setReducerClass(SFileReducer.class);
//設置輸入和輸出類,默認是 TextInputFormat
job.setInputFormatClass(SFileInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(BytesWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
自定義的inputformat需要在job中通過 job.setInputFormatClass() 來指定
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。