您好,登錄后才能下訂單哦!
本篇內容介紹了“java開發微服務架構怎么設計消息隊列”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
在微服務開發中我們經常會引入消息中間件實現業務解耦,執行異步操作, 現在讓我們來看看使用消息中間件的好處和弊端。
首先需要肯定是使用消息組件有很多好處,其中最核心的三個是:解耦、異步、削峰。
解耦:客戶端只要講請求發送給特定的通道即可,不需要感知接收請求實例的情況。
異步:將消息寫入消息隊列,非必要的業務邏輯以異步的方式運行,加快響應速度。
削峰:消息中間件在消息被消費之前一直緩存消息,消息處理端可以按照自己處理的并發量從消息隊列中慢慢處理消息,不會一瞬間壓垮業務。
當然消息中間件并不是銀彈,引入消息機制后也會有如下一些弊端:
潛在的性能瓶頸:消息代理可能會存在性能瓶頸。幸運的是目前主流的消息中間件都支持高度的橫向擴展。
潛在的單點故障:消息代理的高可用性至關重要,否則系統整體的可靠性將受到影響,幸運的是大多數消息中間件都是高可用的。
額外的操作復雜性:消息系統是一個必須獨立安裝、配置和運維的系統組件,增加了運維的復雜度。
這些弊端我們借助消息中間件本身提供的擴展、高可用能力可以解決,但是要真正用好消息中間件我們還需要關注可能會遇到的一些設計難題。
在生產環境中為了提高消息處理的能力以及應用程序的吞吐量,一般會將消費者部署多個實例節點。那么帶來的挑戰就是如何確保每個消息只被處理一次,并且是按照他們的發送順序來處理的。
例如:假設有3個相同的接收方實例從同一個點對點通道讀取消息,發送方按順序發布了 Order Created
、Order Updated
和 Order Cancelled
這3個事件消息。簡單的消息實現可能就會同事講每個消息給不同的接收方。若由于網絡問題導致延遲,消息可能沒有按照他們發出時的順序被處理,這將導致奇怪的行為,服務實例可能在另一個服務器處理 Order Created
消息之前處理 Order Cancelled
消息。
Kafka 使用的解決方案是使用分片(分區)通道。整體解決方案分為三個部分:
一個主題通道由多個分片組成,每個分片的行為類似一個通道。
發送方在消息頭部指定分片鍵如orderId,Kafka使用分片鍵將消息分配給特定的分片。
將接收方的多個實例組合在一起,并將他們視為相同的邏輯接收方(消費者組)。kafka將每個分片分配給單個接收器,它在接收方啟動和關閉時重新分配分片。
如上圖所示,每個Order事件消息都將orderId作為其分片鍵。特定訂單的每個事件都發布到同一個分片。而且該分片中的消息始終由同一個接收方實例讀取,因此這樣就能夠保證按順序處理這些消息。
引入消息架構必須要解決的另一個挑戰是處理重復消息。在理想情況下,消息代理應該只傳遞一次消息,但保證消息有且僅有一次的消息傳遞的成本通常很高。相反,很多消息組件承諾至少保證成功傳遞一次消息。
在正常情況下,消息組件只會傳遞一次消息。但是當客戶端、網絡或消息組件故障可能導致消息被多次傳遞。假設客戶端在處理消息后發送確認消息前,他的數據庫崩潰了,這時消息組件將再次發送未確認的消息,在數據庫重新啟動時向該客戶端發送。
處理重復消息有以下兩種不同的方法:
編寫冪等消息處理程序
跟蹤消息并丟棄重復項
如果應用程序處理消息的邏輯是滿足冪等的,那么重復消息就是無害的。程序的冪等性是指,即使這個應用被相同輸入參數多次重復調用時,也不會產生額外的效果。例如:取消一個已經取消的訂單,就是一個冪等性操作。同樣,創建一個已經存在的訂單操作也必是這樣。滿足冪等的消息處理程序可以被放心的執行多次,只要消息組件在傳遞消息時保持相同的消息順序。
但是不幸的是,應用程序通常不是冪等的。或者你現在正在使用的消息組件在重新傳遞消息時不會保留排序。重復或無序消息可能會導致錯誤。在這種情況下,你需要編寫跟蹤消息并丟棄重復消息的消息處理程序。
考慮一個授權消費者信用卡的消息處理程序。它必須為每個訂單僅執行一次信用卡授權操作。這段應用程序每次調用時都會產生不同的效果。如果重復消息導致消息處理程序多次執行該邏輯,則應用程序的行為將不正確。執行此類應用程序邏輯的消息處理程序必須通過檢測和丟棄重復消息而讓它成為冪等的。
一個簡單的解決方案是消息接收方使用 message id 跟蹤他已處理的消息并丟棄任何重復項。例如,在數據庫表中存儲它消費的每條消息的 message id。
當接收方處理消息時,它將消息的 message id 作為創建和變更業務實體的事務的一部分記錄在數據表里。如上圖所示,接收方將包含message id 的行插入 PROCESSED_MESSAGE表。如果消息是重復的,則INSERT將失敗,接收方可以選擇丟棄該消息。
另一個解決方案是消息處理程序在應用程序表,而不是專門表中記錄 message id。當時用具有受限事務模型的NoSQL數據庫時,此方法特別有用,因為 NoSQL數據庫通常不支持將針對兩個表的更新作為數據庫事務。
服務通常需要在更新數據庫的事務中發布消息,數據庫更新和消息發送都必須在事務中進行,否則服務可能會更新數據庫然后在發送消息之前崩潰。
如果服務不以原子方式執行者兩個操作,則類似的故障可能使系統處于不一致狀態。
接下來我們看一下常用的保證事務消息的兩種解決方案,最后再看看現代消息組件RocketMQ的事務性消息解決方案。
如果你的應用程序正在使用關系型數據庫,要保證數據的更新和消息發送之間的事務可以直接使用事務性發件箱模式,Transactional Outbox。
此模式使用數據庫表作為臨時消息隊列。如上圖所示,發送消息的服務有個OUTBOX數據表,在進行INSERT、UPDATE、DELETE 業務操作時也會給OUTBOX數據表INSERT一條消息記錄,這樣可以保證原子性,因為這是基于本地的ACID事務。
OUTBOX表充當臨時消息隊列,然后我們在引入一個消息中繼(MessageRelay)的服務,由他從OUTBOX表中讀取數據并發布消息到消息組件。
消息中繼的實現可以很簡單,只需要通過定時任務定期從OUTBOX表中拉取最新未發布的數據,獲取到數據后將數據發送給消息組件,最后將完成發送的消息從OUTBOX表中刪除即可。
另外一種保證事務性消息的方式是基于數據庫的事務日志,也就是所謂的數據變更捕獲,Change Data Capture,簡稱CDC。
一般數據庫在數據發生變更的時候都會記錄事務日志(Transaction Log),比如MySQL的binlog。事務日志可以簡單的理解成數據庫本地的一個文件隊列,它主要記錄按時間順序發生的數據庫表變更記錄。
這里我們利用alibaba開源的組件canal結合MySQL來說明下這種模式的工作原理。
更多操作說明可以參考官方文檔:https://github.com/alibaba/canal
canal工作原理
canal 模擬 MySQL slave 的交互協議,把自己偽裝成一個MySQL的 slave節點 ,向 MySQL master 發送dump 協議;
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal );
canal 解析 binary log 對象(原始為 byte 流),然后可以將解析后的數據直接發送給消息組件。
Apache RocketMQ在4.3.0版中已經支持分布式事務消息,RocketMQ采用了2PC的思想來實現了提交事務消息,同時增加一個補償邏輯來處理二階段超時或者失敗的消息,如下圖所示。
RocketMQ實現事務消息主要分為兩個階段:正常事務的發送及提交、事務信息的補償流程。
整體流程為:
正常事務發送與提交階段
1、生產者發送一個半消息給MQServer(半消息是指消費者暫時不能消費的消息)
2、服務端響應消息寫入結果,半消息發送成功
3、開始執行本地事務
4、根據本地事務的執行狀態執行Commit或者Rollback操作
事務信息的補償流程
1、如果MQServer長時間沒收到本地事務的執行狀態會向生產者發起一個確認回查的操作請求
2、生產者收到確認回查請求后,檢查本地事務的執行狀態
3、根據檢查后的結果執行Commit或者Rollback操作
補償階段主要是用于解決生產者在發送Commit或者Rollback操作時發生超時或失敗的情況。
在生產者使用RocketMQ發送事務消息的時候我們也會借鑒第一種方案即自建一張事務日志表,然后在執行本地事務的時候同時生成一條事務日志記錄,讓本地事務與日志事務在同一個方法中,同時添加 @Transactional
注解,保證兩個操作事務是一個原子操作。這樣如果事務日志表中有這個本地事務的信息,那就代表本地事務執行成功,需要Commit,相反如果沒有對應的事務日志,則表示沒執行成功,需要Rollback。
“java開發微服務架構怎么設計消息隊列”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。