您好,登錄后才能下訂單哦!
今天就跟大家聊聊有關如何整合RocketMQ事務消息,可能很多人都不太了解,為了讓大家更加了解,小編給大家總結了以下內容,希望大家根據這篇文章可以有所收獲。
ActiveMQ、RabbitMQ、ZeroMQ、Kafka、RocketMQ選型
RocketMQ提供了事務消息回查,查看官方Demo
@SpringBootApplication public class ProducerApplication implements CommandLineRunner { private static final String TX_PGROUP_NAME = "myTxProducerGroup"; @Resource private RocketMQTemplate rocketMQTemplate; @Value("${demo.rocketmq.transTopic}") private String springTransTopic; public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } @Override public void run(String... args) throws Exception { // Send transactional messages testTransaction(); } private void testTransaction() throws MessagingException { String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 10; i++) { try { Message msg = MessageBuilder .withPayload("Hello RocketMQ " + i) .setHeader(RocketMQHeaders.TRANSACTION_ID, "KEY_" + i) .build(); SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, springTransTopic + ":" + tags[i % tags.length], msg, null); System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n", msg.getPayload(), sendResult.getSendStatus()); Thread.sleep(10); } catch (Exception e) { e.printStackTrace(); } } } @RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME) class TransactionListenerImpl implements RocketMQLocalTransactionListener { private AtomicInteger transactionIndex = new AtomicInteger(0); private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>(); @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n", transId); int value = transactionIndex.getAndIncrement(); int status = value % 3; localTrans.put(transId, status); if (status == 0) { // Return local transaction with success(commit), in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload()); return RocketMQLocalTransactionState.COMMIT; } if (status == 1) { // Return local transaction with failure(rollback) , in this case, // this message will not be checked in checkLocalTransaction() System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload()); return RocketMQLocalTransactionState.ROLLBACK; } System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n"); return RocketMQLocalTransactionState.UNKNOWN; } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transId = (String)msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID); RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT; Integer status = localTrans.get(transId); if (null != status) { switch (status) { case 0: retState = RocketMQLocalTransactionState.UNKNOWN; break; case 1: retState = RocketMQLocalTransactionState.COMMIT; break; case 2: retState = RocketMQLocalTransactionState.ROLLBACK; break; } } System.out.printf("------ !!! checkLocalTransaction is executed once," + " msgTransactionId=%s, TransactionState=%s status=%s %n", transId, retState, status); return retState; } } }
需要在testTransaction()
中發送消息,然后在TransactionListenerImpl
類中實現executeLocalTransaction()
方法才能執行整個本地事務,然后在checkLocalTransaction()
中實現事務消息回查。
查看源代碼可以知道testTransaction()
方法和executeLocalTransaction()
是在同一個線程當中,只不過包裝RocketMQTemplate
中。
消息發送的事務消息回調查詢和本地事務沒嚴格的先后順序,怎么保證,回查時,事務操作肯定已經完成。
事務消息回調使用transaction_id
查詢,那么transaction_id
存放在哪里,同時保證transaction_id
關聯的業務操作執行成功。
怎么把事務回調查詢操作隔離出業務,保證不侵入代碼中。
下游消費者怎么保證接口冪等性。
下游消費者怎么提高冪等性查詢性能。
怎么把冪等性操作隔離出業務,保證不侵入代碼中。
因為數據庫或者其他業務操作可能會存在延時,那么不能保證回查時業務操作已完成,那么可以多次回查,并設置最大回查次數,同時不能丟棄MQ消息持久化,方便手動恢復。
可以使用本地消息表落地的發送消息,同時可以采用切面、繼承等等方式將落地消息隔離出業務代碼之外,保證本地消息落庫不侵入,注意必須要保證本地消息落庫和本地業務落庫在同一個事務之內!
事務消息回查可以使用第2點的本地消息表,根據transaction_id
查詢,判斷本地事務的執行結果,也和第2點一樣,可以使用一些方式將事務消息回查代碼隔離出業務代碼,保證不侵入。
冪等性的方法:
數據庫唯一約束
狀態機CAS單向流轉
消息去重表
,在執行本地業務前,先對redis判斷是業務id是否存在,存在則直接返回消費成功,在執行本地業務之后,可以將消費信息異步落地到redis當中。注意:需要保證本地業務和消息冪等性操作在同一個事務當中,同時redis落地操作在事務之外。
比較好的方案應該是數據庫唯一約束 + 消息去重表,在消息去重表中對業務id設置唯一約束,同時將消息落地操作隔離出本地業務之外,保證不侵入。
定時清理歷史的本地消息表(消息去重表)。
看完上述內容,你們對如何整合RocketMQ事務消息有進一步的了解嗎?如果還想了解更多知識或者相關內容,請關注億速云行業資訊頻道,感謝大家的支持。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。