- At data-collection time, merge the small files (or small batches of data) into larger files before uploading them to HDFS
- Before business processing, run a MapReduce job on HDFS to merge the small files
- When processing with MapReduce, use CombineFileInputFormat to improve efficiency (a driver sketch follows this list)

The custom-InputFormat approach used in the code below works like this:

- Write a custom InputFormat
- Override RecordReader so that each map task reads one small file in full and wraps its complete content into a single key-value pair
- In the Driver class, be sure to register the custom InputFormat: job.setInputFormatClass(MyFileInputFormat.class)
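
For the CombineFileInputFormat option above, the sketch below shows one way to wire it into a driver. It is an illustration rather than part of the original article: the class name, the use of the default (identity) Mapper and Reducer, the input/output arguments, and the 128 MB split size are assumptions; CombineTextInputFormat is the ready-made text-oriented subclass that packs many small files into a single split.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class CombineSmallFilesDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "combine small files with CombineTextInputFormat");
        job.setJarByClass(CombineSmallFilesDriver.class);

        // The default (identity) Mapper and Reducer are used only to exercise the
        // input format; a real job would plug in its own logic here
        job.setMapperClass(Mapper.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // Key point: CombineTextInputFormat packs many small files into one split,
        // so far fewer map tasks are launched than with TextInputFormat
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024); // ~128 MB per split

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Run it with hadoop jar, passing the input and output directories as the two program arguments.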
Code implementation of the custom-InputFormat approach:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MergeDriver {

    // Job driver
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "combine small files to bigfile");
            job.setJarByClass(MergeDriver.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);

            // Register the custom input format
            job.setInputFormatClass(MyFileInputFormat.class);

            Path input = new Path("/hadoop/input/num_add");
            Path output = new Path("/hadoop/output/merge_output1");

            // The custom FileInputFormat is used to read and format the input
            MyFileInputFormat.addInputPath(job, input);

            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper: with the custom input format below, map() is called once per file,
    // and the value holds that file's complete content
    private static class MyMapper extends Mapper<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void map(NullWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    // Reducer
    private static class MyReducer extends Reducer<NullWritable, Text, NullWritable, Text> {
        @Override
        protected void reduce(NullWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            for (Text v : values) {
                context.write(key, v);
            }
        }
    }

    // RecordReader; the two type parameters are the key and value types of the map-side input
    private static class MyRecordReader extends RecordReader<NullWritable, Text> {
        // Value object handed to the Mapper
        Text mapValue = new Text();
        // File system object, used to open the file's input stream
        FileSystem fs;
        // Whether the current file has already been read
        boolean isRead = false;
        // Split information of the current file
        FileSplit fileSplit;

        // Initialization, similar to setup() in a Mapper; runs before anything else in this class
        @Override
        public void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException {
            // Initialize the file system object
            fs = FileSystem.get(context.getConfiguration());
            // Keep the split so the file path and length are available later
            fileSplit = (FileSplit) split;
        }

        // The K-V pair passed to each map() call is produced in this method
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            // Read the file exactly once
            if (!isRead) {
                FSDataInputStream input = fs.open(fileSplit.getPath());
                // Read the whole small file in one go
                byte[] buffer = new byte[(int) fileSplit.getLength()];
                /*
                 * readFully arguments:
                 *   1. the byte array to fill
                 *   2. the offset to start writing at
                 *   3. the number of bytes to read
                 */
                input.readFully(buffer, 0, (int) fileSplit.getLength());
                input.close();
                isRead = true;
                // Put the file content into the map value
                mapValue.set(buffer);
                // Returning true exactly once means map() is called exactly once;
                // the next call returns false
                return true;
            }
            return false;
        }

        @Override
        public NullWritable getCurrentKey() throws IOException, InterruptedException {
            return NullWritable.get();
        }

        @Override
        public Text getCurrentValue() throws IOException, InterruptedException {
            return mapValue;
        }

        @Override
        public float getProgress() throws IOException, InterruptedException {
            return isRead ? 1.0f : 0.0f;
        }

        @Override
        public void close() throws IOException {
            // Nothing to release: the stream is closed in nextKeyValue(), and the
            // FileSystem instance is cached and shared, so it is not closed here
        }
    }

    // FileInputFormat
    private static class MyFileInputFormat extends FileInputFormat<NullWritable, Text> {
        @Override
        public RecordReader<NullWritable, Text> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException, InterruptedException {
            MyRecordReader mr = new MyRecordReader();
            // Call initialize first (the framework calls it again before reading, which is harmless)
            mr.initialize(split, context);
            return mr;
        }
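
        // Addition, not in the original article: FileInputFormat may still split a file
        // larger than one HDFS block into several splits. A whole-file input format
        // usually also overrides isSplitable() to return false, so that every file
        // becomes exactly one split and therefore exactly one record.
        @Override
        protected boolean isSplitable(JobContext context, Path filename) {
            return false;
        }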
    }
}
A second scenario: besides reading a single input and writing a single output, a MapReduce job often needs to:

- Read data from the original log files
- Fetch data from the business database according to the business logic
- Produce join results according to some join condition
- Access external resources from within MapReduce
- To control the final output in such cases, write a custom OutputFormat: override its RecordWriter and, in particular, write(), the method that actually writes out the data
Code implementation of the custom-OutputFormat approach:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// A simple example: compute each student's average score and write the results
// to different files depending on the score level
public class Score_DiffDic {

    // Job driver
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop01:9000");
        Job job = null;
        try {
            job = Job.getInstance(conf, "Score_DiffDic");
            job.setJarByClass(Score_DiffDic.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);

            // Register the custom output format
            job.setOutputFormatClass(MyOutputFormat.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoubleWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);

            Path input = new Path("/hadoop/input/num_add");
            FileInputFormat.addInputPath(job, input);

            Path output = new Path("/hadoop/output/merge_output1");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
            // The actual records are written by MyRecordWriter to its own paths;
            // this directory only receives the job's _SUCCESS marker
            FileOutputFormat.setOutputPath(job, output);

            int status = job.waitForCompletion(true) ? 0 : 1;
            System.exit(status);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Mapper
    private static class MyMapper extends Mapper<LongWritable, Text, Text, DoubleWritable> {
        Text mk = new Text();
        DoubleWritable mv = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // One record per line with three fields: course, student, score
            // (the original note showed "computer,huangxiaoming,85"; the split below is
            // on whitespace, so the input is expected to be whitespace-separated)
            String[] fields = value.toString().split("\\s+");
            if (fields.length == 3) {
                mk.set(fields[1]);
                mv.set(Double.parseDouble(fields[2]));
                context.write(mk, mv);
            }
        }
    }

    // Reducer: computes the average score per student
    private static class MyReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {
        DoubleWritable mv = new DoubleWritable();

        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            double sum = 0;
            int count = 0;
            for (DoubleWritable value : values) {
                sum += value.get();
                count++;
            }
            mv.set(sum / count);
            context.write(key, mv);
        }
    }

    // FileOutputFormat
    private static class MyOutputFormat extends FileOutputFormat<Text, DoubleWritable> {
        @Override
        public RecordWriter<Text, DoubleWritable> getRecordWriter(TaskAttemptContext job)
                throws IOException, InterruptedException {
            FileSystem fs = FileSystem.get(job.getConfiguration());
            return new MyRecordWriter(fs);
        }
    }

    // RecordWriter; the two type parameters are the Reducer's output key and value types
    private static class MyRecordWriter extends RecordWriter<Text, DoubleWritable> {
        FileSystem fs;
        // Paths of the two output files
        Path path2 = new Path("/hadoop/output/score_out1");
        Path path3 = new Path("/hadoop/output/score_out2");
        FSDataOutputStream output1;
        FSDataOutputStream output2;

        // Keep the file system and open the two output streams
        public MyRecordWriter(FileSystem fs) {
            this.fs = fs;
            try {
                output1 = fs.create(path2);
                output2 = fs.create(path3);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void write(Text key, DoubleWritable value) throws IOException, InterruptedException {
            // Business rule: average scores greater than 80 go to path2, everything else to path3
            if (value.get() > 80) {
                output1.write((key.toString() + ":" + value.get() + "\n").getBytes());
            } else {
                output2.write((key.toString() + ":" + value.get() + "\n").getBytes());
            }
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // Close the two streams; the cached FileSystem instance is left open
            output1.close();
            output2.close();
        }
    }
}
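
The custom OutputFormat above gives full control over file names and record format. Not covered in the original article, but worth knowing: for the common "route each record to one of several output files" case, Hadoop's built-in MultipleOutputs helper achieves the same effect without a hand-written OutputFormat. The sketch below is only an illustration under that assumption; the reducer class name and the named outputs "high" and "low" are made up, and the driver would additionally have to register them, e.g. MultipleOutputs.addNamedOutput(job, "high", TextOutputFormat.class, Text.class, DoubleWritable.class) (and likewise for "low"), while keeping a normal FileOutputFormat.setOutputPath.

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

// Sketch only: same averaging logic as MyReducer above, but the routing to two
// output files is done with MultipleOutputs instead of a custom OutputFormat
public class ScoreMultipleOutputsReducer
        extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    private MultipleOutputs<Text, DoubleWritable> mos;
    private final DoubleWritable avg = new DoubleWritable();

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double sum = 0;
        int count = 0;
        for (DoubleWritable value : values) {
            sum += value.get();
            count++;
        }
        avg.set(sum / count);
        // Records land in files named high-r-xxxxx / low-r-xxxxx under the
        // job's normal output directory
        mos.write(avg.get() > 80 ? "high" : "low", key, avg);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}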