您好,登錄后才能下訂單哦!
本篇內容介紹了“TopKey怎么設置分隔符”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
key和value的默認分隔符為tab鍵
設置分隔符:輸出端 key 與 value 之間的分隔符可通過配置項 mapreduce.output.textoutputformat.separator 修改(舊版 API 為 mapred.textoutputformat.separator);輸入端按行讀取后,通常在 Mapper 中自行用 split("\t") 等方式切分。
程序一
package org.conan.myhadoop.TopKey;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Single-file maximum ("top 1"): a map-only job that scans input lines of
 * the form "word&lt;TAB&gt;count" and, per mapper, emits the single
 * (word, count) pair with the largest count.
 */
public class TopKMapReduce {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Output key/value instances, reused for the single record emitted in cleanup().
        private final Text mapOutputKey = new Text();
        private final LongWritable mapOutputValue = new LongWritable();

        // Running maximum across all lines seen by this mapper.
        private long topkValue = Long.MIN_VALUE;
        // True once at least one valid line has been processed, so cleanup()
        // does not emit a bogus (empty key, Long.MIN_VALUE) record on empty
        // or fully-malformed input.
        private boolean seenAny = false;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                return; // skip malformed lines instead of throwing AIOOBE
            }
            long tempValue;
            try {
                tempValue = Long.parseLong(strs[1]);
            } catch (NumberFormatException e) {
                return; // skip lines whose count column is not numeric
            }
            if (!seenAny || topkValue < tempValue) {
                topkValue = tempValue;
                mapOutputKey.set(strs[0]);
                seenAny = true;
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the maximum exactly once, after all input is consumed.
            if (seenAny) {
                mapOutputValue.set(topkValue);
                context.write(mapOutputKey, mapOutputValue);
            }
        }
    }

    /**
     * Configures and submits the map-only job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduce.class.getSimpleName());
        job.setJarByClass(TopKMapReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: mapper output is written directly, no reducer runs.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the demo HDFS paths only when none were supplied,
        // instead of unconditionally clobbering command-line arguments.
        if (args.length < 2) {
            args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                    "hdfs://hadoop-master:9000/data/topkoutput" };
        }
        int status = new TopKMapReduce().run(args);
        System.exit(status);
    }
}
程序二
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Set;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Single-file top-N using a TreeMap keyed by count (ascending), so the
 * first entry is always the smallest of the kept N and can be evicted.
 *
 * NOTE(review): because the count is the map KEY, two different words with
 * the same count overwrite each other — an inherent limitation of this
 * approach; TopKMapReduceV3 avoids it with a TreeSet of (word, count) pairs.
 */
public class TopKMapReduceV2 {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // keep the top 3

        // BUG FIX: was declared null and never initialized, guaranteeing a
        // NullPointerException on the first map() call.
        private final TreeMap<LongWritable, Text> topMap = new TreeMap<LongWritable, Text>();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                return; // skip malformed lines
            }
            long tempValue;
            try {
                tempValue = Long.parseLong(strs[1]);
            } catch (NumberFormatException e) {
                return; // skip non-numeric count columns
            }
            // BUG FIX: the original reused a single LongWritable/Text pair for
            // every put(); mutating an object that is already a TreeMap key
            // corrupts the map's ordering. Allocate fresh instances instead.
            topMap.put(new LongWritable(tempValue), new Text(strs[0]));
            if (topMap.size() > K) {
                // Evict the smallest count to keep only the largest K.
                topMap.remove(topMap.firstKey());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit (word, count) in ascending count order.
            Set<LongWritable> keySet = topMap.keySet();
            for (LongWritable k : keySet) {
                context.write(topMap.get(k), k);
            }
        }
    }

    /**
     * Configures and submits the map-only job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV2.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV2.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: no reducer.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the demo HDFS paths only when none were supplied.
        if (args.length < 2) {
            args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                    "hdfs://hadoop-master:9000/data/topkoutput2" };
        }
        int status = new TopKMapReduceV2().run(args);
        System.exit(status);
    }
}
程序三
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Single-file top-N using a TreeSet of TopKWritable (word, count) pairs.
 * The set is ordered by count ascending, so first() is always the smallest
 * kept entry and can be evicted when the set exceeds K.
 */
public class TopKMapReduceV3 {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        public static final int K = 3; // keep the top 3

        private final TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        // Primary order: count ascending.
                        int cmp = o1.getCount().compareTo(o2.getCount());
                        // BUG FIX: tie-break on the word. With the original
                        // count-only comparator, a TreeSet treats DIFFERENT
                        // words that share a count as duplicates and silently
                        // drops all but one of them.
                        return cmp != 0 ? cmp : o1.getWord().compareTo(o2.getWord());
                    }
                });

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                return; // skip malformed lines
            }
            long tempValue;
            try {
                tempValue = Long.parseLong(strs[1]);
            } catch (NumberFormatException e) {
                return; // skip non-numeric count columns
            }
            topSet.add(new TopKWritable(strs[0], tempValue));
            if (topSet.size() > K) {
                // Evict the entry with the smallest count.
                topSet.remove(topSet.first());
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the surviving top-K pairs in ascending count order.
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and submits the map-only job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV3.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV3.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Map-only job: no reducer.
        job.setNumReduceTasks(0);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the demo HDFS paths only when none were supplied.
        if (args.length < 2) {
            args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                    "hdfs://hadoop-master:9000/data/topkoutput3" };
        }
        int status = new TopKMapReduceV3().run(args);
        System.exit(status);
    }
}
程序四 自定義數據類型加比較器
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.Comparator;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Multi-file top-N: mappers emit every (word, count) pair; a single reducer
 * sums counts per word and keeps the top K in a TreeSet ordered by count
 * ascending (first() = smallest kept entry, evicted on overflow).
 */
