您好,登錄后才能下訂單哦!
本篇內容介紹了“Transactional topology怎么使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
你可以通過使用TransactionalTopologyBuilder來創建transactional topology. 下面就是一個transactional topology的定義, 它的作用是計算輸入流里面的tuple的個數。這段代碼來自storm-starter里面的TransactionalGlobalCount。
MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
builder.setBolt("partial-count", new BatchCount(), 5)
.shuffleGrouping("spout");
builder.setBolt("sum", new UpdateGlobalCount())
.globalGrouping("partial-count");
TransactionalTopologyBuilder構造器中接受如下的參數:
?一個transaction topology的id
?spout在整個topology里面的id。
?一個transactional spout。
?一個可選的這個transactional spout的并行度。
topology的id是用來在zookeeper里面保存這個topology的當前進度狀態的,所以如果你重啟這個topology, 它可以接著前面的進度繼續執行。
一個transaction topology里面有一個唯一的TransactionalSpout, 這個spout是通過TransactionalTopologyBuilder的構造函數來指定的。在這個例子里面,MemoryTransactionalSpout被用來從一個內存變量里面讀取數據(DATA)。第二個參數指定spout發送的tuple的字段, 第三個參數指定每個batch的最大tuple數量。關于如何自定義TransactionalSpout我們會在后面介紹。
現在說說 bolts。這個topology并行地計算tuple的總數量。第一個bolt:BatchBolt,隨機地把輸入tuple分給各個task,然后各個task各自統計局部數量。第二個bolt:UpdateGlobalCount, 用全局grouping來匯總這個batch中tuple的數量,然后再更新到數據庫里面的全局數量。
下面是BatchCount的定義:
public static class BatchCount extends BaseBatchBolt {
Object _id;
BatchOutputCollector _collector;
int _count = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
_collector = collector;
_id = id;
}
@Override
public void execute(Tuple tuple) {
_count++;
}
@Override
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "count"));
}
}
storm會為每個正在處理的batch創建一個BatchCount對象,這個BatchCount是運行在BatchBoltExecutor里面的。而BatchBoltExecutor負責創建以及清理這個對象的實例。
BatchCount對象的prepare方法接收如下參數:
?Storm config
?Topology context
?Output collector
?這個batch的id (txid),在Transactional Topology中, 這個id則是一個TransactionAttempt對象。
這個batch bolt的抽象在DRPC里面也可以用, 只是txid的類型不一樣而已。實際上,BatchBolt可以接收一個txid類型的參數,所以如果你只是想在transactioinal topology里面使用這個BatchBolt,你可以去繼承BaseTransactionalBolt類,如下定義:
public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
}
在transaction topology里面發射的所有的tuple都必須以TransactionAttempt作為第一個field, 然后storm可以根據這個field來判斷哪些tuple屬于一個batch。所以你在發射tuple的時候需要滿足這個條件。
TransactionAttempt包含兩個值: 一個transaction id,一個attempt id。transaction id的作用就是我們上面介紹的對于每個batch是唯一的,而且不管這個batch 被replay多少次都是一樣的。attempt id是對于每個batch唯一的一個id, 但是對于同一個batch,它replay之后的attempt id跟replay之前就不一樣了, 我們可以把attempt id理解成replay-times, storm利用這個id來區別一個batch發射的tuple的不同版本。
transaction id對于每個batch加一, 所以第一個batch的transaction id是”1″, 第二個batch是”2″,依次類推。
每收到一個batch中的tuple,execute方法便被調用一次。每次當該方法被調用時,你應該把這個batch里面的狀態保持在一個本地變量里面。對于這個例子來說, 它在execute方法里面遞增tuple的個數。
最后, 當這個bolt接收到某個batch的所有的tuple之后, finishBatch方法會被調用。這個例子里面的BatchCount類會在這個時候發射它的局部數量到它的輸出流里面去。
下面是UpdateGlobalCount類的定義:
public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
TransactionAttempt _attempt;
BatchOutputCollector _collector;
int _sum = 0;
@Override
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
_collector = collector;
_attempt = attempt;
}
@Override
public void execute(Tuple tuple) {
_sum+=tuple.getInteger(1);
}
@Override
public void finishBatch() {
Value val = DATABASE.get(GLOBAL_COUNT_KEY);
Value newval;
if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
newval = new Value();
newval.txid = _attempt.getTransactionId();
if(val==null) {
newval.count = _sum;
} else {
newval.count = _sum + val.count;
}
DATABASE.put(GLOBAL_COUNT_KEY, newval);
} else {
newval = val;
}
_collector.emit(new Values(_attempt, newval.count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id", "sum"));
}
}
UpdateGlobalCount是Transactional Topologies相關的類,所以它繼承自BaseTransactionalBolt。在execute方法里面, UpdateGlobalCount累積這個batch的計數, 比較有趣的是finishBatch方法。
首先, 注意這個bolt實現了ICommitter接口,這告訴storm要在這個事務的commit階段調用finishBatch方法,所以對于finishBatch的調用會保證強順序性(順序就是transaction id的升序),另一方面execute方法在processing或者commit階段都可以執行。另外一種把bolt標識為commiter的方法是調用TransactionalTopologyBuilder的setCommiterBolt來添加Bolt(而不是setBolt)。
UpdateGlobalCount里面finishBatch方法的邏輯是首先從數據庫中獲取當前的值,并且把數據庫里面的transaction id與當前這個batch的transaction id進行比較。如果他們一樣, 那么忽略這個batch。否則把這個batch的結果加到總結果里面去,并且更新數據庫。
“Transactional topology怎么使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。