您好,登錄后才能下訂單哦!
本篇內容主要講解“dubbo怎么實現consumer從多個group中調用指定group的provider”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“dubbo怎么實現consumer從多個group中調用指定group的provider”吧!
在工作中,遇到這樣的場景:
有個es索引構建服務,需要從各個業務服務獲取索引的信息,從而構建索引,業務服務都實現同一個接口IndexInfoProvider,通過設置不同的group來達到區分的效果(group就是es索引名)。
索引構建服務在內存維護了一個Map<String, IndexInfoProvider> providerMap,key是索引名——也就是provider的group,value是IndexInfoProvider服務的consumer。
為了圖方便,索引構建服務還一次性初始化了所有consumer,假設總共有200個分組,那么Map就會緩存200個consumer,初始化的時候巨慢。如果不一次性初始化,而是按需的話,代碼會變得復雜一些,可能需要像雙重檢驗這樣的措施。而且因為是緩存,必然也會遇到緩存一致性的問題,例如新增一個索引。
基于這樣的問題,就想著,dubbo的一個consumer,能不能按需調用指定分組的provider呢~
為什么有200個分組,就要緩存200個consumer,而不是像我們平時那樣,1個就可以呢?
業務服務在實現IndexInfoProvider接口的時候,都指定是分組,而且分組名,就是對應的索引名。
相應的,消費者就必須指定消費的分組,200個索引,自然就需要200個consumer。
在我們日常的使用中,consumer基本都是只能調用一個group的provider。但實際上,dubbo是支持調用多個group的provider的。
這樣寫是指定a分組和b分組,多個分組之間用英文逗號分隔
這樣寫,是所有的分組
如果將es索引構建服務的IndexInfoProvider接口的consumer分組設置為group="*",然后在調用接口的時候,根據需要,從一堆provider中篩選出指定group的provider,只調用這些provider,是不是就可以代替Map緩存了?
初步方案:
利用ThreadLocal來指定要調用的分組,在調用方法前設置group到ThreadLocal中。
實現dubbo的負載均衡拓展,只選取指定分組的節點來調用。
調用結束,清理ThreadLocal中的分組信息。
似乎是可行的?
通過源碼和代碼debug發現,consumer持有的Invoker,有點像一個責任鏈。
如果是單分組——!group.contains(",") && !group.equals("*"),那么會是這樣:
MockClusterInvoker->FailoverClusterInvoker
如果是多分組,則會變成:
MockClusterInvoker->MergeableClusterInvoker->FailoverClusterInvoker
負載均衡的機制,是得到了FailoverClusterInvoker才會生效
MockClusterInvoker只是一種降級機制,不是導致問題的原因
問題是出在MergeableClusterInvoker,下面是導致問題的代碼!!!
代碼大概的意思是:消費者沒有指定合并策略,那么就會調用第一個有效的服務提供者,如果都是無效,就直接調用第一個。
我是沒有指定合并策略的,所以會變成調用第一個有效的服務提供者。根本走不到負載均衡那里。
不指定合并策略不行,那指定呢?指定合并策略會怎樣呢?
指定了合并策略,會調用所有invoker,然后用Merger合并結果。
這種很適合這樣種場景:
一部分數據在服務A,一部分數據在服務B,需要分別調用A和B,然后把兩者的結果集合并
設想著這樣的方案:
指定了合并策略,那么所有分組的invoker都會被調用到,那么請求就到FailoverClusterInvoker了,負載均衡spi就生效了
修改一下負載均衡spi,如果目前調用的invoker不是指定分組的,那么就直接返回null
實現自己的合并spi,返回第一個不會null的結果
雖然不是什么優雅的方式,但是,似乎是能做到的???
實際上,行不通,達不到想要的效果。原因是MergeableClusterInvoker內部是用線程池并發調用的,ThreadLocal里的分組信息會丟失。
MergeableClusterInvoker類是沒有留拓展的余地啦,還有其他機會么?MergeableClusterInvoker的Invokers列表從哪里來的?
Invokers是從directory來的,這里有沒有拓展的余地呢?
有的,在AbstractDirectory的list方法
看到這,發現用路由spi,還是有機會的,如果實現動態路由,每次只給MergeableClusterInvoker返回指定分組的Invoker,是不是就可以呢?怎么讓routers包含我們自定義的路由spi呢?
需要url上攜帶了router參數,但是這里的url的參數是由誰決定的?@Reference 注解是沒有沒有的指定router參數的…
最后debuig,兜兜轉轉,發現只能靠重寫ReferenceConfig類的loadRegistries方法,往url上加上動態路由的參數。
public class AssignGroupRouterFactory implements RouterFactory { public static final String NAME = "assignGroup"; @Override public Router getRouter(URL url) { return new AssignGroupRouter(url); } }
public class AssignGroupRouter implements Router { private final URL url; public AssignGroupRouter(URL url) { this.url = url; } @Override public URL getUrl() { return url; } @Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { String assignGroup = IndexInfoProviderInvoker.getAssignGroup(); if (Objects.isNull(assignGroup)) { return invokers; } return invokers.stream() .filter(invoker -> { URL invokerUrl = invoker.getUrl(); return Objects.equals(assignGroup, invokerUrl.getParameter(Constants.GROUP_KEY)); }).collect(Collectors.toList()); } @Override public int compareTo(Router o) { return 1; } }
public class AssignGroupReferenceConfig<T> extends ReferenceConfig<T> { @Override protected List<URL> loadRegistries(boolean provider) { List<URL> urls = super.loadRegistries(provider); if (CollectionUtils.isEmpty(urls)) { return urls; } // 指定動態路由,路由方式為assignGroup return urls.stream() .map(url -> url.addParameter(Constants.ROUTER_KEY, "assignGroup").addParameter(Constants.RUNTIME_KEY, "true")) .collect(Collectors.toList()); } }
@Component public class IndexInfoProviderInvokerProxy { public static final ThreadLocal<String> INDEX_TYPE_THREAD_LOCAL = new ThreadLocal<>(); private final IndexInfoProvider indexInfoProvider; public IndexInfoProviderInvokerProxy(IndexInfoProvider indexInfoProvider) { this.indexInfoProvider = indexInfoProvider; } /** * 獲取文檔id 在[start, end) 之間的所有索引文檔 * * @param indexType 索引名 * @param start 起始id * @param end 結束id * @return 文檔列表 */ public List<IndexDocument> getDocsByIdRange(String indexType, long start, long end) { return invokeWitchIndexType(indexType, () -> indexInfoProvider.getDocsByIdRange(start, end)); } private <T> T invokeWitchIndexType(String indexType, Supplier<T> supplier) { INDEX_TYPE_THREAD_LOCAL.set(indexType); T result = supplier.get(); INDEX_TYPE_THREAD_LOCAL.remove(); return result; } }
到此,相信大家對“dubbo怎么實現consumer從多個group中調用指定group的provider”有了更深的了解,不妨來實際操作一番吧!這里是億速云網站,更多相關內容可以進入相關頻道進行查詢,關注我們,繼續學習!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。