91超碰碰碰碰久久久久久综合_超碰av人澡人澡人澡人澡人掠_国产黄大片在线观看画质优化_txt小说免费全本

溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

RocketMQ源碼閱讀

發布時間:2020-06-19 17:36:43 來源:網絡 閱讀:3544 作者:網易云捕 欄目:開發技術

  RocketMQ 是一款開源的消息中間件,采用Java實現,設計思想來自于Kafka(Scala實現),在具體設計時體現了自己的選擇和需求,具體差別可以看RocketMQ與Kafka對比。接下來是自己閱讀源碼的一些探索。

 RocketMQ源碼閱讀

        RocketMQ的整體架構如下,可以看到各個組件充當的角色,Name Server 負責維護一些全局的路由信息:當前有哪些broker,每個Topic在哪個broker上; Broker具體處理消息的存儲和服務;生產者和消費者是消息的源頭和歸宿。

 RocketMQ源碼閱讀


一、Producer 發送消息

        Producer發送消息是如何得知發到哪個broker的 ? 每個應用在收發消息之前,一般會調用一次producer.start()/consumer.start()做一些初始化工作,其中包括:創建需要的實例對象,如MQClientInstance;設置定時任務,如從Nameserver中定時更新本地的Topic route info,發送心跳信息到所有的 broker,動態調整線程池的大小,把當前producer加入到指定的組中等等。客戶端會緩存路由信息TopicPublishInfo, 同時定期從NameServer取Topic路由信息,每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有的NameServer。Producer在發送消息的時候會去查詢本地的topicPublishInfoTable(一個ConcurrentHashMap),如果沒有命中的話就會詢問NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也沒有查詢到(表示該主題的消息第一次發送),那么將會發送一個default的topic進行路由查詢。

具體過程如下圖所示:
       Producer 在得到了具體的通信地址后,發送過程就顯而易見了。通過代碼可以看到在選擇消息隊列進行發送時采用隨機方式,同時和上一次發送的broker保持不同,防止熱點。

 RocketMQ源碼閱讀

二、Broker處理來自Producer的消息

       每個producer在發送消息的時候都和對應的Broker建立了長連接,此時broker已經準備好接收Message,Broker的SendMessageProcessor.sendMessage處理消息的存儲,具體過程如下。接收到消息后,會先寫入Commit Log文件(順序寫,寫滿了會新建一個新的文件),然后更新Consume queue文件(存儲如何由topic定位到具體的消息)。

 RocketMQ源碼閱讀

三、RocketMQ 存儲特點

       RocketMQ的消息采用順序寫到commitlog文件,然后利用consume queue文件作為索引,如圖。RocketMQ采用零拷貝mmap+write的方式來回應Consumer的請求,RocketMQ宣稱大部分請求都會在Page Cache層得到滿足,所以消息過多不會因為磁盤讀使得性能下降,這里自己的理解是,在64bit機器下,虛存地址空間(vm_area_struct)不是問題,所以相關的文件都會被映射到內存中(有定期刪除文件的操作),即使此刻不在內存,操作系統也會因為缺頁異常進行換入,雖然地址空間不是問題,但是一個進程映射文件的個數(/proc/sys/vm/max_map_count)是有限的,所以可能在這里發生OOM。

 RocketMQ源碼閱讀

通過Broker中的存儲目錄(默認路徑是 $HOME/store)也能看到存儲的邏輯視圖:

 RocketMQ源碼閱讀

四、順序消息是如何保證的?

       需要業務層自己決定哪些消息應該順序到達,然后發送的時候通過規則(hash)映射到同一個隊列,因為沒有誰比業務自己更加知道關于消息順序的特點。這樣的順序是相對順序,局部順序,因為發送方只保證把這些消息順序的發送到broker上的同一隊列,但是不保證其他Producer也會發送消息到那個隊列,所以需要Consumer在拉到消息后做一些過濾。

五、RocketMQ 刷盤實現

       Broker 在消息的存取時直接操作的是內存(內存映射文件),這可以提供系統的吞吐量,但是無法避免機器掉電時數據丟失,所以需要持久化到磁盤中。刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將映射區的數據寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區后,就會等待寫入完成。異步而言,只是喚醒對應的線程,不保證執行的時機,流程如圖所示,更多細節可以參考。


 RocketMQ源碼閱讀


六、消息過濾

       類似于重復數據刪除技術(Data Deduplication),可以在源端做,也可以在目的端實現,就是網絡和存儲的權衡,如果在Broker端做消息過濾就需要逐一比對consume queue 的 tagsCode 字段(hashcode),如果符合則傳輸給消費者,因為是 hashcode,所以存在誤判,需要在 Consumer 接收到消息后進行字符串級別的過濾,確保準確性。

小結

       這次代碼閱讀主要著眼于消息的發送過程和Broker上的存儲,其他方面的細節有待深入。

 

向AI問一下細節
推薦閱讀:
  1. RocketMQ
  2. Rocketmq整體分析

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI

东阿县| 花莲县| 平武县| 红原县| 钟祥市| 阳曲县| 綦江县| 讷河市| 耒阳市| 温宿县| 湘乡市| 松潘县| 漾濞| 利津县| 岑巩县| 广宗县| 南靖县| 钟山县| 类乌齐县| 灯塔市| 宿迁市| 清丰县| 泰宁县| 德兴市| 新巴尔虎右旗| 林口县| 五寨县| 平和县| 台北市| 武鸣县| 文水县| 东源县| 定南县| 喀什市| 金门县| 拉孜县| 五大连池市| 芷江| 灵宝市| 兰州市| 沾化县|