您好,登錄后才能下訂單哦!
這篇文章主要介紹“Storm開發細節是什么”,在日常操作中,相信很多人在Storm開發細節是什么問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm開發細節是什么”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
package test; import java.io.IOException; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import storm.copyFromClass.TestWordSpout; import com.esotericsoftware.minlog.Log; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; // 測試目的,在這里我們需要測試一下當前Spout 不斷產生數據的過程 public class testWordSpoutTopology { public static class TestSimpleBolt extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { System.out.println(input.toString()); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { System.out.println("Method declare"); } } public static void main(String[] args) throws IOException { // 首先,我們必須建立一個新的TopologyBuilder TopologyBuilder builder = new TopologyBuilder(); //其次,我們需要配置如下的組件: 1 Spout,2Bolt builder.setSpout("word-emit-byThread", new TestWordSpout()); //在這個Spout之中,我們約定將 【word-emit-byThread】Spout組件 發射的元祖進行 shuffleGrouping builder.setBolt("word-show", new TestSimpleBolt()).shuffleGrouping( "word-emit-byThread"); Config config = new Config(); config.setDebug(false); //最后進行本地提交 LocalCluster cluster = new LocalCluster(); cluster.submitTopology("simple", config, builder.createTopology()); } }
以上,
testWordSpoutTopology
是我們運行的主類
package storm.copyFromClass; import backtype.storm.Config; import backtype.storm.topology.OutputFieldsDeclarer; import java.util.Map; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.HashMap; import java.util.Random; import org.slf4j.Logger; import org.slf4j.LoggerFactory; // public class TestWordSpout extends BaseRichSpout { public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class); boolean _isDistributed; SpoutOutputCollector _collector; public TestWordSpout() { this(true); } public TestWordSpout(boolean isDistributed) { _isDistributed = isDistributed; } public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; } public void close() { } // 發送 public void nextTuple() { Utils.sleep(100); final String[] words = new String[] { "張兵", "吳哥", "仝志維", "前輩", "禪師"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; _collector.emit(new Values(word)); } //在這里,我們沒有進行ACK public void ack(Object msgId) { } //在這里,我們沒有進行fail public void fail(Object msgId) { } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { if(!_isDistributed) { Map<String, Object> ret = new HashMap<String, Object>(); ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1); return ret; } else { return null; } } }
結果:
請注意在這里,我們的Stream 默認的id為空
到此,關于“Storm開發細節是什么”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。