您好,登錄后才能下訂單哦!
Storm適用場景
流聚合:
流聚合就是把兩個或多個數據流聚合成一個數據流 -- 基于一些共同的tuple字段。
builder.setBolt(5,new MyJoiner(),parallelism)
.fieldsGrouping(1,new Fields("joinfield1","joinfield2"))
.fieldsGrouping(2,new Fields("joinfield1","joinfield2"))
.fieldsGrouping(3,new Fields("joinfield1","joinfield2"))
批處理:
有時候為了性能或者一些別的原因,你可能想把一組tuple一起處理,而不是一個一個單獨處理。
BasicBolt:
a、讀一個輸入tuple;
b、根據這個輸入tuple發射一個或者多個tuple;
c、在execute的方法的最后ack那個輸入tuple
遵循這類模式的bolt一般是函數或者是過濾器,這種模式太常見,storm為這類模式單獨封裝了一個接口:IbasicBolt。
內存內緩存 + Fields grouping組合
在bolt的內存里面緩存一些東西非常常見。緩存在和fields grouping結合起來之后就更有用了。比如,你有一個bolt把短鏈接變成長鏈接(bit.ly,t.co之類的)。你可以把短鏈接到長鏈接的對應關系利用LRU算分緩存在內存里面以避免重復計算。比如組件一發射短鏈接,組件二把短鏈接轉化成長鏈接并緩存在內存里面。看一下下面兩段代碼有什么不一樣:
builder.setBolt(2,new ExpandUrl(),parallelism).shuffleGrouping(1);
builder.setBolt(2,new ExpandUrl(),parallelism).fieldsGrouping(1,new Fields("url"));
計算top N
比如你有一個bolt發射這樣的tuple:"value","count"并且你想一個bolt基于這些信息算出top N的tuple。最簡單的辦法是有一個bolt可以做一個全局的grouping的動作并且在內存里面保持著top N的值。
這個方式對于大數據量的流顯然是沒有擴展性的,因為所有的數據會被發到同一臺機器。一個更好的方法是在多臺機器上面并行的計算這個流每一部分的top N,然后再由一個bolt合并這些機器上面所算出來的top N以算出最后的top N,代碼大概是這樣的:
builder.setBolt(2,new RankObjects(),parallellism).fieldsGrouping(1,new Fields("value"));
builder.setBolt(3,new MergeObjects()).globalGrouping(2);
這個模式之所以可以成功是因為第一個bolt的fieldsgrouping使得這種并行算法在語義上是正確的。
用TimeCacheMap來高效地保存一個最近被更新的對象的緩存:
有時候你想在內存里面保存一些最近活躍的對象,以及那些不再活躍的對象。TimeCacheMap是一個非常高效地數據結構,它提供了一些callback函數使得我們在對象不再活躍的時候我們可以做一些事情。
分布式RPC:CoordinatedBolt和KeyedFairBolt:
用storm做分布式RPC應用的時候有兩種比較常見的模式:它們被封裝在CoordinatedBolt和KeyedFairBolt里面。
CoordinatedBolt包裝你的bolt,并且確定什么時候你的bolt已經接收到所有的tuple,它主要使用Direct Stream來做這個。
KeyedFairBolt同樣包裝你的bolt并且保證你的topology同時處理多個DRPC調用,而不是串行的一次只執行一個。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。