您好,登錄后才能下訂單哦!
這篇文章主要講解了“如何理解Github上14.1k Star的RocketMQ”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“如何理解Github上14.1k Star的RocketMQ”吧!
RocketMQ 事務消息發送流程:
結合源碼來看,RocketMQ 的事務消息 TransactionMQProducer 的 sendMessageInTransaction 方法,實際調用了 DefaultMQProducerImpl 的 sendMessageInTransaction 方法。我們進入 sendMessageInTransaction 方法,整個事務消息的發送流程清晰可見。
首先,做發送前檢查,并填入必要參數,包括設 prepare 事務消息。
源碼清單-1
public TransactionSendResult sendMessageInTransaction(final Message msg, final LocalTransactionExecuter localTransactionExecuter, final Object arg) throws MQClientException { TransactionListener transactionListener = getCheckListener(); if (null == localTransactionExecuter && null == transactionListener) { throw new MQClientException("tranExecutor is null", null); } // ignore DelayTimeLevel parameter if (msg.getDelayTimeLevel() != 0) { MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL); } Validators.checkMessage(msg, this.defaultMQProducer); SendResult sendResult = null; MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
進入發送處理流程:
源碼清單-2
try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
根據 broker 返回的處理結果決策本地事務是否執行,半消息發送成功則開始本地事務執行:
源碼清單-3
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: // 當備broker狀態不可用時,半消息要回滾,不執行本地事務 localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
本地事務執行結束,根據本地事務狀態進行二階段處理:
源碼清單-4
try { this.endTransaction(sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); } // 組裝發送結果 // ... return transactionSendResult; }
接下來,我們深入每個階段代碼分析。
重點分析 send 方法。進入 send 方法后,我們發現,RocketMQ 的事務消息的一階段,使用了 SYNC 同步模式:
源碼清單-5
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
這一點很容易理解,畢竟事務消息是要根據一階段發送結果來決定要不要執行本地事務的,所以一定要阻塞等待 broker 的 ack。
我們進入 DefaultMQProducerImpl.java 中去看 sendDefaultImpl 方法的實現,通過讀這個方法的代碼,來嘗試了解在事務消息的一階段發送過程中 producer 的行為。
值得注意的是,這個方法并非為事務消息定制,甚至不是為 SYNC 同步模式定制的,因此讀懂了這段代碼,基本可以對 RocketMQ 的消息發送機制有了一個較為全面的認識。
這段代碼邏輯非常通暢,不忍切片。為了節省篇幅,將代碼中較為繁雜但信息量不大的部分以注釋代替,盡可能保留流程的完整性。個人認為較為重要或是容易被忽略的部分,以注釋標出,后文還有部分細節的詳細解讀。
源碼清單-6
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.makeSureStateOK(); // 一、消息有效性校驗。見后文 Validators.checkMessage(msg, this.defaultMQProducer); final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // 獲取當前topic的發送路由信息,主要是要broker,如果沒找到則從namesrv獲取 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 二、發送重試機制。見后文 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; String[] brokersSent = new String[timesTotal]; for (; times < timesTotal; times++) { // 第一次發送是mq == null, 之后都是有broker信息的 String lastBrokerName = null == mq ? null : mq.getBrokerName(); // 三、rocketmq發送消息時如何選擇隊列?——broker異常規避機制 MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // 發送核心代碼 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // rocketmq 選擇 broker 時的規避機制,開啟 sendLatencyFaultEnable == true 才生效 this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { // 四、RocketMQ的三種CommunicationMode。見后文 case ASYNC: // 異步模式 return null; case ONEWAY: // 單向模式 return null; case SYNC: // 同步模式 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { // ... // 自動重試 } catch (MQClientException e) { // ... // 自動重試 } catch (MQBrokerException e) { // ... // 僅返回碼==NOT_IN_CURRENT_UNIT==205 時自動重試 // 其他情況不重試,拋異常 } catch (InterruptedException e) { // ... // 不重試,拋異常 } } else { break; } } if (sendResult != null) { return sendResult; } // 組裝返回的info信息,最后以MQClientException拋出 // ... ... // 超時場景拋RemotingTooMuchRequestException if (callTimeout) { throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout"); } // 填充MQClientException異常信息 // ... } validateNameServerSetting(); throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION); }
源碼清單-7
Validators.checkMessage(msg, this.defaultMQProducer);
在此方法中校驗消息的有效性,包括對 topic 和消息體的校驗。topic 的命名必須符合規范,且避免使用內置的系統消息 TOPIC。消息體長度 > 0 && 消息體長度 <= 102410244 = 4M 。
源碼清單-8
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException { if (null == msg) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null"); } // topic Validators.checkTopic(msg.getTopic()); Validators.isNotAllowedSendTopic(msg.getTopic()); // body if (null == msg.getBody()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null"); } if (0 == msg.getBody().length) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero"); } if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) { throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize()); } }
Producer 在消息發送不成功時,會自動重試,最多發送次數 = retryTimesWhenSendFailed + 1 = 3 次 。
值得注意的是,并非所有異常情況都會重試,從以上源碼中可以提取到的信息告訴我們,在以下三種情況下,會自動重試:
發生 RemotingException,MQClientException 兩種異常之一時
發生 MQBrokerException 異常,且 ResponseCode 是 NOT_IN_CURRENT_UNIT = 205 時
SYNC 模式下,未發生異常且發送結果狀態非 SEND_OK
在每次發送消息之前,會先檢查是否在前面這兩步就已經耗時超長(超時時長默認 3000ms),若是,則不再繼續發送并且直接返回超時,不再重試。這里說明了 2 個問題:
producer 內部自動重試對業務應用而言是無感知的,應用看到的發送耗時是包含所有重試的耗時在內的;
一旦超時意味著本次消息發送已經以失敗告終,原因是超時。這個信息最后會以 RemotingTooMuchRequestException 的形式拋出。
這里需要指出的是,在 RocketMQ 官方文檔中指出,發送超時時長是 10s,即 10000ms,網上許多人對 rocketMQ 的超時時間解讀也認為是 10s。然而代碼中卻明明白白寫著 3000ms,最終我 debug 之后確認,默認超時時間確實是 3000ms。
源碼清單-9
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
這行代碼是發送前選擇 queue 的過程。
這里涉及 RocketMQ 消息發送高可用的的一個核心機制,latencyFaultTolerance。這個機制是 Producer 負載均衡的一部分,通過 sendLatencyFaultEnable 的值來控制,默認是 false 關閉狀態,不啟動 broker 故障延遲機制,值為 true 時啟用 broker 故障延遲機制,可由 Producer 主動打開。
選擇隊列時,開啟異常規避機制,則根據 broker 的工作狀態避免選擇當前狀態不佳的 broker 代理,不健康的 broker 會在一段時間內被規避,不開啟異常規避機制時,則按順序選取下一個隊列,但在重試場景下會盡量選擇不同于上次發送 broker 的 queue。每次消息發送都會通過 updateFaultItem 方法來維護 broker 的狀態信息。
源碼清單-10
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { if (this.sendLatencyFaultEnable) { // 計算延遲多久,isolation表示是否需要隔離該broker,若是,則從30s往前找第一個比30s小的延遲值,再按下標判斷規避的周期,若30s,則是10min規避; // 否則,按上一次發送耗時來決定規避時長; long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
深入到 selectOneMessageQueue 方法內部一探究竟:
源碼清單-11
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { // 開啟異常規避 try { int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; // 按順序取下一個message queue作為發送的queue MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 當前queue所在的broker可用,且與上一個queue的broker相同, // 或者第一次發送,則使用這個queue if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) return mq; } } final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // 不開啟異常規避,則隨機自增選擇Queue return tpInfo.selectOneMessageQueue(lastBrokerName); }
源碼清單-12
public enum CommunicationMode { SYNC, ASYNC, ONEWAY, }
以上三種模式指的都是消息從發送方到達 broker 的階段,不包含 broker 將消息投遞給訂閱方的過程。三種模式的發送方式的差異:
**單向模式:**ONEWAY。消息發送方只管發送,并不關心 broker 處理的結果如何。這種模式下,由于處理流程少,發送耗時非常小,吞吐量大,但不能保證消息可靠不丟,常用于流量巨大但不重要的消息場景,例如心跳發送等。
**異步模式:**ASYNC。消息發送方發送消息到 broker 后,無需等待 broker 處理,拿到的是 null 的返回值,而由一個異步的線程來做消息處理,處理完成后以回調的形式告訴發送方發送結果。異步處理時如有異常,返回發送方失敗結果之前,會經過內部重試(默認 3 次,發送方不感知)。這種模式下,發送方等待時長較小,吞吐量較大,消息可靠,用于流量大但重要的消息場景。
**同步模式:**SYNC。消息發送方需等待 broker 處理完成并明確返回成功或失敗,在消息發送方拿到消息發送失敗的結果之前,也會經歷過內部重試(默認 3 次,發送方不感知)這種模式下,發送方會阻塞等待消息處理結果,等待時長較長,消息可靠,用于流量不大但重要的消息場景。需要強調的是,事務消息的一階段半事務消息的處理是同步模式。
在 sendKernelImpl 方法中也可以看到具體的實現差異。ONEWAY 模式最為簡單,不做任何處理。負責發送的 sendMessage 方法參數中,相比同步模式,異步模式多了回調方法、包含 topic 發送路由元信息的 topicPublishInfo、包含發送 broker 信息的 instance、包含發送隊列信息的 producer、重試次數。另外,異步模式下,會對有壓縮的消息先做 copy。
源碼清單-13
switch (communicationMode) { case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; case ONEWAY: case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; }
官方文檔中有這樣一張圖,十分清晰的描述了異步通信的詳細過程:
源碼清單-3 體現了本地事務的執行,localTransactionState 將本地事務執行結果與事務消息二階段的發送關聯起來。
值得注意的是,如果一階段的發送結果是 SLAVENOTAVAILABLE,即便 broker 不可用時,也會將 localTransactionState 置為 Rollback,此時將不會執行本地事務。之后由 endTransaction 方法負責二階段提交,見源碼清單-4。具體到 endTransaction 的實現:
源碼清單-14
public void endTransaction( final SendResult sendResult, final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } String transactionId = sendResult.getTransactionId(); final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName()); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; // 采用oneway的方式發送二階段消息 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
在二階段發送時,之所以用 oneway 的方式發送,個人理解這正是因為事務消息有一個特殊的可靠機制——回查。
當 Broker 經過了一個特定的時間,發現依然沒有得到事務消息的二階段是否要提交或者回滾的確切信息,Broker 不知道 Producer 發生了什么情況(可能 producer 掛了,也可能 producer 發了 commit 但網絡抖動丟了,也可能……于是主動發起回查。
事務消息的回查機制,更多的是在 broker 端的體現。RocketMQ 的 broker 以 Half 消息、Op 消息、真實消息三個不同的 topic 來將不同發送階段的事務消息進行了隔離,使得 Consumer 只能看到最終確認 commit 需要投遞出去的消息。其中詳細的實現邏輯在本文中暫不多贅述,后續可另開一篇專門來從 Broker 視角來解讀。
回到 Producer 的視角,當收到了 Broker 的回查請求,Producer 將根據消息檢查本地事務狀態,根據結果決定提交或回滾,這就要求 Producer 必須指定回查實現,以備不時之需。當然,正常情況下,并不推薦主動發送 UNKNOW 狀態,這個狀態毫無疑問會給 broker 帶來額外回查開銷,只在出現不可預知的異常情況時才啟動回查機制,是一種比較合理的選擇。
另外,4.7.1 版本的事務回查并非無限回查,而是最多回查 15 次:
源碼清單-15
/** * The maximum number of times the message was checked, if exceed this value, this message will be discarded. */ @ImportantField private int transactionCheckMax = 15;
感謝各位的閱讀,以上就是“如何理解Github上14.1k Star的RocketMQ”的內容了,經過本文的學習后,相信大家對如何理解Github上14.1k Star的RocketMQ這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。