您好,登錄后才能下訂單哦!
這篇文章主要介紹基于springboot+rabbitmq實現消息確認機制的方法,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
這次我分享的是 springboot
+ rabbitmq
如何實現消息確認機制,以及在實際開發中的一點踩坑經驗,其實整體的內容比較簡單,有時候事情就是這么神奇,越是簡單的東西就越容易出錯。
可以看到使用了 RabbitMQ
以后,我們的業務鏈路明顯變長了,雖然做到了系統間的解耦,但可能造成消息丟失的場景也增加了。例如:
所以說能不使用中間件就盡量不要用,如果為了用而用只會徒增煩惱。開啟消息確認機制以后,盡管很大程度上保證了消息的準確送達,但由于頻繁的確認交互,rabbitmq
整體效率變低,吞吐量下降嚴重,不是非常重要的消息真心不建議你用消息確認機制。
下邊我們先來實現springboot
+ rabbitmq
消息確認機制,再對遇到的問題做具體分析。
一、準備環境
1、引入 rabbitmq 依賴包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2、修改 application.properties 配置
配置中需要開啟 發送端
和 消費端
的消息確認。
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 發送者開啟 confirm 確認機制 spring.rabbitmq.publisher-confirms=true # 發送者開啟 return 確認機制 spring.rabbitmq.publisher-returns=true #################################################### # 設置消費端手動 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重試 spring.rabbitmq.listener.simple.retry.enabled=true
3、定義 Exchange 和 Queue
定義交換機 confirmTestExchange
和隊列 confirm_test_queue
,并將隊列綁定在交換機上。
@Configuration public class QueueConfig { @Bean(name = "confirmTestQueue") public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); } }
rabbitmq
的消息確認分為兩部分:發送消息確認 和 消息接收確認。
二、消息發送確認
發送消息確認:用來確認生產者 producer
將消息發送到 broker
,broker
上的交換機 exchange
再投遞給隊列 queue
的過程中,消息是否成功投遞。
消息從 producer
到 rabbitmq broker
有一個 confirmCallback
確認模式。
消息從 exchange
到 queue
投遞失敗有一個 returnCallback
退回模式。
我們可以利用這兩個Callback
來確保消的100%送達。
1、 ConfirmCallback確認模式
消息只要被 rabbitmq broker
接收到就會觸發 confirmCallback
回調 。
@Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("消息發送異常!"); } else { log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } }
實現接口 ConfirmCallback
,重寫其confirm()
方法,方法內有三個參數correlationData
、ack
、cause
。
correlationData
:對象內部只有一個 id
屬性,用來表示當前消息的唯一性。ack
:消息投遞到broker
的狀態,true
表示成功。cause
:表示投遞失敗的原因。但消息被 broker
接收到只能表示已經到達 MQ服務器,并不能保證消息一定會被投遞到目標 queue
里。所以接下來需要用到 returnCallback
。
2、 ReturnCallback 退回模式
如果消息未能投遞到目標 queue
里將觸發回調 returnCallback
,一旦向 queue
投遞消息未成功,這里一般會記錄下當前消息的詳細投遞數據,方便后續做重發或者補償等操作。
@Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } }
實現接口ReturnCallback
,重寫 returnedMessage()
方法,方法有五個參數message
(消息體)、replyCode
(響應code)、replyText
(響應內容)、exchange
(交換機)、routingKey
(隊列)。
下邊是具體的消息發送,在rabbitTemplate
中設置 Confirm
和 Return
回調,我們通過setDeliveryMode()
對消息做持久化處理,為了后續測試創建一個 CorrelationData
對象,添加一個id
為10000000000
。
@Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 確保消息發送失敗后可以重新返回到隊列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消費者確認收到消息后,手動ack回執回調處理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投遞到隊列失敗回調處理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 發送消息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); }
三、消息接收確認
消息接收確認要比消息發送確認簡單一點,因為只有一個消息回執(ack
)的過程。使用@RabbitHandler
注解標注的方法要增加 channel
(信道)、message
兩個參數。
@Slf4j @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具體業務 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重復處理失敗,拒絕再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息 } else { log.error("消息即將再次返回隊列處理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
消費消息有三種回執方法,我們來分析一下每種方法的含義。
1、basicAck
basicAck
:表示成功確認,使用此回執方法后,消息會被rabbitmq broker
刪除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
:表示消息投遞序號,每次消費消息或者消息重新投遞后,deliveryTag
都會增加。手動消息確認模式下,我們可以對指定deliveryTag
的消息進行ack
、nack
、reject
等操作。
multiple
:是否批量確認,值為 true
則會一次性 ack
所有小于當前消息 deliveryTag
的消息。
舉個栗子: 假設我先發送三條消息deliveryTag
分別是5、6、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag
為8,multiple
設置為 true,會將5、6、7、8的消息全部進行確認。
2、basicNack
basicNack
:表示失敗確認,一般在消費消息業務異常時用到此方法,可以將消息重新投遞入隊列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
:表示消息投遞序號。
multiple
:是否批量確認。
requeue
:值為 true
消息將重新入隊列。
3、basicReject
basicReject
:拒絕消息,與basicNack
區別在于不能進行批量操作,其他用法很相似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
:表示消息投遞序號。
requeue
:值為 true
消息將重新入隊列。
四、測試
發送消息測試一下消息確認機制是否生效,從執行結果上看發送者發消息后成功回調,消費端成功的消費了消息。
用抓包工具Wireshark
觀察一下rabbitmq
amqp協議交互的變化,也多了 ack
的過程。
五、踩坑日志
1、不消息確認
這是一個非常沒技術含量的坑,但卻是非常容易犯錯的地方。
開啟消息確認機制,消費消息別忘了channel.basicAck
,否則消息會一直存在,導致重復消費。
2、消息無限投遞
在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯后確認消息, int a = 1 / 0
發生異常后將消息重新投入隊列。
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消費者 2 號收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
但是有個問題是,業務代碼一旦出現 bug
99.9%的情況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,導致了死循環。
本地的CPU
被瞬間打滿了,大家可以想象一下當時在生產環境導致服務死機,我是有多慌。
而且rabbitmq management
只有一條未被確認的消息。
經過測試分析發現,當消息重新投遞到消息隊列時,這條消息不會回到隊列尾部,仍是在隊列頭部。
消費者會立刻消費這條消息,業務處理再拋出異常,消息再重新入隊,如此反復進行。導致消息隊列處理出現阻塞,導致正常消息也無法運行。
而我們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時我們再次發送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 重新發送消息到隊尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg));
但這種方法并沒有解決根本問題,錯誤消息還是會時不時報錯,后面優化設置了消息重試次數,達到了重試上限以后,手動確認,隊列刪除此消息,并將消息持久化入MySQL
并推送報警,進行人工處理和定時任務做補償。
3、重復消費
如何保證 MQ 的消費是冪等性,這個需要根據具體業務而定,可以借助MySQL
、或者redis
將消息持久化,通過再消息中的唯一性屬性校驗。
demo
的 GitHub
地址 https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot-rabbitmq-confirm
以上是基于springboot+rabbitmq實現消息確認機制的方法的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。