91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中如何實現push consumer消息拉取

發布時間:2021-12-17 14:20:26 來源:億速云 閱讀:399 作者:小新 欄目:大數據

這篇文章主要介紹RocketMQ中如何實現push consumer消息拉取,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!

RebalanceImpl.updateProcessQueueTableInRebalance方法的末尾,對于每一個新生成的ProcessQueue都會創建一個PullRequest執行首次消息拉取操作。PullRequest會通過RebalanceImpl.dispatchPullRequest方法達到DefaultMQPushConsumerImpl.executePullRequestImmediately,然后被投遞到PullMessageService的本地隊列中。

PulMessageService會啟動一個服務線程,不斷消費投遞到本地隊列中的PullRequest,最終調用到DefaultMQPushConsumerImpl.pullMessage方法。PullMessageService被MQClientInstance持有,同一個客戶端實例中所有的push consumer產生的PullRequest都會被投遞到同一個PullMessageService本地隊列中排隊等待執行。

DefaultMQPushConsumerImpl.pullMessage是消息拉取的核心方法。該方法首先會執行一系列的限流判斷,若命中限流條件則本次執行結束,等待一個固定時間之后會再次將同一個PullRequest投遞到PullMessageService中重新觸發消息拉取。

DefaultMQPushConsumerImpl.pullMessage核心邏輯:

   public void pullMessage(final PullRequest pullRequest) {
        // 限流判斷 ....

        final long beginTimestamp = System.currentTimeMillis();

        // 消息拉取callback
        PullCallback pullCallback = new PullCallback() {...};

        boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 獲取當前隊列已經被消費到最新的offset,通過本次pull請求附帶在broker上commit該offset
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {
                commitOffsetEnable = true;
            }
        }

        String subExpression = null;
        boolean classFilter = false;
        SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
        if (sd != null) {
            if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
                subExpression = sd.getSubString(); // 消息過濾表達式
            }

            classFilter = sd.isClassFilterMode();
        }

        int sysFlag = PullSysFlag.buildSysFlag(
            commitOffsetEnable, // commitOffset
            true, // suspend
            subExpression != null, // subscription
            classFilter // class filter
        );
        try {
            // 發起pull請求,成功后異步回調pullCallback
            this.pullAPIWrapper.pullKernelImpl(
                pullRequest.getMessageQueue(),
                subExpression,
                subscriptionData.getExpressionType(),
                subscriptionData.getSubVersion(),
                pullRequest.getNextOffset(),
                this.defaultMQPushConsumer.getPullBatchSize(),
                sysFlag,
                commitOffsetValue,
                BROKER_SUSPEND_MAX_TIME_MILLIS,
                CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
                CommunicationMode.ASYNC,
                pullCallback
            );
        } catch (Exception e) {
            log.error("pullKernelImpl exception", e);
            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        }
    }

pullMessage方法中創建的匿名PullCallback用來處理拉取到的消息列表:

        PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    // 1. 反序列化,并執行過濾
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
                                // 2. 保存到本地ProcessQueue中緩存
                                boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                // 3. 提交到ConsumeMessageService中,被push到message listener執行業務處理
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
                                    pullResult.getMsgFoundList(),
                                    processQueue,
                                    pullRequest.getMessageQueue(),
                                    dispatchToConsume);

                                // 4. 提交下一次PullRequest
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
                                    pullResult.getNextBeginOffset(),
                                    firstMsgOffset,
                                    prevRequestOffset);
                            }

                            break;
                        // ....
                    }
                }
            }

           // ....
        };

上面的第3步consumeMessageService.submitConsumeRequest中將根據并行或串行不同的方式將message提交給listener執行業務處理動作。

消息拉取的整體流程如下:

RocketMQ中如何實現push consumer消息拉取

以上是“RocketMQ中如何實現push consumer消息拉取”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

和政县| 富民县| 同江市| 徐州市| 阿尔山市| 康马县| 富蕴县| 芜湖县| 赣榆县| 绿春县| 保靖县| 北流市| 永年县| 庐江县| 大埔县| 石屏县| 玉溪市| 黔南| 眉山市| 东台市| 南开区| 井研县| 洞口县| 扬州市| 孟州市| 阳泉市| 邵武市| 勐海县| 桃江县| 农安县| 绥德县| 天峻县| 达州市| 黑山县| 郴州市| 繁峙县| 惠东县| 温泉县| 泸定县| 昭通市| 温宿县|