91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ中broker消息存儲之如何實現消息轉儲

發布時間:2021-12-17 14:22:25 來源:億速云 閱讀:185 作者:小新 欄目:大數據

小編給大家分享一下RocketMQ中broker消息存儲之如何實現消息轉儲,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

broker在接收到producer發送的消息之后,首先會將消息存儲到CommitLog的末尾,然后通過一個異步的分發線程ReputMessageService將消息轉儲到ConsumeQueue以及IndexFile中。

轉儲的核心邏輯在ReputMessageService.doReput中:

    // DefaultMessageStore.ReputMessageService    
    private void doReput() {
            if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
                log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                    this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
                this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
            }
            for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

                if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                    && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                    break;
                }

                // 1. 獲取reputFromOffset偏移所指向的數據
                SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                            // 2. 解析消息體
                            DispatchRequest dispatchRequest =
                                DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
                            int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                            if (dispatchRequest.isSuccess()) {
                                if (size > 0) {
                                    // 3. 執行分發
                                    DefaultMessageStore.this.doDispatch(dispatchRequest);

                                    // ...

                                    this.reputFromOffset += size;
                                    readSize += size;
                                    // ...
                                } else if (size == 0) {
                                    this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                    readSize = result.getSize();
                                }
                            } else if (!dispatchRequest.isSuccess()) {

                               // ...
                            }
                        }
                    } finally {
                        result.release();
                    }
                } else {
                    doNext = false;
                }
            }
        }

ConsumeQueue的插入操作如下:

    // ConsumeQueue.java
    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {

        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }

        // 1. 將commitlog offset/msg size/tags code寫到內存緩存
        this.byteBufferIndex.flip();
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        this.byteBufferIndex.putLong(offset);
        this.byteBufferIndex.putInt(size);
        this.byteBufferIndex.putLong(tagsCode);

        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; // ConsumeQueue中偏移

        // 2. 獲取最后一個MappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {

            // ...
            this.maxPhysicOffset = offset + size;
            // 3. 寫入索引數據
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }

IndexFile的寫入邏輯如下:

    // IndexService.java
    public void buildIndex(DispatchRequest req) {
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }

            // ...
         
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) { // 為每個key執行寫入
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }

    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile(); // 文件已滿,重試
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }

消息轉儲的整體流程如下圖:

RocketMQ中broker消息存儲之如何實現消息轉儲

看完了這篇文章,相信你對“RocketMQ中broker消息存儲之如何實現消息轉儲”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

林西县| 斗六市| 连州市| 邹城市| 千阳县| 罗甸县| 阿坝县| 东莞市| 上饶市| 峡江县| 延庆县| 封开县| 乌兰浩特市| 潼关县| 佛冈县| 图片| 大渡口区| 启东市| 页游| 修水县| 城固县| 磴口县| 汉源县| 手游| 满城县| 延津县| 宜君县| 土默特右旗| 清丰县| 上杭县| 兰西县| 永康市| 海兴县| 灵丘县| 崇文区| 合川市| 宿松县| 文化| 雅安市| 浦东新区| 饶阳县|