您好,登錄后才能下訂單哦!
這篇文章主要講解了“@RabbitListener起作用的原理是什么”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“@RabbitListener起作用的原理是什么”吧!
在spring中,定義rabbitMq的消費者可以相當方便,只需要在消息處理類或者類方法加上@RabbitListener注解,指定隊列名稱即可。
如下代碼:
@Component public class RabbitMqListener1 { @RabbitListener(queues = "queue1") public void consumer1(Message message) { } @RabbitListener(queues = "queue2") public void consumer2(String messsageBody) { } } @Component @RabbitListener(queues = "queue3") public class RabbitMqListener2 { @RabbitHandler(isDefault=true) public void consumer3() { } }
注意!!!如果@RabbitListener加在類上面,需要有一個默認的處理方法@RabbitHandler(isDefault=true),默認是false。
不設置一個true,消費mq消息的時候會出現“Listener method ‘no match’ threw exception”異常。
原因在RabbitListenerAnnotationBeanPostProcessor.processMultiMethodListeners方法,有興趣的可以看下。
可以看到代碼相當的簡單。但是!!!為什么加上這個注解,就能作為一個consumer接受mq的消息呢?為啥處理mq消息的方法,入參可以那么隨意?
有經驗的程序員,可能會有這樣的設想:
1、單純看這些listener的代碼,只是定義了由spring管理的bean,要能監聽rabbitMq的消息,肯定需要有另外一個類,這個類會掃描所有加了@RabbitListener的bean,進行加工。
2、看這些listener的代碼,可以發現處理mq消息的,都是具體的某個方法。那加工的過程,應該就是利用反射拿到對象、方法和@RabbitListener中的queue屬性,然后建立一個綁定關系(對象+方法)——>(queue的consumer)。queue的consumer在接收到mq消息后,找到綁定的“對象+方法”,再通過反射的方式,調用真正的處理方法。
3、mq消息的處理方法,可以那么隨意,應該是queue的consumer在調用真正處理方法之前,需要根據處理方法的參數類型,做一次數據轉換。
接下來,就去看看源碼,看一下設想是不是正確的~~
1、誰來掃描@RabbitListener注解的bean
在springBoot使用rabbit,一般是在@Configuration類上加上@EnableRabbit注解來開啟rabbit功能。那我們就去看看@EnableRabbit注解的源碼,看這個注解的作用
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(RabbitBootstrapConfiguration.class) public @interface EnableRabbit { }
可以看到,這個注解的作用,是導入RabbitBootstrapConfiguration配置類
@Configuration public class RabbitBootstrapConfiguration { @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public RabbitListenerAnnotationBeanPostProcessor rabbitListenerAnnotationProcessor() { return new RabbitListenerAnnotationBeanPostProcessor(); } @Bean(name = RabbitListenerConfigUtils.RABBIT_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME) public RabbitListenerEndpointRegistry defaultRabbitListenerEndpointRegistry() { return new RabbitListenerEndpointRegistry(); } }
RabbitBootstrapConfiguration 配置類的作用,就是定義了RabbitListenerAnnotationBeanPostProcessor 和RabbitListenerEndpointRegistry 兩個bean。
看到RabbitListenerAnnotationBeanPostProcessor 這個類名,就可以猜到,該類的實例bean就是用來掃描加了@RabbitListener 的類,并做一些加工。
(“RabbitListenerAnnotationBean”——針對添加了@RabbitListener注解的bean; “PostProcessor”——后置加工)
2、怎么建立(對象+方法)——>(queue的consumer)的映射關系
分析一下RabbitListenerAnnotationBeanPostProcessor類的源碼
// 實現了BeanPostProcessor、Ordered、BeanFactoryAware、BeanClassLoaderAware、EnvironmentAware和SmartInitializingSingleton 6個接口 public class RabbitListenerAnnotationBeanPostProcessor implements BeanPostProcessor, Ordered, BeanFactoryAware, BeanClassLoaderAware, EnvironmentAware, SmartInitializingSingleton { ....... // 完成初始化bean之后,調用該方法 @Override public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { Class<?> targetClass = AopUtils.getTargetClass(bean); TypeMetadata metadata = this.typeCache.get(targetClass); if (metadata == null) { metadata = buildMetadata(targetClass); this.typeCache.putIfAbsent(targetClass, metadata); } for (ListenerMethod lm : metadata.listenerMethods) { for (RabbitListener rabbitListener : lm.annotations) { processAmqpListener(rabbitListener, lm.method, bean, beanName); } } if (metadata.handlerMethods.length > 0) { processMultiMethodListeners(metadata.classAnnotations, metadata.handlerMethods, bean, beanName); } return bean; } // 根據Class,獲取元數據 private TypeMetadata buildMetadata(Class<?> targetClass) { Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass); final boolean hasClassLevelListeners = classLevelListeners.size() > 0; final List<ListenerMethod> methods = new ArrayList<ListenerMethod>(); final List<Method> multiMethods = new ArrayList<Method>(); ReflectionUtils.doWithMethods(targetClass, new ReflectionUtils.MethodCallback() { @Override public void doWith(Method method) throws IllegalArgumentException, IllegalAccessException { Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method); if (listenerAnnotations.size() > 0) { methods.add(new ListenerMethod(method, listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()]))); } if (hasClassLevelListeners) { RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class); if (rabbitHandler != null) { multiMethods.add(method); } } } }, ReflectionUtils.USER_DECLARED_METHODS); if (methods.isEmpty() && multiMethods.isEmpty()) { return TypeMetadata.EMPTY; } return new TypeMetadata( methods.toArray(new ListenerMethod[methods.size()]), multiMethods.toArray(new Method[multiMethods.size()]), classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()])); } // 檢查一下是否使用jdk代理,使用jdk代理方式必須實現了接口 // new一個MethodRabbitListenerEndpoint對象,交由processListener方法進行處理 protected void processAmqpListener(RabbitListener rabbitListener, Method method, Object bean, String beanName) { Method methodToUse = checkProxy(method, bean); MethodRabbitListenerEndpoint endpoint = new MethodRabbitListenerEndpoint(); endpoint.setMethod(methodToUse); endpoint.setBeanFactory(this.beanFactory); processListener(endpoint, rabbitListener, bean, methodToUse, beanName); } // 前面大半代碼都是對MethodRabbitListenerEndpoint對象的屬性設置:處理消息的bean、消息處理方法的工廠類、監聽的隊列名。。。。 // 通過beanFactory獲取RabbitListenerContainerFactory類的bean // 調用RabbitListenerEndpointRegistar的registerEndpoint方法注冊mq消息消費端點 protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean, Object adminTarget, String beanName) { endpoint.setBean(bean); endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory); endpoint.setId(getEndpointId(rabbitListener)); endpoint.setQueueNames(resolveQueues(rabbitListener)); String group = rabbitListener.group(); if (StringUtils.hasText(group)) { Object resolvedGroup = resolveExpression(group); if (resolvedGroup instanceof String) { endpoint.setGroup((String) resolvedGroup); } } endpoint.setExclusive(rabbitListener.exclusive()); String priority = resolve(rabbitListener.priority()); if (StringUtils.hasText(priority)) { try { endpoint.setPriority(Integer.valueOf(priority)); } catch (NumberFormatException ex) { throw new BeanInitializationException("Invalid priority value for " + rabbitListener + " (must be an integer)", ex); } } String rabbitAdmin = resolve(rabbitListener.admin()); if (StringUtils.hasText(rabbitAdmin)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name"); try { endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class)); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register rabbit listener endpoint on [" + adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" + rabbitAdmin + "' was found in the application context", ex); } } RabbitListenerContainerFactory<?> factory = null; String containerFactoryBeanName = resolve(rabbitListener.containerFactory()); if (StringUtils.hasText(containerFactoryBeanName)) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); try { factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class); } catch (NoSuchBeanDefinitionException ex) { throw new BeanInitializationException("Could not register rabbit listener endpoint on [" + adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" + containerFactoryBeanName + "' was found in the application context", ex); } } this.registrar.registerEndpoint(endpoint, factory); } ........ }
這個類的代碼比較長,只貼部分比較主要的部分,其他的,可以自己查看源碼進行了解。
RabbitListenerAnnotationBeanPostProcessor實現了BeanPostProcessor(bean初始化后的后置處理)、Ordered(后置處理的排序)、BeanFactoryAware(注入BeanFactory)、BeanClassLoaderAware(注入BeanClassLoader)、EnvironmentAware(注入spring環境)和SmartInitializingSingleton(單例bean初始化后的回調) 6個接口。
我們需要關注的是BeanPostProcessor接口定義的方法,看postProcessAfterInitialization方法的代碼,大致流程為:
1、通過AopUtils得到bean代理的對象的class
2、判斷緩存中是否有該class的類型元數據,如果沒有則調用buildMetadata方法生成類型元數據并放入緩存
3、遍歷加了@RabbitListener注解的方法,調用processAmqpListener方法進行處理
4、調用processMultiMethodListeners方法對加了@RabbitHandler的方法進行處理
關于buildMetadata方法:
代碼不復雜,就是利用反射,拿到class中,添加了@RabbitListener和@RabbitHandler注解的方法。另外,從代碼中也可以看出,@RabbitHandler注解要生效,必須在class上增加@RabbitListener注解
關于processAmqpListener方法:
沒有什么實際內容,就干兩個事情:
1、檢查一下是否使用jdk代理,使用jdk代理方式必須實現了接口
2、new一個MethodRabbitListenerEndpoint對象,交由processListener方法進行處理
關于processListener方法:
1、前面大半代碼都是對MethodRabbitListenerEndpoint對象的屬性設置:處理消息的bean、消息處理方法的工廠類、監聽的隊列名。。。。
其中要關注一下setMessageHandlerMethodFactory方法,查看MessageHandlerMethodFactory接口的源碼
public interface MessageHandlerMethodFactory { InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method);
從入參和返回值可以看出來,這個工廠的作用就是將spring的bean對象和方法包裝成一個InvocableHandlerMethod對象,也就是我們上面提到的(對象+方法)。
2、通過beanFactory獲取RabbitListenerContainerFactory類的bean。
3、調用RabbitListenerEndpointRegistar的registerEndpoint方法注冊mq消息消費端點。
繼續往下追,看一下RabbitListenerEndpointRegistar的代碼:
public class RabbitListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean { // 將整個endpointDescriptors數組進行注冊 protected void registerAllEndpoints() { synchronized (this.endpointDescriptors) { for (AmqpListenerEndpointDescriptor descriptor : this.endpointDescriptors) { this.endpointRegistry.registerListenerContainer( descriptor.endpoint, resolveContainerFactory(descriptor)); } this.startImmediately = true; // trigger immediate startup } } // 解析得到RabbitListenerContainerFactory // 如果AmqpListenerEndpointDescriptor 的containerFactory屬性不為空,直接返回containerFactory // 如果為空,嘗試從beanFactory獲取 private RabbitListenerContainerFactory<?> resolveContainerFactory(AmqpListenerEndpointDescriptor descriptor) { if (descriptor.containerFactory != null) { return descriptor.containerFactory; } else if (this.containerFactory != null) { return this.containerFactory; } else if (this.containerFactoryBeanName != null) { Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name"); this.containerFactory = this.beanFactory.getBean( this.containerFactoryBeanName, RabbitListenerContainerFactory.class); return this.containerFactory; // Consider changing this if live change of the factory is required } else { throw new IllegalStateException("Could not resolve the " + RabbitListenerContainerFactory.class.getSimpleName() + " to use for [" + descriptor.endpoint + "] no factory was given and no default is set."); } } // new一個AmqpListenerEndpointDescriptor對象 // 如果立即啟動,則調用RabbitListenerEndpointRegistry注冊器來注冊消息監聽 // 如果不是立即啟動,則添加到endpointDescriptors列表中,后面通過registerAllEndpoints方法統一啟動 public void registerEndpoint(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { Assert.notNull(endpoint, "Endpoint must be set"); Assert.hasText(endpoint.getId(), "Endpoint id must be set"); // Factory may be null, we defer the resolution right before actually creating the container AmqpListenerEndpointDescriptor descriptor = new AmqpListenerEndpointDescriptor(endpoint, factory); synchronized (this.endpointDescriptors) { if (this.startImmediately) { // Register and start immediately this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor), true); } else { this.endpointDescriptors.add(descriptor); } } } }
從上面的代碼可以看出,我們關心的內容,應該是在RabbitListenerEndpointRegistry類的registerListenerContainer方法!!
public class RabbitListenerEndpointRegistry implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> { // 檢查是否被注冊過,注冊過就不能注冊第二次 // 調用createListenerContainer創建消息監聽 // 關于分組消費的,我們不關心 // 是否立即啟動,是的話,同步調用startIfNecessary方法 public void registerListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory, boolean startImmediately) { Assert.notNull(endpoint, "Endpoint must not be null"); Assert.notNull(factory, "Factory must not be null"); String id = endpoint.getId(); Assert.hasText(id, "Endpoint id must not be empty"); synchronized (this.listenerContainers) { Assert.state(!this.listenerContainers.containsKey(id), "Another endpoint is already registered with id '" + id + "'"); MessageListenerContainer container = createListenerContainer(endpoint, factory); this.listenerContainers.put(id, container); if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) { List<MessageListenerContainer> containerGroup; if (this.applicationContext.containsBean(endpoint.getGroup())) { containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class); } else { containerGroup = new ArrayList<MessageListenerContainer>(); this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup); } containerGroup.add(container); } if (startImmediately) { startIfNecessary(container); } } // 其實就是調用了RabbitListenerContainerFactory的createListenerContainer生成了一個MessageListenerContainer對象 protected MessageListenerContainer createListenerContainer(RabbitListenerEndpoint endpoint, RabbitListenerContainerFactory<?> factory) { MessageListenerContainer listenerContainer = factory.createListenerContainer(endpoint); if (listenerContainer instanceof InitializingBean) { try { ((InitializingBean) listenerContainer).afterPropertiesSet(); } catch (Exception ex) { throw new BeanInitializationException("Failed to initialize message listener container", ex); } } int containerPhase = listenerContainer.getPhase(); if (containerPhase < Integer.MAX_VALUE) { // a custom phase value if (this.phase < Integer.MAX_VALUE && this.phase != containerPhase) { throw new IllegalStateException("Encountered phase mismatch between container factory definitions: " + this.phase + " vs " + containerPhase); } this.phase = listenerContainer.getPhase(); } return listenerContainer; } }
createListenerContainer方法調用了RabbitListenerContainerFactory接口的createListenerContainer方法創建一個MessageListenerContainer對象。
在這里,如果是通過RabbitAutoConfiguration自動配置的,那么RabbitListenerContainerFactory接口的具體實現類是SimpleRabbitListenerContainerFactory,MessageListenerContainer接口的具體實現類是SimpleMessageListenerContainer。有興趣的話,可以去看下rabbitMq自動配置的幾個類。
RabbitListenerContainerFactory接口的createListenerContainer方法是由AbstractRabbitListenerContainerFactory抽象類實現,代碼如下:
@Override public C createListenerContainer(RabbitListenerEndpoint endpoint) { C instance = createContainerInstance(); if (this.connectionFactory != null) { instance.setConnectionFactory(this.connectionFactory); } if (this.errorHandler != null) { instance.setErrorHandler(this.errorHandler); } if (this.messageConverter != null) { instance.setMessageConverter(this.messageConverter); } if (this.acknowledgeMode != null) { instance.setAcknowledgeMode(this.acknowledgeMode); } if (this.channelTransacted != null) { instance.setChannelTransacted(this.channelTransacted); } if (this.autoStartup != null) { instance.setAutoStartup(this.autoStartup); } if (this.phase != null) { instance.setPhase(this.phase); } instance.setListenerId(endpoint.getId()); // 最重要的一行!!! endpoint.setupListenerContainer(instance); initializeContainer(instance); return instance; }
乍一看,都是對MessageListenerContainer實例的初始化,實際上有一行,相當重要“ endpoint.setupListenerContainer(instance); ”,這一行最終是走到
AbstractRabbitListenerEndpoint.setupListenerContainer
public abstract class AbstractRabbitListenerEndpoint implements RabbitListenerEndpoint, BeanFactoryAware { ...... // 設置MessageListenerContainer,最重要的就是設置監聽的隊列名稱!!! @Override public void setupListenerContainer(MessageListenerContainer listenerContainer) { SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) listenerContainer; boolean queuesEmpty = getQueues().isEmpty(); boolean queueNamesEmpty = getQueueNames().isEmpty(); if (!queuesEmpty && !queueNamesEmpty) { throw new IllegalStateException("Queues or queue names must be provided but not both for " + this); } if (queuesEmpty) { Collection<String> names = getQueueNames(); container.setQueueNames(names.toArray(new String[names.size()])); } else { Collection<Queue> instances = getQueues(); container.setQueues(instances.toArray(new Queue[instances.size()])); } container.setExclusive(isExclusive()); if (getPriority() != null) { Map<String, Object> args = new HashMap<String, Object>(); args.put("x-priority", getPriority()); container.setConsumerArguments(args); } if (getAdmin() != null) { container.setRabbitAdmin(getAdmin()); } setupMessageListener(listenerContainer); } // 創建MessageListener protected abstract MessageListener createMessageListener(MessageListenerContainer container); // 創建MessageListener,設置到MessageListenerContainer 里 private void setupMessageListener(MessageListenerContainer container) { MessageListener messageListener = createMessageListener(container); Assert.state(messageListener != null, "Endpoint [" + this + "] must provide a non null message listener"); container.setupMessageListener(messageListener); } ...... }
用@RabbitLinstener注解的方法,使用的endpoint是MethodRabbitListenerEndpoint繼承自AbstractRabbitListenerEndpoint,所以看看AbstractRabbitListenerEndpoint的createMessageListener方法
public class MethodRabbitListenerEndpoint extends AbstractRabbitListenerEndpoint { ...... @Override protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) { Assert.state(this.messageHandlerMethodFactory != null, "Could not create message listener - MessageHandlerMethodFactory not set"); MessagingMessageListenerAdapter messageListener = createMessageListenerInstance(); messageListener.setHandlerMethod(configureListenerAdapter(messageListener)); String replyToAddress = getDefaultReplyToAddress(); if (replyToAddress != null) { messageListener.setResponseAddress(replyToAddress); } MessageConverter messageConverter = container.getMessageConverter(); if (messageConverter != null) { messageListener.setMessageConverter(messageConverter); } if (getBeanResolver() != null) { messageListener.setBeanResolver(getBeanResolver()); } return messageListener; } protected MessagingMessageListenerAdapter createMessageListenerInstance() { return new MessagingMessageListenerAdapter(this.bean, this.method); } ...... }
從上面代碼可以看出,createMessageListener方法返回了一個MessagingMessageListenerAdapter實例,MessagingMessageListenerAdapter實現了MessageListener接口
到這里,我們就能得出一些結論:
1、有@RabbitListener注解的方法,會生成MethodRabbitListenerEndpoint對象
2、通過MethodRabbitListenerEndpoint對象和SimpleRabbitListenerContainerFactory工廠bean,生成SimpleMessageListenerContainer對象
3、SimpleMessageListenerContainer對象保存了要監聽的隊列名,創建了用于處理消息的MessagingMessageListenerAdapter實例
4、MessagingMessageListenerAdapter持有@RabbitListener注解的對象和方法,起到一個適配器的作用
SimpleMessageListenerContainer是相當重要的一個類,,包裝了整個mq消息消費需要的信息:
1、保存了要監聽的隊列名,啟動的時候,根據隊列名創建從服務器拉取消息的consumer——BlockingQueueConsumer
2、創建了一個MessagingMessageListenerAdapter對象,當consumer從服務器拿到消息后,由MessagingMessageListenerAdapter進行處理
3、誰來做數據轉換?
是MessagingMessageListenerAdapter,有興趣的,可以看看MessagingMessageListenerAdapter轉換參數的源碼。
感謝各位的閱讀,以上就是“@RabbitListener起作用的原理是什么”的內容了,經過本文的學習后,相信大家對@RabbitListener起作用的原理是什么這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。