您好,登錄后才能下訂單哦!
這篇文章主要介紹Storm-Hbase接口怎么用,文中介紹得非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*
 * Reads the specified column of an HBase table and emits the row key and the
 * column value in the form of tuples ("rowKey", "columnValue").
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // Lookup coordinates. All plain Strings, so they survive Storm's
    // serialization of the bolt to worker nodes.
    private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null;

    // Location of hbase-site.xml, kept so the connection can be (re)built on
    // the worker instead of only on the submitting client.
    private String hbaseXmlLocation = null;

    // Connection state is per-instance and transient: it is rebuilt in
    // prepare() after deserialization. (Previously these were static and
    // initialized in the constructor, which runs only on the submitting
    // client, leaving them null on remote workers.)
    private transient HBaseConnector connector = null;
    private transient HBaseConfiguration conf = null;
    private transient HBaseCommunicator communicator = null;

    OutputCollector _collector;

    /*
     * Stores the HBase table information. The actual connection is deferred
     * to prepare(), which Storm invokes on the worker executing the bolt.
     *
     * @param hbaseXmlLocation path of the hbase-site.xml used for configuration
     * @param rowKeyField      input tuple field that carries the row key
     * @param tableName        HBase table to read from
     * @param colFamilyName    column family of the looked-up column
     * @param colName          qualifier of the looked-up column
     */
    public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {
        this.tableName = tableName;
        this.colFamilyName = colFamilyName;
        this.colName = colName;
        this.rowKeyField = rowKeyField;
        this.hbaseXmlLocation = hbaseXmlLocation;
    }

    // Builds the HBase connection if it does not exist yet (idempotent).
    private void ensureConnected() {
        if (communicator == null) {
            connector = new HBaseConnector();
            conf = connector.getHBaseConf(hbaseXmlLocation);
            communicator = new HBaseCommunicator(conf);
        }
    }

    /*
     * Emits the value of the column with name @colName for the row key read
     * from the input tuple, as the tuple (rowKey, columnValue).
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        ensureConnected(); // safety net if prepare() was not called first
        String rowKey = input.getStringByField(this.rowKeyField);
        String columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
        collector.emit(tuple(rowKey, columnValue));
    }

    public void prepare(Map confMap, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void cleanup() {
    }

    // Declares the two output fields emitted by execute().
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rowKey", "columnValue"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null; // no component-specific configuration
    }

    // IBasicBolt lifecycle hook: runs on the worker, so connect here.
    public void prepare(Map stormConf, TopologyContext context) {
        ensureConnected();
    }
}
package storm.contrib.hbase.bolts;

import static backtype.storm.utils.Utils.tuple;

import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

import storm.contrib.hbase.utils.HBaseCommunicator;
import storm.contrib.hbase.utils.HBaseConnector;

/*
 * Reads the specified column of an HBase table and emits the row key and the
 * column value in the form of tuples ("rowKey", "columnValue").
 */
public class HBaseColumnValueLookUpBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    // Lookup coordinates: table, column family, column qualifier, the name of
    // the input tuple field holding the row key, and a scratch slot for the
    // last value read.
    private String tableName = null, colFamilyName = null, colName = null, rowKeyField = null, columnValue = null;

    // NOTE(review): these are static AND transient yet assigned in the
    // constructor, which runs on the submitting client. After the bolt is
    // serialized to a remote worker they will be null there — confirm this
    // bolt is only used in local mode, or move initialization into prepare().
    private static transient HBaseConnector connector = null;
    private static transient HBaseConfiguration conf = null;
    private static transient HBaseCommunicator communicator = null;

    OutputCollector _collector;

    /*
     * Constructor initializes the variables storing the hbase table
     * information and connects to hbase.
     */
    public HBaseColumnValueLookUpBolt(final String hbaseXmlLocation, final String rowKeyField, final String tableName, final String colFamilyName, final String colName) {
        this.tableName = tableName;
        this.colFamilyName = colFamilyName;
        this.colName = colName;
        this.rowKeyField = rowKeyField;
        connector = new HBaseConnector();
        conf = connector.getHBaseConf(hbaseXmlLocation);
        communicator = new HBaseCommunicator(conf);
    }

    /*
     * Emits the value of the column with name @colName and rowkey @rowKey.
     * @see backtype.storm.topology.IBasicBolt#execute(backtype.storm.tuple.Tuple, backtype.storm.topology.BasicOutputCollector)
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String rowKey = input.getStringByField(this.rowKeyField);
        // Fetch the cell value for (table, rowKey, family, qualifier)
        // directly through the communicator.
        columnValue = communicator.getColEntry(this.tableName, rowKey, this.colFamilyName, this.colName);
        collector.emit(tuple(rowKey, columnValue));
    }

    public void prepare(Map confMap, TopologyContext context, OutputCollector collector) {
        _collector = collector;
    }

    public void cleanup() {
    }

    // Declares the two output fields emitted by execute().
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("rowKey", "columnValue"));
    }

    public Map<String, Object> getComponentConfiguration() {
        Map<String, Object> map = null;
        return map;
    }

    public void prepare(Map stormConf, TopologyContext context) {
    }
}
Rowkey
package storm.contrib.hbase.spouts;

import backtype.storm.topology.OutputFieldsDeclarer;
import java.util.Map;
import java.util.UUID;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Random;
import org.apache.log4j.Logger;

/*
 * Spout emitting tuples containing the row key of the HBase table, chosen at
 * random from a fixed, hard-coded set.
 */
public class RowKeyEmitterSpout implements IRichSpout {

    private static final long serialVersionUID = 6814162766489261607L;
    public static Logger LOG = Logger.getLogger(RowKeyEmitterSpout.class);

    // Hoisted so the candidate set is not rebuilt on every nextTuple() call.
    private static final String[] ROW_KEYS = {"rowKey1", "rowKey2", "rowKey3", "rowKey4"};

    boolean _isDistributed;
    SpoutOutputCollector _collector;

    // One RNG per spout instance, created in open() (previously a new Random
    // was allocated on every nextTuple() call). Transient: rebuilt after
    // deserialization on the worker.
    private transient Random rand;

    public RowKeyEmitterSpout() {
        this(true);
    }

    public RowKeyEmitterSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public boolean isDistributed() {
        return _isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        rand = new Random();
    }

    public void close() {
    }

    /*
     * Emits one randomly chosen row key every ~100 ms, tagged with a fresh
     * UUID as the message id.
     */
    public void nextTuple() {
        Utils.sleep(100);
        if (rand == null) { // safety net if open() has not run yet
            rand = new Random();
        }
        final String word = ROW_KEYS[rand.nextInt(ROW_KEYS.length)];
        _collector.emit(new Values(word), UUID.randomUUID());
    }

    // NOTE(review): acks and fails are ignored, so failed tuples are never
    // replayed — acceptable for a demo emitter.
    public void ack(Object msgId) {
    }

    public void fail(Object msgId) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
// 以下是用來簡單測試系統的代碼,用於驗證接口是否正確
package storm.contrib.hbase.spouts;

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/*
 * Simple test spout that cycles through a fixed list of (word, number) pairs,
 * emitting one pair per second on the fields ("word", "number").
 */
public class TestSpout implements IRichSpout {

    SpoutOutputCollector _collector;
    Random _rand;
    int count = 0;

    public boolean isDistributed() {
        return true;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    public void nextTuple() {
        Utils.sleep(1000);
        String[] words = new String[] { "hello", "tiwari", "indore", "jayati" };
        Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5 };
        // Advance cyclically, bounded by the shorter array. The original
        // reset on numbers.length - 1 (== 4) and so indexed words[4] on the
        // fourth call, throwing ArrayIndexOutOfBoundsException.
        count = (count + 1) % words.length;
        _collector.emit(new Values(words[count], numbers[count]));
    }

    public void close() {
    }

    public void ack(Object id) {
    }

    public void fail(Object id) {
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "number"));
    }

    public void activate() {
    }

    public void deactivate() {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
比較簡單,也就不做解釋了,Storm-hbase的接口并沒有像Storm-kafka的接口那樣,自身去處理輪詢,自身去處理連接的問題。只是簡單的構造了一個Hbase的連接,在連接的過程之中,直接構造了一個Connector就可以了。
以上是“Storm-Hbase接口怎么用”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。