您好,登錄后才能下訂單哦!
這篇文章主要講解了“Hadoop的整文件讀取方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Hadoop的整文件讀取方法”吧!
寫Hadoop程序時,有時候需要讀取整個文件,而不是分片讀取,但默認的為分片讀取,所以,只有編寫自己的整文件讀取類。
需要編寫的有:
WholeInputFormat類,繼承自FileInputFormat類
WholeRecordReader類,繼承自RecordReader類
其中,用于讀取的類是WholeRecordReader類。以下代碼以Text為key值類型,BytesWritable為value的類型,因為大多數格式在hadoop中都沒有相應的類型支持,比如jpg,sdf,png等等,在hadoop中都沒有相應的類,但是都可以轉換為byte[]字節流,然后在轉化為BytesWritable類型,最后在Map或者Reduce再轉換成java中的相應類型。
代碼如下,解釋見 :
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WholeInputFormat extends FileInputFormat<Text, BytesWritable> { @Override public RecordReader<Text, BytesWritable> createRecordReader (InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException { return new WholeRecordReader(); } @Override //判斷是否分片,false表示不分片,true表示分片。 //其實這個不寫也可以,因為在WholeRecordReader中一次性全部讀完 protected boolean isSplitable(JobContext context,Path file) { return false; } }
下面是WholeRecordReader類:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; public class WholeRecordReader extends RecordReader<Text,BytesWritable> { //Hadoop中處理文件的類 private FileSplit fileSplit; private FSDataInputStream in = null; private BytesWritable value = null; private Text key = null; //用于判斷文件是否讀取完成 //也就是因為這個,所以WholeInputFormat中的isSplitable方法可以不用寫 private boolean processed = false; @Override public void close() throws IOException { //do nothing } @Override public Text getCurrentKey() throws IOException, InterruptedException { return this.key; } @Override public BytesWritable getCurrentValue() throws IOException,InterruptedException { return this.value; } @Override public float getProgress() throws IOException, InterruptedException { return processed ? fileSplit.getLength() : 0; } @Override public void initialize(InputSplit split, TaskAttemptContext context)throws IOException, InterruptedException { //打開一個文件輸入流 fileSplit = (FileSplit)split; Configuration job = context.getConfiguration(); Path file = fileSplit.getPath(); FileSystem temp = file.getFileSystem(job); in = temp.open(file); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if(key == null) { key = new Text(); } if(value == null) { value = new BytesWritable(); } if(!processed) { //申請一個字節數組保存將從文件中讀取的內容 byte[] content = new byte[(int)fileSplit.getLength()]; Path file = fileSplit.getPath(); //以文件的名字作為傳遞給Map函數的key值,可以自行設置 key.set(file.getName()); try{ //讀取文件中的內容 IOUtils.readFully(in,content,0,content.length); //將value的值設置為byte[]中的值 value.set(new BytesWritable(content)); }catch(IOException e) { e.printStackTrace(); }finally{ //關閉輸入流 IOUtils.closeStream(in); } //將processed設置成true,表示讀取文件完成,以后不再讀取 processed = true; return true; } return false; } }
當把這些寫好后,在main()函數或者run()函數里面將job的輸入格式設置成WholeInputFormat,如下:
job.setInputFormatClass(WholeInputFormat.class);
現在,可以整個文件讀取了。其中,key,value的類型可以換成大家需要的類型。不過,當在Hadoop中找不到對應類型的時候建議用BytesWritable類型,然后用byte[]作為中間類型轉化為java可以處理的類型。
感謝各位的閱讀,以上就是“Hadoop的整文件讀取方法”的內容了,經過本文的學習后,相信大家對Hadoop的整文件讀取方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。