您好,登錄后才能下訂單哦!
[TOC]
在Strom的API中提供了LocalCluster
對象,這樣在不用搭建Storm環境或者Storm集群的情況下也能夠開發Storm的程序,非常方便。
基于Maven構建工程項目,其所需要的依賴如下:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>1.0.2</version>
</dependency>
需求如下:
數據源不斷產生遞增數字,對產生的數字累加求和
分析如下:
Strom的Topology包含Spout和Bolt兩種節點類型,在這個案例中,可以使用Spout來對數據源進行處理(模擬產生數據),
然后將其發送到計算和的Bolt中,所以實際上這里只需要使用一個Spout節點和一個Bolt節點就可以了。
在理解了Storm的設計思想后,將其與MapReduce的設計思想進行對比,再看下面的程序代碼其實是非常好理解的。
/**
* 數據源
*/
static class OrderSpout extends BaseRichSpout {
private Map conf; // 當前組件配置信息
private TopologyContext context; // 當前組件上下文對象
private SpoutOutputCollector collector; // 發送tuple的組件
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
/**
* 接收數據的核心方法
*/
@Override
public void nextTuple() {
long num = 0;
while (true) {
num++;
StormUtil.sleep(1000);
System.out.println("當前時間" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "產生的訂單金額:" + num);
this.collector.emit(new Values(num));
}
}
/**
* 是對發送出去的數據的描述schema
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("order_cost"));
}
}
private Long sumOrderCost = 0L;
/**
* 計算和的Bolt節點
*/
static class SumBolt extends BaseRichBolt {
private Map conf; // 當前組件配置信息
private TopologyContext context; // 當前組件上下文對象
private OutputCollector collector; // 發送tuple的組件
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
private Long sumOrderCost = 0L;
/**
* 處理數據的核心方法
*/
@Override
public void execute(Tuple input) {
Long orderCost = input.getLongByField("order_cost");
sumOrderCost += orderCost;
System.out.println("商城網站到目前" + StormUtil.df_yyyyMMddHHmmss.format(new Date()) + "的商品總交易額" + sumOrderCost);
StormUtil.sleep(1000);
}
/**
* 如果當前bolt為最后一個處理單元,該方法可以不用管
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
* 1°、實現數字累加求和的案例:數據源不斷產生遞增數字,對產生的數字累加求和。
* <p>
* Storm組件:Spout、Bolt、數據是Tuple,使用main中的Topology將spout和bolt進行關聯
* MapReduce的組件:Mapper和Reducer、數據是Writable,通過一個main中的job將二者關聯
* <p>
* 適配器模式(Adapter):BaseRichSpout,其對繼承接口中一些沒必要的方法進行了重寫,但其重寫的代碼沒有實現任何功能。
* 我們稱這為適配器模式
*/
public class StormLocalSumTopology {
/**
* 構建拓撲,相當于在MapReduce中構建Job
*/
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
/**
* 設置spout和bolt的dag(有向無環圖)
*/
builder.setSpout("id_order_spout", new OrderSpout());
builder.setBolt("id_sum_bolt", new SumBolt())
.shuffleGrouping("id_order_spout"); // 通過不同的數據流轉方式,來指定數據的上游組件
// 使用builder構建topology
StormTopology topology = builder.createTopology();
// 啟動topology
LocalCluster localCluster = new LocalCluster(); // 本地開發模式,創建的對象為LocalCluster
String topologyName = StormLocalSumTopology.class.getSimpleName(); // 拓撲的名稱
Config config = new Config(); // Config()對象繼承自HashMap,但本身封裝了一些基本的配置
localCluster.submitTopology(topologyName, config, topology);
}
}
需要說明的是,Spout和Bolt的類都作為StormLocalSumTopology的靜態成員變量,這樣做是為了開發的方便,當然實際上也可以將其單獨作為一個文件。
執行主函數,其輸出如下:
當前時間20180412213836產生的訂單金額:1
商城網站到目前20180412213836的商品總交易額1
當前時間20180412213837產生的訂單金額:2
商城網站到目前20180412213837的商品總交易額3
當前時間20180412213838產生的訂單金額:3
商城網站到目前20180412213838的商品總交易額6
......
需求如下:
監控一個目錄下的文件,當發現有新文件的時候,把文件讀取過來,解析文件中的內容,統計單詞出現的總次數
分析如下:
可以設置三個節點:
Spout:用于持續讀取目錄下需要被監聽(通過后綴名標識)的文件,并且將每一行輸出到下一個Bolt中
(類似于MapReduce中的FileInputFormat)
Bolt1:讀取行,并解析其中的單詞,將每個單詞輸出到下一個Bolt中
(類似于MapReduce中的Mapper)
Bolt2:讀取單詞,進行統計計算
(類似于MapReduce中的Reducer)
/**
* Spout,獲取數據源,這里是持續讀取某一目錄下的文件,并將每一行輸出到下一個Bolt中
*/
static class FileSpout extends BaseRichSpout {
private Map conf; // 當前組件配置信息
private TopologyContext context; // 當前組件上下文對象
private SpoutOutputCollector collector; // 發送tuple的組件
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple() {
File directory = new File("D:/data/storm");
// 第二個參數extensions的意思就是,只采集某些后綴名的文件
Collection<File> files = FileUtils.listFiles(directory, new String[]{"txt"}, true);
for (File file : files) {
try {
List<String> lines = FileUtils.readLines(file, "utf-8");
for(String line : lines) {
this.collector.emit(new Values(line));
}
// 當前文件被消費之后,需要重命名,同時為了防止相同文件的加入,重命名后的文件加了一個隨機的UUID,或者加入時間戳也可以的
File destFile = new File(file.getAbsolutePath() + "_" + UUID.randomUUID().toString() + ".completed");
FileUtils.moveFile(file, destFile);
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("line"));
}
}
/**
* Bolt節點,將接收到的每一行數據切割為一個個單詞并發送到下一個節點
*/
static class SplitBolt extends BaseRichBolt {
private Map conf; // 當前組件配置信息
private TopologyContext context; // 當前組件上下文對象
private OutputCollector collector; // 發送tuple的組件
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
@Override
public void execute(Tuple input) {
String line = input.getStringByField("line");
String[] words = line.split(" ");
for (String word : words) {
this.collector.emit(new Values(word,1));
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
/**
* Bolt節點,執行單詞統計計算
*/
static class WCBolt extends BaseRichBolt {
private Map conf; // 當前組件配置信息
private TopologyContext context; // 當前組件上下文對象
private OutputCollector collector; // 發送tuple的組件
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.conf = conf;
this.context = context;
this.collector = collector;
}
private Map<String, Integer> map = new HashMap<>();
@Override
public void execute(Tuple input) {
String word = input.getStringByField("word");
Integer count = input.getIntegerByField("count");
/*if (map.containsKey(word)) {
map.put(word, map.get(word) + 1);
} else {
map.put(word, 1);
}*/
map.put(word, map.getOrDefault(word, 0) + 1);
System.out.println("====================================");
map.forEach((k ,v)->{
System.out.println(k + ":::" +v);
});
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
}
/**
* 2°、單詞計數:監控一個目錄下的文件,當發現有新文件的時候,
把文件讀取過來,解析文件中的內容,統計單詞出現的總次數
E:\data\storm
*/
public class StormLocalWordCountTopology {
/**
* 構建拓撲,組裝Spout和Bolt節點,相當于在MapReduce中構建Job
*/
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
// dag
builder.setSpout("id_file_spout", new FileSpout());
builder.setBolt("id_split_bolt", new SplitBolt()).shuffleGrouping("id_file_spout");
builder.setBolt("id_wc_bolt", new WCBolt()).shuffleGrouping("id_split_bolt");
StormTopology stormTopology = builder.createTopology();
LocalCluster cluster = new LocalCluster();
String topologyName = StormLocalWordCountTopology.class.getSimpleName();
Config config = new Config();
cluster.submitTopology(topologyName, config, stormTopology);
}
}
執行程序后,往目標目錄中添加.txt
文件,程序輸出如下:
====================================
hello:::1
====================================
hello:::1
you:::1
====================================
hello:::2
you:::1
====================================
hello:::2
he:::1
you:::1
====================================
hello:::3
he:::1
you:::1
====================================
me:::1
hello:::3
he:::1
you:::1
在編寫了Storm的程序后,再來看看其相關的術語就容易理解很多了。
Topology用于封裝一個實時計算應用程序的邏輯,類似于Hadoop的MapReduce Job
Stream 消息流,是一個沒有邊界的tuple序列,這些tuples會被以一種分布式的方式并行地創建和處理
Spouts 消息源,是消息生產者,他會從一個外部源讀取數據并向topology里面面發出消息:tuple
Bolts 消息處理者,所有的消息處理邏輯被封裝在bolts里面,處理輸入的數據流并產生新的輸出數據流,
可執行過濾,聚合,查詢數據庫等操作
Task 每一個Spout和Bolt會被當作很多task在整個集群里面執行,每一個task對應到一個線程.
Stream groupings 消息分發策略,定義一個Topology的其中一步是定義每個tuple接受什么樣的流作為輸入,
stream grouping就是用來定義一個stream應該如何分配給Bolts們.
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。