您好,登錄后才能下訂單哦!
小編給大家分享一下RocketMQ事務消息的示例分析,相信大部分人都還不怎么了解,因此分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后大有收獲,下面讓我們一起去了解一下吧!
一、大事務 = 小事務 + 異步
我們以一個轉帳的場景為例來說明這個問題,Bob向Smith轉賬100塊。這個列子在瓜子也有很多實際場景映射,如:車源狀態變化,訂單狀態變化,金融放款,物流運輸……
在單機環境下,執行事務的情況,大概是下面這個樣子
當用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經不在同一臺服務器上了,那么上面的流程就變成了這樣
這時候你會發現,同樣是一個轉賬的業務,在集群環境下,耗時居然成倍的增長,這顯然是不能夠接受的。而且跨網絡調用的事務需要解決網絡不穩定的因素,直接放到業務代碼里控制,成本很高。那如何來規避這個問題?
大事務 = 小事務 + 異步
將大事務拆分成多個小事務異步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務
圖中執行本地事務(Bob賬戶扣款)和發送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發送消息一定要成功,如果扣款失敗了,就不能再發送消息。
二、什么是事務消息(Transactional message)
RocketMQ官方是這樣定義的。可以將其視為兩階段提交消息實現,以確保分布式系統中的最終一致性。事務性消息確保可以原子方式執行本地事務的執行和消息的發送。
1、事務狀態
事務消息有3種狀態
(1)TransactionStatus.CommitTransaction,提交事務,表示允許消費者消費(使用)這條消息
(2)TransactionStatus.RollbackTransaction,回滾事務,表示消息將被刪除,不允許使用
(3)TransactionStatus.Unknown,中間狀態,表示需要MQ向消息發送方進行檢查以確定狀態
2、如何發送事務消息
RocketMQ(4.5.1版本)已經把事務消息的發送方式封裝得非常優雅,只需要兩個大的環節就能夠完成,創建事務消息生產者和實現TransactionListener接口。看一下官方的例子代碼
(1)創建事務消息生產者
使用TransactionMqProducer類創建消息生產客戶端,并指定唯一的ProducerGroup
設置自定義線程池來處理檢查請求
執行本地事務之后,需要根據執行結果回復MQ,回復上一小節中描述的狀態
(2)實現TransactionListener接口
“executeLocalTransaction”方法用于在發送半條消息成功時執行本地事務。它返回上一節中提到的三個事務狀態之一。
“check local transaction”方法用于檢查本地事務狀態并響應MQ檢查請求。它還返回前一節中提到的三個事務狀態之一。
3、事務消息的執行流程
代碼寫起來非常簡單,以至于光看代碼,并不能知道事務消息具體的執行過程。
RocketMQ 事務消息的設計流程借鑒了兩階段提交理論,整體交互流程如下圖所示
事務發起方(即消息發送者)首先發送 prepare 消息到 MQ。
事務發起方(即消息發送者)在發送 prepare 消息成功后執行本地事務。
根據本地事務執行結果發送 commit 或者是 rollback 給 MQ。
如果消息是 rollback,MQ 將刪除該 prepare 消息不進行下發。
如果消息是 commit,MQ 將會把這個消息發送給 consumer 端。
如果執行本地事務過程中,執行端掛掉,或者超時,導致 MQ 收不到任何的消息(不知道是該 commit 還是該 rollback),RocketMQ 會定期掃描消息集群中的事務消息,這時候發現了某個 prepare 消息還不知道該怎么處理,它會向消息發送者確認,所以消息發送者需要實現一個 check 接口,RocketMQ 會根據消息發送者設置的策略來決定是 rollback 還是繼續 commit。這樣就保證了消息發送與本地事務同時成功或同時失敗。
Consumer 端的消費成功機制由 MQ 保證。
4、事務消息的存儲模型
在具體實現上,RocketMQ 通過使用 Half Topic 以及 Operation Topic 兩個內部隊列來存儲事務消息推進狀態,如下圖所示
其中,Half Topic 對應隊列中存放著 prepare 消息,Operation Topic 對應的隊列則存放了 prepare message 對應的 commit/rollback 消息,消息體中則是 prepare message 對應的 offset,服務端通過比對兩個隊列的差值來找到尚未提交的超時事務,進行回查。
從用戶側來說,用戶需要分別實現本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可;而在 service 層,則對事務消息的兩階段提交進行了抽象,同時針對超時事務實現了回查邏輯,通過不斷掃描當前事務推進狀態,來不斷反向請求 Producer 端獲取超時事務的執行狀態,在避免事務掛起的同時,也避免了 Producer 端的單點故障。
而在存儲層,RocketMQ 通過 Bridge 封裝了與底層隊列存儲的相關操作,用以操作兩個對應的內部隊列,用戶也可以依賴其他存儲介質實現自己的 service,RocketMQ 會通過 ServiceProvider 加載進來。
三、Notify的異曲同工
Notify和MetaQ是阿里的兩個消息中間件。MetaQ是一個高性能的存儲隊列;Notify是淘寶自主研發的一套消息服務引擎。貼兩個圖就什么都明白了
整體方案跟RocketMQ是完全相同的,只是兩者的Storage不同。
四、瓜子該怎么做事務一致性這塊工作
針對這個典型場景,有很多解決方案
1、Kafka換成RocketMQ
不行。有太多的業務跑在了Kafka上,替換消息中間件的成本基本不能接受。
2、類似去哪兒qmq的方案
這個方案研發簡單,但是侵入具體業務的數據庫,而且增加了部署運維的成本。
3、有人提出binlog+TCC的方案
沒有仔細研究,但是業務會經常調整,想想負責配置數據庫日志的同學肯定會抓狂(DBA沒有那么了解業務)。
4、為Kafka配一個類似Notify的消息引擎
這個方案有一定的可行性
(1)把Kafka定位為MetaQ,研制一個Notify,為prepare message提供單獨的存儲
(2)現在各業務系統所采用的Kafka客戶端已經是瓜子定制化開發的,可以模仿RocketMQ的客戶端進行改造。已有代碼的邏輯完全不受影響;需要事務一致性的功能,只需要換個接口,實現check邏輯即可,而原有消費方毫無感覺。
(3)似乎有可能結合spring的@Transactional標簽,在完全不改業務代碼(只升級自研Kafka客戶端)的情況下,也能緩解一些不一致問題
以上是“RocketMQ事務消息的示例分析”這篇文章的所有內容,感謝各位的閱讀!相信大家都有了一定的了解,希望分享的內容對大家有所幫助,如果還想學習更多知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。