91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ存儲中如何實現同步刷盤和異步刷盤

發布時間:2021-11-18 09:36:31 來源:億速云 閱讀:421 作者:小新 欄目:大數據

這篇文章將為大家詳細講解有關RocketMQ存儲中如何實現同步刷盤和異步刷盤,小編覺得挺實用的,因此分享給大家做個參考,希望大家閱讀完這篇文章后可以有所收獲。

一、問題思考

1.同步刷盤是怎么工作的?
2.異步刷盤是怎么工作的?
3.上篇文章的疑問,寫入堆外內存的消息如何落盤的?

二、Broker啟動刷盤有關調用鏈
1.調用鏈

//初始化鏈條
@1 BrokerStartup#main
start(createBrokerController(args));
@2 BrokerStartup#createBrokerController
final BrokerController controller = new BrokerController(...)
boolean initResult = controller.initialize();
@3 BrokerController#initialize
this.messageStore = new DefaultMessageStore(...);
@4 DefaultMessageStore#DefaultMessageStore()
this.commitLog = new CommitLog(this);
@5 CommitLog#CommitLog()
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig()
.getFlushDiskType()) {
this.flushCommitLogService = new GroupCommitService();
} else {
this.flushCommitLogService = new FlushRealTimeService();
}
this.commitLogService = new CommitRealTimeService();
//啟動鏈條
@6 BrokerStartup#start
controller.start();
@7 BrokerController#start()
this.messageStore.start();
@8 DefaultMessageStore#start()
this.commitLog.start();
@9 CommitLog#start()
this.flushCommitLogService.start();
if (defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
this.commitLogService.start();
}

小結:由調用鏈可以看出,初始化并啟動了以下線程類

1.同步刷盤 GroupCommitService

2.異步刷盤 FlushRealTimeService

3.如果開啟堆外內存并且為異步刷盤 CommitRealTimeService


2.線程類關系圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤

三、線程類工作流程

既然線程類在Broker啟動時就啟動了,他們在做啥呢?

1.堆外內存線程類CommitRealTimeService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤


小結:
1.CommitRealTimeService主要工作是將寫入堆外內存(writeBuffer)的消息,寫入到fileChannel中,fileChannel為commitLog文件通道

2.committedPosition用于記錄將writeBuffer數據寫入到fileChannel中的內存位點(相對偏移量offset)
3.committedWhere用于記錄寫入fileChannel中的物理偏移量(文件名稱+相對偏移量offset)

2.同步刷盤線程類GroupCommitService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤

注1:

1.執行onWaitEnd時交換讀寫容器,該線程類提供兩個容器來裝GroupCommitRequest

2.requestsWrite和requestsRead,每次執行提交(刷盤)前都會進行容器交換

3.好處:讀寫請求容器分離,避免潛在的鎖競爭

private void swapRequests() {
List<GroupCommitRequest> tmp = this.requestsWrite;
this.requestsWrite = this.requestsRead;
this.requestsRead = tmp;
}

注2:

1.flushedPosition 標記已經刷盤內存的位點。即刷盤相對偏移量,刷盤到什么位置了,下次從此處刷盤即可

2.flushedWhere 標記已經刷盤的物理偏移量,根據此位置可精確查找到文件中消息的存儲位置。flushedWhere = 當前刷盤文件名稱(該日志文件的起始物理偏移量) + flushedPosition

注3:流程圖中標記紅色部分,將刷盤結果通知給等待線程

小結:同步刷盤線程類GroupCommitService主要工作
將請求從讀容器中取出并通過mappedByteBuffer.force()將數據落盤。

3.異步刷盤線程類FlushRealTimeService工作流程

RocketMQ存儲中如何實現同步刷盤和異步刷盤

小結:FlushRealTimeService主要工作
1.不開啟堆外外內存刷盤方式為mappedByteBuffer.force()
2.開啟堆外內存刷盤方式為fileChannel.force


疑問:同步刷盤線程類GroupCommitService每執行一次都會交換讀寫容器,那刷盤請求什么時候放到寫容器(requestsWrite)呢?


四、消息追加與線程類的交互

分析完線程類后,把鏡頭切換到消息追加,看看消息進來后是如何跟線程類交互的?


1.調用鏈

@1 CommitLog#putMessage
//同步刷盤或者異步刷盤
handleDiskFlush(result, putMessageResult, msg);
@2 CommitLog#handleDiskFlush

2.同步刷盤主要代碼

同步刷盤時構造刷盤請求,將請求提交給線程類GroupCommitService,service.putRequest(request),并獲取刷盤結果。

if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
service.putRequest(request);
//等待MappedFile刷盤成功狀態通過countDownLatch來控制
boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
}
}

3.異步刷盤主要代碼

未開啟堆外內存喚醒FlushRealTimeServicee,開啟堆外內存喚醒CommitRealTimeService。

if (!this.defaultMessageStore.getMessageStoreConfig()
.isTransientStorePoolEnable()) {
flushCommitLogService.wakeup();
} else {
commitLogService.wakeup();
}

五、刷盤方式示意圖
1.同步刷盤示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤


2.異步刷盤未開啟堆外緩存示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤


3.異步刷盤開啟堆外緩存示意圖

RocketMQ存儲中如何實現同步刷盤和異步刷盤

關于“RocketMQ存儲中如何實現同步刷盤和異步刷盤”這篇文章就分享到這里了,希望以上內容可以對大家有一定的幫助,使各位可以學到更多知識,如果覺得文章不錯,請把它分享出去讓更多的人看到。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

晋中市| 富民县| 吴堡县| 额济纳旗| 进贤县| 瓮安县| 靖远县| 葵青区| 白玉县| 淅川县| 怀柔区| 湖南省| 太仓市| 邻水| 双鸭山市| 东乡族自治县| 曲水县| 克山县| 万宁市| 都安| 营口市| 商河县| 堆龙德庆县| 阳谷县| 开远市| 吴江市| 辽源市| 湘潭市| 乾安县| 萝北县| 景德镇市| 凤台县| 齐齐哈尔市| 天等县| 建始县| 凌源市| 芷江| 连平县| 通化市| 铜梁县| 城市|