您好,登錄后才能下訂單哦!
這篇“Hadoop之Mapreduce序列化怎么實現”文章的知識點大部分人都不太理解,所以小編給大家總結了以下內容,內容詳細,步驟清晰,具有一定的借鑒價值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來看看這篇“Hadoop之Mapreduce序列化怎么實現”文章吧。
序列化就是把內存中的對象,轉換成字節序列(或其他數據傳輸協議)以便于存儲到磁盤(持久化)和網絡傳輸。
反序列化就是將收到字節序列(或其他數據傳輸協議)或者是磁盤的持久化數據,轉換成內存中的對象。
一般來說,“活的”對象只生存在內存里,關機斷 電就沒有了。而且“活的”對象只能由本地的進程使用,不能被發送到網絡上的另外一臺計算機。 然而序列化可以存儲“活的”對象,可以將“活的”對象發送到遠程計算機。
在Java中也是有序列化的,我們為什么不通過idea使用Java的序列化那?
因為Java的序列化框架(Serializable)是一個繁重的框架,附帶信息比較多(各種校驗信息,Header,繼承體系等),不便于在網絡中高效傳輸。所以,Hadoop自己開發了一套序列化機制(Writable)。
Hadoop的序列化比較精簡,只有簡單的校驗,有緊湊(高效使用存儲空間),快速(讀寫數據的額外開銷小),互操作(支持多語言的交互)的特點。
在開發過程中,基本序列化類型不能滿足所有需求,比如在Hadoop框架內部傳遞一個bean對象(不是基本的數據類型(某個類)----沒有對應的Hadoop類型),那么該對象就需要實現序列化接口。
Writable接口(好像也分析不出什么)
兩個方法:
1.write: 進行序列化
2.readFields:進行反序列化
(1) 反序列化時,需要反射調用空參構造函數,所以必須有空參構造
public FlowBean() { super(); }
(2) 重寫接口中的兩個方法***(注意:反序列化的順序和序列化的順序完全一致)
如數據結構中的隊列一樣先進先出,先序列化則先反序列化
(3)需要重寫toString()方法,因為需要打印出來,否則打印出來的是地址
(4) 如果需要將自定義的bean放在key中傳輸,則還需要實現Comparable接口,因為MapReduce框中的Shuffle過程要求對key必須能排序。(比如:上一篇博客中的計算單詞出現次數中 最后呈現的單詞是按照26個英文字母的順序進行排序的)
看一個樣例源碼(字符串Text):
看到上圖 實現接口:
WritableComparable<BinaryComparable>
跟進一下:
看到該接口繼承自Comparable接口(這是Java中的一個API)
統計每一個手機號耗費的總上行流量、總下行流量、總流量
先輸入數據,輸入數據后需要進行mapper階段---》reduce階段---》輸出階段
mapper階段:
先考慮輸入kv (k---偏移量 v是一行數據)
輸出(kv)為reduce的輸入(kv) (本樣例中使用的k是手機號--統計手機號的流量使用 v為上行流量,下行流量,總流量 則需要封裝bean類(自定義對象) 再進行序列化傳輸(為什么要進行序列化那?----因為再計算的過程中可能由于資源問題mapper和reduce不在同一臺服務器上))
輸出(kv)同樣也是(手機號,bean類)
1.FlowBean代碼:
package com.tangxiaocong.mapreduce.writable; import org.apache.hadoop.io.Writable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; /* * * 定義bean類 * 需要實現writable * 重寫序列化和反序列化方法 * 重寫空參構造 * 重寫tostring方法 * * */ public class FlowBean implements Writable { private long upFlow; private long downFlow; private long sumFlow; public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; } public void setSumFlow() { this.sumFlow = this.downFlow+this.upFlow; } //生成空參構造函數由于反射 快捷鍵alt + insert public FlowBean() { } @Override public void write(DataOutput out) throws IOException { //序列化方法 // 向緩沖流中寫入Long類型的數據 out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public void readFields(DataInput in) throws IOException { //反序列化方法 //讀取緩沖區中數據 this.upFlow= in.readLong(); this.downFlow= in.readLong(); this.sumFlow= in.readLong(); } @Override public String toString() { return upFlow + "\t"+downFlow +"\t"+ sumFlow ; } }
2.Mapper代碼:
package com.tangxiaocong.mapreduce.writable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class FlowMapper extends Mapper<LongWritable, Text,Text,FlowBean> { private Text outK=new Text(); private FlowBean outV=new FlowBean(); //調用的無參構造函數 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { //1 獲取一行 //1 13736230513 192.196.100.1 www.atguigu.com 2481 24681 200 String s = value.toString();// 將數據轉換成string //2 進行切割 String[] split = s.split("\t"); //將數據按寫入形式進行切割 //3 抓取想要的數據 //根據角標獲取 手機號 上行流量 下行流量 String phone = split[1]; String up = split[split.length - 3];// 不能正序 因為有的屬性是沒有字段的 String down = split[split.length - 2]; // 封裝輸出的kv outK.set(phone); outV.setUpFlow(Long.parseLong(up));// up為string類型 outV.setDownFlow(Long.parseLong(down)); outV.setSumFlow(); // //寫出 context.write(outK,outV); } }
3. reduce代碼:
package com.tangxiaocong.mapreduce.writable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; public class FlowReducer extends Reducer <Text,FlowBean,Text,FlowBean>{ private FlowBean outv=new FlowBean(); @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { long totalUp=0; long totaldown=0; //分析 傳入TEXT 為手機號 后邊為集合(Bean類的對象的集合)輸出還是一個一個bean類 (每個手機號的總和) for (FlowBean value : values) { //傳入的參數是同一個key的 totalUp+=value.getUpFlow(); totaldown+=value.getDownFlow(); } // 現在求出的是每個手機號的總的上行流量 下行流量 //封裝 key不需要 //outv outv.setUpFlow(totalUp); outv.setDownFlow(totaldown); outv.setSumFlow(); //寫出 context.write(key,outv); } }
4.driver代碼:
package com.tangxiaocong.mapreduce.writable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; public class FlowDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { //獲取JOB Configuration entries = new Configuration(); Job job = Job.getInstance(entries); job.setJarByClass(FlowDriver.class); //關聯mapper 和reduce job.setMapperClass(FlowMapper.class); job.setReducerClass(FlowReducer.class); //設置mapper 輸出的key 和value job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); // 設置最終的數據輸出的key和value 類型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //設置數據的輸入路徑和輸出路徑 FileInputFormat.setInputPaths(job, new Path("D:\\hadoop\\phone_data.txt")); FileOutputFormat.setOutputPath(job, new Path("D:\\hadoop\\output3")); //提交job boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }
最后運行
出現了bug 經過兩個小時的調試 找出答案 是在driver類中設置mapper類輸出kv類型出現差錯沒有配型成功
現在是運作正確的
以上就是關于“Hadoop之Mapreduce序列化怎么實現”這篇文章的內容,相信大家都有了一定的了解,希望小編分享的內容對大家有幫助,若想了解更多相關的知識內容,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。