91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ主從如何同步消息消費進度?

發布時間:2020-06-18 11:09:06 來源:網絡 閱讀:737 作者:艾弗森哇 欄目:數據庫

前面我也跟大家講述了 RocketMQ 讀寫分離的規則,但是你可能會問,主從服務器之間的消費進度是如何保持同步的?下面我來給大家解答一下。

如果消費者消費模式不同,也會有不同的保存方式,消費者端的消息消費進度保存到 OffsetStore 中,他有兩個實現類:

org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore?//?本地消費進度保存實現org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore?//?遠程消費進度保存實現

其中,如果是廣播模式消費,消息的消費進度是保存到本地,如果是集群消費模式,消息的消費進度則是保存到 Broker,但無論是保存到本地,還是保存到 Broker,消費者都會在本地留一份緩存,我們暫且看看集群消費模式下,消息消費進度的緩存是如何保存的:http://m.qd8.com.cn/yiyao/xinxi21_3710012.html

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:

public?void?updateOffset(MessageQueue?mq,?long?offset,?boolean?increaseOnly)?{??if?(mq?!=?null)?{
????AtomicLong?offsetOld?=?this.offsetTable.get(mq);????if?(null?==?offsetOld)?{
??????offsetOld?=?this.offsetTable.putIfAbsent(mq,?new?AtomicLong(offset));
????}????if?(null?!=?offsetOld)?{??????if?(increaseOnly)?{
????????MixAll.compareAndIncreaseOnly(offsetOld,?offset);
??????}?else?{
????????offsetOld.set(offset);
??????}
????}
??}
}

消息者在消費完消息后,會調用以上方法,講消費進度放入 offsetTable 緩存中,當 Rebalance 負載重新分配生成 PullRequest 對象時,會調用 RemoteBrokerOffsetStore.readOffset 方法從 offsetTable 緩存中取出對應的消費進度緩存值,再將該值放進 PullRequest 對象中,接下來消息拉取時就很將消息消費進度緩存發送到 Broker 端,所以我們繼續看 Broker 端的處理邏輯。

之前整理 Broker 啟動流程時,發現 Broker 啟動時會開啟一個定時任務:

org.apache.rocketmq.broker.BrokerController#initialize:

this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{????@Override
????public?void?run()?{????????try?{
????????????BrokerController.this.slaveSynchronize.syncAll();
????????}?catch?(Throwable?e)?{
????????????log.error("ScheduledTask?syncAll?slave?exception",?e);
????????}
????}
},?1000?*?10,?1000?*?60,?TimeUnit.MILLISECONDS);

如果 Broker 是從服務器,則會開啟以上定時任務。

org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:

public?void?syncAll()?{??this.syncTopicConfig();??this.syncConsumerOffset();??this.syncDelayOffset();??this.syncSubscriptionGroupConfig();
}

在主服務器沒有宕機的情況下,從服務器會定時從主服務器中同步消息消費進度等信息,那現在問題來了,由于這個同步是單方面同步,即只會從服務器同步主服務器,那如果主服務器宕機了之后,消費者切換成從服務器拉取消息進行消費,如果之后主服務器啟動了,從服務器在把已經消費過的偏移量同步過來,那豈不是造成同步消費了?

其實消費者取在拉取消息的時候,如果消費者的緩存中存在消費進度,也會向 Broker 更新消息消費進度,所以即使是主服務器掛了,在它重新啟動之后,消費者的消費進度沒有丟失,依然會更新主服務器的消息消費進度,這樣一來,消費端與主服務器只掛了器中一個,并不會導致消息重新被消費,具體代碼邏輯如下:

org.apache.rocketmq.broker.processor.PullMessageProcessor#proce***equest:

boolean?storeOffsetEnable?=?brokerAllowSuspend;
storeOffsetEnable?=?storeOffsetEnable?&&?hasCommitOffsetFlag;
storeOffsetEnable?=?storeOffsetEnable
????&&?this.brokerController.getMessageStoreConfig().getBrokerRole()?!=?BrokerRole.SLAVE;if?(storeOffsetEnable)?{?this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),?requestHeader.getConsumerGroup(),?requestHeader.getTopic(),?requestHeader.getQueueId(),?requestHeader.getCommitOffset());
}

其中 brokerAllowSuspend 表示 broker 是否允許掛起,該值默認為 true,hasCommitOffsetFlag 表示息消費者在內存中是否緩存了消息消費進度,從代碼邏輯可看出,如果 Broker 為主服務器,并且 brokerAllowSuspend 和 hasCommitOffsetFlag 都為true,那么就會將消費者消費進度更新到本地。焦作國醫胃腸醫院評價怎么樣:http://jz.lieju.com/zhuankeyiyuan/37325143.htm


向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

阿鲁科尔沁旗| 都安| 德兴市| 德保县| 洞口县| 曲水县| 政和县| 贵港市| 高雄县| 辽宁省| 湖州市| 乐山市| 青神县| 凯里市| 阿坝县| 蒲江县| 元谋县| 长武县| 慈利县| 沁阳市| 新建县| 汾西县| 双鸭山市| 磴口县| 西丰县| 霸州市| 忻州市| 延边| 广元市| 靖江市| 民丰县| 阳东县| 陈巴尔虎旗| 浪卡子县| 仁化县| 牙克石市| 绥阳县| 门源| 平果县| 乌鲁木齐市| 布尔津县|