91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ事務消息是怎么保證數據的一致性

發布時間:2021-10-18 11:24:59 來源:億速云 閱讀:157 作者:iii 欄目:web開發

這篇文章主要介紹“RocketMQ事務消息是怎么保證數據的一致性”,在日常操作中,相信很多人在RocketMQ事務消息是怎么保證數據的一致性問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ事務消息是怎么保證數據的一致性”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

前言

在面過的幾家大廠中,幾乎每輪的面試官(「沒寫錯,幾乎是每輪面試官」)都問了同樣一個問題:你們的系統是分布式的系統嗎?

答:是。

面試官:那么你們分布式的系統是如何解決分布式事務這個問題的呢?也就是如何保證數據的一致性。

答:我們的系統中通過 RocketMQ 的事務消息來保證數據的最終一致性。

面試官:那你說說它是如何來保證數據的最終一致性的?

答:分兩部分來回答,第一部分先回答事務消息的實現流程,第二部分解釋為什么它能保證數據的最終一致性。

事務消息的實現流程

RocketMQ事務消息是怎么保證數據的一致性

事務消息

  1. 鴻蒙官方戰略合作共建——HarmonyOS技術社區

  2. 首先服務 A 發送一個半事務消息(也稱 half 消息)至 MQ 中。為什么要先發送一個 half 消息呢?這是為了保證服務 A 和 MQ  之間的通信正常,如果無法正常通信,則服務 A 可以直接返回一個異常,也就不用處理后面的邏輯的了。

  3. 如果 half 消息發送成功,MQ 收到這個 half 消息后,會返回一個 success 響應給服務 A。

  4. 服務 A 接收到 MQ 返回的 success 響應后,開始處理本地的業務邏輯,并提交本地事務。

  5. 如果服務 A 本地事務提交成功,則會向 MQ 中發送 commit,表示將 half 消息提交,MQ 就會執行第 5 步操作;如果服務 A  本地事務提交失敗,則直接回滾本地事務,并向 MQ 中發送 rollback,表示將之前的 half 消息進行回滾,MQ 接收到 rollback 消息后,就會將  half 消息刪除。

  6. 如果 commit,則將 half 消息寫入到磁盤。

  7. 如果 MQ 長時間沒有接收到 commit 或者 rollback 消息,例如:服務 A 在處理本地業務時宕機了,或者發送的  commit、rollback 因為在弱網環境,數據丟失了。那么 MQ 就會在一定時間后嘗試調用服務 A 提供的一個接口,通過這個接口來判斷 half  消息的狀態。所以服務 A 提供的接口,需要實現的業務邏輯是:通過數據庫中對應數據的狀態來判斷,之前的 half 消息對應的業務是否執行成功。如果 MQ  從這個接口中得知 half 消息執行成功了,那么 MQ 就會將 half 消息持久化到本地磁盤,如果得知沒有執行成功,那么就會將 half 消息刪除。

  8. 服務 B 從 MQ 中消費到對應的消息。

  9. 服務 B 處理本地業務邏輯,然后提交本地事務。

如何保證數據的最終一致性

實現流程說完了,可能你現在有各種各樣的疑惑?

Q: half 消息是個啥?

A: 它和我們正常發送的普通消息是一樣的,都是存儲在 MQ 中,唯一不同的是 half 在 MQ 中不會立馬被消費者消費到,除非這個 half 消息被  commit 了。(至于為什么未 commit 的 half 消息無法被消費者讀取到,這是因為在 MQ 內部,對于事務消息而言,在 commit  之前,會先放在一個內部隊列中,只有 commit 了,才會真正將消息放在消費者能讀取到的 topic 隊列中)

Q: 為什么要先發送 half 消息?

A: 前面已經解釋過了,主要是為了保證服務 A 和 MQ 之間是否能正常通信,如果兩者之間都不能正常通信,后面還玩個錘子,直接返回異常就可以了。

Q: 如果 MQ 接收到了 half 消息,但是在返回 success 響應的時候,因為網絡原因,導致服務 A 沒有接收到 success  響應,這個時候是什么現象?

A: 當服務 A 發送 half 消息后,它會等待 MQ 給自己返回 success 響應,如果沒有接收到,那么服務 A  也會直接結束,返回異常,不再執行后續邏輯。不執行后續邏輯,這樣服務 A 也就不會提交 commit 消息給 MQ,MQ 長時間沒接收到 commit  消息,那么它就會主動回調服務 A 的一個接口,服務 A 通過接口,查詢本地數據后,發現這條消息對應的業務并沒有正常執行,那么就告訴 MQ,這個 half  消息不能 commit,需要 rollback,MQ 知道后,就將 half 消息進行刪除。

