您好,登錄后才能下訂單哦!
本篇內容介紹了“如何實現一個延遲隊列”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
首先,隊列這種數據結構相信大家都不陌生,它是一種先進先出的數據結構。普通隊列中的元素是有序的,先進入隊列中的元素會被優先取出進行消費;
延時隊列相比于普通隊列最大的區別就體現在其延時的屬性上,普通隊列的元素是先進先出,按入隊順序進行處理,而延時隊列中的元素在入隊時會指定一個延遲時間,表示其希望能夠在經過該指定時間后處理。從某種意義上來講,延遲隊列的結構并不像一個隊列,而更像是一種以時間為權重的有序堆結構。
我在開發業務需求時遇到的使用場景是這樣的,用戶可以在小程序中訂閱不同的微信或者 QQ 的模板消息,產品同學可以在小程序的管理端新建消息推送計劃,當到達指定的時間節點的時候給所有訂閱模板消息的用戶進行消息推送。
如果僅僅是服務單一的小程序,那也許起個定時任務,或者甚至人工的定時去執行能夠最便捷最快速的去完成這項需求,但我們希望能夠抽象出一個消息訂閱的模塊服務出來給所有業務使用,這時候就需要一種通用的系統的解決方案,這時候便需要使用到延遲隊列了。
除了上述我所遇到的這樣的典型的需求以外,延遲隊列的應用場景其實也非常的廣泛,比如說以下的場景:
鴻蒙官方戰略合作共建——HarmonyOS技術社區
新建的訂單,如果用戶在 15 分鐘內未支付,則自動取消。
公司的會議預定系統,在會議預定成功后,會在會議開始前半小時通知所有預定該會議的用戶。
安全工單超過 24 小時未處理,則自動拉企業微信群提醒相關責任人。
用戶下單外賣以后,距離超時時間還有 10 分鐘時提醒外賣小哥即將超時。
對于數據量比較少并且時效性要求不那么高的場景,一種比較簡單的方式是輪詢數據庫,比如每秒輪詢一下數據庫中所有數據,處理所有到期的數據,比如如果我是公司內部的會議預定系統的開發者,我可能就會采用這種方案,因為整個系統的數據量必然不會很大并且會議開始前提前 30 分鐘提醒與提前 29 分鐘提醒的差別并不大。
但是如果需要處理的數據量比較大實時性要求比較高,比如淘寶每天的所有新建訂單 15 分鐘內未支付的自動超時,數量級高達百萬甚至千萬,這時候如果你還敢輪詢數據庫怕是要被你老板打死,不被老板打死估計也要被運維同學打死。
這種場景下,就需要使用到我們今天的主角 —— 延遲隊列了。延遲隊列為我們提供了一種高效的處理大量需要延遲消費消息的解決方案。那么話不多說,下面我們就來看一下幾種常見的延遲隊列的解決方案以及他們各自的優缺點。
我們知道 Redis 有一個有序集合的數據結構 ZSet,ZSet 中每個元素都有一個對應 Score,ZSet 中所有元素是按照其 Score 進行排序的。
那么我們可以通過以下這幾個操作使用 Redis 的 ZSet 來實現一個延遲隊列:
鴻蒙官方戰略合作共建——HarmonyOS技術社區
入隊操作: ZADD KEY timestamp task
, 我們將需要處理的任務,按其需要延遲處理時間作為 Score 加入到 ZSet 中。Redis 的 ZAdd 的時間復雜度是 O(logN)
, N
是 ZSet 中元素個數,因此我們能相對比較高效的進行入隊操作。
起一個進程定時(比如每隔一秒)通過 ZREANGEBYSCORE
方法查詢 ZSet 中 Score 最小的元素,具體操作為: ZRANGEBYSCORE KEY -inf +inf limit 0 1 WITHSCORES
。查詢結果有兩種情況:
a. 查詢出的分數小于等于當前時間戳,說明到這個任務需要執行的時間了,則去異步處理該任務;
b. 查詢出的分數大于當前時間戳,由于剛剛的查詢操作取出來的是分數最小的元素,所以說明 ZSet 中所有的任務都還沒有到需要執行的時間,則休眠一秒后繼續查詢;
同樣的, ZRANGEBYSCORE
操作的時間復雜度為 O(logN + M)
,其中 N
為 ZSet 中元素個數, M
為查詢的元素個數,因此我們定時查詢操作也是比較高效的。
這里從網上搬運了一套 Redis 實現延遲隊列的后端架構,其在原來 Redis 的 ZSet 實現上進行了一系列的優化,使得整個系統更穩定、更健壯,能夠應對高并發場景,并且具有更好的可擴展性,是一個挺不錯的架構設計,其整體架構圖如下:
其核心設計思路:
鴻蒙官方戰略合作共建——HarmonyOS技術社區
將延遲的消息任務通過 hash 算法路由至不同的 Redis Key 上,這樣做有兩大好處:
a. 避免了當一個 KEY 在存儲了較多的延時消息后,入隊操作以及查詢操作速度變慢的問題(兩個操作的時間復雜度均為 O(logN)
)。
b. 系統具有了更好的橫向可擴展性,當數據量激增時,我們可以通過增加 Redis Key 的數量來快速的擴展整個系統,來抗住數據量的增長。
每個 Redis Key 都對應建立一個處理進程,稱為 Event 進程,通過上述步驟 2 中所述的 ZRANGEBYSCORE 方法輪詢 Key,查詢是否有待處理的延遲消息。
所有的 Event 進程只負責分發消息,具體的業務邏輯通過一個額外的消息隊列異步處理,這么做的好處也是顯而易見的:
a. 一方面,Event 進程只負責分發消息,那么其處理消息的速度就會非常快,就不太會出現因為業務邏輯復雜而導致消息堆積的情況。
b. 另一方面,采用一個額外的消息隊列后,消息處理的可擴展性也會更好,我們可以通過增加消費者進程數量來擴展整個系統的消息處理能力。
Event 進程采用 Zookeeper 選主單進程部署的方式,避免 Event 進程宕機后,Redis Key 中消息堆積的情況。一旦 Zookeeper 的 leader 主機宕機,Zookeeper 會自動選擇新的 leader 主機來處理 Redis Key 中的消息。
從上述的討論中我們可以看到,通過 Redis Zset 實現延遲隊列是一種理解起來較為直觀,可以快速落地的方案。并且我們可以依賴 Redis 自身的持久化來實現持久化,使用 Redis 集群來支持高并發和高可用,是一種不錯的延遲隊列的實現方案。
RabbitMQ 本身并不直接提供對延遲隊列的支持,我們依靠 RabbitMQ 的 TTL 以及 死信隊列功能,來實現延遲隊列的效果。那就讓我們首先來了解一下,RabbitMQ 的死信隊列以及 TTL 功能。
死信隊列實際上是一種 RabbitMQ 的消息處理機制,當 RabbmitMQ 在生產和消費消息的時候,消息遇到如下的情況,就會變成“死信”:
鴻蒙官方戰略合作共建——HarmonyOS技術社區
消息被拒絕 basic.reject/ basic.nack
并且不再重新投遞 requeue=false
消息超時未消費,也就是 TTL 過期了
消息隊列到達最大長度
消息一旦變成一條死信,便會被重新投遞到死信交換機(Dead-Letter-Exchange),然后死信交換機根據綁定規則轉發到對應的死信隊列上,監聽該隊列就可以讓消息被重新消費。
TTL(Time-To-Live)是 RabbitMQ 的一種高級特性,表示了一條消息的最大生存時間,單位為毫秒。如果一條消息在 TTL 設置的時間內沒有被消費,那么它就會變成一條死信,進入我們上面所說的死信隊列。
有兩種不同的方式可以設置消息的 TTL 屬性,一種方式是直接在創建隊列的時候設置整個隊列的 TTL 過期時間,所有進入隊列的消息,都被設置成了統一的過期時間,一旦消息過期,馬上就會被丟棄,進入死信隊列,參考代碼如下:
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
在延遲隊列的延遲時間為固定值的時候,比較適合使用這種方式。
另一種方式是針對單條消息設置,參考代碼如下,該消息被設置了 6 秒的過期時間:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg content".getBytes());
如果需要不同的消息設置不同的延遲時間,上面針對隊列的 TTL 設置便無法滿足我們的需求,需要使用這種針對單個消息的 TTL 設置。
不過需要注意的是,使用這種方式設置的 TTL,消息可能不會按時死亡,因為 RabbitMQ 只會檢查第一個消息是否過期。比如這種情況,第一個消息設置了 20s 的 TTL,第二個消息設置了 10s 的 TTL,那么 RabbitMQ 會等到第一個消息過期之后,才會讓第二個消息過期。
解決這個問題的方法也很簡單,只需要安裝 RabbitMQ 的一個插件即可:
https://www.rabbitmq.com/community-plugins.html
安裝好這個插件后,所有的消息就都能按照被設置的 TTL 過期了。
好了,介紹完 RabbitMQ 的死信隊列以及 TTL 這兩種特性之后,我們離實現延遲隊列就只差一步之遙了。
聰明的讀者可能已經發現了,TTL 不就是延遲隊列中消息要延遲的時間么?如果我們把需要延遲的消息,將 TTL 設置為其延遲時間,投遞到 RabbitMQ 的普通隊列中,一直不去消費它,那么經過 TTL 的時間后,消息就會自動被投遞到死信隊列,這時候我們使用消費者進程實時地去消費死信隊列中的消息,不就實現了延遲隊列的效果。
從下圖可以直觀的看出使用 RabbitMQ 實現延遲隊列的整體流程:
使用 RabbitMQ 來實現延遲隊列,我們可以很好的利用一些 RabbitMQ 的特性,比如消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過 RabbitMQ 集群的特性,可以很好的解決單點故障問題,不會因為單個節點掛掉導致延遲隊列不可用或者消息丟失。
TimeWheel 時間輪算法,是一種實現延遲隊列的巧妙且高效的算法,被應用在 Netty,Zookeeper,Kafka 等各種框架中。
如上圖所示,時間輪是一個存儲延遲消息的環形隊列,其底層采用數組實現,可以高效循環遍歷。這個環形隊列中的每個元素對應一個延遲任務列表,這個列表是一個雙向環形鏈表,鏈表中每一項都代表一個需要執行的延遲任務。
時間輪會有表盤指針,表示時間輪當前所指時間,隨著時間推移,該指針會不斷前進,并處理對應位置上的延遲任務列表。
由于時間輪的大小固定,并且時間輪中每個元素都是一個雙向環形鏈表,我們可以在 O(1)
的時間復雜度下向時間輪中添加延遲任務。
如下圖,例如我們有一個這樣的時間輪,在表盤指針指向當前時間為 2 時,我們需要新添加一個延遲 3 秒的任務,我們可以快速計算出延遲任務在時間輪中所對應的位置為 5,并添加到位置 5 上任務列表尾部。
到現在為止一切都非常棒,但是細心的同學可能發現了,上面的時間輪的大小是固定的,只有 12 秒。如果此時我們有一個需要延遲 200 秒的任務,我們應該怎么處理呢?直接擴充整個時間輪的大小嗎?這顯然不可取,因為這樣做的話我們就需要維護一個非常非常大的時間輪,內存是不可接受的,而且底層數組大了之后尋址效率也會降低,影響性能。
為此,Kafka 引入了多層時間輪的概念。其實多層時間輪的概念和我們的機械表上時針、分針、秒針的概念非常類似,當僅使用秒針無法表示當前時間時,就使用分針結合秒針一起表示。同樣的,當任務的到期時間超過了當前時間輪所表示的時間范圍時,就會嘗試添加到上層時間輪中,如下圖所示:
第一層時間輪整個時間輪所表示時間范圍是 0-12 秒,第二層時間輪每格能表示的時間范圍是整個第一層時間輪所表示的范圍也就是 12 秒,所以整個第二層時間輪能表示的時間范圍即 12*12=144 秒,依次類推第三層時間輪能表示的范圍是 1728 秒,第四層為 20736 秒等等。
比如現在我們需要添加一個延時為 200 秒的延遲消息,我們發現其已經超過了第一層時間輪能表示的時間范圍,我們就需要繼續往上層時間輪看,將其添加在第二層時間輪 200/12 = 17 的位置,然后我們發現 17 也超過了第二次時間輪的表示范圍,那么我們就需要繼續往上層看,將其添加在第三層時間輪的 17/12 = 2 的位置。
Kafka 中時間輪算法添加延遲任務以及推動時間輪滾動的核心流程如下,其中 Bucket 即時間輪中的延遲任務隊列,并且 Kafka 引入的 DelayQueue 解決了多數 Bucket 為空導致的時間輪滾動效率低下的問題:
使用時間輪實現的延遲隊列,能夠支持大量任務的高效觸發。并且在 Kafka 的時間輪算法的實現方案中,還引入了 DelayQueue,使用 DelayQueue 來推送時間輪滾動,而延遲任務的添加與刪除操作都放在時間輪中,這樣的設計大幅提升了整個延遲隊列的執行效率。
總結
延遲隊列在我們日常開發中應用非常廣泛,本文介紹了三種不同的實現延遲隊列的方案,三種方案各自有各自的特點,例如 Redis 的實現方案理解起來最為簡單,能夠快速落地,但 Redis 畢竟是基于內存的,雖然有數據持久化方案,但還是有數據丟失的可能性。而 RabbitMQ 的實現方案,由于 RabbitMQ 本身的消息可靠發送、消息可靠投遞、死信隊列等特性,可以保障消息至少被消費一次以及未被正確處理的消息不會被丟棄,讓消息的可靠性有了保障。最后 Kafka 的時間輪算法,個人覺得是三種實現方案中最難理解但也不失為一種非常巧妙實現方案。
“如何實現一個延遲隊列”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。