您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關MapReduce多種join實現的示例分析,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
一、概述
對于RDBMS中的join操作大伙一定非常熟悉,寫sql的時候要十分注意細節,稍有差池就會耗時巨久造成很大的性能瓶頸,而在Hadoop中使用MapReduce框架進行join的操作時同樣耗時,但是由于hadoop的分布式設計理念的特殊性,因此對于這種join操作同樣也具備了一定的特殊性。本文主要對MapReduce框架對表之間的join操作的幾種實現方式進行詳細分析,并且根據我在實際開發過程中遇到的實際例子來進行進一步的說明。
二、實現原理
1、在Reudce端進行連接。
在Reudce端進行連接是MapReduce框架進行表之間join操作最為常見的模式,其具體的實現原理如下:
Map端的主要工作:為來自不同表(文件)的key/value對打標簽以區別不同來源的記錄。然后用連接字段作為key,其余部分和新加的標志作為value,最后進行輸出。
reduce端的主要工作:在reduce端以連接字段作為key的分組已經完成,我們只需要在每一個分組當中將那些來源于不同文件的記錄(在map階段已經打標志)分開,最后進行笛卡爾只就ok了。原理非常簡單,下面來看一個實例:
(1)自定義一個value返回類型:
package com.mr.reduceSizeJoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class CombineValues implements WritableComparable<CombineValues>{
//private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);
private Text joinKey;//鏈接關鍵字
private Text flag;//文件來源標志
private Text secondPart;//除了鏈接鍵外的其他部分
public void setJoinKey(Text joinKey) {
this.joinKey = joinKey;
}
public void setFlag(Text flag) {
this.flag = flag;
}
public void setSecondPart(Text secondPart) {
this.secondPart = secondPart;
}
public Text getFlag() {
return flag;
}
public Text getSecondPart() {
return secondPart;
}
public Text getJoinKey() {
return joinKey;
}
public CombineValues() {
this.joinKey = new Text();
this.flag = new Text();
this.secondPart = new Text();
}
@Override
public void write(DataOutput out) throws IOException {
this.joinKey.write(out);
this.flag.write(out);
this.secondPart.write(out);
}
@Override
public void readFields(DataInput in) throws IOException {
this.joinKey.readFields(in);
this.flag.readFields(in);
this.secondPart.readFields(in);
}
@Override
public int compareTo(CombineValues o) {
return this.joinKey.compareTo(o.getJoinKey());
}
@Override
public String toString() {
// TODO Auto-generated method stub
return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";
}
}
(2) map、reduce主體代碼:
package com.mr.reduceSizeJoin;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
* 用途說明:
* reudce side join中的left outer join
* 左連接,兩個文件分別代表2個表,連接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
* tb_dim_city.dat文件內容,分隔符為"|":
* id name orderid city_code is_show
* 0 其他 9999 9999 0
* 1 長春 1 901 1
* 2 吉林 2 902 1
* 3 四平 3 903 1
* 4 松原 4 904 1
* 5 通化 5 905 1
* 6 遼源 6 906 1
* 7 白城 7 907 1
* 8 白山 8 908 1
* 9 延吉 9 909 1
* -------------------------風騷的分割線-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件內容,分隔符為"|":
* userID network flow cityID
* 1 2G 123 1
* 2 3G 333 2
* 3 3G 555 1
* 4 2G 777 3
* 5 3G 666 4
*
* -------------------------風騷的分割線-------------------------------
* 結果:
* 1 長春 1 901 1 1 2G 123
* 1 長春 1 901 1 3 3G 555
* 2 吉林 2 902 1 2 3G 333
* 3 四平 3 903 1 4 2G 777
* 4 松原 4 904 1 5 3G 666
*/
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{
private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);
public static class LeftOutJoinMapper extends Mapper<Object, Text, Text, CombineValues> {
private CombineValues combineValues = new CombineValues();
private Text flag = new Text();
private Text joinKey = new Text();
private Text secondPart = new Text();
@Override
protected void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
//獲得文件輸入路徑
String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();
//數據來自tb_dim_city.dat文件,標志即為"0"
if(pathName.endsWith("tb_dim_city.dat")){
String[] valueItems = value.toString().split("\\|");
//過濾格式錯誤的記錄
if(valueItems.length != 5){
return;
}
flag.set("0");
joinKey.set(valueItems[0]);
secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}//數據來自于tb_user_profiles.dat,標志即為"1"
else if(pathName.endsWith("tb_user_profiles.dat")){
String[] valueItems = value.toString().split("\\|");
//過濾格式錯誤的記錄
if(valueItems.length != 4){
return;
}
flag.set("1");
joinKey.set(valueItems[3]);
secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);
combineValues.setFlag(flag);
combineValues.setJoinKey(joinKey);
combineValues.setSecondPart(secondPart);
context.write(combineValues.getJoinKey(), combineValues);
}
}
}
public static class LeftOutJoinReducer extends Reducer<Text, CombineValues, Text, Text> {
//存儲一個分組中的左表信息
private ArrayList<Text> leftTable = new ArrayList<Text>();
//存儲一個分組中的右表信息
private ArrayList<Text> rightTable = new ArrayList<Text>();
private Text secondPar = null;
private Text output = new Text();
/**
* 一個分組調用一次reduce函數
*/
@Override
protected void reduce(Text key, Iterable<CombineValues> value, Context context)
throws IOException, InterruptedException {
leftTable.clear();
rightTable.clear();
/**
* 將分組中的元素按照文件分別進行存放
* 這種方法要注意的問題:
* 如果一個分組內的元素太多的話,可能會導致在reduce階段出現OOM,
* 在處理分布式問題之前最好先了解數據的分布情況,根據不同的分布采取最
* 適當的處理方法,這樣可以有效的防止導致OOM和數據過度傾斜問題。
*/
for(CombineValues cv : value){
secondPar = new Text(cv.getSecondPart().toString());
//左表tb_dim_city
if("0".equals(cv.getFlag().toString().trim())){
leftTable.add(secondPar);
}
//右表tb_user_profiles
else if("1".equals(cv.getFlag().toString().trim())){
rightTable.add(secondPar);
}
}
logger.info("tb_dim_city:"+leftTable.toString());
logger.info("tb_user_profiles:"+rightTable.toString());
for(Text leftPart : leftTable){
for(Text rightPart : rightTable){
output.set(leftPart+ "\t" + rightPart);
context.write(key, output);
}
}
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf=getConf(); //獲得配置文件對象
Job job=new Job(conf,"LeftOutJoinMR");
job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);
FileInputFormat.addInputPath(job, new Path(args[0])); //設置map輸入文件路徑
FileOutputFormat.setOutputPath(job, new Path(args[1])); //設置reduce輸出文件路徑
job.setMapperClass(LeftOutJoinMapper.class);
job.setReducerClass(LeftOutJoinReducer.class);
job.setInputFormatClass(TextInputFormat.class); //設置文件輸入格式
job.setOutputFormatClass(TextOutputFormat.class);//使用默認的output格格式
//設置map的輸出key和value類型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(CombineValues.class);
//設置reduce的輸出key和value類型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.waitForCompletion(true);
return job.isSuccessful()?0:1;
}
public static void main(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
try {
int returnCode = ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);
System.exit(returnCode);
} catch (Exception e) {
// TODO Auto-generated catch block
logger.error(e.getMessage());
}
}
}
其中具體的分析以及數據的輸出輸入請看代碼中的注釋已經寫得比較清楚了,這里主要分析一下reduce join的一些不足。之所以會存在reduce join這種方式,我們可以很明顯的看出原:因為整體數據被分割了,每個map task只處理一部分數據而不能夠獲取到所有需要的join字段,因此我們需要在講join key作為reduce端的分組將所有join key相同的記錄集中起來進行處理,所以reduce join這種方式就出現了。這種方式的缺點很明顯就是會造成map和reduce端也就是shuffle階段出現大量的數據傳輸,效率很低。
關于MapReduce多種join實現的示例分析就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。