91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm排序怎么實現

發布時間:2021-12-23 14:18:17 來源:億速云 閱讀:143 作者:iii 欄目:云計算

這篇文章主要講解了“Storm排序怎么實現”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Storm排序怎么實現”吧!

閱讀背景:

   1 : 您需要對滑動窗口要初步了解

   2  :   您需要了解滑動窗口在滑動的過程之中,滑動chunk的計算過程,尤其是每發射一次,就需要清空一次。

package com.cc.storm.bolt;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

/**
 * 1 在這里我們需要去實現一個滑動窗口,請注意,在我們實現滑動窗口的過程之中清空的是當前滑動窗口的下一個
 * 
 * 
 * 
 * @author Yin Shuai
 * 
 */
public class RollingCountBolt implements IRichBolt {

	private static final long serialVersionUID = 1765379339552134320L;

	private HashMap<Object, long[]> _objectCounts = new HashMap<Object, long[]>();
	private int _numBuckets;
	private transient Thread cleaner;
	private OutputCollector _collector;

	/**
	 * _trackMinute
	 * 是我們整個滑動窗口的大小,滑動窗口的大小,本質上決定了我們的時間區間,也就是說,假設我們目前滑動窗口的總體大小為15分鐘。
	 * 那我們的商品點擊的實時排序的指標值,好比商品瀏覽量的計算值,也就是15分鐘
	 * 
	 * 而單個窗口的大小也就是我,我們這個三十分鐘在隨著時間不斷的在推移
	 * 
	 * 舉例說明:在最初的構造過程之中,如果我們的桶的數目為10,那么單個窗口的時間長度為3.
	 * 
	 * [0,30],[3,33],[6,36],[9,39],[12,42] 統計的數值處在不斷的變化之中
	 * 
	 */
	private int _trackMinutes;

	public RollingCountBolt(int numBuckets, int trackMinutes) {
		this._numBuckets = numBuckets;
		this._trackMinutes = trackMinutes;
	}

	public long totalObjects(Object obj) {
		long[] curr = _objectCounts.get(obj);
		long total = 0;
		for (long l : curr) {
			total += l;
		}
		return total;
	}

	public int currentBucket(int buckets) {
		return currentSecond() / secondsPerBucket(buckets) % buckets;
	}

	public int currentSecond() {
		return (int) (System.currentTimeMillis() / 1000);
	}

	/**
	 * 
	 * @param buckets
	 *            你設定的桶的數量
	 * @return 依據我們默認的_trackMinutes / buckets 得到每一個桶的數量
	 */
	public int secondsPerBucket(int buckets) {
		return _trackMinutes * 60 / buckets;
	}

	public long millisPerBucket(int buckets) {
		return (long) 1000 * secondsPerBucket(buckets);
	}

	@SuppressWarnings("rawtypes")
	@Override
	public void prepare(Map stormConf, TopologyContext context,
			OutputCollector collector) {
		// TODO Auto-generated method stub
		_collector = collector;
		cleaner = new Thread(new Runnable() {

			@SuppressWarnings("unchecked")
			@Override
			public void run() {
				// TODO Auto-generated method stub
				int lastBucket = currentBucket(_numBuckets);

				while (true) {

					int currBucket = currentBucket(_numBuckets);
					p("線程while循環: 當前的桶為:" + currBucket);

					if (currBucket != lastBucket) {
						p("線程while循環:之前的桶數為:" + lastBucket);

						int bucketToWipe = (currBucket + 1) % _numBuckets;
						p("線程while循環:要擦除掉的桶為:" + bucketToWipe);

						synchronized (_objectCounts) {
							Set objs = new HashSet(_objectCounts.keySet());

							for (Object obj : objs) {

								long[] counts = _objectCounts.get(obj);
								long currBucketVal = counts[bucketToWipe];
								p("線程while循環:擦除掉的值為:" + currBucketVal);
								counts[bucketToWipe] = 0;
								long total = totalObjects(obj);
								if (currBucketVal != 0) {
									p("線程while循環:擦除掉的值為不為0:那就發射數據:obj total"
											+ obj + ":" + total);
									_collector.emit(new Values(obj, total));

								}
								if (total == 0) {

									p("線程while循環: 總數為0以后,將obj對象刪除");
									_objectCounts.remove(obj);

								}
							}
						}
						lastBucket = currBucket;
					}

					long delta = millisPerBucket(_numBuckets)
							- (System.currentTimeMillis() % millisPerBucket(_numBuckets));
					Utils.sleep(delta);

					p("\n");
				}
			}
		});
		cleaner.start();
	}

	@Override
	public void execute(Tuple input) {

		Object obj1 = input.getValue(0);
		Object obj = input.getValue(1);

		int currentBucket = currentBucket(_numBuckets);

		p("execute方法:當前桶:bucket: " + currentBucket);

		synchronized (_objectCounts) {

			long[] curr = _objectCounts.get(obj);

			if (curr == null) {
				curr = new long[_numBuckets];
				_objectCounts.put(obj, curr);
			}

			curr[currentBucket]++;

			System.err
					.print(("execute方法:接受到的merchandiseIDS:" + obj.toString() + ",long數組:"));

			for (long number : curr) {
				System.err.print(number + ":");
			}

			p("execute方法:發射的數據: " + obj + ":" + totalObjects(obj));

			/**
			 * 我們不斷的發射的也就是我們某一個商品id,在當前滑動窗口,也就是我們的時間周期內的指標計算值
			 * 要注意,在排序的過程之中,我們只針對key, 也就是我們的商品id,由此發射給后續的排序bolt依據包含了時間區間的信息
			 */

			// 每來一條數據,就會發射一次
			_collector.emit(new Values(obj, totalObjects(obj)));
			_collector.ack(input);
		}

		p("\n");
	}

	@Override
	public void cleanup() {
		// TODO Auto-generated method stub

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub
		declarer.declare(new Fields("merchandiseID", "count"));
	}

	@Override
	public Map<String, Object> getComponentConfiguration() {
		// TODO Auto-generated method stub
		return null;
	}

	public void p(Object o) {
		System.err.println(o.toString());
	}
}

  在這里,最需要我們關注的地方是,滑動窗口每滑動一次,將情況一組數據。 而發射數據的過程之中將統計這一組數

據。

感謝各位的閱讀,以上就是“Storm排序怎么實現”的內容了,經過本文的學習后,相信大家對Storm排序怎么實現這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

福州市| 万山特区| 区。| 富源县| 修水县| 泾川县| 定南县| 中阳县| 新邵县| 扶风县| 图们市| 星座| 胶州市| 临沧市| 石屏县| 秀山| 张家口市| 渝北区| 万源市| 吴旗县| 皮山县| 毕节市| 永新县| 吉安市| 汉沽区| 安岳县| 芜湖县| 兴隆县| 荥经县| 肥乡县| 凤山县| 恭城| 六安市| 手机| 长兴县| 玛曲县| 铁岭市| 上蔡县| 宣汉县| 牟定县| 双流县|