public class TopKMapReduceV4 {

    static class TopKMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] strs = value.toString().split("\t");
            if (strs.length < 2) {
                return; // skip malformed lines
            }
            long tempValue;
            try {
                tempValue = Long.parseLong(strs[1]);
            } catch (NumberFormatException e) {
                return; // skip non-numeric count columns
            }
            context.write(new Text(strs[0]), new LongWritable(tempValue));
        }
    }

    public static class TopKReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        public static final int K = 3; // keep the top 3

        private final TreeSet<TopKWritable> topSet = new TreeSet<TopKWritable>(
                new Comparator<TopKWritable>() {
                    @Override
                    public int compare(TopKWritable o1, TopKWritable o2) {
                        // Primary order: count ascending.
                        int cmp = o1.getCount().compareTo(o2.getCount());
                        // BUG FIX: tie-break on the word; the original
                        // count-only comparator made a TreeSet drop distinct
                        // words that happened to share a total count.
                        return cmp != 0 ? cmp : o1.getWord().compareTo(o2.getWord());
                    }
                });

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum all partial counts for this word across input files.
            long count = 0;
            for (LongWritable value : values) {
                count += value.get();
            }
            topSet.add(new TopKWritable(key.toString(), count));
            if (topSet.size() > K) {
                // Evict the entry with the smallest total count.
                topSet.remove(topSet.first());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the surviving top-K pairs in ascending count order.
            for (TopKWritable top : topSet) {
                context.write(new Text(top.getWord()), new LongWritable(top.getCount()));
            }
        }
    }

    /**
     * Configures and submits the map + single-reduce job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKMapReduceV4.class.getSimpleName());
        job.setJarByClass(TopKMapReduceV4.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // A single reducer is required so the global top-K is computed in one place.
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the demo HDFS paths only when none were supplied.
        if (args.length < 2) {
            args = new String[] { "hdfs://hadoop-master:9000/data/wcoutput",
                    "hdfs://hadoop-master:9000/data/topkoutput4" };
        }
        int status = new TopKMapReduceV4().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop key type pairing a word with its count.
 * Natural order: by word first, then by count (both ascending).
 * Serialized as writeUTF(word) followed by writeLong(count).
 */
public class TopKWritable implements WritableComparable<TopKWritable> {

    private String word;
    private Long count;

    /** No-arg constructor required by the Writable deserialization machinery. */
    public TopKWritable() {
    }

    public TopKWritable(String word, Long count) {
        this.set(word, count);
    }

    /** Sets both fields in one call. */
    public void set(String word, Long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public Long getCount() {
        return count;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(word);
        out.writeLong(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.word = in.readUTF();
        this.count = in.readLong();
    }

    @Override
    public int compareTo(TopKWritable o) {
        // Words decide the order; the count is only a tie-breaker.
        int byWord = this.word.compareTo(o.getWord());
        if (byWord != 0) {
            return byWord;
        }
        return this.count.compareTo(o.getCount());
    }

    @Override
    public String toString() {
        return word + "\t" + count;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int h = 1;
        h = prime * h + (count == null ? 0 : count.hashCode());
        h = prime * h + (word == null ? 0 : word.hashCode());
        return h;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKWritable other = (TopKWritable) obj;
        boolean sameCount = (count == null) ? other.count == null : count.equals(other.count);
        boolean sameWord = (word == null) ? other.word == null : word.equals(other.word);
        return sameCount && sameWord;
    }
}
程序五:經典案例
package org.conan.myhadoop.TopKey;

import java.io.IOException;
import java.util.TreeSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Input format (tab-separated): language type, song name, favorite count,
 * play count, artist name.
 *
 * Requirement: report the ten most-played songs with their play counts.
 */
public class TopKeyMapReduce {

    public static final int K = 10;

    static class TopKeyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String lineValue = value.toString();
            if (null == lineValue) {
                return;
            }
            String[] strs = lineValue.split("\t");
            // Only well-formed 5-column records are usable.
            if (null == strs || strs.length != 5) {
                return;
            }
            String languageType = strs[0];
            String singName = strs[1];
            String playTimes = strs[3]; // column 3 is the play count
            long plays;
            try {
                plays = Long.parseLong(playTimes);
            } catch (NumberFormatException e) {
                return; // skip records whose play-count column is not numeric
            }
            // Composite key "language\tsong" so the reducer can aggregate per song.
            context.write(new Text(languageType + "\t" + singName),
                    new LongWritable(plays));
        }
    }

