91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

52.源代碼解讀-RocketMQ消息寫入機制

發布時間:2020-07-21 14:43:59 來源:網絡 閱讀:797 作者:rongwei84n 欄目:軟件技術

一. 前言

RocketMQ采用內存和磁盤存儲來存儲消息。那現在來分析一下消息存儲的流程

二. 代碼流程

在Broker啟動的時候會拉起相關服務
流程如下:

52.源代碼解讀-RocketMQ消息寫入機制

流程圖引用網址
http://blog.csdn.net/akfly/article/details/53447000

三. 代碼流程

由于是Broker來存儲消息,那么消息入口的代碼應該是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具體流程可以參考Broker啟動源代碼分析。

Broker啟動流程

以發送消息為例

1. Broker啟動注冊發送消息處理器

Broker啟動的時候,會注冊一個SendMessageProcesser來響應netty的發送消息請求,如下:

public void registerProcessor() {
        /**
         * SendMessageProcessor
         */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}

2. 消息處理器處理發送者發送過來的消息

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

    @Override
    public RemotingCommand proce***equest(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
                ...
        switch (request.getCode()) {
            response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
        }
    }
}       

繼續看sendMessage..

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
        ...
        PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}               

調用MessageStore.putMessage(msgInner)

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

玉林市| 通许县| 睢宁县| 南溪县| 金阳县| 庄河市| 汽车| 习水县| 凤城市| 洪江市| 格尔木市| 平潭县| 丘北县| 鲜城| 巴里| 凤城市| 安徽省| 曲沃县| 翼城县| 资讯| 乡城县| 南江县| 荥经县| 托克逊县| 焦作市| 鄂尔多斯市| 林芝县| 闵行区| 措勤县| 武陟县| 贵州省| 汉阴县| 馆陶县| 闻喜县| 汤阴县| 宁南县| 尚义县| 北辰区| 新昌县| 广丰县| 兰考县|