91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Apache Pulsar啟動了哪些服務

發布時間:2021-12-24 10:34:28 來源:億速云 閱讀:272 作者:iii 欄目:大數據

這篇文章主要講解了“Apache Pulsar啟動了哪些服務”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“Apache Pulsar啟動了哪些服務”吧!

1.啟動入口

PulsarStandaloneStarter
在standalone模式下,主要啟動了以下幾個服務

  1. PulsarService

  2. PulsarAdmin

  3. LocalBookeeperEnsemble

  4. WorkerService

PulsarBrokerStarter.BrokerStarter
在普通模式下,啟動了以下幾個服務

  1. PulsarService

  2. BookieServer

  3. AutoRecoveryMain

  4. StatsProvider

  5. WorkerService

簡單說一些這幾個服務

  • WorkerService: Pulsar function 相關,可以不啟動

  • PulsarService: 主要的PulsarBroker相關

  • BookieServer: Bookeeper相關

  • AutoRecoveryMain: Bookeeper autorecovery相關

  • StatsProvider: Metric Exporter類似的功能

2. PulsarService

PulsarService.start

  1. ProtocolHandlers
    支持不同protocol處理(kafka協議等)

  2. localZookeeperConnectionProvider
    維護zk session 和zk連接

  3. startZkCacheService

    • LocalZooKeeperCache => LocalZooKeeperCacheService

    • GlobalZooKeeperCache => ConfigurationCacheService

  4. BookkeeperClientFactory
    創建配置Bookkeeper 客戶端

  5. managedLedgerClientFactory
    維護一個ManagedLedger的客戶端,借用BookkeeperClient

  6. BrokerService
    這個是服務器的主要邏輯了,這個放在后面說

  7. loadManager
    收集集群機器負載,并根據負載情況均衡負載

  8. startNamespaceService
    NameSpaceService,管理放置的ResourceBundle,和LoadManager相關

  9. schemaStorage

  10. schemaRegistryService
    上面2個都是和Schema相關的

  11. defaultOffloader
    LedgerOffloader,用來將Ledger(Bookkeeper)中的冷數據放到其他存儲當中

  1. WebService

  2. webSocketService
    http,websocket相關

  3. LeaderElectionService
    和LoadManager有關,如果是集中方式的話需要選出一個Leader定期根據集群情況進行均衡負載

  4. transactionMetadataStoreService
    事務相關

  5. metricGenerator
    metric相關

  6. WorkerService
    pulsar function 相關

3. BrokerService

public void start() throws Exception {
        // producer id 分布式生成器
        this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath,
                pulsar.getConfiguration().getClusterName());

        // 網絡層配置
        ServerBootstrap bootstrap = defaultServerBootstrap.clone();

        ServiceConfiguration serviceConfig = pulsar.getConfiguration();

        bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false));
        ...
        // 綁定端口
        listenChannel = bootstrap.bind(addr).sync().channel();
        ...

       // metric
        this.startStatsUpdater(
                serviceConfig.getStatsUpdateInitialDelayInSecs(),
                serviceConfig.getStatsUpdateFrequencyInSecs());

       // 啟動了一堆需要定期執行的任務
        this.startInactivityMonitor();
       // 啟動3個schedule任務分別檢測
       // 1. 長時間無效的topic
       // 2. 長時間無效的producer(和message去重相關)
       // 3. 長時間無效的subscription
        this.startMessageExpiryMonitor();
        this.startCompactionMonitor();
        this.startMessagePublishBufferMonitor();
        this.startConsumedLedgersMonitor();
        this.startBacklogQuotaChecker();
        this.updateBrokerPublisherThrottlingMaxRate();
        this.startCheckReplicationPolicies();

        // register listener to capture zk-latency
        ClientCnxnAspect.addListener(zkStatsListener);
        ClientCnxnAspect.registerExecutor(pulsar.getExecutor());

4. PulsarChannelInitializer

順著netty的初始化方式我們直接看ChannelInitializer,這里應該和Kafka類似進行處理請求的操作。

protected void initChannel(SocketChannel ch) throws Exception {
        
        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
     
        ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(
            brokerConf.getMaxMessageSize() + Commands.MESSAGE_SIZE_FRAME_PADDING, 0, 4, 0, 4));

        ch.pipeline().addLast("flowController", new FlowControlHandler());
        ServerCnx cnx = new ServerCnx(pulsar);
        ch.pipeline().addLast("handler", cnx);

        connections.put(ch.remoteAddress(), cnx);
    }

5. ServerCnx

這個類的作用可以對標KafkaApis,處理各種Api請求
這個類實際上是一個ChannelHandler
繼承了PulsarHandler(主要負責一些連接的keepalive邏輯)
PulsarHandler繼承了 PulsarDecoder ( 主要負責序列化,反序列化Api請求)
PulsarDecoder實際上是一個 ChannelInboundHandlerAdapter

而PulsarAPi實際上是通過Pulsar.proto 生成的,這里編寫了各種Api的定義

感謝各位的閱讀,以上就是“Apache Pulsar啟動了哪些服務”的內容了,經過本文的學習后,相信大家對Apache Pulsar啟動了哪些服務這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

盖州市| 云安县| 长治县| 拜泉县| 潮州市| 镇坪县| 阿合奇县| 永清县| 绥棱县| 定远县| 义马市| 湘潭县| 浦县| 香河县| 安徽省| 额尔古纳市| 白河县| 奉贤区| 蓬安县| 方山县| 石城县| 伊宁县| 游戏| 兰溪市| 耒阳市| 韶山市| 吴川市| 罗江县| 开平市| 浙江省| 瓦房店市| 饶阳县| 班戈县| 南川市| 邹城市| 固始县| 漠河县| 眉山市| 大安市| 广西| 内黄县|