您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Hadoop中Map-Reduce如何配置、測試和調試,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。
Evironvemnt:
cdh6.1
使用cdh6.1,該文件位于
/etc/hadoop/conf, 其實/etc/hadoop下面有幾種目錄,比如conf.dist,conf.pseudo,conf.impala
文件列表
hadoop-env.sh,可以控制全局環境變量
core-site.xml,最重要的是參數fs.defaultFS
1.value = File:\\\home\, 這個是單機模式(single-node standalone),hadoop daemon運行在一個jvm進程。主要方便調試。
2.value = hdfs://localhost:8020,這個是偽分布式(Pseudo-distributes),就是每個daemon運行在單獨的jvm進程,但還是都在一臺主機上。主要用于學習測試調試等。
3.value = hdfs://host:8020, 集群模式.
hdfs-site.xml,最重要的是參數dfs.replication
除了集群模式是3,一般都設置為1.
dfs.namenode.replication.min = 1,塊復制的底線
mapred-site.xml,最重要的是參數mapred.job.tracker
也就是jobtracker運行在那一臺機器上。
yarn-site.xml,主要用來配置resourcemanager。
hadoop-metrics.properties,如果配置了Ambari,需要配置此文件,以便于發射監控指標給Ambari服務器。
log4j.properties
如果有多個配置文件加載,那么一般情況下,后加載的配置覆蓋相同的早加載的配置文件。為了防止不期望的覆蓋,配置文件中有final的關鍵字,它可以防止后面的覆蓋。
conf和jvm的配置, 我們可以把某些配置寫入jvm properties,如果這樣做,它是最高優先級的,比conf高。
hadoop jar -Ddfs.replication=1
首先說主程序,MyWordCount繼承于Tool 和 Configured, Configured主要用來幫助Tool實現Configurable.
interface Tool extends Configurable
Configured extends Configurable
一般都會調用ToolRunner來運行程序,ToolRunner內部會調用GenericOptionsParser,所以你的程序可以添加參數的能力。
這里和hadoop1的不同在于org.apache.hadoop.mapreduce,我記得1.0,好像是mapred.
/** * write by jinbao */ package com.jinbao.hadoop.mapred.unittest; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; /** * @author cloudera * */ public class MyWordCount extends Configured implements Tool { /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { try { ToolRunner.run(new MyWordCount(), args); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { if(args.length != 2){ System.err.printf("usage: %s, [generic options] <input> <output> \n",getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Configuration conf = new Configuration(); Job job = Job.getInstance(conf,"word counting"); job.setJarByClass(MyWordCount.class); job.setMapperClass(TokenizerMapper.class); job.setReducerClass(SumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); System.exit(job.waitForCompletion(true)?0:1); return 0; } /** * @author cloudera * */ public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException,InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while ( itr.hasMoreTokens()){ word.set(itr.nextToken()); context.write(word, one); } } } public static class SumReducer extends Reducer<Text,IntWritable, Text, IntWritable>{ private static IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val:values){ sum += val.get(); } result.set(sum); context.write(key, result); } } }
MapReduce Web UI
MRv1: http://jobtracker-host:50030
MRv2: http://resourcemgr-host:8088/cluster
application細節,可以到job history里邊去看。
這是一個專門針對map-reduce單元測試的工具包
需要下載依賴
1. junit,這個eclipse已經自帶了,hadoop的lib下面也有。
2. mockito,這個下面的包里有。
3. powermock,下載連接here
4. MRUnit,去apache家找here。
下面上我的程序:
package com.jinbao.hadoop.mapred.unittest; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.apache.hadoop.io.*; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PrepareForTest; import org.apache.hadoop.mrunit.mapreduce.MapDriver; import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.apache.hadoop.mrunit.types.Pair; public class MyWordCountTest { private MapDriver<Object, Text, Text, IntWritable> mapDriver; private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver; private MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver; @Before public void setUp() { MyWordCount.TokenizerMapper mapper = new MyWordCount.TokenizerMapper(); MyWordCount.SumReducer reducer = new MyWordCount.SumReducer(); mapDriver = MapDriver.newMapDriver(mapper); reduceDriver = ReduceDriver.newReduceDriver(reducer); mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer); } @Test public void testMapper() throws IOException { mapDriver.withInput(new LongWritable(), new Text("test input from unit test")); ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>(); outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) ); outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(1) ) ); outputRecords.add( new Pair<Text,IntWritable>(new Text("from"),new IntWritable(1) ) ); outputRecords.add( new Pair<Text,IntWritable>(new Text("unit"),new IntWritable(1) ) ); outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) ); mapDriver.withAllOutput(outputRecords); mapDriver.runTest(); } @Test public void testReducer() throws IOException { reduceDriver.withInput(new Text("input"), new ArrayList<IntWritable>(Arrays.asList(new IntWritable(1), new IntWritable(3))) ); reduceDriver.withOutput(new Text("input"), new IntWritable(4)); reduceDriver.runTest(); } @Test public void testMapperReducer() throws IOException { mapReduceDriver.withInput(new LongWritable(), new Text("test input input input input input test") ); ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>(); outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(5) ) ); outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(2) ) ); mapReduceDriver.withAllOutput(outputRecords); mapReduceDriver.runTest(); } }
上圖直接運行@Test方法就可以解決90%以上的問題,否則你的UnitTest覆蓋率太低,那么后期在cluster出問題,就debug成本比較高了.
Eclipse里邊配置Debug Configuration:
/home/cloudera/workspace/in /home/cloudera/workspace/out
注意:job runner運行的都是本地目錄,使用toolrunner默認是啟動一個standalone的jvm來運行hadoop,另外,只能有0或1個reduce.這個不是問題,只要非常方便的調試就可以了.
YARN里邊默認是mapreduce.framework.name必須設置為local,不過這都是默認的,不需要管它。
導出jar,我都是用eclipse來干,用ant,命令行等都可以,看喜好了。
如果你的jar包有依賴,那么也要把依賴包到處在某個lib里邊,并且minifest里邊配置main class是哪一個.這個package和war打包沒什么區別
%hadoop fs -copyFromLocal /home/cloudera/word.txt data/in
%hadoop jar wordcount.jar data/in data/out
前提:keep.failed.task.files,該選項默認為 false,表示對于失敗的task,其運行的臨時數據和目錄是不會被保存的。這是一個per job的配置,運行job的時候加上這個選項。 如何重跑: 當fail的task環境具備以后,就可以對單獨的task進行重跑了。重跑的方式為: 1. 上到task出錯的tasktracker機器 上 2. 在該tasktracker上找到fail的task運行時的目錄環境 1. 在 tasktracker中,對于每一個task都會有一個單獨的執行環境,其中包括其work目錄,其對應的中間文件,以及其運行時需要用到的配置文件等 2. 這些 目錄是由tasktracker的配置決定,配置選項為: mapred.local.dir. 該選項可能是一個逗號分隔的路徑list,每個 list都是tasktracker對在其上執行的task建立工作目錄的根目錄。比如如果mapred.local.dir=/disk1 /mapred/local,/disk2/mapred/local,那么task的執行環境就是mapred.local.dir /taskTracker/jobcache/job-ID/task-attempt-ID 3. 找到該task的執行工作目錄后,就可以進入到 該目錄下,然后其中就會有該task的運行環境,通常包括一個work目錄,一個job.xml文件,以及一個task要進行操作的數據文件(對map來 說是split.dta,對reduce來說是file.out)。 4. 找到環境以后,就可以重跑task了。 1. cd work 2. hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml ? ? 這樣,IsolationRunner就會讀取job.xml的配置(這里的job.xml相當 于提交客戶端的hadoop-site.xml配置文件與命令行-D配置的接合),然后對該map或者reduce進行重新運行。 1. 到這里為止,已經實現了task單獨重跑,但是還是沒有解決對其進行單步斷點debug。這里利用到的其實是jvm的遠程 debug的功能。方式如下: 1. 在重跑task之前,export一個環境變 量:export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888" 2. 這 樣,hadoop的指令就會通過8888端口將debug信息發送出去 3. 然后在自己本地的開發環境IDE中(比如 eclipse),launch一個遠程調試,并在代碼中打一個斷點,就可以對在tasktracker上運行的獨立map或者reduce task進行遠程單步調試了。 詳細可以去到這個blog看看。 http://blog.csdn.net/cwyspy/article/details/10004995
Note: 非常不幸,在最近的版本里面,IsolationRunner已經不能使用,所以在hadoop2里邊,需要找到失敗節點后,把問題文件拷貝出來,進行單機調試。
根據Reduce個數,可以會有多個part的結果集,那么可以使用下面命令來合并
% hadoop fs -getmerge max-temp max-temp-local
% sort max-temp-local | tail
Number of mappers
Number of reducers
Combiners
Intermediate compression
Custom serialization
Shuffle tweaks
In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.
ChainMapper and ChainReducer
It's a Map*/Reduce model, which means multiple mappers work as a chain, and after last mapper, output will go to reducer. this sounds reduced network IO.
Though called 'ChainReducer', actually only a Reducer working for ChainMapper, so gets the name.
Mapper1->Mapper2->MapperN->Reducer
JobControl
MR has a class JobControl, but as I test it's really not maintained well.
Simply to use:
if(Run(job1)
Run(job2)
Apache Oozie
Oozie是一種Java Web應用程序,它運行在Java servlet容器——即Tomcat——中,并使用數據庫來存儲以下內容:
工作流定義
當前運行的工作流實例,包括實例的狀態和變量
Oozie工作流是放置在控制依賴DAG(有向無環圖 Direct Acyclic Graph)中的一組動作(例如,Hadoop的Map/Reduce作業、Pig作業等),其中指定了動作執行的順序。我們會使用hPDL(一種XML流程定義語言)來描述這個圖。
hPDL是一種很簡潔的語言,只會使用少數流程控制和動作節點。控制節點會定義執行的流程,并包含工作流的起點和終點(start、end和fail節點)以及控制工作流執行路徑的機制(decision、fork和join節點)。動作節點是一些機制,通過它們工作流會觸發執行計算或者處理任務。Oozie為以下類型的動作提供支持: Hadoop map-reduce、Hadoop文件系統、Pig、Java和Oozie的子工作流(SSH動作已經從Oozie schema 0.2之后的版本中移除了)
關于“Hadoop中Map-Reduce如何配置、測試和調試”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。