您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關storm中如何自定義數據分組,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
數據流組
設計一個拓撲時,你要做的最重要的事情之一就是定義如何在各組件之間交換數據(數據流是如何被bolts消費的)。一個數據流組指定了每個bolt會消費哪些數據流,以及如何消費它們。
storm自帶數據流組
隨機數據流組
隨機流組是最常用的數據流組。它只有一個參數(數據源組件),并且數據源會向隨機選擇的bolt發送元組,保證每個消費者收到近似數量的元組。
builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");
域數據流組
域數據流組允許你基于元組的一個或多個域控制如何把元組發送給bolts。它保證擁有相同域組合的值集發送給同一個bolt。回到單詞計數器的例子,如果你用word域為數據流分組,word-normalizer bolt將只會把相同單詞的元組發送給同一個word-counterbolt實例。
builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word"));
全部數據流組
全部數據流組,為每個接收數據的實例復制一份元組副本。這種分組方式用于向bolts發送信號。比如,你要刷新緩存,你可以向所有的bolts發送一個刷新緩存信號。在單詞計數器的例子里,你可以使用一個全部數據流組,添加清除計數器緩存的功能
builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals");
直接數據流組
這是一個特殊的數據流組,數據源可以用它決定哪個組件接收元組
builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer");
。與前面的例子類似,數據源將根據單詞首字母決定由哪個bolt接收元組。要使用直接數據流組,在WordNormalizer bolt中,使用emitDirect方法代替emit。
public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //對元組做出應答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } }
在prepare方法中計算任務數
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); }
全局數據流組
全局數據流組把所有數據源創建的元組發送給單一目標實例(即擁有最低ID的任務)。
不分組
這個數據流組相當于隨機數據流組。也就是說,使用這個數據流組時,并不關心數據流是如何分組的。
自定義數據流組
storm自定義數據流組和hadoop Partitioner分組很相似,storm自定義分組要實現CustomStreamGrouping接口,接口源碼如下:
public
interface
CustomStreamGrouping
extends
Serializable {
void
prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
List<Integer> chooseTasks(
int
taskId, List<Object> values);
}
targetTasks就是Storm運行時告訴你,當前有幾個目標Task可以選擇,每一個都給編上了數字編號。而 chooseTasks(int taskId, List values); 就是讓你選擇,你的這條數據values,是要哪幾個目標Task處理?
這是我寫的一個自定義分組,總是把數據分到第一個Task:
public
class
MyFirstStreamGrouping
implements
CustomStreamGrouping {
private
static
Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.
class
);
private
List<Integer> tasks;
@Override
public
void
prepare(WorkerTopologyContext context, GlobalStreamId stream,
List<Integer> targetTasks) {
this
.tasks = targetTasks;
log.info(tasks.toString());
}
@Override
public
List<Integer> chooseTasks(
int
taskId, List<Object> values) {
log.info(values.toString());
return
Arrays.asList(tasks.get(
0
));
}
}
從上面的代碼可以看出,該自定義分組會把數據歸并到第一個TaskArrays.asList(tasks.get(0));,也就是數據到達后總是被派發到第一組。和Hadoop不同的是,Storm允許一條數據被多個Task處理,因此返回值是List .就是讓你來在提供的 'List targetTasks' Task中選擇任意的幾個(必須至少是一個)Task來處理數據。
第二個自定義分組,wordcount中使首字母相同的單詞交給同一個bolt處理:
public class ModuleGrouping implements CustormStreamGrouping{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } }
這是一個CustomStreamGrouping的簡單實現,在這里我們采用單詞首字母字符的整數值與任務數的余數,決定接收元組的bolt。
builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping());
看完上述內容,你們對storm中如何自定義數據分組有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。