91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm如何實現單詞計數

發布時間:2021-12-23 13:47:49 來源:億速云 閱讀:111 作者:iii 欄目:云計算

這篇文章主要介紹“Storm如何實現單詞計數”,在日常操作中,相信很多人在Storm如何實現單詞計數問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Storm如何實現單詞計數”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

1. 使用mvn命令創建項目

mvn archetype:generate -DgroupId=storm.test -DartifactId=Storm01 -DpackageName=com.zhch.v1

然后編輯配置文件pom.xml,添加storm依賴 

<dependency>
  <groupId>org.apache.storm</groupId>
  <artifactId>storm-core</artifactId>
  <version>0.9.4</version>
</dependency>

最后通過下述命令來編譯項目,編譯正確完成后導入到IDE中 

mvn install

當然,也可以在IDE中安裝maven插件,從而直接在IDE中創建maven項目 


2. 實現數據源,用重復的靜態語句來模擬數據源

package storm.test.v1;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {
    private String[] sentences = {
            "storm integrates with the queueing",
            "and database technologies you already use",
            "a storm topology consumes streams of data",
            "and processes those streams in arbitrarily complex ways",
            "repartitioning the streams between each stage of the computation however needed"
    };
    private int index = 0;

    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;
        if (index >= sentences.length) {
            index = 0;
        }

        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
        }
    }
}

3. 實現語句分割bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            this.collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

4. 實現單詞計數bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        count++;
        this.counts.put(word, count);
        this.collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

5. 實現上報bolt 

package storm.test.v1;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReportBolt extends BaseRichBolt {
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        counts = new HashMap<String, Long>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        this.counts.put(word, count);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    @Override
    public void cleanup() { //本地模式下,終止topology時可以保證cleanup()被執行
        System.out.println("--- FINAL COUNTS ---");
        List<String> keys = new ArrayList<String>();
        keys.addAll(this.counts.keySet());
        Collections.sort(keys);
        for (String key : keys) {
            System.out.println(key + " : " + this.counts.get(key));
        }
        System.out.println("----------");
    }
}

6. 實現單詞計數topology 

package storm.test.v1;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordCountTopology {
    private static final String SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) {
        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt spiltBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout(SENTENCE_SPOUT_ID, spout); //注冊數據源
        builder.setBolt(SPLIT_BOLT_ID, spiltBolt) //注冊bolt
                .shuffleGrouping(SENTENCE_SPOUT_ID); //該bolt訂閱spout隨機均勻發射來的數據流
        builder.setBolt(COUNT_BOLT_ID, countBolt)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word")); //該bolt訂閱spiltBolt發射來的數據流,并且保證"word"字段值相同的tuple會被路由到同一個countBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt)
                .globalGrouping(COUNT_BOLT_ID); //該bolt訂閱countBolt發射來的數據流,并且所有的tuple都會被路由到唯一的一個reportBolt中

        Config config = new Config();

        //本地模式啟動
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        try {
            Thread.sleep(5 * 1000);
        } catch (InterruptedException e) {
        }
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

7. 運行結果:

--- FINAL COUNTS ---
a : 302
already : 302
and : 604
arbitrarily : 302
between : 302
complex : 302
computation : 302
consumes : 302
data : 302
database : 302
each : 302
however : 302
in : 302
integrates : 302
needed : 302
of : 604
processes : 302
queueing : 302
repartitioning : 302
stage : 302
storm : 604
streams : 906
technologies : 302
the : 906
those : 302
topology : 302
use : 302
ways : 302
with : 302
you : 302
----------

到此,關于“Storm如何實現單詞計數”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

兰考县| 克山县| 奇台县| 陆丰市| 福建省| 黎川县| 洞口县| 定安县| 从江县| 咸宁市| 盱眙县| 裕民县| 从化市| 仙桃市| 宝坻区| 北流市| 芮城县| 定结县| 河津市| 濮阳县| 威信县| 济宁市| 东丽区| 扶沟县| 锡林浩特市| 郎溪县| 南溪县| 弥勒县| 浪卡子县| 太康县| 三河市| 临潭县| 巴楚县| 泗阳县| 平乡县| 卢龙县| 新沂市| 偏关县| 丹棱县| 揭阳市| 新民市|