您好,登錄后才能下訂單哦!
這篇文章給大家分享的是有關RocketMQ消息存儲的示例分析的內容。小編覺得挺實用的,因此分享給大家做個參考,一起跟隨小編過來看看吧。
先梳理消息存儲主干流程。本分切分為兩部分,第一部分消息存儲流程概覽,主要為校驗流程;第二部分CommitLog存儲概覽,即消息存儲流程。
調用鏈
@1 SendMessageProcessor#sendMessage
//消息存儲
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
@2 DefaultMessageStore#putMessage
流程圖
備注:PageCache是否繁忙,內存鎖定時間為1秒,在集群流量負載很高時可能出現system busy,broker buys等異常信息。
源代碼
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
//如果消息存儲服務已關閉,則消息寫入被拒絕
if (this.shutdown) {
log.warn("message store has shutdown, so putMessage is forbidden");
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
//Slave不處理消息存儲
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is slave mode, so putMessage is forbidden ");
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
}
//如果消息存儲服務不可寫,則消息寫入會被拒絕
//出現該錯誤可能磁盤已滿
if (!this.runningFlags.isWriteable()) {
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0) {
log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
}
return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
} else {
this.printTimes.set(0);
}
//Topic長度的限制不能超過127個字節
if (msg.getTopic().length() > Byte.MAX_VALUE) {
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
//消息屬性長度檢查不能超過32K
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
}
//判斷PageCache是否繁忙:閥值[osPageCacheBusyTimeOutMills = 1000 ] 比較時間為當前時間與Commit Lock時間之差
//如果返回true,意味著此時有消息在寫入CommitLog,且那條消息的寫入耗時較長(超過1s),則本條消息不再寫入
//返回內存頁寫入繁忙
if (this.isOSPageCacheBusy()) {
return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
}
long beginTime = this.getSystemClock().now();
//將消息寫入CommitLog
PutMessageResult result = this.commitLog.putMessage(msg);
//消息寫入時間過長,發出警告
long eclipseTime = this.getSystemClock().now() - beginTime;
if (eclipseTime > 500) {
log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
}
//對消息的存儲耗時進行分級記錄,并記錄當前所有消息存儲時的最大耗時
this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);
//記錄存粗失敗次數
if (null == result || !result.isOk()) {
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
}
return result;
}
調用鏈
@1 DefaultMessageStore#putMessage
//將消息寫入CommitLog
PutMessageResult result = this.commitLog.putMessage(msg);
@2 CommitLog#putMessage
流程圖
備注:此時寫入消息并沒有寫入磁盤,而是寫入了writeBuffer或者mappedByteBuffer(PageCache或堆外內存)
源代碼
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// Set the storage time
//設置消息存儲時間(存儲到Broker的時間)
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
//Message Body的循環冗余校驗碼,防止消息體內容被篡改
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
//統計存儲耗時相關的Metric
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
//獲取消息類型
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//不處理事務消息
//重試(延時)消息發到SCHEDULE_TOPIC中
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE//
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
//延時投遞時間級別
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
//將Topic更改為 SCHEDULE_TOPIC_XXXX
topic = ScheduleMessageService.SCHEDULE_TOPIC;
//根據延時級別獲取延時消息新隊列ID(queueId等于延時級別-1)
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//將消息中原topic和queueId存入到消息屬性中
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
long eclipseTimeInLock = 0;
MappedFile unlockMappedFile = null;
//獲取最新的日志文件CommitLog 內存映射文件 零拷貝
//mappedFileQueue 管理這些連續的CommitLog文件
//MappedFile 和 MappedFileQueue高性能的磁盤接口
//mappedFileQueue可以理解為commitLog文件夾,而MappedFile對應文件夾下的文件
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//加鎖,默認使用自旋鎖。//依賴于messageStoreConfig#useReentrantLockWhenPutMessage配置
//putMessage會有多個工作線程并行處理,所以需要加鎖。串行寫入commitLog
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
//再次設置時間戳全局有序
msg.setStoreTimestamp(beginLockTimestamp);
//文件已滿或者沒有映射文件重新創建一個文件
if (null == mappedFile || mappedFile.isFull()) {
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
}
//創建映射文件失敗(可能磁盤已滿)
if (null == mappedFile) {
log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
//消息寫入完成后,先將beginTimeInLock設置為0,然后釋放鎖
//該值用來計算消息寫入耗時。寫入新消息前,會根據該值來檢查操作系統內存頁寫入是否繁忙
//如果上一條消息在1s內沒有成功寫入,則本次消息不再寫入
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
}
//向映射文件中寫入消息
//注意:只是將消息寫入映射文件中的writeBuffer/mappedByteBuffer,沒有刷盤
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK: //消息成功寫入
break;
//文件已經到結尾了,重新建一個新的mappedFile.
case END_OF_FILE: //當前CommitLog可用空間不足
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
//創建新的CommitLog,并重新寫入消息
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED: //消息長度超過了最大閥值
case PROPERTIES_SIZE_EXCEEDED: //消息屬性超過了最大閥值
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
case UNKNOWN_ERROR: //未知錯誤
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
default:
beginTimeInLock = 0;
return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
}
eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
} finally {
putMessageLock.unlock(); //釋放鎖
}
if (eclipseTimeInLock > 500) {
log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
}
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
}
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics Metrics指標
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
//主從同步
handleHA(result, putMessageResult, msg);
return putMessageResult;
}
感謝各位的閱讀!關于“RocketMQ消息存儲的示例分析”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,讓大家可以學到更多知識,如果覺得文章不錯,可以把它分享出去讓更多的人看到吧!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。