91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何利用MapReduce分析明星微博數據

發布時間:2021-12-03 10:33:36 來源:億速云 閱讀:155 作者:小新 欄目:大數據

這篇文章主要介紹了如何利用MapReduce分析明星微博數據,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

1、項目需求

自定義輸入格式,將明星微博數據排序后按粉絲數關注數 微博數分別輸出到不同文件中。

2、數據集

明星 明星微博名稱 粉絲數 關注數 微博數

俞灝明 俞灝明 10591367 206 558

李敏鎬 李敏鎬 22898071 11 268

林心如 林心如 57488649 214 5940

黃曉明 黃曉明 22616497 506 2011

張靚穎 張靚穎 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定義InputFormat讀取明星微博數據,通過自定義getSortedHashtableByValue方法分別對明星的fan、followers、microblogs數據進行排序,然后利用MultipleOutputs輸出不同項到不同的文件中

4、實現

1)、定義WeiBo實體類,實現WritableComparable接口

package com.buaa;  import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;  import org.apache.hadoop.io.WritableComparable;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiBo * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 14:54:29 */ public class WeiBo implements WritableComparable<Object> {     // 粉絲     private int fan;     // 關注     private int followers;     // 微博數     private int microblogs;          public WeiBo(){};          public WeiBo(int fan,int followers,int microblogs){         this.fan = fan;         this.followers = followers;         this.microblogs = microblogs;     }          public void set(int fan,int followers,int microblogs){         this.fan = fan;         this.followers = followers;         this.microblogs = microblogs;     }          // 實現WritableComparable的readFields()方法,以便該數據能被序列化后完成網絡傳輸或文件輸入     @Override     public void readFields(DataInput in) throws IOException {         fan  = in.readInt();         followers = in.readInt();         microblogs = in.readInt();     }          // 實現WritableComparable的write()方法,以便該數據能被序列化后完成網絡傳輸或文件輸出      @Override     public void write(DataOutput out) throws IOException {         out.writeInt(fan);         out.writeInt(followers);         out.writeInt(microblogs);     }          @Override     public int compareTo(Object o) {         // TODO Auto-generated method stub         return 0;     }      public int getFan() {         return fan;     }      public void setFan(int fan) {         this.fan = fan;     }      public int getFollowers() {         return followers;     }      public void setFollowers(int followers) {         this.followers = followers;     }      public int getMicroblogs() {         return microblogs;     }      public void setMicroblogs(int microblogs) {         this.microblogs = microblogs;     } }

2)、自定義WeiboInputFormat,繼承FileInputFormat抽象類

