您好,登錄后才能下訂單哦!
RocketMQ 是一款開源的消息中間件,采用Java實現,設計思想來自于Kafka(Scala實現),在具體設計時體現了自己的選擇和需求,具體差別可以看RocketMQ與Kafka對比。接下來是自己閱讀源碼的一些探索。
RocketMQ的整體架構如下,可以看到各個組件充當的角色,Name Server 負責維護一些全局的路由信息:當前有哪些broker,每個Topic在哪個broker上; Broker具體處理消息的存儲和服務;生產者和消費者是消息的源頭和歸宿。
一、Producer 發送消息
Producer發送消息是如何得知發到哪個broker的 ? 每個應用在收發消息之前,一般會調用一次producer.start()/consumer.start()做一些初始化工作,其中包括:創建需要的實例對象,如MQClientInstance;設置定時任務,如從Nameserver中定時更新本地的Topic route info,發送心跳信息到所有的 broker,動態調整線程池的大小,把當前producer加入到指定的組中等等。客戶端會緩存路由信息TopicPublishInfo, 同時定期從NameServer取Topic路由信息,每個Broker與NameServer集群中的所有節點建立長連接,定時注冊Topic信息到所有的NameServer。Producer在發送消息的時候會去查詢本地的topicPublishInfoTable(一個ConcurrentHashMap),如果沒有命中的話就會詢問NameServer得到路由信息 (RequestCode=GET_ROUTEINTO_BY_TOPIC) 如果nameserver中也沒有查詢到(表示該主題的消息第一次發送),那么將會發送一個default的topic進行路由查詢。
具體過程如下圖所示:
Producer 在得到了具體的通信地址后,發送過程就顯而易見了。通過代碼可以看到在選擇消息隊列進行發送時采用隨機方式,同時和上一次發送的broker保持不同,防止熱點。
二、Broker處理來自Producer的消息
每個producer在發送消息的時候都和對應的Broker建立了長連接,此時broker已經準備好接收Message,Broker的SendMessageProcessor.sendMessage處理消息的存儲,具體過程如下。接收到消息后,會先寫入Commit Log文件(順序寫,寫滿了會新建一個新的文件),然后更新Consume queue文件(存儲如何由topic定位到具體的消息)。
三、RocketMQ 存儲特點
RocketMQ的消息采用順序寫到commitlog文件,然后利用consume queue文件作為索引,如圖。RocketMQ采用零拷貝mmap+write的方式來回應Consumer的請求,RocketMQ宣稱大部分請求都會在Page Cache層得到滿足,所以消息過多不會因為磁盤讀使得性能下降,這里自己的理解是,在64bit機器下,虛存地址空間(vm_area_struct)不是問題,所以相關的文件都會被映射到內存中(有定期刪除文件的操作),即使此刻不在內存,操作系統也會因為缺頁異常進行換入,雖然地址空間不是問題,但是一個進程映射文件的個數(/proc/sys/vm/max_map_count)是有限的,所以可能在這里發生OOM。
通過Broker中的存儲目錄(默認路徑是 $HOME/store)也能看到存儲的邏輯視圖:
四、順序消息是如何保證的?
需要業務層自己決定哪些消息應該順序到達,然后發送的時候通過規則(hash)映射到同一個隊列,因為沒有誰比業務自己更加知道關于消息順序的特點。這樣的順序是相對順序,局部順序,因為發送方只保證把這些消息順序的發送到broker上的同一隊列,但是不保證其他Producer也會發送消息到那個隊列,所以需要Consumer在拉到消息后做一些過濾。
五、RocketMQ 刷盤實現
Broker 在消息的存取時直接操作的是內存(內存映射文件),這可以提供系統的吞吐量,但是無法避免機器掉電時數據丟失,所以需要持久化到磁盤中。刷盤的最終實現都是使用NIO中的 MappedByteBuffer.force() 將映射區的數據寫入到磁盤,如果是同步刷盤的話,在Broker把消息寫到CommitLog映射區后,就會等待寫入完成。異步而言,只是喚醒對應的線程,不保證執行的時機,流程如圖所示,更多細節可以參考。
六、消息過濾
類似于重復數據刪除技術(Data Deduplication),可以在源端做,也可以在目的端實現,就是網絡和存儲的權衡,如果在Broker端做消息過濾就需要逐一比對consume queue 的 tagsCode 字段(hashcode),如果符合則傳輸給消費者,因為是 hashcode,所以存在誤判,需要在 Consumer 接收到消息后進行字符串級別的過濾,確保準確性。
小結
這次代碼閱讀主要著眼于消息的發送過程和Broker上的存儲,其他方面的細節有待深入。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。