91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ消息發送流程源碼分析

發布時間:2022-08-01 13:47:31 來源:億速云 閱讀:112 作者:iii 欄目:開發技術

本文小編為大家詳細介紹“RocketMQ消息發送流程源碼分析”,內容詳細,步驟清晰,細節處理妥當,希望這篇“RocketMQ消息發送流程源碼分析”文章能幫助大家解決疑惑,下面跟著小編的思路慢慢深入,一起來學習新知識吧。

讀源碼

1 調用defaultMQProducerImpl.send()

public SendResult send(
    Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}

2 設置過期時間

public SendResult send(Message msg,
    long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

3 執行defaultMQProducerImpl.sendDefaultImpl()方法

private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

這里看看這幾個參數,

  • communicationMode 是通信模式,同步異步還是單向

  • sendCallback 是針對異步模式的,異步模式需要設置發送完成后的回調。

sendDefaultImpl是發送消息的核心方法。

這里消息孩子進到第一個卡口,先要檢查送孩子來的家長是否還能聯系上,若是能聯系到,就繼續。要是聯系不到,這孩子豈不是被拋棄了,不敢接不敢接,送到孤兒院吧。

然后需要檢查消息孩子了,首先是檢查孩子還在不在,別扔個衣服跑了。
然后看看孩子指定的這個topic,不能說我想去內個topic哈,必須是實實在在的名字。而且上頭也規定了,這個topic的名字也不能太長,也不能包含特殊字符。已有的一些領導定過的也不能用哈。
接下來就是檢查孩子的body了,之前說body就是孩子的技能,首先,技能為空,不行不行,啥都不會是不行的。再者太長也不行,你唱首歌兩年,這沒法玩。

檢查message不為null

檢查topic

  • topic不能為空

  • topic不能太長

  • 不能包含特殊字符

檢查話題的名字是否被系統已占用

檢查body

  • 檢查是否為空

  • 檢查長度是否過長,最大為4MB 這樣

下邊我們看看sendDefaultImpl這個方法。給他拆成一段一段的看。

1 兩個校驗

//校驗生產者服務是ok的,可以聯系到的
this.makeSureStateOK();
//校驗消息的參數
Validators.checkMessage(msg, this.defaultMQProducer);
  • 第一個檢查,檢查生產者服務是否是正常工作的,若是不正常工作,就拋出異常。

private void makeSureStateOK() throws MQClientException {
    if (this.serviceState != ServiceState.RUNNING) {
        throw new MQClientException("The producer service state not OK, "
            + this.serviceState
            + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
            null);
    }
}
  • 第二個檢查,檢查消息本身是否為空,檢查topic,檢查消息的body

public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer) throws MQClientException {
    if (null == msg) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message is null");
    }
    // 這里校驗Topic的時候,校驗了不能為空,長度和特殊字符
    Validators.checkTopic(msg.getTopic());
    //這里則校驗了一些不允許使用的topic名字
    Validators.isNotAllowedSendTopic(msg.getTopic());
    // body不為空
    if (null == msg.getBody()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body is null");
    }
    // body長度不為0
    if (0 == msg.getBody().length) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL, "the message body length is zero");
    }
    // body 長度不能過長
    if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
        throw new MQClientException(ResponseCode.MESSAGE_ILLEGAL,
            "the message body size over max value, MAX: " + defaultMQProducer.getMaxMessageSize());
    }
}

2 獲取topic路由信息

嗯,這里孩子終于通過了檢查,服務人員開始帶著他去找自己指定的topic區域,指定是自己指定,劃分還是工作人員劃分的。咱總得知道這個topic區域在哪吧。

先去緩存筆記里找,有沒有這個區域的信息,若是沒有這個topic,就新建一個,然后更新到緩存筆記里邊。若有topic但是不知道在哪,就找name server大腦去申請這個topic在哪的信息。

執行tryToFindTopicPublishInfo方法去獲取Topic的路由信息,若是不存在就新建,若是有topic但是緩存中沒有路由信息,則通過name server獲取路由信息。

TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    //獲取topic信息
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    //不存在
    if (null == topicPublishInfo || !topicPublishInfo.ok()) {
        //新建
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        //修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    }
    //包含路由信息就直接返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
        return topicPublishInfo;
    } else {
        //不包含路由信息則向name server申請,修改topic的路由信息并更新到本地
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    }
}

3 計算重試次數

這就是計算消息孩子可以嘗試去找地方坐幾次,沒坐上,欸,我又來了,沒坐上,欸,我又來了。

這行代碼就是計算重試次數的,根據communicationMode傳入的值,同步異步還是單向的來決定重試次數是幾次。 很明顯,若是同步的,就會嘗試三次。若是異步的或者單向的就只發送一次。

int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

4 執行隊列選擇方法

我們之前說了,Broker類似于候船大廳,為了均分壓力,每次都要進與上次不同的候船大廳。

執行selectOneMessageQueue方法通過Queue將消息發送到與上次不同的一個Broker。也可以通過 sendLatencyFaultEnable判斷是否啟用延遲容錯開關

MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);

5 發送消息

這就是走過巷道坐到屬于自己的座位上了

然后就通過sendKernelImpl發送消息了,這是發送消息的核心方法。會準備通信層的入參,并將請求發送給通信層,內部實現是基于Netty的。

sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);

讀到這里,這篇“RocketMQ消息發送流程源碼分析”文章已經介紹完畢,想要掌握這篇文章的知識點還需要大家自己動手實踐使用過才能領會,如果想了解更多相關內容的文章,歡迎關注億速云行業資訊頻道。

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

寻乌县| 谢通门县| 西乡县| 延寿县| 芜湖县| 临城县| 台北市| 普洱| 施秉县| 神农架林区| 鄄城县| 灵台县| 项城市| 浮梁县| 当雄县| 南通市| 阳朔县| 杂多县| 屏山县| 麻阳| 龙江县| 涡阳县| 肇庆市| 昌吉市| 河北区| 同江市| 靖宇县| 界首市| 安达市| 桃园县| 达州市| 新乡市| 乌拉特后旗| 太谷县| 泽普县| 南丹县| 大石桥市| 仪陇县| 嘉定区| 江津市| 鹤岗市|