package com.buaa;  import java.io.IOException;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboInputFormat * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 10:23:28 */ public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{       @Override      public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {           // 自定義WeiboRecordReader類,按行讀取           return new WeiboRecordReader();      }       public class WeiboRecordReader extends RecordReader<Text, WeiBo>{             public LineReader in;              // 聲明key類型             public Text lineKey = new Text();             // 聲明 value類型             public WeiBo lineValue = new WeiBo();                          @Override             public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {                 // 獲取split                 FileSplit split = (FileSplit)input;                 // 獲取配置                  Configuration job = context.getConfiguration();                 // 分片路徑                  Path file = split.getPath();                                  FileSystem fs = file.getFileSystem(job);                  // 打開文件                    FSDataInputStream filein = fs.open(file);                                  in = new LineReader(filein,job);              }              @Override             public boolean nextKeyValue() throws IOException, InterruptedException {                 // 一行數據                 Text line = new Text();                                  int linesize = in.readLine(line);                                  if(linesize == 0)                      return false;                                   // 通過分隔符'\t',將每行的數據解析成數組                 String[] pieces = line.toString().split("\t");                                  if(pieces.length != 5){                       throw new IOException("Invalid record received");                   }                                   int a,b,c;                 try{                       // 粉絲                       a = Integer.parseInt(pieces[2].trim());                     // 關注                     b = Integer.parseInt(pieces[3].trim());                     // 微博數                     c = Integer.parseInt(pieces[4].trim());                 }catch(NumberFormatException nfe){                       throw new IOException("Error parsing floating poing value in record");                   }                                  //自定義key和value值                 lineKey.set(pieces[0]);                   lineValue.set(a, b, c);                                  return true;             }                          @Override             public void close() throws IOException {                 if(in != null){                     in.close();                 }             }              @Override             public Text getCurrentKey() throws IOException, InterruptedException {                 return lineKey;             }              @Override             public WeiBo getCurrentValue() throws IOException, InterruptedException {                 return lineValue;             }              @Override             public float getProgress() throws IOException, InterruptedException {                 return 0;             }                      } }

3)、編寫mr程序

package com.buaa;  import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboCount * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 09:07:36 */ public class WeiboCount extends Configured implements Tool {     // tab分隔符     private static String TAB_SEPARATOR = "\t";     // 粉絲     private static String FAN = "fan";     // 關注     private static String FOLLOWERS = "followers";     // 微博數     private static String MICROBLOGS = "microblogs";          public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {         @Override         protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {             // 粉絲             context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));             // 關注             context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));             // 微博數             context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));         }     }          public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {         private MultipleOutputs<Text, IntWritable> mos;          protected void setup(Context context) throws IOException, InterruptedException {             mos = new MultipleOutputs<Text, IntWritable>(context);         }          protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {             Map<String,Integer> map = new HashMap< String,Integer>();                          for(Text value : Values){                 // value = 名稱 + (粉絲數 或 關注數 或 微博數)                 String[] records = value.toString().split(TAB_SEPARATOR);                 map.put(records[0], Integer.parseInt(records[1].toString()));             }                          // 對Map內的數據進行排序             Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);                          for(int i = 0; i < entries.length;i++){                 mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());             }                        }          protected void cleanup(Context context) throws IOException, InterruptedException {             mos.close();         }     }          @SuppressWarnings("deprecation")     @Override     public int run(String[] args) throws Exception {         // 配置文件對象         Configuration conf = new Configuration();                  // 判斷路徑是否存在,如果存在,則刪除         Path mypath = new Path(args[1]);         FileSystem hdfs = mypath.getFileSystem(conf);         if (hdfs.isDirectory(mypath)) {             hdfs.delete(mypath, true);         }                  // 構造任務         Job job = new Job(conf, "weibo");         // 主類         job.setJarByClass(WeiboCount.class);          // Mapper         job.setMapperClass(WeiBoMapper.class);         // Mapper key輸出類型         job.setMapOutputKeyClass(Text.class);         // Mapper value輸出類型         job.setMapOutputValueClass(Text.class);                  // Reducer         job.setReducerClass(WeiBoReducer.class);         // Reducer key輸出類型         job.setOutputKeyClass(Text.class);         // Reducer value輸出類型         job.setOutputValueClass(IntWritable.class);                  // 輸入路徑         FileInputFormat.addInputPath(job, new Path(args[0]));         // 輸出路徑         FileOutputFormat.setOutputPath(job, new Path(args[1]));                  // 自定義輸入格式         job.setInputFormatClass(WeiboInputFormat.class) ;         //自定義文件輸出類別         MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);         MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);         MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);                  // 去掉job設置outputFormatClass,改為通過LazyOutputFormat設置           LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);                    //提交任務           return job.waitForCompletion(true)?0:1;     }          // 對Map內的數據進行排序(只適合小數據量)     @SuppressWarnings("unchecked")     public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {           Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);           // 排序         Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {             public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {                 return entry2.getValue().compareTo(entry1.getValue());             }          });         return entries;       }          public static void main(String[] args) throws Exception {         String[] args0 = {                 "hdfs://ljc:9000/buaa/microblog/weibo.txt",                 "hdfs://ljc:9000/buaa/microblog/out/"          };         int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);         System.exit(ec);     } }

5、運行結果

如何利用MapReduce分析明星微博數據

感謝你能夠認真閱讀完這篇文章,希望小編分享的“如何利用MapReduce分析明星微博數據”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

津市市| 鄂托克旗| 通渭县| 永寿县| 灵石县| 麟游县| 故城县| 托里县| 吴旗县| 项城市| 厦门市| 晋中市| 康乐县| 绥芬河市| 井陉县| 县级市| 台江县| 新郑市| 荃湾区| 修水县| 玛多县| 宝山区| 阳东县| 绿春县| 苗栗市| 敦化市| 宜良县| 青海省| 云南省| 东阿县| 利津县| 沁阳市| 富裕县| 邯郸县| 新竹市| 思茅市| 台江县| 云和县| 定州市| 苍溪县| 广丰县|