91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Storm流方式的統計系統怎么實現

發布時間:2021-12-23 14:13:42 來源:億速云 閱讀:139 作者:iii 欄目:云計算

本篇內容主要講解“Storm流方式的統計系統怎么實現”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Storm流方式的統計系統怎么實現”吧!

1: 初期硬件準備:

                1 如果條件具備:請保證您安裝好了 redis集群

                2 配置好您的Storm開發環境

                3 保證好您的開發環境的暢通: 主機與主機之間,Storm與redis之間

2:業務背景的介紹:

                1  在這里我們將模擬一個   流方式的數據處理過程

                 2 數據的源頭保存在我們的redis 集群之中

                 3  發射的數據格式為: ip,url,client_key

數據發射器

package storm.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Values;
import backtype.storm.tuple.Fields;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;
import org.apache.log4j.Logger;

/**
 * click Spout 從redis中間讀取所需要的數據
 */
public class ClickSpout extends BaseRichSpout {

	private static final long serialVersionUID = -6200450568987812474L;

	public static Logger LOG = Logger.getLogger(ClickSpout.class);

	// 對于redis,我們使用的是jedis客戶端
	private Jedis jedis;

	// 主機
	private String host;

	// 端口
	private int port;

	// Spout 收集器
	private SpoutOutputCollector collector;

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

	
	        // 這里,我們發射的格式為
	        // IP,URL,CLIENT_KEY
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.IP,
				storm.cookbook.Fields.URL, storm.cookbook.Fields.CLIENT_KEY));
	}

	@Override
	public void open(Map conf, TopologyContext topologyContext,
			SpoutOutputCollector spoutOutputCollector) {

		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		this.collector = spoutOutputCollector;
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
	}

	@Override
	public void nextTuple() {
		String content = jedis.rpop("count");
		if (content == null || "nil".equals(content)) {
			try {
				Thread.sleep(300);
			} catch (InterruptedException e) {
			}
		} else {

			// 將jedis對象 rpop出來的字符串解析為 json對象
			JSONObject obj = (JSONObject) JSONValue.parse(content);

			String ip = obj.get(storm.cookbook.Fields.IP).toString();
			String url = obj.get(storm.cookbook.Fields.URL).toString();
			String clientKey = obj.get(storm.cookbook.Fields.CLIENT_KEY)
					.toString();

			System.out.println("this is a clientKey");

			// List<Object> tuple對象
			collector.emit(new Values(ip, url, clientKey));
		}
	}
}

在這個過程之中,請注意:

1  我們在 OPEN 方法之中初始化   host,port,collector,以及Redis的連接,調用Connect方法并連接到redis數據庫

2 我們在nextTupe 取出數據,并且將他轉換為一個JSON對象,并且拿到 ip,url,clientKey,同時將他們包裝成為一個

Values對象

讓我們來看看數據的流向圖:

Storm流方式的統計系統怎么實現

在我們的數據從clickSpout 讀取以后,接下來,我們將采用2個bolt

                                    1  : repeatVisitBolt 

                                    2   :  geographyBolt 

共同來讀取同一個數據源的數據:clickSpout

3 細細察看 repeatVisitBolt

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import redis.clients.jedis.Jedis;
import storm.utils.Conf;

import java.util.Map;

public class RepeatVisitBolt extends BaseRichBolt {

	private OutputCollector collector;

	private Jedis jedis;
	private String host;
	private int port;

	@Override
	public void prepare(Map conf, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
		host = conf.get(Conf.REDIS_HOST_KEY).toString();
		port = Integer.valueOf(conf.get(Conf.REDIS_PORT_KEY).toString());
		connectToRedis();
	}

	private void connectToRedis() {
		jedis = new Jedis(host, port);
		jedis.connect();
	}

	public boolean isConnected() {
		if (jedis == null)
			return false;
		return jedis.isConnected();
	}

	@Override
	public void execute(Tuple tuple) {

		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);
		String clientKey = tuple
				.getStringByField(storm.cookbook.Fields.CLIENT_KEY);
		String url = tuple.getStringByField(storm.cookbook.Fields.URL);
		String key = url + ":" + clientKey;

		String value = jedis.get(key);
		
