您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ producer容錯機制源碼分析”,在日常操作中,相信很多人在RocketMQ producer容錯機制源碼分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”RocketMQ producer容錯機制源碼分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
這里有兩個點,一個是關于消息的處理,一個是關于broker的處理,比如說發送消息到broker-a的broker失敗了,我們可能下次就不想發送到這個broker-a,這就涉及到一個選擇broker的問題,也就是選擇MessageQueue的問題。
其實失敗重試我們在介紹RocketMQ消息生產者發送消息的時候介紹過了,其實同步發送與異步發送都會失敗重試的,比如說我發送一個消息,然后超時了,這時候在MQProducer層就會進行控制重試,默認是重試2次的,加上你發送那次,一共是發送3次,如果重試完還是有問題的話,這個時候就會拋出異常了。
我們來看下這一塊的代碼實現( DefaultMQProducerImpl 類sendDefaultImpl方法):
這塊其實就是用for循環實現的,其實不光RocketMQ,分布式遠程調用框架Dubbo的失敗重試也是用for循環實現的。
我們都知道,在RocketMQ中一個topic其實是有多個MessageQueue這么一個概念的,然后這些MessageQueue可能對應著不同的broker name,比如說id是0和1的MessageQueue 對應的broker name是 broker-a ,然后id是2和3的MessageQueue對應的broker name 是broker-b
我們發送消息的時候,其實涉及到發送給哪個MessageQueue這么一個問題,當然我們可以在發送消息的時候指定這個MessageQueue,如果你不指定的話,RocketMQ就會根據MQFaultStrategy 這么一個策略類給選擇出來一個MessageQueue。
我們先來看下是在哪里選擇的,其實就是在我們重試的循環中: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl
... // 重試發送 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // todo 選擇message queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); ...
我們可以看到,它會把topicPublishInfo 與 lastBrokerName 作為參數傳進去,topicPublishInfo 里面其實就是那一堆MessageQueue, 然后這個lastBrokerName 是上次我們選擇的那個broker name , 這個接著我們來看下這個selectOneMessageQueue實現:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // todo return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName); }
可以看到它調用了MQFaultStrategy 這個類的selectOneMessageQueue 方法,我們接著進去:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { // 發送延遲故障啟用,默認為false if (this.sendLatencyFaultEnable) { try { // 獲取一個index int index = tpInfo.getSendWhichQueue().getAndIncrement(); for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size(); if (pos < 0) pos = 0; MessageQueue mq = tpInfo.getMessageQueueList().get(pos); // 選取的這個broker是可用的 直接返回 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) return mq; } // 到這里 找了一圈 還是沒有找到可用的broker // todo 選擇 距離可用時間最近的 final String notBestBroker = latencyFaultTolerance.pickOneAtLeast(); int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker); if (writeQueueNums > 0) { final MessageQueue mq = tpInfo.selectOneMessageQueue(); if (notBestBroker != null) { mq.setBrokerName(notBestBroker); mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums); } return mq; } else { latencyFaultTolerance.remove(notBestBroker); } } catch (Exception e) { log.error("Error occurred when selecting message queue", e); } return tpInfo.selectOneMessageQueue(); } // todo return tpInfo.selectOneMessageQueue(lastBrokerName); }
這種延遲故障策略其實是由sendLatencyFaultEnable來控制的,它默認是關閉的。
我們先來看下最普通的選擇策略,可以看到調用了TopicPublishInfo 的selectOneMessageQueue方法:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 消息第一個發送的時候 還沒有重試 也沒有上一個brokerName if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 這個 出現在重試的時候 for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避開 上次發送的brokerName if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // todo 到最后 沒有避開 只能隨機選一個 return selectOneMessageQueue(); } }
它這里里面分成了2部分,一個是沒有 這個lastBroker的,也就是這個這個消息還沒有被重試過,這是第一次發送這個消息,這個時候它的lastBrokerName就是null,然后他就會直接走selectOneMessageQueue 這個無參方法。
public MessageQueue selectOneMessageQueue() { // 相當于 某個線程輪詢 int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); }
先是獲取這個index ,然后使用index % MessageQueue集合的大小獲得一個MessageQueue集合值的一個下標(索引),這個index 其實某個線程內自增1的,這樣就形成了某個線程內輪詢的效果。這個樣子的話,同步發送其實就是單線程的輪詢,異步發送就是多個線程并發發送,然后某個線程內輪詢,我們看下他這個單個線程自增1效果是怎樣實現的。
public class ThreadLocalIndex { private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>(); private final Random random = new Random(); public int getAndIncrement() { Integer index = this.threadLocalIndex.get(); // 如果不存在就創建 然后設置到threadLocalIndex中 if (null == index) { index = Math.abs(random.nextInt()); this.threadLocalIndex.set(index); } index = Math.abs(index + 1); this.threadLocalIndex.set(index); return index; } }
可以看到這個sendWhichQueue 是用ThreadLocal實現的,然后這個樣子就可以一個線程一個index,而且不會出現線程安全問題。
好了這里我們就把這個消息第一次發送時候MessageQueue看完了,然后我們再來看下它其他重試的時候是怎樣選擇的,也就是lastBrokerName不是null的時候:
public MessageQueue selectOneMessageQueue(final String lastBrokerName) { // 消息第一個發送的時候 還沒有重試 也沒有上一個brokerName if (lastBrokerName == null) { return selectOneMessageQueue(); } else { // 這個 出現在重試的時候 for (int i = 0; i < this.messageQueueList.size(); i++) { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; MessageQueue mq = this.messageQueueList.get(pos); // 避開 上次發送的brokerName if (!mq.getBrokerName().equals(lastBrokerName)) { return mq; } } // todo 到最后 沒有避開 只能隨機選一個 return selectOneMessageQueue(); } }
這里其實就是選擇一個不是lastBrokerName 的MessageQueue,可以看到它是循環 MessageQueue 集合大小數個,這樣可能把所有的MessageQueue都看一遍,注意 這個循環只是起到選多少次的作用,具體的選擇還是要走某線程輪詢的那一套,到最后是在是選不出來了,也就是沒有這一堆MessageQueue都是在lastBrokerName上的,只能調用selectOneMessageQueue輪詢選一個了。
到這我們就把最普通的選擇一個MessageQueue介紹完了。
下面我們再來介紹下那個延遲故障的實現,這個其實就是根據你這個broker 的響應延遲時間的大小,來影響下次選擇這個broker的權重,他不是絕對的,因為根據它這個規則是在找不出來的話,他就會使用那套普通選擇算法來找個MessageQueue。
它是這樣一個原理:
在每次發送之后都收集一下它這次的一個響應延遲,比如我10點1分1秒200毫秒給broker-a了一個消息,然后到了10點1分1秒900毫秒的時候才收到broker-a 的一個sendResult也就是響應,這個時候他就是700ms的延遲,它會跟你就這個300ms的延遲找到一個時間范圍,他就認為你這個broker-a 這個broker 在某個時間段內,比如說30s內是不可用的。然后下次選擇的時候,他在第一輪會找那些可用的broker,找不到的話,就找那些上次不是這個broker的,還是找不到的話,他就絕望了,用最普通的方式,也就是上面說的那種輪詢算法找一個MessageQueue出來。
接下來我們先來看下它的收集延遲的部分,是這個樣子的,還是在這個失敗重試里面,然后它會在響應后或者異常后面都加一行代碼來收集這些延遲:
... // todo 進行發送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // todo isolation 參數為false(看一下異常情況) this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); ...
這是正常響應后的,注意它的isolation 參數,也就是隔離 是false,在看下異常的
... catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); exception = e; continue; } ...
他這個isolation 參數就是true ,也就是需要隔離的意思。
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // todo this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation); }
可以看到是調用了mqFaultStrategy 的updateFaultItem 方法:
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) { // 是否開啟延遲故障容錯 if (this.sendLatencyFaultEnable) { // todo 計算不可用持續時間 long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency); // todo 存儲 this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration); } }
先是判斷是否開啟了這個延遲故障的這么一個配置,默認是不啟動的,但是你可以自己啟動set下就可以了setSendLatencyFaultEnable(true)
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setSendLatencyFaultEnable(true);
首先是計算這個它認為broker不可用的這么一個時間,參數就是你那個響應延遲,熔斷的話就配置30000毫秒, 否則的話就是正常的那個響應時間
/** * 計算不可用持續時間 * @param currentLatency 當前延遲 */ private long computeNotAvailableDuration(final long currentLatency) { // latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; // notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; // 倒著遍歷 for (int i = latencyMax.length - 1; i >= 0; i--) { // 如果延遲大于某個時間,就返回對應服務不可用時間,可以看出來,響應延遲100ms以下是沒有問題的 if (currentLatency >= latencyMax[i]) return this.notAvailableDuration[i]; } return 0; }
他這個計算規則是這個樣子的,他有兩個數組,一個是響應延遲的,一個是不可使用的時間,兩個排列都是從小到大的順序,倒著先找響應延遲,如果你這個延遲大于某個時間,就找對應下標的不可使用的時間,比如說響應延遲700ms,這時候他就會找到30000ms不可使用時間。
計算完這個不可使用時間后接著調用了latencyFaultTolerance的updateFaultItem方法,這個方法其實就是用來存儲的:
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) { // 從緩存中獲取 FaultItem old = this.faultItemTable.get(name); // 緩存沒有的情況 if (null == old) { final FaultItem faultItem = new FaultItem(name); // 設置延遲 faultItem.setCurrentLatency(currentLatency); // 設置啟用時間 faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); // 設置faultItemTable 中 old = this.faultItemTable.putIfAbsent(name, faultItem); // 如果已經有了,拿到 老的進行更新 if (old != null) { old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } } else { // 緩存中已經有了,直接拿老的進行更新 old.setCurrentLatency(currentLatency); old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration); } }
他有個faultItemTable 這個緩存,記錄著 每個broker的FaultItem的項,這個FaultItem就是保存它能夠使用的一個時間(當前時間戳+不可使用時間),其實這個方法就是做更新或者插入操作。
好了到這我們就把它這個收集響應延遲指標與計算可用時間這快就解析完了,再回頭看下那個選擇MessageQueue的方法:
可以看到它先是找那種可用的,然后不是上一個broker的那個,如果好幾輪下來沒有找到的話就選擇一個
public String pickOneAtLeast() { // 將map中里面的放到tmpList 中 final Enumeration<FaultItem> elements = this.faultItemTable.elements(); List<FaultItem> tmpList = new LinkedList<FaultItem>(); while (elements.hasMoreElements()) { final FaultItem faultItem = elements.nextElement(); tmpList.add(faultItem); } // 如果不是null if (!tmpList.isEmpty()) { // 洗牌算法 Collections.shuffle(tmpList); // 排序 Collections.sort(tmpList); final int half = tmpList.size() / 2; // 沒有 2臺機器 if (half <= 0) { // 選擇第一個 return tmpList.get(0).getName(); } else { // 有2臺機器及以上,某個線程內隨機選排在前半段的broker final int i = this.whichItemWorst.getAndIncrement() % half; return tmpList.get(i).getName(); } } return null; }
先是排序,然后將所有的broker/2 ,如果是小于等于0的話,說明就2個broker以下,選第一個,如果是2臺以上,就輪詢選一個
先來看下排序規則:
/** * 失敗條目(規避規則條目) */ class FaultItem implements Comparable<FaultItem> { // 條目唯一鍵,這里是brokerName private final String name; // todo currentLatency 和startTimestamp 被volatile修飾 // 本次消息發送的延遲時間 private volatile long currentLatency; // 故障規避的開始時間 private volatile long startTimestamp; public FaultItem(final String name) { this.name = name; } @Override public int compareTo(final FaultItem other) { // 將能提供服務的放前面 if (this.isAvailable() != other.isAvailable()) { if (this.isAvailable()) return -1; if (other.isAvailable()) return 1; } // 找延遲低的 放前面 if (this.currentLatency < other.currentLatency) return -1; else if (this.currentLatency > other.currentLatency) { return 1; } // 找最近能提供服務的 放前面 if (this.startTimestamp < other.startTimestamp) return -1; else if (this.startTimestamp > other.startTimestamp) { return 1; } return 0; }
它是把能提供服務的放前面,然后沒有,就找那種延遲低的放前面,也沒有的話就找最近能提供服務的放前頭。 找到這個broker 之后然后根據這個broker name 獲取寫隊列的個數,其實你這個寫隊列個數有幾個,然后你這個broker對應的MessageQueue就有幾個,如果write size >0的話,然后這個broker 不是null,就找一個mq,然后設置上它的broker name 與queue id
如果write<=0,直接移除這個broker對應FaultItem,最后實在是找不到就按照上面那種普通方法來找了。
到此,關于“RocketMQ producer容錯機制源碼分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。