您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Hadoop中MapReduce常用算法有哪些,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
hadoop fs -mkdir /import
創建一個或者多個文本,上傳
hadoop fs -put test.txt /import/
package com.cuiweiyou.sort; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; //hadoop默認排序: //如果k2、v2類型是Text-文本,結果是按照字典順序 //如果k2、v2類型是LongWritable-數字,結果是按照數字大小順序 public class SortTest { /** * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { /** * 重寫map方法 */ public void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //這里v1轉為k2-數字類型,舍棄k1。null為v2 context.write(new LongWritable(Long.parseLong(v1.toString())), NullWritable.get()); //因為v1可能重復,這時,k2也是可能有重復的 } } /** * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { /** * 重寫reduce方法 * 在此方法執行前,有個shuffle過程,會根據k2將對應的v2歸并為v2[...] */ protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Reducer<LongWritable, Context context) throws IOException, InterruptedException { //k2=>k3, v2[...]舍棄。null => v3 context.write(k2, NullWritable.get()); //此時,k3如果發生重復,根據默認算法會發生覆蓋,即最終僅保存一個k3 } } public static void main(String[] args) throws Exception { // 聲明配置信息 Configuration conf = new Configuration(); conf.set("fs.default.name", "hdfs://localhost:9000"); // 創建作業 Job job = new Job(conf, "SortTest"); job.setJarByClass(SortTest.class); // 設置mr job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); // 設置輸出類型,和Context上下文對象write的參數類型一致 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(NullWritable.class); // 設置輸入輸出路徑 FileInputFormat.setInputPaths(job, new Path("/import/")); FileOutputFormat.setOutputPath(job, new Path("/out")); // 執行 System.exit(job.waitForCompletion(true) ? 0 : 1); } }
可以看到,不僅排序而且去重了。
需求:查取手機號有哪些。這里的思路和上面排序算法的思路是一致的,僅僅多了分割出手機號這一步驟。
創建兩個文本,手動輸入一些測試內容。每個字段用制表符隔開。日期,電話,地址,方式,數據量。
/** * 映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { /** * 重寫map方法 */ protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //按照制表符進行分割 String[] tels = v1.toString().split("\t"); //k1 => k2-第2列手機號,null => v2 context.write(new Text(tels[1]), NullWritable.get()); } } /************************************************************ * 在map后,reduce前,有個shuffle過程,會根據k2將對應的v2歸并為v2[...] ***********************************************************/ /** * 拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { /** * 重寫reduce方法 */ protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException { //此時,k3如果發生重復,根據默認算法會發生覆蓋,即最終僅保存一個k3,達到去重到效果 context.write(k2, NullWritable.get()); } }
// 設置輸出類型,和Context上下文對象write的參數類型一致 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
需求:查詢在北京地區發生的上網記錄。思路同上,當寫出 k2 、 v2 時加一個判斷即可。
同上。
/** * 內部類:映射器 Mapper<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyMapper extends Mapper<LongWritable, Text, Text, NullWritable> { /** * 重寫map方法 */ protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //按照制表符進行分割 final String[] adds = v1.toString().split("\t"); //地址在第3列 //k1 => k2-地址,null => v2 if(adds[2].equals("beijing")){ context.write(new Text(v1.toString()), NullWritable.get()); } } } /** * 內部類:拆分器 Reducer<KEY_IN, VALUE_IN, KEY_OUT, VALUE_OUT> */ public static class MyReducer extends Reducer<Text, NullWritable, Text, NullWritable> { /** * 重寫reduce方法 */ protected void reduce(Text k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException { context.write(k2, NullWritable.get()); } }
// 設置輸出類型,和Context上下文對象write的參數類型一致 job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class);
這個算法非常經典,面試必問。實現這個效果的算法也很多。下面是個簡單的示例。
需求:找到流量最大值;找出前5個最大值。
同上。
//map public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { //首先創建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808 long temp = Long.MIN_VALUE; //找出最大值 protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //按照制表符進行分割 final String[] flows = v1.toString().split("\t"); //將文本轉數值 final long val = Long.parseLong(flows[4]); //如果v1比臨時變量大,則保存v1的值 if(temp<val){ temp = val; } } /** ---此方法在全部的map任務結束后執行一次。這時僅輸出臨時變量到最大值--- **/ protected void cleanup(Context context) throws IOException ,InterruptedException { context.write(new LongWritable(temp), NullWritable.get()); System.out.println("文件讀取完畢"); } } //reduce public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { //臨時變量 Long temp = Long.MIN_VALUE; //因為一個文件得到一個最大值,再次將這些值比對,得到最大的 protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException { long long1 = Long.parseLong(k2.toString()); //如果k2比臨時變量大,則保存k2的值 if(temp<long1){ temp = long1; } } /** !!!此方法在全部的reduce任務結束后執行一次。這時僅輸出臨時變量到最大值!!! **/ protected void cleanup(Context context) throws IOException, InterruptedException { context.write(new LongWritable(temp), NullWritable.get()); } }
// 設置輸出類型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(NullWritable.class);
//map public static class MyMapper extends Mapper<LongWritable, Text, LongWritable, NullWritable> { //首先創建一個臨時變量,保存一個可存儲的最小值:Long.MIN_VALUE=-9223372036854775808 long temp = Long.MIN_VALUE; //Top5存儲空間 long[] tops; /** 次方法在run中調用,在全部map之前執行一次 **/ protected void setup(Context context) { //初始化數組長度為5 tops = new long[5]; } //找出最大值 protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //按照制表符進行分割 final String[] flows = v1.toString().split("\t"); //將文本轉數值 final long val = Long.parseLong(flows[4]); //保存在0索引 tops[0] = val; //排序后最大值在最后一個索引,這樣從后到前依次減小 Arrays.sort(tops); } /** ---此方法在全部到map任務結束后執行一次。這時僅輸出臨時變量到最大值--- **/ protected void cleanup(Context context) throws IOException ,InterruptedException { //保存前5條數據 for( int i = 0; i < tops.length; i++) { context.write(new LongWritable(tops[i]), NullWritable.get()); } } } //reduce public static class MyReducer extends Reducer<LongWritable, NullWritable, LongWritable, NullWritable> { //臨時變量 Long temp = Long.MIN_VALUE; //Top5存儲空間 long[] tops; /** 次方法在run中調用,在全部map之前執行一次 **/ protected void setup(Context context) { //初始化長度為5 tops = new long[5]; } //因為每個文件都得到5個值,再次將這些值比對,得到最大的 protected void reduce(LongWritable k2, Iterable<NullWritable> v2, Context context) throws IOException ,InterruptedException { long top = Long.parseLong(k2.toString()); // tops[0] = top; // Arrays.sort(tops); } /** ---此方法在全部到reduce任務結束后執行一次。輸出前5個最大值--- **/ protected void cleanup(Context context) throws IOException, InterruptedException { //保存前5條數據 for( int i = 0; i < tops.length; i++) { context.write(new LongWritable(tops[i]), NullWritable.get()); } } }
// 設置輸出類型 job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(NullWritable.class);
本例中的單表實際就是一個文本文件。
//map public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { //拆分原始數據 protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //按制表符拆分記錄 String[] splits = v1.toString().split("\t"); //一條k2v2記錄:把孫輩作為k2;祖輩加下劃線區分,作為v2 context.write(new Text(splits[0]), new Text("_"+splits[1])); //一條k2v2記錄:把祖輩作為k2;孫輩作為v2。就是把原兩個單詞調換位置保存 context.write(new Text(splits[1]), new Text(splits[0])); } /** 張三 _張三爸爸 張三爸爸 張三 張三爸爸 _張三爺爺 張三爺爺 張三爸爸 **/ } //reduce public static class MyReducer extends Reducer<Text, Text, Text, Text> { //拆分k2v2[...]數據 protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException { String grandchild = ""; //孫輩 String grandfather = ""; //祖輩 /** 張三爸爸 [_張三爺爺,張三] **/ //從迭代中遍歷v2[...] for (Text man : v2) { String p = man.toString(); //如果單詞是以下劃線開始的 if(p.startsWith("_")){ //從索引1開始截取字符串,保存到祖輩變量 grandfather = p.substring(1); } //如果單詞沒有下劃線起始 else{ //直接賦值給孫輩變量 grandchild = p; } } //在得到有效數據的情況下 if( grandchild!="" && grandfather!=""){ //寫出得到的結果。 context.write(new Text(grandchild), new Text(grandfather)); } /** k3=張三,v3=張三爺爺 **/ } }
// 設置輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
本例中仍簡單采用兩個文本文件。
//map public static class MyMapper extends Mapper<LongWritable, Text, Text, Text> { //拆分原始數據 protected void map(LongWritable k1, Text v1, Context context) throws IOException ,InterruptedException { //拆分記錄 String[] splited = v1.toString().split("\t"); //如果第一列是數字(使用正則判斷),就是地址表 if(splited[0].matches("^[-+]?(([0-9]+)([.]([0-9]+))?|([.]([0-9]+))?)$")){ String addreId = splited[0]; String address = splited[1]; //k2,v2-加兩條下劃線作為前綴標識為地址 context.write(new Text(addreId), new Text("__"+address)); } //否則就是人員表 else{ String personId = splited[1]; String persName = splited[0]; //k2,v2-加兩條橫線作為前綴標識為人員 context.write(new Text(personId), new Text("--"+persName)); } /** 1 __北京 1 --張三 **/ } } //reduce public static class MyReducer extends Reducer<Text, Text, Text, Text> { //拆分k2v2[...]數據 protected void reduce(Text k2, Iterable<Text> v2, Context context) throws IOException ,InterruptedException { String address = ""; //地址 String person = ""; //人員 /** 1, [__北京,--張三] **/ //迭代的是address或者person for (Text text : v2) { String tmp = text.toString(); if(tmp.startsWith("__")){ //如果是__開頭的是address address = tmp.substring(2); //從索引2開始截取字符串 } if(tmp.startsWith("--")){ //如果是--開頭的是person person = tmp.substring(2); } } context.write(new Text(person), new Text(address)); } /** k3=張三,v3=北京 **/
// 設置輸出類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class);
關于“Hadoop中MapReduce常用算法有哪些”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。