您好,登錄后才能下訂單哦!
這篇文章主要介紹“rocketmq中DefaultRocketMQListenerContainer的原理及用法”,在日常操作中,相信很多人在rocketmq中DefaultRocketMQListenerContainer的原理及用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答“rocketmq中DefaultRocketMQListenerContainer的原理及用法”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
本文主要研究一下rocketmq的DefaultRocketMQListenerContainer
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
public class DefaultRocketMQListenerContainer implements InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(DefaultRocketMQListenerContainer.class); private ApplicationContext applicationContext; /** * The name of the DefaultRocketMQListenerContainer instance */ private String name; private long suspendCurrentQueueTimeMillis = 1000; /** * Message consume retry strategy<br> -1,no retry,put into DLQ directly<br> 0,broker control retry frequency<br> * >0,client control retry frequency. */ private int delayLevelWhenNextConsume = 0; private String nameServer; private AccessChannel accessChannel = AccessChannel.LOCAL; private String consumerGroup; private String topic; private int consumeThreadMax = 64; private String charset = "UTF-8"; private ObjectMapper objectMapper; private RocketMQListener rocketMQListener; private RocketMQMessageListener rocketMQMessageListener; private DefaultMQPushConsumer consumer; private Class messageType; private boolean running; // The following properties came from @RocketMQMessageListener. private ConsumeMode consumeMode; private SelectorType selectorType; private String selectorExpression; private MessageModel messageModel; private long consumeTimeout; //...... 
public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.rocketMQMessageListener = anno; this.consumeMode = anno.consumeMode(); this.consumeThreadMax = anno.consumeThreadMax(); this.messageModel = anno.messageModel(); this.selectorExpression = anno.selectorExpression(); this.selectorType = anno.selectorType(); this.consumeTimeout = anno.consumeTimeout(); } @Override public void setupMessageListener(RocketMQListener rocketMQListener) { this.rocketMQListener = rocketMQListener; } @Override public void destroy() { this.setRunning(false); if (Objects.nonNull(consumer)) { consumer.shutdown(); } log.info("container destroyed, {}", this.toString()); } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { stop(); callback.run(); } @Override public void start() { if (this.isRunning()) { throw new IllegalStateException("container already running. " + this.toString()); } try { consumer.start(); } catch (MQClientException e) { throw new IllegalStateException("Failed to start RocketMQ push consumer", e); } this.setRunning(true); log.info("running container: {}", this.toString()); } @Override public void stop() { if (this.isRunning()) { if (Objects.nonNull(consumer)) { consumer.shutdown(); } setRunning(false); } } @Override public boolean isRunning() { return running; } private void setRunning(boolean running) { this.running = running; } @Override public int getPhase() { // Returning Integer.MAX_VALUE only suggests that // we will be the first bean to shutdown and last bean to start return Integer.MAX_VALUE; } @Override public void afterPropertiesSet() throws Exception { initRocketMQPushConsumer(); this.messageType = getMessageType(); log.debug("RocketMQ messageType: {}", messageType.getName()); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public String toString() { return 
"DefaultRocketMQListenerContainer{" + "consumerGroup='" + consumerGroup + '\'' + ", nameServer='" + nameServer + '\'' + ", topic='" + topic + '\'' + ", consumeMode=" + consumeMode + ", selectorType=" + selectorType + ", selectorExpression='" + selectorExpression + '\'' + ", messageModel=" + messageModel + '}'; } private void initRocketMQPushConsumer() throws MQClientException { Assert.notNull(rocketMQListener, "Property 'rocketMQListener' is required"); Assert.notNull(consumerGroup, "Property 'consumerGroup' is required"); Assert.notNull(nameServer, "Property 'nameServer' is required"); Assert.notNull(topic, "Property 'topic' is required"); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey()); boolean enableMsgTrace = rocketMQMessageListener.enableMsgTrace(); if (Objects.nonNull(rpcHook)) { consumer = new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, this.applicationContext.getEnvironment(). resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); consumer.setVipChannelEnabled(false); consumer.setInstanceName(RocketMQUtil.getInstanceName(rpcHook, consumerGroup)); } else { log.debug("Access-key or secret-key not configure in " + this + "."); consumer = new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, this.applicationContext.getEnvironment(). 
resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic())); } String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer()); if (customizedNameServer != null) { consumer.setNamesrvAddr(customizedNameServer); } else { consumer.setNamesrvAddr(nameServer); } if (accessChannel != null) { consumer.setAccessChannel(accessChannel); } consumer.setConsumeThreadMax(consumeThreadMax); if (consumeThreadMax < consumer.getConsumeThreadMin()) { consumer.setConsumeThreadMin(consumeThreadMax); } consumer.setConsumeTimeout(consumeTimeout); consumer.setInstanceName(this.name); switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING); break; case CLUSTERING: consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING); break; default: throw new IllegalArgumentException("Property 'messageModel' was wrong."); } switch (selectorType) { case TAG: consumer.subscribe(topic, selectorExpression); break; case SQL92: consumer.subscribe(topic, MessageSelector.bySql(selectorExpression)); break; default: throw new IllegalArgumentException("Property 'selectorType' was wrong."); } switch (consumeMode) { case ORDERLY: consumer.setMessageListener(new DefaultMessageListenerOrderly()); break; case CONCURRENTLY: consumer.setMessageListener(new DefaultMessageListenerConcurrently()); break; default: throw new IllegalArgumentException("Property 'consumeMode' was wrong."); } if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) { ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer); } } private Class getMessageType() { Class<?> targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener); Type[] interfaces = targetClass.getGenericInterfaces(); Class<?> superclass = targetClass.getSuperclass(); while ((Objects.isNull(interfaces) || 0 == 
interfaces.length) && Objects.nonNull(superclass)) { interfaces = superclass.getGenericInterfaces(); superclass = targetClass.getSuperclass(); } if (Objects.nonNull(interfaces)) { for (Type type : interfaces) { if (type instanceof ParameterizedType) { ParameterizedType parameterizedType = (ParameterizedType) type; if (Objects.equals(parameterizedType.getRawType(), RocketMQListener.class)) { Type[] actualTypeArguments = parameterizedType.getActualTypeArguments(); if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { return (Class) actualTypeArguments[0]; } else { return Object.class; } } } } return Object.class; } else { return Object.class; } } //...... }
DefaultRocketMQListenerContainer實現了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法會根據RocketMQMessageListener註解的信息來設置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
afterPropertiesSet方法執行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法會根據rpcHook是否為null來創建不同的DefaultMQPushConsumer,之後根據messageModel、selectorType、consumeMode等來配置consumer;如果rocketMQListener類型是RocketMQPushConsumerLifecycleListener的,則執行RocketMQPushConsumerLifecycleListener的prepareStart方法
setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是執行consumer.start()方法;stop及destroy方法主要是執行consumer.shutdown()
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
/**
 * Concurrent-mode consumer callback: hands each received message to the container's
 * {@code rocketMQListener}; the first failure stops the batch and asks for redelivery.
 */
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    /**
     * Processes the batch in order.
     *
     * @param msgs    the messages delivered by the consumer
     * @param context consume context; receives the container's delay level when a message fails
     * @return {@code CONSUME_SUCCESS} when every message was handled without an exception,
     *         otherwise {@code RECONSUME_LATER}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt delivered : msgs) {
            if (!dispatch(delivered)) {
                context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Converts one message and passes it to the listener, logging the elapsed time.
     *
     * @return {@code true} on success, {@code false} if conversion or the listener threw
     */
    @SuppressWarnings("unchecked")
    private boolean dispatch(MessageExt delivered) {
        log.debug("received msg: {}", delivered);
        try {
            long startedAt = System.currentTimeMillis();
            rocketMQListener.onMessage(doConvertMessage(delivered));
            long elapsed = System.currentTimeMillis() - startedAt;
            log.debug("consume {} cost: {} ms", delivered.getMsgId(), elapsed);
            return true;
        } catch (Exception e) {
            log.warn("consume message failed. messageExt:{}", delivered, e);
            return false;
        }
    }
}
DefaultMessageListenerConcurrently實現了MessageListenerConcurrently接口;它的consumeMessage方法使用for循環try catch執行rocketMQListener.onMessage(doConvertMessage(messageExt))回調,都成功返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS,一旦異常則返回ConsumeConcurrentlyStatus.RECONSUME_LATER
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java
/**
 * Orderly-mode consumer callback: hands each received message to the container's
 * {@code rocketMQListener}; the first failure stops the batch and suspends the queue briefly.
 */
