您好,登錄后才能下訂單哦!
前言
消息隊列在現今數據量超大,并發量超高的系統中是十分常用的。本文將會對現時最常用到的幾款消息隊列框架?ActiveMQ、RabbitMQ、Kafka 進行分析對比。
詳細介紹 RabbitMQ 在 Spring 框架下的結構及實現原理,從Producer 端的事務、回調函數(ConfirmCallback / ReturnCallback)到 Consumer 端的?MessageListenerContainer 信息接收容器進行詳細的分析。通過對 RabbitTemplate、SimpleMessageListenerContainer、DirectMessageListenerContainer 等常用類型介紹,深入剖析在消息處理各個傳輸環節中的原理及注意事項。
并舉以實例對死信隊列、持久化操作進行一一介紹。
目錄
一、RabbitMQ 與 AMQP 的關系
二、RabbitMQ 的實現原理
三、RabbitMQ 應用實例
四、Producer 端的消息發送與監控
五、Consumer 消息接收管控?
六、死信隊列
七、持久化操作
AMQP(Advanced Message Queue Protocol 高級消息隊列協議)是一個消息隊列協議,它支持符合條件的客戶端和消息代理中間件(message middleware broker)進行通訊。RabbitMQ 則是 AMQP 協議的實現者,主要用于在分布式系統中信息的存儲發送與接收,RabbitMQ 的服務器端用 Erlang 語言編寫,客戶端支持多種開發語言:Python、.NET、Java、Ruby、C、PHP、ActionScript、XMPP、STOMP 等。
1.1? ?ActiveMQ、RabbitMQ、Kafka 對比
現在在市場上有 ActiveMQ、RabbitMQ、Kafka 等多個常用的消息隊列框架,與其他框架對比起來,RabbitMQ 在易用性、擴展性、高可用性、多協議、支持多語言客戶端等方面都有不俗表現。
?
1.2.1 AcitveMQ 特點
ActiveMQ 是 Apache 以 Java 語言開發的消息模型,它完美地支持?JMS(Java Message Service)消息服務,客戶端支持 Java、C、C++、C#、Ruby、Perl、Python、PHP 等多種開主發語言,支持OpenWire、Stomp、REST、XMPP、AMQP 等多種協議。ActiveMQ 采用異步消息傳遞方式,在設計上保證了多主機集群,客戶端-服務器,點對點等模式的有效通信。從開始它就是按照 JMS 1.1 和 J2EE 1.4 規范進行開發,實現了消息持久化,XA,事務支撐等功能。經歷多年的升級完善,現今已成為 Java 應用開發中主流的消息解決方案。但相比起 RabbitMQ、Kafka 它的主要缺點表現為資源消耗比較大,吞吐量較低,在高并發的情況下系統支撐能力較弱。如果系統全程使用 Java 開發,其并發量在可控范圍內,或系統需要支持多種不同的協議,使用 ActiveMQ 可更輕便地搭建起消息隊列服務。
1.2.2 Kafka 特點
Kafka 天生是面向分布式系統開發的消息隊列,它具有高性能、容災性、可動態擴容等特點。Kafka 與生俱來的特點在于它會把每個Partition 的數據都備份到不同的服務器當中,并與 ZooKeeper 配合,當某個Broker 故障失效時,ZooKeeper 服務就會將通知生產者和消費者,從備份服務器進行數據恢復。在性能上 Kafka 也大大超越了傳統的 ActiveMQ、RabbitMQ ,由于?Kafka 集群可支持動態擴容,在負載量到達峰值時可動態增加新的服務器進集群而無需重啟服務。但由于 Kafka 屬于分布式系統,所以它只能在同一分區內實現消息有序,無法實現全局消息有序。而且它內部的監控機制不夠完善,需要安裝插件,依賴ZooKeeper 進行元數據管理。如果系統屬于分布式管理機制,數據量較大且并發量難以預估的情況下,建議使用 Kafka 隊列。
1.2.3 RabbitMQ 對比
由于 ActiveMQ 過于依賴 JMS 的規范而限制了它的發展,所以 RabbitMQ 在性能和吞吐量上明顯會優于 ActiveMQ。
由于上市時間較長,在可用性、穩定性、可靠性上 RabbitMq 會比 Kafka 技術成熟,而且 RabbitMq 使用 Erlang 開發,所以天生具備高并發高可用的特點。而 Kafka 屬于分布式系統,它的性能、吞吐量、TPS 都會比 RabbitMq 要強。
2.1 生產者(Producer)、消費者(Consumer)、服務中心(Broker)之間的關系
首先簡單介紹 RabbitMQ 的運行原理,在 RabbitMQ 使用時,系統會先安裝并啟動 Broker Server,也就是 RabbitMQ 的服務中心。無論是生產者 (Producer),消費者(Consumer)都會通過連接池(Connection)使用 TCP/IP 協議(默認)來與 BrokerServer 進行連接。然后 Producer 會把 Exchange / Queue 的綁定信息發送到 Broker Server,Broker Server 根據 Exchange 的類型邏輯選擇對應 Queue ,最后把信息發送到與 Queue 關聯的對應 Consumer 。
?
2.2 交換器(Exchange)、隊列(Queue)、信道(Channel)、綁定(Binding)的概念
2.2.1 交換器 Exchange
Producer 建立連接后,并非直接將消息投遞到隊列 Queue 中,而是把消息發送到交換器 Exchange,由 Exchange 根據不同邏輯把消息發送到一個或多個對應的隊列當中。目前 Exchange 提供了四種不同的常用類型:Fanout、Direct、Topic、Header。
Fanout類型
此類型是最為常見的交換器,它會將消息轉發給所有與之綁定的隊列上。比如,有N個隊列與 Fanout 交換器綁定,當產生一條消息時,Exchange 會將該消息的N個副本分別發給每個隊列,類似于廣播機制。
Direct類型
此類型的 Exchange 會把消息發送到 Routing_Key 完全相等的隊列當中。多個 Cousumer 可以使用相同的關鍵字進行綁定,類似于數據庫的一對多關系。比如,Producer 以 Direct 類型的 Exchange 推送 Routing_Key 為 direct.key1 的隊列,系統再指定多個 Cousumer 綁定 direct.key1。如此,消息就會被分發至多個不同的 Cousumer 當中。
Topic類型
此類型是最靈活的一種方式配置方式,它可以使用模糊匹配,根據 Routing_Key 綁定到包含該關鍵字的不同隊列中。比如,Producer 使用 Topic類型的 Exchange 分別推送 Routing_Key 設置為?topic.guangdong.guangzhou?、topic.guangdong.shenzhen 的不同隊列,Cousumer 只需要把 Routing_Key 設置為 topic.guangdong.# ,就可以把所有消息接收處理。
Headers類型
該類型的交換器與前面介紹的稍有不同,它不再是基于關鍵字 Routing_Key 進行路由,而是基于多個屬性進行路由的,這些屬性比路由關鍵字更容易表示為消息的頭。也就是說,用于路由的屬性是取自于消息 Header 屬性,當消息 Header 的值與隊列綁定時指定的值相同時,消息就會路由至相應的隊列中。
2.2.2 Queue 隊列
Queue 隊列是消息的載體,每個消息都會被投入到 Queue 當中,它包含 name,durable,arguments 等多個屬性,name 用于定義它的名稱,當 durable(持久化)為 true 時,隊列將會持久化保存到硬盤上。反之為 false 時,一旦 Broker Server 被重啟,對應的隊列就會消失,后面還會有例子作詳細介紹。
2.2.3 Channel 通道
當 Broker Server 使用 Connection 連接 Producer / Cousumer 時會使用到信道(Channel),一個 Connection上可以建立多個 Channel,每個 Channel 都有一個會話任務,可以理解為邏輯上的連接。主要用作管理相關的參數定義,發送消息,獲取消息,事務處理等。
2.2.4 Binding 綁定
Binding 主要用于綁定交換器 Exchange 與 隊列 Queue 之間的對應關系,并記錄路由的 Routing-Key。Binding 信息會保存到系統當中,用于 Broker Server 信息的分發依據。
3.1 Rabbit 常用類說明
3.1.1 RabbitTemplate 類
Spring 框架已經封裝了 RabbitTemplate? 對 RabbitMQ 的綁定、隊列發送、接收進行簡化管理
方法 | 說明 |
void setExchange(String exchange)?? | 設置綁定的 exchange 名稱 |
String getExchange() | 獲取已綁定的 exchange 名稱 |
void setRoutingKey(String routingKey) | 設置綁定的 routingKey |
String getRoutingKey() | 獲取已綁定的 routingKey |
void send(String exchange, String routingKey, Message message,CorrelationData data) | 以Message方式發送信息到 Broken Server,CorrelationData 為標示符可為空 |
void convertAndSend(String exchange, String routingKey, Object object, CorrelationData data) | 以自定義對象方式發送信息到 Broken Server,系統將自動把 object轉換成 Message,CorrelationData 為標示符可為空 |
Message receive(String queueName, long timeoutMillis) | 根據queueuName接收隊列發送Message信息 |
Object receiveAndConvert(String queueName, long timeoutMillis) | 根據queueuName接收隊列對象信息 |
void setReceiveTimeout(long receiveTimeout) | 設置接收過期時間 |
void setReplyTimeout(long replyTimeout) | 設置重發時間 |
void setMandatory(boolean mandatory) | 開啟強制委托模式(下文會詳細說明) |
void setConfirmCallback(confirmCallback) | 綁定消息確認回調方法(下文會詳細說明) |
void setReturnCallback(returnCallback) | 綁定消息退出回調方法(下文會詳細說明) |
3.2? 初探 RabbitMQ?
在官網下載并成功安裝完 RabbitMQ 后,打開默認路徑?http://localhost:15672/#/?即可看到 RabbitMQ 服務中心的管理界面
3.2.1 Producer 端開發
先在 pom 中添加 RabbitMQ 的依賴,并在 application.yml 中加入 RabbitMQ 帳號密碼等信息。此例子,我們嘗試使用 Direct 交換器把隊列發送到不同的 Consumer。
?1?**********************pom?************************* ?2?<project> ?3?????????............. ?4?????<dependency> ?5?????????<groupId>org.springframework.boot</groupId> ?6?????????<artifactId>spring-boot-starter-amqp</artifactId> ?7?????????<version>2.0.5.RELEASE</version> ?8?????</dependency> ?9?</project>10? 11?****************??application.yml??**************** 12?spring: 13???application: 14??????name:?rabbitMqProducer 15???rabbitmq: 16?????host:?localhost? 17?????port:?5672 18?????username:?admin 19?????password:?12345678 20?????virtual-host:?/LeslieHost
首先使用 CachingConnectionFactory 建立鏈接,通過 BindingBuilder 綁定 Exchange、Queue、RoutingKey之間的關系。
然后通過 void convertAndSend (String exchange, String routingKey, Object object, CorrelationData data) 方法把信息發送到 Broken Server
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?public?class?BindingConfig?{ 32?????public?final?static?String?first="direct.first"; 33?????public?final?static?String?second="direct.second"; 34?????public?final?static?String?Exchange_NAME="directExchange"; 35?????public?final?static?String?RoutingKey1="directKey1"; 36?????public?final?static?String?RoutingKey2="directKey2"; 37????? 38?????@Bean 39?????public?Queue?queueFirst(){ 40?????????return?new?Queue(first); 41?????} 42????? 43?????@Bean 44?????public?Queue?queueSecond(){ 45?????????return?new?Queue(second); 46?????} 47????? 48?????@Bean 49?????public?DirectExchange?directExchange(){ 50?????????return?new?DirectExchange(Exchange_NAME,true,true); 51?????} 52????? 53?????//利用BindingBuilder綁定Direct與queueFirst 54?????@Bean 55?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 56?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57?????} 58????? 59?????//利用BindingBuilder綁定Direct與queueSecond 60?????@Bean 61?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 62?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63?????}??? 64?} 65? 66?@Controller 67?@RequestMapping("/producer") 68?public?class?ProducerController?{ 69?????@Autowired 70?????private?RabbitTemplate?template; 71????? 72?????@RequestMapping("/send") 73?????public?void?send()?{ 74?????????for(int?n=0;n<100;n++){??? 75? 76?????????????template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"I'm?the?first?queue!???"+String.valueOf(n),getCorrelationData()); 77?????????????template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey2,"I'm?the?second?queue!??"+String.valueOf(n),getCorrelationData()); 78?????????} 79?????} 80? 81??????private?CorrelationData?getCorrelationData(){ 82?????????return?new?CorrelationData(UUID.randomUUID().toString()); 83?????} 84?}
此時,打開 RabbitMQ 管理界面,可看到 Producer 已經向 Broken Server 的 direct.first / direct.second 兩個 Queue 分別發送100 個 Message
3.2.2? Consumer 端開發
分別建立兩個不同的 Consumer ,一個綁定 direct.first 別一個綁定 direct.second , 然后通過注解 @RabbitListener 監聽不同的 queue,當接到到 Producer 推送隊列時,顯示隊列信息。
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?public?class?BindingConfig?{ 32?????public?final?static?String?first="direct.first"; 33?????public?final?static?String?Exchange_NAME="directExchange"; 34?????public?final?static?String?RoutingKey1="directKey1"; 35????? 36?????@Bean 37?????public?Queue?queueFirst(){ 38?????????return?new?Queue(first); 39?????} 40????? 41?????@Bean 42?????public?DirectExchange?directExchange(){ 43?????????return?new?DirectExchange(Exchange_NAME); 44?????} 45????? 46?????//利用BindingBuilder綁定Direct與queueFirst 47?????@Bean 48?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 49?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 50?????}?? 51?} 52? 53?@Configuration 54?@RabbitListener(queues="direct.first") 55?public?class?RabbitMqListener?{ 56????? 57?????@RabbitHandler 58?????public?void?handler(String?message){ 59?????????System.out.println(message); 60?????} 61?} 62? 63?@SpringBootApplication 64?public?class?App?{ 65????? 66?????public?static?void?main(String[]?args){ 67?????????SpringApplication.run(App.class,?args); 68?????} 69?}
運行后可以觀察到不同的 Consumer 會收到不同隊列的消息
如果覺得使用 Binding 代碼綁定過于繁瑣,還可以直接在監聽類RabbitMqListener中使用 @QueueBinding 注解綁定
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?@RabbitListener(bindings=@QueueBinding( 32?exchange=@Exchange(value="directExchange"), 33?value=@Queue(value="direct.second"), 34?key="directKey2")) 35?public?class?RabbitMqListener?{ 36????? 37?????@RabbitHandler 38?????public?void?handler(String?message){ 39?????????System.out.println(message); 40?????} 41?} 42? 43?@SpringBootApplication 44?public?class?App?{ 45????? 46?????public?static?void?main(String[]?args){ 47?????????SpringApplication.run(App.class,?args); 48?????} 49?}
運行結果
前面一節已經介紹了RabbitMQ的基本使用方法,這一節將從更深入的層面講述 Producer 的應用。
試想一下這種的情形,如果因 RabbitTemplate 發送時 Exchange 名稱綁定錯誤,或 Broken Server 因網絡問題或服務負荷過大引發異常,Producer 發送的隊列丟失,系統無法正常工作。此時,開發人員應該進行一系列應對措施進行監測,確保每個數據都能正常推送到 Broken Server 。有見及此,RabbitMQ 專門為大家提供了兩種解決方案,一是使用傳統的事務模式,二是使用回調函數,下面為大家作詳介紹。
4.1 Producer 端的事務管理
在需要使用事務時,可以通過兩種方法
第一可以調用 channel 類的方法以傳統模式進行管理,事務開始時調用 channel.txSelect(),信息發送后進行確認 channel.txCommit(),一旦捕捉到異常進行回滾 channel.txRollback(),最后關閉事務。
?1?@Controller ?2?@RequestMapping("/producer") ?3?public?class?ProducerController?{ ?4?????@Autowired ?5?????private?RabbitTemplate?template; ?6?? ?7?????@RequestMapping("/send") ?8?????public?void?send1(HttpServletResponse?response)? ?9?????????throws?InterruptedException,?IOException,??TimeoutException{ 10?????????Channel?channel=template.getConnectionFactory().createConnection().createChannel(true); 11?????????....... 12?????????try{ 13?????????????channel.txSelect(); 14?????????????channel.basicPublish("ErrorExchange",?BindingConfig.Routing_Key_First,?new?AMQP.BasicProperties(),"Nothing".getBytes()); 15?????????????channel.txCommit(); 16?????????}catch(Exception?e){ 17?????????????channel.txRollback(); 18?????????}finally{ 19?????????????channel.close(); 20?????????} 21?????????...... 22?????????...... 23?????????...... 24?????} 25?}
第二還可以直接通過 RabbitTemplate 的配置方法 void?setChannelTransacted(bool isTransacted) 直接開啟事務
?1?public?class?ProducerController?{ ?2?????@Autowired ?3?????private?ConnectionConfig?connection; ?4? ?5?????@Autowired ?6?????@Bean ?7?????private?RabbitTemplate?template(){ ?8?????????RabbitTemplate?template=new?RabbitTemplate(connection.getConnectionFactory()); ?9?????????template.setChannelTransacted(true); 10?????????return?template; 11?????} 12?? 13?????@RequestMapping("/send") 14?????@Transactional(rollbackFor=Exception.class) 15?????public?void?send(HttpServletResponse?response)?throws?InterruptedException,?IOException,TimeoutException{ 16?????????.......... 17?????????.......... 18?????????.......... 19?????} 20?}
4.2 利用 ConfirmCallback 回調確認消息是否成功發送到 Exchange?
使用事務模式消耗的系統資源比較大,系統往往會處理長期等待的狀態,在并發量較高的時候也有可能造成死鎖的隱患。有見及此,系統提供了輕量級的回調函數方式進行異步處理。
當需要確認消息是否成功發送到 Exchange 的時候,可以使用 ConfirmCallback 回調函數。使用該函數,系統推送消息后,該線程便會得到釋放,等 Exchange 接收到消息后系統便會異步調用 ConfirmCallback 綁定的方法進行處理。ConfirmCallback 只包含一個方法 void confirm(CorrelationData correlationData, boolean ack, String cause),此方法會把每條數據發送到 Exchange 時候的 ack 狀態(成功/失敗),cause 成敗原因,及對應的 correlationData(CorrelationData 只包含一個屬性 id,是綁定發送對象的唯一標識符) 返還到 Producer,讓Producer 進行相應處理。
注意:在綁定 ConfirmCallback 回調函數前,請先把? publisher-confirms 屬性設置為 true
?1?spring: ?2???application: ?3??????name:?rabbitmqproducer ?4???rabbitmq: ?5?????host:?127.0.0.1? ?6?????port:?5672 ?7?????username:?admin ?8?????password:?12345678 ?9?????virtual-host:?/LeslieHost
例如:下面的例子,特意將 RabbitTemplate 發送時所綁定的 Exchange 名稱填寫為錯誤名稱 “ ErrorExchange ”,造成發送失敗,然后在回調函數中檢查失敗的原因。
Producer 端代碼:?
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????System.out.println(host); 22?????????factory.setHost(host); 23?????????factory.setPort(port); 24?????????factory.setUsername(username); 25?????????factory.setPassword(password); 26?????????factory.setVirtualHost(virtualHost); 27?????????factory.setPublisherConfirms(true); 28?????????factory.setPublisherReturns(true); 29?????????return?factory; 30?????} 31?} 32? 33?@Configuration 34?public?class?BindingConfig?{ 35?????public?final?static?String?first="direct.first"; 36?????public?final?static?String?Exchange_NAME="directExchange"; 37?????public?final?static?String?RoutingKey1="directKey1"; 38????? 39?????@Bean 40?????public?Queue?queueFirst(){ 41?????????return?new?Queue(first); 42?????} 43? 44?????@Bean 45?????public?DirectExchange?directExchange(){ 46?????????return?new?DirectExchange(Exchange_NAME); 47?????} 48????? 49?????@Bean 50?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 51?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 52?????}?? 53?} 54? 55?@Component 56?public?class?MyConfirmCallback?implements?ConfirmCallback?{ 57????? 58?????@Override 59?????public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{ 60?????????//?TODO?自動生成的方法存根 61?????????//?TODO?自動生成的方法存根 62?????????if(ack){ 63?????????????System.out.println(correlationData.getId()+"?ack?is:?true!?\ncause:"+cause); 64?????????}else 65?????????????System.out.println(correlationData.getId()+"?ack?is:?false!?\ncause:"+cause); 66?????} 67?} 68? 69?@Controller 70?@RequestMapping("/producer") 71?public?class?ProducerController?{ 72?????@Autowired 73?????private?RabbitTemplate?template; 74?????@Autowired 75?????private?MyConfirmCallback?confirmCallback; 76? 77?????@RequestMapping("/send") 78?????public?void?send()?{ 79?????????template.setConfirmCallback(confirmCallback);??????? 80?????????for(int?n=0;n<2;n++){??? 81?????????????template.convertAndSend("ErrorExchange", 82??????????????????????BindingConfig.RoutingKey1,"I'm?the?first?queue!???" 83??????????????????????+String.valueOf(n),getCorrelationData()); 84?????????} 85?????} 86? 87??????private?CorrelationData?getCorrelationData(){ 88?????????return?new?CorrelationData(UUID.randomUUID().toString()); 89?????} 90?}
Consumer端代碼
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?@RabbitListener(bindings=@QueueBinding( 32?exchange=@Exchange(value="directExchange"), 33?value=@Queue(value="direct.first"), 34?key="directKey1")) 35?public?class?RabbitMqListener?{ 36????? 37?????@RabbitHandler 38?????public?void?handler(String?message){ 39?????????System.out.println(message); 40?????} 41?} 42? 43?@SpringBootApplication 44?public?class?App?{ 45????? 46?????public?static?void?main(String[]?args){ 47?????????SpringApplication.run(App.class,?args); 48?????} 49?}
運行結果:
4.3 綁定 CorrelationData 與發送對象的關系
上面的例子當中,CorrelationData 只是用一個隨機的 UUID 作為 CorrelationID,而在現實的應用場景中,由于?ConfirmCallback 只反回標識值 CorrelationData,而沒有把隊列里的對象值也一同返回。所以,在推送隊列時可以先用 Key-Value 保存 CorrelationID 與所發送信息的關系,這樣當 ConfirmCallback 回調時,就可根據 CorrelationID 找回對象,作進一步處理。
下面例子,我們把要發送的對象放在虛擬數據 DataSource 類中,用 DataRelation 記錄?CorrelationID 與發送對象 OrderID 的關系,然后在回調函數?ConfirmCallback 中根據 CorrelationID 查找對應的 OrderEntity,如果發送成功,則刪除綁定。如果發送失敗,可以重新發送或根據情況再作處理。
Producer端代碼:
??1?@Configuration ??2?public?class?ConnectionConfig?{ ??3?????@Value("${spring.rabbitmq.host}") ??4?????public?String?host; ??5????? ??6?????@Value("${spring.rabbitmq.port}") ??7?????public?int?port; ??8????? ??9?????@Value("${spring.rabbitmq.username}") ?10?????public?String?username; ?11????? ?12?????@Value("${spring.rabbitmq.password}") ?13?????public?String?password; ?14????? ?15?????@Value("${spring.rabbitmq.virtual-host}") ?16?????public?String?virtualHost; ?17? ?18?????@Bean ?19?????public?ConnectionFactory?getConnectionFactory(){ ?20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?21?????????System.out.println(host); ?22?????????factory.setHost(host); ?23?????????factory.setPort(port); ?24?????????factory.setUsername(username); ?25?????????factory.setPassword(password); ?26?????????factory.setVirtualHost(virtualHost); ?27?????????factory.setPublisherConfirms(true); ?28?????????factory.setPublisherReturns(true); ?29?????????return?factory; ?30?????} ?31?} ?32? ?33?@Configuration ?34?public?class?BindingConfig?{ ?35?????public?final?static?String?first="direct.first"; ?36?????//Exchange?使用?direct?模式????? ?37?????public?final?static?String?Exchange_NAME="directExchange"; ?38?????public?final?static?String?RoutingKey1="directKey1"; ?39????? ?40?????@Bean ?41?????public?Queue?queueFirst(){ ?42?????????return?new?Queue(first); ?43?????} ?44????? ?45?????@Bean ?46?????public?DirectExchange?directExchange(){ ?47?????????return?new?DirectExchange(Exchange_NAME); ?48?????} ?49????? ?50?????@Bean ?51?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?52?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); ?53?????} ?54?} ?55? ?56?@Data ?57?public?class?OrderEntity?implements?Serializable{ ?58?????private?String?id; ?59?????private?String?goods; ?60?????private?Double?price; ?61?????private?Integer?count; ?62????? ?63?????public?OrderEntity(String?id,String?goods,Double?price,Integer?count){ ?64?????????this.id=id; ?65?????????this.goods=goods; ?66?????????this.price=price; ?67?????????this.count=count; ?68?????} ?69????? ?70?????public?OrderEntity(){} ?71????? ?72?????public?String?getId()?{ ?73?????????return?id; ?74?????} ?75?????public?void?setId(String?id)?{ ?76?????????this.id?=?id; ?77?????} ?78? ?79?????public?String?getGoods()?{ ?80?????????return?goods; ?81?????} ?82? ?83?????public?void?setGoodsId(String?goods)?{ ?84?????????this.goods?=?goods; ?85?????} ?86? ?87?????public?Integer?getCount()?{ ?88?????????return?count; ?89?????} ?90? ?91?????public?void?setCount(Integer?count)?{ ?92?????????this.count?=?count; ?93?????} ?94? ?95?????public?Double?getPrice()?{ ?96?????????return?price; ?97?????} ?98? ?99?????public?void?setPrice(Double?price)?{ 100?????????this.price?=?price; 101?????} 102?} 103? 104?@Component 105?public?class?DataSource?{ 106?????//加入虛擬數據 107?????private?static?List<OrderEntity>?list=new?ArrayList<OrderEntity>( 108?????????????Arrays.asList(new?OrderEntity("001","Nikon?D750",13990.00,1), 109???????????????????????????new?OrderEntity("002","Huwei?P30?Plus",5400.00,1), 110???????????????????????????..........)); 111????? 112?????public?DataSource(){ 113?????} 114????? 115?????public?List<OrderEntity>?getOrderList(){ 116?????????return?list; 117?????} 118????? 119?????//根據Id獲取對應order 120?????public?OrderEntity?getOrder(String?id){ 121?????????for(OrderEntity?order:list){ 122?????????????if(order.getId()==id) 123?????????????????return?order; 124?????????} 125?????????return?null; 126?????} 127?} 128? 129?public?class?DataRelation?{ 130?????public?static?Map?map=new?HashMap(); 131????? 132?????//綁定關系 133?????public?static?void?add(String?key,String?value){ 134?????????if(!map.containsKey(key)) 135?????????????map.put(key,value); 136?????} 137????? 138?????//返回orderId 139?????public?static?Object?get(String?key){ 140?????????if(map.containsKey(key)) 141?????????????return?map.get(key); 142?????????else 143?????????????return?null; 144?????} 145????? 146?????//根據?orderId?刪除綁定關系 147?????public?static?void?del(String?key){ 148?????????if(map.containsKey(key)) 149????????????map.remove(key); 150?????} 151?} 152? 153?@Component 154?public?class?MyConfirmCallback?implements?ConfirmCallback?{ 155?????@Autowired 156?????private?DataSource?datasource; 157????? 158?????@Override 159?????public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{ 160?????????String?correlationId=correlationData.getId(); 161?????????//根據?correclationId取回對應的orderId 162?????????String?orderId=DataRelation.get(correlationId).toString(); 163?????????//在datasource中找回對應的order 164?????????OrderEntity?order=datasource.getOrder(orderId); 165????????? 166?????????if(ack){ 167?????????????System.out.println("--------------------ConfirmCallback-------------------\n"????????????????? 168?????????????????+"?order's?ack?is?true!\nId:"+order.getId()+"??Goods:"+order.getGoods() 169?????????????????+"?Count:"+order.getCount().toString()+"??Price:"+order.getPrice()); 170?????????????DataRelation.del(correlationId);????//操作完成刪除對應綁定 171?????????}else?{ 172?????????????System.out.println(order.getId()+"?order's?ack?is:?false!?\ncause:"+cause); 173?????????????//可在記錄日志后把Order推送到隊列進行重新發送 174?????????????....... 175?????????} 176?????} 177?} 178? 179?@Controller 180?@RequestMapping("/producer") 181?public?class?ProducerController?{ 182?????@Autowired 183?????private?RabbitTemplate?template; 184?????@Autowired 185?????private?MyConfirmCallback?confirmCallback; 186?????@Autowired 187?????private?DataSource?dataSource; 188? 189?????@RequestMapping("/send") 190?????public?void?send()?throws?InterruptedException,?IOException{ 191?????????//綁定?ConfirmCallback?回調函數 192?????????template.setConfirmCallback(confirmCallback); 193?? 194?????????for(OrderEntity?order:dataSource.getOrderList()){ 195?????????????CorrelationData?correlationData=getCorrelationData(); 196?????????????//保存?CorrelationId?與?orderId關系 197?????????????DataRelation.add(correlationData.getId(),?order.getId()); 198?????????????//把?order?插入隊列 199?????????????template.convertAndSend("directExchange",BindingConfig.RoutingKey1,order,correlationData); 200?????????} 201?????} 202????? 203?????private?CorrelationData?getCorrelationData(){ 204?????????return?new?CorrelationData(UUID.randomUUID().toString()); 205?????} 206?}
Consumer 端代碼
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?@RabbitListener(bindings=@QueueBinding( 32?exchange=@Exchange(value="directExchange"), 33?value=@Queue(value="direct.first"), 34?key="directKey1")) 35?public?class?RabbitMqListener?{ 36????? 37?????@RabbitHandler 38?????public?void?handler(String?message){ 39?????????System.out.println(message); 40?????} 41?} 42? 43?@SpringBootApplication 44?public?class?App?{ 45????? 46?????public?static?void?main(String[]?args){ 47?????????SpringApplication.run(App.class,?args); 48?????} 49?}
運行結果
4.4 利用 ReturnCallback 處理隊列 Queue 錯誤
使用 ConfirmCallback 函數只能判斷消息是否成功發送到 Exchange,但并不能保證消息已經成功進行隊列 Queue。所以,系統預備了另一個回調函數 ReturnCallback 來監聽 Queue 隊列處理的成敗。如果隊列錯誤綁定不存在的 queue,或者 Broken Server 瞬間出現問題末能找到對應的 queue,系統就會激發 Producer 端 ReturnCallback 的回調函數來進行錯誤處理。 ReturnCallback 回調接口只包含一個方法 void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey),它會把出錯的 replyCode,replyText,exchange,routingKey等值都一起返還。與 ConfirmCallback 不同的是,returnedMessage 會把隊列中的對象保存到 Message 的 Body 屬性中并返還到回調函數。
注意:在綁定 ReturnCallback 回調函數前,請先把? publisher-returns 及 mandatory 屬性設置為 true 。 mandatory 參數默認為 false,用于判斷 broken server是否把錯誤的對象返還到? Producer。如末進行設置,系統將把錯誤的消息丟棄。
下面例子我們在調用 convertAndSend 方法時特意把?routingKey 設置為 ErrorKey,觸發 ReturnCallback 回調,然后在 ReturenCallback 的回調方法顯示?replyCode,replyText,exchange,routingKey 等值,并把隊列中對象屬性一并顯示。
Producer 端代碼
??1?@Configuration ??2?public?class?ConnectionConfig?{ ??3?????@Value("${spring.rabbitmq.host}") ??4?????public?String?host; ??5????? ??6?????@Value("${spring.rabbitmq.port}") ??7?????public?int?port; ??8????? ??9?????@Value("${spring.rabbitmq.username}") ?10?????public?String?username; ?11????? ?12?????@Value("${spring.rabbitmq.password}") ?13?????public?String?password; ?14????? ?15?????@Value("${spring.rabbitmq.virtual-host}") ?16?????public?String?virtualHost; ?17? ?18?????@Bean ?19?????public?ConnectionFactory?getConnectionFactory(){ ?20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?21?????????System.out.println(host); ?22?????????factory.setHost(host); ?23?????????factory.setPort(port); ?24?????????factory.setUsername(username); ?25?????????factory.setPassword(password); ?26?????????factory.setVirtualHost(virtualHost); ?27?????????factory.setPublisherConfirms(true); ?28?????????factory.setPublisherReturns(true); ?29?????????return?factory; ?30?????} ?31?} ?32? ?33?@Configuration ?34?public?class?BindingConfig?{ ?35?????public?final?static?String?first="direct.first"; ?36?????public?final?static?String?Exchange_NAME="directExchange"; ?37?????public?final?static?String?RoutingKey1="directKey1"; ?38????? ?39?????@Bean ?40?????public?Queue?queueFirst(){ ?41?????????return?new?Queue(first); ?42?????} ?43? ?44?????@Bean ?45?????public?DirectExchange?directExchange(){ ?46?????????return?new?DirectExchange(Exchange_NAME); ?47?????} ?48????? ?49?????@Bean ?50?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?51?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); ?52?????}? ?53?} ?54? ?55?@Data ?56?public?class?OrderEntity?implements?Serializable{ ?57?????private?String?id; ?58?????private?String?goods; ?59?????private?Double?price; ?60?????private?Integer?count; ?61????? ?62?????public?OrderEntity(String?id,String?goods,Double?price,Integer?count){ ?63?????????this.id=id; ?64?????????this.goods=goods; ?65?????????this.price=price; ?66?????????this.count=count; ?67?????} ?68????? ?69?????public?OrderEntity(){} ?70????? ?71?????public?String?getId()?{ ?72?????????return?id; ?73?????} ?74?????public?void?setId(String?id)?{ ?75?????????this.id?=?id; ?76?????} ?77? ?78?????public?String?getGoods()?{ ?79?????????return?goods; ?80?????} ?81? ?82?????public?void?setGoodsId(String?goods)?{ ?83?????????this.goods?=?goods; ?84?????} ?85? ?86?????public?Integer?getCount()?{ ?87?????????return?count; ?88?????} ?89? ?90?????public?void?setCount(Integer?count)?{ ?91?????????this.count?=?count; ?92?????} ?93? ?94?????public?Double?getPrice()?{ ?95?????????return?price; ?96?????} ?97? ?98?????public?void?setPrice(Double?price)?{ ?99?????????this.price?=?price; 100?????} 101?} 102? 103?@Component 104?public?class?DataSource?{ 105?????//虛擬數據 106?????private?static?List<OrderEntity>?list=new?ArrayList<OrderEntity>( 107?????????????Arrays.asList(new?OrderEntity("001","Nikon?D750",13990.00,1), 108???????????????????????????new?OrderEntity("002","Huwei?P30?Plus",5400.00,1), 109???????????????????????????......)); 110?????public?DataSource(){ 111?????} 112????? 113?????public?List<OrderEntity>?getOrderList(){ 114?????????return?list; 115?????} 116????? 117?????//根據Id獲取對應order 118?????public?OrderEntity?getOrder(String?id){ 119?????????for(OrderEntity?order:list){ 120?????????????if(order.getId()==id) 121?????????????????return?order; 122?????????} 123?????????return?null; 124?????} 125?} 126? 127?@Component 128?public?class?MyReturnCallback?implements?ReturnCallback?{ 129? 130?????@Override 131?????public?void?returnedMessage(Message?message,?int?replyCode,? 132?????????????String?replyText,?String?exchange,?String?routingKey){ 133?????????//把messageBody反序列化為?OrderEntity對象 134?????????OrderEntity?order=convertToOrder(message.getBody()); 135?????????//顯示錯誤原因 136?????????System.out.println("-------------ReturnCallback!------------\n" 137?????????????+"?exchange:"+exchange+"?replyCode:"+String.valueOf(replyCode) 138?????????????+"?replyText:"+replyText+"?key:"+routingKey+"\n?OrderId:"+order.getId() 139?????????????+"??Goods:"+order.getGoods()+"??Count:"+order.getCount().toString() 140?????????????+"??Price:"+order.getPrice()+"?"); 141?????} 142????? 143?????//把byte[]反序列化為?OrderEntity對象 144?????private?OrderEntity?convertToOrder(byte[]?bytes){ 145?????????OrderEntity?order=null; 146?????????ByteArrayInputStream?bis?=?new?ByteArrayInputStream?(bytes);???????? 147?????????ObjectInputStream?ois; 148?????????try?{ 149?????????????ois?=?new?ObjectInputStream?(bis); 150?????????????Object?obj?=?ois.readObject(); 151?????????????order=(OrderEntity)obj; 152?????????????ois.close();??? 153?????????????bis.close();? 154?????????}?catch?(IOException?|?ClassNotFoundException?e)?{ 155?????????????//?TODO?自動生成的?catch?塊 156?????????????e.printStackTrace(); 157?????????}???????? 158?????????return?order; 159?????} 160?} 161? 162?@Controller 163?@RequestMapping("/producer") 164?public?class?ProducerController?{ 165?????@Autowired 166?????private?RabbitTemplate?template; 167?????@Autowired 168?????private?MyReturnCallback?returnCallback; 169?????@Autowired 170?????private?DataSource?dataSource; 171?? 172????? 173?????@RequestMapping("/send") 174?????public?void?send()?throws?InterruptedException,?IOException{ 175?????????//把?mandatory?屬性設定為true 176?????????template.setMandatory(true); 177?????????//綁定?ReturnCallback?回調函數 178?????????template.setReturnCallback(returnCallback); 179?? 180?????????for(OrderEntity?order:dataSource.getOrderList()){ 181?????????????CorrelationData?correlationData=getCorrelationData(); 182?????????????template.convertAndSend("directExchange","ErrorKey",order,correlationData); 183?????????} 184?????} 185????? 186?????private?CorrelationData?getCorrelationData(){ 187?????????return?new?CorrelationData(UUID.randomUUID().toString()); 188?????} 189?}
Consumer 代碼
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?@RabbitListener(bindings=@QueueBinding( 32?exchange=@Exchange(value="directExchange"), 33?value=@Queue(value="direct.first"), 34?key="directKey1")) 35?public?class?RabbitMqListener?{ 36????? 37?????@RabbitHandler 38?????public?void?handler(String?message){ 39?????????System.out.println(message); 40?????} 41?} 42? 43?@SpringBootApplication 44?public?class?App?{ 45????? 46?????public?static?void?main(String[]?args){ 47?????????SpringApplication.run(App.class,?args); 48?????} 49?}
運行結果:
在第四節主要介紹了 Producer 端的隊列發送與監控,它只能管理 Producer 與 Broker Server 之間的通信,但并不能確認 Consumer 是否能成功接收到隊列,在這節內容將介紹 Consumer 端的隊列接收與監聽。前面幾節里,Consumer 端都是簡單地直接使用 RabbitListener 對隊列進行監聽,其實 RabbitMQ 已經為用戶準備了功能更強大的 MessageListenerContainer 容器用于管理 Message ,下面將為大家介紹。
5.1 AbstractMessageListenerContainer 介紹
AbstractMeessageListenerContainer 虛擬類是 RabbitMQ 封裝好的一個容器,本身并沒有對消息進行處理,而是把消息的處理方式交給了 MessageListener 。而它的主要功能是實現 MessageListener 的綁定,ApplicationContext 上下文的綁定,ErrorHandler 的錯誤處理方法的綁定、對消息消費的開始、結束等等默認參數進行配置,讓開發人員可以在容器中對 Consumer 實現統一管理。SimpleMessageListenerContainer、DirectMessageLinstenerCoontainer 都是它的子類,分別應用于不同的場景,在下面會再作詳細介紹。
方法 | 說明 |
void setAcknowledgeMode(AcknowledgeMode acknowledgeMode) | 設定消息接收確認的模式(下文會有詳細介紹) |
AcknowledgeMode getAcknowledgeMode() | 獲取消息接收確認的模式(下文會有詳細介紹) |
void setPrefetchCount(int prefetchCount) | 設置每個 consumer 每次可接收到消息的最大數量 |
void setQueues(Queue... queues) | 設定監聽Queue隊列 |
void addQueues(Queue... queues) | 加入監聽Queue隊列 |
void setMessageListener(Object messageListener) | 綁定MessageListener,對信息進行處理 |
void setChannelAwareMessageListener(ChannelAwareMessageListener messageListener) | 綁定ChannelAwareMessageListener,對信息進行處理,同時可獲取當前使用的channel信息 |
Object getMessageListener() | 獲取MessageListener對象 |
void setMessageConverter(MessageConverter messageConverter) | 綁定MessageConverter消息轉換對象? |
void setApplicationContext(ApplicationContext applicationContext) | 綁定ApplicationContext上下文 |
ConnectionFactory getConnectionFactory() | 獲取ConnectionFactory連接工廠 |
void setListenerId(String listenerId) | 設定ListenerId |
MessageListener 是監聽消息最常用 Listener,它只包含了一個方法 void onMessage(Message message),這是消息接收最常用的一個方法,開發者只需要實現此方法即可對接收到的 Message 進行處理。
ChannelAwareMessageListener 相當于是 MessageListener的一個擴展,包含了方法 void onMessage(Message message, Channel channel),除了對 Message 進行處理外,還可以對接收此 Message 的 Channel 進行檢測。
5.2 SimpleMessageListenerContainer 常用方法
SimpleMessageListenerContainer 是最常用的 MessageListener 容器,它可以通過下面的方法設置默認消費者數量與最大的消費者數量。下面例子中嘗試把 consurrentConsumers 設置為3,把maxConcurrentConsumers 設置為4,并同時監控 direct 模式交換器的 direct.first,direct.second 隊列。
方法 | 說明 |
void setConcurrentConsumers(final int concurrentConsumers) | 設定當前隊列中消費者數量 |
void setMaxConcurrentConsumers(int maxConcurrentConsumers) | 設定當前隊列中最大消費者數量 |
通過截圖可以看到,系統默認會為每個 queue 都創建 3 個 consumers,不同的 queue 中的 consumers 是共享相同的 3 個 channel 。
當 Producer 端發送消息時,consumers 的實際數量可根據 maxConcurrentConsumers 的配置限制進行擴展。?
Producer 端代碼
?1?@Configuration ?2?public?class?BindingConfig?{ ?3?????public?final?static?String?first="direct.first"; ?4?????public?final?static?String?second="direct.second"; ?5?????public?final?static?String?Exchange_NAME="directExchange"; ?6?????public?final?static?String?RoutingKey1="directKey1"; ?7?????public?final?static?String?RoutingKey2="directKey2"; ?8????? ?9?????@Bean 10?????public?Queue?queueFirst(){ 11?????????return?new?Queue(first); 12?????} 13????? 14?????@Bean 15?????public?Queue?queueSecond(){ 16?????????return?new?Queue(second); 17?????} 18????? 19?????@Bean 20?????public?DirectExchange?directExchange(){ 21?????????return?new?DirectExchange(Exchange_NAME); 22?????} 23????? 24?????//利用BindingBuilder綁定Direct與queueFirst 25?????@Bean 26?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 27?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28?????} 29????? 30?????//利用BindingBuilder綁定Direct與queueSecond 31?????@Bean 32?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 33?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34?????}??? 35?} 36? 37?@Configuration 38?public?class?ConnectionConfig?{ 39?????@Value("${spring.rabbitmq.host}") 40?????public?String?host; 41????? 42?????@Value("${spring.rabbitmq.port}") 43?????public?int?port; 44????? 45?????@Value("${spring.rabbitmq.username}") 46?????public?String?username; 47????? 48?????@Value("${spring.rabbitmq.password}") 49?????public?String?password; 50????? 51?????@Value("${spring.rabbitmq.virtual-host}") 52?????public?String?virtualHost; 53? 54?????@Bean 55?????public?ConnectionFactory?getConnectionFactory(){ 56?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 57?????????factory.setHost(host); 58?????????factory.setPort(port); 59?????????factory.setUsername(username); 60?????????factory.setPassword(password); 61?????????factory.setVirtualHost(virtualHost); 62?????????return?factory; 63?????} 64?} 65? 66?@Controller 67?@RequestMapping("/producer") 68?public?class?ProducerController?{ 69?????@Autowired 70?????private?RabbitTemplate?template; 71?? 72?????@RequestMapping("/send") 73?????public?void?send(HttpServletResponse?response)?throws?InterruptedException,?IOException{ 74?????????for(Integer?n=0;n<100;n++){75?????????????????CorrelationData?correlationData=getCorrelationData();76?????????????????template.convertAndSend("directExchange","directKey1",? 77??????????????????????????"queue1"+"??"+n.toString(),correlationData);78?????????????????template.convertAndSend("directExchange","directKey2","?queue2"+"??"+n.toString(),correlationData);???????????? 79?????????????????Thread.currentThread().sleep(30);80?????????}81?????}82?83?????private?CorrelationData?getCorrelationData(){84?????????return?new?CorrelationData(UUID.randomUUID().toString());85?????}86?}
Consumer 端代碼:
??1?@Configuration ??2?public?class?ConnectionConfig?{ ??3?????@Value("${spring.rabbitmq.host}") ??4?????public?String?host; ??5????? ??6?????@Value("${spring.rabbitmq.port}") ??7?????public?int?port; ??8????? ??9?????@Value("${spring.rabbitmq.username}") ?10?????public?String?username; ?11????? ?12?????@Value("${spring.rabbitmq.password}") ?13?????public?String?password; ?14????? ?15?????@Value("${spring.rabbitmq.virtual-host}") ?16?????public?String?virtualHost; ?17? ?18?????@Bean ?19?????public?ConnectionFactory?getConnectionFactory(){ ?20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?21?????????factory.setHost(host); ?22?????????factory.setPort(port); ?23?????????factory.setUsername(username); ?24?????????factory.setPassword(password); ?25?????????factory.setVirtualHost(virtualHost); ?26?????????return?factory; ?27?????} ?28?} ?29? ?30?@Configuration ?31?public?class?BindingConfig?{ ?32?????public?final?static?String?first="direct.first"; ?33?????public?final?static?String?second="direct.second"; ?34?????public?final?static?String?Exchange_NAME="directExchange"; ?35?????public?final?static?String?RoutingKey1="directKey1"; ?36?????public?final?static?String?RoutingKey2="directKey2"; ?37????? ?38?????@Bean ?39?????public?Queue?queueFirst(){ ?40?????????return?new?Queue(first); ?41?????} ?42????? ?43?????@Bean ?44?????public?Queue?queueSecond(){ ?45?????????return?new?Queue(second); ?46?????} ?47????? ?48?????@Bean ?49?????public?DirectExchange?directExchange(){ ?50?????????return?new?DirectExchange(Exchange_NAME); ?51?????} ?52????? ?53?????//利用BindingBuilder綁定Direct與queueFirst ?54?????@Bean ?55?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?56?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); ?57?????} ?58????? ?59?????//利用BindingBuilder綁定Direct與queueSecond ?60?????@Bean ?61?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? ?62?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); ?63?????}??? ?64?} ?65?@Configuration ?66?public?class?SimpleMessListener?{ ?67?????@Autowired ?68?????private?RabbitTemplate?template; ?69?????private?int?index=0; ?70? ?71?????@Bean ?72?????public?SimpleMessageListenerContainer?messageContainer(){ ?73?????????SimpleMessageListenerContainer?container=new?SimpleMessageListenerContainer(); ?74?????????container.setConnectionFactory(connectionConfig.getConnectionFactory()); ?75?????????//?綁定Queue1/Queue2 ?76?????????container.setQueueNames("direct.first");?????? ?77?????????container.addQueueNames("direct.second"); ?78?????????//設置默認?consumer?數為3 ?79?????????container.setConcurrentConsumers(3); ?80?????????//設置最大?consumer?數為4 ?81?????????container.setMaxConcurrentConsumers(4); ?82?????????//標記?consumerTag ?83?????????container.setConsumerTagStrategy(queue?->??"consumer"+(++index)); ?84?????????//綁定MessageListener顯示接收信息 ?85?????????container.setMessageListener(new?MessageListener(){ ?86?????????????@Override ?87?????????????public?void?onMessage(Message?message)?{ ?88?????????????????//?TODO?自動生成的方法存根 ?89?????????????????Thread?thread=Thread.currentThread(); ?90?????????????????MessageProperties?messProp=message.getMessageProperties(); ?91?????????????????try?{ ?92?????????????????????System.out.println("??ConsumerTag:"+messProp.getConsumerTag() ?93?????????????????????????????+"??ThreadId?is:"+thread.getId()+"??Queue:"+messProp.getConsumerQueue() ?94?????????????????????????????+"??"+new?String(message.getBody(),"UTF-8")); ?95?????????????????}?catch?(UnsupportedEncodingException?e)?{ ?96?????????????????????//?TODO?自動生成的?catch?塊 ?97?????????????????????e.printStackTrace(); ?98?????????????????} ?99?????????????} 100????????????? 101?????????}); 102?????????return?container; 103?????} 104?}
運行結果
5.3?SimpleMessageListenerContainer 的運作原理
在 SimpleMessageListenerContainer 模式中,無論系統監聽多少個 queue 隊列,channel 都是共享的,類似上面的例子,4個 channel 會把接收到不同的隊列請求并分發到對應的 consumer 進行處理。這樣做的好處是系統可以通過 concurrentConsumers、maxConcurrentConsumers 靈活設定當前隊列中消費者的數量,系統可以跟據實際需求靈活處理。但由于每個 channel 都是在固定線程中運行的,一個 channel 要游走于多個 consumer 當中,這無疑增加了系統在上下文切換中的開銷。下面用系統提供的?ChannelAwareMessageListener 接口,以更直觀的例子說明一下 SimpleMessageListenerContainer 當中 channel、queue、consumer 之間的關系。
Producer 端代碼
?1?@Configuration ?2?public?class?BindingConfig?{ ?3?????public?final?static?String?first="direct.first"; ?4?????public?final?static?String?second="direct.second"; ?5?????public?final?static?String?Exchange_NAME="directExchange"; ?6?????public?final?static?String?RoutingKey1="directKey1"; ?7?????public?final?static?String?RoutingKey2="directKey2"; ?8????? ?9?????@Bean 10?????public?Queue?queueFirst(){ 11?????????return?new?Queue(first); 12?????} 13????? 14?????@Bean 15?????public?Queue?queueSecond(){ 16?????????return?new?Queue(second); 17?????} 18????? 19?????@Bean 20?????public?DirectExchange?directExchange(){ 21?????????return?new?DirectExchange(Exchange_NAME); 22?????} 23????? 24?????//利用BindingBuilder綁定Direct與queueFirst 25?????@Bean 26?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 27?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28?????} 29????? 30?????//利用BindingBuilder綁定Direct與queueSecond 31?????@Bean 32?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 33?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34?????}??? 35?} 36? 37?@Configuration 38?public?class?ConnectionConfig?{ 39?????@Value("${spring.rabbitmq.host}") 40?????public?String?host; 41????? 42?????@Value("${spring.rabbitmq.port}") 43?????public?int?port; 44????? 45?????@Value("${spring.rabbitmq.username}") 46?????public?String?username; 47????? 48?????@Value("${spring.rabbitmq.password}") 49?????public?String?password; 50????? 51?????@Value("${spring.rabbitmq.virtual-host}") 52?????public?String?virtualHost; 53? 54?????@Bean 55?????public?ConnectionFactory?getConnectionFactory(){ 56?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 57?????????factory.setHost(host); 58?????????factory.setPort(port); 59?????????factory.setUsername(username); 60?????????factory.setPassword(password); 61?????????factory.setVirtualHost(virtualHost); 62?????????return?factory; 63?????} 64?} 65? 66?@Controller 67?@RequestMapping("/producer") 68?public?class?ProducerController?{ 69?????@Autowired 70?????private?RabbitTemplate?template; 71?? 72?????@RequestMapping("/send") 73?????public?void?send(HttpServletResponse?response)?throws?InterruptedException,?IOException{ 74?????????for(Integer?n=0;n<100;n++){75?????????????????CorrelationData?correlationData=getCorrelationData();76?????????????????template.convertAndSend("directExchange","directKey1",77??????????????????????????????"?queue1"+"??"+n.toString(),correlationData);78?????????????????template.convertAndSend("directExchange","directKey2",79??????????????????????????????"queue2"+"??"+n.toString(),correlationData);???????????? 80?????????????????Thread.currentThread().sleep(30);81?????????}82?????}83?84?????private?CorrelationData?getCorrelationData(){85?????????return?new?CorrelationData(UUID.randomUUID().toString());86?????}87?}
Consumer 端代碼
??1?@Configuration ??2?public?class?ConnectionConfig?{ ??3?????@Value("${spring.rabbitmq.host}") ??4?????public?String?host; ??5????? ??6?????@Value("${spring.rabbitmq.port}") ??7?????public?int?port; ??8????? ??9?????@Value("${spring.rabbitmq.username}") ?10?????public?String?username; ?11????? ?12?????@Value("${spring.rabbitmq.password}") ?13?????public?String?password; ?14????? ?15?????@Value("${spring.rabbitmq.virtual-host}") ?16?????public?String?virtualHost; ?17? ?18?????@Bean ?19?????public?ConnectionFactory?getConnectionFactory(){ ?20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?21?????????factory.setHost(host); ?22?????????factory.setPort(port); ?23?????????factory.setUsername(username); ?24?????????factory.setPassword(password); ?25?????????factory.setVirtualHost(virtualHost); ?26?????????return?factory; ?27?????} ?28?} ?29? ?30?@Configuration ?31?public?class?BindingConfig?{ ?32?????public?final?static?String?first="direct.first"; ?33?????public?final?static?String?second="direct.second"; ?34?????public?final?static?String?Exchange_NAME="directExchange"; ?35?????public?final?static?String?RoutingKey1="directKey1"; ?36?????public?final?static?String?RoutingKey2="directKey2"; ?37????? ?38?????@Bean ?39?????public?Queue?queueFirst(){ ?40?????????return?new?Queue(first); ?41?????} ?42????? ?43?????@Bean ?44?????public?Queue?queueSecond(){ ?45?????????return?new?Queue(second); ?46?????} ?47????? ?48?????@Bean ?49?????public?DirectExchange?directExchange(){ ?50?????????return?new?DirectExchange(Exchange_NAME); ?51?????} ?52????? ?53?????//利用BindingBuilder綁定Direct與queueFirst ?54?????@Bean ?55?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?56?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); ?57?????} ?58????? ?59?????//利用BindingBuilder綁定Direct與queueSecond ?60?????@Bean ?61?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? ?62?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); ?63?????}??? ?64?} ?65?@Configuration ?66?public?class?SimpleMessListener?{ ?67?????@Autowired ?68?????private?RabbitTemplate?template; ?69?????@Autowired ?70?????private?ConnectionConfig?connectionConfig; ?71?????private?int?index=0; ?72? ?73?????@Bean ?74?????public?SimpleMessageListenerContainer?messageContainer(){ ?75?????????SimpleMessageListenerContainer?container=new?SimpleMessageListenerContainer(); ?76?????????container.setConnectionFactory(connectionConfig.getConnectionFactory()); ?77?????????//?綁定Queue1/Queue2 ?78?????????container.setQueueNames("direct.first");?????? ?79?????????container.addQueueNames("direct.second"); ?80?????????//設置默認?consumer?數為3 ?81?????????container.setConcurrentConsumers(3); ?82?????????//設置最大?consumer?數為4 ?83?????????container.setMaxConcurrentConsumers(4); ?84?????????//標記?consumerTag ?85?????????container.setConsumerTagStrategy(queue?->??"consumer"+(++index)); ?86?????????//綁定ChannelAwareMessageListener顯示接收信息 ?87?????????container.setChannelAwareMessageListener(new?ChannelAwareMessageListener(){ ?88?????????????@Override ?89?????????????public?void?onMessage(Message?message,?com.rabbitmq.client.Channel?channel)? ?90?????????????????????throws?Exception?{ ?91?????????????????//?TODO?自動生成的方法存根 ?92?????????????????//?TODO?自動生成的方法存根 ?93?????????????????Thread?thread=Thread.currentThread(); ?94?????????????????System.out.println("Channel:"+channel.getChannelNumber()???????????????????????????? ?95?????????????????????????+"??ThreadId?is:"+thread.getId() ?96?????????????????????????+"??ConsumerTag:"+message.getMessageProperties().getConsumerTag() ?97?????????????????????????+"??Queue:"+message.getMessageProperties().getConsumerQueue()); ?98????????????????????????? ?99?????????????} 100????????????? 101?????????}); 102?????????return?container; 103?????} 104?}
運行結果:
觀察運行結果可以看到:每個 channel 都在固定的線程中運行,一個 channel 會向不同的 consumer 發送隊列信息。了解 channel、thread、queue、consumer 之間的關系,會對 SimpleMessageListenerContainer 有更深入認識。
5.4 DirectMessageListenerContainer
SimpleMessageListenerContainer 是經典的容器,使用 channel 共享,一旦某個 channel 關閉或重啟,意味著每個隊列 queue 中使用當前 channel 的 consumer 都會受到影響。?有見及此,在 RabbitMQ 2.0 后,系統引入了 DirectMessageListenerContainer ,它允許每個 consumer 都有各自的對應的 channel 的,channel 只管理負責管理當前 consumer 的通道。這樣令 consumer 運用更靈活,同時線程并沒有跟 channel 綁定,而是由獨立的線程池進行管理,這是更好地解決了 SimpleMessageListenerContainer 中上下文切換所帶來的資源消耗問題。
下面的例子,我們嘗試使用把 consumersPerQueue 設置為 4,并同時監控 direct 模式 exchange 的 direct.first,direct.second 隊列。
從管理界面可以看到,系統會為每個 consumer 都生成一個獨立的 channel 進行管理。
Producer 端代碼
?1?@Configuration ?2?public?class?BindingConfig?{ ?3?????public?final?static?String?first="direct.first"; ?4?????public?final?static?String?second="direct.second"; ?5?????public?final?static?String?Exchange_NAME="directExchange"; ?6?????public?final?static?String?RoutingKey1="directKey1"; ?7?????public?final?static?String?RoutingKey2="directKey2"; ?8????? ?9?????@Bean 10?????public?Queue?queueFirst(){ 11?????????return?new?Queue(first); 12?????} 13????? 14?????@Bean 15?????public?Queue?queueSecond(){ 16?????????return?new?Queue(second); 17?????} 18????? 19?????@Bean 20?????public?DirectExchange?directExchange(){ 21?????????return?new?DirectExchange(Exchange_NAME); 22?????} 23????? 24?????//利用BindingBuilder綁定Direct與queueFirst 25?????@Bean 26?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 27?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28?????} 29????? 30?????//利用BindingBuilder綁定Direct與queueSecond 31?????@Bean 32?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 33?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34?????}??? 35?} 36? 37?@Configuration 38?public?class?ConnectionConfig?{ 39?????@Value("${spring.rabbitmq.host}") 40?????public?String?host; 41????? 42?????@Value("${spring.rabbitmq.port}") 43?????public?int?port; 44????? 45?????@Value("${spring.rabbitmq.username}") 46?????public?String?username; 47????? 48?????@Value("${spring.rabbitmq.password}") 49?????public?String?password; 50????? 51?????@Value("${spring.rabbitmq.virtual-host}") 52?????public?String?virtualHost; 53? 54?????@Bean 55?????public?ConnectionFactory?getConnectionFactory(){ 56?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 57?????????factory.setHost(host); 58?????????factory.setPort(port); 59?????????factory.setUsername(username); 60?????????factory.setPassword(password); 61?????????factory.setVirtualHost(virtualHost); 62?????????return?factory; 63?????} 64?} 65? 66?@Controller 67?@RequestMapping("/producer") 68?public?class?ProducerController?{ 69?????@Autowired 70?????private?RabbitTemplate?template; 71?? 72?????@RequestMapping("/send") 73?????public?void?send(HttpServletResponse?response)?throws?InterruptedException,?IOException{ 74?????????for(Integer?n=0;n<100;n++){75?????????????????CorrelationData?correlationData=getCorrelationData();76?????????????????template.convertAndSend("directExchange","directKey1",77??????????????????????????????"?queue1"+"??"+n.toString(),correlationData);78?????????????????template.convertAndSend("directExchange","directKey2",79??????????????????????????????"queue2"+"??"+n.toString(),correlationData);???????????? 80?????????????????Thread.currentThread().sleep(30);81?????????}82?????}83?84?????private?CorrelationData?getCorrelationData(){85?????????return?new?CorrelationData(UUID.randomUUID().toString());86?????}87?}
Consumer 端代碼
?1?@Configuration ?2?public?class?ConnectionConfig?{ ?3?????@Value("${spring.rabbitmq.host}") ?4?????public?String?host; ?5????? ?6?????@Value("${spring.rabbitmq.port}") ?7?????public?int?port; ?8????? ?9?????@Value("${spring.rabbitmq.username}") 10?????public?String?username; 11????? 12?????@Value("${spring.rabbitmq.password}") 13?????public?String?password; 14????? 15?????@Value("${spring.rabbitmq.virtual-host}") 16?????public?String?virtualHost; 17? 18?????@Bean 19?????public?ConnectionFactory?getConnectionFactory(){ 20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 21?????????factory.setHost(host); 22?????????factory.setPort(port); 23?????????factory.setUsername(username); 24?????????factory.setPassword(password); 25?????????factory.setVirtualHost(virtualHost); 26?????????return?factory; 27?????} 28?} 29? 30?@Configuration 31?public?class?BindingConfig?{ 32?????public?final?static?String?first="direct.first"; 33?????public?final?static?String?second="direct.second"; 34?????public?final?static?String?Exchange_NAME="directExchange"; 35?????public?final?static?String?RoutingKey1="directKey1"; 36?????public?final?static?String?RoutingKey2="directKey2"; 37????? 38?????@Bean 39?????public?Queue?queueFirst(){ 40?????????return?new?Queue(first); 41?????} 42????? 43?????@Bean 44?????public?Queue?queueSecond(){ 45?????????return?new?Queue(second); 46?????} 47????? 48?????@Bean 49?????public?DirectExchange?directExchange(){ 50?????????return?new?DirectExchange(Exchange_NAME); 51?????} 52????? 53?????//利用BindingBuilder綁定Direct與queueFirst 54?????@Bean 55?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 56?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 57?????} 58????? 59?????//利用BindingBuilder綁定Direct與queueSecond 60?????@Bean 61?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 62?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 63?????}??? 64?} 65? 66?@Configuration 67?public?class?DirectMessListener?{ 68?????@Autowired 69?????private?ConnectionConfig?connectionConfig; 70?????@Autowired 71?????private?RabbitTemplate?template; 72?????private?int?index=0; 73????? 74?????@Bean 75?????public?DirectMessageListenerContainer?messageContainer(){ 76?????????DirectMessageListenerContainer?container=new?DirectMessageListenerContainer(); 77?????????container.setConnectionFactory(connectionConfig.getConnectionFactory()); 78?????????//?設置每個隊列的?consumer?數量 79?????????container.setConsumersPerQueue(4); 80?????????container.addQueueNames("direct.first"); 81?????????container.addQueueNames("direct.second"); 82?????????container.setConsumerTagStrategy(queue?->?"consumer"+(++index)); 83?????????container.setMessageListener(new?ChannelAwareMessageListener(){ 84????????? @Override 85????????? public?void?onMessage(Message?message,?com.rabbitmq.client.Channel?channel)? 86?????????????????????throws?Exception?{ 87?????????????????//?TODO?自動生成的方法存根 88?????????????????//?TODO?自動生成的方法存根 89?????????????????Thread?thread=Thread.currentThread(); 90????????????????? 91?????????????????System.out.println("Channel:"+channel.getChannelNumber()???????????????????????????? 92?????????????????????????+"??ThreadId?is:"+thread.getId() 93?????????????????????????+"??ConsumerTag:"+message.getMessageProperties().getConsumerTag() 94?????????????????????????+"??Queue:"+message.getMessageProperties().getConsumerQueue()); 95???????????????}???? 96?????????}); 97?????????return?container; 98?????} 99?}
通過運行結果進一步可以證實,consumer 信息接收是由獨立的線程池進行管理的,并沒有與 channel 綁定,每個 consumer 都有自己單獨的 channel,即使 channel 發生問題時,也不會對其他的 consumer 發生影響,這正是?DirectMessageListenerContainer 的優勝之處。
5.5 Consumer 的信息接收確認方式
在第四節曾經介紹過在 Producer 端利用 ConfirmCallback / ReturnCallback 監控信息的發送,在這節將為大家在 Consumer 端監控信息的接收。
Consumer 的信息接收確認模式可以通過 AcknowledgeMode 設定,一共有三種模式:NONE、MANUAL、AUTO,默認是 AUTO 模式。其中 NONE 為系統確認,MANUAL 是手動確認。
而 AUTO 為自動模式,系統可以根據執行情況自動發送 ack / nack。如果方法未拋出異常,則發送 ack。如果拋出異常 AmqpRejectAndDontRequeueException 顧名思義消息被拒絕且不會重新加入隊列。如果方法拋出非 AmqpRejectAndDontRequeueException 異常,則系統發送 nack 消息重歸隊列。
Channel 消息接收的常用方法
方法 | 說明 |
void basicAck(long deliveryTag, boolean multiple) | deliveryTag 為該消息的標識,multiple 為 true 代表批量確認同一批次的信息接收成功,為 false 時代表單獨判定某個消息接收成功。 |
void basicReject(long deliveryTag, boolean requeue) | deliveryTag 為該消息的標識,requeue 為 true時,被拒絕的消息會重新進入隊列進行推送,為false時消息將不再進入隊列 |
void basicNack(long deliveryTag, boolean multiple, boolean requeue) | deliveryTag 為該消息的標識,multiple 為 true 代表批量確認同一批次的信息接收失敗,為 false 時代表單獨判定某個消息接收失敗。requeue 為 true時,消息會重新進入隊列進行推送,為false時消息將不再進入隊列 |
?AcknowledgeMode 配置為 MANUAL 后,用戶可通過 Channel 類的 void basicAck(long deliveryTag, boolean multiple) 方法手動確認消息接收是否成功。
若檢測到有異常,可通過void basicReject(long deliveryTag, boolean requeue) 或 void basicNack(long deliveryTag, boolean multiple, boolean requeue) 確認是否重新把消息推送。
通過配置 prefetchCount 可設置 consumer 每次接收到的信息數量,系統默認值為 250,這表示當 consumer 隊列接收到 250 請求其狀態皆為 unacked 時,broker server 將暫停向 consumer 發送消息,待消息處理后再繼續。
下面例子中我們嘗試把 prefetchCount 設置為 10,即每個 consumer 單次最多接收到的消息為 10 條,并把 consumersPerQueue 設置為 4,然后把?AcknowledgeMode 設置為 MANUAL,通過手動確認消息接收,一旦發生錯誤,消息重新加入隊列。
?Producer 端代碼
?1?@Configuration ?2?public?class?BindingConfig?{ ?3?????public?final?static?String?first="direct.first"; ?4?????public?final?static?String?second="direct.second"; ?5?????public?final?static?String?Exchange_NAME="directExchange"; ?6?????public?final?static?String?RoutingKey1="directKey1"; ?7?????public?final?static?String?RoutingKey2="directKey2"; ?8????? ?9?????@Bean 10?????public?Queue?queueFirst(){ 11?????????return?new?Queue(first); 12?????} 13????? 14?????@Bean 15?????public?Queue?queueSecond(){ 16?????????return?new?Queue(second); 17?????} 18????? 19?????@Bean 20?????public?DirectExchange?directExchange(){ 21?????????return?new?DirectExchange(Exchange_NAME); 22?????} 23????? 24?????//利用BindingBuilder綁定Direct與queueFirst 25?????@Bean 26?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 27?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); 28?????} 29????? 30?????//利用BindingBuilder綁定Direct與queueSecond 31?????@Bean 32?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? 33?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); 34?????}??? 35?} 36? 37?@Configuration 38?public?class?ConnectionConfig?{ 39?????@Value("${spring.rabbitmq.host}") 40?????public?String?host; 41????? 42?????@Value("${spring.rabbitmq.port}") 43?????public?int?port; 44????? 45?????@Value("${spring.rabbitmq.username}") 46?????public?String?username; 47????? 48?????@Value("${spring.rabbitmq.password}") 49?????public?String?password; 50????? 51?????@Value("${spring.rabbitmq.virtual-host}") 52?????public?String?virtualHost; 53? 54?????@Bean 55?????public?ConnectionFactory?getConnectionFactory(){ 56?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 57?????????factory.setHost(host); 58?????????factory.setPort(port); 59?????????factory.setUsername(username); 60?????????factory.setPassword(password); 61?????????factory.setVirtualHost(virtualHost); 62?????????return?factory; 63?????} 64?} 65? 66?@Controller 67?@RequestMapping("/producer") 68?public?class?ProducerController?{ 69?????@Autowired 70?????private?RabbitTemplate?template; 71?? 72?????@RequestMapping("/send") 73?????public?void?send(HttpServletResponse?response)?throws?InterruptedException,?IOException{ 74?????????for(Integer?n=0;n<100;n++){75?????????????????CorrelationData?correlationData=getCorrelationData();76?????????????????template.convertAndSend("directExchange","directKey1",77??????????????????????????????"?queue1"+"??"+n.toString(),correlationData);78?????????????????template.convertAndSend("directExchange","directKey2",79??????????????????????????????"queue2"+"??"+n.toString(),correlationData);???????????? 80?????????}81?????}82?83?????private?CorrelationData?getCorrelationData(){84?????????return?new?CorrelationData(UUID.randomUUID().toString());85?????}86?}
運行后可看到 Broker Server 每條 queue 會有 100 條數據處于待處理狀態
Consumer 端代碼
??1?@Configuration ??2?public?class?ConnectionConfig?{ ??3?????@Value("${spring.rabbitmq.host}") ??4?????public?String?host; ??5????? ??6?????@Value("${spring.rabbitmq.port}") ??7?????public?int?port; ??8????? ??9?????@Value("${spring.rabbitmq.username}") ?10?????public?String?username; ?11????? ?12?????@Value("${spring.rabbitmq.password}") ?13?????public?String?password; ?14????? ?15?????@Value("${spring.rabbitmq.virtual-host}") ?16?????public?String?virtualHost; ?17? ?18?????@Bean ?19?????public?ConnectionFactory?getConnectionFactory(){ ?20?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?21?????????factory.setHost(host); ?22?????????factory.setPort(port); ?23?????????factory.setUsername(username); ?24?????????factory.setPassword(password); ?25?????????factory.setVirtualHost(virtualHost); ?26?????????return?factory; ?27?????} ?28?} ?29? ?30?@Configuration ?31?public?class?BindingConfig?{ ?32?????public?final?static?String?first="direct.first"; ?33?????public?final?static?String?second="direct.second"; ?34?????public?final?static?String?Exchange_NAME="directExchange"; ?35?????public?final?static?String?RoutingKey1="directKey1"; ?36?????public?final?static?String?RoutingKey2="directKey2"; ?37????? ?38?????@Bean ?39?????public?Queue?queueFirst(){ ?40?????????return?new?Queue(first); ?41?????} ?42????? ?43?????@Bean ?44?????public?Queue?queueSecond(){ ?45?????????return?new?Queue(second); ?46?????} ?47????? ?48?????@Bean ?49?????public?DirectExchange?directExchange(){ ?50?????????return?new?DirectExchange(Exchange_NAME); ?51?????} ?52????? ?53?????//利用BindingBuilder綁定Direct與queueFirst ?54?????@Bean ?55?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?56?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(RoutingKey1); ?57?????} ?58????? ?59?????//利用BindingBuilder綁定Direct與queueSecond ?60?????@Bean ?61?????public?Binding?bindingExchangeSecond(Queue?queueSecond,?DirectExchange?directExchange){??????? ?62?????????return?BindingBuilder.bind(queueSecond).to(directExchange).with(RoutingKey2); ?63?????}??? ?64?} ?65? ?66?@Configuration ?67?public?class?DirectMessListener?{ ?68?????@Autowired ?69?????private?ConnectionConfig?connectionConfig; ?70?????@Autowired ?71?????private?RabbitTemplate?template; ?72?????private?int?index=0; ?73????? ?74?????@Bean ?75?????public?DirectMessageListenerContainer?messageContainer(){ ?76?????????DirectMessageListenerContainer?container=new?DirectMessageListenerContainer(); ?77?????????container.setConnectionFactory(connectionConfig.getConnectionFactory()); ?78?????????//?設置每個隊列的?consumer?數量 ?79?????????container.setConsumersPerQueue(4); ?80?????????//?設置每個?consumer?每次的接收的消息數量為10個 ?81?????????container.setPrefetchCount(10); ?82?????????//?使用MANUAL進行手動確認?????? ?83?????????container.setAcknowledgeMode(AcknowledgeMode.MANUAL); ?84?????????container.addQueueNames("direct.first"); ?85?????????container.addQueueNames("direct.second"); ?86?????????container.setConsumerTagStrategy(queue?->?"consumer"+(++index)); ?87?????????container.setMessageListener(new?ChannelAwareMessageListener(){ ?88?????????????@Override ?89?????????????public?void?onMessage(Message?message,?com.rabbitmq.client.Channel?channel)? ?90?????????????????????throws?Exception?{ ?91?????????????????Thread?thread=Thread.currentThread(); ?92?????????????????MessageProperties?prop=message.getMessageProperties(); ?93?????????????????try{ ?94?????????????????????System.out.println("Channel:"+channel.getChannelNumber()???????????????????????????? ?95?????????????????????????????+"??ThreadId?is:"+thread.getId() ?96?????????????????????????????+"??ConsumerTag:"+prop.getConsumerTag() ?97?????????????????????????????+"??Queue:"+prop.getConsumerQueue()); ?98?????????????????????//通過Tag單個確認 ?99?????????????????????channel.basicAck(prop.getDeliveryTag(),?false); 100?????????????????}catch(Exception?ex){ 101?????????????????????//判定單個接收失敗,重新加入consumer隊列 102?????????????????????channel.basicReject(prop.getDeliveryTag(),?true); 103?????????????????} 104?????????????????thread.sleep(1000); 105?????????????}???? 106?????????}); 107?????????return?container; 108?????} 109?}
觀察信息接收情況,每個 consumer? 一次可處理10條信息,對隊列進行分批處理。
死信隊列(Dead-Letter-Exchange) 可被看作是死信交換器。當消息在一個隊列中變成死信后,它能被重新被發送到特定的交換器中,這個交換器就是DLX ,綁定DLX 的隊列就稱之為死信隊列。消息變成死信一般是由于以下幾種情況:
消息被拒絕,requeue 被設置為 false, 可通過上一介紹的 void basicReject (deliveryTag, requeue) 或 void basicNack(deliveryTag,multiple, requeue) 完成設置 ;
消息過期;
隊列超出最大長度。
其實死信隊列 DLX 也是一個正常的交換器,和一般的交換器沒有什么區別,我們可以用一般建立隊列的方法,建立一個死信隊列。然后建立一個正常的隊列,在正常隊列中加入參數 x-dead-letter-exchange、x-dead-letter-routing-key 與死信隊列進行綁定,完成綁定后在管理界面 Features 選項中?direct.queue.first 會顯示 DLX DLK。這時當被綁定的隊列出現超時,超長,或被拒絕時(注意requeue被設置為false時,對會激發死信),信息就會流入死信隊列被處理。
具體的例子Producer端:
?1?@Configuration? ?2?public?class?BindingConfig?{ ?3?????public?final?static?String?Queue_First="direct.queue.first"; ?4?????public?final?static?String?Exchange_Name="directExchange"; ?5?????public?final?static?String?Routing_Key_First="directKey1"; ?6????? ?7?????@Bean ?8?????public?Queue?queueFirst(){ ?9?????????return?new?Queue(this.Queue_First); 10?????} 11????? 12?????@Bean 13?????public?DirectExchange?directExchange(){ 14?????????return?new?DirectExchange(this.Exchange_Name); 15?????} 16????? 17?????@Bean 18?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ 19?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First); 20?????} 21?} 22? 23?@Configuration 24?public?class?ConnectionConfig?{ 25?????@Value("${spring.rabbitmq.host}") 26?????public?String?host; 27????? 28?????@Value("${spring.rabbitmq.port}") 29?????public?int?port; 30????? 31?????@Value("${spring.rabbitmq.username}") 32?????public?String?username; 33????? 34?????@Value("${spring.rabbitmq.password}") 35?????public?String?password; 36????? 37?????@Value("${spring.rabbitmq.virtual-host}") 38?????public?String?virtualHost; 39? 40?????@Bean 41?????public?ConnectionFactory?getConnectionFactory(){ 42?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); 43?????????System.out.println(host); 44?????????factory.setHost(host); 45?????????factory.setPort(port); 46?????????factory.setUsername(username); 47?????????factory.setPassword(password); 48?????????factory.setVirtualHost(virtualHost); 49?????????return?factory; 50?????} 51?} 52? 53?@Controller 54?@RequestMapping("/producer") 55?public?class?ProducerController?{ 56?????@Autowired 57?????private?RabbitTemplate?template; 58????? 59?????@RequestMapping("/send") 60?????public?void?send()?{ 61?????????for(int?n=0;n<10;n++){???????????? 62?????????????template.convertAndSend(BindingConfig.Exchange_NAME,BindingConfig.RoutingKey1,"Hello?World!???" 63????????????????????+String.valueOf(n),getCorrelationData()); 64?????????} 65?????} 66? 67??????private?CorrelationData?getCorrelationData(){ 68?????????return?new?CorrelationData(UUID.randomUUID().toString()); 69?????} 70?}
Customer 端
??1?@Configuration ??2?public?class?BindingConfig?{ ??3?????//普通隊列參數 ??4?????public?final?static?String?Queue_First="direct.queue.first"; ??5?????public?final?static?String?Exchange_Name="directExchange"; ??6?????public?final?static?String?Routing_Key_First="directKey1"; ??7?????//死信隊列參數 ??8?????public?final?static?String?Queue_Dead="direct.queue.dead"; ??9?????public?final?static?String?Exchange_Dead="directDead"; ?10?????public?final?static?String?Routing_Key_Dead="directDeadKey"; ?11????? ?12?????@Bean ?13?????public?Queue?queueFirst(){ ?14?????????Map<String,?Object>?args=new?HashMap<String,Object>(); ?15?????????//聲明當前死信的?Exchange ?16?????????args.put("x-dead-letter-exchange",?this.Exchange_Dead); ?17??????????//聲明當前隊列的死信路由key ?18?????????args.put("x-dead-letter-routing-key",?this.Routing_Key_Dead); ?19?????????//把死信隊列的參數綁定到當前隊列中 ?20?????????return?QueueBuilder.durable(Queue_First).withArguments(args).build(); ?21?????} ?22????? ?23?????@Bean ?24?????public?DirectExchange?directExchange(){ ?25?????????return?new?DirectExchange(this.Exchange_Name); ?26?????} ?27????? ?28?????@Bean ?29?????public?Binding?bindingExchangeFirst(Queue?queueFirst,?DirectExchange?directExchange){ ?30?????????return?BindingBuilder.bind(queueFirst).to(directExchange).with(Routing_Key_First); ?31?????} ?32????? ?33?????@Bean ?34?????public?Queue?queueDead(){ ?35?????????return?new?Queue(this.Queue_Dead); ?36?????} ?37????? ?38?????@Bean ?39?????public?DirectExchange?directExchangeDead(){ ?40?????????return?new?DirectExchange(this.Exchange_Dead); ?41?????} ?42????? ?43?????@Bean ?44?????public?Binding?bindingExchangeDead(Queue?queueDead,DirectExchange?directExchangeDead){ ?45?????????return?BindingBuilder.bind(queueDead).to(directExchangeDead).with(this.Routing_Key_Dead); ?46?????} ?47?} ?48? ?49?@Configuration ?50?public?class?ConnectionConfig?{ ?51?????@Value("${spring.rabbitmq.host}") ?52?????public?String?host; ?53????? ?54?????@Value("${spring.rabbitmq.port}") ?55?????public?int?port; ?56????? ?57?????@Value("${spring.rabbitmq.username}") ?58?????public?String?username; ?59????? ?60?????@Value("${spring.rabbitmq.password}") ?61?????public?String?password; ?62????? ?63?????@Value("${spring.rabbitmq.virtual-host}") ?64?????public?String?virtualHost; ?65? ?66?????@Bean ?67?????public?ConnectionFactory?getConnectionFactory(){ ?68?????????CachingConnectionFactory?factory=new?CachingConnectionFactory(); ?69?????????factory.setHost(host); ?70?????????factory.setPort(port); ?71?????????factory.setUsername(username); ?72?????????factory.setPassword(password); ?73?????????factory.setVirtualHost(virtualHost); ?74?????????return?factory; ?75?????} ?76?} ?77? ?78?@Configuration ?79?public?class?DirectMessListener?{ ?80?????@Autowired ?81?????private?ConnectionConfig?connectionConfig; ?82?????@Autowired ?83?????private?RabbitTemplate?template; ?84?????private?int?index=0,normalIndex=0,deadIndex=0;???? ?85????? ?86?????@Bean ?87?????public?DirectMessageListenerContainer?messageContainer(){ ?88?????????DirectMessageListenerContainer?container=new?DirectMessageListenerContainer(); ?89?????????container.setConnectionFactory(connectionConfig.getConnectionFactory()); ?90?????????//?設置每個隊列的?consumer?數量 ?91?????????container.setConsumersPerQueue(4); ?92?????????//?設置每個?consumer?每次的接收的消息數量 ?93?????????container.setPrefetchCount(10); ?94?????????//?使用MANUAL手動確認 ?95?????????container.setAcknowledgeMode(AcknowledgeMode.MANUAL); ?96?????????//?監聽隊列 ?97?????????container.addQueueNames(BindingConfig.Queue_First); ?98?????????container.addQueueNames(BindingConfig.Queue_Dead); ?99?????????container.setConsumerTagStrategy(queue?->?"consumer"+(++index)); 100?????????? 101?????????container.setMessageListener(new?ChannelAwareMessageListener(){ 102?????????????@Override 103?????????????public?void?onMessage(Message?message,?com.rabbitmq.client.Channel?channel)? 104?????????????????????throws?Exception?{ 105?????????????????MessageProperties?prop=message.getMessageProperties(); 106?????????????????if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_First)){ 107?????????????????????System.out.println("This?is?a?normal?queue!??"+(++normalIndex)); 108?????????????????????//把當前的隊列轉送到死信隊列中 109?????????????????????channel.basicReject(prop.getDeliveryTag(),?false);???????????????? 110?????????????????} 111?????????????????if(prop.getReceivedRoutingKey().equals(BindingConfig.Routing_Key_Dead)){ 112?????????????????????System.out.println("This?is?a?dead?queue!?"+(++deadIndex)); 113?????????????????????//模擬對死信隊列處理 114?????????????????????Thread.currentThread().sleep(5000); 115?????????????????????....... 116?????????????????????//處理完畢 117?????????????????????channel.basicAck(prop.getDeliveryTag(),?false); 118?????????????????} 119????????????????? 120?????????????} 121?????????}); 122?????????return?container; 123?????} 124?}
通過管理界面可以看,信息會先發送到 direct.queue.first,然后被放進死信隊列作處理。
運行結果
死信隊列最常用的場景可以在訂單支付,流程審批等環節。例如在 京*、淘* 等平臺,當下單成功后,客戶要在一定的時間內完成支付操作,否則訂單被視作無效,這些業務流程就可以使用死信隊列來處理。
RabbitMq 的持久化操作包含有 Queue 持久化、Message 持久化和 Exchange 持久化三類。
7.1 Queue 的持久化
隊列持久化只需要在 Queue 的構造函數 public Queue(String name, boolean durable) 把 durable 參數置為 true 就可實現。如果隊列不設置持久化( (durable 默認為 false), 那么在RabbitMQ 服務重啟之后,相關隊列的元數據會丟失,此時數據也會丟失。
7.2 Message 持久化
設置了Queue 持久化以后,當 RabbitMQ 服務重啟之后,隊列依然存在,但消息已經消失,可見單單設置隊列的持久化而不設置消息持久化顯得毫無意義,所以通常列隊持久化會與消息持久化共同使用。
在 RabbitMQ 原生態的框架下,需要把信息屬性設置為 MessageProperties.PERSISTENT TEXT PLAIN 才會實現消息的持久化。
而在 Spring 框架下,由于在使用回調函數時需要把 Message 重新返回隊列再進行處理,所以 Message 默認已經是持久化的。
7.3 Exchage 的持久化
交換器持久化可通過構造函數 public DirectExchange(String name, boolean durable, boolean autoDelete)? 把?durable 參數置為 true 就可實現,而 autoDelete 則是指在所在消費者都解除訂閱的情況下自動刪除。如果交換器不設置持久化,那么在 RabbitMQ 服務重啟之后,相關的交換器元數據會丟失,不過消息不會丟失,只是消息不再發送到該 Exchange 。對一個長期使用的交換器來說,持久化還是有其必要性的。
本章總結
RabbitMQ 發展至今,被越來越多的人認可,這和它在易用性、擴展性、可靠性和高可用性等方面的卓著表現是密不可分的。
相比于傳統的 ActiveMQ 和分布式 Kafka,它具有自己獨有的特點。
希望文章有幫于大家對 RabbitMQ 消息隊列方面有更深入的了解,在不同的開發環境中靈活運用。
由于時間倉促,文章當中有不明確的地方或有錯漏敬請點明。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。