您好,登錄后才能下訂單哦!
本篇內容介紹了“nacos中DistroConsistencyServiceImpl的原理和作用是什么”的有關知識,在實際案例的操作過程中,不少人都會遇到這樣的困境,接下來就讓小編帶領大家學習一下如何處理這些情況吧!希望大家仔細閱讀,能夠學有所成!
本文主要研究一下nacos的DistroConsistencyServiceImpl
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ConsistencyService.java
public interface ConsistencyService { /** * Put a data related to a key to Nacos cluster * * @param key key of data, this key should be globally unique * @param value value of data * @throws NacosException * @see */ void put(String key, Record value) throws NacosException; /** * Remove a data from Nacos cluster * * @param key key of data * @throws NacosException */ void remove(String key) throws NacosException; /** * Get a data from Nacos cluster * * @param key key of data * @return data related to the key * @throws NacosException */ Datum get(String key) throws NacosException; /** * Listen for changes of a data * * @param key key of data * @param listener callback of data change * @throws NacosException */ void listen(String key, RecordListener listener) throws NacosException; /** * Cancel listening of a data * * @param key key of data * @param listener callback of data change * @throws NacosException */ void unlisten(String key, RecordListener listener) throws NacosException; /** * Tell the status of this consistency service * * @return true if available */ boolean isAvailable(); }
ConsistencyService定義了put、remove、get、listen、unlisten、isAvailable方法
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/EphemeralConsistencyService.java
public interface EphemeralConsistencyService extends ConsistencyService { }
EphemeralConsistencyService接口繼承了ConsistencyService接口
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
@org.springframework.stereotype.Service("distroConsistencyService") public class DistroConsistencyServiceImpl implements EphemeralConsistencyService { private ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); t.setName("com.alibaba.nacos.naming.distro.notifier"); return t; } }); @Autowired private DistroMapper distroMapper; @Autowired private DataStore dataStore; @Autowired private TaskDispatcher taskDispatcher; @Autowired private DataSyncer dataSyncer; @Autowired private Serializer serializer; @Autowired private ServerListManager serverListManager; @Autowired private SwitchDomain switchDomain; @Autowired private GlobalConfig globalConfig; private boolean initialized = false; public volatile Notifier notifier = new Notifier(); private Map<String, CopyOnWriteArrayList<RecordListener>> listeners = new ConcurrentHashMap<>(); private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16); @PostConstruct public void init() { GlobalExecutor.submit(new Runnable() { @Override public void run() { try { load(); } catch (Exception e) { Loggers.DISTRO.error("load data failed.", e); } } }); executor.submit(notifier); } public void load() throws Exception { if (SystemUtils.STANDALONE_MODE) { initialized = true; return; } // size = 1 means only myself in the list, we need at least one another server alive: while (serverListManager.getHealthyServers().size() <= 1) { Thread.sleep(1000L); Loggers.DISTRO.info("waiting server list init..."); } for (Server server : serverListManager.getHealthyServers()) { if (NetUtils.localServer().equals(server.getKey())) { continue; } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("sync from " + server); } // try sync data from remote server: if (syncAllDataFromRemote(server)) { initialized = true; return; } } } //...... public boolean syncAllDataFromRemote(Server server) { try { byte[] data = NamingProxy.getAllData(server.getKey()); processData(data); return true; } catch (Exception e) { Loggers.DISTRO.error("sync full data from " + server + " failed!", e); return false; } } public void processData(byte[] data) throws Exception { if (data.length > 0) { Map<String, Datum<Instances>> datumMap = serializer.deserializeMap(data, Instances.class); for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { dataStore.put(entry.getKey(), entry.getValue()); if (!listeners.containsKey(entry.getKey())) { // pretty sure the service not exist: if (switchDomain.isDefaultInstanceEphemeral()) { // create empty service Loggers.DISTRO.info("creating service {}", entry.getKey()); Service service = new Service(); String serviceName = KeyBuilder.getServiceName(entry.getKey()); String namespaceId = KeyBuilder.getNamespace(entry.getKey()); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(Constants.DEFAULT_GROUP); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); listeners.get(KeyBuilder.SERVICE_META_KEY_PREFIX).get(0) .onChange(KeyBuilder.buildServiceMetaKey(namespaceId, serviceName), service); } } } for (Map.Entry<String, Datum<Instances>> entry : datumMap.entrySet()) { if (!listeners.containsKey(entry.getKey())) { // Should not happen: Loggers.DISTRO.warn("listener of {} not found.", entry.getKey()); continue; } try { for (RecordListener listener : listeners.get(entry.getKey())) { listener.onChange(entry.getKey(), entry.getValue().value); } } catch (Exception e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while execute listener of key: {}", entry.getKey(), e); continue; } // Update data store if listener executed successfully: dataStore.put(entry.getKey(), entry.getValue()); } } } //...... @Override public void put(String key, Record value) throws NacosException { onPut(key, value); taskDispatcher.addTask(key); } @Override public void remove(String key) throws NacosException { onRemove(key); listeners.remove(key); } @Override public Datum get(String key) throws NacosException { return dataStore.get(key); } //...... @Override public void listen(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { listeners.put(key, new CopyOnWriteArrayList<>()); } if (listeners.get(key).contains(listener)) { return; } listeners.get(key).add(listener); } @Override public void unlisten(String key, RecordListener listener) throws NacosException { if (!listeners.containsKey(key)) { return; } for (RecordListener recordListener : listeners.get(key)) { if (recordListener.equals(listener)) { listeners.get(key).remove(listener); break; } } } @Override public boolean isAvailable() { return isInitialized() || ServerStatus.UP.name().equals(switchDomain.getOverriddenServerStatus()); } //...... }
DistroConsistencyServiceImpl實現了EphemeralConsistencyService接口
其init方法會異步執行load方法,該方法會執行syncAllDataFromRemote進行初始化,該方法會通過NamingProxy.getAllData獲取data,然后執行processData,它主要是執行回調然后往dataStore添加數據;init方法最后會異步執行Notifier
其put方法會執行onPut方法及taskDispatcher.addTask(key);其remove方法會執行onRemove方法即listeners.remove(key);其get方法直接從dataStore讀取;其listen會添加RecordListener;其unlisten則會移除RecordListener;其isAvailable會通過isInitialized及ServerStatus.UP狀態來判斷
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/consistency/ephemeral/distro/DistroConsistencyServiceImpl.java
public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair> tasks = new LinkedBlockingQueue<Pair>(1024 * 1024); public void addTask(String datumKey, ApplyAction action) { if (services.containsKey(datumKey) && action == ApplyAction.CHANGE) { return; } if (action == ApplyAction.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } tasks.add(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); while (true) { try { Pair pair = tasks.take(); if (pair == null) { continue; } String datumKey = (String) pair.getValue0(); ApplyAction action = (ApplyAction) pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { continue; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == ApplyAction.CHANGE) { listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == ApplyAction.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } }
Notifier實現了Runnable接口,其run方法會從LinkedBlockingQueue取task,然后挨個執行listener回調
DistroConsistencyServiceImpl實現了EphemeralConsistencyService接口
其init方法會異步執行load方法,該方法會執行syncAllDataFromRemote進行初始化,該方法會通過NamingProxy.getAllData獲取data,然后執行processData,它主要是執行回調然后往dataStore添加數據;init方法最后會異步執行Notifier
其put方法會執行onPut方法及taskDispatcher.addTask(key);其remove方法會執行onRemove方法即listeners.remove(key);其get方法直接從dataStore讀取;其listen會添加RecordListener;其unlisten則會移除RecordListener;其isAvailable會通過isInitialized及ServerStatus.UP狀態來判斷
“nacos中DistroConsistencyServiceImpl的原理和作用是什么”的內容就介紹到這里了,感謝大家的閱讀。如果想了解更多行業相關的知識可以關注億速云網站,小編將為大家輸出更多高質量的實用文章!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。