This article looks at how the PartitionManager partition manager is used. Many readers have questions about PartitionManager, so this post walks through the class step by step; hopefully the walkthrough below clears up those doubts. Follow along with the editor and let's learn together!
Background required: a basic understanding of Java inner classes.
Purpose: to understand how Kafka partitions are managed inside the Storm spout code.
Main topic: a thorough walkthrough of the entire PartitionManager workflow.
package com.mixbox.storm.kafka;

import backtype.storm.Config;
import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.metric.api.CountMetric;
import backtype.storm.metric.api.MeanReducer;
import backtype.storm.metric.api.ReducedMetric;
import backtype.storm.spout.SpoutOutputCollector;
import com.google.common.collect.ImmutableMap;
import com.mixbox.storm.kafka.KafkaSpout.EmitState;
import com.mixbox.storm.kafka.KafkaSpout.MessageAndRealOffset;
import com.mixbox.storm.kafka.trident.MaxMetric;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * The partition manager.
 *
 * @author Yin Shuai
 */
public class PartitionManager {

    public static final Logger LOG = LoggerFactory
            .getLogger(PartitionManager.class);

    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    /**
     * Kafka message id: wraps a partition and an offset.
     *
     * @author Yin Shuai
     */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // the offset we have emitted up to
    Long _emittedToOffset;

    SortedSet<Long> _pending = new TreeSet<Long>();

    // the offset that has already been committed
    Long _committedTo;

    // messages waiting to be emitted
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();

    // the partition
    Partition _partition;

    // the Storm spout configuration
    SpoutConfig _spoutConfig;

    // the topology instance id
    String _topologyInstanceId;

    // the underlying Kafka consumer
    SimpleConsumer _consumer;

    // dynamic partition connections
    DynamicPartitionConnections _connections;

    // ZkState, which maintains state in ZooKeeper
    ZkState _state;

    // the Storm configuration
    Map _stormConf;

    @SuppressWarnings("unchecked")
    public PartitionManager(DynamicPartitionConnections connections,
            String topologyInstanceId, ZkState state, Map stormConf,
            SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;

        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + " --> " + json);
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json
                        .get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId)
                && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic,
                    id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo
                    + "; old topology_id: " + jsonTopologyId
                    + " - new topology_id: " + topologyInstanceId);
        }

        LOG.info("Starting " + _partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax",
                _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean",
                _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount",
                _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount",
                _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    // returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        // nothing left waiting to emit: refill the buffer
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        while (true) {
            // retrieve and remove the first element of the list
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            // nothing to emit
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            // the tuples generated from this message
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(
                    _spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition,
                            toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    /**
     * The fill step: this is where it is actually decided which data will be emitted.
     */
    private void fill() {
        long start = System.nanoTime();
        /*
         * Fetch the MessageSet.
         */
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig,
                _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);

        if (numMessages > 0) {
            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
        }
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(),
                    _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
        if (numMessages > 0) {
            LOG.info("Added " + numMessages + " messages from: " + _partition
                    + " to internal buffers");
        }
    }

    private int countMessages(ByteBufferMessageSet messageSet) {
        int counter = 0;
        for (MessageAndOffset messageAndOffset : messageSet) {
            counter = counter + 1;
        }
        return counter;
    }

    public void ack(Long offset) {
        _pending.remove(offset);
    }

    public void fail(Long offset) {
        // TODO: should it use in-memory ack set to skip anything that's been
        // acked but not committed???
        // things might get crazy with lots of timeouts
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

    public void commit() {
        // the most recently completed offset
        long lastCompletedOffset = lastCompletedOffset();
        // write the latest completed offset for this partition and topology to ZooKeeper
        if (lastCompletedOffset != lastCommittedOffset()) {
            LOG.info("Writing last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);
            Map<Object, Object> data = ImmutableMap
                    .builder()
                    .put("topology",
                            ImmutableMap.of("id", _topologyInstanceId, "name",
                                    _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker",
                            ImmutableMap.of("host", _partition.host.host,
                                    "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            // write it directly as JSON
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
            LOG.info("Wrote last completed offset (" + lastCompletedOffset
                    + ") to ZK for " + _partition + " for topology: "
                    + _topologyInstanceId);
        } else {
            LOG.info("No new offset for " + _partition + " for topology: "
                    + _topologyInstanceId);
        }
    }

    // the ZooKeeper path where offsets are committed
    private String committedPath() {
        return "/" + _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/"
                + _partition.getId();
    }

    // query the latest offset of the partition
    public long queryPartitionOffsetLatestTime() {
        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic,
                _partition.partition, OffsetRequest.LatestTime());
    }

    // the last committed offset
    public long lastCommittedOffset() {
        return _committedTo;
    }

    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

    // the partition this manager is responsible for
    public Partition getPartition() {
        return _partition;
    }

    public void close() {
        _connections.unregister(_partition.host, _partition.partition);
    }
}
1 PartitionManager defines a static inner class, KafkaMessageId, which wraps a Partition and an offset:
static class KafkaMessageId {
    public Partition partition;
    public long offset;

    public KafkaMessageId(Partition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}
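To see why the id carries both fields, here is a small, hypothetical sketch of how the enclosing spout might use it for ack/fail routing. This is not the actual KafkaSpout source (which is not shown in this article); the managers map and the emitOne helper are assumptions made purely for illustration.

// Hypothetical sketch, in the same package so the package-private KafkaMessageId is visible.
package com.mixbox.storm.kafka;

import backtype.storm.spout.SpoutOutputCollector;

import java.util.List;
import java.util.Map;

class AckRoutingSketch {

    // assumed wiring: one PartitionManager per partition assigned to this spout task
    private Map<Partition, PartitionManager> managers;

    void emitOne(SpoutOutputCollector collector, Partition partition, long offset,
                 List<Object> tuple) {
        // the message id anchors the tuple to its partition and offset
        collector.emit(tuple, new PartitionManager.KafkaMessageId(partition, offset));
    }

    public void ack(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        managers.get(id.partition).ack(id.offset);  // remove the offset from _pending
    }

    public void fail(Object msgId) {
        PartitionManager.KafkaMessageId id = (PartitionManager.KafkaMessageId) msgId;
        managers.get(id.partition).fail(id.offset); // rewind so the message is re-fetched
    }
}

Because the id alone identifies both the partition and the offset, a failed tuple can always be routed back to the PartitionManager that emitted it.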
2: PartitionManager also holds the following instance fields:
2.1 _pending: the offsets that have been emitted but not yet acknowledged
2.2 _committedTo: the offset that has already been committed to ZooKeeper
2.3 _waitingToEmit: the messages waiting to be emitted
2.4 _partition: the specific partition this manager is responsible for
Among these, _waitingToEmit is a LinkedList<MessageAndRealOffset> and _pending is a SortedSet<Long> (a TreeSet). A minimal sketch of how these fields cooperate is shown below.
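The following is a minimal, self-contained sketch of the bookkeeping pattern these fields implement, using simplified stand-in types rather than the real storm-kafka classes: fill() records every emitted offset in _pending, ack() removes it, fail() rewinds the fetch position, and commit() writes the smallest still-pending offset (or _emittedToOffset when nothing is pending) to ZooKeeper.

import java.util.SortedSet;
import java.util.TreeSet;

public class OffsetBookkeepingSketch {
    SortedSet<Long> pending = new TreeSet<Long>(); // emitted but not yet acked
    long emittedToOffset = 0L;                     // next offset to fetch
    long committedTo = 0L;                         // last offset written to ZooKeeper

    void emit(long offset) {                       // mirrors fill(): remember and advance
        pending.add(offset);
        emittedToOffset = offset + 1;
    }

    void ack(long offset) { pending.remove(offset); }

    void fail(long offset) {                       // mirrors fail(): re-fetch from the failure
        if (emittedToOffset > offset) {
            emittedToOffset = offset;
            pending.tailSet(offset).clear();
        }
    }

    long lastCompletedOffset() {                   // everything below this has been acked
        return pending.isEmpty() ? emittedToOffset : pending.first();
    }

    void commit() {                                // mirrors commit(): only write when it moved
        long last = lastCompletedOffset();
        if (last != committedTo) {
            committedTo = last;                    // the real class writes JSON to ZooKeeper here
        }
    }

    public static void main(String[] args) {
        OffsetBookkeepingSketch pm = new OffsetBookkeepingSketch();
        pm.emit(0); pm.emit(1); pm.emit(2);
        pm.ack(0); pm.ack(2);                      // offset 1 is still pending
        System.out.println(pm.lastCompletedOffset()); // prints 1
    }
}

Running the main method shows that even though offsets 0 and 2 have been acked, the commit point stops at 1, because offset 1 is still pending; this is exactly why lastCompletedOffset() returns _pending.first() rather than _emittedToOffset.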
3: When a PartitionManager is constructed, the following parameters are passed in:
topologyInstanceId
DynamicPartitionConnections
ZkState
Map (the Storm configuration, stormConf)
SpoutConfig
Partition
Note that the SimpleConsumer is not passed in directly: the constructor obtains it by registering the partition's host and partition number with DynamicPartitionConnections via its register method (_consumer = connections.register(id.host, id.partition)). The constructor then decides which offset to start reading from; a simplified sketch of that decision follows below.
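To make the constructor's behaviour concrete, here is a simplified sketch of the three-way decision it performs when choosing the starting offset. The method and parameter names are hypothetical stand-ins; zkTopologyId and zkOffset represent what was read from the ZooKeeper JSON, while offsetFromConfig and offsetFromStartTime stand for the two KafkaUtils.getOffset(...) calls in the listing above.

// Simplified, hypothetical sketch of the starting-offset decision in the constructor.
class StartingOffsetSketch {
    static long chooseStartingOffset(String zkTopologyId, Long zkOffset,
                                     String topologyInstanceId, boolean forceFromStart,
                                     long offsetFromConfig, long offsetFromStartTime) {
        if (zkTopologyId == null || zkOffset == null) {
            // nothing (or nothing parseable) in ZooKeeper: fall back to the configured offset
            return offsetFromConfig;
        } else if (!topologyInstanceId.equals(zkTopologyId) && forceFromStart) {
            // a new topology instance with forceFromStart set: restart from startOffsetTime
            return offsetFromStartTime;
        } else {
            // normal case: resume from the offset last committed to ZooKeeper
            return zkOffset;
        }
    }
}

Whatever branch is taken, the chosen offset becomes both _committedTo and the initial _emittedToOffset, so fetching resumes exactly where the last committed state left off.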
That concludes this walkthrough of how the PartitionManager partition manager is used; hopefully it has resolved your doubts. Combining theory with practice is the best way to learn, so go try it out. For more related articles, keep following the 億速云 site, where the editor will continue to publish practical posts.