Lu Chunli's work notes: who says programmers can't have a touch of literary flair?
A minimal MapReduce program
package com.lucl.hadoop.mapreduce;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MiniMRDriver extends Configured implements Tool {
    public static void main(String[] args) {
        try {
            ToolRunner.run(new MiniMRDriver(), args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(), this.getClass().getSimpleName());
        job.setJarByClass(MiniMRDriver.class);
        // No Mapper, Reducer, or input/output format is set; the framework defaults are used.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }
}
Check the input data for the MapReduce job
[hadoop@nnode code]$ hdfs dfs -text /data/HTTP_SITE_FLOW.log
視頻網站    15    1527
信息安全    20    3156
站點統計    24    6960
搜索引擎    28    3659
站點統計    3     1938
綜合門戶    15    1938
搜索引擎    21    9531
搜索引擎    63    11058
[hadoop@nnode code]$
Package and run the MapReduce program
[hadoop@nnode code]$ hadoop jar MiniMR.jar /data/HTTP_SITE_FLOW.log /201511302119
15/11/30 21:19:46 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032
15/11/30 21:19:48 INFO input.FileInputFormat: Total input paths to process : 1
15/11/30 21:19:48 INFO mapreduce.JobSubmitter: number of splits:1
15/11/30 21:19:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448889273221_0001
15/11/30 21:19:50 INFO impl.YarnClientImpl: Submitted application application_1448889273221_0001
15/11/30 21:19:50 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448889273221_0001/
15/11/30 21:19:50 INFO mapreduce.Job: Running job: job_1448889273221_0001
15/11/30 21:20:26 INFO mapreduce.Job: Job job_1448889273221_0001 running in uber mode : false
15/11/30 21:20:26 INFO mapreduce.Job:  map 0% reduce 0%
15/11/30 21:20:59 INFO mapreduce.Job:  map 100% reduce 0%
15/11/30 21:21:30 INFO mapreduce.Job:  map 100% reduce 100%
15/11/30 21:21:31 INFO mapreduce.Job: Job job_1448889273221_0001 completed successfully
15/11/30 21:21:31 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=254
        FILE: Number of bytes written=213863
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=277
        HDFS: Number of bytes written=194
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=30256
        Total time spent by all reduces in occupied slots (ms)=27787
        Total time spent by all map tasks (ms)=30256
        Total time spent by all reduce tasks (ms)=27787
        Total vcore-seconds taken by all map tasks=30256
        Total vcore-seconds taken by all reduce tasks=27787
        Total megabyte-seconds taken by all map tasks=30982144
        Total megabyte-seconds taken by all reduce tasks=28453888
    Map-Reduce Framework
        Map input records=8
        Map output records=8
        Map output bytes=232
        Map output materialized bytes=254
        Input split bytes=103
        Combine input records=0
        Combine output records=0
        Reduce input groups=8
        Reduce shuffle bytes=254
        Reduce input records=8
        Reduce output records=8
        Spilled Records=16
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=182
        CPU time spent (ms)=2000
        Physical memory (bytes) snapshot=305459200
        Virtual memory (bytes) snapshot=1697824768
        Total committed heap usage (bytes)=136450048
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=174
    File Output Format Counters
        Bytes Written=194
[hadoop@nnode code]$
Check the output
[hadoop@nnode code]$ hdfs dfs -ls /201511302119
Found 2 items
-rw-r--r--   2 hadoop hadoop          0 2015-11-30 21:21 /201511302119/_SUCCESS
-rw-r--r--   2 hadoop hadoop        194 2015-11-30 21:21 /201511302119/part-r-00000
[hadoop@nnode code]$ hdfs dfs -text /201511302119/part-r-00000
0       視頻網站    15    1527
22      信息安全    20    3156
44      站點統計    24    6960
66      搜索引擎    28    3659
88      站點統計    3     1938
109     綜合門戶    15    1938
131     搜索引擎    21    9531
153     搜索引擎    63    11058
[hadoop@nnode code]$
No Mapper or Reducer class is specified here; only the input data path and the output path are set through FileInputFormat and FileOutputFormat. After the job runs, each line's byte offset and the line's content are written to the specified output path.
The default implementation of FileInputFormat is TextInputFormat, which is designed for plain-text data. It breaks the input into records on line terminators (carriage return / line feed); the key is the byte offset of the line within the file and the value is the content of that line.
The class is defined as follows:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

    @Override
    public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) {
        // ... omitted
        return new LineRecordReader(recordDelimiterBytes);
    }

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // decides whether the file can be split (body omitted)
    }
}
The desired InputFormat can be selected on the Job through public void setInputFormatClass(Class<? extends InputFormat> cls).
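For illustration only (not part of the original driver), a minimal sketch of setting the InputFormat, and for symmetry the OutputFormat, explicitly on the Job. TextInputFormat and TextOutputFormat are already the defaults, so this changes nothing in the minimal example above:

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ExplicitFormats {
    // These calls merely spell out the defaults the minimal driver already relies on.
    public static void apply(Job job) {
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
    }
}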
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context)
            throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;
}
On HDFS a file is stored as blocks, while a MapReduce computation reads it as logical slices (each slice is called a split, or chunk). Each split is handled by one map task, so the number of splits determines the number of map tasks.
Note: a MapReduce job consists of Mappers and Reducers. The number of map tasks is determined by the splits; what, then, determines the number of Reducers? This will be explained step by step through example code later.
List<InputSplit> getSplits(JobContext context) is responsible for logically dividing a large data set into multiple splits. For example, suppose a database table holds 100 rows stored in ascending order of the primary key and every 20 rows form a split; the list then has 5 elements, and each InputSplit records two values: the starting ID of the split and the size of the split (20 here). An InputSplit does not store any data itself; it only describes how the data is partitioned.
RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) returns a RecordReader able to read the records of a split, according to the partitioning defined by the InputSplit.
Definition of the InputSplit class
public abstract class InputSplit {
    // Size of the split, used to sort the input splits by size
    public abstract long getLength() throws IOException, InterruptedException;

    // Returns the list of locations (hosts) where the split's data is stored
    public abstract String[] getLocations() throws IOException, InterruptedException;
}
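For a concrete picture (an illustration added here, not taken from the original note), FileSplit is the file-based InputSplit that FileInputFormat produces; a split is just a (path, start, length, hosts) descriptor. The host name below is hypothetical:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class FileSplitSketch {
    public static void main(String[] args) {
        // A FileSplit only records (path, start offset, length, preferred hosts);
        // it does not hold any of the file's bytes itself.
        FileSplit split = new FileSplit(
                new Path("/data/HTTP_SITE_FLOW.log"), // file on HDFS
                0L,                                   // start offset of this split
                174L,                                 // length in bytes (the whole file here)
                new String[] { "dnode1" });           // hosts holding the block (hypothetical)
        System.out.println(split);
    }
}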
Definition of the RecordReader class
public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    public abstract void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException;

    public abstract boolean nextKeyValue() throws IOException, InterruptedException;

    public abstract KEYIN getCurrentKey() throws IOException, InterruptedException;

    public abstract VALUEIN getCurrentValue() throws IOException, InterruptedException;

    public abstract float getProgress() throws IOException, InterruptedException;

    public abstract void close() throws IOException;
}
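To make the calling contract concrete, a simplified, hedged sketch of the loop a map task effectively runs over a RecordReader; the real framework routes these calls through a Context object inside Mapper.run(), and the helper name dumpSplit is made up for illustration:

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class ReadLoopSketch {
    // Iteration contract: initialize once per split, then
    // nextKeyValue()/getCurrentKey()/getCurrentValue() until the split is exhausted.
    static <K, V> void dumpSplit(RecordReader<K, V> reader,
                                 InputSplit split,
                                 TaskAttemptContext context)
            throws IOException, InterruptedException {
        reader.initialize(split, context);
        while (reader.nextKeyValue()) {
            // In a real map task, each record here triggers one call to map().
            System.out.println(reader.getCurrentKey() + "\t" + reader.getCurrentValue());
        }
        reader.close();
    }
}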
An InputSplit describes how the data is sliced; the RecordReader is what actually loads the data of a split and turns it into <key, value> pairs suitable for the map() method of the Mapper class.
The RecordReader instance is defined by the input format. The default input format, TextInputFormat, provides a LineRecordReader, which uses each line's byte offset as the key and the line's content as the value. The RecordReader is invoked repeatedly on the split until the whole split has been consumed, and each invocation results in one call to the Mapper's map() function.
TextInputFormat does not implement getSplits itself; the implementation lives in its parent class, FileInputFormat.
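As an illustration (the class name EchoMapper is hypothetical, not from the original note), a Mapper that simply writes out the <offset, line> pairs LineRecordReader produces, which matches what the minimal job above emitted into part-r-00000:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Echoes the <offset, line> pairs produced by LineRecordReader, which is
// effectively what the default identity Mapper did in the minimal job above.
public class EchoMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // offset: byte position of the line within the file (the key in part-r-00000)
        // line  : the raw text of the line (the value)
        context.write(offset, line);
    }
}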
public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

    // Generate the list of files and make them into FileSplits
    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 1. Get the List<FileStatus> from the JobContext.
        // 2. Iterate over the file statuses:
        //    2.1 For an empty file, create a FileSplit with no host information.
        //    2.2 For a non-empty file, check whether it is splittable (it is by default).
        //        If not splittable, the whole file becomes a single FileSplit;
        //        otherwise compute the split size splitSize.

        // getFormatMinSplitSize() returns the constant 1.
        // getMinSplitSize(job) reads the configuration parameter (mapred-default.xml):
        //   mapreduce.input.fileinputformat.split.minsize, default 0.
        // So minSize is 1.
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));

        // Actually calls context.getConfiguration().getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
        // the parameter is mapreduce.input.fileinputformat.split.maxsize
        // (not present in mapred-default.xml). Since it is not configured,
        // maxSize is Long.MAX_VALUE.
        long maxSize = getMaxSplitSize(job);

        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();   // absolute path on HDFS
            long length = file.getLen();  // actual size of the file
            if (length != 0) {
                BlockLocation[] blkLocations;
                if (file instanceof LocatedFileStatus) {
                    blkLocations = ((LocatedFileStatus) file).getBlockLocations();
                } else {
                    FileSystem fs = path.getFileSystem(job.getConfiguration());
                    blkLocations = fs.getFileBlockLocations(file, 0, length);
                }
                if (isSplitable(job, path)) {
                    // Block size; in Hadoop 2.6 the default is 134217728 (128 MB).
                    long blockSize = file.getBlockSize();

                    // computeSplitSize(blockSize, minSize, maxSize) evaluates
                    //   Math.max(minSize, Math.min(maxSize, blockSize))
                    //   = Math.max(1, Math.min(Long.MAX_VALUE, 128 MB))
                    // so the split size equals the block size, 128 MB.
                    long splitSize = computeSplitSize(blockSize, minSize, maxSize);

                    long bytesRemaining = length; // actual file size
                    // Keep slicing while the remaining bytes exceed splitSize * 1.1
                    // (i.e. more than 128 MB * 1.1 remains).
                    while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                        // getBlockIndex answers: is the offset inside this block?
                        // On the first pass length - bytesRemaining is 0, so the
                        // first block's replica locations are used.
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                        bytesRemaining -= splitSize; // subtract one split and slice the remainder again
                    }

                    /**
                     * Suppose the file is 300 MB, so bytesRemaining = length = 300 MB.
                     * 1. bytesRemaining / splitSize = 300 / 128 > 1.1
                     *    makeSplit --> FileSplit(path, length - bytesRemaining = 0, splitSize = 128 MB)
                     *    bytesRemaining -= splitSize => bytesRemaining = 172 MB
                     * 2. bytesRemaining / splitSize = 172 / 128 > 1.1
                     *    makeSplit --> FileSplit(path, length - bytesRemaining = 128 MB, splitSize = 128 MB)
                     *    bytesRemaining -= splitSize => bytesRemaining = 44 MB
                     * 3. bytesRemaining / splitSize = 44 / 128 < 1.1
                     *    the while loop ends.
                     */

                    // After the loop some data may be left over: not zero bytes,
                    // but less than a full split.
                    if (bytesRemaining != 0) {
                        int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
                        splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                                blkLocations[blkIndex].getHosts(),
                                blkLocations[blkIndex].getCachedHosts()));
                        // Here the trailing 44 MB becomes one more split:
                        // makeSplit --> FileSplit(path, length - bytesRemaining = 256 MB, splitSize = 44 MB)
                    }
                } else { // not splittable: the whole file, at its actual size, is one split
                    splits.add(makeSplit(path, 0, length,
                            blkLocations[0].getHosts(),
                            blkLocations[0].getCachedHosts()));
                }
            } else {
                // Create empty hosts array for zero length files
                splits.add(makeSplit(path, 0, length, new String[0]));
            }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        return splits;
    }
}
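As a side note not covered in the original text, the minSize/maxSize inputs to computeSplitSize can be tuned per job; a hedged sketch using the FileInputFormat helper methods (the 64 MB figure is just an example value):

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeTuning {
    // Illustrative only: force splits of roughly 64 MB despite a 128 MB block size.
    // computeSplitSize = max(minSize, min(maxSize, blockSize)) = max(1, min(64 MB, 128 MB)) = 64 MB.
    public static void configure(Job job) {
        FileInputFormat.setMinInputSplitSize(job, 1L);
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);
        // Equivalent configuration keys:
        //   mapreduce.input.fileinputformat.split.minsize
        //   mapreduce.input.fileinputformat.split.maxsize
    }
}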
Note: a FileStatus in the List<FileStatus> may actually be a LocatedFileStatus (a FileStatus that includes a file's block locations).
LineRecordReader reads and parses the text data, and for each record it produces, the Mapper's map() function is called with the resulting <key, value> pair.
My own understanding: TextInputFormat logically partitions the file into splits; for each split a new LineRecordReader is created to parse it, and map() is called once for every parsed line, yet it is still a single map task doing the work for that split.
public class LineRecordReader extends RecordReader<LongWritable, Text> {

    public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException {
        // 1. Receive the split (a FileSplit) and derive from it:
        //    start of the split: start = split.getStart();
        //    end of the split:   end = start + split.getLength();
        //    file location:      the absolute HDFS path, final Path file = split.getPath();
        // 2. Open an input stream for the file:
        //    obtain the FileSystem and open the stream, fileIn = fs.open(file);
        // 3. Determine whether the file is compressed and which codec applies:
        //    CompressionCodec codec = new CompressionCodecFactory(job).getCodec(file);
        // 4. Adjust the starting offset (original comment below):
        // If this is not the first split, we always throw away first record
        // because we always (except the last split) read one extra line in
        // next() method.
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }

    public boolean nextKeyValue() throws IOException {
        if (key == null) { // key --> the key passed to the map task's map() function
            key = new LongWritable();
        }
        key.set(pos);      // the key is the byte offset of the line
        if (value == null) {
            value = new Text();
        }
        // Check whether the split has been fully consumed; if not, read one line
        // through org.apache.hadoop.util.LineReader's readCustomLine or readDefaultLine:
        // readCustomLine when a custom record delimiter is configured,
        // readDefaultLine otherwise (carriage return / line feed as the delimiter).
        newSize = in.readLine(value, maxLineLength, maxBytesToConsume(pos));
        pos += newSize; // advance the offset by the length of the line just read, for the next line
    }

    /**
     * nextKeyValue() iterates over the split, reading one line per call,
     * parsing the key and value from that line and assigning them; this
     * <key, value> pair is what gets passed into the map() function
     * (exactly how map() is invoked is analyzed later).
     */

    @Override
    public LongWritable getCurrentKey() {
        return key;
    }

    @Override
    public Text getCurrentValue() {
        return value;
    }

    /**
     * Get the progress within the split
     */
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }

    // Close the input stream opened on HDFS
    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
}
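Related to the readCustomLine branch mentioned above, a hedged sketch of pointing TextInputFormat at a custom record delimiter through the textinputformat.record.delimiter configuration key; the blank-line delimiter and job name are made up for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class CustomDelimiterSketch {
    // Records separated by a blank line instead of a single newline. With this setting,
    // TextInputFormat passes the delimiter bytes to LineRecordReader, which then takes
    // the readCustomLine branch instead of readDefaultLine.
    public static Job newJob(Configuration conf) throws Exception {
        conf.set("textinputformat.record.delimiter", "\n\n");
        return Job.getInstance(conf, "custom-delimiter-example");
    }
}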