您好,登錄后才能下訂單哦!
//mapreduce程序 import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount { /** * TokenizerMapper 繼續自 Mapper<LongWritable, Text, Text, IntWritable> * * [一個文件就一個map,兩個文件就會有兩個map] * map[這里讀入輸入文件內容 以" \t\n\r\f" 進行分割,然后設置 word ==> one 的key/value對] * * @param Object Input key Type: * @param Text Input value Type: * @param Text Output key Type: * @param IntWritable Output value Type: * * Writable的主要特點是它使得Hadoop框架知道對一個Writable類型的對象怎樣進行serialize以及deserialize. * WritableComparable在Writable的基礎上增加了compareT接口,使得Hadoop框架知道怎樣對WritableComparable類型的對象進行排序。 * * @ author liuqingjie * */ public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } /** * IntSumReducer 繼承自 Reducer<Text,IntWritable,Text,IntWritable> * * [不管幾個Map,都只有一個Reduce,這是一個匯總] * reduce[循環所有的map值,把word ==> one 的key/value對進行匯總] * * 這里的key為Mapper設置的word[每一個key/value都會有一次reduce] * * 當循環結束后,最后的確context就是最后的結果. * * @author liuqingjie * */ public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); if (args.length != 2) { System.err.println("請配置路徑 "); System.exit(2); } Job job = new Job(conf, "wordcount"); job.setJarByClass(WordCount.class);//主類 job.setMapperClass(TokenizerMapper.class);//mapper job.setReducerClass(IntSumReducer.class);//reducer job.setMapOutputKeyClass(Text.class);//設置map輸出數據的關鍵類 job.setMapOutputValueClass(IntWritable.class);//設置map輸出值類 job.setOutputKeyClass(Text.class);//設置作業輸出數據的關鍵類 job.setOutputValueClass(IntWritable.class);//設置作業輸出值類 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//文件輸入 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//文件輸出 System.exit(job.waitForCompletion(true) ? 0 : 1);//等待完成退出. } }
編寫過程分析:
(1)數據類型
整型:IntWritable, 這是Hadoop對int的封裝
字符串型:Text,這是Hadoop對String的封裝
上下文對象:Context,它用來與MapReduce系統進行通信,如把map的結果傳給reduce
處理
(2)執行過程
分為兩個階段:map階段和reduce階段, 以key/value為輸入輸出,其中key、value的類型可以由程序員自定義。
map編寫:
自定義一個類,繼承于基類Mapper,該基類是一個泛型,有4個形參類型:用來指定map函數的輸入鍵、輸入值,輸出鍵、輸 出值,格式如下:public class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOU>。
根據實際需要,重寫map函數,函數類型由Mapper指定。每一對<key,value>調用一次map函數。
wordcount程序中,map方法中的value值存儲的是文本文件中的一行,key值為該行的首字符相對于文本文件首字符的偏移量,在本程序中,key值未使用。StringTokenizer類是將每一行拆分為一個個的單詞。
reduce編寫:
自定義一個類,繼承于基類Reducer,該基類是一個泛型,有4個形參類型:用來指定reduce函數的輸入鍵、輸入值,輸出鍵、輸出值,格式public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,其中reduce的輸入類型必須與map的輸出類型一致。
根據實際需要,重寫reduce方法,方法的類型由Reducer指定。每一個key調用一次reduce方法。
主函數編寫:
在主函數中進行作業的配置,主要配置有:
Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class);//主類 job.setMapperClass(TokenizerMapper.class);//mapper job.setReducerClass(IntSumReducer.class);//reducer job.setMapOutputKeyClass(Text.class);//設置map輸出數據的關鍵類 job.setMapOutputValueClass(IntWritable.class);//設置map輸出值類 job.setOutputKeyClass(Text.class);//設置作業輸出數據的關鍵類 job.setOutputValueClass(IntWritable.class);//設置作業輸出值類 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));//文件輸入 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));//文件輸出 System.exit(job.waitForCompletion(true) ? 0 : 1);//等待完成退出.
(3)數據處理過程
1)將文件拆分為splits,并由MapReduce框架自動完成分割,將每一個split分割為<key,value>對
2)每一對<key,value>調用一次map函數,處理后生產新的<key,value>對,由Context傳遞給reduce處理
3)Mapper對<key,value>對進行按key值進行排序,將key值相同的value進行合并。最后得到Mapper的最終輸出結果
4)reduce處理,處理后將新的<key,value>對輸出。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。