您好,登錄后才能下訂單哦!
這篇文章主要講解了“Apache Pulsar啟動了哪些服務”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Apache Pulsar啟動了哪些服務”吧!
PulsarStandaloneStarter
在standalone模式下,主要啟動了以下幾個服務
PulsarService
PulsarAdmin
LocalBookeeperEnsemble
WorkerService
PulsarBrokerStarter.BrokerStarter
在普通模式下,啟動了以下幾個服務
PulsarService
BookieServer
AutoRecoveryMain
StatsProvider
WorkerService
簡單說一些這幾個服務
WorkerService: Pulsar function 相關,可以不啟動
PulsarService: 主要的PulsarBroker相關
BookieServer: Bookeeper相關
AutoRecoveryMain: Bookeeper autorecovery相關
StatsProvider: Metric Exporter類似的功能
PulsarService.start
ProtocolHandlers
支持不同protocol處理(kafka協議等)
localZookeeperConnectionProvider
維護zk session 和zk連接
startZkCacheService
LocalZooKeeperCache => LocalZooKeeperCacheService
GlobalZooKeeperCache => ConfigurationCacheService
BookkeeperClientFactory
創建配置Bookkeeper 客戶端
managedLedgerClientFactory
維護一個ManagedLedger的客戶端,借用BookkeeperClient
BrokerService
這個是服務器的主要邏輯了,這個放在后面說
loadManager
收集集群機器負載,并根據負載情況均衡負載
startNamespaceService
NameSpaceService,管理放置的ResourceBundle,和LoadManager相關
schemaStorage
schemaRegistryService
上面2個都是和Schema相關的
defaultOffloader
LedgerOffloader,用來將Ledger(Bookkeeper)中的冷數據放到其他存儲當中
WebService
webSocketService
http,websocket相關
LeaderElectionService
和LoadManager有關,如果是集中方式的話需要選出一個Leader定期根據集群情況進行均衡負載
transactionMetadataStoreService
事務相關
metricGenerator
metric相關
WorkerService
pulsar function 相關
public void start() throws Exception { // producer id 分布式生成器 this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, pulsar.getConfiguration().getClusterName()); // 網絡層配置 ServerBootstrap bootstrap = defaultServerBootstrap.clone(); ServiceConfiguration serviceConfig = pulsar.getConfiguration(); bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); ... // 綁定端口 listenChannel = bootstrap.bind(addr).sync().channel(); ... // metric this.startStatsUpdater( serviceConfig.getStatsUpdateInitialDelayInSecs(), serviceConfig.getStatsUpdateFrequencyInSecs()); // 啟動了一堆需要定期執行的任務 this.startInactivityMonitor(); // 啟動3個schedule任務分別檢測 // 1. 長時間無效的topic // 2. 長時間無效的producer(和message去重相關) // 3. 長時間無效的subscription this.startMessageExpiryMonitor(); this.startCompactionMonitor(); this.startMessagePublishBufferMonitor(); this.startConsumedLedgersMonitor(); this.startBacklogQuotaChecker(); this.updateBrokerPublisherThrottlingMaxRate(); this.startCheckReplicationPolicies(); // register listener to capture zk-latency ClientCnxnAspect.addListener(zkStatsListener); ClientCnxnAspect.registerExecutor(pulsar.getExecutor());
順著netty的初始化方式我們直接看ChannelInitializer,這里應該和Kafka類似進行處理請求的操作。
protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER); ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder( brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4)); ch.pipeline().addLast("flowController", new FlowControlHandler()); ServerCnx cnx = new ServerCnx(pulsar); ch.pipeline().addLast("handler", cnx); connections.put(ch.remoteAddress(), cnx); }
這個類的作用可以對標KafkaApis,處理各種Api請求
這個類實際上是一個ChannelHandler
繼承了PulsarHandler(主要負責一些連接的keepalive邏輯)
PulsarHandler繼承了 PulsarDecoder ( 主要負責序列化,反序列化Api請求)
PulsarDecoder實際上是一個 ChannelInboundHandlerAdapter
而PulsarAPi實際上是通過Pulsar.proto 生成的,這里編寫了各種Api的定義
感謝各位的閱讀,以上就是“Apache Pulsar啟動了哪些服務”的內容了,經過本文的學習后,相信大家對Apache Pulsar啟動了哪些服務這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。