您好,登錄后才能下訂單哦!
這篇文章主要講解了“hadoop中的recordreader和split以及block的關系是怎樣的”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“hadoop中的recordreader和split以及block的關系是怎樣的”吧!
recordreader的作用不言而喻。
通常來講,Inputformat會為沒有一個split產生一個recordreader來提供給maptask使用,進而,MapTask能夠讀取屬于自己管轄處理的那部分split。
這里面,我們以linerecordreader為例子進行講解:
幾個核心的方法
定義了linerecordreader基本的作用,即,是否有下一對kv,獲得下一個key,獲得下一個value。
而這個三個方法的使用地方如下。
暫時忽略...
因為文件在hdfs上分塊存放的,那么split和block什么鬼?為啥不直接按照block去處理就行了唄。原因呢,是block中的數據可能不是連續的。可能某個重要的信息被兩個block分隔了。因此,我們使用邏輯上的概念,即split來處理。
而split并不是真的將文件split了...而是邏輯上的標記下start,length,filepath等即可。
根據Path,可以過得到FileSystem
final Path file = split.getPath();
// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(job);
每個maptask呢,都會使用使用一個linerecordreader,處理對應的split,中間通過了
private FSDataInputStream fileIn;
來維護一個流。
切記:這里的流并不是只針對這個split的,我們之前說過,split只是標記而已,沒有分隔。
因此,這個流fileIn其實是指向整個文件的。
并且呢,這個流呢,會實現jdk中標準的方法,啥read啊之類的。讀取到緩沖區中,但是如果涉及到不同的block呢,這個流會自動幫我們去找對應的block的,這個太復雜。反正記住fileIn屏蔽了頂層的不同block之前的切換,對我們來講就像處理一個大的文件一樣。
既然是流,那么就能夠定位了,因此,不同的maptask就可以根據自己的split中的start位置,通過fileIn流直接定位到要處理文件的那個地方。
fileIn.seek(start); in = new SplitLineReader(fileIn, job, this.recordDelimiterBytes); filePosition = fileIn;
可以看到其中的in對象,是借助fileIn生成,相比,in內部一定借助了這個fileIn流來實現某個功能。
典型的,readLine,
in對象負責一行的讀取邏輯,,而fileIn則負責從文件讀取字符到byte緩沖區。
readline函數,最終會有一個這樣的抵用,可以看到
bufferLength = fillBuffer(in, buffer, prevCharCR);
調用fillbuffer函數,從in.read()中讀取東西到buffer中,
private int readDefaultLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException { /* We're reading data from in, but the head of the stream may be * already buffered in buffer, so we have several cases: * 1. No newline characters are in the buffer, so we need to copy * everything and read another buffer from the stream. * 2. An unambiguously terminated line is in buffer, so we just * copy to str. * 3. Ambiguously terminated line is in buffer, i.e. buffer ends * in CR. In this case we copy everything up to CR to str, but * we also need to see what follows CR: if it's LF, then we * need consume LF as well, so next call to readLine will read * from after that. * We use a flag prevCharCR to signal if previous character was CR * and, if it happens to be at the end of the buffer, delay * consuming it until we have a chance to look at the char that * follows. */ str.clear(); int txtLength = 0; //tracks str.getLength(), as an optimization int newlineLength = 0; //length of terminating newline boolean prevCharCR = false; //true of prev char was CR long bytesConsumed = 0; do { int startPosn = bufferPosn; //starting from where we left off the last time if (bufferPosn >= bufferLength) { startPosn = bufferPosn = 0; if (prevCharCR) { ++bytesConsumed; //account for CR from previous read } bufferLength = fillBuffer(in, buffer, prevCharCR); if (bufferLength <= 0) { break; // EOF } } for (; bufferPosn < bufferLength; ++bufferPosn) { //search for newline if (buffer[bufferPosn] == LF) { newlineLength = (prevCharCR) ? 2 : 1; ++bufferPosn; // at next invocation proceed from following byte break; } if (prevCharCR) { //CR + notLF, we are at notLF newlineLength = 1; break; } prevCharCR = (buffer[bufferPosn] == CR); } int readLength = bufferPosn - startPosn; if (prevCharCR && newlineLength == 0) { --readLength; //CR at the end of the buffer } bytesConsumed += readLength; int appendLength = readLength - newlineLength; if (appendLength > maxLineLength - txtLength) { appendLength = maxLineLength - txtLength; } if (appendLength > 0) { str.append(buffer, startPosn, appendLength); txtLength += appendLength; } } while (newlineLength == 0 && bytesConsumed < maxBytesToConsume); if (bytesConsumed > Integer.MAX_VALUE) { throw new IOException("Too many bytes before newline: " + bytesConsumed); } return (int)bytesConsumed; }
OK,那之后的linerecordreader三個主要的方法就簡答了,讀取就行了。略屌。
但是,有一個問題還沒說。即一行信息如果被某個block分隔了咋辦。
或者這個問題,這樣說,我們知道Inputformat中的getSplit方法呢,就是根據文件的length等屬性直接劃分split的。
參照FileInputformat的getSplits方法
那么一行數據,可能在不同的splits中,也可能在不同的block中。
在不同的block中呢,這個有fileIn對象幫我們處理的了,主要是讀取read到緩沖區,屬于物理上的問題,不是考慮的地方。
處于不同的split呢?這個情況有些問題,因為不同的split就是不同的劃分,并且由不同的map task執行。
那么我們recordreader如何解決這個問題呢?
解決辦法便是,突破split的start和end限制。
linerecordreader的解決辦法:
只不start指向的位置不是文件的第一行,則默認的過濾掉一行(start位置可能是一行中的某一個位置)。
initialize()方法
if (start != 0) { start += in.readLine(new Text(), 0, maxBytesToConsume(start)); } this.pos = start;
在nextKeyvalue方法中,多讀取一些數據,補充完整的一行。
while (getFilePosition() <= end || in.needAdditionalRecordAfterSplit()) { if (pos == 0) { newSize = skipUtfByteOrderMark(); } else { newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos)); pos += newSize; } if ((newSize == 0) || (newSize < maxLineLength)) { break; } // line too long. try again LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize)); }
OK,通過過濾掉一行,和多讀取一行,就能保證被split分隔的一行,能夠完成的讀取,同時也不會重復處理一些數據。因為,所有的mapTask的linerecordreader都遵循這個方法。
感謝各位的閱讀,以上就是“hadoop中的recordreader和split以及block的關系是怎樣的”的內容了,經過本文的學習后,相信大家對hadoop中的recordreader和split以及block的關系是怎樣的這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。