您好,登錄后才能下訂單哦!
? 在map執行之前,需要將數據進行切片,每個切片對應一個map任務。而每個map任務并不是直接處理這些切片數據的,它是處理KV的。所以問題有兩個:數據是如何切片的、切片是如何轉為KV給map處理的。
? 這就涉及到兩個抽象類,InputFormat以及 RecordReader。具體為什么是這兩個抽象類,請看之前input的源碼分析
public abstract class InputFormat<K, V> {
public InputFormat() {
}
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}
我們看到,這個抽象類就兩個方法
getSplits:看名字就知道是用來將數據處理成切片的了
createRecordReader:就是用來創建RecordReader對象的。
所以這就是一個InputFormat基本的功能
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
public RecordReader() {
}
//初始化,一般就是讀取切片的數據
public abstract void initialize(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
//檢查是否還有下一對KV,并且如果有,實際上會將其處理成KV,并賦值給this.key和this.value
public abstract boolean nextKeyValue() throws IOException, InterruptedException;
//返回一個key
public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;
//返回一個value
public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;
//返回是否在處理
public abstract float getProgress() throws IOException, InterruptedException;
//關閉reader
public abstract void close() throws IOException;
}
這個抽象類就涉及到讀取切片的數據,處理成KV結構。而在input源碼分析中說到,mapper.run方法中通過 context.getCurrentKey() 類似的方法獲取key其實就是調用這個RecordReader中的這些get方法而已。
從上面的源碼可以看到。
InputFormat:負責規劃切片信息,以及創建RecordReader對象
RecordReader:負責按照切片規劃去讀取當前mapper處理的切片數據,并將其處理成KV形式,然后通過context傳遞給mapper。
常用的有:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定義InputFormat(自定義有另外的文章講)
? 這是默認的InputFormat,切片方式是按數據塊的方式切割,默認大小block大小。一個文件至少是一個切片(無論多小)。因為這個類繼承FileInputFormat,使用的是其父類定義的getsplit() 方法進行切片。
? 使用的RecordReader是LineRecordReader。處理切片成KV時,每條記錄是一行輸入。鍵K是LongWritable類型,存儲該行在整個文件中的字節偏移量。值是這行的內容,不包括任何行終止符(換行符和回車符)。
? 這個類也是使用父類FileInputFormat的getsplit() 方法進行切片,所以切片方式和上面一致。
? 使用的RecordReader是KeyValueLineRecordReader。每一行均為一條記錄,被分隔符分割為key,value。可以通過在驅動類中設置conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, " ");來設定分隔符。默認分隔符是tab(\t)。
? 這個類雖然繼承了FileInputFormat,但是自己重寫了getSplit方法,使用另外的方式來切片。是按指定的行數來切片,比如5行,那就5行作為一個切片,無論數據大小。通過mapreduce.input.lineinputformat.linespermap 這個參數設置切片行數。
? 使用的RecordReader是LineRecordReader。和上面類似,不重復說。
? 這個類繼承于 CombineFileInputFormat,父類繼承于FileInputFormat。在CombineFileInputFormat中重寫了 getSplits方法。因為FileInputFormat默認無論多小的文件,一個文件至少是一個切片。如果遇到很多小文件,就會導致很多切片。而這里的切片方式就是嚴格按照大小來切片,會將小文件集合在一起,達到指定大小,才作為一個切片。
? 使用的RecordReader是CombineFileRecordReader。處理方式和 LineRecordReader類似,只不過切片可能是來自多個文件,讀取方式上略顯麻煩。
job.setInputFormatClass(xxxInputFormat.class);
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。