您好,登錄后才能下訂單哦!
這篇文章主要介紹了RabbitMQ如何實現RPC遠程調用消息隊列,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
客戶端接口
我們創建一個客戶端類來說明如何使用RPC服務,暴露一個call方法來發送RPC請求和數據獲取結果。
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient(); String result = fibonacciRpc.call("4"); System.out.println( "fib(4) is " + result);
盡管RPC是編程中一種常見的模式,但其也常常飽受批評。因為程序員常常不知道調用的方法是本地方法還是一個RPC方法,這在調試中常常增加一些不必要的復雜性。我們應該簡化代碼,而不是濫用RPC導致代碼變的臃腫。
回調隊列
一般來說,通過RabbitMQ實現RPC非常簡單,客戶端發送一個請求消息,服務端響應消息就完成了。為了接收到響應內容,我們在請求中發送”callback“隊列地址,也可以使用默認的隊列。
callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder().replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes());
AMQP協議中預定了14個消息屬性,除了下面幾個,其它的都很少使用:
deliveryMode : 標識消息是持久化還是瞬態的。
contentType : 描述 mime-type的編碼類型,如JSON編碼為”application/json“。
replyTo : 通常在回調隊列中使用。
correlationId : 在請求中關聯RPC響應時使用。
關聯Id(Correlation Id)
在前面的方法中,要求在每個RPC請求創建回調隊列,這可真是一件繁瑣的事情,但幸運的是我們有個好方法-在每個客戶端創建一個簡單的回調隊列。
這樣問題又來了,隊列如何知道這些響應來自哪個請求呢?這時候correlationId就出場了。我們在每個請求中都設置一個唯一的值,這樣我們在回調隊列中接收消息的時候就能知道是哪個請求發送的。如果收到未知的correlationId,就廢棄該消息,因為它不是我們發出的請求。
你可能會問,為什么拋棄未知消息而不是拋出錯誤呢?這是由服務器競爭資源所導致的。盡管這不太可能,試想一下,如果RPC服務器在發送完響應后而在發送應答消息前死掉了,重啟RPC服務器會重新發送請求。這就是我們在客戶機上優雅地處理重復的反應,RPC應該是等同的。
(1)客戶端啟動,創建一個匿名且唯一的回調隊列。
(2)對每個RPC請求,客戶端發送一個包含replyTo和correlationId兩個屬性的消息。
(3)請求發送到rpc_queue隊列。
(4)RPC服務在隊列中等待請求,當請求出現時,根據replyTo字段使用隊列將結果發送到客戶端。
(5)客戶端在回調隊列中等待數據。當消息出現時,它會檢查correlationId屬性,如果該值匹配的話,就會返回響應結果給應用。
示例代碼
RPCServer.java
package com.favccxx.favrabbit; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) { Connection connection = null; Channel channel = null; try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(RPC_QUEUE_NAME, false, consumer); System.out.println(" [x] Awaiting RPC requests"); while (true) { String response = null; QueueingConsumer.Delivery delivery = consumer.nextDelivery(); BasicProperties props = delivery.getProperties(); BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()) .build(); try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response = "" + fib(n); } catch (Exception e) { System.out.println(" [.] " + e.toString()); response = ""; } finally { channel.basicPublish("", props.getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } } catch (Exception e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (Exception ignore) { } } } } }
RPCClient.java
package com.favccxx.favrabbit; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; import com.rabbitmq.client.AMQP.BasicProperties; import java.util.UUID; public class RPCClient { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; private String replyQueueName; private QueueingConsumer consumer; public RPCClient() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); replyQueueName = channel.queueDeclare().getQueue(); consumer = new QueueingConsumer(channel); channel.basicConsume(replyQueueName, true, consumer); } public String call(String message) throws Exception { String response = null; String corrId = UUID.randomUUID().toString(); BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); if (delivery.getProperties().getCorrelationId().equals(corrId)) { response = new String(delivery.getBody(), "UTF-8"); break; } } return response; } public void close() throws Exception { connection.close(); } public static void main(String[] argv) { RPCClient fibonacciRpc = null; String response = null; try { fibonacciRpc = new RPCClient(); System.out.println(" [x] Requesting fib(30)"); response = fibonacciRpc.call("30"); System.out.println(" [.] Got '" + response + "'"); } catch (Exception e) { e.printStackTrace(); } finally { if (fibonacciRpc != null) { try { fibonacciRpc.close(); } catch (Exception ignore) { } } } } }
先啟動RPCServer,然后運行RPCClient,控制臺輸出如下內容
RPCClient[x] Requesting fib(30) RPCClient[.] Got '832040' RPCServer[x] Awaiting RPC requests RPCServer[.] fib(30) |
感謝你能夠認真閱讀完這篇文章,希望小編分享的“RabbitMQ如何實現RPC遠程調用消息隊列”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關注億速云行業資訊頻道,更多相關知識等著你來學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。