您好,登錄后才能下訂單哦!
本系列是「RabbitMQ實戰:高效部署分布式消息隊列」書籍的總結筆記。
前段時間總結完了「深入淺出MyBatis」系列,對MyBatis有了更全面和深入的了解,在掘金社區也收到了一些博友的喜歡,很高興。另外,短暫的陪產假就要結束了,小寶也二周了,下周二就要投入工作了,希望自己盡快調整過來,加油努力。
從本篇開始總結「RabbitMQ實戰」系列的閱讀筆記,RabbitMQ是一個開源的消息代理和隊列服務器,可以通過基本協議在完全不同的應用之間共享數據,可以將作業排隊以便讓分布式服務進行處理。
本篇介紹下消息通信,首先介紹基礎概念,將這些概念映射到AMQP協議,然后介紹消息持久化、發送方確認模式等消息可靠性保證。
通過本篇介紹,你會了解到:
此部分的介紹,會牽涉到AMQP的元素,如果之前沒接觸過的,可以結合下面的「AMQP元素」進行理解。
消息是傳輸的主體,消息包括兩部分:有效載荷(payload)和標簽(label);有效載荷是要傳輸的數據,可以是任何內容,比如JSON串、二進制、自定義的數據協議等;標簽描述了有效載荷,并且Rabbit用它來決定誰將獲得消息的投遞。
可以與HTTP協議類比,HTTP消息頭部描述了消息體的類型、大小等,HTTP消息體是要傳輸的數據,HTTP服務端通過消息頭部決定如何處理請求和數據。
生產者創建消息,然后發送到代理服務器(RabbitMQ Server),AMQP只會用標簽表述這條消息(一個交換器名稱和可選的主題標記),Rabbit服務器會根據標簽把消息發送給訂閱的消費者。
消費者消費消息,它會訂閱到隊列(queue)上,每當有消息到達RabbitMQ服務器時,會發送給消費者,消費者收到消息時,會進行處理。
注意:消費者收到的消息只包括有效載荷,所有不會知道是從哪里發來的。
要想發布或消費消息,必須先與RabbitMQ Server建立一條TCP連接,建立TCP連接之后,要創建一條信道,信道是建立在真實TCP連接的虛擬連接。
AMQP命令都是通過信道發送出去的,每條信道會被指派一個唯一的ID,為什么不直接通過TCP連接發送AMQP命令呢? 因為操作系統建立和銷毀TCP會話是很昂貴的,而且創建的連接數也有限。 通過引入通道,可以在連接上建立通道,而且通道是私密的,相互不受影響。
通道的概念還是有點抽象,后面專門寫一篇文章進行分析介紹,這里簡單理解下吧。
AMQP消息路由有三部分組成:隊列、交換器和綁定,隊列是存放消息的地方,交換器是決定不同的分發策略,綁定是隊列和交換器的橋梁,定義匹配規則。
生產者發送消息到交換器,交換器根據自身類型和綁定規則,將消息存放在對應隊列中,然后將消息發送到監聽隊列的消費者。
如上圖:P為生產者,X為交換器,交換器類型為direct,根據不同的綁定規則(orange、black、green),分發給不同的隊列,C為消費者,從不同的隊列介紹消息。
消費者通過兩種方式從特定的隊列接收消息:
basic.get會影響性能,推薦使用basic.consume來實現高吞吐量,因為其處理過程是先訂閱消息,獲取單條消息,再取取消訂閱。
如果隊列擁有多個消費者時,隊列收到的消息將以循環的方式發給消費者,即多個消費者平均消費這些消息。
另外,消費者接收到的每一條消息都要進行確認,必須通過basic.ack命令向rabbitmq服務端發送一個確認。 也可以設置auto_ack為true,只要消費者接收到消息,就自動視為確認,不過不建議這樣,因為接收到不代表業務邏輯處理成功。 服務端接收到確認后,會從隊列中刪除對應消息。
還有一種場景,在接收到消息后,如果不想處理,可以通過下面方式處理:
最后來介紹下如何創建隊列,首先明確下是生成者還是消費者創建,關鍵點是:生產者能否承擔起丟失消息,因為發出去的消息如果路由到了不存在的隊列,Rabbit會忽略它們。所以,建議生成者和消費者都嘗試去創建隊列,可以通過設置queue.declare的passive選項設置為ture來判斷隊列是否存在,如果不存在會返回一個錯誤。
通過queue.declare命令來創建隊列,有一些選項說明下:
queueDeclare(String queue,
boolean durable,
boolean exclusive,
Map<String, Object> arguments);
交換器有四種類型:direct、fanout、topic、headers,其中headers匹配消息的header而非路由鍵,不太實用,就不詳細介紹了。
第一種:direct交換器
direct交換器比較簡單,如果和路由鍵 完全匹配 的話,就會投遞到對應的隊列:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
服務器默認包含一個空白字符串名稱的默認路由器,當聲明一個隊列時,會自定綁定到默認交換器,并以隊列名稱作為路由鍵。
第二種:fanout交換器
fanout交換器,不處理路由鍵,只需要簡單的將隊列綁定到交換機上,為會每個消費者自動生成一個隨機隊列,所有的消費者都會收到所有消息。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
第三種:topic交換器
topic交換器,將路由鍵和某模式進行匹配,此時隊列需要綁定要一個模式上。
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
關于模式,符號#匹配一個或多個詞,符號匹配一個詞,因此kfs.#能夠匹配到kfs.session.message,但是audit.只會匹配到audit.session。
每個RabbitMQ服務器都能創建虛擬消息服務器,稱為虛擬主機(vhost),每個RabbitMQ本質上是一個mini版的RabbitMQ服務器,擁有自己的隊列、交換器、綁定,還有自己的權限機制。
連接時,必須制定vhost,rabbitmq包含了默認的vhost:"/"。當創建一個用戶時,會被指派給至少一個vhsot,并且相互隔離。
vhost不能通過AMQP協議創建,需要使用rabbitmqctl工具創建。
如果沒有持久化,重啟rabbitmq后,隊列、交換器都會消失,RabbitMQ提供了持久化的功能,需要滿足以下三個條件:
當發布一條持久化消息到持久化交換器上時,rabbit會在消息提交到日志文件后才會發送響應,所有會損失性能,所以,只對重要數據持久化即可。
考慮這種情況:由于發布消息后,不返回任何信息給生產者,如何只對服務器已經持久化到硬盤了呢,可能在傳輸過程中丟失,或者持久化前服務器宕機,導致消息丟失。
RabbitMQ通過「發送方確認模式」來解決上面的問題。首先,需要將信道設置成confirm模式,這樣所有在信道上發布的消息都會被指派一個唯一的ID號,一旦消息被投遞到所有匹配的隊列或持久化到磁盤,會發送一個確認消息給生產者。
通過本篇的介紹,對Rabbit的消息模型有了整體了解,下一篇會寫個DEMO,并介紹下運行和管理RabbitMQ。
歡迎掃描下方二維碼,關注我的個人微信公眾號 ~
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。