您好,登錄后才能下訂單哦!
本篇內容介紹了“RocketMQTransactionAnnotationProcessor的原理和用法”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
本文主要研究一下RocketMQTransactionAnnotationProcessor
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/RocketMQTransactionAnnotationProcessor.java
public class RocketMQTransactionAnnotationProcessor implements BeanPostProcessor, Ordered, ApplicationContextAware { private final static Logger log = LoggerFactory.getLogger(RocketMQTransactionAnnotationProcessor.class); private ApplicationContext applicationContext; private final Set<Class<?>> nonProcessedClasses = Collections.newSetFromMap(new ConcurrentHashMap<Class<?>, Boolean>(64)); private TransactionHandlerRegistry transactionHandlerRegistry; public RocketMQTransactionAnnotationProcessor(TransactionHandlerRegistry transactionHandlerRegistry) { this.transactionHandlerRegistry = transactionHandlerRegistry; } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (!this.nonProcessedClasses.contains(bean.getClass())) { Class<?> targetClass = AopUtils.getTargetClass(bean); RocketMQTransactionListener listener = AnnotationUtils.findAnnotation(targetClass, RocketMQTransactionListener.class); this.nonProcessedClasses.add(bean.getClass()); if (listener == null) { // for quick search log.trace("No @RocketMQTransactionListener annotations found on bean type: {}", bean.getClass()); } else { try { processTransactionListenerAnnotation(listener, bean); } catch (MQClientException e) { log.error("Failed to process annotation " + listener, e); throw new BeanCreationException("Failed to process annotation " + listener, e); } } } return bean; } private void processTransactionListenerAnnotation(RocketMQTransactionListener listener, Object bean) throws MQClientException { if (transactionHandlerRegistry == null) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must work with RocketMQTemplate", null); } if (!RocketMQLocalTransactionListener.class.isAssignableFrom(bean.getClass())) { throw new MQClientException("Bad usage of @RocketMQTransactionListener, " + "the class must implement interface RocketMQLocalTransactionListener", null); } TransactionHandler transactionHandler = new TransactionHandler(); transactionHandler.setBeanFactory(this.applicationContext.getAutowireCapableBeanFactory()); transactionHandler.setName(listener.txProducerGroup()); transactionHandler.setBeanName(bean.getClass().getName()); transactionHandler.setListener((RocketMQLocalTransactionListener) bean); transactionHandler.setCheckExecutor(listener.corePoolSize(), listener.maximumPoolSize(), listener.keepAliveTime(), listener.blockingQueueSize()); RPCHook rpcHook = RocketMQUtil.getRPCHookByAkSk(applicationContext.getEnvironment(), listener.accessKey(), listener.secretKey()); if (Objects.nonNull(rpcHook)) { transactionHandler.setRpcHook(rpcHook); } else { log.debug("Access-key or secret-key not configure in " + listener + "."); } transactionHandlerRegistry.registerTransactionHandler(transactionHandler); } @Override public int getOrder() { return LOWEST_PRECEDENCE; } }
RocketMQTransactionAnnotationProcessor實現了BeanPostProcessor, Ordered, ApplicationContextAware接口
postProcessAfterInitialization方法會查找標記有RocketMQTransactionListener注解的bean,然后執行processTransactionListenerAnnotation方法
processTransactionListenerAnnotation方法會創建transactionHandler,然后執行transactionHandlerRegistry.registerTransactionHandler進行注冊
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandler.java
class TransactionHandler { private String name; private String beanName; private RocketMQLocalTransactionListener bean; private BeanFactory beanFactory; private ThreadPoolExecutor checkExecutor; private RPCHook rpcHook; public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public String getName() { return name; } public void setName(String name) { this.name = name; } public RPCHook getRpcHook() { return rpcHook; } public void setRpcHook(RPCHook rpcHook) { this.rpcHook = rpcHook; } public BeanFactory getBeanFactory() { return beanFactory; } public void setBeanFactory(BeanFactory beanFactory) { this.beanFactory = beanFactory; } public void setListener(RocketMQLocalTransactionListener listener) { this.bean = listener; } public RocketMQLocalTransactionListener getListener() { return this.bean; } public void setCheckExecutor(int corePoolSize, int maxPoolSize, long keepAliveTime, int blockingQueueSize) { this.checkExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<>(blockingQueueSize)); } public ThreadPoolExecutor getCheckExecutor() { return checkExecutor; } }
TransactionHandler包含了name、beanName、bean、beanFactory、checkExecutor、rpcHook屬性
rocketmq-spring-boot-2.0.3-sources.jar!/org/apache/rocketmq/spring/config/TransactionHandlerRegistry.java
public class TransactionHandlerRegistry implements DisposableBean { private RocketMQTemplate rocketMQTemplate; private final Set<String> listenerContainers = new ConcurrentSet<>(); public TransactionHandlerRegistry(RocketMQTemplate template) { this.rocketMQTemplate = template; } @Override public void destroy() throws Exception { listenerContainers.clear(); } public void registerTransactionHandler(TransactionHandler handler) throws MQClientException { if (listenerContainers.contains(handler.getName())) { throw new MQClientException(-1, String .format("The transaction name [%s] has been defined in TransactionListener [%s]", handler.getName(), handler.getBeanName())); } listenerContainers.add(handler.getName()); rocketMQTemplate.createAndStartTransactionMQProducer(handler.getName(), handler.getListener(), handler.getCheckExecutor(), handler.getRpcHook()); } }
TransactionHandlerRegistry實現了DisposableBean接口,其clear方法直接清空listenerContainers;registerTransactionHandler方法會往listenerContainers添加該handler的name,然后執行rocketMQTemplate.createAndStartTransactionMQProducer來創建并啟動TransactionMQProducer
RocketMQTransactionAnnotationProcessor實現了BeanPostProcessor, Ordered, ApplicationContextAware接口
postProcessAfterInitialization方法會查找標記有RocketMQTransactionListener注解的bean,然后執行processTransactionListenerAnnotation方法
processTransactionListenerAnnotation方法會創建transactionHandler,然后執行transactionHandlerRegistry.registerTransactionHandler進行注冊
“RocketMQTransactionAnnotationProcessor的原理和用法”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。