您好,登錄后才能下訂單哦!
RabbitMQ發布確認高級問題的示例分析,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。
1. 存在的問題
再生產環境中由于一些不明原因導致rabbitmq
重啟,在RabbitMQ
重啟期間生產者消息投遞失敗,會導致消息丟失。
當消息不能正常被接收的時候,我們需要將消息存放在緩存中。
spring.rabbitmq.host=192.168.123.129 spring.rabbitmq.port=5672 spring.rabbitmq.username=admin spring.rabbitmq.password=123 spring.rabbitmq.publisher-confirm-type=correlated
NONE
:禁用發布確認模式,是默認值。
CORRELATED
:發布消息成功到交換機會觸發回調方方法。
CORRELATED
:就是發布一個就確認一個。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return new DirectExchange(CONFIRM_EXCHANGE_NAME); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回調接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); } /** * 交換機接受失敗后進行回調 * 1. 保存消息的ID及相關消息 * 2. 是否接收成功 * 3. 接受失敗的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交換機已經收到id為:{}的消息",id); }else{ log.info("交換機還未收到id為:{}消息,由于原因:{}",id,s); } } }
import com.xiao.springbootrabbitmq.utils.MyCallBack; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("發送得內容是:{}",message); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class ConfirmConsumer { public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; @RabbitListener(queues = CONFIRM_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("接收到隊列" + CONFIRM_QUEUE_NAME + "消息:{}",msg); } }
1. 第一種情況
ID
為1
的消息正常送達,ID
為2
的消息由于RoutingKey
的錯誤,導致不能正常被消費,但是交換機還是正常收到了消息,所以此時由于交換機正常接收之后的原因丟失的消息不能正常被接收。
2. 第二種情況
我們再上一種情況下修改了ID
為1
的消息的交換機的名稱,所以此時回調函數會進行回答由于什么原因導致交換機無法接收成功消息。
在僅開啟了生產者確認機制的情況下,交換機接收到消息后,會直接給消息生產者發送確認消息,如果發現該消息不可路由(就是消息被交換機成功接收后,無法到達隊列),那么消息會直接被丟棄,此時生產者是不知道消息被丟棄這個事件的。
通過設置該參數可以在消息傳遞過程中不可達目的地時將消息返回給生產者。
spring.rabbitmq.publisher-returns=true
需要在配置文件種開啟返回回調
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.PostConstruct; @RestController @RequestMapping("/confirm") @Slf4j public class Producer { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; @Autowired RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void sendMessage(@PathVariable String message){ CorrelationData correlationData1 = new CorrelationData("1"); String routingKey1 = "key1"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey1,message + routingKey1,correlationData1); log.info("發送得內容是:{}",message + routingKey1); CorrelationData correlationData2 = new CorrelationData("2"); String routingKey2 = "key2"; rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message + routingKey2,correlationData2); log.info("發送得內容是:{}",message + routingKey2); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; /** * 回調接口 */ @Component @Slf4j public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void init(){ rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnsCallback(this); } /** * 交換機接受失敗后進行回調 * 1. 保存消息的ID及相關消息 * 2. 是否接收成功 * 3. 接受失敗的原因 * @param correlationData * @param b * @param s */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { String id = correlationData != null ? correlationData.getId() : ""; if(b == true){ log.info("交換機已經收到id為:{}的消息",id); }else{ log.info("交換機還未收到id為:{}消息,由于原因:{}",id,s); } } @Override public void returnedMessage(ReturnedMessage returnedMessage) { Message message = returnedMessage.getMessage(); String exchange = returnedMessage.getExchange(); String routingKey = returnedMessage.getRoutingKey(); String replyText = returnedMessage.getReplyText(); log.error("消息{},被交換機{}退回,回退原因:{},路由Key:{}",new String(message.getBody()),exchange,replyText,routingKey); } }
其他類的代碼與上一小節案例相同
ID
為2
的消息由于RoutingKey
不可路由,但是還是被回調函數處理了。
這里我們新增了備份交換機、備份隊列、報警隊列。它們綁定關系如圖所示。如果確認交換機成功接收的消息無法路由到相應的隊列,就會被確認交換機發送給備份交換機。
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange"; public static final String CONFIRM_QUEUE_NAME = "confirm_queue"; public static final String BACKUP_EXCHANGE_NAME = "backup_exchange"; public static final String BACKUP_QUEUE_NAME = "backup_queue"; public static final String WARNING_QUEUE_NAME = "warning_queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange(){ return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true) .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build(); } @Bean("confirmQueue") public Queue confirmQueue(){ return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean("backupExchange") public FanoutExchange backupExchange(){ return new FanoutExchange(BACKUP_EXCHANGE_NAME); } @Bean("backupQueue") public Queue backupQueue(){ return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean("warningQueue") public Queue warningQueue(){ return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmExchange") DirectExchange confirmExchange, @Qualifier("confirmQueue") Queue confirmQueue){ return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY); } @Bean public Binding queueBindingExchange1(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("backupQueue") Queue backupQueue){ return BindingBuilder.bind(backupQueue).to(backupExchange); } @Bean public Binding queueBindingExchange2(@Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue){ return BindingBuilder.bind(warningQueue).to(backupExchange); } }
import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @Slf4j public class WarningConsumer { public static final String WARNING_QUEUE_NAME = "warning_queue"; @RabbitListener(queues = WARNING_QUEUE_NAME) public void receiveMessage(Message message){ String msg = new String(message.getBody()); log.info("報警發現不可路由的消息內容為:{}",msg); } }
mandatory參數與備份交換機可以一起使用的時候,如果兩者同時開啟,備份交換機優先級高。
看完上述內容是否對您有幫助呢?如果還想對相關知識有進一步的了解或閱讀更多相關文章,請關注億速云行業資訊頻道,感謝您對億速云的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。