您好,登錄后才能下訂單哦!
本篇內容介紹了“Shuffle流程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
1、從WordCountMapper類中的map方法中寫出kv后,進入shuffle流程 --context.write(outK,outV); 進入TaskInputOutputContext中的write()方法 --看下就過 進入WrappedMapper.java中的mapContext.write(key, value);方法 //112行 進入TaskInputOutputContextImpl.java 中output.write(key, value);方法 //89行 最終定位到MapTask的write()方法內, //726行
2、重點步驟,收集器對象將kv收集到緩沖區,并在收集前將kv的分區號計算出來. collector.collect(key, value,partitioner.getPartition(key, value, partitions)); 第一次進入該方法時,因為沒有設置reduce的個數,所以最終返回的永遠是0號分區
3、定位到MapTask類中的collect方法并進入 //1082行 bufferRemaining -= METASIZE; //計算緩沖區剩余大小,該行代碼前面的代碼是對kv類型的一個判斷 如果bufferRemaining < 0 則開始進行溢寫操作,內部是對數據的一些校驗和計算
4、定位到startSpill(); --1126行 //只有當溢寫數據大小滿足80%時,才會觸發該操作 WordCountMapper持續往緩沖區寫數據,當達到溢寫條件80%時,開始溢寫
5、進入到startSpill()方法內部 --MapTask類1590行 spillReady.signal(); //1602行 --線程通信, 通知溢寫線程開始干活 //執行溢寫線程(MapTask內部類SpillThread)的run方法 //run方法中調用MapTask$MapOutputBuffer中的sortAndSpill()方法 直接執行下面的排序和溢寫方法 --sortAndSpill()方法 --MapTask的1605行
6、定位到1615行 final SpillRecord spillRec = new SpillRecord(partitions); //根據分區數創建溢寫記錄對象 --排序按照分區排序,溢寫按照分區溢寫 final Path filename =mapOutputFile.getSpillFileForWrite(numSpills, size);//獲取溢寫文件名稱 ///tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619 _0001/attempt_local1440922619_0001_m_000000_0/output/(spill0.out),這時還沒有溢寫文件,只有目錄 out = rfs.create(filename); //創建執行改步后,在上述的目錄下生成溢寫文件spill0.out文件
7、繼續向下走,定位到MapTask類的1625行 sorter.sort(MapOutputBuffer.this, mstart, mend, reporter); //溢寫前排序 8、定位到1629行,進入for循環 --按照分區進行溢寫 9、分析for循環內代碼,看具體溢寫過程 9.1 先有一個writer對象,通過該對象來完成數據溢寫 writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec, 9.2 判斷是否有設置combinerRunner對象 如果有,則按照設置的combinerRunner業務去處理; 如果沒有,則走默認的溢寫規則 10、執行到1667行,即writer.close();方法,本次溢寫完畢,此時我們再去看溢寫文件spill0.out文件有數據
11、if (totalIndexCacheMemory >= indexCacheMemoryLimit(大小為:1M)) {} //MapTask類的1685行 // 如果索引數據超過指定的內存大小,也需要溢寫到文件中.(該現象一般情況很難發生.)
12、當本次溢寫完畢之后,繼續回到WordCountMapper類中的map方法內的context.write(outk,outv);方法處 --說明:因為我們使用本地debug模式調試,所以看不到并行的效果,只能是串行效果,因此看到的是當內存內讀取滿足 80%時,發生溢寫操作,其實溢寫并未停止,只不過我們看不到,剩余的溢寫數據在20%內存進行
13、如上溢寫過程,在整個mapTask中會出現N次,具體多少看數據量. 如果map中最后的數據寫到緩沖區,但是沒有滿足 80%溢寫條件的情況,最終也需要將緩沖區的數據刷寫到磁盤(最后一次溢寫)。 最后一次會發生在 MapTask中關閉 NewOutputCollector對象的時候. 即在該行代碼處發生 output.close(mapperContext); --MapTask的805行 14、進入output.close(mapperContext);方法內 --MapTask的732行 定位到collector.flush();方法 // 735行 -->將緩沖區的數據刷寫到磁盤-->重新走sortAndSpill()方法(最后一次刷寫)
上述流程,每發生一次溢寫就會生成一個溢寫小文件(溢寫文件內的數據是排好序的) 最終所有的數據都寫到磁盤中后,在磁盤上就是多個溢寫文件, 比如:spill0.out,spill1.out,...spillN.out
15、溢寫全部完成之后,就進入歸并操作 --MapTask的1527行 mergeParts();方法,進入該方法,定位到MapTask的1844行 filename[0]: /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local 1440922619_0001/attempt_local1440922619_0001_m_000000_0/output/spill0.out
16、繼續向下走,定位到MapTask的1880行 Path finalOutputFile = mapOutputFile.getOutputFileForWrite(finalOutFileSize); --歸并后,最終輸出的文件路徑 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out 17、繼續向下走,定位到MapTask的1882行 Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize); --歸并后,最終輸出文件的索引文件 /tmp/hadoop-Administrator/mapred/local/localRunner/Administrator/jobcache/job_local1440922619_00 01/attempt_local1440922619_0001_m_000000_0/output/file.out.index 18、創建file.out 文件 FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096); 19、for (int parts = 0; parts < partitions; parts++) {} //1925行,按照分區進行歸并排序 20、for循環內具體的歸并操作 //1950行 RawKeyValueIterator kvIter = Merger.merge(job, rfs, keyClass, valClass, codec, segmentList, mergeFactor, new Path(mapId.toString()), job.getOutputKeyComparator(), reporter, sortSegments, null, spilledRecordsCounter, sortPhase.phase(), TaskType.MAP);
21、歸并后的數據寫出到文件 Writer<K, V> writer = new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,spilledRecordsCounter); //1961行 //歸并也可以使用combiner,但是前提條件是設置了combiner,并且溢寫次數大于等于3 if (combinerRunner == null || numSpills < minSpillsForCombine(3)) { Merger.writeFile(kvIter, writer, reporter, job); } else { combineCollector.setWriter(writer); combinerRunner.combine(kvIter, combineCollector); } 22、歸并完成 writer.close(); //1972行
23、寫出索引文件 spillRec.writeToFile(finalIndexFile, job); //1986行 24、刪除所有的溢寫文件spill0.out spill1.out ... spill0.out,只保留最終的輸出文件。 for(int i = 0; i < numSpills; i++) { rfs.delete(filename[i],true); }
“Shuffle流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。