您好,登錄後才能下訂單哦!
這篇文章主要為大家展示了“storm如何配置使用”,內容簡而易懂,條理清晰,希望能夠幫助大家解決疑惑,下面讓小編帶領大家一起研究並學習一下“storm如何配置使用”這篇文章吧。
示例代碼如下:
# storm.yaml configuration
# ZooKeeper servers
storm.zookeeper.servers:
  - "bigdata01"
  - "bigdata02"
  - "bigdata03"
# Local directory where Storm stores its data
storm.local.dir: "/apps/storm"
# Nimbus (master) hosts
nimbus.seeds: ["bigdata00"]
# Worker ports
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

# Start commands (run from the Storm installation directory)
nohup bin/storm nimbus &
nohup bin/storm supervisor &
nohup bin/storm ui &
--------------------------------------------------------------------------------------
package com.hgs.storm;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

/**
 * Word-count example topology: a spout emits a fixed sentence, one bolt splits
 * it into words and a second bolt keeps a running count per word.
 */
public class StormWordCountTest {

    /**
     * Builds the topology and runs it in an in-process {@link LocalCluster}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args)
            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordspout", new WordCountSpout(), 3);
        builder.setBolt("splitword", new WordSpliteBolt(), 2).shuffleGrouping("wordspout");
        // "word" is the field declared by WordSpliteBolt.declareOutputFields; grouping
        // on it routes every occurrence of a given word to the same counting task.
        builder.setBolt("wordcount", new WordCountBolt(), 2).fieldsGrouping("splitword", new Fields("word"));

        Config config = new Config();
        config.setNumWorkers(2);

        // Local mode. To run on a real cluster instead, submit with
        // StormSubmitter.submitTopology("words-count", config, builder.createTopology()).
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("words-count", config, builder.createTopology());
    }
}

/** Spout that repeatedly emits one fixed sentence in the field "message". */
class WordCountSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /** Pause between two emitted sentences, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 100L;

    // Collector handed over in open(); nextTuple() uses it to emit tuples.
    SpoutOutputCollector collector = null;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Throttle: without a pause this method would emit as fast as it is called.
        Utils.sleep(EMIT_INTERVAL_MS);
        collector.emit(new Values(" this is my first storm program so i hope it will success"));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

/** Bolt that splits each incoming sentence into words and emits (word, 1) pairs. */
class WordSpliteBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String line = input.getString(0);
        if (line != null) {
            // Trim and split on whitespace runs so that leading or repeated spaces
            // do not produce empty "words".
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    // Anchor the emitted tuple to its input.
                    collector.emit(input, new Values(word, 1));
                }
            }
        }
        // BaseRichBolt leaves acking to the implementation.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}

/** Bolt that accumulates a running count per word and prints it to stdout. */
class WordCountBolt extends BaseRichBolt {
    ConcurrentHashMap<String, Integer> wordsMap = new ConcurrentHashMap<String, Integer>();
    private static final long serialVersionUID = 1L;

    OutputCollector collector = null;

    @Override
    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);
        Integer num = input.getInteger(1);

        Integer previous = wordsMap.get(word);
        Integer total = (previous == null) ? num : Integer.valueOf(previous + num);
        wordsMap.put(word, total);

        System.out.println(word + "----" + total);
        // BaseRichBolt leaves acking to the implementation.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt emits nothing, so it declares no output fields.
    }
}
以上是“storm如何配置使用”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。