您好,登錄后才能下訂單哦!
這篇文章主要講解了“Storm排序怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Storm排序怎么實現”吧!
閱讀背景:
1 : 您需要對滑動窗口要初步了解
2 : 您需要了解滑動窗口在滑動的過程之中,滑動chunk的計算過程,尤其是每發射一次,就需要清空一次。
package com.cc.storm.bolt; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; /** * 1 在這里我們需要去實現一個滑動窗口,請注意,在我們實現滑動窗口的過程之中清空的是當前滑動窗口的下一個 * * * * @author Yin Shuai * */ public class RollingCountBolt implements IRichBolt { private static final long serialVersionUID = 1765379339552134320L; private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>(); private int _numBuckets; private transient Thread cleaner; private OutputCollector _collector; /** * _trackMinute * 是我們整個滑動窗口的大小,滑動窗口的大小,本質上決定了我們的時間區間,也就是說,假設我們目前滑動窗口的總體大小為15分鐘。 * 那我們的商品點擊的實時排序的指標值,好比商品瀏覽量的計算值,也就是15分鐘 * * 而單個窗口的大小也就是我,我們這個三十分鐘在隨著時間不斷的在推移 * * 舉例說明:在最初的構造過程之中,如果我們的桶的數目為10,那么單個窗口的時間長度為3. * * [0,30],[3,33],[6,36],[9,39],[12,42] 統計的數值處在不斷的變化之中 * */ private int _trackMinutes; public RollingCountBolt(int numBuckets, int trackMinutes) { this._numBuckets = numBuckets; this._trackMinutes = trackMinutes; } public long totalObjects(Object obj) { long[] curr = _objectCounts.get(obj); long total = 0; for (long l : curr) { total += l; } return total; } public int currentBucket(int buckets) { return currentSecond() / secondsPerBucket(buckets) % buckets; } public int currentSecond() { return (int) (System.currentTimeMillis() / 1000); } /** * * @param buckets * 你設定的桶的數量 * @return 依據我們默認的_trackMinutes / buckets 得到每一個桶的數量 */ public int secondsPerBucket(int buckets) { return _trackMinutes * 60 / buckets; } public long millisPerBucket(int buckets) { return (long) 1000 * secondsPerBucket(buckets); } @SuppressWarnings("rawtypes") @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { // TODO Auto-generated method stub _collector = collector; cleaner = new Thread(new Runnable() { @SuppressWarnings("unchecked") @Override public void run() { // TODO Auto-generated method stub int lastBucket = currentBucket(_numBuckets); while (true) { int currBucket = currentBucket(_numBuckets); p("線程while循環: 當前的桶為:" + currBucket); if (currBucket != lastBucket) { p("線程while循環:之前的桶數為:" + lastBucket); int bucketToWipe = (currBucket + 1) % _numBuckets; p("線程while循環:要擦除掉的桶為:" + bucketToWipe); synchronized (_objectCounts) { Set objs = new HashSet(_objectCounts.keySet()); for (Object obj : objs) { long[] counts = _objectCounts.get(obj); long currBucketVal = counts[bucketToWipe]; p("線程while循環:擦除掉的值為:" + currBucketVal); counts[bucketToWipe] = 0; long total = totalObjects(obj); if (currBucketVal != 0) { p("線程while循環:擦除掉的值為不為0:那就發射數據:obj total" + obj + ":" + total); _collector.emit(new Values(obj, total)); } if (total == 0) { p("線程while循環: 總數為0以后,將obj對象刪除"); _objectCounts.remove(obj); } } } lastBucket = currBucket; } long delta = millisPerBucket(_numBuckets) - (System.currentTimeMillis() % millisPerBucket(_numBuckets)); Utils.sleep(delta); p("\n"); } } }); cleaner.start(); } @Override public void execute(Tuple input) { Object obj1 = input.getValue(0); Object obj = input.getValue(1); int currentBucket = currentBucket(_numBuckets); p("execute方法:當前桶:bucket: " + currentBucket); synchronized (_objectCounts) { long[] curr = _objectCounts.get(obj); if (curr == null) { curr = new long[_numBuckets]; _objectCounts.put(obj, curr); } curr[currentBucket]++; System.err .print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long數組:")); for (long number : curr) { System.err.print(number + ":"); } p("execute方法:發射的數據: " + obj + ":" + totalObjects(obj)); /** * 我們不斷的發射的也就是我們某一個商品id,在當前滑動窗口,也就是我們的時間周期內的指標計算值 * 要注意,在排序的過程之中,我們只針對key, 也就是我們的商品id,由此發射給后續的排序bolt依據包含了時間區間的信息 */ // 每來一條數據,就會發射一次 _collector.emit(new Values(obj, totalObjects(obj))); _collector.ack(input); } p("\n"); } @Override public void cleanup() { // TODO Auto-generated method stub } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // TODO Auto-generated method stub declarer.declare(new Fields("merchandiseID", "count")); } @Override public Map<String, Object> getComponentConfiguration() { // TODO Auto-generated method stub return null; } public void p(Object o) { System.err.println(o.toString()); } }
在這里,最需要我們關注的地方是,滑動窗口每滑動一次,將情況一組數據。 而發射數據的過程之中將統計這一組數
據。
感謝各位的閱讀,以上就是“Storm排序怎么實現”的內容了,經過本文的學習后,相信大家對Storm排序怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。