您好,登錄后才能下訂單哦!
本篇內容介紹了“RocketMQ的事務消息發送流程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
半消息實現了分布式環境下的數據一致性的處理,生產者發送事務消息的流程如上圖所示,通過對源碼的學習,我們可以弄清楚下面幾點,也是半消息機制的核心:
1.為什么prepare消息不會被Consumer消費?
2.事務消息是如何提交和回滾的?
3.定時回查本地事務狀態的實現細節。
發送事務消息方法TransactionMQProducer.sendMessageInTransaction:
msg:消息
tranExecuter:本地事務執行器
arg:本地事務執行器參數
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // 忽視消息延遲的屬性 if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); // 發送半消息 SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup()); try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); } // 處理發送半消息的結果 LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { // 發送半消息成功,執行本地事務邏輯 case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } // 執行本地事務邏輯 if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; // 發送半消息失敗,標記本地事務狀態為回滾 case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; } // 結束事務,設置消息 COMMIT / ROLLBACK try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } // 返回事務發送結果 TransactionSendResult transactionSendResult = new TransactionSendResult(); transactionSendResult.setSendStatus(sendResult.getSendStatus()); transactionSendResult.setMessageQueue(sendResult.getMessageQueue()); // 提取Prepared消息的uniqID transactionSendResult.setMsgId(sendResult.getMsgId()); transactionSendResult.setQueueOffset(sendResult.getQueueOffset()); transactionSendResult.setTransactionId(sendResult.getTransactionId()); transactionSendResult.setLocalTransactionState(localTransactionState); return transactionSendResult; }
該方法的入參包含有一個需要用戶實現本地事務的LocalTransactionExecuter executer,executer中會進行事務操作以保證本地事務和消息發送這兩個操作的原子性。
由上面的源碼可知:
Producer會首先發送一個半消息到Broker中:
半消息發送成功,執行事務
半消息發送失敗,不執行事務
半消息發送到Broker后不會被Consumer消費掉的原因有以下兩點:
Broker在將消息寫入CommitLog時會判斷消息類型,如果是prepare或者rollback消息,ConsumeQueue的offset不變
Broker在構造ConsumeQueue時會判斷是否是處于prepare或者rollback狀態的消息,如果是則不會將該消息放入ConsumeQueue里,Consumer在拉取消息時也就不會拉取到這條消息
Producer會根據半消息的發送結果和本地任務執行結果來決定如何處理事務(commit或rollback),方法最后調用了endTransaction來處理事務的執行結果,源碼如下:
sendResult:發送半消息的結果
localTransactionState:本地事務狀態
localException:執行本地事務邏輯產生的異常
RemotingException:遠程調用異常
MQBrokerException:Broker異常
InterruptedException:當線程中斷異常
UnknownHostException:未知host異常
public void endTransaction( final Message msg, final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { // 解碼消息id final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } // 創建請求 String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 提交 commit / rollback 消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
該方法是將事務執行的結果發送給Broker,再由Broker決定是否進行消息投遞,執行步驟如下:
1.收到消息后先檢查是否是事務消息,如果不是事務消息則直接返回
2.根據請求頭里的offset查詢半消息,如果查詢結果為空則直接返回
3.根據半消息構造新消息,新構造的消息會被重新寫入到CommitLog里,rollback消息的消息體為空
4.如果是rollback消息,則該消息不會被投遞
具體原因上文中已經分析過:只有commit消息才會被Broker投遞給consumer
RocketMQ會將commit消息和rollback消息都寫入到commitLog里,但rollback消息的消息體為空且不會被投遞,CommitLog在刪除過期消息時才會將其刪除。當事務commit成功之后,RocketMQ會重新封裝半消息并將其投遞給Consumer端消費。
Broker發起
相較于普通消息,事務消息主要依賴下面三個類:
1.TransactionStateService:事務狀態服務,負責對事務消息進行管理,包括存儲和更新事務消息狀態、回查狀態等
2.TranStateTable:事務消息狀態存儲表,基于MappedFileQueue實現
3.TranRedoLog:TranStateTable的日志,每次寫入操作都會記錄日志,當Broker宕機時,可以利用這個文件做數據恢復
存儲半消息到CommitLog時,使用offset索引到對應的TranStateTable的位置
“RocketMQ的事務消息發送流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。