您好,登錄后才能下訂單哦!
這篇文章主要講解了“消息隊列原理之如何掌握rabbitmq”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“消息隊列原理之如何掌握rabbitmq”吧!
RabbitMQ 是一個由 Erlang 開發的 AMQP(Advanced Message Queuing Protocol,高級消息隊列協議)的開源實現,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。支持多種客戶端語言。
整體架構對照下面的圖說明
先看看圖片上各個名次的解釋:
Broker:它提供一種傳輸服務,它的角色就是維護一條從生產者到消費者的路線,保證數據能按照指定的方式進行傳輸,簡單來說就是消息隊列服務器實體。
Connection: 客戶端與 Rabbitmq Broker
直接的 TCP
連接,通常一個客戶端與 Broker 之間只需要一個連接即可。
Channel: 消息通道,在客戶端的每個連接里,可建立多個channel,最好每個線程都用獨立的Channel,后續的對 Queue
和 Exchange
的操作都是在 Channel 中完成的。
Producer: 消息生產者,通過和 Broker
建立 Connection 和 Channel ,向 Exchange 發送消息。
Consumer: 消息消費者,通過和 Broker
建立 Connection 和 Channel,從 Queue 中消費消息。
Exchange: 消息交換機,按照一定的策略把 Producer 生產的消息投遞到 Queue 中,等待消費者消費。
Queue: 消息隊列載體,每個消息都會被投入到一個或多個隊列。
Vhost: 虛擬主機,一個broker里可以開設多個vhost,用作權限分離,把不同的系統使用的rabbitmq區分開,共用一個消息隊列服務器,但看上去就像各自在用不用的rabbitmq服務器一樣。
Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來,這樣RabbitMQ就知道如何正確地將消息路由到指定的Queue了。
RoutingKey:路由關鍵字,生產者在將消息發送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規則,而這個routing key需要與Exchange Type及binding key聯合使用才能最終生效。
這里面比較難理解的概念是
RoutingKey
,Exchange
,Binding
,消費發送時不會直接發送給Queue
,而是先發送給Exchange
,由Exchange
按照一定的規則投遞到與它綁定的Queue
中,那這個規則是什么呢? 規則就與Exchange
的 Type、Binding
、RoutingKey
相關,Exchange
支持的類型有 4 種,direct,fanout,topic,headers
,含義如下:
direct:
Queue
和Exchange
在綁定時需要指定一個key
, 我們稱為Bindkey
。Producer
往Exchange
發送消息時,也需要指定一個key
,這個key
就是Routekey
。這種模式下 Exchange 會把消息投遞給Routekey
和Bindkey
相同的隊列fanout: 類似于廣播的方式,會把消息投遞給和 Exchange 綁定的所有隊列,不需要檢查
Routekey
和Bindkey
。topic: 類似于組播的方式,這種模式下
Bingkey
支持模糊匹配,*
代表匹配一個任意詞組,#
代表匹配0個或多個詞組。如 Producer 產生一條 RouteKey 為benz.car
的消息, 同時這個 Exchange 綁定了3組隊列(請注意是3組不是3個,意思是Exchange可以和同一個Queue進行多次綁定,通過Bindkey 的不同,它們之間是多對多的關系),Bindkey 分別為:car
,*.car
,benz.car
,那么會把這個消息投遞到*.car
、benz.car
對應的Queue
中。headers: 這個類型
Routekey
和Bindkey
的匹配規則來路由消息,而是根據發送的消息內容中的headers
屬性進行匹配。
對照上面圖和名次解釋應該比較清晰明了了,下面我們通過幾個例子說明如何使用。
先看看 Rabbitmq 默認的 exchange ,其中第一個(AMQP default) 是默認的,默認綁定了所有的 Queue
,會把消息投遞到 Routekey 對應的隊列中,即: Routekey==QueueName
。
package main import ( "fmt" "github.com/streadway/amqp" "log" ) func handlerError(err error, msg string) { if err != nil { log.Fatalf("%s: %s", msg, err) } } var url = "amqp://username:password@ip:port" func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() queueNameCar := "car" if _, err := channel.QueueDeclare(queueNameCar, false, false, false, false, nil); err != nil { handlerError(err, "Failed to decare Queue") } if err := channel.Publish("", queueNameCar, false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
這里是一個完整的 Demo, 后面只會提供main() 函數
的示例代碼,其他的和這里這里類似。
申明了一個名稱為 car
的消息隊列,并沒有做任何的綁定,往 defalut exchange
發送一條消息,routekey 為 car
,可以看到和隊列名相同。
為了方便演示,結果以圖片的方式展現,可以看到這里有 car
的隊列,并且有一條消息。
在創建隊列有幾個參數可以關注一下
Durability: 持久化,是否將隊列持久化到磁盤,當選擇持久化時當 rabbitmq 重啟了,這個隊列還在,否則當重啟了之后這個隊列就沒有了,需要重新創建,這個需要設計程序時考慮到。
Auto delete: 當其中一個消費者已經完成之后,會刪除這個隊列并斷開與其他的消費者的連接。
Arguments:
x-message-ttl: 消息的過期時間,發布到隊列中的消息在被丟棄之前可以存活多久。
x-expires: 隊列的過期時間,一個隊列在多長時間內未使用會被自動刪除。
x-max-length: 隊列的長度,最多剋容納多少條消息。
x-max-length-bytes: 隊列最大可以包含多大的消息。
x-dead-letter-exchange: 當消息過期或者被客戶端
reject
之后應該重新投遞到那個exchange
,類似與一個producer
發送消息時選擇exchange
x-dead-letter-routing-key: 當消息過期或者被客戶端
reject
之后重新投遞時的Routekey
,類似與一個producer
發送消息時設置routekey
,默認是原消息的routekey
x-max-priority: 消息的優先級設置,設置可以支持的最大優先級,如設置為
10
,則可以在發送消息設置優先級,可以根據優先級處理消息,默認為空,當為空時則不支持優先級x-queue-mode: 將隊列設置為懶惰模式,盡可能多地將消息保留在磁盤上,以減少RAM的使用量;如果不設置,隊列將保留內存中的緩存,以盡可能快地傳遞消息。
我們自己創建一個 direct 類型的 exchange 并綁定一些隊列看看是什么效果。
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() directExchangeNameCar := "direct.car" if err := channel.ExchangeDeclare(directExchangeNameCar, "direct", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", directExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(directExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
代碼中申明了 1 一個 Exchange
,4個 Queue
,7個 Binding
,其中一個 Binding
詳情如下:
可以看到向這個 Exchange 中發消息,Routekey 為 car
,匹配的隊列有個,那么這4個隊列中都應該有消息才對 和我們的設想是一直
Queue
的創建上面已經講過了,這里有Exchange
的創建,那么再看看創建Exchange
有哪些參數
Type: 類型,上面已經涉及到了
Durability: 持久化
Auto delete: 是否自動刪除,如果為yes 則當其中隊列完成
unbind
操作,則其他的queue
或者exchange
也會 unbind 并且刪除這個exchange
。Internal: 如果為yes ,則客戶端不能直接往這個 exchange 上發送消息,只能用作和
exchange
綁定。
fanout 工作方式類似于廣播,看看下面的代碼
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() fanoutExchangeNameCar := "fanout.car" if err := channel.ExchangeDeclare(fanoutExchangeNameCar, "fanout", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", fanoutExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.Publish(fanoutExchangeNameCar, "middle.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") } }
這個申明了一個 fanout
類型的 exchange ,和上面的代碼類似,只有 exchange 不同。
可以先在腦海中想想每個 queue 中有幾條消息。
向 fanout.car
這個 exchange 發消息指定 Routekey 為 middle.car
,但是由于是廣播模式,所以和 routekey 是沒有關系的,每個消息隊列中各有一條消息。
請注意有些 binding 指向的是同一個 queue ,那么會產生多條消息到相同的 queue 中,答案是否定的。producer 產生一條消息,根據一定的規則,每個隊列只會收到一條(如何符合投遞規則的話)。
topic 比較有意思了,和之前的簡單粗暴的用法不一樣了,先看看下面的代碼,聲明了一個 topic
類型的 exchange
, 4個 queue
func main() { conn, err := amqp.Dial(url) handlerError(err, "Failed to connect to RabbitMQ") defer conn.Close() channel, err := conn.Channel() handlerError(err, "Failed to open a Channel") defer channel.Close() topicExchangeNameCar := "topic.car" if err := channel.ExchangeDeclare(topicExchangeNameCar, "topic", true, false, false, false, nil); err != nil { handlerError(err, "Failed to decalare exchange") } queueNameCar := "car" queueNameBigCar := "big-car" queueNameMiddleCar := "middle-car" queueNameSmallCar := "small-car" channel.QueueDeclare(queueNameCar, false, false, false, false, nil) channel.QueueDeclare(queueNameBigCar, false, false, false, false, nil) channel.QueueDeclare(queueNameMiddleCar, false, false, false, false, nil) channel.QueueDeclare(queueNameSmallCar, false, false, false, false, nil) if err := channel.QueueBind(queueNameCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameBigCar, "big.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameMiddleCar, "middler.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "*.small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } if err := channel.QueueBind(queueNameSmallCar, "#.small.car", topicExchangeNameCar, false, nil); err != nil { handlerError(err, "Failed to bind queue to exchange") } }
現在思考每個 producer
產生消息之后,會有哪些 queue
會收到消息。
if err := channel.Publish(topicExchangeNameCar, "car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
每個 queue 都會收到消息
if err := channel.Publish(topicExchangeNameCar, "small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 small.car
、*.small.car
、#.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 *.small.car
、#.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "auto.blue.benz.small.car", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
small-car
這一個隊列會收到消息。
符合 Routekey 為 #.small.car
的binding
if err := channel.Publish(topicExchangeNameCar, "bike", false, false, amqp.Publishing{ContentType: "text/plain", Body: []byte("test car")}); err != nil { handlerError(err, "Failed to publish message") }
都不會收到消息,沒有符合的 routekey 。
這種類型很少有實際的應用場景。
感謝各位的閱讀,以上就是“消息隊列原理之如何掌握rabbitmq”的內容了,經過本文的學習后,相信大家對消息隊列原理之如何掌握rabbitmq這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。