		// redis中取,如果redis中沒有,就插入新的一條訪問記錄。
		if (value == null) {
			jedis.set(key, "visited");
			collector.emit(new Values(clientKey, url, Boolean.TRUE.toString()));
		} else {
			collector
					.emit(new Values(clientKey, url, Boolean.FALSE.toString()));
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.CLIENT_KEY, storm.cookbook.Fields.URL,
				storm.cookbook.Fields.UNIQUE));
	}
}

  在這里,我們把url 和 clientKey 組合成為 【url:clientKey】的格式組合,并依據這個對象,在redis中去查找,如果沒有,那那Set到redis中間去,并且判定它為【unique】

4:

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class VisitStatsBolt extends BaseRichBolt {

    private OutputCollector collector;

    private int total = 0;
    private int uniqueCount = 0;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
    }

    @Override
    public void execute(Tuple tuple) {
    	
    	//在這里,我們在上游來判斷這個Fields 是否是獨特和唯一的
        boolean unique = Boolean.parseBoolean(tuple.getStringByField(storm.cookbook.Fields.UNIQUE));
        
        total++;
        if(unique)uniqueCount++;
        collector.emit(new Values(total,uniqueCount));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(storm.cookbook.Fields.TOTAL_COUNT,
        		storm.cookbook.Fields.TOTAL_UNIQUE));
    }
}

第一次出現,uv ++ 

5  接下來,看看流水線2 :

package storm.bolt;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.json.simple.JSONObject;

import storm.cookbook.IPResolver;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * User: yin shaui Date: 2014/05/21 Time: 8:58 AM To change this template use
 * File | Settings | File Templates.
 */
public class GeographyBolt extends BaseRichBolt {

	// ip解析器
	private IPResolver resolver;

	private OutputCollector collector;

	public GeographyBolt(IPResolver resolver) {
		this.resolver = resolver;
	}

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {

		// 1 從上級的目錄之中拿到我們所要使用的ip
		String ip = tuple.getStringByField(storm.cookbook.Fields.IP);

		// 將ip 轉換為json
		JSONObject json = resolver.resolveIP(ip);

		// 將 city和country 組織成為一個新的元祖,在這里也就是我們的Values對象
		String city = (String) json.get(storm.cookbook.Fields.CITY);
		String country = (String) json.get(storm.cookbook.Fields.COUNTRY_NAME);

		collector.emit(new Values(country, city));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

		// 確定了我們這次輸出元祖的格式
		outputFieldsDeclarer.declare(new Fields(storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.CITY));
	}
}

以上Bolt,完成了一個Ip到 CITY,COUNTRY 的轉換

package storm.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class GeoStatsBolt extends BaseRichBolt {

	private class CountryStats {

		//
		private int countryTotal = 0;

		private static final int COUNT_INDEX = 0;
		private static final int PERCENTAGE_INDEX = 1;
		private String countryName;

		public CountryStats(String countryName) {
			this.countryName = countryName;
		}

		private Map<String, List<Integer>> cityStats = new HashMap<String, List<Integer>>();

		/**
		 * @param cityName
		 */
		public void cityFound(String cityName) {
			countryTotal++;

			// 已經有了值,一個加1的操作
			if (cityStats.containsKey(cityName)) {
				cityStats.get(cityName)
						.set(COUNT_INDEX,
								cityStats.get(cityName).get(COUNT_INDEX)
										.intValue() + 1);
				// 沒有值的時候
			} else {
				List<Integer> list = new LinkedList<Integer>();
				list.add(1);
				list.add(0);
				cityStats.put(cityName, list);
			}

			double percent = (double) cityStats.get(cityName).get(COUNT_INDEX)
					/ (double) countryTotal;

			cityStats.get(cityName).set(PERCENTAGE_INDEX, (int) percent);

		}

		/**
		 * @return 拿到的國家總數
		 */
		public int getCountryTotal() {
			return countryTotal;
		}

		/**
		 * @param cityName  依據傳入的城市名稱,拿到城市總數
		 * @return
		 */

		public int getCityTotal(String cityName) {
			return cityStats.get(cityName).get(COUNT_INDEX).intValue();
		}

		
		public String toString() {
			return "Total Count for " + countryName + " is "
					+ Integer.toString(countryTotal) + "\n" + "Cities:  "
					+ cityStats.toString();
		}
	}

	private OutputCollector collector;

	// CountryStats 是一個內部類的對象
	private Map<String, CountryStats> stats = new HashMap<String, CountryStats>();

	@Override
	public void prepare(Map map, TopologyContext topologyContext,
			OutputCollector outputCollector) {
		this.collector = outputCollector;
	}

	@Override
	public void execute(Tuple tuple) {
		String country = tuple.getStringByField(storm.cookbook.Fields.COUNTRY);
		String city = tuple.getStringByField(storm.cookbook.Fields.CITY);

		// 如果國家不存在的時候,新增加一個國家,國家的統計
		if (!stats.containsKey(country)) {
			stats.put(country, new CountryStats(country));
		}

		// 這里拿到新的統計,cityFound 是拿到某個城市的值
		stats.get(country).cityFound(city);

		collector.emit(new Values(country,
				stats.get(country).getCountryTotal(), city, stats.get(country)
						.getCityTotal(city)));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
		outputFieldsDeclarer.declare(new backtype.storm.tuple.Fields(
				storm.cookbook.Fields.COUNTRY,
				storm.cookbook.Fields.COUNTRY_TOTAL,
				storm.cookbook.Fields.CITY, storm.cookbook.Fields.CITY_TOTAL));
	}
}

