This article shows how to implement a MapReduce job that reads data from a text file and stores the results in HBase.
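The input is a vehicle position data file. Each record has the format `vehicleId speed:fuelConsumption:currentMileage`.
The goal is to use MapReduce to compute each vehicle's average speed, fuel consumption, and mileage. Sample data: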
```
vid1 78:8:120
vid1 56:11:124
vid1 98:5:130
vid1 72:6:131
vid2 78:4:281
vid2 58:9:298
vid2 67:15:309
```
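To make the record format concrete, here is a minimal plain-Java sketch (no Hadoop needed) that parses one line into its vehicle id and three numeric fields. The class and field names (`VehicleRecordParser`, `VehicleRecord`, `speed`, `fuel`, `odometer`) are hypothetical, and the sketch assumes every valid line holds exactly one id followed by three colon-separated integers.

```java
/** Hypothetical parser for one line of the vehicle data file: "<vehicleId> <speed>:<fuel>:<odometer>". */
public class VehicleRecordParser {

    /** One parsed record; names are illustrative, not part of the original job. */
    static final class VehicleRecord {
        final String vehicleId;
        final int speed;
        final int fuel;
        final int odometer;

        VehicleRecord(String vehicleId, int speed, int fuel, int odometer) {
            this.vehicleId = vehicleId;
            this.speed = speed;
            this.fuel = fuel;
            this.odometer = odometer;
        }
    }

    /** Parses a line, throwing IllegalArgumentException if it does not match the assumed format. */
    static VehicleRecord parse(String line) {
        String[] parts = line.trim().split("\\s+"); // id and metrics are separated by whitespace
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected '<id> <speed>:<fuel>:<odometer>': " + line);
        }
        String[] metrics = parts[1].split(":"); // metrics are separated by ':'
        if (metrics.length != 3) {
            throw new IllegalArgumentException("expected three ':'-separated values: " + line);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException) on bad numbers
        return new VehicleRecord(parts[0],
                Integer.parseInt(metrics[0]),
                Integer.parseInt(metrics[1]),
                Integer.parseInt(metrics[2]));
    }

    public static void main(String[] args) {
        String[] sample = {"vid1 78:8:120", "vid2 58:9:298"};
        for (String line : sample) {
            VehicleRecord r = parse(line);
            System.out.println(r.vehicleId + " speed=" + r.speed
                    + " fuel=" + r.fuel + " odometer=" + r.odometer);
        }
    }
}
```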
Create the Mapper class and map function
```java
package com.xh.vehicle;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class VehicleMapper extends Mapper<Object, Text, Text, Text> {

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String vehicle = value.toString(); // convert the input Text to a String
        // Split the input into lines (with the default TextInputFormat each value is already one line)
        StringTokenizer lines = new StringTokenizer(vehicle, "\n");
        // Process each line separately
        while (lines.hasMoreTokens()) {
            // Split the line on whitespace: "<vehicleId> <speed>:<fuel>:<mileage>"
            StringTokenizer tokenizer = new StringTokenizer(lines.nextToken());
            if (tokenizer.countTokens() < 2) {
                continue; // skip blank or malformed lines instead of failing the task
            }
            String vehicleId = tokenizer.nextToken();   // vehicle id
            String vehicleInfo = tokenizer.nextToken(); // vehicle info: speed:fuel:mileage
            context.write(new Text(vehicleId), new Text(vehicleInfo));
        }
    }
}
```
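Between the map and reduce phases, Hadoop groups the mapper's (vehicleId, info) pairs by key, so each `reduce()` call receives one vehicle id and all of its info strings. The following plain-Java sketch imitates that flow without Hadoop: `mapLine` mirrors the mapper's splitting, and `shuffle` groups the results in a `TreeMap`. The class and method names are hypothetical, and the in-memory grouping is only an illustration of the shuffle, not how Hadoop implements it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, Hadoop-free illustration of the map output and the shuffle's group-by-key. */
public class MapAndShuffleSketch {

    /** Mirrors the mapper for one line: returns {vehicleId, info}, or null for a blank/malformed line. */
    static String[] mapLine(String line) {
        String[] tokens = line.trim().split("\\s+");
        if (tokens.length < 2) {
            return null; // the Hadoop mapper skips such lines
        }
        return new String[] {tokens[0], tokens[1]};
    }

    /** Groups every mapped value under its key; keys come out sorted, as a single reducer would see them. */
    static Map<String, List<String>> shuffle(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] pair = mapLine(line);
            if (pair != null) {
                // Values keep input order here; Hadoop makes no such guarantee
                grouped.computeIfAbsent(pair[0], k -> new ArrayList<>()).add(pair[1]);
            }
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "vid1 78:8:120", "vid1 56:11:124", "", "vid2 78:4:281", "vid1 98:5:130");
        // Prints {vid1=[78:8:120, 56:11:124, 98:5:130], vid2=[78:4:281]}
        System.out.println(shuffle(input));
    }
}
```

In a real job the values for one key can reach the reducer in any order, so reducer logic should not depend on the order of the records.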
Create the Reduce class
```java
package com.xh.vehicle;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class VehicleReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {

    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int speed = 0;
        int oil = 0;
        int mile = 0;
        int count = 0;
        for (Text val : values) {
            String[] arr = val.toString().split(":"); // speed:fuel:mileage
            speed += Integer.parseInt(arr[0]);
            oil += Integer.parseInt(arr[1]);
            mile += Integer.parseInt(arr[2]); // sum the mileage readings, like the other fields
            count++;
        }
        // Integer averages; reduce() is only called with at least one value, so count > 0
        speed = speed / count;
        oil = oil / count;
        mile = mile / count;
        String result = speed + ":" + oil + ":" + mile;

        // Text.getBytes() returns the backing array, which can be longer than the key,
        // so build the row key from toString() to get exactly the key's bytes
        byte[] rowKey = Bytes.toBytes(key.toString());
        Put put = new Put(rowKey);
        // Column info:property (on HBase 1.0+ use put.addColumn(...); add(...) is deprecated there)
        put.add(Bytes.toBytes("info"), Bytes.toBytes("property"), Bytes.toBytes(result));
        context.write(new ImmutableBytesWritable(rowKey), put);
    }
}
```
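The averaging in the reducer can be checked locally before submitting the job. This plain-Java sketch reproduces that arithmetic on the sample data without HBase; the class name `VehicleAverageSketch` is hypothetical, and it assumes "average mileage" means the mean of the mileage readings, computed the same way as speed and fuel.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, HBase-free version of the reducer's averaging step. */
public class VehicleAverageSketch {

    /** Averages "speed:fuel:mileage" strings with integer division and returns "speed:fuel:mileage". */
    static String average(Iterable<String> values) {
        long speed = 0, fuel = 0, mileage = 0;
        int count = 0;
        for (String value : values) {
            String[] arr = value.split(":");
            speed += Integer.parseInt(arr[0]);
            fuel += Integer.parseInt(arr[1]);
            mileage += Integer.parseInt(arr[2]);
            count++;
        }
        if (count == 0) {
            throw new IllegalArgumentException("no values to average");
        }
        return (speed / count) + ":" + (fuel / count) + ":" + (mileage / count);
    }

    public static void main(String[] args) {
        List<String> vid1 = Arrays.asList("78:8:120", "56:11:124", "98:5:130", "72:6:131");
        List<String> vid2 = Arrays.asList("78:4:281", "58:9:298", "67:15:309");
        System.out.println("vid1 -> " + average(vid1)); // vid1 -> 76:7:126
        System.out.println("vid2 -> " + average(vid2)); // vid2 -> 67:9:296
    }
}
```

Integer division truncates, so vid1's mileage (120 + 124 + 130 + 131 = 505) averages to 505 / 4 = 126; switching the sums and result to `double` would keep the fractional part.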
Run the job
```java
package com.xh.vehicle;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class VehicleMapReduceJob {

    public static void main(String[] args)
            throws IOException, InterruptedException, ClassNotFoundException {
        // Hadoop configuration plus HBase settings from hbase-site.xml
        Configuration conf = HBaseConfiguration.create();
        // Hadoop 1.x constructor; on Hadoop 2+ use Job.getInstance(conf, "HBase_VehicleInfo")
        Job job = new Job(conf, "HBase_VehicleInfo");
        job.setJarByClass(VehicleMapReduceJob.class);
        job.setMapperClass(VehicleMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0])); // input file path
        // Use VehicleReduce and write its output to the HBase table "vehicle"
        TableMapReduceUtil.initTableReducerJob("vehicle", VehicleReduce.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```
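Export the code as vehicle.jar, put it in the hadoop-1.2.1 directory, and run the command below. The HBase table `vehicle` with column family `info` must already exist (for example, created in the HBase shell with `create 'vehicle', 'info'`), because the job writes to the table but does not create it.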
```shell
./bin/hadoop jar vehicle.jar com/xh/vehicle/VehicleMapReduceJob input/vehicle.txt
```
Check the results in HBase, for example by running `scan 'vehicle'` in the HBase shell.