您好,登錄后才能下訂單哦!
這篇文章主要講解了“NacosNamingService中subscribe及unsubscribe的原理和使用方法”,文中的講解內容簡單清晰,易于學習與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學習“NacosNamingService中subscribe及unsubscribe的原理和使用方法”吧!
本文主要研究一下NacosNamingService的subscribe及unsubscribe
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/NacosNamingService.java
public class NacosNamingService implements NamingService { private static final String DEFAULT_PORT = "8080"; private static final long DEFAULT_HEART_BEAT_INTERVAL = TimeUnit.SECONDS.toMillis(5); /** * Each Naming instance should have different namespace. */ private String namespace; private String endpoint; private String serverList; private String cacheDir; private String logName; private HostReactor hostReactor; private BeatReactor beatReactor; private EventDispatcher eventDispatcher; private NamingProxy serverProxy; //...... @Override public void subscribe(String serviceName, EventListener listener) throws NacosException { subscribe(serviceName, new ArrayList<String>(), listener); } @Override public void subscribe(String serviceName, String groupName, EventListener listener) throws NacosException { subscribe(serviceName, groupName, new ArrayList<String>(), listener); } @Override public void subscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException { subscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener); } @Override public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener); } @Override public void unsubscribe(String serviceName, EventListener listener) throws NacosException { unsubscribe(serviceName, new ArrayList<String>(), listener); } @Override public void unsubscribe(String serviceName, String groupName, EventListener listener) throws NacosException { unsubscribe(serviceName, groupName, new ArrayList<String>(), listener); } @Override public void unsubscribe(String serviceName, List<String> clusters, EventListener listener) throws NacosException { unsubscribe(serviceName, Constants.DEFAULT_GROUP, clusters, listener); } @Override public void unsubscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException { eventDispatcher.removeListener(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","), listener); } //...... }
subscribe方法執行eventDispatcher.addListener;unsubscribe方法執行eventDispatcher.removeListener
nacos-1.1.3/client/src/main/java/com/alibaba/nacos/client/naming/core/EventDispatcher.java
public class EventDispatcher { private ExecutorService executor = null; private BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>(); private ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>(); public EventDispatcher() { executor = Executors.newSingleThreadExecutor(new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener"); thread.setDaemon(true); return thread; } }); executor.execute(new Notifier()); } public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map"); List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>()); observers.add(listener); observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers); if (observers != null) { observers.add(listener); } serviceChanged(serviceInfo); } public void removeListener(String serviceName, String clusters, EventListener listener) { NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map"); List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters)); if (observers != null) { Iterator<EventListener> iter = observers.iterator(); while (iter.hasNext()) { EventListener oldListener = iter.next(); if (oldListener.equals(listener)) { iter.remove(); } } if (observers.isEmpty()) { observerMap.remove(ServiceInfo.getKey(serviceName, clusters)); } } } public List<ServiceInfo> getSubscribeServices() { List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>(); for (String key : observerMap.keySet()) { serviceInfos.add(ServiceInfo.fromKey(key)); } return serviceInfos; } public void serviceChanged(ServiceInfo serviceInfo) { if (serviceInfo == null) { return; } changedServices.add(serviceInfo); } private class Notifier implements Runnable { @Override public void run() { while (true) { ServiceInfo serviceInfo = null; try { serviceInfo = changedServices.poll(5, TimeUnit.MINUTES); } catch (Exception ignore) { } if (serviceInfo == null) { continue; } try { List<EventListener> listeners = observerMap.get(serviceInfo.getKey()); if (!CollectionUtils.isEmpty(listeners)) { for (EventListener listener : listeners) { List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts()); listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts)); } } } catch (Exception e) { NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e); } } } } public void setExecutor(ExecutorService executor) { ExecutorService oldExecutor = this.executor; this.executor = executor; oldExecutor.shutdown(); } }
EventDispatcher的構造器創建了executor,并執行Notifier;Notifier使用一個while true循環不斷執行changedServices.poll(5, TimeUnit.MINUTES)拉取serviceInfo,拉取到的話會從observerMap取出對應的EventListener列表,然后挨個回調listener.onEvent方法
addListener方法則是往observerMap創建或添加observers,然后執行serviceChanged方法;removeListener則是從observerMap移除指定的listener,如果指定key的listener列表為空則刪除該key
serviceChanged方法會往changedServices添加serviceInfo;之后Notifier異步線程可用拉取信息執行listener.onEvent回調
NacosNamingService的subscribe方法執行eventDispatcher.addListener;unsubscribe方法執行eventDispatcher.removeListener
感謝各位的閱讀,以上就是“NacosNamingService中subscribe及unsubscribe的原理和使用方法”的內容了,經過本文的學習后,相信大家對NacosNamingService中subscribe及unsubscribe的原理和使用方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關知識點的文章,歡迎關注!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。