您好,登錄后才能下訂單哦!
小編給大家分享一下Spring Boot + RabbitMQ如何實現分布式事務,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!
一:分布式事務解決方案
1.兩階段提交(2PC)
第一階段:事務協調器要求每個涉及到事務的數據庫預提交(precommit)此操作,并反映是否可以提交.
第二階段:事務協調器要求每個數據庫提交數據。
案例可參照http://blog.itpub.net/28624388/viewspace-2137095/
2.補償事務(TCC)
TCC 其實就是采用的補償機制,其核心思想是:針對每個操作,都要注冊一個與其對應的確認和補償(撤銷)操作。它分為三個階段:
Try 階段主要是對業務系統做檢測及資源預留
Confirm 階段主要是對業務系統做確認提交,Try階段執行成功并開始執行 Confirm階段時,默認 Confirm階段是不會出錯的。即:只要Try成功,Confirm一定成功。
Cancel 階段主要是在業務執行錯誤,需要回滾的狀態下執行的業務取消,預留資源釋放。
3.本地消息表(異步確保)
本地消息表這種實現方式應該是業界使用最多的,其核心思想是將分布式事務拆分成本地事務進行處理。
基本思路:
a.消息生產方,需要額外建一個消息表,并記錄消息發送狀態。消息表和業務數據要在一個事務里提交,也就是說他們要在一個數據庫里面。然后消息會經過MQ發送到消息的消費方。如果消息發送失敗,會進行重試發送。
b.消息消費方,需要處理這個消息,并完成自己的業務邏輯。此時如果本地事務處理成功,表明已經處理成功了,如果處理失敗,那么就會重試執行。如果是業務上面的失敗,可以給生產方發送一個業務補償消息,通知生產方進行回滾等操作。
c.生產方和消費方定時掃描本地消息表,把還沒處理完成的消息或者失敗的消息再發送一遍。如果有靠譜的自動對賬補賬邏輯,這種方案還是非常實用的。
二:Spring Boot + RabbitMQ分布式事務實現
1.pom.xml依賴配置
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.application.yaml rabbitmq配置
# RabbitMQ rabbitmq: host: 112.74.105.178 port: 5672 username: admin password: admin virtual-host: / publisher-confirms: true publisher-returns: true listener: simple: acknowledge-mode: manual
3.RabbitMQConfig.java
@Configuration public class RabbitMQConfig { // 下單并且派單存隊列 public static final String ORDER_DIC_QUEUE = "order_dis_queue"; // 補單隊列,判斷訂單是否已經被創建 public static final String ORDER_CREATE_QUEUE = "order_create_queue"; // 下單并且派單交換機 private static final String ORDER_EXCHANGE_NAME = "order_exchange_name"; @Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate(connectionFactory); template.setMessageConverter(new Jackson2JsonMessageConverter()); return template; } @Bean public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); return factory; } @Bean public Queue OrderDicQueue() { return new Queue(ORDER_DIC_QUEUE); } @Bean public Queue OrderCreateQueue() { return new Queue(ORDER_CREATE_QUEUE); } @Bean DirectExchange directOrderExchange() { return new DirectExchange(ORDER_EXCHANGE_NAME); } @Bean Binding bindingExchangeOrderDicQueue() { return BindingBuilder.bind(OrderDicQueue()).to(directOrderExchange()).with("orderRoutingKey"); } @Bean Binding bindingExchangeOrderCreateQueue() { return BindingBuilder.bind(OrderCreateQueue()).to(directOrderExchange()).with("orderRoutingKey"); } }
4. 消息生產者
public class MsgPushInfoServiceImpl extends ServiceImpl<MsgPushInfoMapper, MsgPushInfoEntity> implements MsgPushInfoService, RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; public void orderAndDsipatch() { try { String orderId = "123456"; JSONObject jsonObect = new JSONObject(); jsonObect.put("orderId", orderId); String msg = jsonObect.toString(); System.out.println("msg:" + msg); MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("application/json"); messageProperties.setMessageId(orderId); Message message = new Message(msg.getBytes(),messageProperties); CorrelationData correlationData = new CorrelationData(orderId); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend("order_exchange_name", "orderRoutingKey", message, correlationData); } catch (Exception e) { e.printStackTrace(); } } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { String orderId = correlationData.getId(); System.out.println("消息id:" + orderId); if (ack) { // 消息發送成功 System.out.println("消息發送確認成功"); } else { // 重試機制 System.out.println("消息發送確認失敗:" + cause); } } }
5.消息消費者
@Component public class DispatchReceiver { @RabbitHandler @RabbitListener(queues = "order_dis_queue", containerFactory = "rabbitListenerContainerFactory") public void process(Message message, Channel channel) { System.out.println("rev : " + message.getMessageProperties().getMessageId()); try { System.out.println("======basicNack====="+message.getMessageProperties().getDeliveryTag()); //業務處理成功,則刪除消息 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); //業務處理失敗,則發送補償消息 } catch (Exception e) { e.printStackTrace(); } } }
看完了這篇文章,相信你對“Spring Boot + RabbitMQ如何實現分布式事務”有了一定的了解,如果想了解更多相關知識,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。