91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ的事務消息發送流程是什么

發布時間:2023-05-08 17:01:29 來源:億速云 閱讀:172 作者:iii 欄目:開發技術

本篇內容介紹了“RocketMQ的事務消息發送流程是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!

事務消息發送流程

RocketMQ的事務消息發送流程是什么

半消息實現了分布式環境下的數據一致性的處理,生產者發送事務消息的流程如上圖所示,通過對源碼的學習,我們可以弄清楚下面幾點,也是半消息機制的核心:

1.為什么prepare消息不會被Consumer消費?

2.事務消息是如何提交和回滾的?

3.定時回查本地事務狀態的實現細節。

發送事務消息源碼分析

發送事務消息方法TransactionMQProducer.sendMessageInTransaction:

  • msg:消息

  • tranExecuter:本地事務執行器

  • arg:本地事務執行器參數

public TransactionSendResult sendMessageInTransaction(final Message msg,
        final LocalTransactionExecuter localTransactionExecuter, final Object arg)
        throws MQClientException {
        TransactionListener transactionListener = getCheckListener();
        if (null == localTransactionExecuter && null == transactionListener) {
            throw new MQClientException("tranExecutor is null", null);
        }

        // 忽視消息延遲的屬性
        if (msg.getDelayTimeLevel() != 0) {
            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
        }

        Validators.checkMessage(msg, this.defaultMQProducer);
		
		// 發送半消息
        SendResult sendResult = null;
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
        try {
            sendResult = this.send(msg);
        } catch (Exception e) {
            throw new MQClientException("send message Exception", e);
        }
		
		// 處理發送半消息的結果
        LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
        Throwable localException = null;
        switch (sendResult.getSendStatus()) {
        	// 發送半消息成功,執行本地事務邏輯
            case SEND_OK: {
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }
                    String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
                    if (null != transactionId && !"".equals(transactionId)) {
                        msg.setTransactionId(transactionId);
                    }
                    // 執行本地事務邏輯
                    if (null != localTransactionExecuter) {
                        localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
                    }
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        log.info(msg.toString());
                    }
                } catch (Throwable e) {
                    log.info("executeLocalTransactionBranch exception", e);
                    log.info(msg.toString());
                    localException = e;
                }
            }
            break;
            // 發送半消息失敗,標記本地事務狀態為回滾
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
            default:
                break;
        }
		
		// 結束事務,設置消息 COMMIT / ROLLBACK
        try {
            this.endTransaction(msg, sendResult, localTransactionState, localException);
        } catch (Exception e) {
            log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
        }
		
		// 返回事務發送結果
        TransactionSendResult transactionSendResult = new TransactionSendResult();
        transactionSendResult.setSendStatus(sendResult.getSendStatus());
        transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
        
        // 提取Prepared消息的uniqID
        transactionSendResult.setMsgId(sendResult.getMsgId());
        transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
        transactionSendResult.setTransactionId(sendResult.getTransactionId());
        transactionSendResult.setLocalTransactionState(localTransactionState);
        return transactionSendResult;
    }

該方法的入參包含有一個需要用戶實現本地事務的LocalTransactionExecuter executer,executer中會進行事務操作以保證本地事務和消息發送這兩個操作的原子性。

由上面的源碼可知:

Producer會首先發送一個半消息到Broker中:

  • 半消息發送成功,執行事務

  • 半消息發送失敗,不執行事務

半消息發送到Broker后不會被Consumer消費掉的原因有以下兩點:

  • Broker在將消息寫入CommitLog時會判斷消息類型,如果是prepare或者rollback消息,ConsumeQueue的offset不變

  • Broker在構造ConsumeQueue時會判斷是否是處于prepare或者rollback狀態的消息,如果是則不會將該消息放入ConsumeQueue里,Consumer在拉取消息時也就不會拉取到這條消息

Producer會根據半消息的發送結果和本地任務執行結果來決定如何處理事務(commit或rollback),方法最后調用了endTransaction來處理事務的執行結果,源碼如下:

  • sendResult:發送半消息的結果

  • localTransactionState:本地事務狀態

  • localException:執行本地事務邏輯產生的異常

  • RemotingException:遠程調用異常

  • MQBrokerException:Broker異常

  • InterruptedException:當線程中斷異常

  • UnknownHostException:未知host異常

public void endTransaction(
        final Message msg,
        final SendResult sendResult,
        final LocalTransactionState localTransactionState,
        final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {
        // 解碼消息id
        final MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

		// 創建請求
        String transactionId = sendResult.getTransactionId();
        final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());
        switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
                break;
            default:
                break;
        }

        doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false);
        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null;

		// 提交 commit / rollback 消息 
        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
            this.defaultMQProducer.getSendMsgTimeout());
    }

該方法是將事務執行的結果發送給Broker,再由Broker決定是否進行消息投遞,執行步驟如下:

1.收到消息后先檢查是否是事務消息,如果不是事務消息則直接返回

2.根據請求頭里的offset查詢半消息,如果查詢結果為空則直接返回

3.根據半消息構造新消息,新構造的消息會被重新寫入到CommitLog里,rollback消息的消息體為空

4.如果是rollback消息,則該消息不會被投遞

具體原因上文中已經分析過:只有commit消息才會被Broker投遞給consumer

RocketMQ會將commit消息和rollback消息都寫入到commitLog里,但rollback消息的消息體為空且不會被投遞,CommitLog在刪除過期消息時才會將其刪除。當事務commit成功之后,RocketMQ會重新封裝半消息并將其投遞給Consumer端消費。

事務消息回查

Broker發起

相較于普通消息,事務消息主要依賴下面三個類:

1.TransactionStateService:事務狀態服務,負責對事務消息進行管理,包括存儲和更新事務消息狀態、回查狀態等

2.TranStateTable:事務消息狀態存儲表,基于MappedFileQueue實現

3.TranRedoLog:TranStateTable的日志,每次寫入操作都會記錄日志,當Broker宕機時,可以利用這個文件做數據恢復

存儲半消息到CommitLog時,使用offset索引到對應的TranStateTable的位置

“RocketMQ的事務消息發送流程是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

哈巴河县| 阳城县| 广安市| 砀山县| 屯昌县| 丹棱县| 金川县| 自贡市| 芜湖县| 河东区| 息烽县| 阿鲁科尔沁旗| 龙川县| 霍邱县| 新宾| 大冶市| 阿图什市| 乌拉特中旗| 灌阳县| 张家港市| 河池市| 牙克石市| 岱山县| 双牌县| 开平市| 内江市| 高青县| 天台县| 牡丹江市| 西乌珠穆沁旗| 南靖县| 浦江县| 天峻县| 南雄市| 通榆县| 通许县| 吉林市| 宝兴县| 宁安市| 黔东| 闽侯县|