有關地理位置的統計,附帶上程序其他的使用類

package storm.cookbook;

/**
 */
public class Fields {

	public static final String IP = "ip";
	
	public static final String URL = "url";
	
	public static final String CLIENT_KEY = "clientKey";
	
	public static final String COUNTRY = "country";
	
	public static final String COUNTRY_NAME = "country_name";
	
	public static final String CITY = "city";
	
	//唯一的,獨一無二的
	public static final String UNIQUE = "unique";
	
	//城鎮整數
	public static final String COUNTRY_TOTAL = "countryTotal";
	
	//城市整數
	public static final String CITY_TOTAL = "cityTotal";
	
	//總共計數
	public static final String TOTAL_COUNT = "totalCount";
	
	//總共獨一無二的
	public static final String TOTAL_UNIQUE = "totalUnique";




}
package storm.cookbook;

import org.json.simple.JSONObject;
import org.json.simple.JSONValue;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

public class HttpIPResolver implements IPResolver, Serializable {

	static String url = "http://api.hostip.info/get_json.php";

	@Override
	public JSONObject resolveIP(String ip) {
		URL geoUrl = null;
		BufferedReader in = null;
		try {
			geoUrl = new URL(url + "?ip=" + ip);
			URLConnection connection = geoUrl.openConnection();
			in = new BufferedReader(new InputStreamReader(
					connection.getInputStream()));
			String inputLine;

			JSONObject json = (JSONObject) JSONValue.parse(in);

			in.close();

			return json;
		} catch (IOException e) {
			e.printStackTrace();
		} finally {

			// 每當in為空的時候我們不進行如下的close操作,只有在in不為空的時候進行close操作
			if (in != null) {
				try {
					in.close();
				} catch (IOException e) {
				}
			}
		}
		return null;
	}
}
package storm.cookbook;

import org.json.simple.JSONObject;

/**
 * Created with IntelliJ IDEA.
 * User: admin
 * Date: 2012/12/07
 * Time: 5:29 PM
 * To change this template use File | Settings | File Templates.
 */
public interface IPResolver {

	public JSONObject resolveIP(String ip);
}

到此,相信大家對“Storm流方式的統計系統怎么實現”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

遂昌县| 白银市| 班戈县| 宁波市| 资源县| 佛山市| 衡山县| 汶川县| 桐梓县| 廉江市| 嘉祥县| 浦江县| 泰顺县| 澄城县| 南平市| 阿坝| 宿州市| 抚松县| 镇远县| 苍南县| 武城县| 台南县| 佛冈县| 承德市| 巫山县| 石嘴山市| 阳泉市| 昭通市| 湾仔区| 榆中县| 家居| 青神县| 花莲市| 吉首市| 苏尼特左旗| 新野县| 丹阳市| 泸西县| 进贤县| 九龙城区| 定陶县|