您好,登錄后才能下訂單哦!
RocketMQ采用內存和磁盤存儲來存儲消息。那現在來分析一下消息存儲的流程
在Broker啟動的時候會拉起相關服務
流程如下:
流程圖引用網址
http://blog.csdn.net/akfly/article/details/53447000
由于是Broker來存儲消息,那么消息入口的代碼應該是在Broker里面,而Broker的入口是BrokerStartup,以及重要的BrokerController。
具體流程可以參考Broker啟動源代碼分析。
Broker啟動流程
以發送消息為例
Broker啟動的時候,會注冊一個SendMessageProcesser來響應netty的發送消息請求,如下:
public void registerProcessor() {
/**
* SendMessageProcessor
*/
SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
sendProcessor.registerSendMessageHook(sendMessageHookList);
sendProcessor.registerConsumeMessageHook(consumeMessageHookList);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
}
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
@Override
public RemotingCommand proce***equest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
SendMessageContext mqtraceContext;
...
switch (request.getCode()) {
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader);
}
}
}
繼續看sendMessage..
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
...
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
調用MessageStore.putMessage(msgInner)
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。