您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關Storm如何和Kafka進行整合,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
下面通過分析 KafkaSpout 的源碼,來看 Storm 如何和 Kafka 進行整合。
package com.mixbox.storm.kafka; import backtype.storm.Config; import backtype.storm.metric.api.IMetric; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import kafka.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.PartitionManager.KafkaMessageId; import java.util.*;

/**
 * Storm spout that consumes messages from Kafka partitions and emits them as
 * tuples. Partition ownership is resolved through a {@link PartitionCoordinator}
 * and consumer offsets are periodically committed to ZooKeeper via {@link ZkState}.
 *
 * @author Yin Shuai
 */
public class KafkaSpout extends BaseRichSpout {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

    /**
     * Pairs a Kafka {@link Message} with its real offset inside the partition.
     */
    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message msg, long offset) {
            this.msg = msg;
            this.offset = offset;
        }
    }

    /** Result of asking a {@link PartitionManager} to emit its next message. */
    static enum EmitState {
        EMITTED_MORE_LEFT, // emitted a tuple and more messages remain pending
        EMITTED_END,       // emitted the last pending message
        NO_EMITTED         // this partition had nothing to emit
    }

    String _uuid = UUID.randomUUID().toString();
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    // Partition coordinator; getMyManagedPartitions() yields the partitions owned by this task.
    PartitionCoordinator _coordinator;
    // Connections to the Kafka brokers, plus which topic partitions each serves.
    DynamicPartitionConnections _connections;
    // Reads/writes Kafka consumer state (offsets) in ZooKeeper.
    ZkState _state;
    // Timestamp (ms) of the last offset commit to ZooKeeper.
    long _lastUpdateMs = 0;
    // Round-robin cursor over the managed partitions.
    int _currPartitionIndex = 0;

    public KafkaSpout(SpoutConfig spoutConf) {
        _spoutConfig = spoutConf;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void open(Map conf, final TopologyContext context,
            final SpoutOutputCollector collector) {
        _collector = collector;

        List<String> zkServers = _spoutConfig.zkServers;
        // Fall back to the topology's configured ZooKeeper ensemble rather than
        // a hard-coded address list, so the spout works in any environment.
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
            LOG.info("Using Storm's configured ZooKeeper servers: {}", zkServers);
        }
        Integer zkPort = _spoutConfig.zkPort;
        // Likewise default the port from the topology configuration.
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }

        Map<Object, Object> stateConf = new HashMap<Object, Object>(conf);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        // ZkState supports creating/deleting consumer-state nodes in ZooKeeper.
        _state = new ZkState(stateConf);

        // Maintains the broker connections.
        _connections = new DynamicPartitionConnections(_spoutConfig,
                KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        // Static host list -> StaticCoordinator; otherwise discover via ZooKeeper.
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig,
                    _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig,
                    _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }

        context.registerMetric("kafkaOffset", new IMetric() {
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric =
                    new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);

            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Set<Partition> latestPartitions = new HashSet<Partition>();
                for (PartitionManager pm : pms) {
                    latestPartitions.add(pm.getPartition());
                }
                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
                for (PartitionManager pm : pms) {
                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(),
                            pm.lastCompletedOffset());
                }
                return _kafkaOffsetMetric.getValueAndReset();
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

        context.registerMetric("kafkaPartition", new IMetric() {
            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Map<Object, Object> concatMetricsDataMaps = new HashMap<Object, Object>();
                for (PartitionManager pm : pms) {
                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                }
                return concatMetricsDataMaps;
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);
    }

    @Override
    public void close() {
        _state.close();
    }

    /**
     * Round-robins over the managed partitions, emitting at most one batch of
     * tuples per call, and periodically commits offsets to ZooKeeper.
     */
    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        // No partitions assigned to this task yet: nothing to emit, and the
        // modulo below would divide by zero.
        if (managers.isEmpty()) {
            return;
        }
        for (int i = 0; i < managers.size(); i++) {
            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            // Delegate emission to the partition manager, passing the collector.
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            // Advance the round-robin cursor unless this partition still has
            // pending messages (EMITTED_MORE_LEFT keeps the cursor in place).
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            // Stop once something was emitted this call; only keep scanning
            // when the partition had nothing (NO_EMITTED).
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        }
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

    @Override
    public void ack(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

    @Override
    public void deactivate() {
        // Persist offsets before the spout stops working.
        commit();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }

    /** Commits the offsets of all managed partitions and records the commit time. */
    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }
}
在通讀代碼之后,下面對幾個關鍵點進行詳細的分析:
1 KafkaSpout之中持有了一個 MessageAndRealOffset 的內部類
// Inner class of KafkaSpout: pairs a Kafka Message with its real offset in the partition.
public static class MessageAndRealOffset { public Message msg; public long offset; public MessageAndRealOffset(Message msg,long offset) { this.msg = msg; this.offset = offset; } }
2 在Spout之中我們還持有了一個PartitionCoordinator的分區協調器,默認的情況我們實例化的對象
是 ZkCoordinator(與源碼中的類名保持一致)
關于Storm如何和Kafka進行整合就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。