您好,登錄后才能下訂單哦!
本篇內容主要講解“Dubbo服務導出到本地的方法”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“Dubbo服務導出到本地的方法”吧!
context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-provider.xml"}); // 刪除了一些步驟 public void refresh() throws BeansException, IllegalStateException { synchronized (this.startupShutdownMonitor) { try { // 1. 里面的核心代碼就是初始化了applicationEventMulticaster,用于后面發布事件使用 // this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory); initApplicationEventMulticaster(); // 2. 初始化非延遲加載的bean,這里就會初始化dubbo配置的一些bean,包括ServiceBean,用于服務導出 finishBeanFactoryInitialization(beanFactory); // 3. 發布容器刷新事件,這里面是服務導出的入口 finishRefresh(); } } }
// 步驟2分析 // 這里Spring容器會初始化非延遲加載的bean,包括<dubbo:service/>表示的bean // <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/> finishBeanFactoryInitialization(beanFactory);
// Spring容器初始化<dubbo:service/>表示的ServiceBean時會創建ServiceBean對象,由于ServiceBean實現了 // ApplicationContextAware接口,所以Spring容器會先調用setApplicationContext給其注入Spring容器 class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware { @Override public void setApplicationContext(ApplicationContext applicationContext) { this.applicationContext = applicationContext; // 給SpringExtensionFactory注入Spring容器 SpringExtensionFactory.addApplicationContext(applicationContext); if (applicationContext != null) { SPRING_CONTEXT = applicationContext; try { // method是addListener方法,調用該方法用于給applicationEventMulticaster // 添加listener method.invoke(applicationContext, new Object[]{this}); supportedApplicationListener = true; } } } }
// 步驟3分析,發布相關事件,這里會發布容器刷新事件 finishRefresh(); protected void finishRefresh() { initLifecycleProcessor(); getLifecycleProcessor().onRefresh(); // 1). 發布容器刷新事件,ServiceBean監聽的就是該事件 // ServiceBean implements ApplicationListener<ContextRefreshedEvent> publishEvent(new ContextRefreshedEvent(this)); LiveBeansView.registerApplicationContext(this); } // 2). 步驟1會走到這里,這里會獲取之前的applicationEventMulticaster,用于發布事件 getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType); // 3). 走到這里,listener就是之前調用 method.invoke(applicationContext, new Object[]{this}); // 加進去的ServiceBean,this表示ServiceBean,也是listener,event就是容器刷新事件 doInvokeListener(listener, event); // 4) 走到這里,最終調用ServiceBean實現的onApplicationEvent方法 listener.onApplicationEvent(event);
這樣,就走到了Dubbo暴露服務的入口
的方法。這也是Dubbo官方文檔中提及的入口方法,參考 服務導出
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服務沒有被暴露并且服務沒有被取消暴露,則打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導出服務 export(); } }
接下來研究一下 Dubbo 導出服務的過程。Dubbo 服務導出過程始于Spring容器發布刷新事件,Dubbo在接收到事件后,會立即執行服務導出邏輯。整個邏輯大致可分為三個部分,第一部分是前置工作,主要用于檢查參數,組裝 URL。第二部分是導出服務,包含導出服務到本地 (JVM),和導出服務到遠程兩個過程。第三部分是向注冊中心注冊服務,用于服務發現。下面將會對這三個部分代碼進行詳細的分析。
服務導出的入口方法是ServiceBean的onApplicationEvent
。onApplicationEvent 是一個事件響應方法,該方法會在收到Spring上下文刷新事件后執行服務導出操作。方法代碼如下
public void onApplicationEvent(ContextRefreshedEvent event) { // 如果服務沒有被暴露并且服務沒有被取消暴露,則打印日志 if (isDelay() && !isExported() && !isUnexported()) { if (logger.isInfoEnabled()) { logger.info("The service ready on spring started. service: " + getInterface()); } // 導出服務 export(); } }
這個方法首先會根據條件決定是否導出服務,比如有些服務設置了延時導出,那么此時就不應該在此處導出。還有一些服務已經被導出了,或者當前服務被取消導出了,此時也不能再次導出相關服務。注意這里的 isDelay 方法,這個方法字面意思是“是否延遲導出服務”,返回 true 表示延遲導出,false 表示不延遲導出。但是該方法真實意思卻并非如此,當方法返回 true 時,表示無需延遲導出。返回 false 時,表示需要延遲導出。與字面意思恰恰相反,這個需要大家注意一下。 前置工作主要包含兩個部分,分別是配置檢查,以及 URL 裝配。在導出服務之前,Dubbo 需要檢查用戶的配置是否合理,或者為用戶補充缺省配置。配置檢查完成后,接下來需要根據這些配置組裝 URL。在 Dubbo 中,URL 的作用十分重要。Dubbo 使用 URL 作為配置載體,所有的拓展點都是通過 URL 獲取配置。這一點,官方文檔中有所說明。下面的export方法會走到doExport()
方法。
public synchronized void export() { if (provider != null) { if (export == null) { export = provider.getExport(); } if (delay == null) { delay = provider.getDelay(); } } if (export != null && !export) { return; } if (delay != null && delay > 0) { delayExportExecutor.schedule(new Runnable() { @Override public void run() { doExport(); } }, delay, TimeUnit.MILLISECONDS); } else { doExport(); } }
以下是配置檢查的相關分析,代碼比較多,需要大家耐心看一下。下面對配置檢查的邏輯進行簡單的總結,如下:
檢測 dubbo:service 標簽的 interface 屬性合法性,不合法則拋出異常
檢測 ProviderConfig、ApplicationConfig 等核心配置類對象是否為空,若為空,則嘗試從其他配置類對象中獲取相應的實例。
檢測并處理泛化服務和普通服務類
檢測本地存根配置,并進行相應的處理
對 ApplicationConfig、RegistryConfig 等配置類進行檢測,為空則嘗試創建,若無法創建則拋出異常配置檢查并非本文重點,因此這里不打算對 doExport 方法所調用的方法進行分析(doExportUrls 方法除外)。在這些方法中,除了appendProperties方法稍微復雜一些,其他方法邏輯不是很復雜。因此,大家可自行分析。
protected synchronized void doExport() { if (unexported) { throw new IllegalStateException("Already unexported!"); } if (exported) { return; } exported = true; if (interfaceName == null || interfaceName.length() == 0) { // 拋異常 } checkDefault(); if (provider != null) { if (application == null) { application = provider.getApplication(); } if (module == null) { module = provider.getModule(); } if (registries == null) { registries = provider.getRegistries(); } if (monitor == null) { monitor = provider.getMonitor(); } if (protocols == null) { protocols = provider.getProtocols(); } } if (module != null) { if (registries == null) { registries = module.getRegistries(); } if (monitor == null) { monitor = module.getMonitor(); } } if (application != null) { if (registries == null) { registries = application.getRegistries(); } if (monitor == null) { monitor = application.getMonitor(); } } // 檢測ref是否為泛化服務類型 if (ref instanceof GenericService) { // 設置interfaceClass為GenericService interfaceClass = GenericService.class; if (StringUtils.isEmpty(generic)) { // 設置generic = true generic = Boolean.TRUE.toString(); } } else { try { // 獲得接口類型 interfaceClass = Class.forName(interfaceName, true, Thread.currentThread().getContextClassLoader()); } catch (ClassNotFoundException e) { throw new IllegalStateException(e.getMessage(), e); } // 對interfaceClass,以及<dubbo:method>標簽中的必要字段進行檢查 checkInterfaceAndMethods(interfaceClass, methods); // 對ref合法性進行檢測 checkRef(); generic = Boolean.FALSE.toString(); } // stub local一樣都是配置本地存根 if (local != null) { if ("true".equals(local)) { local = interfaceName + "Local"; } Class<?> localClass; try { localClass = ClassHelper.forNameWithThreadContextClassLoader(local); } } if (stub != null) { if ("true".equals(stub)) { stub = interfaceName + "Stub"; } Class<?> stubClass; try { stubClass = ClassHelper.forNameWithThreadContextClassLoader(stub); } } checkApplication(); checkRegistry(); checkProtocol(); appendProperties(this); // 本地存根、mock合法性校驗 checkStubAndMock(interfaceClass); if (path == null || path.length() == 0) { path = interfaceName; } // 核心代碼,暴露服務、注冊邏輯就在其中 doExportUrls(); ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref); ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel); }
Dubbo 允許我們使用不同的協議導出服務,也允許我們向多個注冊中心注冊服務。Dubbo在doExportUrls方法中對多協議,多注冊中心進行了支持。相關代碼如下
/** * 多協議多注冊中心暴露服務進行支持 */ private void doExportUrls() { // 加載注冊中心鏈接 List<URL> registryURLs = loadRegistries(true); // 遍歷protocols,并在每個協議下暴露 for (ProtocolConfig protocolConfig : protocols) { doExportUrlsFor1Protocol(protocolConfig, registryURLs); } }
上面代碼首先是通過loadRegistries加載注冊中心鏈接,然后再遍歷ProtocolConfig集合導出每個服務。并在導出服務的過程中,將服務注冊到注冊中心。我們先來看一下loadRegistries方法的邏輯。先可以打開看下該方法可以得到什么。
<!-- provider's application name, used for tracing dependency relationship --> <dubbo:application name="demo-provider"/> <!-- use multicast registry center to export service --> <dubbo:registry address="zookeeper://10.101.99.127:2181"/> <!-- use dubbo protocol to export service on port 20880 --> <dubbo:protocol name="dubbo" port="20880"/> <dubbo:provider server="netty4"/> <!-- service implementation, as same as regular local bean --> <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/> <!-- declare the service interface to be exported --> <dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
protected List<URL> loadRegistries(boolean provider) { checkRegistry(); List<URL> registryList = new ArrayList<URL>(); // 如果registries為空,直接返回空集合 if (registries != null && !registries.isEmpty()) { // 遍歷注冊中心配置集合registries for (RegistryConfig config : registries) { // 獲得地址 String address = config.getAddress(); // 若地址為空,則設置為0.0.0.0 if (address == null || address.length() == 0) { address = Constants.ANYHOST_VALUE; } String sysaddress = System.getProperty("dubbo.registry.address"); if (sysaddress != null && sysaddress.length() > 0) { address = sysaddress; } // 如果地址為N/A,則跳過 if (address.length() > 0 && !RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); // 添加ApplicationConfig中的字段信息到map中 appendParameters(map, application); // 添加RegistryConfig字段信息到map中 appendParameters(map, config); // 添加path、協議版本 map.put("path", RegistryService.class.getName()); map.put("dubbo", Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 如果map中沒有protocol,則默認為使用dubbo協議 if (!map.containsKey("protocol")) { if (ExtensionLoader.getExtensionLoader(RegistryFactory.class).hasExtension("remote")) { map.put("protocol", "remote"); } else { map.put("protocol", "dubbo"); } } // 解析得到URL列表,address可能包含多個注冊中心ip,因此解析得到的是一個URL列表 List<URL> urls = UrlUtils.parseURLs(address, map); // 遍歷URL 列表 for (URL url : urls) { // 將URL協議頭設置為registry url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol()); // 這里將協議設置為了registry,這也是后面調用的是RegistryProtocol的export()方法原因 url = url.setProtocol(Constants.REGISTRY_PROTOCOL); // 通過判斷條件,決定是否添加url到registryList中,條件如下: // 如果是服務提供者,并且是注冊中心服務或者是消費者端,并且是訂閱服務,則加入到registryList if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
ProtocolConfig主要封裝了<dubbo:protocol name="dubbo" port="20880"/>標簽的信息,意思是使用Dubbo協議暴露服務。
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { // 獲取協議名 String name = protocolConfig.getName(); // 如果為空,則是默認的dubbo if (name == null || name.length() == 0) { name = "dubbo"; } Map<String, String> map = new HashMap<String, String>(); // 設置服務提供者側 map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion()); map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis())); if (ConfigUtils.getPid() > 0) { map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid())); } // 這段代碼其實完成了子節點配置信息對父節點的覆蓋 appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); // 如果method的配置列表不為空 if (methods != null && !methods.isEmpty()) { // 遍歷method配置列表 for (MethodConfig method : methods) { // 把方法名加入map appendParameters(map, method, method.getName()); // 添加 MethodConfig對象的字段信息到map中,鍵=方法名.屬性名 // 比如存儲<dubbo:method name="sayHello" retries="2">對應的MethodConfig, // 鍵=sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"} String retryKey = method.getName() + ".retry"; if (map.containsKey(retryKey)) { String retryValue = map.remove(retryKey); // 如果retryValue為false,則不重試,設置值為0 if ("false".equals(retryValue)) { map.put(method.getName() + ".retries", "0"); } }
if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames(); if (methods.length == 0) { map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ",")); } } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } }
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) { protocolConfig.setRegister(false); map.put("notify", "false"); } // export service String contextPath = protocolConfig.getContextpath(); if ((contextPath == null || contextPath.length() == 0) && provider != null) { contextPath = provider.getContextpath(); } String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); }
String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) { // 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } // 暴露到遠程 if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) { // 后面分析 } } this.urls.add(url); }
前置工作做完,接下來就可以進行服務導出了。服務導出分為導出到本地(JVM)和導出到遠程。
// 暴露到本地 if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) { exportLocal(url); } private void exportLocal(URL url) { // 如果協議不是injvm if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) { // 生成本地的url,分別把協議改為injvm,設置host和port URL local = URL.valueOf(url.toFullString()).setProtocol(Constants.LOCAL_PROTOCOL).setHost(LOCALHOST).setPort(0); ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref)); // 通過代理工程創建invoker // 再調用export方法進行暴露服務,生成Exporter // 這里的protocol是生成的拓展代理對象,具體可看https://segmentfault.com/a/1190000020384210 // 它是在運行時才根據URL中的protocol參數去決定運行哪個Protocol實例的export方法,這里由于前面 // setProtocol(Constants.LOCAL_PROTOCOL),所以調用的是InjvmProtocol的export方法 Exporter<?> exporter = protocol.export(proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); // 把生成的暴露者加入集合 exporters.add(exporter); } }
下面兩個是url和local的具體值,因為Dubbo采用自適應拓展機制,exportLocal(URL url)中用到的protocol是自適應拓展,protocol的export方法會用到URL中protocol參數從而決定具體生成protocol的哪個實例,所以URL的protocol值可以關注下。
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
下面分析下面這句代碼。它是核心方法,分為兩步。
Exporter<?> exporter = protocol.export( proxyFactory.getInvoker(ref, (Class) interfaceClass, local)); 1) proxyFactory.getInvoker(ref, (Class) interfaceClass, local) -> 返回invoker 2) protocol.export(invoker)
// 步驟1)分析 // proxyFactory也是自適應拓展代理帶,它默認使用JavassistProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension(); // 這里調用的就是JavassistProxyFactory的getInvoker方法 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { // 創建Wrapper對象 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type); // 創建匿名Invoker類對象,并實現doInvoke方法 return new AbstractProxyInvoker<T>(proxy, type, url) { @Override protected Object doInvoke(T proxy, String methodName,Class<?>[] parameterTypes,Object[] arguments) throws Throwable { // 調用Wrapper的invokeMethod方法,invokeMethod最終會調用目標方法 return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments); } }; }
在 Dubbo 中,Invoker是一個非常重要的模型
。在服務提供端,以及服務引用端均會出現Invoker。Dubbo 官方文檔中對Invoker進行了說明,這里引用一下。Invoker是實體域,它是Dubbo的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起invoke調用
,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現。這里面getInvoker方法創建了一個匿名Invoker對象,我理解是通過invoke實行遠程調用時,會走wrapper.invokeMethod方法,而wrapper實際上是一個代理類,調用wrapper.invokeMethod最終會走proxy,也就是DemoService的sayHello方法。Wrapper創建比較復雜,可以參考 Dubbo中JavaAssist的Wrapper.getWrapper生成代理分析。
// 步驟2分析,調用的是InjvmProtocol的export方法 public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { // 該方法只是創建了一個,因為暴露到本地,所以在同一個jvm中,所以不需要其他操作 return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap); }
到此,相信大家對“Dubbo服務導出到本地的方法”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。