您好,登錄后才能下訂單哦!
云智慧(北京)科技有限公司陳鑫
寫這個文章的時候才意識到新舊API是同時存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時候是jobClient提交任務,有時是Job...不管API是否更新,下面這些類也還是存在于API中的,經過自己跟蹤源碼,發現原理還是這些。只不過進行了重新組織,進行了一些封裝,使得擴展性更好。所以還是把這些東西從記事本貼進來吧。
關于這些類的介紹以及使用,有的是在自己debug中看到的,多數為純翻譯API的注釋,但是翻譯的過程受益良多。
GenericOptionsParser
parseGeneralOptions(Optionsopts, Configuration conf, String[] args)解析命令行參數
GenericOptionsParser是為hadoop框架解析命令行參數的工具類。它能夠辨認標準的命令行參數,使app能夠輕松指定namenode,jobtracker,以及額外的配置資源或信息等。它支持的功能有:
-conf 指定配置文件;
-D 指定配置信息;
-fs 指定namenode
-jt 指定jobtracker
-files 指定需要copy到MR集群的文件,以逗號分隔
-libjars指定需要copy到MR集群的classpath的jar包,以逗號分隔
-archives指定需要copy到MR集群的壓縮文件,以逗號分隔,會自動解壓縮
1.String[] otherArgs = new GenericOptionsParser(job, args)
2. .getRemainingArgs();
3.if (otherArgs.length != 2) {
4. System.err.println("Usage: wordcount <in> <out>");
5. System.exit(2);
6.}
ToolRunner
用來跑實現Tool接口的工具。它與GenericOptionsParser合作來解析命令行參數,只在此次運行中更改configuration的參數。
Tool
處理命令行參數的接口。Tool是MR的任何tool/app的標準。這些實現應該代理對標準命令行參數的處理。下面是典型實現:
1.public class MyApp extends Configured implements Tool {
2.
3. public int run(String[] args) throws Exception {
4. // 即將被ToolRunner執行的Configuration
5. Configuration conf = getConf();
6.
7. // 使用conf建立JobConf
8. JobConf job = new JobConf(conf, MyApp.class);
9.
10. // 執行客戶端參數
11. Path in = new Path(args[1]);
12. Path out = new Path(args[2]);
13.
14. // 指定job相關的參數
15. job.setJobName("my-app");
16. job.setInputPath(in);
17. job.setOutputPath(out);
18. job.setMapperClass(MyApp.MyMapper.class);
19. job.setReducerClass(MyApp.MyReducer.class);
20.*
21. // 提交job,然后監視進度直到job完成
22. JobClient.runJob(job);
23. }
24.
25. public static void main(String[] args) throws Exception {
26. // 讓ToolRunner 處理命令行參數
27. int res = ToolRunner.run(new Configuration(), new Sort(), args); //這里封裝了GenericOptionsParser解析args
28.
29. System.exit(res);
30. }
31. }
MultipleOutputFormat
自定義輸出文件名稱或者說名稱格式。在jobconf中setOutputFormat(MultipleOutputFormat的子類)就行了。而不是那種part-r-00000啥的了。。。并且可以分配結果到多個文件中。
MultipleOutputFormat繼承了FileOutputFormat, 允許將輸出數據寫進不同的輸出文件中。有三種應用場景:
a. 最少有一個reducer的mapreduce任務。這個reducer想要根據實際的key將輸出寫進不同的文件中。假設一個key編碼了實際的key和為實際的key指定的位置
b. 只有map的任務。這個任務想要把輸入文件或者輸入內容的部分名稱設為輸出文件名。
c. 只有map的任務。這個任務為輸出命名時,需要依賴keys和輸入文件名。
1.//這里是根據key生成多個文件的地方,可以看到還有value,name等參數
2.@Override
3.protected String generateFileNameForKeyValue(Text key,
4. IntWritable value, String name) {
5. char c = key.toString().toLowerCase().charAt(0);
6. if (c >= 'a' && c <= 'z') {
7. return c + ".txt";
8. }
9. return "result.txt";
10.}
DistributedCache
在集群中快速分發大的只讀文件。DistributedCache是MR用來緩存app需要的諸如text,archive,jar等的文件的。app通過jobconf中的url來指定需要緩存的文件。它會假定指定的這個文件已經在url指定的對應位置上了。在job在node上執行之前,DistributedCache會copy必要的文件到這個slave node。它的功效就是為每個job只copy一次,而且copy到指定位置,能夠自動解壓縮。
DistributedCache可以用來分發簡單的只讀文件,或者一些復雜的例如archive,jar文件等。archive文件會自動解壓縮,而jar文件會被自動放置到任務的classpath中(lib)。分發壓縮archive時,可以指定解壓名稱如:dict.zip#dict。這樣就會解壓到dict中,否則默認是dict.zip中。
文件是有執行權限的。用戶可以選擇在任務的工作目錄下建立指向DistributedCache的軟鏈接。
1.DistributedCache.createSymlink(conf);
2. DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);
DistributedCache.createSymlink(Configuration)方法讓DistributedCache在當前工作目錄下創建到緩存文件的符號鏈接。則在task的當前工作目錄會有link-name的鏈接,相當于快捷方法,鏈接到expr.txt文件,在setup方法使用的情況則要簡單許多。或者通過設置配置文件屬性mapred.create.symlink為yes。分布式緩存會截取URI的片段作為鏈接的名字。例如,URI是hdfs://namenode:port/lib.so.1#lib.so,則在task當前工作目錄會有名為lib.so的鏈接,它會鏈接分布式緩存中的lib.so.1
DistributedCache會跟蹤修改緩存文件的timestamp。
下面是使用的例子, 為應用app設置緩存
1. 將需要的文件copy到FileSystem中:
1. $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat
2. $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip
3. $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
4. $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
5. $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
6. $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
2. 設置app的jobConf:
7. JobConf job = new JobConf();
8. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),
9. job);
10. DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
11. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
12. DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
13. DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
14. DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
3. 在mapper或者reducer中使用緩存文件:
15. public static class MapClass extends MapReduceBase
16. implements Mapper<K, V, K, V> {
17.
18. private Path[] localArchives;
19. private Path[] localFiles;
20.
21. public void configure(JobConf job) {
22. // 得到剛剛緩存的文件
23. localArchives = DistributedCache.getLocalCacheArchives(job);
24. localFiles = DistributedCache.getLocalCacheFiles(job);
25. }
26.
27. public void map(K key, V value,
28. OutputCollector<K, V>; output, Reporter reporter)
29. throws IOException {
30. // 使用緩存文件
31. // ...
32. // ...
33. output.collect(k, v);
34. }
35. }
它跟GenericOptionsParser的部分功能有異曲同工之妙。
PathFilter + 通配符。accept(Path path)篩選path是否通過。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。