Q: 如果服務 A 本地事務執行失敗了,怎么辦?

A: 服務 A 本地事務執行失敗后,先對自己本地事務進行回滾,然后再向 MQ 發送 rollback 操作。

Q: 服務 A 本地事務提交成功或失敗后,向 MQ 發送的 commit 或者 rollback 消息,因為網絡問題丟失了,又該怎么處理?

A: 和上一個問題一樣,MQ 長時間沒有接收到 half 消息的 commit 或者 rollback 消息,MQ 會主動回調服務 A  的接口,通過這個接口來判斷自己該對這個 half 消息如何處理。

Q: 前面說的全是事務消息的實現流程,這和事務消息如何保證數據的最終一致性有什么關系呢?

A: 有關系。首先,服務 A 執行本地事務并提交和向 MQ 中發送消息這是兩個寫操作,然后通過 RocketMQ  的事務消息,我們保證了這兩個寫操作要么都執行成功,要么都執行失敗。然后讓其他系統,如服務 B 通過消費 MQ  中的消息,然后再去執行自己本地的事務,這樣到最后,服務 A 和服務 B 這兩個系統的數據狀態是不是達到了一致?這就是最終一致性的含義。

如果要求服務 A 和服務 B 的數據狀態,在服務 A 返回給客戶端之間,這兩者就達到一致,這是強一致性,RocketMQ 是沒法保證強一致性的。

目前通過「可靠消息來保證數據的最終一致性」是很多大廠都采用的方案,基本都是通過 MQ  和補償機制來保證數據的一致性。(所謂的可靠消息,就是消息不丟失,如何保證 MQ 的消息不丟失,下篇文章會寫,這也是面試常考題)

Q: 服務 B 本地事務提交失敗了,怎么辦?

A: 如果服務 B 本地事務提交失敗了,可以進行多次重試,直到成功。如果重試多次后,還是提交失敗,例如此時服務 B 對應的 DB 宕機了,這個時候只要服務  B 不向 MQ 提交本次消息的 offset 即可。如果不提交 offset,那么 MQ 會在一定時間后,繼續將這條消息推送給服務 B,服務 B  就可以繼續執行本地事務并提交了,直到成功。這樣,依舊是保證了服務 A 和服務 B 數據的最終一致性。

代碼實現

使用 RokcetMQ 的事務消息主要涉及到兩個部分:

如何發送半事務消息,這個可以通過「TransactionMQProducer」 類來實現。

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(msg, null); // 通過result來判斷half消息是否發送成功 if(result.getSendStatus() == SendStatus.SEND_OK){     // 成功 }else{     // 失敗 }

在前面我們提到了服務 A 需要提供一個接口,用來供 MQ 回調服務  A,實際上這個接口就是一個監聽器:「TransactionListener」的方法。這是一個接口,提供了兩個方法。

public interface TransactionListener {       // 當half消息發送成功后,我們在這里實現自己的業務邏輯,然后commit或者rollback 給MQ     LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);        // 這個方法就是供MQ回調的方法,MQ通過回調該方法來判斷half消息的狀態      // 可以看到,這個方法的參數是MessageExt,也就是half消息的內容,如果根據MessageExt,我們完全能在服務A中判斷之前的業務是否處理成功     LocalTransactionState checkLocalTransaction(final MessageExt msg); }

實際使用時,我們需要實現該接口,例如:

public class MyTransactionListener implements TransactionListener {      @Override     public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {         try{             // 處理業務邏輯             // ....              // 業務邏輯處理成功,commit             return LocalTransactionState.COMMIT_MESSAGE;         }catch (Exception e){          }         // 業務處理失敗,rollback         return LocalTransactionState.ROLLBACK_MESSAGE;     }      @Override     public LocalTransactionState checkLocalTransaction(MessageExt msg) {         return null;     } }

另外,在創建 producer 時,指定我們實現實現的監聽器

TransactionMQProducer transactionMQProducer = new TransactionMQProducer("producerGroup"); transactionMQProducer.setTransactionListener(new MyTransactionListener());

到此,關于“RocketMQ事務消息是怎么保證數據的一致性”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

罗源县| 潍坊市| 扎鲁特旗| 梧州市| 封开县| 平舆县| 景泰县| 探索| 金堂县| 集贤县| 迁西县| 若尔盖县| 五河县| 镇坪县| 抚远县| 江安县| 抚顺县| 乐平市| 嘉鱼县| 绥江县| 镇坪县| 和硕县| 天津市| 梅河口市| 随州市| 利辛县| 舒城县| 阳高县| 股票| 巴青县| 洞口县| 安达市| 竹北市| 马尔康县| 彭泽县| 永济市| 恩平市| 彩票| 德阳市| 桃园市| 兴化市|