This article demonstrates an example analysis of secondary sort in Hadoop. The content is brief and clearly organized, and should help clear up common points of confusion; let's work through it together.
1. Requirement
Find the maximum temperature for each year.
2. Sample Data
Each input record is a tab-separated year/temperature pair, one per line:
1995	10
1996	11
1995	16
1995	22
1996	26
1995	3
1996	7
1996	10
1996	20
1996	33
1995	21
1996	9
1995	31
1995	-13
1995	22
1997	-2
1997	28
1997	15
1995	8
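For this input the job should emit one record per year: the year's maximum temperature. In the tab-separated format the job below produces, the expected output is:

1995	31
1996	33
1997	28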
3. Approach and Code
Sort the records by year and, within each year, by temperature in descending order, and only then send all records of the same year to a single reducer group; the first record in each group is then that year's maximum temperature. The key points of the implementation (illustrated on the sample data right after this list) are:
a. Define a composite key made up of the natural key (year) and the natural value (temperature).
b. Sort records on the full composite key.
c. Partition and group on the natural key alone.
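Concretely, for the 1995 records in the sample data the keys reach the reducer sorted as (1995, 31), (1995, 22), (1995, 22), (1995, 21), (1995, 16), (1995, 10), (1995, 8), (1995, 3), (1995, -13). The grouping comparator treats them all as a single group, so the reduce call sees (1995, 31) as its key; writing that key once per group yields each year's maximum.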
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key, used here for secondary sort: year (first) and temperature (second).
 */
public class IntPair implements WritableComparable<IntPair> {
    private IntWritable first;
    private IntWritable second;

    public IntPair() {
        this.first = new IntWritable();
        this.second = new IntWritable();
        // If the two lines above are removed, deserialization fails with
        // java.lang.NullPointerException at IntPair.readFields.
    }

    public IntPair(int first, int second) {
        set(new IntWritable(first), new IntWritable(second));
    }

    public IntPair(IntWritable first, IntWritable second) {
        set(first, second);
    }

    public void set(IntWritable first, IntWritable second) {
        this.first = first;
        this.second = second;
    }

    public IntWritable getFirst() {
        return first;
    }

    public IntWritable getSecond() {
        return second;
    }

    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object obj) {
        if (obj instanceof IntPair) {
            IntPair ip = (IntPair) obj;
            return first.get() == ip.first.get() && second.get() == ip.second.get();
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    public int compareTo(IntPair o) {
        int cmp = first.compareTo(o.first);
        if (cmp == 0) {
            cmp = second.compareTo(o.second);
        }
        return cmp;
    }
}
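As an aside on why the no-arg constructor must initialize the fields: Hadoop instantiates key classes reflectively and immediately calls readFields on the fresh instance. The minimal round-trip sketch below (IntPairRoundTrip is a hypothetical test class, not part of the original job; it assumes IntPair and hadoop-common are on the classpath) exercises write and readFields the way the shuffle does:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IntPairRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize an IntPair the way Hadoop does during the shuffle.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new IntPair(1995, 31).write(new DataOutputStream(bytes));

        // Deserialize into a default-constructed instance; if the no-arg
        // constructor left first/second null, readFields would throw
        // java.lang.NullPointerException here.
        IntPair copy = new IntPair();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
        System.out.println(copy); // prints: 1995	31
    }
}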
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {

    static class MaxTemperatureMapper extends Mapper<LongWritable, Text, IntPair, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] val = value.toString().split("\\t");
            if (val.length == 2) {
                context.write(new IntPair(Integer.parseInt(val[0]), Integer.parseInt(val[1])),
                        NullWritable.get());
            }
        }
    }

    static class MaxTemperatureReducer extends Reducer<IntPair, NullWritable, IntPair, NullWritable> {
        @Override
        protected void reduce(IntPair key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            // Emit only the first key of each group; keys are sorted by descending
            // temperature within a year, so this is the year's maximum.
            context.write(key, NullWritable.get());
        }
    }

    // Partition on the first field (year) only.
    public static class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
        @Override
        public int getPartition(IntPair key, NullWritable value, int numPartitions) {
            return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    // Group on the first field (year) only.
    public static class GroupComparator extends WritableComparator {
        private static final IntWritable.Comparator INT_COMPARATOR = new IntWritable.Comparator();

        protected GroupComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            // An IntWritable serializes to a fixed 4 bytes, so the first field
            // occupies exactly the first Integer.BYTES bytes of each key.
            return INT_COMPARATOR.compare(b1, s1, Integer.BYTES, b2, s2, Integer.BYTES);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                return ((IntPair) a).getFirst().compareTo(((IntPair) b).getFirst());
            }
            return super.compare(a, b);
        }
    }

    // Sort on the full composite key.
    public static class KeyComparator extends WritableComparator {
        protected KeyComparator() {
            super(IntPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            if (a instanceof IntPair && b instanceof IntPair) {
                IntPair ip1 = (IntPair) a;
                IntPair ip2 = (IntPair) b;
                int cmp = ip1.getFirst().compareTo(ip2.getFirst()); // ascending (year)
                if (cmp != 0) {
                    return cmp;
                }
                return -ip1.getSecond().compareTo(ip2.getSecond()); // descending (temperature)
            }
            return super.compare(a, b);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf(); // injected by ToolRunner
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Parameter number is wrong, please enter two parameters:<input> <output>");
            return -1;
        }
        Path inputPath = new Path(otherArgs[0]);
        Path outputPath = new Path(otherArgs[1]);
        //conf.set("fs.defaultFS", "hdfs://vmnode.zhch:9000");

        Job job = Job.getInstance(conf, "MaxTemperatureUsingSecondarySort");
        //job.setJar("F:/workspace/AssistRanking2/target/AssistRanking2-1.0-SNAPSHOT.jar");
        job.setJarByClass(MaxTemperatureUsingSecondarySort.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setPartitionerClass(FirstPartitioner.class);
        job.setSortComparatorClass(KeyComparator.class); // by default, sorting uses the key's compareTo
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setMapOutputKeyClass(IntPair.class);
        job.setOutputKeyClass(IntPair.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
        System.exit(exitCode);
    }
}
4. Sample Run
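A sketch of invoking the job from the command line; the jar name follows the commented-out setJar path above, the HDFS input/output paths are placeholders, and the output assumes the default single reducer:

$ hadoop jar AssistRanking2-1.0-SNAPSHOT.jar MaxTemperatureUsingSecondarySort /input/temperatures /output/max-temp
$ hdfs dfs -cat /output/max-temp/part-r-00000
1995	31
1996	33
1997	28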