您好,登錄后才能下訂單哦!
這篇文章主要介紹storm中trident是什么,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
Storm是一個實時流計算框架,Trident是對storm的一個更高層次的抽象,Trident最大的特點以batch的形式處理stream。
一些最基本的操作函數有Filter、Function,Filter可以過濾掉tuple,Function可以修改tuple內容,輸出0或多個tuple,并能把新增的字段追加到tuple后面。
聚合有partitionAggregate和Aggregator接口。partitionAggregate對當前partition中的tuple進行聚合,它不是重定向操作。Aggregator有三個接口:CombinerAggregator, ReducerAggregator,Aggregator,它們屬于重定向操作,它們會把stream重定向到一個partition中進行聚合操作。
重定向操作會改變數據流向,但不會改變數據內容,重定向操會產生網絡傳輸,可能影響一部分效率。而Filter、Function、partitionAggregate則屬于本地操作,不會產生網絡傳輸。
GroupBy會根據指定字段,把整個stream切分成一個個grouped stream,如果在grouped stream上做聚合操作,那么聚合就會發生在這些grouped stream上而不是整個batch。如果groupBy后面跟的是aggregator,則是聚合操作,如果跟的是partitionAggregate,則不是聚合操作。
Trident主要有5類操作:
1、作用在本地的操作,不產生網絡傳輸。
2、對數據流的重分布,不改變流的內容,但是產生網絡傳輸。
3、聚合操作,有可能產生網絡傳輸。
4、作用在分組流(grouped streams)上的操作。
5、Merge和join
概念
partition中文意思是分區,有人將partition理解為Storm里面的task,即并發的基本執行單位。我理解應該是像數據庫里面的分區,是將一個batch的數據分區,分成多個partition,或者可以理解為多個子batch,然后多個partition可以并發處理。這里關鍵的區別是:partition是數據,不是執行的代碼。你把數據(tuple)分區以后,如果你沒有多個task(并發度)來處理這些分區后的數據,那分區也是沒有作用的。所以這里的關系是這樣的:先有batch,因為Trident內部是基于batch來實現的;然后有partition;分區后再分配并發度,然后才能進行并發處理。并發度的分配是利用parallelismHint來實現的。
操作
既然有partition的概念,那么也就有partition的操作。Trident提供的分區操作,類似于Storm里面講的grouping。分區操作有:
重分區操作通過運行一個函數改變元組在任務之間的分布,也可以調整分區的數量(比如重分區之后將并行度調大),重分區需要網絡傳輸的參與。重分區函數包含以下這幾個:
shuffle:使用隨機輪詢算法在所有目標分區間均勻分配元組;
broadcast:每個元組復制到所有的目標分區。這在DRPC中非常有用,例如,需要對每個分區的數據做一個stateQuery操作;
partitionBy:接收一些輸入字段,根據這些字段輸入字段進行語義分區。通過對字段取hash值或者取模來選擇目標分區。partitionBy保證相同的字段一定被分配到相同的目標分區;
global:所有的元組分配到相同的分區,該分區是流種所有batch決定的;
batchGlobal:同一個batch中的元組被分配到相同的目標分區,不同batch的元組有可能被分配到不同的目標分區;
partition:接收一個自定義的分區函數,自定義分區函數需要實現backtype.storm.grouping.CustomStreamGrouping接口。
注意,除了這里明確提出來的分區操作,Trident里面還有aggregate()函數隱含有分區的操作,它用的是global()操作,這個在后面接收聚合操作的時候還會再介紹。
each() 方法
作用:操作batch中的每一個tuple內容,一般與Filter或者Function函數配合使用。
下面通過一個例子來介紹each()方法,假設我們有一個FakeTweetsBatchSpout,它會模擬一個Stream,隨機產生一個個消息。我們可以通過設置這個Spout類的構造參數來改變這個Spout的batch Size的大小。
1.Filter類:過濾tuple
一個通過actor字段過濾消息的Filter:
public static class PerActorTweetsFilter extends BaseFilter { String actor; public PerActorTweetsFilter(String actor) { this.actor = actor; } @Override public boolean isKeep(TridentTuple tuple) { return tuple.getString(0).equals(actor); } }
Topology:
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .each(new Fields("actor", "text"), new Utils.PrintFilter());
從上面例子看到,each()方法有一些構造參數
第一個構造參數:作為Field Selector,一個tuple可能有很多字段,通過設置Field,我們可以隱藏其它字段,僅僅接收指定的字段(其它字段實際還在)。
第二個是一個Filter:用來過濾掉除actor名叫"dave"外的其它消息。
2.Function類:加工處理tuple內容
一個能把tuple中text內容變成大寫的Function:
public static class UppercaseFunction extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { collector.emit(new Values(tuple.getString(0).toUpperCase())); } }
Topology:
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text")) .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
首先,UppercaseFunction函數的輸入是Fields("text", "actor"),其作用是把其中的"text"字段內容都變成大寫。
其次,它比Filter多出一個輸出字段,作用是每個tuple在經過這個Function函數處理后,輸出字段都會被追加到tuple后面,在本例中,執行完Function之后的tuple內容多了一個"uppercased_text",并且這個字段排在最后面。
3. Field Selector與project
我們需要注意的是,上面每個each()方法的第一個Field字段僅僅是隱藏掉沒有指定的字段內容,實際上被隱藏的字段依然還在tuple中,如果想要徹底丟掉它們,我們就需要用到project()方法。
投影操作作用是僅保留Stream指定字段的數據,比如有一個Stream包含如下字段: [“a”, “b”, “c”, “d”],運行如下代碼:
mystream.project(new Fields("b", "d"))
則輸出的流僅包含 [“b”, “d”]字段。
aggregation的介紹
首先聚合操作分兩種:partitionAggregate(),以及aggregate()。
1.partitionAggregate
partitionAggregate()的操作是在partition上,一個batch的tuple被分成多個partition后,每個partition都會單獨運行partitionAggregate中指定的聚合操作。分區聚合在一批tuple的每一個分區上運行一個函數。與函數不同的是,分區聚合的輸出元組會覆蓋掉輸入元組。請看如下示例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假設你有一個包含a,b兩個字段的輸入流,元組的分區情況如下:
Partition 0: ["a", 1] ["b", 2] Partition 1: ["a", 3] ["c", 8] Partition 2: ["e", 1] ["d", 9] ["d", 10]
運行上面的那一行代碼將會輸出如下的元組,這些元組只包含一個sum字段:
Partition 0: [3] Partition 1: [11] Partition 2: [20]
2.aggregate
aggregate()隱含了一個global分區操作,也就是它做的是全局聚合操作。它針對的是整個batch的聚合計算。
這兩種聚合操作,都可以傳入不同的aggregator實現具體的聚合任務。Trident中有三種aggregator接口,分別為:ReducerAggregator,CombinerAggregator,Aggregator。
下面是CombinerAggregator接口的定義:
public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); }
CombinerAggregator返回只有一個字段的一個元組。CombinerAggregator在每個輸入元組上運行init函數,然后通過combine函數聚合結果值直到只剩下一個元組。如果分區中沒有任何元組,CombinerAggregator將返回zero函數中定義的元組。比如,下面是Count聚合器的實現:
public class Count implements CombinerAggregator<Long> { public Long init(TridentTuple tuple) { return 1L; } public Long combine(Long val1, Long val2) { return val1 + val2; } public Long zero() { return 0L; } }
ReducerAggregator接口的定義如下:
public interface ReducerAggregator<T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); }
ReducerAggregator通過init函數得到一個初始的值,然后對每個輸入元組調用reduce方法計算值,產生一個元組作為輸出。比如Count的ReducerAggregator實現如下:
public class Count implements ReducerAggregator<Long> { public Long init() { return 0L; } public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } }
最常用的聚合器的接口是Aggregator,它的定義如下:
public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); }
Aggregator能夠發射任意數量,任意字段的元組。并且可以在執行期間的任何時候發射元組,它的執行流程如下:
處理batch之前調用init方法,init函數的返回值是一個表示聚合狀態的對象,該對象會傳遞到aggregate和complete函數;
每個在batch分區中的元組都會調用aggregate方法,該方法能夠更新聚合狀態并且發射元組;
當batch分區中的所有元組都被aggregate函數處理完時調用complete函數。
下面是使用Aggregator接口實現的Count聚合器:
public class CountAgg extends BaseAggregator<CountState> { static class CountState { long count = 0; } public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { state.count+=1; } public void complete(CountState state, TridentCollector collector) { collector.emit(new Values(state.count)); } }
有些時候,我們需要通知執行很多個聚合器,則可以使用如下的鏈式調用執行:
mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd()
上面的代碼將會在每一個分區執行Count和Sum聚合器,輸出結果是包含count和sum兩個字段的元組。
最重要的區別是CombinerAggregator,它是先在partition上做partial aggregate,然后再將這些部分聚合結果通過global分區到一個總的分區,在這個總的分區上對結果進行匯總。
groupBy()分組操作
首先它包含兩個操作,一個是分區操作,一個是分組操作。
如果后面是partitionAggregate()的話,就只有分組操作:在每個partition上分組,分完組后,在每個分組上進行聚合;
如果后面是aggregate()的話,先根據partitionBy分區,在每個partition上分組,,分完組后,在每個分組上進行聚合。
parallelismHint并發度的介紹
它設置它前面所有操作的并發度,直到遇到某個repartition操作為止。
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .parallelismHint(5) .each(new Fields("actor", "text"), new Utils.PrintFilter());
意味著:parallelismHit之前的spout,each都是5個相同的操作一起并發,對,一共有5個spout同時發射數據,其實parallelismHint后面的each操作,也是5個并發。分區操作是作為Bolt劃分的分界點的。
如果想單獨設置Spout怎么辦?要在Spout之后,Bolt之前增加一個ParallelismHint,并且還要增加一個分區操作:
topology.newStream("spout", spout) .parallelismHint(2) .shuffle() .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .parallelismHint(5) .each(new Fields("actor", "text"), new Utils.PrintFilter());
很多人只是設置了Spout的并發度,而沒有調用分區操作,這樣是達不到效果的,因為Trident是不會自動進行分區操作的。像我之前介紹的,先分區,再設置并發度。如果Spout不設置并發度,只設置shuffle,默認是1個并發度,這樣后面設置5個并發度不會影響到Spout,因為并發度的影響到shuffle分區操作就停止了。
例子
groupBy+aggregate+parallelismHint
package com.demo; import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Values; import storm.trident.operation.BaseAggregator; import storm.trident.operation.TridentCollector; import storm.trident.operation.TridentOperationContext; import storm.trident.tuple.TridentTuple; public class MyAgg extends BaseAggregator<Map<String, Integer>> { /** * */ private static final long serialVersionUID = 1L; /** * 屬于哪個分區 */ private int partitionId; /** * 分區數量 */ private int numPartitions; private String batchId; @SuppressWarnings("rawtypes") @Override public void prepare(Map conf, TridentOperationContext context) { partitionId = context.getPartitionIndex(); numPartitions = context.numPartitions(); } public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) { String word = tuple.getString(0); Integer value = val.get(word); if (value == null) { value = 0; } value++; // 把數據保存到一個map對象中 val.put(word, value); System.err.println("I am partition [" + partitionId + "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId); } public void complete(Map<String, Integer> val, TridentCollector collector) { collector.emit(new Values(val)); } public Map<String, Integer> init(Object arg0, TridentCollector arg1) { this.batchId = arg0.toString(); return new HashMap<String, Integer>(); } }
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .groupBy(new Fields("sentence")) .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [1] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
groupBy+partitionAggregate+parallelismHint
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .groupBy(new Fields("sentence")) .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))) .toStream() .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [1] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
由于shuffle已經把tuple平均分配給5個partition了,用groupBy+partitionAggregate來聚合又沒有partitionBy分區的作用,所以,直接在5個分區上進行聚合,結果就是每個分區各有一個tuple。
而用groupBy+aggregate,雖然也是shuffle,但是由于具有partitiononBy分區的作用,值相同的tuple都分配到同一個分區,結果就是每個分區根據不同的值來做匯聚。
aggregate+parallelismHint(沒有groupBy)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [0] and I have kept a tweet by: 2 d 2:0 I am partition [1] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
partitionAggregate+parallelismHint(沒有groupBy操作)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .toStream() .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 2:0 I am partition [0] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
我們可以發現,partitionAggregate加上groupBy,或者不加上groupBy,對結果都一樣:groupBy對于partitionAggregate沒有影響。但是對于aggregate來說,加上groupBy,就不是做全局聚合了,而是對分組做聚合;不加上groupBy,就是做全局聚合。
如果spout設置并行度,但是沒有加shuffle,不會起作用,分區默認為1,;如果不設置并行度并且沒有加shuffle,分區默認為1。
Merge和Joins
api的最后一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個流。我們可以這么做:
topology.merge(stream1, stream2, stream3);
Trident指定新的合并之后的流中的字段為stream1中的字段。
另一種合并流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,因此,join只針對符合條件的Stream。join應用在來自Spout的每一個小Batch中。
下面的例子中,stream1流包含key,val1,val2三個字段,stream2流包含x,val1兩個字段:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
stream1流的key字段與stream2流的x字段組join操作,另外,Trident要求所有新流的輸出字段被重命名,因為輸入流可能包含相同的字段名稱。連接流發射的元組將會包含:
連接字段的列表。在上面的例子中,字段key對應stream1的key,stream2的x;
來自所有流的所有非連接字段的列表,按照傳遞到連接方法的順序排序。在上面的例子中,字段a與字段b對應stream1的val1和val2,c對應于stream2的val1.
當join的是來源于不同Spout的stream時,這些Spout在發射數據時需要同步,一個Batch所包含的tuple會來自各個Spout。
以上是“storm中trident是什么”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。