您好,登錄后才能下訂單哦!
當Mapper沒有數據輸入,mapper.run中的while循環會調用context.nextKeyValue就返回false,于是便返回到runNewMapper中,在這里程序會關閉輸入通道和輸出通道,這里關閉輸出通道并沒有關閉collector,必須要先flush一下。
獲取更多大數據視頻資料請加QQ群:947967114 代碼結構:
Maptask.runNewMapper->NewOutputCollector.close->MapOutputBuffer.flush
我們看flush幫我們做了什么事情,為什么要flush。
public void flush() throws IOException, ClassNotFoundException,
InterruptedException {
LOG.info("Starting flush of map output");
spillLock.lock();
try {
while (spillInProgress) {
reporter.progress();
spillDone.await();
//這里查看spillInProgress狀態,如果有spill就等待完成,并且報告狀態。
}
checkSpillException();
final int kvbend = 4 * kvend;
//kvend是元數據塊的終點,元數據是向下伸展的。
//kvend是以整數計的數組下標,kvbend是以字節計的數組下標
if ((kvbend + METASIZE) % kvbuffer.length !=
equator - (equator % METASIZE)) {
//這個條件說明緩沖區中原來有數據,現在spill已經完成,需要釋放空間。 獲取更多大數據視頻資料請加QQ群:947967114
// spill finished
//spill一次需要調整一些參數,以釋放空間,這個工作通過resetSpill完成
resetSpill();
private void resetSpill() {
final int e = equator;
bufstart = bufend = e;
final int aligned = e - (e % METASIZE);
// set start/end to point to first meta record
// Cast one of the operands to long to avoid integer overflow
kvstart = kvend = (int)
(((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
(kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
//這里其實就是在調整各個參數的位置。比如原點位,kvstart等。
}
if (kvindex != kvend) {
//再來判斷緩沖區是否為空,如果不空表示不滿足spill條件(80%),但map處理完成沒有數據輸入。
kvend = (kvindex + NMETA) % kvmeta.capacity();
bufend = bufmark;
LOG.info("Spilling map output");
LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
"; bufvoid = " + bufvoid);
LOG.info("kvstart = " + kvstart + "(" + (kvstart * 4) +
"); kvend = " + kvend + "(" + (kvend * 4) +
"); length = " + (distanceTo(kvend, kvstart,
kvmeta.capacity()) + 1) + "/" + maxRec);
sortAndSpill();
//調用一次sortAndSpill過程。 獲取更多大數據視頻資料請加QQ群:947967114
}
} catch (InterruptedException e) {
throw new IOException("Interrupted while waiting for the writer", e);
} finally {
spillLock.unlock();
}
//至此所有數據都已經溢寫出去,緩沖區已空,所有數據都spill到文件中
assert !spillLock.isHeldByCurrentThread();
// shut down spill thread and wait for it to exit. Since the preceding
// ensures that it is finished with its work (and sortAndSpill did not
// throw), we elect to use an interrupt instead of setting a flag.
// Spilling simultaneously from this thread while the spill thread
// finishes its work might be both a useful way to extend this and also
// sufficient motivation for the latter approach.
try {
spillThread.interrupt();
//讓spill線程不在運行
spillThread.join();
//結束spill線程
} catch (InterruptedException e) {
throw new IOException("Spill failed", e);
}
// release sort buffer before the merge
kvbuffer = null;
mergeParts();
//合并spill文件
Path outputPath = mapOutputFile.getOutputFile();
fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
}
flush的目的,首先讓緩沖區的所有KV對數據都進入spill文件,因為每次spill都會產生一個spill文件,所有spill文件可能不止一個,所以要把spill文件合并到單個文件中,分發給reduce。
所以如果有spill正在進行必須等待其完成,也可能沒有spill但是緩沖區非空,需要再一次sortAndSpill,總之要把緩沖區清空為止。所有數據都spill完成后就可以進行mergeParts了
代碼結構:
Maptask.runNewMapper--->NewOutputCollector.close--->MapOutputBuffer.flush--->MapOutputBuffer.mergeParts
源代碼如下:
private void mergeParts() throws IOException, InterruptedException, ClassNotFoundException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
final Path[] filename = new Path[numSpills];
//每次溢寫都會有一個文件,所以數組的大小是numSpills。 獲取更多大數據視頻資料請加QQ群:947967114
final TaskAttemptID mapId = getTaskID();
for(int i = 0; i < numSpills; i++) {
//統計所有這些文件合并之后的大小
filename[i] = mapOutputFile.getSpillFile(i);
//通過spill文件的編號獲取到指定的spill文件路徑
finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();//獲取文件大小
}
if (numSpills == 1) {
//合并輸出有倆文件一個是output/file.out,一個是output/file.out.index
sameVolRename(filename[0],
mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
//換個文件名,在原文件名上加個file.out
if (indexCacheList.size() == 0) {
//索引塊緩存indexCacheList已空
sameVolRename(mapOutputFile.getSpillIndexFile(0), mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));//spillIndexFile改名。
} else {
//索引塊緩存indexCacheList中還有索引記錄,要寫到索引文件
indexCacheList.get(0).writeToFile(
//寫入文件
mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
}
sortPhase.complete();
return;
//如果只有一個spill合并已經完成。 獲取更多大數據視頻資料請加QQ群:947967114
}
// read in paged indices
for (int i = indexCacheList.size(); i < numSpills; ++i) {
//如果spill文件不止一個,需要合并
Path indexFileName = mapOutputFile.getSpillIndexFile(i);
indexCacheList.add(new SpillRecord(indexFileName, job));
//先把所有的SpillIndexFile收集在一起。
}
//make correction in the length to include the sequence file header
//lengths for each partition
finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
//每個partition都有header
finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
//IndexFile,每個partition一個記錄。
Path finalOutputFile =
mapOutputFile.getOutputFileForWrite(finalOutFileSize);
Path finalIndexFile =
mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
//The output stream for the final single output file
FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
//創建合并,最終輸出。
if (numSpills == 0) {
//要是沒有SipillFile生成,也創建一個空文件
//create dummy files
IndexRecord rec = new IndexRecord();
//創建索引記錄
SpillRecord sr = new SpillRecord(partitions);
//創建spill記錄
try {
for (int i = 0; i < partitions; i++) {
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec, null);
writer.close();
//創建后馬上關閉,形成空文件。
rec.startOffset = segmentStart;
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
sr.putIndex(rec, i);
}
sr.writeToFile(finalIndexFile, job);
//所以記錄寫入索引文件
} finally {
finalOut.close();
}
sortPhase.complete();
return;
}
{
sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
IndexRecord rec = new IndexRecord();
final SpillRecord spillRec = new SpillRecord(partitions);
for (int parts = 0; parts < partitions; parts++) {
//finalOut最終輸出文件。循環分區獲得所有spill文件的該分區數據,合并寫入finalOut
//create the segments to be merged
List<Segment<K,V>> segmentList =
new ArrayList<Segment<K, V>>(numSpills);
//創建Segment,數據段
for(int i = 0; i < numSpills; i++) {
//準備合并所有的Spill文件
IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
Segment<K,V> s =
new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
indexRecord.partLength, codec, true);
segmentList.add(i, s);
//把每個Spill文件中相同partition的區段位置收集起來。 獲取更多大數據視頻資料請加QQ群:947967114
if (LOG.isDebugEnabled()) {
LOG.debug("MapId=" + mapId + " Reducer=" + parts +
"Spill =" + i + "(" + indexRecord.startOffset + "," +
indexRecord.rawLength + ", " + indexRecord.partLength + ")");
}
}
int mergeFactor = job.getInt(JobContext.IO_SORT_FACTOR, 100);
//做merge操作時同時操作的stream數上限
boolean sortSegments = segmentList.size() > mergeFactor;
//對segment進行排序
@SuppressWarnings("unchecked")
RawKeyValueIterator kvIter = Merger.merge(job, rfs,
keyClass, valClass, codec,
segmentList, mergeFactor,
new Path(mapId.toString()),
job.getOutputKeyComparator(), reporter, sortSegments,
null, spilledRecordsCounter, sortPhase.phase(),
TaskType.MAP);
//合并同一partition在所有spill文件中的內容,可能還需要sort,合并后的結構是一個序列。
//write merged output to disk
long segmentStart = finalOut.getPos();
FSDataOutputStream finalPartitionOut = CryptoUtils.wrapIfNecessary(job, finalOut);
Writer<K, V> writer =
new Writer<K, V>(job, finalPartitionOut, keyClass, valClass, codec,
spilledRecordsCounter);
if (combinerRunner == null || numSpills < minSpillsForCombine) { // minSpillsForCombine在MapOutputBuffer構造函數內被初始化,numSpills 為mapTask已經溢寫到磁盤spill文件數量
Merger.writeFile(kvIter, writer, reporter, job);
//將合并后的結果直接寫入文件。下面看一下writeFile的源代碼;
public static <K extends Object, V extends Object>
void writeFile(RawKeyValueIterator records, Writer<K, V> writer,
Progressable progressable, Configuration conf)
throws IOException {
long progressBar = conf.getLong(JobContext.RECORDS_BEFORE_PROGRESS,
10000);
long recordCtr = 0;
while(records.next()) {
writer.append(records.getKey(), records.getValue());
//追加的方式輸出到writer中
if (((recordCtr++) % progressBar) == 0) {
progressable.progress();
}
}
回到主代碼:
} else {
//有combiner
combineCollector.setWriter(writer);
//就插入combiner環節
combinerRunner.combine(kvIter, combineCollector);
//將合并的結果經過combiner后寫入文件
}
//close
writer.close();//關閉writer通道
sortPhase.startNextPhase();
// record offsets
rec.startOffset = segmentStart;
//從當前段的起點開始
rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
spillRec.putIndex(rec, parts);
}
spillRec.writeToFile(finalIndexFile, job);
//把spillFile寫入合并的indexFle
finalOut.close();
//關閉最終輸出流
for(int i = 0; i < numSpills; i++) {
rfs.delete(filename[i],true);
//刪除所有spill文件
}
}
}
該方法會將所有臨時文件合并成一個大文件保存到output/file.out中,同時生成相應的索引文件output/file.out.index。 在進行文件合并的過程中,Map Task以分區為單位進行合并。對于某個分區,它將采用多輪遞歸合并的方式:每輪合并io.sort.factor,默認是100,個文件,并將產生的文 件重新加入待合并列表中,對文件排序后,重復上述過程,直到只有一個文件。只生產一個文件可以避免同時打開大量的文件和同時讀取大量的小文件產生的隨機讀 取帶來的開銷。最后會刪除所有的spill文件。
另外需要注意的是,mergeParts()中也有combiner的操作,但是需要滿足一定的條件:1、用戶設置了combiner;2、spill文件的數量超過了minSpillsForCombine的值,對應配置項"min.num.spills.for.combine",可自行設置,默認是3。這倆必須同時具備才會在此啟動combiner的本地聚集操作。所以在Map階段有可能combiner會執行兩次,所以有可能你的combiner執行兩次之后輸出數據不符合預期了。
這樣Map階段的任務就算完成了。主要是讀取數據然后寫入內存緩沖區,緩存區滿足條件就會快排后并設置partition后,spill到本地文件和索引文件;如果有combiner,spill之前也會做一次聚集操作,待數據跑完會通過歸并合并所有spill文件和索引文件,如果有combiner,合并之前在滿足條件后會做一次綜合的聚集操作。map階段的結果都會存儲在本地中(如果有reducer的話),非HDFS。
Mapper完成對所有輸入文件的處理,并將緩沖區的數據寫出到spill文件之后,spill文件的存在只有三種可能:沒有spill,一個spill,多個spill。針對這三種都需要一個最終的輸出文件,不管內容有沒有,內容多少。這個最終文件是和單個spill文件是一樣的,按照partition分成若干段,然后是排好序的KV數據,這個merge操作結合之前的spill文件進行sort。就構成了一次mergeSort,這個mergeSort只針對同一個Mapper的多個spill文件,以后在Reducer那里還會有Merge針對不同的Mapper文件。
當Maptask完成后,從runNewMapper返回,下一個操作就是done。也就是MapTask的收尾工作。MapTask的收尾涉及到怎么把生成的數據輸出交給ReduceTask。MapTask和ReduceTask都是擴展自Task。但是他們都沒有自己定義done函數,所以他們都調用了Task的done。
程序在這里跳出runNewMapper 獲取更多大數據視頻資料請加QQ群:947967114
if (useNewApi) {
runNewMapper(job, splitMetaInfo, umbilical, reporter);
} else {
runOldMapper(job, splitMetaInfo, umbilical, reporter);
}
done(umbilical, reporter);
這個done我們點進去后發現是Task.done,源碼如下;
public void done(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
LOG.info("Task:" + taskId + " is done."
+ " And is in the process of committing");
updateCounters();
//更新容器
boolean commitRequired = isCommitRequired();
if (commitRequired) {
int retries = MAX_RETRIES;
setState(TaskStatus.State.COMMIT_PENDING);
// say the task tracker that task is commit pending
while (true) {
try {
umbilical.commitPending(taskId, taskStatus);
break;
//如果commitPending沒有發生異常,就退出,否則重試。
} catch (InterruptedException ie) {
// ignore
} catch (IOException ie) {
LOG.warn("Failure sending commit pending: " +
StringUtils.stringifyException(ie));
if (--retries == 0) {
System.exit(67);
}
}
}
//wait for commit approval and commit
commit(umbilical, reporter, committer);
}
taskDone.set(true);
reporter.stopCommunicationThread();
// Make sure we send at least one set of counter increments. It's
// ok to call updateCounters() in this thread after comm thread stopped.
updateCounters();
sendLastUpdate(umbilical);
//signal the tasktracker that we are done
sendDone(umbilical);
實現sendDone的源代碼:
private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
int retries = MAX_RETRIES;
while (true) {
try {
umbilical.done(getTaskID());
//實際上這里向MRAppMaster上的TaskAttemptImpl發送TA_DONE事件
LOG.info("Task '" + taskId + "' done.");
return;
} catch (IOException ie) {
LOG.warn("Failure signalling completion: " +
StringUtils.stringifyException(ie));
if (--retries == 0) {
throw ie;
}
}
}
}
umbilical.done(getTaskID()); 獲取更多大數據視頻資料請加QQ群:947967114
//實際上這里向MRAppMaster上的TaskAttemptImpl發送TA_DONE事件,在TA_DONE事件的驅動下,相應的TaskAttemptImpl對象的狀態機執行CleanupContainerTransition.transition,然后轉入SUCCESS_CONTAINER_CLEANUP狀態。注意這里有一個TaskAttemptEventType.TA_DONE事件是由具體的MapTask所在節點上發出的,但不是引起的狀態機的跳變是在MRAppMaster節點上。對于Maptask,會有一個umbilical,就代表著MRAppMaster。
MPAppmaster接到CONTAINER_REMOTE_CLEANUP事件,ContainerLauncher通過RPC機制調用Maptask所在節點的ContainerManagerImpl.stopContainers.使這個MapTask的容器進入KILLED_BY_APPMASTER狀態從而不在活躍。操作成功后向相應的TaskAttemptImpl發送TO_CONTAINER_CLEANED事件。如果一次TaskAttempt成功了,就意味著嘗試的任務也成功了,所以TaskAttempt的狀態關系到TaskImpl對象,taskImpl的掃描和善后,包括向上層的JobImpl對象發送TaskState.SUCCESSED事件。向自身TaskImpl發送的SUCCESSED事件會導致TaskImpl.handleTaskAttemptCompletion操作。
Mapper節點上產生一個過程setMapOutputServerAdress函數,把本節點的MapOutputServer地址設置成一個Web地址,意味著MapTask留下的數據輸出(合并后的spill文件)可以通過HTTP連接獲取。至此Mapper的所有過程完成。 獲取更多大數據視頻資料請加QQ群:947967114
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。