您好,登錄后才能下訂單哦!
背景
何為延遲隊列?
顧名思義,延遲隊列就是進入該隊列的消息會被延遲消費的隊列。而一般的隊列,消息一旦入隊了之后就會被消費者馬上消費。
場景一:在訂單系統中,一個用戶下單之后通常有30分鐘的時間進行支付,如果30分鐘之內沒有支付成功,那么這個訂單將進行一場處理。這是就可以使用延時隊列將訂單信息發送到延時隊列。
場景二:用戶希望通過手機遠程遙控家里的智能設備在指定的時間進行工作。這時候就可以將用戶指令發送到延時隊列,當指令設定的時間到了再將指令推送到只能設備。
延遲隊列能做什么?
延遲隊列多用于需要延遲工作的場景。最常見的是以下兩種場景:
1、延遲消費。比如:
2、延遲重試。比如消費者從隊列里消費消息時失敗了,但是想要延遲一段時間后自動重試。
如果不使用延遲隊列,那么我們只能通過一個輪詢掃描程序去完成。這種方案既不優雅,也不方便做成統一的服務便于開發人員使用。但是使用延遲隊列的話,我們就可以輕而易舉地完成。
如何實現?
別急,在下文中,我們將詳細介紹如何利用Spring Boot加RabbitMQ來實現延遲隊列。
本文出現的示例代碼都已push到Github倉庫中:https://github.com/Lovelcp/blog-demos/tree/master/spring-boot-rabbitmq-delay-queue
實現思路
在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。
Time-To-Live Extensions
RabbitMQ允許我們為消息或者隊列設置TTL(time to live),也就是過期時間。TTL表明了一條消息可在隊列中存活的最大時間,單位為毫秒。也就是說,當某條消息被設置了TTL或者當某條消息進入了設置了TTL的隊列時,這條消息會在經過TTL秒后“死亡”,成為Dead Letter。如果既配置了消息的TTL,又配置了隊列的TTL,那么較小的那個值會被取用。更多資料請查閱官方文檔。
Dead Letter Exchange
剛才提到了,被設置了TTL的消息在過期后會成為Dead Letter。其實在RabbitMQ中,一共有三種消息的“死亡”形式:
如果隊列設置了Dead Letter Exchange(DLX),那么這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他隊列。更多資料請查閱官方文檔。
流程圖
聰明的你肯定已經想到了,如何將RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲隊列。
針對于上述的延遲隊列的兩個場景,我們分別有以下兩種流程圖:
延遲消費
延遲消費是延遲隊列最為常用的使用模式。如下圖所示,生產者產生的消息首先會進入緩沖隊列(圖中紅色隊列)。通過RabbitMQ提供的TTL擴展,這些消息會被設置過期時間,也就是延遲消費的時間。等消息過期之后,這些消息會通過配置好的DLX轉發到實際消費隊列(圖中藍色隊列),以此達到延遲消費的效果。
延遲重試
延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。
如下圖所示,消費者發現該消息處理出現了異常,比如是因為網絡波動引起的異常。那么如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那么我們可以將其先放在緩沖隊列中(圖中紅色隊列),等消息經過一段的延遲時間后再次進入實際消費隊列中(圖中藍色隊列),此時由于已經過了“較長”的時間了,異常的一些波動通常已經恢復,這些消息可以被正常地消費。
代碼實現
接下來我們將介紹如何在Spring Boot中實現基于RabbitMQ的延遲隊列。我們假設讀者已經擁有了Spring Boot與RabbitMQ的基本知識。
初始化工程
首先我們在Intellij中創建一個Spring Boot工程,并且添加spring-boot-starter-amqp擴展。
配置隊列
從上述的流程圖中我們可以看到,一個延遲隊列的實現,需要一個緩沖隊列以及一個實際的消費隊列。又由于在RabbitMQ中,我們擁有兩種消息過期的配置方式,所以在代碼中,我們一共配置了三條隊列:
我們通過Java Config的方式將上述的隊列配置為Bean。由于我們添加了spring-boot-starter-amqp擴展,Spring Boot在啟動時會根據我們的配置自動創建這些隊列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的消息都會通過DLX轉發到delay_process_queue。
delay_queue_per_message_ttl
首先介紹delay_queue_per_message_ttl的配置代碼:
@Bean Queue delayQueuePerMessageTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter發送到的exchange .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .build(); }
其中,x-dead-letter-exchange聲明了隊列里的死信轉發到的DLX名稱,x-dead-letter-routing-key聲明了這些死信在轉發時攜帶的routing-key名稱。
delay_queue_per_queue_ttl
類似地,delay_queue_per_queue_ttl的配置代碼:
@Bean Queue delayQueuePerQueueTTL() { return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME) .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設置隊列的過期時間 .build(); }
delay_queue_per_queue_ttl隊列的配置比delay_queue_per_message_ttl隊列的配置多了一個x-message-ttl,該配置用來設置隊列的過期時間。
delay_process_queue
delay_process_queue的配置最為簡單:
@Bean Queue delayProcessQueue() { return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME) .build(); }
配置Exchange
配置DLX
首先,我們需要配置DLX,代碼如下:
@Bean DirectExchange delayExchange() { return new DirectExchange(DELAY_EXCHANGE_NAME); }
然后再將該DLX綁定到實際消費隊列即delay_process_queue上。這樣所有的死信都會通過DLX被轉發到delay_process_queue:
@Bean Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) { return BindingBuilder.bind(delayProcessQueue) .to(delayExchange) .with(DELAY_PROCESS_QUEUE_NAME); }
配置延遲重試所需的Exchange
從延遲重試的流程圖中我們可以看到,消息處理失敗之后,我們需要將消息轉發到緩沖隊列,所以緩沖隊列也需要綁定一個Exchange。在本例中,我們將delay_process_per_queue_ttl作為延遲重試里的緩沖隊列。具體代碼是如何配置的,這里就不贅述了,大家可以查閱我Github中的代碼。
定義消費者
我們創建一個最簡單的消費者ProcessReceiver,這個消費者監聽delay_process_queue隊列,對于接受到的消息,他會:
另外,我們還需要新建一個監聽容器用于存放消費者,代碼如下:
@Bean SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); container.setConnectionFactory(connectionFactory); container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監聽delay_process_queue container.setMessageListener(new MessageListenerAdapter(processReceiver)); return container; }
至此,我們前置的配置代碼已經全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲隊列。
編寫測試用例
延遲消費場景
首先我們編寫用于測試TTL設置在消息上的測試代碼。
我們借助spring-rabbit包下提供的RabbitTemplate類來發送消息。由于我們添加了spring-boot-starter-amqp擴展,Spring Boot會在初始化時自動地將RabbitTemplate當成bean加載到容器中。
解決了消息的發送問題,那么又該如何為每個消息設置TTL呢?這里我們需要借助MessagePostProcessor。
MessagePostProcessor通常用來設置消息的Header以及消息的屬性。我們新建一個ExpirationMessagePostProcessor類來負責設置消息的TTL屬性:
/** * 設置消息的失效時間 */ public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; // 毫秒 public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString()); // 設置per-message的失效時間 return message; } }
然后在調用RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor即可。我們向緩沖隊列中發送3條消息,過期時間依次為1秒,2秒和3秒。具體的代碼如下所示:
@Test public void testDelayQueuePerMessageTTL() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { long expiration = i * 1000; rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME, (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration)); } ProcessReceiver.latch.await(); }
細心的朋友一定會問,為什么要在代碼中加一個CountDownLatch呢?這是因為如果沒有latch阻塞住測試方法的話,測試用例會直接結束,程序退出,我們就看不到消息被延遲消費的表現了。
那么類似地,測試TTL設置在隊列上的代碼如下:
@Test public void testDelayQueuePerQueueTTL() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(3); for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME, "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION); } ProcessReceiver.latch.await(); }
我們向緩沖隊列中發送3條消息。理論上這3條消息會在4秒后同時過期。
延遲重試場景
我們同樣還需測試延遲重試場景。
@Test public void testFailMessage() throws InterruptedException { ProcessReceiver.latch = new CountDownLatch(6); for (int i = 1; i <= 3; i++) { rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE); } ProcessReceiver.latch.await(); }
我們向delay_process_queue發送3條會觸發FAIL的消息,理論上這3條消息會在4秒后自動重試。
查看測試結果
延遲消費場景
延遲消費的場景測試我們分為了TTL設置在消息上和TTL設置在隊列上兩種。首先,我們先看一下TTL設置在消息上的測試結果:
從上圖中我們可以看到,ProcessReceiver分別經過1秒、2秒、3秒收到消息。測試結果表明消息不僅被延遲消費了,而且每條消息的延遲時間是可以被個性化設置的。TTL設置在消息上的延遲消費場景測試成功。
然后,TTL設置在隊列上的測試結果如下圖:
從上圖中我們可以看到,ProcessReceiver經過了4秒的延遲之后,同時收到了3條消息。測試結果表明消息不僅被延遲消費了,同時也證明了當TTL設置在隊列上的時候,消息的過期時間是固定的。TTL設置在隊列上的延遲消費場景測試成功。
延遲重試場景
接下來,我們再來看一下延遲重試的測試結果:
ProcessReceiver首先收到了3條會觸發FAIL的消息,然后將其移動到緩沖隊列之后,過了4秒,又收到了剛才的那3條消息。延遲重試場景測試成功。
總結
本文首先介紹了延遲隊列的概念以及用途,并且通過代碼詳細講解了如何通過Spring Boot和RabbitMQ實現一個延遲隊列。希望本文能夠對大家平時的學習和工作能有所啟發和幫助。也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。