您好,登錄后才能下訂單哦!
這篇文章將為大家詳細講解有關RabbitMq的5種開發方案是什么,文章內容質量較高,因此小編分享給大家做個參考,希望大家閱讀完這篇文章后對相關知識有一定的了解。
windows安裝rabbitmq
mac安裝rabbitmq 需要注意的是,mac安裝rabbitmq,啟動的時候命令前,需要加 sudo,不然會報錯誤
2.1 Producer(生產者)
2.2 Consumer(消費者)
2.3 Exchange(交換機)
2.4 Queue(隊列)
2.5 rountingKey(交換機與隊列之間的關系)
官網的6中模式,可以點開這個網址,顯示6中模式,第6中模式RPC遠程調用我們不需要用該模式,所以我們只要關注前五種就可以了。
接下來我們就直接簡單教學rabbitmq的簡單使用
/** * 聲明隊列,五個參數列表,如果直接使用默認channel.queueDeclare("queue"),那么其他參數都會自動默認設置屬性,所以一般我們幾乎都默認它 * String queue 隊列名稱 * boolean durable 隊列是需要持久化,意思就是rabbitmq重啟的時候,如果不是持久化,那么該隊列就會消失,默認true * boolean exclusive 如果你想創建一個只有自己可見的隊列,不允許其它用戶訪問RabbitMQ允許你將一個Queue聲明成為排他性的,只對首次聲明它的連接(Connection)可見,會在其連接斷開的時候自動刪除。所以我們開發中一般不需要此操作,默認false * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重啟的時候,如果為true,那么關閉期間接受的消息就會自動消失,默認false * Map<String, Object> arguments 這個參數我們只會在使用延遲隊列中才會用到,就是延遲隊列的相關配置等屬性 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null); */ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/** * 綁定隊列到交換機 * String queue 隊列名 * String exchange 交換機名 * String routingKey 綁定關系 如 大頭兒子,小頭爸爸 他們的rountingKey就是父子 */ channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
版本隨意,本博客任何版本高版本也行。
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.4.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.7</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.3.2</version> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.0.RELEASE</version> </dependency>
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; public class ConnectionUtil { public static Connection getConnection() throws Exception { //定義連接工廠 ConnectionFactory factory = new ConnectionFactory(); //設置服務地址 factory.setHost("localhost"); //端口 factory.setPort(5672); //設置賬號信息,用戶名、密碼、vhost factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); // 通過工程獲取連接 Connection connection = factory.newConnection(); return connection; } }
理解:該rabbit服務器只有一個單一的隊列
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); // 從連接中創建通道 Channel channel = connection.createChannel(); // 聲明(創建)隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 消息內容 String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); //關閉通道和連接 channel.close(); connection.close(); } }
public class Recv { private final static String QUEUE_NAME = "mujiutian_queue"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); //創建channel Channel channel = connection.createChannel(); //創建隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); //監聽隊列,發送消息 channel.basicConsume(QUEUE_NAME, true, consumer); // 獲取消息,該線程一直進行 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("獲取到消息"+message); } } }
先打開rece的main函數,這樣就可以執行send發送消息。
理解:rabbit服務器有一個exchange(交換機),該交換機下有兩個隊列,總共發送了100條消息,A隊列效率高可以搶到80條消息給消費者,B隊列只能搶到20條消息給發送者,他們的總和是100條,為工作模式。
先執行,先執行消費者main函數,在看結果圖:
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { // 消息內容 String message = "" + i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(10); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("收到消息'" + message + "'"); // 返回確認狀態 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成狀態 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
如圖:
// 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1);
跟這個有關系,這也就是我們所說的工作模式
理解:就是群發,該交換機下的所有隊列,都會接受相同的所有交換機發來的消息,類似于qq群一樣
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_fanout"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:你是人,如果性別是女,請進女廁,是男性,請進男廁,如同,一個交換機下面的多個隊列,根據rountingKey判斷該接受的消息
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_work"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_work2"; private final static String EXCHANGE_NAME = "test_exchange_direct"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
理解就是:路徑aas.# A隊列接受這種所有路徑,B隊列接受路徑下的所有隊列aab.#
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; public class Send { private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明exchange channel.exchangeDeclare(EXCHANGE_NAME, "topic"); // 消息內容 String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv { private final static String QUEUE_NAME = "test_queue_topic_work"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.QueueingConsumer; public class Recv2 { private final static String QUEUE_NAME = "test_queue_topic_work2"; private final static String EXCHANGE_NAME = "test_exchange_topic"; public static void main(String[] argv) throws Exception { // 獲取到連接以及mq通道 Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 綁定隊列到交換機 channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*"); // 同一時刻服務器只會發一條消息給消費者 channel.basicQos(1); // 定義隊列的消費者 QueueingConsumer consumer = new QueueingConsumer(channel); // 監聽隊列,手動返回完成 channel.basicConsume(QUEUE_NAME, false, consumer); // 獲取消息 while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); Thread.sleep(10); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } }
關于RabbitMq的5種開發方案是什么就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。