您好,登錄后才能下訂單哦!
AMQP全稱是Advanced Message Queuing Protocol,它是一個(分布式)消息傳遞協議,使用和符合此協議的客戶端能夠基于使用和符合此協議的消息傳遞中間件代理(Broker,也就是經紀人,個人感覺叫代理合口一些)進行通信。AMQP目前已經推出協議1.0,實現此協議的比較知名的產品有StormMQ、RabbitMQ、Apache Qpid等。RabbitMQ實現的AMQP版本是0.9.1,官方文檔中也提供了該協議pdf文本下載,有興趣可以翻閱一下。
Messaging Broker,這里稱為消息中間件代理。它的職責是從發布者(Publisher,或者有些時候稱為Producer,生產者)接收消息,然后把消息路由到消費者(Consumer,或者有些時候稱為Listener,監聽者)。
因為消息中間件代理、發布者客戶端和消費者客戶端都是基于AMQP這一網絡消息協議,所以消息中間件代理、發布者客戶端和消費者客戶端可以在不同的機器上,從而實現分布式通訊和服務解耦。
消息中間件代理不僅僅提供了消息接收和消息路由這兩個基本功能,還有其他高級的特性如消息持久化功能、監控功能等等。
AMQP-0-9-1模型的基本視圖是:消息發布者消息發布到交換器(Exchange)中,交換器的角色有點類似于日常見到的郵局或者信箱。然后,交換器把消息的副本分發到隊列(Queue)中,分發消息的時候遵循的規則叫做綁定(Binding)。接著,消息中間件代理向訂閱隊列的消費者發送消息(push模式),或者消費者也可以主動從隊列中拉取消息(fetch/pull模式)。
發布者在發布消息的時候可以指定消息屬性(消息元數據),某些消息元數據可能由消息中間件代理使用,其他消息元數據對于消息中間件代理而言是不透明的,僅供消息消費者使用。
由于網絡是不可靠的,客戶端可能無法接收消息或者處理消息失敗,這個時候消息中間件代理無法感知消息是否正確傳遞到消費者中,因此AMQP模型提供了消息確認(Message Acknowledgement)的概念:當消息傳遞到消費者,消費者可以自動向消息中間件代理確認消息已經接收成功或者由應用程序開發者選擇手動確認消息已經接收成功并且向消息中間件代理確認消息,消息中間件代理只有在接收到該消息(或者消息組)的確認通知后才會從隊列中完全刪除該消息。
在某些情況下,交換器無法正確路由到隊列中,那么該消息就會返回給發布者,或者丟棄,或者如果消息中間件代理實現了"死信隊列(Dead Letter Queue)"擴展,消息會被放置到死信隊列中。消息發布者可以選擇使用對應的參數控制路由失敗的處理策略。
交互器(Exchange)是消息發送的第一站目的地,它的作用就是就收消息并且將其路由到零個或者多個隊列。路由消息的算法取決于交互器的類型和路由規則(也就是Binding)。RabbitMQ消息中間件代理支持四種類型的交互器,分別是:
交換器類型 | Broker默認預聲明的交換器 |
---|---|
Direct | (空字符串[(AMQP default)])和amq.direct |
Fanout | amq.fanout |
Topic | amq.topic |
Headers | amq.match (和RabbitMQ中的amq.headers) |
聲明交互器的時候需要提供一些列的屬性,其中比較重要的屬性如下:
之所以存在Durability和Auto-delete特性是因為并發所有的場景和用例都要求交互器是持久化的。
Direct類型的交換器基于消息路由鍵(RoutingKey)把消息傳遞到隊列中。Direct交換器是消息單播路由的理想實現(當然,用于多播路由也可以),它的工作原理如下:
默認交換器(Default Exchange)是一種特殊的Direct交互器,它的名稱是空字符串(也就是""),它由消息中間件代理預聲明,在RabbitMQ Broker中,它在Web管理界面中的名稱是(AMQP default)
。每個新創建的隊列都會綁定到默認交換器,路由鍵就是該隊列的隊列名,也就是所有的隊列都可以通過默認交換器進行消息投遞,只需要指定路由鍵為相應的隊列名即可。
Fanout其實是一個組合單詞,fan也就是扇形,out就是向外發散的意思,Fanout交換器可以想象為"扇形"交換器。Fanout交換器會忽略路由鍵,它會路由消息到所有綁定到它的隊列。也就是說,如果有N個隊列綁定到一個Fanout交換器,當一個新的消息發布到該Fanout交換器,那么這條新消息的一個副本會分發到這N個隊列中。Fanout交換器是消息廣播路由的理想實現。
Topic交換器基于路由鍵和綁定隊列和交換器的模式進行匹配從而把消息路由到一個或者多個隊列。綁定隊列和交換器的Topic模式(這個模式串其實就是聲明綁定時候的路由鍵,和消息發布的路由鍵并非同一個)一般使用點號(dot,也就是'.')分隔,例如source.target.key
,綁定模式支持通配符:
source.target.#
可以匹配source.target.doge
、source.target.doge.throwable
等等。可以匹配
source.target.doge、
source.target.throwable`等等。對每一條消息,Topic交換器會遍歷所有的綁定關系,檢查消息指定的路由鍵是否匹配綁定關系中的路由鍵,如果匹配,則將消息推送到相應隊列。
Topic交換器是消息多播路由的理想實現。
Headers交換器是一種不常用的交換器,它使用多個屬性進行路由,這些屬性一般稱為消息頭,它不使用路由鍵進行消息路由。消息頭(Message Headers)是消息屬性(消息元數據)部分,因此,使用Headers交換器在建立隊列和交換器的綁定關系的時候需要指定一組鍵值對,發送消息到Headers交換器時候,需要在消息屬性中攜帶一組鍵值對作為消息頭。消息頭屬性支持匹配規則x-match如下:
Headers交換器也是忽略路由鍵的,只依賴于消息屬性中的消息頭進行消息路由。
AMQP 0-9-1模型中的隊列與其他消息或者任務隊列系統中的隊列非常相似:它們存儲應用程序所使用的消息。隊列和交換器的基本屬性有類似的地方:
一個隊列只有被聲明(Declare)了才能使用,也就是隊列的第一次聲明就是隊列的創建操作(因為第一次聲明的時候隊列并不存在)。如果使用相同的參數再次聲明已經存在的隊列,那么此次聲明會不生效(當然也不會出現異常)。但是如果使用不相同的參數再次聲明已經存在的隊列,那么會拋出通道級別的異常,異常代碼是406(PRECONDITION_FAILED)。
隊列名必須由255字節(bytes)長度以內的UTF-8編碼字符組成。實現AMQP 0-9-1規范的消息中間件代理具備自動生成隨機隊列名的功能,也就是在聲明隊列的時候,隊列名指定為空字符串,那么消息中間件代理會自動生成一個隊列名,并且在隊列聲明的返回結果中帶上對應的隊列名。
以"amq."開頭的隊列是由消息中間件代理內部生成的,有其特殊的作用,因此不能聲明此類名稱的新隊列,否則會導致通道級別的異常,異常代碼為403(ACCESS_REFUSED)。
持久化的隊列會持久化到磁盤中,這種隊列在消息中間件代理重啟后不會被刪除。不開啟持久化特性的隊列稱為瞬時(transient)隊列,并非所有的場景都需要開啟隊列的持久化特性。
隊列的持久化特性并不意味著路由到它上面的消息是持久化的,也就是隊列的持久化跟消息的持久化是兩回事。如果息中間件代理掛了,它重啟后會重新聲明開啟了持久化特性的隊列,這些隊列中只有使用了消息持久化特性的消息會被恢復。
綁定(Binding)是交換器路由消息到隊列的規則。例如交換器E可以路由消息到隊列Q,那么Q必須通過一定的規則綁定到E。綁定中使用的某些交換器的類型決定了它可以使用可選的路由鍵(RoutingKey)。路由鍵的作用類似于過濾器,可以篩選某些發布到交換器的消息路由到目標隊列。
如果發布的消息沒有路由到任意一個目標隊列,例如,消息已經發布到交換器,交換器中沒有任何綁定,這個時候消息會被丟棄或者返回給發布者,取決于消息發布者發布消息時候使用的參數。
如果隊列只有發布者生產消息,那么是沒有意義的,必須有消費者對消息進行使用,或者叫這個操作為消息消費,消息消費的方式有兩種:
basic.consume
)。basic.get
)。使用推模式的情況下,消費者必須指定需要訂閱的隊列。每個隊列可以存在多個消費者,或者僅僅注冊一個獨占的消費者。
每個消費者(訂閱者)都有一個稱為消費者標簽(consumer tag)的標識符,消費者標簽是一個字符串。通過消費者標簽可以實現取消訂閱的操作。
消費者應用程序有可能在接收和處理消息的時候崩潰,也有可能因為網絡原因導致消息中間件代理投遞消息到消費者的時候失敗了,這樣就會催生一個問題:AMQP消息中間件代理應該在什么時候從隊列中刪除消息?因此,AMQP 0-9-1規范提供了兩種選擇:
basic.deliver
或basic.get-ok
)。basic.ack
<= 個人感覺這個地方少寫了basic.nack
和basic.reject
)前一種稱為自動確認模型(動作觸發的同時進行了消息確認),后一種稱為顯式確認模型。顯式確認模型中,需要消費者主動向消息中間件代理進行消息主動確認,這個消息主動確認動作的執行時機完全由應用程序控制。消息主動確認有三種方式:積極確認(ack)、消極確認(nack)和拒絕(reject)。
預取消息(Prefetching Messages)是一個特性。對于多個消費者共享同一個隊列的情況,能夠告知消息中間件代理在發送下一個確認之前指定每個消費者一次可以接收消息的消息量。這個特性可以理解為簡單的負載均衡技術,在批量發布消息的場景下能夠提高吞吐量。
AMQP模型中,消息具有屬性值。AMQP 0-9-1規范定義了一些常見的屬性,一般開發人員不需要太關注這些屬性:
這些通用的屬性一般是消息中間件代理使用的,還有可以定制的可選屬性header,形式是鍵值對,類似于HTTP中的請求頭。消息屬性是在發布消息的時候設置的。
AMQP消息還有一個有效載荷(payload,其實就是消息數據體),AMQP代理將其視為不透明的字節數組,也就是AMQP代理不會檢查或者修改消息的有效載荷。有些消息可能只包含屬性而沒有有效負載。通常使用序列化格式(如JSON,Thrift,Protocol Buffers和MessagePack)來序列化和結構化數據,以便將其作為消息有效負載發布。在一般約定下,消息屬性中的Content type
和Content encoding
一般可以表明其序列化的方式。
消息發布支持消息的持久化特性,消息持久化特性開啟后,消息中間件代理會把消息保存到磁盤中,如果重啟代理消息也不會丟失。開啟消息持久化特性將會影響性能,主要是因為涉及到刷盤操作。
AMQP 0-9-1定義了一些方法,對應了客戶端和消息中間件代理之間交互的一些操作方法,這些操作方法的設計跟面向對象編程語言中的方法沒有任何共同之處。常用的交換器相關的操作方法有:
在邏輯上,上面幾個操作方法在客戶端和消息中間件代理之間的交互如下:
對于隊列,也有類似的操作方法:
并非所有的AMQP操作方法都有響應結果操作方法,像消息發布方法basic.publish
的使用是最廣泛的,此操作方法沒有對應的響應結果操作方法。有些操作方法可能有多個響應結果(操作方法),例如basic.get
。
AMQP的連接(Connection)通常是長期存在的。AMQP是一種使用TCP進行可靠傳遞的應用程序級協議。AMQP連接使用用戶身份驗證,可以使用TLS(SSL)進行保護。當應用程序不再需要連接到AMQP代理時,它應該正常關閉AMQP連接,而不是突然關閉底層TCP連接。
某些應用程序需要與AMQP代理程序建立多個連接。但是,不希望同時打開許多TCP連接,因為這樣做會消耗系統資源并使配置防火墻變得十分困難。通道(Channel)可以認為是"共享一個單獨的TCP連接的輕量級連接",一個AMQP連接可以擁有多個通道。
對于使用了多線程處理的應用程序,有一種使用場景十分普遍:每個線程開啟一個新的通道使用,這些通道是線程間隔離的。
另外,每個特定的通道和其他通道是相互隔離的,每個執行的AMQP操作方法(包括響應)都攜帶一個通道的唯一標識,這樣客戶端就能通過該通道的唯一標識得知操作方法是對應哪個通道發生的。
為了使單個消息中間件代理可以托管多個完全隔離的"環境"(這里的隔離指的是用戶組、交互器、隊列等),AMQP提供了虛擬主機(Virtual Host)的概念。多個虛擬主機類似于許多主流的Web服務器的虛擬主機,提供了AMQP組件完全隔離的環境。AMQP客戶端可以在連接消息中間件代理時指定需要連接的虛擬主機。
理解RabbitMQ中的AMQP模型,其實從開發者的角度來看,最重要的是Exchange、Queue、Binding三者的關系,這里談談個人的見解。消息的發布第一站總是Exchange,從模型上看,消息發布無法直接發送到隊列中。Exchange本身不存儲消息,它在接收到消息之后,會基于路由規則也就是Binding,把消息路由到目標Queue中。從實際操作來看,聲明路由規則總是在發布消息和消費消息之前,也就是一般步驟如下:
我們最關注的兩個階段,消息發布和消息消費中,消息發布實際上只跟Exchange有關,而消息消費實際上只跟Queue有關。Binding實際上就是Exchange和Queue的契約關系,會直接影響消息發布階段的消息路由。那么,路由失敗一般是什么情況導致的?路由失敗,其實就是消息已經發布到Exchange,而Exchange中從既有的Binding中無法找到存在的目標Queue用于傳遞消息副本(一般而言,很少人會發送消息到一個不存在的Exchange)。消息路由失敗,從理解AMQP的模型來看,可以從根本上避免的,除非是消息發布者故意胡亂使用或者人為錯誤使用了未存在的RoutingKey、Exchange或者說是Binding關系而導致的。
AMQP-0-9-1模型中支持了四種交換器direct(單播)、fanout(廣播)、topic(多播)、headers,實際上,從使用者角度來看,四種交換器的功能是可以相互取代的。例如可以使用fanout類型交換器實現廣播,其實使用direct類型交換器也是可以實現廣播的,只是對應的direct類型交換器需要通過多個路由鍵綁定到多個目標隊列中。在面對生產環境的技術選型的時候,我們需要考慮性能、維護難度、合理性等角度去考慮選擇什么類型的交換器,就上面的廣播消息的例子,顯然使用fanout類型交換器可以避免聲明多個綁定關系,這樣在性能、合理性上是更優的選擇。
在AMQP-0-9-1模型中,負載均衡的實現是基于消費者而不是基于隊列(準確來說應該是消息傳遞到隊列的方式)。實際上,出現消息生產速度大大超過消費者的消費速度的時候,隊列中有可能會出現消息積壓。AMQP-0-9-1模型中沒有提供基于隊列負載均衡的特性,也就是出現消息生產速度大大超過消費者的消費速度時候,并不會把消息路由到多個隊列中,而是通過預取消息(Prefetching Messages)的特性,確定消息者的消費能力,從而調整消息中間件代理推送消息到對應消費者的數量,這樣就能夠實現消費速度快的消費者能夠消費更多的消息,減少產生有消費者處于饑餓狀態和有消費者長期處于忙碌狀態的問題。
AMQP中提供的消息確認機制主要包括積極確認(一般叫ack,Acknowledgement)、消極確認(一般叫nack,Negative Acknowledgement)和拒絕(reject)。消息確認機制是保證消息不丟失的重要措施,當消費者接收到消息中間件代理推送的消息時候,需要主動通知消息中間件代理消息已經確認投遞成功,然后消息中間件代理才會從隊列中刪除對應的消息。沒有主動確認的消息就會變為"nack"狀態,可以想象為暫存在隊列的"nack區"中,這些消息不會投遞到消費者,直到消費者重啟后,"nack區"中的消息會重新變為"ready"狀態,可以重新投遞給消費者。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。