91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

發布時間:2021-12-09 16:30:01 來源:億速云 閱讀:278 作者:小新 欄目:大數據

小編給大家分享一下hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!

//map讀入的鍵
package hgs.combinefileinputformat.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class CombineFileKey implements  WritableComparable<CombineFileKey> {
	private String fileName;
	private long offset;
	
	
	public String getFileName() {
		return fileName;
	}
	public void setFileName(String fileName) {
		this.fileName = fileName;
	}
	public long getOffset() {
		return offset;
	}
	public void setOffset(long offset) {
		this.offset = offset;
	}
	@Override
	public void readFields(DataInput input) throws IOException {
		this.fileName = Text.readString(input);
		this.offset = input.readLong();
		
	}
	@Override
	public void write(DataOutput output) throws IOException {
		Text.writeString(output, fileName);
		output.writeLong(offset);
		
	}
	@Override
	public int compareTo(CombineFileKey obj) {
		int f = this.fileName.compareTo(obj.fileName);
		if(f==0)
			return (int)Math.signum((double)(this.offset-obj.offset));
		return f;
	}
	@Override
	public int hashCode() {
		//摘自于 http://www.idryman.org/blog/2013/09/22/process-small-files-on-hadoop-using-combinefileinputformat-1/
		final int prime = 31;
	    int result = 1;
	    result = prime * result + ((fileName == null) ? 0 : fileName.hashCode());
	    result = prime * result + (int) (offset ^ (offset >>> 32));
	    return result;
	}
	
	@Override
	public boolean equals(Object o) {
		if(o instanceof CombineFileKey)
			return this.compareTo((CombineFileKey)o)==0;
		return false;
	}
}
package hgs.combinefileinputformat.test;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
import org.apache.hadoop.util.LineReader;
public class CombineFileReader extends RecordReader<CombineFileKey, Text>{
	private long startOffset; //offset of the chunk;
	private long end; //end of the chunk;
	private long position; // current pos
	private FileSystem fs;
	private Path path; 
	private CombineFileKey key;
	private Text value;
	private FSDataInputStream input;
	private LineReader reader;
	public CombineFileReader(CombineFileSplit split,TaskAttemptContext context ,
			Integer index) throws IOException {
		//初始化path fs startOffset end
		this.path = split.getPath(index);
		this.fs = this.path.getFileSystem(context.getConfiguration());
		this.startOffset = split.getOffset(index);
		this.end = split.getLength()+this.startOffset;
		//判斷現在開始的位置是否在一行的內部
		boolean skipFirstLine = false;
		
		//open the file
		this.input = fs.open(this.path);
		//不等于0說明讀取位置在一行的內部
		if(this.startOffset !=0 ){
			skipFirstLine = true;
			--(this.startOffset);
			//定位到開始讀取的位置
			this.input.seek(this.startOffset);
		}
		//初始化reader
		this.reader = new LineReader(input);
		if(skipFirstLine){ // skip first line and re-establish "startOffset".
			//這里著這樣做的原因是 一行可能包含了這個文件的所有的數據,猜測如果遇到一行的話,還是會讀取一行
			//將其實位置調整到一行的開始,這樣的話會舍棄部分數據
			this.startOffset += this.reader.readLine(new Text(), 0, (int)Math.min
					             ((long)Integer.MAX_VALUE, this.end - this.startOffset));
		}
		this.position = this.startOffset;
	}
	
	@Override
	public void close() throws IOException {}
	@Override
	public void initialize(InputSplit splite, TaskAttemptContext context) throws IOException, InterruptedException {}
	//返回當前的key
	@Override
	public CombineFileKey getCurrentKey() throws IOException, InterruptedException {
		return key;
	}
	//返回當前的value
	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		return value;
	}
	//執行的進度
	@Override
	public float getProgress() throws IOException, InterruptedException {
		//返回的類型為float
		if(this.startOffset==this.end){
			return 0.0f;
		}else{
			return Math.min(1.0f, (this.position - this.startOffset)/(float)(this.end - this.startOffset));
		}
	}
	//該方法判斷是否有下一個key value
	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		//對key和value初始化
		if(this.key == null){
			this.key = new CombineFileKey();
			this.key.setFileName(this.path.getName());
		}
		this.key.setOffset(this.position);
		if(this.value == null){
			this.value = new Text();
		}
		//讀取一行數據,如果讀取的newSieze=0說明split的數據已經處理完成
		int newSize = 0;
		if(this.position<this.end){
			newSize = reader.readLine(this.value);
			position += newSize;
		}
		//沒有數據,將key value置位空
		if(newSize == 0){
			this.key = null;
			this.value = null;
			return false;
		}else{
			return true;
		}
	}
	
}
package hgs.combinefileinputformat.test;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
public class CustCombineInputFormat extends CombineFileInputFormat<CombineFileKey, Text> {
	public CustCombineInputFormat(){
		super();
		//最大切片大小
		this.setMaxSplitSize(67108864);//64 MB
	}
	@Override
	public RecordReader<CombineFileKey, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {		
		return new CombineFileRecordReader<CombineFileKey, Text>((CombineFileSplit)split,context,CombineFileReader.class);
	}
	@Override
	protected boolean isSplitable(JobContext context, Path file) {
		return false;
	}
}
//驅動類
package hgs.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import hgs.combinefileinputformat.test.CustCombineInputFormat;
public class LetterCountDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		//conf.set("mapreduce.map.log.level", "INFO");
		///conf.set("mapreduce.reduce.log.level", "INFO");
		Job job = Job.getInstance(conf, "LetterCount");
		job.setJarByClass(hgs.test.LetterCountDriver.class);
		// TODO: specify a mapper
		job.setMapperClass(LetterCountMapper.class);
		// TODO: specify a reducer
		job.setReducerClass(LetterReducer.class);
		// TODO: specify output types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		if(args[0].equals("1"))
			job.setInputFormatClass(CustCombineInputFormat.class);
		else{}
		// TODO: specify input and output DIRECTORIES (not files)
		FileInputFormat.setInputPaths(job, new Path("/words"));
		FileOutputFormat.setOutputPath(job, new Path("/result"));
		if (!job.waitForCompletion(true))
			return;
	}
}

hdfs文件:

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

運行結果:不使用自定義的:CustCombineInputFormat

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

運行結果:在使用自定義的:CustCombineInputFormat

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數

以上是“hadoop如何通過CombineFileInputFormat實現小文件合并減少map的個數”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

安康市| 曲水县| 美姑县| 囊谦县| 汝城县| 定边县| 玛沁县| 江华| 丰都县| 伊通| 新化县| 温州市| 鄂温| 修文县| 德化县| 甘谷县| 津市市| 玛纳斯县| 嵊泗县| 江安县| 太原市| 思南县| 天峻县| 新平| 长葛市| 三台县| 台北市| 镇原县| 松潘县| 镇雄县| 平乡县| 宕昌县| 西安市| 兴海县| 开平市| 柘城县| 奉新县| 商都县| 河津市| 松原市| 南靖县|