    public static class TopKeyReducer
            extends Reducer<Text, LongWritable, TopKeyWritable, NullWritable> {

        // Ordered by TopKeyWritable.compareTo (play count descending), so
        // last() is the smallest kept entry and is evicted on overflow.
        private final TreeSet<TopKeyWritable> topSet = new TreeSet<TopKeyWritable>();

        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            if (null == key) {
                return;
            }
            String[] splited = key.toString().split("\t");
            // BUG FIX: the original only rejected length==0 and then read
            // splited[1], throwing ArrayIndexOutOfBoundsException on any key
            // without a tab. Require both components.
            if (null == splited || splited.length < 2) {
                return;
            }
            String languageType = splited[0];
            String singName = splited[1];
            long playTimes = 0L;
            for (LongWritable value : values) {
                playTimes += value.get();
            }
            topSet.add(new TopKeyWritable(languageType, singName, playTimes));
            if (topSet.size() > K) {
                topSet.remove(topSet.last());
            }
        }

        @Override
        public void cleanup(Context context) throws IOException, InterruptedException {
            // Emit the top K songs, most-played first.
            for (TopKeyWritable top : topSet) {
                context.write(top, NullWritable.get());
            }
        }
    }

    /**
     * Configures and submits the map + single-reduce job.
     *
     * @param args args[0] = input directory, args[1] = output directory
     * @return 0 on success, 1 on failure
     */
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, TopKeyMapReduce.class.getSimpleName());
        job.setJarByClass(TopKeyMapReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        job.setInputFormatClass(TextInputFormat.class);

        job.setMapperClass(TopKeyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setReducerClass(TopKeyReducer.class);
        job.setOutputKeyClass(TopKeyWritable.class);
        job.setOutputValueClass(NullWritable.class);

        // A single reducer is required so the global top-K is computed in one place.
        job.setNumReduceTasks(1);

        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // Fall back to the demo HDFS paths only when none were supplied.
        if (args.length < 2) {
            args = new String[] { "hdfs://hadoop-master:9000/data/topkey/input",
                    "hdfs://hadoop-master:9000/data/topkey/output" };
        }
        // BUG FIX: the original ran new TopKMapReduceV4().run(args) — a
        // copy/paste slip that launched the wrong job entirely.
        int status = new TopKeyMapReduce().run(args);
        System.exit(status);
    }
}
package org.conan.myhadoop.TopKey;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Custom Hadoop key type for the song top-K job: language type, song name
 * and total play count. Natural order is play count DESCENDING, with song
 * and language as tie-breakers. Serialized as writeUTF(languageType),
 * writeUTF(singName), writeLong(playTimes).
 */
public class TopKeyWritable implements WritableComparable<TopKeyWritable> {

    String languageType;
    String singName;
    Long playTimes;

    /** No-arg constructor required by the Writable deserialization machinery. */
    public TopKeyWritable() {
    }

    public TopKeyWritable(String languageType, String singName, Long playTimes) {
        this.set(languageType, singName, playTimes);
    }

    /** Sets all three fields in one call. */
    public void set(String languageType, String singName, Long playTimes) {
        this.languageType = languageType;
        this.singName = singName;
        this.playTimes = playTimes;
    }

    public String getLanguageType() {
        return languageType;
    }

    public String getSingName() {
        return singName;
    }

    public Long getPlayTimes() {
        return playTimes;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.languageType = in.readUTF();
        this.singName = in.readUTF();
        this.playTimes = in.readLong();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(languageType);
        out.writeUTF(singName);
        out.writeLong(playTimes);
    }

    @Override
    public int compareTo(TopKeyWritable o) {
        // Negated to sort by play count in descending order.
        int cmp = -(this.playTimes.compareTo(o.getPlayTimes()));
        if (cmp != 0) {
            return cmp;
        }
        // BUG FIX: the original compared only playTimes, which is inconsistent
        // with equals() — a TreeSet then treats DIFFERENT songs with equal
        // play counts as duplicates and silently drops them. Tie-break on
        // song name, then language, so distinct songs are never collapsed.
        cmp = this.singName.compareTo(o.getSingName());
        if (cmp != 0) {
            return cmp;
        }
        return this.languageType.compareTo(o.getLanguageType());
    }

    @Override
    public String toString() {
        return languageType + "\t" + singName + "\t" + playTimes;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((languageType == null) ? 0 : languageType.hashCode());
        result = prime * result + ((playTimes == null) ? 0 : playTimes.hashCode());
        result = prime * result + ((singName == null) ? 0 : singName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TopKeyWritable other = (TopKeyWritable) obj;
        if (languageType == null) {
            if (other.languageType != null) {
                return false;
            }
        } else if (!languageType.equals(other.languageType)) {
            return false;
        }
        if (playTimes == null) {
            if (other.playTimes != null) {
                return false;
            }
        } else if (!playTimes.equals(other.playTimes)) {
            return false;
        }
        if (singName == null) {
            if (other.singName != null) {
                return false;
            }
        } else if (!singName.equals(other.singName)) {
            return false;
        }
        return true;
    }
}
“TopKey怎么設置分隔符”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。