public class DefaultMessageListenerOrderly implements MessageListenerOrderly {

    /**
     * Processes the batch in order.
     *
     * @param msgs    the messages delivered by the consumer
     * @param context consume context; receives the container's suspend time when a message fails
     * @return {@code SUCCESS} when every message was handled without an exception,
     *         otherwise {@code SUSPEND_CURRENT_QUEUE_A_MOMENT}
     */
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        for (MessageExt delivered : msgs) {
            if (!dispatch(delivered)) {
                context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
            }
        }
        return ConsumeOrderlyStatus.SUCCESS;
    }

    /**
     * Converts one message and passes it to the listener, logging the elapsed time.
     *
     * @return {@code true} on success, {@code false} if conversion or the listener threw
     */
    @SuppressWarnings("unchecked")
    private boolean dispatch(MessageExt delivered) {
        log.debug("received msg: {}", delivered);
        try {
            long startedAt = System.currentTimeMillis();
            rocketMQListener.onMessage(doConvertMessage(delivered));
            long elapsed = System.currentTimeMillis() - startedAt;
            log.info("consume {} cost: {} ms", delivered.getMsgId(), elapsed);
            return true;
        } catch (Exception e) {
            log.warn("consume message failed. messageExt:{}", delivered, e);
            return false;
        }
    }
}
DefaultMessageListenerOrderly實現了MessageListenerOrderly接口,其consumeMessage方法使用for循環try catch執行rocketMQListener.onMessage(doConvertMessage(messageExt))回調,都成功返回ConsumeOrderlyStatus.SUCCESS,一旦異常則返回ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT
DefaultRocketMQListenerContainer實現了InitializingBean, RocketMQListenerContainer, SmartLifecycle, ApplicationContextAware接口;setRocketMQMessageListener方法會根據RocketMQMessageListener註解的信息來設置consumeMode、consumeThreadMax、messageModel、selectorExpression、selectorType、consumeTimeout
afterPropertiesSet方法執行了initRocketMQPushConsumer及getMessageType方法;initRocketMQPushConsumer方法會根據rpcHook是否為null來創建不同的DefaultMQPushConsumer,之後根據messageModel、selectorType、consumeMode等來配置consumer;如果rocketMQListener類型是RocketMQPushConsumerLifecycleListener的,則執行RocketMQPushConsumerLifecycleListener的prepareStart方法
setupMessageListener方法主要是保存了rocketMQListener;isAutoStartup方法返回true;start方法主要是執行consumer.start()方法;stop及destroy方法主要是執行consumer.shutdown()
到此,關于“rocketmq中DefaultRocketMQListenerContainer的原理及用法”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續學習更多相關知識,請繼續關注億速云網站,小編會繼續努力為大家帶來更多實用的文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯繫站長郵箱:is@yisu.com進行舉報,並提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。