您好,登錄后才能下訂單哦!
這篇文章主要介紹“RocketMQ producer同步發送和單向發送源碼分析”的相關知識,小編通過實際案例向大家展示操作過程,操作方法簡單快捷,實用性強,希望這篇“RocketMQ producer同步發送和單向發送源碼分析”文章能幫助大家解決問題。
RocketMQ生產者發送消息分為三種模式,分別是同步發送,異步發送和單向發送。
單向發送,這個就是發送之后不用接收結果的,就是你發出去一個消息,然后就返回了,就算有結果返回也不會接收了,這是站在消息生產者的角度;
同步發送的話,就是發出去一個消息,這個線程要等著它返回消息發送結果,然后你這個線程再根據這個消息發送結果再做一些業務操作等等;
異步發送,這個就是在你發送消息之前要給一個callback,發送的時候,你這個線程就不用等著,該干什么就干什么,然后發送結果回來的時候,是由其他線程調用你這個callback來處理的,你可以把這個callback看作是一個回調函數,回調方法,這個方法里面的業務邏輯就是你對這個消息發送結果的處理。注意,本文介紹的消息發送只是普通的消息發送,那種事務類型的消息,我們以后會有介紹。
producer同步發送消息的示例在org.apache.rocketmq.example.simple.Producer類中,代碼如下:
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { // 1. 創建 DefaultMQProducer 對象 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.setNamesrvAddr("127.0.0.1:9876"); /* * Launch the instance. */ // todo 2. 啟動 producer producer.start(); for (int i = 0; i < 1000; i++) { try { Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 3. 發送消息 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } ... } producer.shutdown(); } }
我們可以看到這個代碼,你是同步消息你是需要在你自己的業務線程里面接收這個sendResult的,然后在做一些業務處理,比如我這里就是打印了一下這個sendResult。
接下來我們看下它是怎樣發送的,這里是調用了這個producer的send方法。
@Override public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // topic 和消息長度 校驗 Validators.checkMessage(msg, this); msg.setTopic(withNamespace(msg.getTopic())); // todo return this.defaultMQProducerImpl.send(msg); }
我們可以看到,這個 DefaultMQProducer 將這個消息給了defaultMQProducerImpl 這個實現的send方法來處理了。
public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // todo 默認超時時間3s return send(msg, this.defaultMQProducer.getSendMsgTimeout()); }
defaultMQProducerImpl的send方法,加了個超時時間 ,然后有調用它的重載方法send(msg,timeout)
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // todo 同步模式 return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout); }
這個send(msg,timeout)又調用了sendDefaultImpl 方法,然后他這里加了個通信模式是同步,CommunicationMode.SYNC。
sendDefaultImpl 方法就比較長了了我們分成幾部分來介紹:
private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { // 判斷狀態是否是running this.makeSureStateOK(); // 檢查消息合法性 Validators.checkMessage(msg, this.defaultMQProducer); // 隨機的invokeID final long invokeID = random.nextLong(); long beginTimestampFirst = System.currentTimeMillis(); long beginTimestampPrev = beginTimestampFirst; long endTimestamp = beginTimestampFirst; // todo 獲取topic信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); ... }
這一小段代碼其實就是做了一些準備檢查工作,注意第二行的個檢查消息合法性,它要檢查你topic,消息長度的,你不能發空消息,消息長度也不能太長,默認是不超過4m,接下來這些就是記錄一下時間了,再看最后一行,就是根據你這個消息發送的topic,然后獲取topic 發送消息的這么一個信息,這里面就有這topic 有幾個MessageQueue,然后每個MessageQueue對應在哪個broker上面,broker 的地址又是啥的,它這個方法會先從本地的一個緩存中獲取下,沒有的話就從nameserv更新下這個本地緩存,再找找,要是再找不到,它就認為你沒有這個topic了,然后就去nameserv上面拉取一個默認topic的一些配置信息給你用(這個其實就是在新建一個topic)。 接著這個方法往下看,接著就是判斷 這個TopicPublishInfo 是否存在了,如果不存在的話就拋出異常了,沒有后續了就,如果存在的話:
... if (topicPublishInfo != null && topicPublishInfo.ok()) { boolean callTimeout = false; MessageQueue mq = null; Exception exception = null; SendResult sendResult = null; // 重試次數 區分同步、其他 int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1; int times = 0; // 存放發送過的broker name String[] brokersSent = new String[timesTotal]; // 重試發送 for (; times < timesTotal; times++) { String lastBrokerName = null == mq ? null : mq.getBrokerName(); // todo 選擇message queue MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName); if (mqSelected != null) { mq = mqSelected; brokersSent[times] = mq.getBrokerName(); try { beginTimestampPrev = System.currentTimeMillis(); if (times > 0) { //Reset topic with namespace during resend. msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic())); } long costTime = beginTimestampPrev - beginTimestampFirst; if (timeout < costTime) { callTimeout = true; break; } // todo 進行發送 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime); endTimestamp = System.currentTimeMillis(); // todo isolation 參數為false(看一下異常情況) this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false); switch (communicationMode) { case ASYNC: return null; case ONEWAY: return null; case SYNC: if (sendResult.getSendStatus() != SendStatus.SEND_OK) { if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) { continue; } } return sendResult; default: break; } } catch (RemotingException e) { endTimestamp = System.currentTimeMillis(); this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true); log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e); log.warn(msg.toString()); ...
其實下面還有許多處理異常的操作沒有放上,不過不影響我們的主流程,先是判斷你這個通信模式,如果是同步的話,默認重試次數就是2 ,然后加上本身這次請求,也就是最查請求3次。這個for循環就是失敗重試的代碼,再看下代碼selectOneMessageQueue這個就是選擇一個MesssageQueue的方法了,這個是比較重要的,這里我們先不說,你可以把它理解為 我們的負載均衡。接著往下走,就是判斷一下時間了,計算一下剩下的時間, 如果這一堆前面的內容耗時很長,然后已經超了之前設置的默認超時時間,這個時候就會超時了,然后將這個calltimeout設置成true了。
接著就是進行發送了調用sendKernelImpl 方法:
private SendResult sendKernelImpl(final Message msg, final MessageQueue mq, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); // 根據MessageQueue獲取Broker的網絡地址 String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); if (null == brokerAddr) { tryToFindTopicPublishInfo(mq.getTopic()); brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); } SendMessageContext context = null; ...
這個sendKernelImpl 也是有點長,然后我們一部分一部分的看下,這就是根據MessageQueue里面的broker name 獲取一下broker addr,他這個broker addr 選的是master的,比如說我們 broker使用的是 master/slave 高可用架構,這個時候只會選擇那個master,畢竟是往里面寫消息,然后只能用master,等到介紹消息消費者的時候,消息消費者是可以向slave node 獲取消息消費的,前提是 master 負載比較大,然后消息消費者下次獲取消費的消息已經在slave里面了,然后消息消費者獲取到消息之后,它里面有個字段是告訴你下次可以去xxx 地址的broker 拉取消息,這個我們介紹到消息消費者的時候再說。
接著回來,如果沒有獲取到這個broker 地址的話,就是去nameserv上更新下本地緩存,然后再獲取下。接著再往下就是再次判斷一下這個broker addr 了,如果還沒有就拋出異常,如果有的話 就執行下面的代碼了:
... SendMessageContext context = null; if (brokerAddr != null) { brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr); byte[] prevBody = msg.getBody(); try { //for MessageBatch,ID has been set in the generating process // 給消息設置全局唯一id, 對于MessageBatch在生成過程中已設置了id if (!(msg instanceof MessageBatch)) { MessageClientIDSetter.setUniqID(msg); } boolean topicWithNamespace = false; if (null != this.mQClientFactory.getClientConfig().getNamespace()) { msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace()); topicWithNamespace = true; } int sysFlag = 0; // 消息體是否壓縮 boolean msgBodyCompressed = false; // 壓縮消息 內容部分超了4k就會壓縮 if (this.tryToCompressMessage(msg)) { sysFlag |= MessageSysFlag.COMPRESSED_FLAG; msgBodyCompressed = true; } final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (tranMsg != null && Boolean.parseBoolean(tranMsg)) { sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE; } // 判斷有沒有hook if (hasCheckForbiddenHook()) { CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext(); checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr()); checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup()); checkForbiddenContext.setCommunicationMode(communicationMode); checkForbiddenContext.setBrokerAddr(brokerAddr); checkForbiddenContext.setMessage(msg); checkForbiddenContext.setMq(mq); checkForbiddenContext.setUnitMode(this.isUnitMode()); // 執行Forbidden 鉤子 this.executeCheckForbiddenHook(checkForbiddenContext); } ...
第一句,這個其實就是進行一個vip通道地址的轉換,這個比較有意思,如果你這個支持vip channel的話,它會把broker addr 里面的端口改變一下,這個所謂的vip channel ,其實就是與它的另一個端口建立連接,這個端口就是當前端口-2 ;
接著,如果這個消息不是批量消息的話,我們就給這個消息設置一個唯一的消息id,再往下就是 sysflag的處理了,這個sysflag里面記錄了好幾個屬性值,使用二進制來處理的,比如說消息是否壓縮了(這個壓縮,就是你消息內容超過了默認的4k之后,就會進行壓縮,這個壓縮的閾值你是可以配置的),是否是個事務消息等等。 接下來就是執行hook了,這個hook就是forbidenHook ,其實就是對消息進行過濾。
... if (this.hasSendMessageHook()) { context = new SendMessageContext(); context.setProducer(this); context.setProducerGroup(this.defaultMQProducer.getProducerGroup()); context.setCommunicationMode(communicationMode); context.setBornHost(this.defaultMQProducer.getClientIP()); context.setBrokerAddr(brokerAddr); context.setMessage(msg); context.setMq(mq); context.setNamespace(this.defaultMQProducer.getNamespace()); String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); if (isTrans != null && isTrans.equals("true")) { context.setMsgType(MessageType.Trans_Msg_Half); } if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) { context.setMsgType(MessageType.Delay_Msg); } this.executeSendMessageHookBefore(context); } // 封裝消息頭 SendMessageRequestHeader requestHeader = new SendMessageRequestHeader(); // 設置group requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); // topic requestHeader.setTopic(msg.getTopic()); // 設置默認topic requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey()); // 設置默認topic的隊列數量 默認4個 requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums()); // 隊列id requestHeader.setQueueId(mq.getQueueId()); // 消息系統標記 requestHeader.setSysFlag(sysFlag); // 消息發送時間 requestHeader.setBornTimestamp(System.currentTimeMillis()); // 消息標記(RocketMQ對消息標記不做任何處理,供應用程序使用) requestHeader.setFlag(msg.getFlag()); // 設置擴展屬性 requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties())); requestHeader.setReconsumeTimes(0); requestHeader.setUnitMode(this.isUnitMode()); // 是否批量 requestHeader.setBatch(msg instanceof MessageBatch); // 判斷消息是否是 %RETRY% 開頭 if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { String reconsumeTimes = MessageAccessor.getReconsumeTime(msg); if (reconsumeTimes != null) { requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME); } String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg); if (maxReconsumeTimes != null) { requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes)); MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES); } } ...
在往下就是執行一下發送消息之前的hook,再往下就是封裝發送消息請求頭,然后這個請求頭里面就涵蓋了很多的參數,比如說topic,MessageQueue 隊列Id, 出生日期,flag等等。再往下就是消息發送了
... SendResult sendResult = null; // 同步 異步 單向 switch (communicationMode) { // 異步 case ASYNC: Message tmpMessage = msg; boolean messageCloned = false; if (msgBodyCompressed) { //If msg body was compressed, msgbody should be reset using prevBody. //Clone new message using commpressed message body and recover origin massage. //Fix bug:https://github.com/apache/rocketmq-externals/issues/66 tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; msg.setBody(prevBody); } if (topicWithNamespace) { if (!messageCloned) { tmpMessage = MessageAccessor.cloneMessage(msg); messageCloned = true; } msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace())); } // 判斷超時時間 long costTimeAsync = System.currentTimeMillis() - beginStartTime; if (timeout < costTimeAsync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // todo sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), tmpMessage, requestHeader, timeout - costTimeAsync, communicationMode, sendCallback, topicPublishInfo, this.mQClientFactory, this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(), context, this); break; // 單向 case ONEWAY: // 同步 case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; // 判判是否超時 if (timeout < costTimeSync) { throw new RemotingTooMuchRequestException("sendKernelImpl call timeout"); } // todo 交給 mq api去發送消息 sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage( brokerAddr, mq.getBrokerName(), msg, requestHeader, timeout - costTimeSync, communicationMode, context, this); break; default: assert false; break; } // 是否注冊了消息發送鉤子函數 if (this.hasSendMessageHook()) { context.setSendResult(sendResult); this.executeSendMessageHookAfter(context); } ...
因為本小節主要是介紹下這個同步發送消息,然后我們就主要介紹下這個sync的代碼邏輯: 首先是判斷超時,然后交給 MQClientAPI層去處理,然后返回sendResult。
我們這里接著看下MQClientAPIImpl里面的sendMessage 實現:
public SendResult sendMessage( final String addr, final String brokerName, final Message msg, final SendMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final SendCallback sendCallback, final TopicPublishInfo topicPublishInfo, final MQClientInstance instance, final int retryTimesWhenSendFailed, final SendMessageContext context, final DefaultMQProducerImpl producer ) throws RemotingException, MQBrokerException, InterruptedException { long beginStartTime = System.currentTimeMillis(); RemotingCommand request = null; String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE); boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG); if (isReply) { if (sendSmartMsg) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2); } else { request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader); } } else { // sendSmartMsg默認開啟,也算一種優化吧 批量消息 if (sendSmartMsg || msg instanceof MessageBatch) { SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader); request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2); } else { // 普通消息 request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader); } } // 設置消息體 request.setBody(msg.getBody()); switch (communicationMode) { case ONEWAY: // todo this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); long costTimeAsync = System.currentTimeMillis() - beginStartTime; // 判斷超時時間 if (timeoutMillis < costTimeAsync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } // todo this.sendMessageAsync(addr, brokerName, msg, timeoutMillis - costTimeAsync, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, context, producer); return null; case SYNC: long costTimeSync = System.currentTimeMillis() - beginStartTime; // 判斷超時時間 if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } // todo 同步發送 return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); default: assert false; break; } return null; }
這里先生成一個RemotingCommand 這么個實體對象,然后RequestCode就是SEND_MESSAGE,其實這里判斷了一下sendSmartMsg 這個參數,把requestHeader優化了一下,然后換成了requestHeaderV2,其實這個requestHeaderV2 內容跟requestHeader一樣,但是變量名是單個字母的,然后序列化,反序列化,傳輸內容都有所優化,其實他這個序列化使用是json形式的,然后想想就知道有些哪些好處了, 唯一的缺點就是可讀性差點,但是這個玩意是對用戶透明的,用戶不需要關心。
接著就是判斷通信類型,然后發送消息了,這里是同步發送,先是判斷一下超時時間,接著就是調用sendMessageSync 進行同步發送了,我們接著來看下這個sendMessageSync 方法實現。
private SendResult sendMessageSync( final String addr, final String brokerName, final Message msg, final long timeoutMillis, final RemotingCommand request ) throws RemotingException, MQBrokerException, InterruptedException { // todo 同步調用 RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis); assert response != null; // 處理響應 return this.processSendResponse(brokerName, msg, response,addr); }
這里就調用到了client 模塊(這個client其實就是直接操作netty了)來處理了,然后返回響應,調用processSendResponse 方法來處理響應。
我們再來看下client的 invokeSync 方法:
public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { // 開始時間 long beginStartTime = System.currentTimeMillis(); // todo 輪詢獲取namesrv地址Channel final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { // 執行開始之前的rpchook doBeforeRpcHooks(addr, request); long costTime = System.currentTimeMillis() - beginStartTime; // 判斷超時 之前有獲取鏈接的操作,可能會出現超時的情況 if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } // todo 進行同步執行,獲取響應 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); // 執行之后的rpchook doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response); return response; // 遠程發送請求異常 } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); // 關閉channel this.closeChannel(addr, channel); throw e; // 超時異常 } catch (RemotingTimeoutException e) { // 如果超時 就關閉cahnnel話,就關閉channel 默認是不關閉的 if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
這里有兩個點需要關注下,首先是根據broker addr 這個地址獲取一下對應的channel ,如果不存在的話就創建一下這個連接, 稍微看下這塊的代碼:
private Channel getAndCreateChannel(final String addr) throws RemotingConnectException, InterruptedException { // 如果地址不存在,就返回namesrv 的channel if (null == addr) { return getAndCreateNameserverChannel(); } ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } // 創建channel return this.createChannel(addr); }
如果你這個addr是空的話,這個就是默認找nameserv的addr ,然后找對應channel就可以了,如果不是null ,然后它會去這個channelTable 這個map中去找,如果沒有的話就創建一個對應的channel
接著回到這個invokeSync 方法中,獲得channel之后,就是執行一下rpcHook了,這東西就是你在創建MQProducer的時候設置的,在調用前執行一次,調用后執行一次,其實你就可以通過這個hook來實現很多功能,監控的功能比較多些。接著就是調用了invokeSyncImpl 這個實現方法來發送消息了,這個方法是它的一個父類里面的:
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { // 獲取 請求id final int opaque = request.getOpaque(); try { // 創建ResponseFuture final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 放入responseTable 表中 this.responseTable.put(opaque, responseFuture); // 獲取遠程地址 final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { // 成功 if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; // 失敗 } else { responseFuture.setSendRequestOK(false); } // 移除response中的緩存 responseTable.remove(opaque); responseFuture.setCause(f.cause()); responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { // 成功了還是null 還是超時 if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { // 沒發出去,就排除異常 throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } // 返回響應結果 return responseCommand; } finally { // 移除 this.responseTable.remove(opaque); } }
這個方法其實就是最終往 channel里面寫內容的方法了,我們來看下,先是為這次request創建一個id 吧,這個id主要用來返回響應的時候用的。
接著創建一個ResposeFuture ,這個東西異步,同步都可以用,這個一會介紹一下它的原理,接著就是將這個id 與這個 ResposeFuture 關聯起來放到這個 responseTable 里面的, 接著就是往channel里面發送消息了,這里它添加一個listener ,這listener的執行時機就是發送出去的時候,最后就是等待這個響應了。
我們來解釋下這個ResposeFuture 原理, 當執行了responseFuture.waitResponse(timeoutMillis); 這行代碼,當前線程就會wait ,然后被阻塞,然后等著響應回來的時候,netty處理響應的線程會從響應里面獲取一下這個opaque這個id,就是請求之前在request生成的,broker 在響應的時候會會把這個id 放回到response 中, 然后會根據這個opaque 從responseTable中找到這個 ResposeFuture ,然后把響應設置到這個里面,最后喚醒一下wait在這個對象里面的線程就可以了,這樣你這個業務線程就得到了這個RemotingResponse 了。 好了,到這我們就解釋清楚了,然后我們看下他這個代碼是怎樣實現的:
public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) { final int opaque = cmd.getOpaque(); // 獲取對應id 的responseFuture final ResponseFuture responseFuture = responseTable.get(opaque); if (responseFuture != null) { // 設置 responseFuture.setResponseCommand(cmd); // 從響應表中移除 responseTable.remove(opaque); if (responseFuture.getInvokeCallback() != null) { // todo 執行回調 executeInvokeCallback(responseFuture); } else { responseFuture.putResponse(cmd); responseFuture.release(); } } else { log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel())); log.warn(cmd.toString()); } }
不過它這個ResposeFuture 是使用CountDownLatch 來實現這個wait與喚醒的。我們來具體看下這個 waitResponse方法與這個putResponse方法:
public RemotingCommand waitResponse(final long timeoutMillis) throws InterruptedException { this.countDownLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); return this.responseCommand; } public void putResponse(final RemotingCommand responseCommand) { this.responseCommand = responseCommand; this.countDownLatch.countDown(); }
單向發送其實這塊跟同步發送的流程差不多,我們來看下它的生產者代碼是怎樣寫的: org.apache.rocketmq.example.openmessaging.SimpleProducer:
public static void main(String[] args) { final MessagingAccessPoint messagingAccessPoint = OMS.getMessagingAccessPoint("oms:rocketmq://localhost:9876/default:default"); final Producer producer = messagingAccessPoint.createProducer(); messagingAccessPoint.startup(); System.out.printf("MessagingAccessPoint startup OK%n"); producer.startup(); System.out.printf("Producer startup OK%n"); ... { producer.sendOneway(producer.createBytesMessage("OMS_HELLO_TOPIC", "OMS_HELLO_BODY".getBytes(Charset.forName("UTF-8")))); System.out.printf("Send oneway message OK%n"); } ... }
可以看到我們最后發送的時候調用的是sendOneway方法,這個方法是沒有返回值的。
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg); }
這里就是調用了defaultMQProducerImpl的 sendOneway方法
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException { msg.setTopic(withNamespace(msg.getTopic())); this.defaultMQProducerImpl.sendOneway(msg); }
這里需要注意的是它也是調用了sendDefaultImpl 方法,然后通信方式是oneway 。這里我們就不細說了,可以看下同步方法解析這個方法的說明,這里唯一要提一點是單向發送是沒有這個重試的,然后就發送一次。下面的流程都是一樣的,然后就到了這個MQClientAPIImpl 的 sendMessage 方法
... switch (communicationMode) { case ONEWAY: // todo this.remotingClient.invokeOneway(addr, request, timeoutMillis); return null; ...
然后他這個是又調用了NettyRemotingClient 的 invokeOneway 方法:
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { doBeforeRpcHooks(addr, request); this.invokeOnewayImpl(channel, request, timeoutMillis); } catch (RemotingSendRequestException e) { log.warn("invokeOneway: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
這里也是根據broker addr 獲取channel, 如果沒有的話,也是創建一個,接著就是執行這個rpc調用前的hook ,注意這里沒有調用后的一個hook,因為我們并不知道它是什么情況。 接著又調用了invokeOnewayImpl 方法:
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { // 請求體, 標記是一個單向調用 request.markOnewayRPC(); // 獲取憑證 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { // 釋放信號量 once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { // 釋放信號量 once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
這里使用了semaphore進行限流,然后默認的話是同時支持65535 個請求發送的,這個semaphore 限流只有單向發送與這個異步發送會有,接著就會將這個request寫入channel中,然后add了一個listener ,這個listener執行時機就是消息發送出去了,這個時候就會釋放 信號量。到這我們這個單向發送就解析完成了。
關于“RocketMQ producer同步發送和單向發送源碼分析”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識,可以關注億速云行業資訊頻道,小編每天都會為大家更新不同的知識點。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。