您好,登錄后才能下訂單哦!
這篇文章主要介紹springboot中rabbitmq如何實現消息可靠性機制,文中介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們一定要看完!
1.1 生產者模塊導入rabbitmq相關依賴
<!--AMQP依賴,包含RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--用于mq消息的序列化與反序列化--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency>
1.2 配置文件中進行mq的相關配置
spring.rabbitmq.host=10.128.240.183 spring.rabbitmq.port=5672 spring.rabbitmq.virtual-host=/ spring.rabbitmq.publisher-confirm-type=correlated spring.rabbitmq.publisher-returns=true spring.rabbitmq.template.mandatory=true
publish-confirm-type:開啟publisher-confirm,有以下可選值
simple:同步等待confirm結果,直到超時
correlated:異步回調,定義ConfirmCallback。mq返回結果時會回調這個ConfirmCallback
publish-returns:開啟publish-return功能。可以定義ReturnCallback
template.mandatory: 定義消息路由失敗的策略
true:調用ReturnCallback
false:直接丟棄消息
1.3 定義ReturnCallback(消息投遞到隊列失敗觸發此回調)
每個RabbitTemplate只能配置一個ReturnCallback。
當消息投遞失敗,就會調用生產者的returnCallback中定義的處理邏輯
可以在容器啟動時就配置這個回調
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一個延遲消息,忽略這個錯誤提示 return; } // 記錄日志 log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發消息 }); } }
1.4 定義ConfirmCallback(消息到達交換機觸發此回調)
可以為redisTemplate指定一個統一的確認回調
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 獲取RabbitTemplate對象 RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { // 判斷是否是延遲消息 Integer receivedDelay = message.getMessageProperties().getReceivedDelay(); if (receivedDelay != null && receivedDelay > 0) { // 是一個延遲消息,忽略這個錯誤提示 return; } // 記錄日志 log.error("消息發送到隊列失敗,響應碼:{}, 失敗原因:{}, 交換機: {}, 路由key:{}, 消息: {}", replyCode, replyText, exchange, routingKey, message.toString()); // 如果有需要的話,重發消息 }); // 設置統一的confirm回調。只要消息到達broker就ack=true rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("這是統一的回調"); System.out.println("correlationData:" + correlationData); System.out.println("ack:" + b); System.out.println("cause:" + s); } }); } }
也可以為特定的消息定制回調
@Autowired private RabbitTemplate rabbitTemplate; @Test public void testmq() throws InterruptedException { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result->{ if (result.isAck()) { // ACK log.debug("消息成功投遞到交換機!消息ID: {}", correlationData.getId()); } else { // NACK log.error("消息投遞到交換機失敗!消息ID:{}", correlationData.getId()); // 重發消息 } },ex->{ // 記錄日志 log.error("消息發送失敗!", ex); // 重發消息 }); rabbitTemplate.convertAndSend("example.direct","blue","hello,world",correlationData); }
2.1 添加配置
# 手動ack消息,不使用默認的消費端確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual
none:關閉ack,消息投遞時不可靠的,可能丟失
auto:類似事務機制,出現異常時返回nack,消息回滾到mq,沒有異常,返回
ackmanual:我們自己指定什么時候返回ack
2.2 manual模式在監聽器中自定義返回ack
@RabbitListener(queues = "order.release.order.queue") @Service public class OrderCloseListener { @Autowired private OrderService orderService; @RabbitHandler private void listener(OrderEntity orderEntity, Channel channel, Message message) throws IOException { System.out.println("收到過期的訂單信息,準備關閉訂單" + orderEntity.getOrderSn()); try { orderService.closeOrder(orderEntity); // 第二個參數為false則表示僅確認此條消息。如果為true則表示對收到的多條消息同時確認 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // 第二個參數為ture表示將這個消息重新加入隊列 channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); } } }
3.1 配置文件添加配置,開啟本地重試
spring: rabbitmq: listener: simple: retry: enabled: true # 開啟消費者失敗重試 initial-interval: 1000 # 初識的失敗等待時長為1秒 multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * last-interval max-attempts: 3 # 最大重試次數 stateless: true # true無狀態;false有狀態。如果業務中包含事務,這里改為false
開啟本地重試,如果消息處理過程總拋出異常,不會requeue到隊列,而是在消費者本地重試
重試達到最大次數后,spring會返回ack,消息會被丟棄
4. 消費者模塊添加失敗策略(用于開啟失敗本地重試功能后)
當開啟本地重試后,重試最大次數后消息直接丟棄。
三種策略,都繼承于MessageRecovery接口
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄消息。默認就是這種方式
ImmediateRequeueMessageRecoverer:重試耗盡后,返回nack,消息重新入隊
RepublishMessageRecoverer:重試耗盡后,將失敗消息投遞到指定的交換機
4.2 定義處理失敗消息的交換機和隊列 沒有會自動創建相應的隊列、交換機與綁定關系,有了就啥也不做
@Bean public DirectExchange errorMessageExchange(){ return new DirectExchange("error.direct"); } @Bean public Queue errorQueue(){ return new Queue("error.queue", true); } // 路由鍵為key @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){ return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error"); }
4.3 向容器中添加一個失敗策略組件
@Bean public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ // error為路由鍵 return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
以上是“springboot中rabbitmq如何實現消息可靠性機制”這篇文章的所有內容,感謝各位的閱讀!希望分享的內容對大家有幫助,更多相關知識,歡迎關注億速云行業資訊頻道!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。