您好,登錄后才能下訂單哦!
本篇文章給大家分享的是有關 RpcClient發送消息和同步接收消息原理是什么,小編覺得挺實用的,因此分享給大家學習,希望大家閱讀完這篇文章后可以有所收獲,話不多說,跟著小編一起來看看吧。
本身使用RpcClient發送消息與同步接收消息的代碼是很簡單的,如下:
RpcClient client = new RpcClient(channel, exchange, routingKey);
String msg = "hello world!";
byte[] result = client.primitiveCall(msg.getBytes());
這里的primitiveCall調用后,當前線程會進行同步等待,等待消息接收端給自己的回復消息
一個完整的發送消息與接收回復消息的圖例:
整個流程詳解:
l 創建RpcClient
RpcClient client = new RpcClient(channel, exchange, routingKey);
創建RpcClient時會做兩件事:
A:創建一個回復queue,接收當前RpcClient發送的消息的消息接收人會將回復消息發到這個replyQueue上供當前RpcClient去接收回復消息
_replyQueue = setupReplyQueue();
protected String setupReplyQueue() throws IOException {
return _channel.queueDeclare("", false, false, true, true, null).getQueue();
//這里實際上是由rabbitmq server去定義一個唯一的queue(因為queueName是空的,所以是由server去生成queueName),最后返回這個queueName,queueName是由server生成的,使用的是以下這個方法:
Queue.DeclareOk queueDeclare(String queueName, boolean passive, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
}
B:創建一個接收回復消息的consumer
_consumer = setupConsumer();
protected DefaultConsumer setupConsumer() throws IOException {
//創建一個接收消息的DefaultConsumer實例
DefaultConsumer consumer = new DefaultConsumer(_channel) {
@Override //發生shutdown的時候回調
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException signal) {
synchronized (_continuationMap) {
for (Entry<String, BlockingCell<Object>> entry : _continuationMap.entrySet()) {
entry.getValue().set(signal);
}
_consumer = null;
}
}
@Override //處理消息交付
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
//這部分就是和下面的代碼一起協作來實現將異步接收強制變成同步接收
synchronized (_continuationMap) {
String replyId = properties.getCorrelationId();
BlockingCell<Object> blocker = _continuationMap.get(replyId);
_continuationMap.remove(replyId);
blocker.set(body);
}
}
};
//讓接收消息的consumer去replyQueue上去接收消息,這個過程對于主線程來說是異步進行的,只要replyQueue上有消息了,consumer就會去replyQueue上去接收消息,并回調它的handleDelivery方法
_channel.basicConsume(_replyQueue, true, consumer);
return consumer;
}
l 發送消息
byte[] result = rpcClient.primitiveCall(msg.getBytes());
使用rpcClient的primitiveCall發送消息,看看是怎么做的
public byte[] primitiveCall(byte[] message) throws IOException, ShutdownSignalException {
return primitiveCall(null, message);
}
繼續跟蹤,核心方法是這個
public byte[] primitiveCall(AMQP.BasicProperties props, byte[] message) throws IOException, ShutdownSignalException{
//檢查consumer是否為空,若為空,拋出異常
checkConsumer();
BlockingCell<Object> k = new BlockingCell<Object>();
synchronized (_continuationMap) {
_correlationId++;
String replyId = "">
//如果props不為空,則將上一步驟創建的replyQueue設置到props上去,還有replyId
if (props != null) {
props.setCorrelationId(replyId);
props.setReplyTo(_replyQueue);
}
else {
//如果props為空,則創建一個,并將replyId和replyQueue都設置到props上
props = new AMQP.BasicProperties(null, null, null, null,
null, replyId,
_replyQueue, null, null, null,
null, null, null, null);
}
_continuationMap.put(replyId, k);
}
//使用上面的props發送消息,這樣replyQueue和replyId就跟著傳遞到了接收消息的那一方去了,接收消息的client去props上去取到replyQueue,它就知道了它接收的消息的回復queue,然后它會將回復消息發送到replyQueue上去,而在上一步驟我們已經指定了一個consumer去replyQueue上去取消息,所以整個發送和接收消息的所有client是有條不紊的進行著
publish(props, message); //這行代碼執行完后,只是將消息發送出去了,接收回復消息是異步的,由上一步驟的consumer去接收回復消息
//這里就是進行同步等待接收回復消息,將異步接收變成同步回復接收的核心就在這里
Object reply = k.uninterruptibleGet();
if (reply instanceof ShutdownSignalException) {
ShutdownSignalException sig = (ShutdownSignalException) reply;
ShutdownSignalException wrapper =
new ShutdownSignalException(sig.isHardError(),
sig.isInitiatedByApplication(),
sig.getReason(),
sig.getReference());
wrapper.initCause(sig);
throw wrapper;
} else {
return (byte[]) reply;
}
}
完整描述
創建RpcClient實例:
1,定義一個Map,用于存放每個消息的相關信息:
private final Map<String, BlockingCell<Object>> _continuationMap = new HashMap<String, BlockingCell<Object>>();
Key是一個correlationId,相當于當前rpcClient實例發送消息的一個計數器,初始化時是0,每發送一個消息時,加1
Value是一個com.rabbitmq.utility.BlockingCell對象,它是在發送消息前創建,并和當前的correlationId進行關聯,放進來
_continuationMap.put(correlationId, blockingCell);
2,correlationId初始化為0
3,創建一個回復queue,replyQueue=channel.queueDeclare("", false, false, true, true, null).getQueue();
4,創建一個接收回復消息的consumer
5,指定consumer接收replyQueue上的消息,channel.basicConsume(replyQueue, true, consumer);
RpcClient發送消息:
1,創建一個BlockingCell<Object>對象blockingCell
1,correlationId++
2,創建BasicProperties對象,并將correlationId,replyQueue設置到它上面,發送消息時,它會被傳遞到接收方
3,以correlationId為Key,將blockingCell放入到_continuationMap中
4,發送消息:channel.basicPublish(exchange, routingKey, 上面 步驟得到的BasicProperties對象, message);
5,獲取回復消息,Object reply = blockingCell.uninterruptibleGet();這里就是同步等待回復消息
RpcServer接收消息:
1,接收消息
2,從request中獲取BasicProperties對象requestProperties,requestProperties=request.getProperties()
3,從requestProperties中得到correlationId,replyQueue
4,創建一個回復消息用的BasicProperties對象replyProperties,并將correlationId設置到它上面
4,發送回復消息:channel.basicPublish("", replyQueue, replyProperties, replyMessage);
RpcClient接收回復:
1,replyQueue一有消息,consumer就會接收到并回調consumer的handleDelivery方法
2,獲取傳遞過來的BasicProperties獲取correlationId
3,根據correlationId去continuationMap中取BlockingCell對象,BlockingCell<Object> blocker = continuationMap.get(correlationId);
4,從continuationMap中刪除,continuationMap.remove(correlationId);
5,將回復消息設置到blocker對象里面,blocker.set(replyMessage);
同步等待回復消息:
1,【RpcClient發送消息】第4步主線程,發送消息后,第5步就去獲取回復消息
2,【RpcClient發送消息】第5步主線程,blockingCell.uninterruptibleGet(),如果blockingCell沒有被set(value)過,那么讓當前主線程處于等待wait(),等待狀態
3,【RpcClient接收回復】第5步blocker.set(replyMessage);這里的blocker其實就是上面主線程創建的blockingCell,因為它是根據correlationId去continuationMap中取的,set(replyMessage),blocker會用一個屬性將replyMessage保存起來,供get的時候去返回這個屬性,然后調用notify();喚醒處于等待的主線程(當前這步所在的線程和上一步主線程是在兩個線程,所以主線程的等待是可以被這個線程喚醒的),主線程被喚醒后,get()就會取到replyMessage,最終整個步驟實現了將異步接收強制轉換為同步等待接收
BlockingCell類
public class BlockingCell<T> {
private boolean _filled = false;
private T _value;
private static final long NANOS_IN_MILLI = 1000 * 1000;
private static final long INFINITY = -1;
public BlockingCell() {
}
public synchronized T get() throws InterruptedException {
while (!_filled) { //如果value沒有被設置過
wait(); //讓當前線程處于等待,直到其它線程調用當前對象的notify()或notifyAll()為止
}
return _value;
}
//帶超時的get
public synchronized T get(long timeout) throws InterruptedException, TimeoutException {
if (timeout < 0 && timeout != INFINITY)
throw new AssertionError("Timeout cannot be less than zero");
if (!_filled && timeout != 0) {
wait(timeout == INFINITY ? 0 : timeout);
}
if (!_filled)
throw new TimeoutException();
return _value;
}
//無限制的等待,直到取到值為止
public synchronized T uninterruptibleGet() {
while (true) {
try {
return get();
} catch (InterruptedException ex) {
}
}
}
public synchronized T uninterruptibleGet(int timeout) throws TimeoutException {
long now = System.nanoTime() / NANOS_IN_MILLI;
long runTime = now + timeout;
do {
try {
return get(runTime - now);
} catch (InterruptedException e) {
}
} while ((timeout == INFINITY) || ((now = System.nanoTime() / NANOS_IN_MILLI) < runTime));
throw new TimeoutException();
}
public synchronized void set(T newValue) {
if (_filled) {
throw new AssertionError("BlockingCell can only be set once");
}
_value = newValue;
_filled = true;
notify(); //喚醒當前線程(處于等待狀態)
}
//保證只能被set(value)一次
public synchronized boolean setIfUnset(T newValue) {
if (_filled) {
return false;
}
set(newValue);
_filled = true;
return true;
}
}
以上就是 RpcClient發送消息和同步接收消息原理是什么,小編相信有部分知識點可能是我們日常工作會見到或用到的。希望你能通過這篇文章學到更多知識。更多詳情敬請關注億速云行業資訊頻道。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。