91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

MapReduce多種join實現的示例分析

發布時間:2021-12-18 17:25:22 來源:億速云 閱讀:188 作者:柒染 欄目:互聯網科技

這篇文章將為大家詳細講解有關MapReduce多種join實現的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。

一、概述 

MapReduce多種join實現的示例分析 
 
對于RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分注意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由于hadoop的分布式設計理念的特殊性,因此對于這種join操作同樣也具備了一定的特殊性。本文主要對MapReduce框架對表之間的join操作的幾種實現方式進行詳細分析,并且根據我在實際開發過程中遇到的實際例子來進行進一步的說明。
 
 
二、實現原理

MapReduce多種join實現的示例分析

1、在Reudce端進行連接。
在Reudce端進行連接是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。
reduce端的主要工作:在reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在map階段已經打標志)分開,最后進行笛卡爾只就ok了。原理非常簡單,下面來看一個實例:
(1)自定義一個value返回類型:

  1. package com.mr.reduceSizeJoin;   

  2. import java.io.DataInput;   

  3. import java.io.DataOutput;   

  4. import java.io.IOException;   

  5. import org.apache.hadoop.io.Text;   

  6. import org.apache.hadoop.io.WritableComparable;   

  7. public class CombineValues implements WritableComparable<CombineValues>{   

  8.     //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class); 

  9.     private Text joinKey;//鏈接關鍵字 

  10.     private Text flag;//文件來源標志 

  11.     private Text secondPart;//除了鏈接鍵外的其他部分 

  12.     public void setJoinKey(Text joinKey) {   

  13.         this.joinKey = joinKey;   

  14.     }   

  15.     public void setFlag(Text flag) {   

  16.         this.flag = flag;   

  17.     }   

  18.     public void setSecondPart(Text secondPart) {   

  19.         this.secondPart = secondPart;   

  20.     }   

  21.     public Text getFlag() {   

  22.         return flag;   

  23.     }   

  24.     public Text getSecondPart() {   

  25.         return secondPart;   

  26.     }   

  27.     public Text getJoinKey() {   

  28.         return joinKey;   

  29.     }   

  30.     public CombineValues() {   

  31.         this.joinKey =  new Text();   

  32.         this.flag = new Text();   

  33.         this.secondPart = new Text();   

  34.     }

  35.  

  36.     @Override

  37.     public void write(DataOutput out) throws IOException {   

  38.         this.joinKey.write(out);   

  39.         this.flag.write(out);   

  40.         this.secondPart.write(out);   

  41.     }   

  42.     @Override

  43.     public void readFields(DataInput in) throws IOException {   

  44.         this.joinKey.readFields(in);   

  45.         this.flag.readFields(in);   

  46.         this.secondPart.readFields(in);   

  47.     }   

  48.     @Override

  49.     public int compareTo(CombineValues o) {   

  50.         return this.joinKey.compareTo(o.getJoinKey());   

  51.     }   

  52.     @Override

  53.     public String toString() {   

  54.         // TODO Auto-generated method stub 

  55.         return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   

  56.     }   

 
(2) map、reduce主體代碼:

  1. package com.mr.reduceSizeJoin;   

  2. import java.io.IOException;   

  3. import java.util.ArrayList;   

  4. import org.apache.hadoop.conf.Configuration;   

  5. import org.apache.hadoop.conf.Configured;   

  6. import org.apache.hadoop.fs.Path;   

  7. import org.apache.hadoop.io.Text;   

  8. import org.apache.hadoop.mapreduce.Job;   

  9. import org.apache.hadoop.mapreduce.Mapper;   

  10. import org.apache.hadoop.mapreduce.Reducer;   

  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;   

  12. import org.apache.hadoop.mapreduce.lib.input.FileSplit;   

  13. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;   

  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   

  15. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;   

  16. import org.apache.hadoop.util.Tool;   

  17. import org.apache.hadoop.util.ToolRunner;   

  18. import org.slf4j.Logger;   

  19. import org.slf4j.LoggerFactory;   

  20. /** 

  21.  * @author zengzhaozheng 

  22.  * 用途說明: 

  23.  * reudce side join中的left outer join 

  24.  * 左連接,兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段 

  25.  * table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show) 

  26.  * tb_dim_city.dat文件內容,分隔符為"|": 

  27.  * id     name  orderid  city_code  is_show 

  28.  * 0       其他        9999     9999         0 

  29.  * 1       長春        1        901          1 

  30.  * 2       吉林        2        902          1 

  31.  * 3       四平        3        903          1 

  32.  * 4       松原        4        904          1 

  33.  * 5       通化        5        905          1 

  34.  * 6       遼源        6        906          1 

  35.  * 7       白城        7        907          1 

  36.  * 8       白山        8        908          1 

  37.  * 9       延吉        9        909          1 

  38.  * -------------------------風騷的分割線------------------------------- 

  39.  * table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int) 

  40.  * tb_user_profiles.dat文件內容,分隔符為"|": 

  41.  * userID   network     flow    cityID 

  42.  * 1           2G       123      1 

  43.  * 2           3G       333      2 

  44.  * 3           3G       555      1 

  45.  * 4           2G       777      3 

  46.  * 5           3G       666      4 

  47.  * 

  48.  * -------------------------風騷的分割線------------------------------- 

  49.  *  結果: 

  50.  *  1   長春  1   901 1   1   2G  123 

  51.  *  1   長春  1   901 1   3   3G  555 

  52.  *  2   吉林  2   902 1   2   3G  333 

  53.  *  3   四平  3   903 1   4   2G  777 

  54.  *  4   松原  4   904 1   5   3G  666 

  55.  */

  56. public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   

  57.     private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   

  58.     public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {   

  59.         private CombineValues combineValues = new CombineValues();   

  60.         private Text flag = new Text();   

  61.         private Text joinKey = new Text();   

  62.         private Text secondPart = new Text();   

  63.         @Override

  64.         protected void map(Object key, Text value, Context context)   

  65.                 throws IOException, InterruptedException {   

  66.             //獲得文件輸入路徑 

  67.             String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   

  68.             //數據來自tb_dim_city.dat文件,標志即為"0" 

  69.             if(pathName.endsWith("tb_dim_city.dat")){   

  70.                 String[] valueItems = value.toString().split("\\|");   

  71.                 //過濾格式錯誤的記錄 

  72.                 if(valueItems.length != 5){   

  73.                     return;   

  74.                 }   

  75.                 flag.set("0");   

  76.                 joinKey.set(valueItems[0]);   

  77.                 secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   

  78.                 combineValues.setFlag(flag);   

  79.                 combineValues.setJoinKey(joinKey);   

  80.                 combineValues.setSecondPart(secondPart);   

  81.                 context.write(combineValues.getJoinKey(), combineValues);

  82.  

  83.                 }//數據來自于tb_user_profiles.dat,標志即為"1" 

  84.             else if(pathName.endsWith("tb_user_profiles.dat")){   

  85.                 String[] valueItems = value.toString().split("\\|");   

  86.                 //過濾格式錯誤的記錄 

  87.                 if(valueItems.length != 4){   

  88.                     return;   

  89.                 }   

  90.                 flag.set("1");   

  91.                 joinKey.set(valueItems[3]);   

  92.                 secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   

  93.                 combineValues.setFlag(flag);   

  94.                 combineValues.setJoinKey(joinKey);   

  95.                 combineValues.setSecondPart(secondPart);   

  96.                 context.write(combineValues.getJoinKey(), combineValues);   

  97.             }   

  98.         }   

  99.     }   

  100.     public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {   

  101.         //存儲一個分組中的左表信息 

  102.         private ArrayList<Text> leftTable = new ArrayList<Text>();   

  103.         //存儲一個分組中的右表信息 

  104.         private ArrayList<Text> rightTable = new ArrayList<Text>();   

  105.         private Text secondPar = null;   

  106.         private Text output = new Text();   

  107.         /** 

  108.          * 一個分組調用一次reduce函數 

  109.          */

  110.         @Override

  111.         protected void reduce(Text key, Iterable<CombineValues> value, Context context)   

  112.                 throws IOException, InterruptedException {   

  113.             leftTable.clear();   

  114.             rightTable.clear();   

  115.             /** 

  116.              * 將分組中的元素按照文件分別進行存放 

  117.              * 這種方法要注意的問題: 

  118.              * 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM, 

  119.              * 在處理分布式問題之前最好先了解數據的分布情況,根據不同的分布采取最 

  120.              * 適當的處理方法,這樣可以有效的防止導致OOM和數據過度傾斜問題。 

  121.              */

  122.             for(CombineValues cv : value){   

  123.                 secondPar = new Text(cv.getSecondPart().toString());   

  124.                 //左表tb_dim_city 

  125.                 if("0".equals(cv.getFlag().toString().trim())){   

  126.                     leftTable.add(secondPar);   

  127.                 }   

  128.                 //右表tb_user_profiles 

  129.                 else if("1".equals(cv.getFlag().toString().trim())){   

  130.                     rightTable.add(secondPar);   

  131.                 }   

  132.             }   

  133.             logger.info("tb_dim_city:"+leftTable.toString());   

  134.             logger.info("tb_user_profiles:"+rightTable.toString());   

  135.             for(Text leftPart : leftTable){   

  136.                 for(Text rightPart : rightTable){   

  137.                     output.set(leftPart+ "\t" + rightPart);   

  138.                     context.write(key, output);   

  139.                 }   

  140.             }   

  141.         }   

  142.     }   

  143.     @Override

  144.     public int run(String[] args) throws Exception {   

  145.           Configuration conf=getConf(); //獲得配置文件對象 

  146.             Job job=new Job(conf,"LeftOutJoinMR");   

  147.             job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);

  148.             FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑 

  149.             FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑

  150.             job.setMapperClass(LeftOutJoinMapper.class);   

  151.             job.setReducerClass(LeftOutJoinReducer.class);

  152.             job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式 

  153.             job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格格式

  154.  

  155.             //設置map的輸出key和value類型 

  156.             job.setMapOutputKeyClass(Text.class);   

  157.             job.setMapOutputValueClass(CombineValues.class);

  158.  

  159.             //設置reduce的輸出key和value類型 

  160.             job.setOutputKeyClass(Text.class);   

  161.             job.setOutputValueClass(Text.class);   

  162.             job.waitForCompletion(true);   

  163.             return job.isSuccessful()?0:1;   

  164.     }   

  165.     public static void main(String[] args) throws IOException,   

  166.             ClassNotFoundException, InterruptedException {   

  167.         try {   

  168.             int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   

  169.             System.exit(returnCode);   

  170.         } catch (Exception e) {   

  171.             // TODO Auto-generated catch block 

  172.             logger.error(e.getMessage());   

  173.         }   

  174.     }   

其中具體的分析以及數據的輸出輸入請看代碼中的注釋已經寫得比較清楚了,這里主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,我們可以很明顯的看出原:因為整體數據被分割了,每個map task只處理一部分數據而不能夠獲取到所有需要的join字段,因此我們需要在講join key作為reduce端的分組將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。

關于MapReduce多種join實現的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

巴塘县| 渭南市| 赣榆县| 宜昌市| 日照市| 泗阳县| 平乡县| 自治县| 景洪市| 阳江市| 镇原县| 成武县| 贞丰县| 临西县| 德清县| 彭州市| 鹤庆县| 乌鲁木齐市| 拜泉县| 婺源县| 巴东县| 筠连县| 武宣县| 望城县| 卢氏县| 广灵县| 镇巴县| 吴旗县| 蒙山县| 山东省| 黄大仙区| 射洪县| 丽水市| 称多县| 临湘市| 桦甸市| 临高县| 开鲁县| 枣庄市| 渝北区| 江孜县|