您好,登錄后才能下訂單哦!
本篇內容主要講解“怎么實現Java異步延遲消息隊列”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“怎么實現Java異步延遲消息隊列”吧!
系統在收到一個請求后,完整鏈路同步順序調用,實現起來簡單易懂,這也是所有功能在實現時最初選擇的方案。這種方案實現起來簡單,在并發量不高且調用鏈路不長的情況下,是最好的選擇方案,因為簡單所以不容易出錯和容易維護。如圖所示,一個請求按照1-2-3-4的順序。
優點:實現簡單易懂、易維護。
缺點:并發量不高。
由于系統是賬戶系統,每天的交易量大概在50-100w左右,且大部分交易集中在某個時間點,這就致使系統必須要支持高并發,異步方案也是該系統在最早設計時就已經選擇好了。異步化后的請求必須要保證成功且需要有持久化的能力,基于此選擇了消息中間件,在對比了幾款中間件后,發現rabbitmq比較符合現有的業務。
如圖所示,同顏色的線屬于一個同步流程,不同線的表示異步,流程講解如下:
主流程1->2->3:應用端發起交易,賬戶系統走完主流程后,將調用外部系統的請求放入MQ消息隊列,則直接響應應用端成功,應用端不關心后續異步操作
MQ通知①:MQ服務器收到消息后,回調賬戶系統,通知已收到消息。
消息消費⑴->⑵->⑶:監聽到隊列有新消息,調用外部系統,完成請求。
優點:快速響應應用端,提高并發量。
缺點:在主流程的第2步,可能由于網絡原因導致MQ沒收到消息,造成主流程成功響應應用端成功,但是由于消息丟失造成后續的異步處理失敗。
基于方案2的缺點,為了避免由于網絡異常造成的發送消息成功但實際MQ沒收到消息的情況,增加了一種發送成功后保證MQ能100%收到消息的機制,即使MQ宕機也能知道哪些消息是MQ沒收到的。
如圖所示,與方案2的對比,不同的是增加了緩存標志,即在發送消息到MQ前,先將該消息緩存到redis作為一個標志(如 2 保存標志),表明發送進行中,等到MQ收到消息并回調通知成功時才將該標志刪除(如 ② 刪除標志);
增加了定時任務:
輪詢redis里的消息,若指定時間內未收到通知,則重新發送該消息。該方案可能會造成重復發消息,所以在消費端需要做冪等控制
刪除已成功但未收到通知的重復標志
基于方案3,基本能保證不會受網絡異常的影響導致消息丟失的情況出現,至此,發送端的保證已經完成,但是消費端還有些不理想。
正常情況下,消費者在消費完消息后,會通知MQ告知已經消費成功,MQ收到后則從隊列刪除消息。如果告知消費失敗,則該消息會重新回到隊列重新被消費者監聽并且獲取。對應到RabbitMQ有以下三種情況:
ACK:成功消費消息,MQ刪除消息
NO_ACK:消費消息失敗,消息重新隊列,等待后續重新被消費
Rejected:消費消息失敗,MQ刪除該消息,如果隊列存在死信隊列的話,則將該消息移到死信隊列,相當于垃圾箱,不會被重新消費。
基于以上三種情況,如果消息消費失敗時,希望的是消息重回隊列,隔一段時間后再被消費,也就是消息具備延遲的效果,但是找遍了官網,發現不支持延遲的機制。如果不延遲消費,那么消息一回到隊列又會馬上被消費,如果外部系統在一段時間內沒有修復,那么在這段時間內的重復消費都屬于無效重試且浪費性能。
腦殼疼ing。。。下班回家在地鐵上頭腦風暴時,終于靈機一動,聯想到了死信隊列的一個功能,那就是可以設置消息在指定時間內沒被消費的話,就認定為是死亡消息,則該消息會被轉到對應的死信隊列。比如,正常隊列A,B作為A的死信隊列,設置A隊列的消息的死亡時間為n秒,如果n秒內沒被消費,則會自動轉移到B隊列。如圖當時馬上在備忘錄記下來。
如圖所示,在一個消息消費失敗后的做法如下:
消息消費失敗發送no_ack,讓消息回到隊列,并記錄失敗次數
重復消費失敗超過三次后,發送rejected,讓消息轉移到死信隊列B
由于死信隊列B無消費者,所以消息在n秒后會轉移到死信隊列C(在這一步起到延遲的效果)
隊列C的消費者消費死亡消息,將消息重新發送到正常隊列A
基于以上的最終方案,在測試同事的壓測下,大概500TPS/秒,不過沒有模擬數據庫方面的瓶頸(往數據庫插入一定量級的數據)。
由于代碼跟項目有關,所以就暫時不發源碼,等我把公司業務相關的移除掉后,再基于最終方案做個demo發到git上。
后續會進行優化,比如:
順序消費
讓需要順序消費的消息發往同個隊列(取模的方式等等),每條隊列只有一個消費者。單個消費者可能會有性能問題,可在消費者應用程序里弄內存隊列再進行并發消費。
消費堆積
消費者宕機造成堆積:消費者宕機了,由于生產者還在生產消息,經過一段時間后就會堆積海量的消息,如果MQ磁盤有限,即使消費者恢復后如果不能快速消費的話,可能會使MQ服務器磁盤爆滿。
消費者雖然恢復了,但是一時間堆積了海量的數據,消費完需要一定的時間。為了不對正常的MQ造成影響,一種解決方案是先快速消費調堆積的消息,但原來的消費者是帶有邏輯的,處理完一條消息可能需要200ms左右,所以為了快速消費,就先啟動一個臨時消費者,只做轉發邏輯大概消費一條消息只需要10ms。臨時消費者將消息轉發到一個臨時MQ,接著再啟動n個原來帶邏輯的消費者去消費,這樣可以達到快速消費堆積消息的效果。
生產快于消費導致:供過于求本身應該盡量優化消費端,找出原因,如果消費者本身就是慢于生產者的話,那這樣只能增加消費者數量了。
持久化帶來的性能影響
同步持久化:生產消息后保存到磁盤才返回給生產者成功,安全性高不會丟失消息,性能不高
異步持久化:性能高,會丟失數據
高可用
集群模式:其他節點保存某臺數據節點的隊列元數據,當消費者消費數據時,其他節點到實際保存消費的節點拉取數據。
優點:避免所有的生產者都往同個MQ寫數據,
缺點:做不到分布式的效果,一個生產者只能往一個MQ寫,其他的消費者若要消費的話還是要到原來的MQ上去拉取消息
鏡像集群模式:所有節點都保存同一份數據
分布式:數據分發在各個節點
到此,相信大家對“怎么實現Java異步延遲消息隊列”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。