Introduction: A moderately complex processing flow often needs several MapReduce programs chained together. Multi-job chaining can be implemented with the JobControl class provided by the MapReduce framework.
Requirement:
There are two MapReduce jobs, SumMR and SortMR from the Flow example, with a dependency between them: the output of SumMR is the input of SortMR, so SortMR can only be started after SumMR has finished.
The two programs are available at: https://blog.51cto.com/14048416/2342024
How do we implement the dependency between the two jobs?
Code (only the multi-job chaining driver is shown here):
public class JobDecy {
public static void main(String[] args) {
Configuration conf = new Configuration(true);
conf.set("fs.defaultFS", "hdfs://zzy:9000");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
//job1 FlowSum
Job job1 = Job.getInstance(conf);
job1.setJobName("FlowSum");
//set the jar containing the job classes
job1.setJarByClass(FlowSum.class);
//set the Mapper, Reducer, and Combiner classes
job1.setMapperClass(FlowSum.MyMapper.class);
job1.setReducerClass(FlowSum.MyReducer.class);
job1.setCombinerClass(FlowSum.FlowSumCombine.class);
//set the map and reduce output key/value types
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(Text.class);
// specify the input and output paths for this MapReduce job
Path input1 = new Path("/data/input");
Path output1 = new Path("/data/output");
//the output path must not exist yet
if (output1.getFileSystem(conf).exists(output1)) {
output1.getFileSystem(conf).delete(output1, true); //recursive delete
}
FileInputFormat.addInputPath(job1, input1);
FileOutputFormat.setOutputPath(job1, output1);
//Job2 FlowSumSort
Job job2= Job.getInstance(conf);
job2.setJarByClass(FlowSumSort.class);
job2.setJobName("FlowSumSort");
job2.setMapperClass(Mapper.class);
job2.setReducerClass(Reducer.class);
job2.setOutputKeyClass(FlowBean.class);
job2.setOutputValueClass(NullWritable.class);
// specify the input and output paths for this MapReduce job
Path input2=new Path("/data/output");
Path output2 =new Path("/data/output1");
//the output path must not exist yet
if(output2.getFileSystem(conf).exists(output2)){
output2.getFileSystem(conf).delete(output2,true); //recursive delete
}
FileInputFormat.addInputPath(job2,input2);
FileOutputFormat.setOutputPath(job2,output2);
//create a ControlledJob for each job
ControlledJob job1_cj=new ControlledJob(job1.getConfiguration());
ControlledJob job2_cj=new ControlledJob(job2.getConfiguration());
//bind each Job to its ControlledJob
job1_cj.setJob(job1);
job2_cj.setJob(job2);
// set the dependency between the jobs
job2_cj.addDependingJob(job1_cj); //job2 depends on job1
//create the JobControl
JobControl jc=new JobControl("sum and sort");
jc.addJob(job1_cj);
jc.addJob(job2_cj);
//run the JobControl in a separate thread
Thread jobThread=new Thread(jc);
//start the jobs
jobThread.start();
//keep the main thread alive, checking every 0.5 seconds whether all jobs have finished
while(!jc.allFinished()){
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//once all jobs have finished, stop the JobControl thread and release resources
jc.stop();
} catch (IOException e) {
e.printStackTrace();
}
}
}
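A small optional refinement (a sketch, not part of the original program): JobControl keeps a list of jobs that failed, so the driver can report them after the polling loop and before jc.stop(). The snippet below assumes it is placed at exactly that point and that java.util.List is imported.
// optional: report any failed jobs in the chain before stopping (sketch)
List<ControlledJob> failed = jc.getFailedJobList();
for (ControlledJob cj : failed) {
    System.err.println("Job failed: " + cj.getJobName());
}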
Requirement: find the top three students by total score in each class.
Fields: class, name, math, chinese, english (tab-separated).
Analysis:
- Using "class + total score" as the key, all student records read in the map phase can be sorted by class and, within a class, by total score in descending order before being sent to reduce.
- On the reduce side, a GroupingComparator groups key-value pairs with the same class together; taking the first three records of each group gives the top three.
Code:
Custom Student class:
public class Student implements WritableComparable<Student> {
private String t_class;
private String t_name;
private int t_sumSource;
public Student(){
}
public void set(String t_class,String t_name,int chinese,int math,int english){
this.t_class=t_class;
this.t_name=t_name;
this.t_sumSource=chinese+math+english;
}
public String getT_class() {
return t_class;
}
public void setT_class(String t_class) {
this.t_class = t_class;
}
public String getT_name() {
return t_name;
}
public void setT_name(String t_name) {
this.t_name = t_name;
}
public int getT_sumSource() {
return t_sumSource;
}
public void setT_sumSource(int t_sumSource) {
this.t_sumSource = t_sumSource;
}
//comparison rule
@Override
public int compareTo(Student stu) {
//compare by class first
int result1=this.t_class.compareTo(stu.t_class);
//if the classes are equal, compare by total score (descending)
if(result1==0){
return stu.t_sumSource-this.t_sumSource;
}
return result1;
}
//serialization
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(this.t_class);
out.writeUTF(this.t_name);
out.writeInt(this.t_sumSource);
}
//deserialization
@Override
public void readFields(DataInput in) throws IOException {
this.t_class=in.readUTF();
this.t_name=in.readUTF();
this.t_sumSource=in.readInt();
}
}
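One detail worth noting: the Reducer below writes Student objects as the output key, and TextOutputFormat renders keys with toString(). The original class does not override it, so the output file would show default object identifiers. A minimal toString() sketch that could be added to the Student class (field names taken from the class above):
//readable text output for TextOutputFormat (sketch)
@Override
public String toString() {
    return t_class + "\t" + t_name + "\t" + t_sumSource;
}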
Custom grouping comparator:
//custom grouping rule
private static class MyGroupComparator extends WritableComparator{
//this constructor is required and must call the parent constructor
//(the second argument true tells WritableComparator to create key instances)
public MyGroupComparator(){
super(Student.class, true);
}
/**
* Decides how keys are grouped when fed to reduce:
* records are grouped by class.
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
Student stu1=(Student)a;
Student stu2=(Student)b;
return stu1.getT_class().compareTo(stu2.getT_class());
}
}
MapReduce program:
//Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Student, NullWritable> {
Student bean = new Student();
NullWritable mv = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("\\s+");
//fields: class, name, math, chinese, english
String t_class=fields[0];
String t_name=fields[1];
int math=Integer.parseInt(fields[2]);
int chinese=Integer.parseInt(fields[3]);
int english=Integer.parseInt(fields[4]);
bean.set(t_class,t_name,chinese,math,english);
context.write(bean,mv);
}
}
//Reducer
private static class MyReducer extends Reducer<Student, NullWritable, Student, NullWritable> {
@Override
protected void reduce(Student key, Iterable<NullWritable> values, Context context)
throws IOException, InterruptedException {
int count =0;
for(NullWritable value:values){
if(count>2){
break;
}
context.write(key,value);
count++;
}
}
}
Job driver:
public class ClazzScoreGroupComparator {
public static void main(String[] args) {
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://zzy:9000");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job= Job.getInstance(conf);
job.setJarByClass(ClazzScoreGroupComparator.class);
job.setJobName("ClazzScoreGroupComparator");
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
//set the custom grouping comparator
job.setGroupingComparatorClass(MyGroupComparator.class);
job.setOutputKeyClass(Student.class);
job.setOutputValueClass(NullWritable.class);
// specify the input and output paths for this MapReduce job
Path input=new Path("/data/student.txt");
Path output =new Path("/data/output2");
//the output path must not exist yet
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //recursive delete
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
boolean success=job.waitForCompletion(true);
System.exit(success?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Introduction: Counters record the progress and status of a job; you can think of them as a form of logging. We can insert a counter at some point in the program to record changes in the data or in the progress. MapReduce also ships with many built-in Counters; knowing what they mean makes it easier to interpret job results, e.g. input bytes, output bytes, map-side input/output bytes and record counts, and reduce-side input/output bytes and record counts.
Requirement: use global counters to count the total number of words and the total number of lines across all files in a directory.
Code:
public class CounterWordCount {
public static void main(String[] args) {
Configuration conf=new Configuration(true);
conf.set("fs.defaultFS","hdfs://zzy:9000");
conf.addResource("core-site.xml");
conf.addResource("hdfs-site.xml");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job= Job.getInstance(conf);
job.setJarByClass(CounterWordCount.class);
job.setJobName("CounterWordCount");
job.setMapperClass(MyMapper.class);
//set the number of reduce tasks to 0 (map-only job)
job.setNumReduceTasks(0);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// specify the input and output paths for this MapReduce job
Path input=new Path("/data/");
Path output =new Path("/data/output3");
//the output path must not exist yet
if(output.getFileSystem(conf).exists(output)){
output.getFileSystem(conf).delete(output,true); //recursive delete
}
FileInputFormat.addInputPath(job,input);
FileOutputFormat.setOutputPath(job,output);
boolean success=job.waitForCompletion(true);
System.exit(success?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
//enum that defines the counters
enum CouterWordsCounts{COUNT_WORDS, COUNT_LINES}
//Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
Text mk=new Text();
LongWritable mv=new LongWritable();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// count lines: text input is read line by line by default, so each map() call adds 1 to the line count
context.getCounter(CouterWordsCounts.COUNT_LINES).increment(1);
String words[]=value.toString().split("\\s+");
for(String word:words){
context.getCounter(CouterWordsCounts.COUNT_WORDS).increment(1);
}
}
//cleanup() runs once at the end of the map task, after all map() calls
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
mk.set("lines:");
mv.set(context.getCounter(CouterWordsCounts.COUNT_LINES).getValue());
context.write(mk,mv);
mk.set("words:");
mv.set(context.getCounter(CouterWordsCounts.COUNT_WORDS).getValue());
context.write(mk,mv);
}
}
}
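A follow-up note (a sketch, not in the original code): after waitForCompletion(true) returns in the driver above, the same counter values can also be read directly from the Job object, without relying on the map-only output files. This would go before System.exit and assumes org.apache.hadoop.mapreduce.Counters is imported.
// read the custom counters in the driver once the job has finished (sketch)
Counters counters = job.getCounters();
long lines = counters.findCounter(CouterWordsCounts.COUNT_LINES).getValue();
long words = counters.findCounter(CouterWordsCounts.COUNT_WORDS).getValue();
System.out.println("lines=" + lines + ", words=" + words);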
Introduction: In real business scenarios it is very common to join two datasets on some key. If both datasets are small, the join can be done directly in memory; with large datasets, an in-memory join will clearly run out of memory (OOM). MapReduce can be used to join large datasets, and it offers two kinds of joins: map join and reduce join.
Introduction to map join: A map join is suitable when one of the two datasets is small. The idea is to load the whole small dataset into memory and index it by the join key. The large dataset is then used as the MapTask input, and each record passed to map() is matched directly against the in-memory index; the joined result is written out by key.
Data:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy
Field meanings: movieid, moviename, movietype
Ratings.dat:1::1193::5::978300760
Field meanings: userid, movieid, rate, timestamp
Code:
public class MovieRatingMapJoinMR {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job = Job.getInstance(conf);
job.setJarByClass(MovieRatingMapJoinMR.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setNumReduceTasks(0);
String minInput = args[0];
String maxInput = args[1];
String output = args[2];
FileInputFormat.setInputPaths(job, new Path(maxInput));
Path outputPath = new Path(output);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
//distribute the small table via the distributed cache so each map task can load it into memory
URI uri=new Path(minInput).toUri();
job.addCacheFile(uri);
boolean status = job.waitForCompletion(true);
System.exit(status?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
Text mk = new Text();
Text mv = new Text();
// stores all key-value pairs parsed from the small dataset
private static Map<String, String> movieMap = new HashMap<String, String>();
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//read the cached table data and load it into the movieMap container
URI[] cacheFiles = context.getCacheFiles();
//get the path of the cached file
String myfilePath = cacheFiles[0].toString();
BufferedReader br = new BufferedReader(new FileReader(myfilePath));
// each line read here is one movie record
String line = "";
while ((line = br.readLine()) != null) {
//movieid::moviename::movietype
String fields[] = line.split("::");
movieMap.put(fields[0], fields[1] + "\t" + fields[2]);
}
IOUtils.closeStream(br);
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] fields = value.toString().split("::");
//userid::movieid::rate::timestamp
String userid = fields[0];
String movieid = fields[1];
String rate = fields[2];
String timestamp = fields[3];
//look up the movie record by the join key (movieid)
if (movieMap.containsKey(movieid)) {
String movieFileds = movieMap.get(movieid);
mk.set(userid);
mv.set(movieFileds + "\t" + movieid + "\t" + rate + "\t" + timestamp);
context.write(mk, mv);
}
}
}
}
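A side note on the setup() method above: it opens the cached file by the URI returned from getCacheFiles(), which works when that path is readable from the local filesystem (for example in local-mode runs). On a cluster, the distributed cache localizes the file into the task's working directory under its base name, so a common alternative (a sketch, reusing the cacheFiles array from above) is:
// open the localized copy of the cached file by its base name (sketch)
String baseName = new Path(cacheFiles[0].getPath()).getName();
BufferedReader br = new BufferedReader(new FileReader(baseName));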
Introduction to reduce join:
- In the map phase, the two datasets data1 and data2 are each read by map and parsed into key-value pairs with the join field as the key and the queried fields as the value, tagged with whether the record came from data1 or data2.
- In the reduce phase, each ReduceTask receives the records from data1 and data2 that share the same key and performs a cross-product join on the reduce side. The most direct drawback is heavy memory consumption, which can lead to OOM.
Data:
movies.dat:1::Toy Story (1995)::Animation|Children's|Comedy
Field meanings: movieid, moviename, movietype
Ratings.dat:1::1193::5::978300760
Field meanings: userid, movieid, rate, timestamp
Code:
public class MovieRatingReduceJoinMR {
public static void main(String[] args) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://zzy:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
try {
Job job = Job.getInstance(conf);
job.setJarByClass(MovieRatingReduceJoinMR.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
String Input = args[0];
String output = args[1];
FileInputFormat.setInputPaths(job, new Path(Input));
Path outputPath = new Path(output);
FileSystem fs = FileSystem.get(conf);
if (fs.exists(outputPath)) {
fs.delete(outputPath, true);
}
FileOutputFormat.setOutputPath(job, outputPath);
boolean status = job.waitForCompletion(true);
System.exit(status?0:1);
} catch (Exception e) {
e.printStackTrace();
}
}
//Mapper
private static class MyMapper extends Mapper<LongWritable, Text, Text, Text> {
private String name;
Text mk = new Text();
Text mv = new Text();
//get the name of the file this input split comes from
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//InputSplit is abstract, so cast to its concrete subclass FileSplit
FileSplit is=(FileSplit)context.getInputSplit();
name=is.getPath().getName();
}
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//movies.dat movieid::moviename::movietype
//ratings.dat userid::movieid::rate::timestamp
String OutputKey=null;
String OutputValue=null;
String fields[]=value.toString().split("::");
if(name.endsWith("movies.dat")){
OutputKey=fields[0];
OutputValue=fields[1]+"\t"+fields[2]+"_"+"movies";
}else if(name.endsWith("ratings.dat")){
OutputKey=fields[1];
OutputValue=fields[0]+"\t"+fields[2]+"\t"+fields[3]+"_"+"ratings";
}
mk.set(OutputKey);
mv.set(OutputValue);
context.write(mk,mv);
}
}
//Reducer
private static class MyReducer extends Reducer< Text, Text, Text, Text>{
Text rv=new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
List<String> movies=new ArrayList<>();
List<String> ratings=new ArrayList<>();
//add each record to the list for its source table
for(Text value:values){
String fields[]= value.toString().split("_");
if(fields[1].equals("movies")){
movies.add(fields[0]);
}else if(fields[1].equals("ratings")){
ratings.add(fields[0]);
}
}
//join the records from the two tables
if(ratings.size()>0&&movies.size()>0){
for(String movie:movies){
for(String rate:ratings){
rv.set(movie+"\t"+rate);
context.write(key,rv);
}
}
}
}
}
}