您好,登錄后才能下訂單哦!
本篇內容介紹了“RocketMQ DLedger多副本即主從切換的實現原理”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
DLedger 基于 raft 協議,故天然支持主從切換,即主節點(Leader)發生故障,會重新觸發選主,在集群內再選舉出新的主節點。
RocketMQ 中主從同步,從節點不僅會從主節點同步數據,也會同步元數據,包含 topic 路由信息、消費進度、延遲隊列處理隊列、消費組訂閱配置等信息。那主從切換后元數據如何同步呢?特別是主從切換過程中,對消息消費有多大的影響,會丟失消息嗎?
本節先對 BrokerController 中與主從切換相關的方法。
BrokerController#startProcessorByHa
private void startProcessorByHa(BrokerRole role) { if (BrokerRole.SLAVE != role) { if (this.transactionalMessageCheckService != null) { this.transactionalMessageCheckService.start(); } } }
感覺該方法的取名較為隨意,該方法的作用是開啟事務狀態回查處理器,即當節點為主節點時,開啟對應的事務狀態回查處理器,對PREPARE狀態的消息發起事務狀態回查請求。
BrokerController#shutdownProcessorByHa
private void shutdownProcessorByHa() { if (this.transactionalMessageCheckService != null) { this.transactionalMessageCheckService.shutdown(true); } }
關閉事務狀態回查處理器,當節點從主節點變更為從節點后,該方法被調用。
BrokerController#handleSlaveSynchronize
private void handleSlaveSynchronize(BrokerRole role) { if (role == BrokerRole.SLAVE) { // [@1](https://my.oschina.net/u/1198) if (null != slaveSyncFuture) { slaveSyncFuture.cancel(false); } this.slaveSynchronize.setMasterAddr(null); // slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { [@Override](https://my.oschina.net/u/1162528) public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask SlaveSynchronize syncAll error.", e); } } }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS); } else { // @2 //handle the slave synchronise if (null != slaveSyncFuture) { slaveSyncFuture.cancel(false); } this.slaveSynchronize.setMasterAddr(null); } }
該方法的主要作用是處理從節點的元數據同步,即從節點向主節點主動同步 topic 的路由信息、消費進度、延遲隊列處理隊列、消費組訂閱配置等信息。
代碼@1:如果當前節點的角色為從節點:
如果上次同步的 future 不為空,則首先先取消。
然后設置 slaveSynchronize 的 master 地址為空。不知大家是否與筆者一樣,有一個疑問,從節點的時候,如果將 master 地址設置為空,那如何同步元數據,那這個值會在什么時候設置呢?
開啟定時同步任務,每 10s 從主節點同步一次元數據。
代碼@2:如果當前節點的角色為主節點,則取消定時同步任務并設置 master 的地址為空。
BrokerController#changeToSlave
public void changeToSlave(int brokerId) { log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); //change the role brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check // @1 messageStoreConfig.setBrokerRole(BrokerRole.SLAVE); // @2 //handle the scheduled service try { this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE); // @3 } catch (Throwable t) { log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t); } //handle the transactional service try { this.shutdownProcessorByHa(); // @4 } catch (Throwable t) { log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t); } //handle the slave synchronise handleSlaveSynchronize(BrokerRole.SLAVE); // @5 try { this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @6 } catch (Throwable ignored) { } log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId); }
Broker 狀態變更為從節點。其關鍵實現如下:
設置 brokerId,如果broker的id為0,則設置為1,這里在使用的時候,注意規劃好集群內節點的 brokerId。
設置 broker 的狀態為 BrokerRole.SLAVE。
如果是從節點,則關閉定時調度線程(處理 RocketMQ 延遲隊列),如果是主節點,則啟動該線程。
關閉事務狀態回查處理器。
從節點需要啟動元數據同步處理器,即啟動 SlaveSynchronize 定時從主服務器同步元數據。
立即向集群內所有的 nameserver 告知 broker 信息狀態的變更。
BrokerController#changeToMaster
public void changeToMaster(BrokerRole role) { if (role == BrokerRole.SLAVE) { return; } log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName()); //handle the slave synchronise handleSlaveSynchronize(role); // @1 //handle the scheduled service try { this.messageStore.handleScheduleMessageService(role); // @2 } catch (Throwable t) { log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t); } //handle the transactional service try { this.startProcessorByHa(BrokerRole.SYNC_MASTER); // @3 } catch (Throwable t) { log.error("[MONITOR] startProcessorByHa failed when changing to master", t); } //if the operations above are totally successful, we change to master brokerConfig.setBrokerId(0); //TO DO check // @4 messageStoreConfig.setBrokerRole(role); try { this.registerBrokerAll(true, true, brokerConfig.isForceRegister()); // @5 } catch (Throwable ignored) { } log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName()); }
該方法是 Broker 角色從從節點變更為主節點的處理邏輯,其實現要點如下:
關閉元數據同步器,因為主節點無需同步。
開啟定時任務處理線程。
開啟事務狀態回查處理線程。
設置 brokerId 為 0。
向 nameserver 立即發送心跳包以便告知 broker 服務器當前最新的狀態。
主從節點狀態變更的核心方法就介紹到這里了,接下來看看如何觸發主從切換。
從前面的文章我們可以得知,RocketMQ DLedger 是基于 raft 協議實現的,在該協議中就實現了主節點的選舉與主節點失效后集群會自動進行重新選舉,經過協商投票產生新的主節點,從而實現高可用。
BrokerController#initialize
if (messageStoreConfig.isEnableDLegerCommitLog()) { DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore); ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler); }
上述代碼片段截取自 BrokerController 的 initialize 方法,我們可以得知在 Broker 啟動時,如果開啟了 多副本機制,即 enableDLedgerCommitLog 參數設置為 true,會為 集群節點選主器添加 roleChangeHandler 事件處理器,即節點發送變更后的事件處理器。
接下來我們將重點探討 DLedgerRoleChangeHandler 。
DLedgerRoleChangeHandler 繼承自 RoleChangeHandler,即節點狀態發生變更后的事件處理器。上述的屬性都很簡單,在這里就重點介紹一下 ExecutorService executorService,事件處理線程池,但只會開啟一個線程,故事件將一個一個按順序執行。
接下來我們來重點看一下 handle 方法的執行。
DLedgerRoleChangeHandler#handle
public void handle(long term, MemberState.Role role) { Runnable runnable = new Runnable() { public void run() { long start = System.currentTimeMillis(); try { boolean succ = true; log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole()); switch (role) { case CANDIDATE: // @1 if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) { brokerController.changeToSlave(dLedgerCommitLog.getId()); } break; case FOLLOWER: // @2 brokerController.changeToSlave(dLedgerCommitLog.getId()); break; case LEADER: // @3 while (true) { if (!dLegerServer.getMemberState().isLeader()) { succ = false; break; } if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) { break; } if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex() && messageStore.dispatchBehindBytes() == 0) { break; } Thread.sleep(100); } if (succ) { messageStore.recoverTopicQueueTable(); brokerController.changeToMaster(BrokerRole.SYNC_MASTER); } break; default: break; } log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start)); } catch (Throwable t) { log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t); } } }; executorService.submit(runnable); }
代碼@1:如果當前節點狀態機狀態為 CANDIDATE,表示正在發起 Leader 節點,如果該服務器的角色不是 SLAVE 的話,需要將狀態切換為 SLAVE。
代碼@2:如果當前節點狀態機狀態為 FOLLOWER,broker 節點將轉換為 從節點。
代碼@3:如果當前節點狀態機狀態為 Leader,說明該節點被選舉為 Leader,在切換到 Master 節點之前,首先需要等待當前節點追加的數據都已經被提交后才可以將狀態變更為 Master,其關鍵實現如下:
如果 ledgerEndIndex 為 -1,表示當前節點還未又數據轉發,直接跳出循環,無需等待。
如果 ledgerEndIndex 不為 -1 ,則必須等待數據都已提交,即 ledgerEndIndex 與 committedIndex 相等。
并且需要等待 commitlog 日志全部已轉發到 consumequeue中,即 ReputMessageService 中的 reputFromOffset 與 commitlog 的 maxOffset 相等。
等待上述條件滿足后,即可以進行狀態的變更,需要恢復 ConsumeQueue,維護每一個 queue 對應的 maxOffset,然后將 broker 角色轉變為 master。
經過上面的步驟,就能實時完成 broker 主節點的自動切換。由于單從代碼的角度來看主從切換不夠直觀,下面我將給出主從切換的流程圖。
由于從源碼的角度或許不夠直觀,故本節給出其流程圖。
> 溫馨提示:該流程圖的前半部分在 源碼分析 RocketMQ 整合 DLedger(多副本)實現平滑升級的設計技巧 該文中有所闡述。
我相信經過上面的講解,大家應該對主從切換的實現原理有了一個比較清晰的理解,我更相信讀者朋友們會拋出一個疑問,主從切換會不會丟失消息,消息消費進度是否會丟失而導致重復消費呢?
首先,由于 RocketMQ 元數據,當然也包含消息消費進度的同步是采用的從服務器定時向主服務器拉取進行更新,存在時延,引入 DLedger 機制,也并不保證其一致性,DLedger 只保證 commitlog 文件的一致性。
當主節點宕機后,各個從節點并不會完成同步了消息消費進度,于此同時,消息消費繼續,此時消費者會繼續從從節點拉取消息進行消費,但匯報的從節點并不一定會成為新的主節點,故消費進度在 broker 端存在丟失的可能性。當然并不是一定會丟失,因為消息消費端只要不重啟,消息消費進度會存儲在內存中。
綜合所述,消息消費進度在 broker 端會有丟失的可能性,存在重復消費的可能性,不過問題不大,因為 RocketMQ 本身也不承若不會重復消費。
消息會不會丟失的關鍵在于,日志復制進度較慢的從節點是否可以被選舉為主節點,如果在一個集群中,從節點的復制進度落后與從主節點,但當主節點宕機后,如果該從節點被選舉成為新的主節點,那這將是一個災難,將會丟失數據。關于一個節點是否給另外一個節點投贊成票的邏輯在 源碼分析 RocketMQ DLedger 多副本之 Leader 選主 的 2.4.2 handleVote 方法中已詳細介紹,在這里我以截圖的方式再展示其核心點:
從上面可以得知,如果發起投票節點的復制進度比自己小的話,會投拒絕票。
必須得到集群內超過半數節點認可,即最終選舉出來的主節點的當前復制進度一定是比絕大多數的從節點要大,并且也會等于承偌給客戶端的已提交偏移量。故得出的結論是不會丟消息。
本文的介紹就到此為止了,最后拋出一個思考題與大家相互交流學習,也算是對 DLedger 多副本即主從切換一個總結回顧。答案我會以留言的方式或在下一篇文章中給出。
例如一個集群內有5個節點的 DLedgr 集群。 Leader Node: n0-broker-a folloer Node: n1-broker-a,n2-broker-a,n3-broker-a,n4-broker-a
從節點的復制進度可能不一致,例如: n1-broker-a復制進度為 100 n2-broker-a復制進度為 120 n3-broker-a復制進度為 90 n4-broker-a負載進度為 90
如果此時 n0-broker-a 節點宕機,觸發選主,如果 n1率先發起投票,由于 n1,的復制進度大于 n3,n4,再加上自己一票,是有可能成為leader的,此時消息會丟失嗎?為什么?
“RocketMQ DLedger多副本即主從切換的實現原理”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。