This article takes a closer look at RocketMQTemplate.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
public class RocketMQTemplate extends AbstractMessageSendingTemplate<String> implements InitializingBean, DisposableBean {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTemplate.class);

    private DefaultMQProducer producer;

    private ObjectMapper objectMapper;

    private String charset = "UTF-8";

    private MessageQueueSelector messageQueueSelector = new SelectMessageQueueByHash();

    private final Map<String, TransactionMQProducer> cache = new ConcurrentHashMap<>(); //only put TransactionMQProducer by now!!!

    //......

    @Override
    public void afterPropertiesSet() throws Exception {
        if (producer != null) {
            producer.start();
        }
    }

    @Override
    protected void doSend(String destination, Message<?> message) {
        SendResult sendResult = syncSend(destination, message);
        log.debug("send message to `{}` finished. result:{}", destination, sendResult);
    }

    @Override
    protected Message<?> doConvert(Object payload, Map<String, Object> headers, MessagePostProcessor postProcessor) {
        String content;
        if (payload instanceof String) {
            content = (String) payload;
        } else {
            // If payload not as string, use objectMapper change it.
            try {
                content = objectMapper.writeValueAsString(payload);
            } catch (JsonProcessingException e) {
                log.error("convert payload to String failed. payload:{}", payload);
                throw new RuntimeException("convert to payload to String failed.", e);
            }
        }

        MessageBuilder<?> builder = MessageBuilder.withPayload(content);
        if (headers != null) {
            builder.copyHeaders(headers);
        }
        builder.setHeaderIfAbsent(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN);

        Message<?> message = builder.build();
        if (postProcessor != null) {
            message = postProcessor.postProcessMessage(message);
        }
        return message;
    }

    @Override
    public void destroy() {
        if (Objects.nonNull(producer)) {
            producer.shutdown();
        }

        for (Map.Entry<String, TransactionMQProducer> kv : cache.entrySet()) {
            if (Objects.nonNull(kv.getValue())) {
                kv.getValue().shutdown();
            }
        }
        cache.clear();
    }

    //......
}
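RocketMQTemplate extends AbstractMessageSendingTemplate from spring-messaging and implements the InitializingBean and DisposableBean interfaces. It provides syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, sendMessageInTransaction, and related methods.
afterPropertiesSet calls producer.start() when a producer has been set. destroy calls producer.shutdown(), shuts down every TransactionMQProducer held in cache, and then clears the cache map.
doSend delegates to syncSend, and the returned sendResult is only written to the debug log. doConvert leaves a String payload untouched and converts any other payload to a String with objectMapper.writeValueAsString. It then builds the message from that content, copies the supplied headers, sets the content type to text/plain if none is present, applies postProcessor.postProcessMessage when a post-processor is given, and returns the result.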
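The conversion rule in doConvert can be illustrated without Spring or Jackson on the classpath. The following is only a minimal sketch under stated assumptions: PayloadConversionSketch, SimpleMessage, and the serializer function are hypothetical stand-ins for MessageBuilder, Message, and ObjectMapper, and the headers are kept in a plain map.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

final class PayloadConversionSketch {

    /** Hypothetical stand-in for a built message: a String body plus headers. */
    static final class SimpleMessage {
        final String payload;
        final Map<String, Object> headers;

        SimpleMessage(String payload, Map<String, Object> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    /** serializer is a hypothetical replacement for objectMapper.writeValueAsString. */
    static SimpleMessage convert(Object payload,
                                 Map<String, Object> headers,
                                 Function<Object, String> serializer) {
        // A String payload passes through unchanged; anything else is serialized.
        String content = (payload instanceof String)
                ? (String) payload
                : serializer.apply(payload);

        Map<String, Object> merged = new LinkedHashMap<>();
        if (headers != null) {
            merged.putAll(headers);
        }
        // The content type is only set when the caller did not supply one.
        merged.putIfAbsent("contentType", "text/plain");
        return new SimpleMessage(content, merged);
    }

    public static void main(String[] args) {
        SimpleMessage text = convert("hello", null, o -> String.valueOf(o));
        SimpleMessage other = convert(42, null, o -> "{\"value\":" + o + "}");
        System.out.println(text.payload + " " + text.headers);
        System.out.println(other.payload + " " + other.headers);
    }
}
```

The sketch omits the post-processor step; in the template that hook runs last and may replace the built message.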
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Same to {@link #syncSend(String, Message)} with send timeout specified in addition.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param timeout     send timeout with millis
     * @param delayLevel  level for the delay message
     * @return {@link SendResult}
     */
    public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
            SendResult sendResult = producer.send(rocketMsg, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSend failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }

    /**
     * syncSend batch messages in a given timeout.
     *
     * @param destination formats: `topicName:tags`
     * @param messages    Collection of {@link org.springframework.messaging.Message}
     * @param timeout     send timeout with millis
     * @return {@link SendResult}
     */
    public SendResult syncSend(String destination, Collection<Message<?>> messages, long timeout) {
        if (Objects.isNull(messages) || messages.size() == 0) {
            log.error("syncSend with batch failed. destination:{}, messages is empty ", destination);
            throw new IllegalArgumentException("`messages` can not be empty");
        }

        try {
            long now = System.currentTimeMillis();
            Collection<org.apache.rocketmq.common.message.Message> rmqMsgs = new ArrayList<>();
            org.apache.rocketmq.common.message.Message rocketMsg;
            for (Message<?> msg : messages) {
                if (Objects.isNull(msg) || Objects.isNull(msg.getPayload())) {
                    log.warn("Found a message empty in the batch, skip it");
                    continue;
                }
                rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper, charset, destination, msg);
                rmqMsgs.add(rocketMsg);
            }

            SendResult sendResult = producer.send(rmqMsgs, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send messages cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSend with batch failed. destination:{}, messages.size:{} ", destination, messages.size());
            throw new MessagingException(e.getMessage(), e);
        }
    }
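syncSend accepts either a single org.springframework.messaging.Message or a collection of them; only the single-message overload supports delayLevel. The batch overload skips any null message or message with a null payload and logs a warning.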
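The delayLevel argument is an index into a table of delays, not a duration in itself. As a rough illustration, the sketch below maps a level to a duration, assuming the broker keeps RocketMQ's default messageDelayLevel setting ("1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"); a broker configured differently would yield different values. DelayLevelSketch and delayFor are hypothetical names.

```java
import java.time.Duration;

final class DelayLevelSketch {

    // Assumption: the broker uses the default messageDelayLevel configuration.
    private static final String[] DEFAULT_LEVELS =
            "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h".split(" ");

    /** Returns the delay for a level; level <= 0 means "no delay", as in syncSend. */
    static Duration delayFor(int level) {
        if (level <= 0) {
            return Duration.ZERO;
        }
        if (level > DEFAULT_LEVELS.length) {
            // This sketch simply rejects levels that are not in the table.
            throw new IllegalArgumentException("unknown delay level: " + level);
        }
        String spec = DEFAULT_LEVELS[level - 1];
        long amount = Long.parseLong(spec.substring(0, spec.length() - 1));
        char unit = spec.charAt(spec.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalStateException("unexpected unit: " + unit);
        }
    }

    public static void main(String[] args) {
        for (int level = 0; level <= 18; level++) {
            System.out.println("level " + level + " -> " + delayFor(level));
        }
    }
}
```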
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Same to {@link #syncSendOrderly(String, Message, String)} with send timeout specified in addition.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
     * @param timeout     send timeout with millis
     * @return {@link SendResult}
     */
    public SendResult syncSendOrderly(String destination, Message<?> message, String hashKey, long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("syncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            long now = System.currentTimeMillis();
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            SendResult sendResult = producer.send(rocketMsg, messageQueueSelector, hashKey, timeout);
            long costTime = System.currentTimeMillis() - now;
            log.debug("send message cost: {} ms, msgId:{}", costTime, sendResult.getMsgId());
            return sendResult;
        } catch (Exception e) {
            log.error("syncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
syncSendOrderly calls producer.send(rocketMsg, messageQueueSelector, hashKey, timeout) and returns the SendResult synchronously.
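Ordering depends on the selector routing every message with the same hashKey to the same queue. The sketch below shows the general idea of hash-based selection. It is a simplified stand-in, not the source of SelectMessageQueueByHash: HashQueueSelectorSketch is a hypothetical name, queues are modelled as plain strings, and Math.floorMod is used so the index is never negative.

```java
import java.util.Arrays;
import java.util.List;

final class HashQueueSelectorSketch {

    /** Picks a queue from the key's hash; equal keys always give the same index. */
    static <Q> Q select(List<Q> queues, Object hashKey) {
        int index = Math.floorMod(hashKey.hashCode(), queues.size());
        return queues.get(index);
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("queue-0", "queue-1", "queue-2", "queue-3");
        // Messages for the same order id land on the same queue, so a consumer
        // reading that queue sees them in the order they were sent.
        System.out.println(select(queues, "order-1001"));
        System.out.println(select(queues, "order-1001"));
        System.out.println(select(queues, "order-1002"));
    }
}
```

Note that the mapping changes when the number of queues changes, so ordering per key only holds while the queue list is stable.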
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Same to {@link #asyncSend(String, Message, SendCallback)} with send timeout and delay level specified in addition.
     *
     * @param destination  formats: `topicName:tags`
     * @param message      {@link org.springframework.messaging.Message}
     * @param sendCallback {@link SendCallback}
     * @param timeout      send timeout with millis
     * @param delayLevel   level for the delay message
     */
    public void asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSend failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            if (delayLevel > 0) {
                rocketMsg.setDelayTimeLevel(delayLevel);
            }
            producer.send(rocketMsg, sendCallback, timeout);
        } catch (Exception e) {
            log.info("asyncSend failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
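asyncSend requires a SendCallback and calls producer.send(rocketMsg, sendCallback, timeout) internally; the method itself returns void, and the outcome is reported through the callback.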
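The callback style can be sketched with the standard library alone. In the example below, Callback is a hypothetical interface shaped like SendCallback (a success hook and an exception hook), the "broker" is a stand-in that rejects empty bodies, and the Executor decides which thread does the work. None of this is the RocketMQ client's code.

```java
import java.util.concurrent.Executor;

final class AsyncSendSketch {

    /** Hypothetical callback shaped like SendCallback: one hook per outcome. */
    interface Callback {
        void onSuccess(String msgId);

        void onException(Throwable e);
    }

    /** Hands the send to the executor and returns without waiting for the result. */
    static void asyncSend(String body, Callback callback, Executor executor) {
        executor.execute(() -> {
            String msgId;
            try {
                // Stand-in for the network call: an empty body is rejected.
                if (body.isEmpty()) {
                    throw new IllegalStateException("empty body rejected");
                }
                msgId = "msg-" + body.length();
            } catch (RuntimeException e) {
                callback.onException(e);
                return;
            }
            callback.onSuccess(msgId);
        });
    }

    public static void main(String[] args) {
        Callback printing = new Callback() {
            @Override
            public void onSuccess(String msgId) {
                System.out.println("sent, id=" + msgId);
            }

            @Override
            public void onException(Throwable e) {
                System.out.println("failed: " + e.getMessage());
            }
        };
        // Runnable::run executes inline; a thread pool would make it truly asynchronous.
        asyncSend("hello", printing, Runnable::run);
        asyncSend("", printing, Runnable::run);
    }
}
```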
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Same to {@link #asyncSendOrderly(String, Message, String, SendCallback)} with send timeout specified in
     * addition.
     *
     * @param destination  formats: `topicName:tags`
     * @param message      {@link org.springframework.messaging.Message}
     * @param hashKey      use this key to select queue. for example: orderId, productId ...
     * @param sendCallback {@link SendCallback}
     * @param timeout      send timeout with millis
     */
    public void asyncSendOrderly(String destination, Message<?> message, String hashKey, SendCallback sendCallback,
                                 long timeout) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("asyncSendOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout);
        } catch (Exception e) {
            log.error("asyncSendOrderly failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
asyncSendOrderly calls producer.send(rocketMsg, messageQueueSelector, hashKey, sendCallback, timeout) internally.
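Every send method shown so far documents its destination parameter as `topicName:tags`. One plausible way to split such a string is sketched below, assuming that everything after the first colon is the tag expression and that a destination without a colon has empty tags. DestinationSketch is a hypothetical helper and is not copied from RocketMQUtil.

```java
final class DestinationSketch {

    /** Returns {topic, tags}; tags is empty when the destination has no colon. */
    static String[] parse(String destination) {
        // Limit 2 keeps any further colons inside the tags part.
        String[] parts = destination.split(":", 2);
        String topic = parts[0];
        String tags = parts.length > 1 ? parts[1] : "";
        return new String[] {topic, tags};
    }

    public static void main(String[] args) {
        String[] withTags = parse("order-topic:created");
        String[] topicOnly = parse("order-topic");
        System.out.println(withTags[0] + " / " + withTags[1]);
        System.out.println(topicOnly[0] + " / '" + topicOnly[1] + "'");
    }
}
```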
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Similar to <a href="https://en.wikipedia.org/wiki/User_Datagram_Protocol">UDP</a>, this method won't wait for
     * acknowledgement from broker before return. Obviously, it has maximums throughput yet potentials of message loss.
     * <p>
     * One-way transmission is used for cases requiring moderate reliability, such as log collection.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     */
    public void sendOneWay(String destination, Message<?> message) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("sendOneWay failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            producer.sendOneway(rocketMsg);
        } catch (Exception e) {
            log.error("sendOneWay failed. destination:{}, message:{} ", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
sendOneWay calls producer.sendOneway(rocketMsg) internally; it returns nothing and does not wait for an acknowledgement from the broker.
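The trade-off described in the Javadoc (no acknowledgement, so a lost message goes unnoticed) can be made concrete with a toy model. The sketch below is an analogy only: Broker is a hypothetical interface, and the one-way path hides a broker-side failure by discarding it, whereas the real client achieves this by not waiting for a response at all.

```java
final class OneWaySketch {

    /** Hypothetical broker: returns an acknowledgement id or fails. */
    interface Broker {
        String store(String body) throws Exception;
    }

    /** Synchronous style: the caller gets the acknowledgement or the failure. */
    static String syncSend(Broker broker, String body) throws Exception {
        return broker.store(body);
    }

    /** One-way style: nothing comes back, so a broker-side failure stays invisible. */
    static void sendOneWay(Broker broker, String body) {
        try {
            broker.store(body);
        } catch (Exception ignored) {
            // Models the missing acknowledgement: the message is silently lost.
        }
    }

    public static void main(String[] args) throws Exception {
        Broker healthy = body -> "ack-" + body.length();
        Broker down = body -> {
            throw new java.io.IOException("broker down");
        };

        System.out.println(syncSend(healthy, "log line"));
        sendOneWay(down, "log line"); // returns normally, message is gone
        System.out.println("one-way send returned without error");
    }
}
```

This is why the Javadoc recommends one-way sends for workloads such as log collection, where an occasional lost message is acceptable.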
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Same to {@link #sendOneWay(String, Message)} with send orderly with hashKey by specified.
     *
     * @param destination formats: `topicName:tags`
     * @param message     {@link org.springframework.messaging.Message}
     * @param hashKey     use this key to select queue. for example: orderId, productId ...
     */
    public void sendOneWayOrderly(String destination, Message<?> message, String hashKey) {
        if (Objects.isNull(message) || Objects.isNull(message.getPayload())) {
            log.error("sendOneWayOrderly failed. destination:{}, message is null ", destination);
            throw new IllegalArgumentException("`message` and `message.payload` cannot be null");
        }

        try {
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            producer.sendOneway(rocketMsg, messageQueueSelector, hashKey);
        } catch (Exception e) {
            log.error("sendOneWayOrderly failed. destination:{}, message:{}", destination, message);
            throw new MessagingException(e.getMessage(), e);
        }
    }
sendOneWayOrderly calls producer.sendOneway(rocketMsg, messageQueueSelector, hashKey) internally.
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/core/RocketMQTemplate.java
    /**
     * Send Spring Message in Transaction
     *
     * @param txProducerGroup the validate txProducerGroup name, set null if using the default name
     * @param destination     destination formats: `topicName:tags`
     * @param message         message {@link org.springframework.messaging.Message}
     * @param arg             ext arg
     * @return TransactionSendResult
     * @throws MessagingException
     */
    public TransactionSendResult sendMessageInTransaction(final String txProducerGroup, final String destination,
                                                          final Message<?> message, final Object arg) throws MessagingException {
        try {
            TransactionMQProducer txProducer = this.stageMQProducer(txProducerGroup);
            org.apache.rocketmq.common.message.Message rocketMsg = RocketMQUtil.convertToRocketMessage(objectMapper,
                charset, destination, message);
            return txProducer.sendMessageInTransaction(rocketMsg, arg);
        } catch (MQClientException e) {
            throw RocketMQUtil.convert(e);
        }
    }
sendMessageInTransaction obtains the TransactionMQProducer for the given txProducerGroup through stageMQProducer and then calls txProducer.sendMessageInTransaction(rocketMsg, arg).
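What the transactional producer does behind that call follows RocketMQ's half-message model: the message is first stored without being visible to consumers, the local transaction runs, and its outcome decides whether the message is committed, rolled back, or left for the broker to check later. The in-memory sketch below illustrates that flow only. Broker, LocalState, and sendInTransaction are hypothetical names, and the broker's check-back for the unknown state is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class TransactionalSendSketch {

    /** Hypothetical outcome of the local transaction. */
    enum LocalState { COMMIT, ROLLBACK, UNKNOWN }

    /** Toy broker: half messages are stored but not yet visible to consumers. */
    static final class Broker {
        final List<String> halfMessages = new ArrayList<>();
        final List<String> visible = new ArrayList<>();
    }

    static LocalState sendInTransaction(Broker broker, String body,
                                        Supplier<LocalState> localTransaction) {
        // Phase 1: store the half message.
        broker.halfMessages.add(body);

        // Run the local transaction (for example, a database update).
        LocalState state = localTransaction.get();

        // Phase 2: finish according to the local outcome.
        if (state == LocalState.COMMIT) {
            broker.halfMessages.remove(body);
            broker.visible.add(body);
        } else if (state == LocalState.ROLLBACK) {
            broker.halfMessages.remove(body);
        }
        // UNKNOWN: the half message stays pending until the broker checks back.
        return state;
    }

    public static void main(String[] args) {
        Broker broker = new Broker();
        sendInTransaction(broker, "order-created", () -> LocalState.COMMIT);
        sendInTransaction(broker, "order-cancelled", () -> LocalState.ROLLBACK);
        sendInTransaction(broker, "order-pending", () -> LocalState.UNKNOWN);
        System.out.println("visible=" + broker.visible + " half=" + broker.halfMessages);
    }
}
```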
To summarize: RocketMQTemplate extends AbstractMessageSendingTemplate from spring-messaging, implements InitializingBean and DisposableBean, and exposes the syncSend, syncSendOrderly, asyncSend, asyncSendOrderly, sendOneWay, sendOneWayOrderly, and sendMessageInTransaction families of methods.
afterPropertiesSet starts the producer; destroy shuts down the producer and every cached TransactionMQProducer, then clears the cache.
doSend delegates to syncSend and only logs the SendResult at debug level; doConvert passes String payloads through, serializes other payloads with objectMapper.writeValueAsString, builds the message, and applies the post-processor before returning it.