您好,登錄后才能下訂單哦!
本篇內容介紹了“rabbitmq的基本概念以及amqp-client的使用”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
package com.rabbitmq.demo; import com.rabbitmq.client.*; public class Produce { public static void main(String args[]) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setPassword("guest"); connectionFactory.setPassword("guest"); connectionFactory.setHost("192.168.1.141"); connectionFactory.setPort(5672); Connection connection=connectionFactory.newConnection(); Channel channel= connection.createChannel(); String exchangeName="test"; String queueName="test"; String routingkey="routingkey"; String bindingkey=routingkey; //設置交換機,直連,持久化,非自動刪除,參數 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,null); //channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,null); /** * BuiltinExchangeType.FANOUT 不需要綁定 bindingkey與routingkey exchange直接將消息投遞到queue * BuiltinExchangeType.DIRECT 需要綁定 且bindingkey與routingkey一致時 exchange投遞消息到queue * BuiltinExchangeType.TOPIC 需要綁定 模糊匹配,規則如下 * RoutingKey 為一個點號"."分隔的字符串(被點號"。"分隔開的每段獨立的字符,如com.rabbit.demo,com.rabbit.test) * BindingKey RoutingKey 樣也是點號"."分隔的字符串; * BindingKey 中可以存在兩種特殊字符串"*"和"#",用于做模糊匹配,其中"#"用于匹配多個(零個到多個),* 匹配任意一個單詞。 * 如果binding_key 是 “#” - 它會接收所有的Message,不管routing_key是什么,就像是fanout exchange。 * 如果 “*” and “#” 沒有被使用,那么topic exchange就變成了direct exchange。 * BuiltinExchangeType.HEADERS 不需要綁定 * 消費者arguments指定“x-match”,這個鍵的Value可以是any或者all,這代表消息攜帶的Hash是需要全部匹配(all),還是僅匹配一個鍵(any)就可以了 */ /** * 持久化:數據會保存到磁盤,重啟rabbitmq數據依舊存在 * * 排他:該隊列僅對首次聲明它的連接可見,并在連接斷開時自動刪除。 * 需要注意三點:排他隊列是基于連接( Connection) 可見的,同一個連接的不同信道 (Channel) * 是可以同時訪問同一連接創建的排他隊列; "首次"是指如果一個連接己經聲明了 * 排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同:即使該隊 * 列是持久化的,一旦連接關閉或者客戶端退出,該排他隊列都會被自動刪除,這種隊列 * 適用于一個客戶端同時發送和讀取消息的應用場景。 * * * 自動刪除:設置是否自動刪除。為 true 則設置隊列為自動刪除。自動刪除的前提是: * 至少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會 * 自動刪除。不能把這個參數錯誤地理解為: "當連接到此隊列的所有客戶端斷開時,這 * 個隊列自動刪除",因為生產者客戶端創建這個隊列,或者沒有消費者客戶端與這個隊 * 列連接時,都不會自動刪除這個隊列。 * * 根據業務數據,最好提前建好exchange,queue,及綁定關系,生產端,消費端可以避免很多錯誤,如exchang創建失敗, * 綁定關系不確定導致消息投遞失敗 * * rabbitmq clinet 屬性集 * props 消息的基本屬性集,其包含 14 個屬性成員,分別有 contentType * content ncoding headers Map<String Object>) deliveryMode priority * correlationld replyTo expiration messageld timestamp type userld * appld cluster 。 * * mandatory 參數設為 true 時,交換器無法根據自身的類型和路由鍵的隊列, * 那么 RabbitM 會調用 Basic.Return 命令將消息返回給生產者 數設置為 false 時,出現上述情形, * 則消息直接被丟棄 那么生產者如何獲取到沒有被正確路由到合適隊列的消息呢?這時channel addReturnListener 來添加 ReturnListener 監昕器實現。 * 通過rabbitmq備份隊列,當消息發送不到主隊列,將自動發送到備份隊列,設置了備份隊列,mandatory參數將無效 */ //設置隊列,持久化,非排他,非自動刪除 channel.queueDeclare(queueName,true,false,false,null); //channel.queueBind(queueName,exchangeName, bindingkey); for (int i = 0; i <10 ; i++) { channel.basicPublish(exchangeName,routingkey, MessageProperties.PERSISTENT_TEXT_PLAIN,"hi,Rabbitmq".getBytes() ); } channel.close(); connection.close(); } }
package com.rabbitmq.demo; import com.rabbitmq.client.*; import java.io.IOException; public class Consumer { public static void main(java.lang.String args[]) throws Exception{ ConnectionFactory connectionFactory=new ConnectionFactory(); connectionFactory.setPassword("guest"); connectionFactory.setPassword("guest"); connectionFactory.setHost("192.168.1.141"); connectionFactory.setPort(5672); Connection connection=connectionFactory.newConnection(); Channel channel= connection.createChannel(); /*while(true){ //單個獲取消息,當沒有消息返回null,pull模式 GetResponse response = channel.basicGet("test001", true); if(response!=null){ System.out.println(new String(response.getBody())); } }*/ //從queue中手動確認獲取消息,push模式 /** * 隊列名,自動簽收,默認消費者 * queue 隊列的名稱: * autoAck 設置是否自動確認。建議設成 fa se ,即不自動確認: * utoAck 參數置為 false ,對于 RabbitMQ 服務端而 ,隊列中的消息分成了兩個部分 * 部分是等待投遞給消費者的消息:一部分是己經投遞給消費者,但是還沒有收到消費者確認 * 信號的消息。 如果 RabbitMQ 直沒有收到消費者的確認信號,并且消費此消息的消費者己經 * 斷開連接,則 RabbitMQ 會安排該消息重新進入隊列,等待投遞給下 個消費者,當然也有可 * 能還是原來的那個消費者。 * RabbitMQ 會為未確認的消息設置過期時間,它判斷此消息是否需要重新投遞給消費者的 * 依據是消費該消息的消費者連接是否己經斷開,這么設計的原因是 RabbitMQ 允許消費者 * 消費 條消息的時間可以很久很久。 * consumerTag: 消費者標簽,用來區分多個消費者: * noLocal 設置為 true 則表示不能將同一個 Connectio口中生產者發送的消息傳送給 * 這個 Connection 中的消費者: * exclusive 設置是否排他 * arguments 設置消費者的其他參數: * callback 設置消費者的回調函數。用來處理 Rabb itM 推送過來的消息,比如 * DefaultConsumer 使用時需要客戶端重寫 (overr e) 其中的方法。 */ channel.basicConsume("test",false,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者1:"+new java.lang.String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); //創建多個消費者,默認為平均分攤round-robin //不支持隊列層面的廣播消費 /** * 上面代碼中顯式地設置 autoAck false 然后在接收到消息之后 行顯式 ack * (channel basicAck ), 對于消費者來說這 設置是非常 可以防止 * 丟失。 * * */ channel.basicConsume("queue",false,new DefaultConsumer(channel){ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消費者2:"+new java.lang.String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }); //單個拒絕接受消息 //channel.basicReject(111, false); /** * 多個拒絕消息 */ // channel.basicNack(); /** * channel.basicReject 或者 channel.basicNack 中的 requeue 設直為 false ,可 * 以啟用"死信隊列"的功能。死信隊列可以通過檢測被拒絕或者未送達的消息來追蹤問題 */ /*** * Basic Consume 將信道 (Channel) 直為接收模式,直到取消隊列的訂閱為止。在接收 * 模式期間, RabbitMQ 會不斷地推送消息給消費者,當然推送消息的個數還是會受到 Basic.Qos * 的限制.如果只想從隊列獲得單條消息而不是持續訂閱,建議還是使用 Basic.Get 進行消費.但 * 是不能將 Basic.Get 放在一個循環里來代替 Basic.Consume ,這樣做會嚴重影響 RabbitMQ * 的性能.如果要實現高吞吐量,消費者理應使用 Basic.Consume 方法。 * * */ } }
“rabbitmq的基本概念以及amqp-client的使用”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。