您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關怎么實現RabbitMQ消息中間件的工作原理和使用,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
目前,主流的消息中間件主要有:ActiveMQ、Kafka、RabbitMQ、RocketMQ等等......,而我們今天的主角是:RabbitMQ,RabbitMQ是一個開元基于 erlang 語言開發具有高可用高并發的優點,適合集群消息代理和隊列服務器,它是基于AMQP協議來實現的,AMQP的和主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全,RabbitMQ支持多種語言,有消息確認機制和持久化機制,保證數據不丟失的前提做到可靠性、可用性。
消息(Message)是指應用于應用之間傳送的數據,消息的類型包括文本字符串、JSON、XML、內嵌對象等等...
所謂 消息中間件 / 消息隊列(Message Queue Middleware,簡稱MQ)是利用高效可靠的消息傳遞機制進行數據交流,同時可以基于數據通信來進行分布式系統的繼承,消息中間件一般有兩種傳遞模式:點對點(Point-to-Point)模式和發布/訂閱(Pub/Sub)模式,點對點模式是基于隊列的,消息生產者發送消息到隊列,消息消費者從隊列中接收消息,隊列的存在使得消息的異步傳輸成為了可能,發布訂閱模式定義了如何向一個內容節點發布和訂閱內容,這個內容節點叫topic,這種模式可以滿足消費者發布一個消息,多個消費者同時消費同一信息的需求。
AMQP的全稱:Advanced Message Queuing Protocol(高級消息隊列協議),它是消息隊列的一個規范,其中定義個很多核心的概念,AMQP與JMS(Java Message Service)Java平臺的專業技術規范類似,同樣提供了很多面向中間件的API,用于兩個應用程序之間,或者分布式系統之間的發送消息,進行異步通信。
解釋:Producer(生產者)將信息投遞到Server端的RabbitMQ的Exchange中(過程:message->server->virtual host->RabbitMQ->Exchange),Consumer(消費者)只需要訂閱消息隊列Message Queue,每當有消息投遞到隊列queue中時,都會通知消費者進行消費。
生產者只需要將消息投遞到Exchange交換機中,不需要關注消息被投遞到哪個隊列。
消費者只需要監聽隊列來消費消息,不需要關注消息來自于哪個Exchange。
Exchange和Message Queue存在著綁定的關系,一個Exchage可以綁定多個消息隊列。
RabbitMQ常用的交換器類型主要有四種:direct、fanout、topic、headers ,Exchange分發消息時根據類型的不同分發策略有區別,headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,這里僅僅對其他三種進行展開說明:
1.direct
direct類型的交換器理由規則需要遵循嚴格的完全匹配規則,他會把消息路由到那些BindingKey和RoutingKey完全匹配的隊里中,如果消息中的路由鍵(RoutingKey)如果和 Binding 中的 BindingKey 一致, 交換器就將消息發到對應的隊列中。比如:
在發送消息的時候設置路由鍵為“info”或者“debug”,消息只會路由到Queue2,如果以其他路郵件發送消息,則消息不會路由到這兩個隊里中,這就是路由鍵和Binding key的完全匹配。
direct類型模式的特點:
(1) 不需要將Exchange進行任何綁定(binding)操作
(2) 消息傳遞時需要一個“RoutingKey”,可以簡單的理解為要發送到的隊列名字。
(3) 如果vhost中不存在RoutingKey中指定的隊列名,則該消息會被拋棄。
2.fanout
它會將所有發送到該交換器的消息路由到所有與該交換器綁定的隊列中。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發送到交換器的消息都會被轉發到與該交換器綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息。fanout 類型轉發消息是最快的。
這種模式的特點:
(1) 不需要RoutingKey,我們可以將路由鍵設置為空即可。
(2) 需要提前將Exchange和Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同時與多個Exchange進行綁定("多對多關系")。
(3) 如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
3.topic
direct類型的交換器理由規則需要遵循嚴格的完全匹配規則,這種嚴格的匹配方式有時候不能滿足實際的業務需求,topic就是在這種規則上進行了擴展,topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,但是這里的匹配規則有所不同,它的約定如下:
RoutingKey為一個點號 ".",分隔的字符串(被點號 "."號分隔開的一段獨立的字符串稱為單詞),比如:"com.rabbit.client"、"java.util.Map";
BindingKey和RoutingKey為一個點號 ".",分隔的字符串;
BindingKey中可以存在兩種特殊的字符串 "*"和 "#",用于做模糊匹配,其中"*"用于匹配一個單詞, "#" 用于匹配多規則單詞;
"com.#" 可以匹配到 com.rabbitmq.aaa
"com.*" 可以匹配到 com.rabbitmq
舉個實例:
路由鍵為 "com.rabbitmq.client" 的消息會同時路由到 Queue1 和 Queue2
路由鍵為 "com.hidden.client" 的消息會只會路由到 Queue2
路由鍵為 "com.hidden.data" 的消息會只會路由到 Queue2
路由鍵為 "java.rabbitmq.data" 的消息會只會路由到 Queue1
路由鍵為 "java.util.map" 的消息將會被丟棄或者返回給生產者,因為它沒有匹配任何的路由鍵。
RabbitMQ 最初起源于金融系統,用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。具體特點包括:
可靠性
RabbitMQ 使用一些機制來保證可靠性,如持久化、傳輸確認、發布確認。
靈活的路由(Flexible Routing)
在消息進入隊列之前,通過Exchange來路由消息的。對于典型的路由功能,RabbitMQ已經提供了一些內置的Exchange來實現。針對更復雜的路由功能,可以將多個 Exchange 綁定在一起,也通過插件機制實現自己的Exchange 。
消息集群(Clustering)
多個 RabbitMQ 服務器可以組成一個集群,形成一個邏輯 Broker 。
高可用
隊列可以在集群中的機器上進行鏡像,使得在部分節點出問題的情況下隊列仍然可用。
多種協議(Multi-protocol)
RabbitMQ 支持多種消息隊列協議,比如 STOMP、MQTT 等等。
多語言客戶端
RabbitMQ 幾乎支持所有常用語言,比如 Java、.NET、Ruby 等等。
管理界面
RabbitMQ 提供了一個易用的用戶界面,使得用戶可以監控和管理消息 Broker 的許多方面。
跟蹤機制
如果消息異常,RabbitMQ 提供了消息跟蹤機制,使用者可以找出發生了什么。
插件機制
RabbitMQ 提供了許多插件,來從多方面進行擴展,也可以編寫自己的插件。。
所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。
消息流
上面只是最簡單抽象的描述,具體到 RabbitMQ 則有更詳細的概念需要解釋。上面介紹過 RabbitMQ 是 AMQP 協議的一個開源實現,所以其內部實際上也是 AMQP 中的基本概念:
RabbitMQ 內部結構
“Hello RabbitMQ World!”,學習一門技術,先出hello world開始,我們來編寫一個Java項目來使用RabbitMQ來實現消息的生產和消費,這樣能讓我們能夠更好的理解RabbitMQ的作用和原理,RabbitMQ是消息代理,它負責接收并轉發消息,我們可以將它視為郵局,將要發送的郵件都放在郵箱中,可以確保 Mailperson 先生或者女生最終將郵件傳遞給別人,因此:RabbitMQ是一個郵箱,一個郵局和一個郵遞員的實例。
生產意味著發送,發送消息的程序是生產者
隊列就是RabbitMQ內部的郵箱名稱,消息是存儲在隊列中的,盡管消息流經RabbitMQ和你的應用程序,生產者可以發送一個隊列信息,許多消費者可以嘗試從一個隊列里接收數據
消費與接受者是同一個身份,一個消費者是一個程序,主要是等待接收信息
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
我們稱其為消息發布者(發送者)MessageProducer和我們的消息消費者(接收者) MessageConsumer。發布者將連接到RabbitMQ,發送響應的消息:
MassageProducer (消息生產者)
/** * @Author 林必昭 * @Date 2019/11/29 22:12 * @descr */ public class MassageProducer { private final static String QUEUE_NAME = "myQueue"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //設置RabbitMQ所在主機的ip或者主機名 factory.setHost("localhost"); //設置端口號 factory.setPort(5672); Connection conn = factory.newConnection(); //創建一個通道 Channel channel = conn.createChannel(); //指定一個隊列 channel.queueDeclare(QUEUE_NAME,false,false,false,null); //定義要發送的消息 String message = "Hello RabbitMQ World!"; //往隊列里發送message消息 channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [生產者] 發送消息: '" + message + "'"); } }
MessageConsumer (消息消費者)
/** * @Author 林必昭 * @Date 2019/11/29 22:12 * @descr */ public class MessageConsumer { private final static String QUEUE_NAME = "myQueue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); Connection conn = factory.newConnection(); Channel channel = conn.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("消費者正在等待消息,退出請按CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [消費者] 接收到了: '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
在運行程序之前我們需要先啟動RabbitMQ服務,前提需要安裝好Erlang和RabbitMQ,這里安裝的教程就不展開說明了:
下載Erlang軟件包 (這個網址下載速度比較快)
RabbitMQ官網
注意的是:這里必須下載安裝Erlang安裝包,因為RabbitMQ是基于Erlang進行開發的。
啟動服務之后,如圖所示:
接著我們運行我們的MessgeProducer.java和MessageConsumer.java,先運行MessgeProcucer,接著運行MessageConsumer,觀察控制臺的輸出:
至此,我們的第一個RabbitMQ服務程序編程測試成功了 !
啟動之后,我們可以登錄RabbitMQ的web客戶端查看我們的信息:http://localhost:15672/,通過客戶端我們可以查看RabbitMQ的版本信息,連接信息,Channel、Exchange連接等等...
我們也可以通過客戶端新增Exchange并指定類型:
同樣也可以手動添加隊列,具體操作我們可以自己嘗試登陸客戶端來設置!
在終端通過命令查看所有的Exchange:rabbitmqctl list_exchanges
關于怎么實現RabbitMQ消息中間件的工作原理和使用就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。