您好,登錄后才能下訂單哦!
這篇文章主要介紹了Hadoop之TeraSort的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
TeraSort源碼包含很多個java文件,其中可以分為三個部分:TeraGen, TeraSort和TeraValidate。
TeraGen負責生成排序所需的隨機數據,TeraValidate用來驗證排序結果。
而與TeraSort排序相關的java文件有TeraSort.java, TeraInputFormat.java, TeraOutputFormat.java, TeraScheduler.java。
編譯完源代碼之后,設置運行參數,用TeraGen生成500M的數據,然后運行TeraSort(代碼中添加Job.setNumReduceTasks(10),即設定Reduce Task的數量為10, 否則默認為1),會發現在進入Map階段之前有下列幾行輸出,我就從這幾句輸出入手來學習這幾個java文件。
Spent 251ms computing base-splits. (1)
Spent 27ms computing TeraScheduler splits. (2)
Computing input splits took 280ms (3)
Sampling 10 splits of 15 (4)
Making 10 from 100000 sampled records (5)
Computing parititions took 1216ms (6)
Spent 1534ms computing partitions. (7)
接下來就按照這幾句輸出的順序看起,
先看TeraSort的main方法:
int res = ToolRunner.run(new Configuration(), new TeraSort(), args); System.exit(res);
Ctrl點進run這個方法里面看到:
public static int run(Configuration conf, Tool tool, String[] args) throws Exception{ if(conf == null) { conf = new Configuration(); } GenericOptionsParser parser = new GenericOptionsParser(conf, args); //set the configuration back, so that Tool can configure itself tool.setConf(conf); //get the args w/o generic hadoop args String[] toolArgs = parser.getRemainingArgs(); return tool.run(toolArgs); }
即上面new TeraSort()對應下面的參數Tool tool,而下面方法return tool.run,也就是運行TeraSort得到其返回值,TeraSort運行完成返回1, 否則返回0,所以在完成時System.exit(res)退出。
然后開始看TeraSort.java的run方法,一路各種set之后有一句關鍵的
TeraInputFormat.writePartitionFile(job, partitionFile);
即調用了TeraInputFormat里的writePartitionFile方法,于是轉向該方法,看到
final TeraInputFormat inFormat = new TeraInputFormat(); final TextSampler sampler = new TextSampler(); int partitions = job.getNumReduceTasks(); long sampleSize = conf.getLong(SAMPLE_SIZE, 100000); final List<InputSplit> splits = inFormat.getSplits(job);
最后一句即獲取輸入文件的Splits,我們知道Map Task就是對輸入文件的Split進行處理,那我們應該怎么getSplits呢?
進入該方法,我們看到:
t1 = System.currentTimeMillis(); lastContext = job; lastResult = super.getSplits(job); t2 = System.currentTimeMillis(); System.out.println("Spent " + (t2 - t1) + "ms computing base-splits."); if (job.getConfiguration().getBoolean(TeraScheduler.USE, true)) { TeraScheduler scheduler = new TeraScheduler( lastResult.toArray(new FileSplit[0]), job.getConfiguration()); lastResult = scheduler.getNewFileSplits(); t3 = System.currentTimeMillis(); System.out.println("Spent " + (t3 - t2) + "ms computing TeraScheduler splits."); }
我們看到原來TeraInputFormat并沒有重寫切片的方法,而是繼承了父類FileInputFormat里的getSplits方法獲取切片,即super.getSplits(job); 接下來我們就看到了開頭所提到的七句輸出里面的前兩句,證實了代碼的執行順序和我們學習的順序是一樣的,那么下面的這個TeraScheduler又是個什么東西呢?閱讀它的代碼看得好像不是很明白它是做什么的,那我們來看它的功能的介紹:
Solve the schedule and modify the FileSplit array to reflect the new schedule. It will move placed splits to front and unplacable splits to the end.
原來是對文件的輸入切片進行處理從而優化調度的呀!那它相比默認的(或沒有)調度方法有什么好處呢?通過Stack Overflaw上網友的解答可以知道,這種調度方法可以:
1、make sort local as much as possible;
2、distribute the work evenly across machine.
知道了這個我們回到TeraInputFormat.java里面的writePartitionFIle方法里繼續往下看:
final List<InputSplit> splits = inFormat.getSplits(job); long t2 = System.currentTimeMillis(); System.out.println("Computing input splits took " + (t2 - t1) + "ms"); int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size()); System.out.println("Sampling " + samples + " splits of " + splits.size()); final long recordsPerSample = sampleSize / samples; final int sampleStep = splits.size() / samples;
可以看到在getSplits工作結束以后,輸出了我們開頭提到的第三和第四句話,第三句話即表明我們獲取這些輸入文件的切片花了多長時間,而第四句輸出表明我們從這些分片(splits.size)中采樣了多少個樣本(samples),那splits.size等于多少呢?我們點開父類的getSplits方法可以看到這幾行代碼:
long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); blkLocations = fs.getFileBlockLocations(file, 0, length); long splitSize = computeSplitSize(blockSize, minSize, maxSize);
也就是說它是由這幾個參數共同決定的,這幾個參數可以在hadoop的配置文件里進行設置,所以分片的數量也會有所不同,我們這里是500M數據有15個分片,而它采樣了其中的10個作為樣本,為什么是10個呢?答案在這里:
int samples = Math.min(conf.getInt(NUM_PARTITIONS, 10), splits.size());
我們的Reduce Task數量設置的是10, 而Partition數量與Reduce Task數量是相同的,所以samples取10和15的最小值,即10.
這些工作完成之后,就開始通過SamplerThreadGroup進行第一次采樣操作,為什么說是第一次呢?因為還有第二次,哈哈哈哈哈,我們往下看到第一次采樣之后:
for(Text split : sampler.createPartitions(partitions)) { split.write(writer); }
這個循環就完成了二次采樣,即從一次采樣得到的100000(默認值)個樣本中二次采集10個(等于Partition數量)樣本,我們看到它這里調用了createParitions方法,我們跟過去瞧一眼:
Text[] createPartitions(int numPartitions) { int numRecords = records.size(); System.out.println("Making " + numPartitions + " from " + numRecords + " sampled records"); if (numPartitions > numRecords) { throw new IllegalArgumentException ("Requested more partitions than input keys (" + numPartitions + " > " + numRecords + ")"); } new QuickSort().sort(this, 0, records.size()); float stepSize = numRecords / (float) numPartitions; Text[] result = new Text[numPartitions-1]; for(int i=1; i < numPartitions; ++i) { result[i-1] = records.get(Math.round(stepSize * i)); } return result; } }
果不其然,按照順序我們看到了開頭的第五句輸出,這里的records.size就等于100000,,即第一次采樣得到的樣本大小,我們可以看到這個方法最后返回的是含有9條抽樣數據的數組,為什么是9條呢?因為我們設定了10個Partition所需要的分割點數目就等于Partition數目減一,OK,這就完成了二次采樣,我們返回到剛才的writePartitionFile方法接著看,
writer.close(); long t3 = System.currentTimeMillis(); System.out.println("Computing parititions took " + (t3 - t2) + "ms");
第六句輸出出現了,即兩次采樣過程所花費的時間,到這里我們writePartitionFIle的部分就結束啦,回歸到TeraSort.java部分繼續看!
long end = System.currentTimeMillis(); System.out.println("Spent " + (end - start) + "ms computing partitions."); job.setPartitionerClass(TotalOrderPartitioner.class);
最后一句輸出get!也就是完成writePartitionFile所花費的總時間。
這些分片啊、調度啊、采樣啊等一系列準備工作結束以后就進入我們的mapreduce階段啦,我們看到它設置TotalOrderPartitioner.class作為它的Partitioner方法,而細心的我們可以看到其實TeraSort中定義了兩個Partitioner,其中一個是我們看到的TotalOrderPartitioner,另一個是沒什么存在感的SimplePartitioner,這個SimplePartitioner有什么用處呢?其實并沒有什么用處,因為它僅僅是對key的前綴進行簡單的處理,并不能實現負載的相對均衡,所以一般情況下useSimplePartitioer = false,即使用TotalOrderPartitioner來作為Partition方法,那么這個方法都做了些什么呢?
閱讀代碼我們知道:TeraInputFormat將采樣得到的9個cut point存入了_partition.lst這個文件當中,而TotalOrderPartitioner就先從這個lst中讀出這些cut point,然后根據這些cut point構造一棵三層的Trie,即字典樹,例子如下:
當key中的前綴字母小于等于某一個節點時,它就被標記為該節點所對應的Partition,從而保證了Partition間的全局有序(Total Order),那它究竟有沒有傳說中的效果呢?我們在buildTrie-----getPartition方法中加入
BufferedWriter bf; try { bf = new BufferedWriter(new FileWriter(file,true)); bf.append(String.valueOf(trie.findPartition(key))); bf.newLine(); bf.close(); } catch (IOException e) { e.printStackTrace(); }
這樣就可以把500萬條記錄所分配的Partition的序號輸出到我們指定的文件中去,之后用matlab中的函數進行個數統計,因為我們設定了10個Reduce Task,所以它們的序號就是0--9,下面的數據就是0--9分別Partition記錄的個數及比例:
個數:
501736
510323
501848
502895
499158
495228
494499
496448
502608
495257
比例:
10.0347
10.2065
10.0370
10.0579
9.9832
9.9046
9.8900
9.9290
10.0522
9.9051
可以明顯的看到,相當的均勻!
感謝你能夠認真閱讀完這篇文章,希望小編分享的“Hadoop之TeraSort的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。