您好,登錄后才能下訂單哦!
今天小編給大家分享一下RocketMQ事務消息怎么保證消息的可靠性和一致性的相關知識點,內容詳細,邏輯清晰,相信大部分人都還太了解這方面的知識,所以分享這篇文章給大家參考一下,希望大家閱讀完這篇文章后有所收獲,下面我們一起來了解一下吧。
在發送事務消息的時候,會加一個標識,表示這個消息是事務消息。broker接收到消息后,在我們之前看的代碼里org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage會判斷是否是事務消息。
if (sendTransactionPrepareMessage) { asyncPutMessageFuture = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner); } else { asyncPutMessageFuture = this.brokerController.getMessageStore().asyncPutMessage(msgInner); }
sendTransactionPrepareMessage=true表示是事務消息,所以走了一個單獨的邏輯。
public CompletableFuture<PutMessageResult> asyncPutHalfMessage(MessageExtBrokerInner messageInner) { return store.asyncPutMessage(parseHalfMessageInner(messageInner)); }
這里parseHalfMessageInner這個方法里面開始了偷梁換柱,把topic和queueId都改了,把原本的信息先存在變量里面。所以實際上這個消息發到了半消息專有的topic里面,topic名字叫做RMQ_SYS_TRANS_HALF_TOPIC
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic()); MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msgInner.getQueueId())); msgInner.setSysFlag( MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE)); msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic()); msgInner.setQueueId(0); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; }
然后其他代碼還是和普通的消息一樣,就是把事務消息做了轉發,存在了RMQ_SYS_TRANS_HALF_TOPIC里面。
到這里發送半消息就成功了,然后最后客戶端發送了半消息之后,會查一下本地事務的情況是否完成。這里有3種情況:commit、rollback、未知。完成和回滾都是確認的狀態,這個比較好處理,比較難的是未知。我們先看能得到確認結果的情況。
如果完成和回滾,會給客戶端發送結束事務的消息,這個消息叫END_TRANSACTION,包括消息里面包括了之前發送的半消息的id和offset。
broker處理的代碼在org.apache.rocketmq.broker.processor.EndTransactionProcessor#processRequest中。就是根據offset拿到半消息,然后如果是commit,就是把原本的topic和queueId還原,發到原本的隊列里面,這樣就可以正常消費了。然后把這個半消息“刪除”。如果是rollBack,也是拿到這個半消息,然后直接“刪除”就可以了。接下來看一下怎么“刪除”。
為什么我刪除會打引號呢?因為半消息其實就是跟正常的消息一樣,存在commitLog文件里面,mq的設計,就沒有刪除這個功能。所以所謂的刪除其實就是把這個消息消費掉,不做任何處理,就是刪除了。
想象一下,這個半消息有commit/rollBack/未知,3種狀態,未知的肯定不能刪除,那他怎么知道哪些消息是可以刪除的呢?總不能所有的都再去客戶端查一下事務的結果吧?mq怎么做的呢?前面提到的刪除其實就是把這些commit和rollBack處理過后的半消息,再保存起來,后面消費半消息的數據的時候,只要從里面查一下是否需要刪除就可以了。
這里又有一個問題,怎么把需要刪除的半消息存起來呢?mq存儲數據就是commitLog,所以其實這些需要刪除的數據,就是又發到了一個特定的topic里面。這個topic名字是RMQ_SYS_TRANS_OP_HALF_TOPIC。主意區分,原本半消息的topic名字是half_topic,這個topic名字是op_half_topic,存儲的是處理過后,可以刪除的半消息。
所以說前面提到的帶引號的“刪除”,就是把消息發到op_half_topic就表示是刪除了,這個op_half_topic消息的內容就是half_topic的offset。那么現在需要有個地方,來消費half_topic,然后判斷是否存在于op_half_topic,如果是表示可以刪除了,如果不是,就接著保存起來。
處理邏輯就在TransactionalMessageCheckService這個定時任務中。具體是在TransactionalMessageServiceImpl#check方法里面
@Override public void check(long transactionTimeout, int transactionCheckMax, AbstractTransactionalMessageCheckListener listener) { try { String topic = TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC; // 先拿到半消息 Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic); if (msgQueues == null || msgQueues.size() == 0) { log.warn("The queue of topic is empty :" + topic); return; } log.debug("Check topic={}, queues={}", topic, msgQueues); for (MessageQueue messageQueue : msgQueues) { long startTime = System.currentTimeMillis(); MessageQueue opQueue = getOpQueue(messageQueue); // 拿到半消息的最小偏移量 long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue); // 拿到op_half的最小偏移量 long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue); log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset); if (halfOffset < 0 || opOffset < 0) { log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue, halfOffset, opOffset); continue; } List<Long> doneOpOffset = new ArrayList<>(); HashMap<Long, Long> removeMap = new HashMap<>(); // 拉取op的消息(32條),op消息內容是half的offset,跟half_topic的最小offset比較,如果op的小于最小的,就說明已經處理過了,放在doneOpOffset,反之,則說明還沒處理過,就先放在removeMap里面 PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset); if (null == pullResult) { log.error("The queue={} check msgOffset={} with opOffset={} failed, pullResult is null", messageQueue, halfOffset, opOffset); continue; } // single thread int getMessageNullCount = 1; long newOffset = halfOffset; long i = halfOffset; // 然后對half_topic進行處理 while (true) { if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) { log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT); break; } // 如果這個offset已經處理過了,就接著處理下一個 if (removeMap.containsKey(i)) { log.debug("Half offset {} has been committed/rolled back", i); Long removedOpOffset = removeMap.remove(i); doneOpOffset.add(removedOpOffset); } else { // 如果沒有處理過,就要把數據撈出來重新投遞 GetResult getResult = getHalfMsg(messageQueue, i); MessageExt msgExt = getResult.getMsg(); if (msgExt == null) { if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) { break; } if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) { log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); break; } else { log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}", i, messageQueue, getMessageNullCount, getResult.getPullResult()); i = getResult.getPullResult().getNextBeginOffset(); newOffset = i; continue; } } if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) { listener.resolveDiscardMsg(msgExt); newOffset = i + 1; i++; continue; } if (msgExt.getStoreTimestamp() >= startTime) { log.debug("Fresh stored. the miss offset={}, check it later, store={}", i, new Date(msgExt.getStoreTimestamp())); break; } long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp(); long checkImmunityTime = transactionTimeout; String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS); if (null != checkImmunityTimeStr) { checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout); if (valueOfCurrentMinusBorn < checkImmunityTime) { if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) { newOffset = i + 1; i++; continue; } } } else { if (0 <= valueOfCurrentMinusBorn && valueOfCurrentMinusBorn < checkImmunityTime) { log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i, checkImmunityTime, new Date(msgExt.getBornTimestamp())); break; } } List<MessageExt> opMsg = pullResult.getMsgFoundList(); boolean isNeedCheck = opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime || opMsg != null && opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout || valueOfCurrentMinusBorn <= -1; if (isNeedCheck) { // 重新投遞 if (!putBackHalfMsgQueue(msgExt, i)) { continue; } // 再重新確認事務 listener.resolveHalfMsg(msgExt); } else { pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset); log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i, messageQueue, pullResult); continue; } } newOffset = i + 1; i++; } // 更新offset if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } long newOpOffset = calculateOpOffset(doneOpOffset, opOffset); if (newOpOffset != opOffset) { transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset); } } } catch (Throwable e) { log.error("Check error", e); } }
我講解一下這個代碼做了啥。我們先明確這個代碼是要實現什么功能。就是消費half_topic,然后去根據op_half_topic的數據來判斷half_topc的消息是否被處理過,處理過了就直接忽略、丟棄,如果沒有處理過,就“保留”這個消息,等待后面事務確認了再處理。
這里“保留”我也是加了引號,因為mq消費是一條一條按順序消費,如果中間有一個數據卡住了,后面數據就沒法消費了。所以這里“保留”,其實也是消費了,只是他消費到了不確定結果的消息,他是重新投遞到了half_topic,來實現“保留”的目的。
好了,明確了這個代碼實現的功能,我們來一步步看一下細節。
首先是拿到half_topic和op_half_topic的offset,知道現在是消費到了哪里。然后去拉取op_half_topic,每次32條,op_half消息內容存的是half_topic的offset,只要判斷這條op_half里面的offset小于half_topic的offset,就表示已經消費過了,放在doneOpOffset的list里面,如果op_half保存的offset大于half_topic的offset,就表示還沒消費,放入removeMap,就表示這個半消息可以放心刪除了。
這一步,通過消費op_half,跟half_topic的minOffset做比較,構建了doneOpOffset,和removeMap。
然后就是消費half_topic的消息,只要判斷每條消息的offset是否在removeMap中,就表示可以刪除,放入doneOpOffset中,直接消費下一條數據,所以這里其實也不用真的拉取half_topic的消息,只要用offset來判斷就行,消費過了,offset+1,就可以去判斷下一條消息。
如果half_topic的offset沒有在removeMap中,就表示暫時還不知道結果,這時候就重新發送到half_topic,重新投遞之后,然后給客戶端發送一個檢查事務的請求,客戶端檢測過后,還是用之前的END_TRANSACTION命令,再發給broker,broker就會放到op_half里面,等于就是重新發了一個半消息的流程,實現了閉環。
最后就是更新兩個topic的offset了。之前的doneOpOffset保存下來,就是為了更新op_half的offset,只有都處理過了,才會更新,如果中間有一個沒有處理,就會阻塞在那條消息。
以上就是“RocketMQ事務消息怎么保證消息的可靠性和一致性”這篇文章的所有內容,感謝各位的閱讀!相信大家閱讀完這篇文章都有很大的收獲,小編每天都會為大家更新不同的知識,如果還想學習更多的知識,請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。