您好,登錄后才能下訂單哦!
SpringCloud中服務注冊與發現Eureka以及注冊源碼的示例分析,相信很多沒有經驗的人對此束手無策,為此本文總結了問題出現的原因和解決方法,通過這篇文章希望你能解決這個問題。
服務注冊與發現
關系:
1.服務提供者在啟動時,向注冊中心注冊自己提供的服務。
2.服務消費者在啟動時,向注冊中心訂閱自己所需的服務。
3.注冊中心返回服務提供者地址給消費者。
4.服務消費者從提供者地址中調用消費者。
Eureka介紹:
Spring Cloud Eureka 是Spring Cloud Netflix 微服務套件中的一部分, 它基于Netflix Eureka 做了二次封裝, 主要負責完成微服務架構中的服務治理功能。Spring Cloud 通過為Eureka 增加了Spring Boot 風格的自動化配置,我們只需通過簡單引入依賴和注解配置就能讓Spring Boot 構建的微服務應用輕松地與Eureka 服務治理體系進行整合。
Netflix Eureka
Spring Cloud Eureka, 使用Netflix Eureka來實現服務注冊與發現, 它既包含了服務端組件,也包含了客戶端組件,并且服務端與客戶端均采用Java編寫,所以Eureka主要適用于通過Java實現的分布式系統,或是與NM兼容語言構建的系統。但是, 由于Eureka服務端的服務治理機制提供了完備的RESTfulAPL所以它也支持將非Java語言構建的微服務應用納入Eureka的服務治理體系中來。只是在使用其他語言平臺的時候,需要自己來實現Eureka的客戶端程序。不過慶幸的是,在目前幾個較為流行的開發平臺上,都已經有了一些針對Eureka 注冊中心的客戶端實現框架, 比如.NET平臺的Steeltoe、Node.js 的eureka-js-client等。
Eureka服務端, 我們也稱為服務注冊中心。它同其他服務注冊中心一樣,支持高可用配置。它依托于強一致性提供良好的服務實例可用性, 可以應對多種不同的故障場景。如果Eureka以集群模式部署,當集群中有分片出現故障時,那么Eureka就轉入自我保護模式。它允許在分片故障期間繼續提供服務的發現和注冊,當故障分片恢復運行時, 集群中的其他分片會把它們的狀態再次同步回來。以在AWS 上的實踐為例, Netflix推薦每個可用的區域運行一個Eureka服務端,通過它來形成集群。不同可用區域的服務注冊中心通過異步模式互相復制各自的狀態,這意味著在任意給定的時間點每個實例關于所有服務的狀態是有細微差別的。
Eureka客戶端,主要處理服務的注冊與發現。客戶端服務通過注解和參數配置的方式,嵌入在客戶端應用程序的代碼中, 在應用程序運行時,Eureka客戶端向注冊中心注冊自身提供的服務并周期性地發送心跳來更新它的服務租約。同時,它也能從服務端查詢當前注冊的服務信息并把它們緩存到本地并周期性地刷新服務狀態。
Eureka環境搭建
一、服務端搭建
1.新建項目
2.配置
# server (eureka 默認端口為:8761) server.port=8761 # spring spring.application.name=spring-cloud-server # eureka # 是否注冊到eureka eureka.client.register-with-eureka=false # 是否從eureka獲取注冊信息 eureka.client.fetch-registry=false # eureka服務器的地址(注意:地址最后面的 /eureka/ 這個是固定值) eureka.client.serviceUrl.defaultZone=http://localhost:${server.port}/eureka/
@EnableEurekaServer @SpringBootApplication public class CloudTestEurekaServerApplication { public static void main(String[] args) { SpringApplication.run(CloudTestEurekaServerApplication.class, args); } }
3.啟動服務
二、客戶端(provider)搭建
1.創建項目
2.配置
# server server.port=7777 # spring spring.application.name=spring-cloud-provider # eureka eureka.client.serviceUrl.defaultZone=http://localhost:8761/eureka/
@EnableEurekaClient @SpringBootApplication public class CloudTestEurekaProviderApplication { public static void main(String[] args) { SpringApplication.run(CloudTestEurekaProviderApplication.class, args); } }
3.啟動服務
三、消費者(consumer)搭建
自我保護
當我們在本地調試基于Eureka的程序時, 基本上都會碰到這樣一個問題, 在服務注冊中心的信息面板中出現類似下面的紅色警告信息:
EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT.RENEWALS ARE LESSER TH邸THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPI邸D JUST TO BE SAFE.
實際上, 該警告就是觸發了EurekaServer的自我保護機制。之前我們介紹過, 服務注冊到EurekaServer之后,會維護一個心跳連接,告訴EurekaServer自己還活著。EurekaServer在運行期間,會統計心跳失敗的比例在15分鐘之內是否低于85%, 如果出現低于的情況(在單機調試的時候很容易滿足, 實際在生產環境上通常是由于網絡不穩定導致), EurekaServer會將當前的實例注冊信息保護起來, 讓這些實例不會過期, 盡可能保護這些注冊信息。但是, 在這段保護期間內實例若出現問題, 那么客戶端很容易拿到實際已經不存在的服務實例, 會出現調用失敗的清況, 所以客戶端必須要有容錯機制, 比如可以使用請求重試、斷路器等機制。
Euraka服務啟動過程源碼解析
啟動日志如下
[ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'environmentManager': registering with JMX server as MBean [org.springframework.cloud.context.environment:name=environmentManager,type=EnvironmentManager] [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'serviceRegistryEndpoint': registering with JMX server as MBean [org.springframework.cloud.client.serviceregistry.endpoint:name=serviceRegistryEndpoint,type=ServiceRegistryEndpoint] [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshScope': registering with JMX server as MBean [org.springframework.cloud.context.scope.refresh:name=refreshScope,type=RefreshScope] [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'configurationPropertiesRebinder': registering with JMX server as MBean [org.springframework.cloud.context.properties:name=configurationPropertiesRebinder,context=ab7395e,type=ConfigurationPropertiesRebinder] [ main] o.s.j.e.a.AnnotationMBeanExporter : Located managed bean 'refreshEndpoint': registering with JMX server as MBean [org.springframework.cloud.endpoint:name=refreshEndpoint,type=RefreshEndpoint] [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0 [ main] o.s.c.n.e.s.EurekaServiceRegistry : Registering application spring-cloud-server with eureka with status UP [ Thread-11] o.s.c.n.e.server.EurekaServerBootstrap : Setting the eureka configuration.. [ Thread-11] o.s.c.n.e.server.EurekaServerBootstrap : Eureka data center value eureka.datacenter is not set, defaulting to default [ Thread-11] o.s.c.n.e.server.EurekaServerBootstrap : Eureka environment value eureka.environment is not set, defaulting to test [ Thread-11] o.s.c.n.e.server.EurekaServerBootstrap : isAws returned false [ Thread-11] o.s.c.n.e.server.EurekaServerBootstrap : Initialized server context [ Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl : Got 1 instances from neighboring DS node [ Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl : Renew threshold is: 1 [ Thread-11] c.n.e.r.PeerAwareInstanceRegistryImpl : Changing status to UP [ Thread-11] e.s.EurekaServerInitializerConfiguration : Started Eureka Server [ main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8761 (http) [ main] .s.c.n.e.s.EurekaAutoServiceRegistration : Updating port to 8761 [ main] c.m.e.CloudTestEurekaServerApplication : Started CloudTestEurekaServerApplication in 4.076 seconds (JVM running for 4.54) [nio-8761-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring FrameworkServlet 'dispatcherServlet' [nio-8761-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization started [nio-8761-exec-1] o.s.web.servlet.DispatcherServlet : FrameworkServlet 'dispatcherServlet': initialization completed in 11 ms [a-EvictionTimer] c.n.e.registry.AbstractInstanceRegistry : Running the evict task with compensationTime 0ms
2.可以看到在"Started Eureka Server"這一行,發現執行了類EurekaServerInitializerConfiguration,所以它是程序入口,進入:
@Configuration public class EurekaServerInitializerConfiguration implements ServletContextAware, SmartLifecycle, Ordered { @Override public void start() { new Thread(new Runnable() { @Override public void run() { try { //TODO: is this class even needed now? eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { // Help! log.error("Could not initialize Eureka servlet context", ex); } } }).start(); }
3.可以發現這個類上面有注解@Configuration,說明這個類可以被spring容器感知到,然后實例化,并且會執行start()方法,開啟一個線程執行功能;然后再進入contextInitialied方法:
public class EurekaServerBootstrap { public void contextInitialized(ServletContext context) { try { initEurekaEnvironment(); initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { log.error("Cannot bootstrap eureka server :", e); throw new RuntimeException("Cannot bootstrap eureka server :", e); } }
4.可發現上面方法主要有兩個功能:環境初始化和服務初始化,這里只看服務初始化,進入initEurekaServerContext()方法,可以看到下面代碼:
public class EurekaServerBootstrap { protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
5.上面代碼首先初始化server上下文,然后再去注冊。可以看到先獲得變量registryCount(注冊表),然后通過調用openForTraffic方法,為注冊監測數據做準備,或者可以這樣說(檢測監測的數據是否存活,如果不存活,做剔除操作),下面是函數一步一步進入的情況:
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware { @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { super.openForTraffic(applicationInfoManager, count == 0 ? this.defaultOpenForTrafficCount : count); }
@Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfRenewsPerMin = count * 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); logger.info("Got " + count + " instances from neighboring DS node"); logger.info("Renew threshold is: " + numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
進入postInit()方法:
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
之前在日志中有一句:"Running the evict task with compensationTime 0ms",這句話就是做節點剔除操作,就是在EvictionTask()方法執行的。
失效剔除
有些時候, 我們的服務實例并不一定會正常下線, 可能由于內存溢出、網絡故障等原因使得服務不能正常工作, 而服務注冊中心并未收到“服務下線” 的請求。為了從服務列表中將這些無法提供服務的實例剔除, Eureka Server在啟動的時候會創建一個定時任務,默認每隔一段時間(默認為60秒) 將當前清單中超時(默認為90秒)沒有續約的服務剔除出去。
class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } /** * compute a compensation time defined as the actual time this task was executed since the prev iteration, * vs the configured amount of time for execution. This is useful for cases where changes in time (due to * clock skew or gc for example) causes the actual eviction task to execute later than the desired time * according to the configured cycle. */ long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } }
Eureka客戶端注冊過程
入口:DiscoveryClient
首先上日志信息:
[ main] com.netflix.discovery.DiscoveryClient : Initializing Eureka in region us-east-1 [ main] com.netflix.discovery.DiscoveryClient : Client configured to neither register nor query for data. [ main] com.netflix.discovery.DiscoveryClient : Discovery Client initialized at timestamp 1517475139464 with initial instances count: 0 [ main] c.n.eureka.DefaultEurekaServerContext : Initializing ...
進入DiscoveryClient,首先看它的構造函數,里面執行initScheduledTasks()方法進行注冊
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else { logger.warn("Setting instanceInfo to a passed in null value"); } this.backupRegistryProvider = backupRegistryProvider; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
2.進入initScheduledTasks()方法,里面有兩個大if代碼塊,"clientConfig.shouldRegisterWithEureka()"是核心邏輯,用于向Eureka注冊
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); // Heartbeat timer scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
3.上面代碼執行了onDemandUpdate()方法,進入可見:
public boolean onDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); } InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } } public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
4.可以看見,執行了register()方法進行注冊,進入:
boolean register() throws Throwable { logger.info(PREFIX + appPathIdentifier + ": registering service..."); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
5.最后函數返回204,所以當注冊狀態為204,即為注冊成功。現在客戶端注冊成功了,就應該到了服務端接收注冊的過程:
Eureka接收注冊的過程
入口ApplicationResource
@Produces({"application/xml", "application/json"}) public class ApplicationResource { /** * Registers information about a particular instance for an * {@link com.netflix.discovery.shared.Application}. * * @param info * {@link InstanceInfo} information of the instance. * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. */ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
2.可以看到,最終也是返回204;同時一步一步進入register方法,看是如何完成注冊的。
@Override public void register(final InstanceInfo info, final boolean isReplication) { handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication); super.register(info, isReplication); }
/** * Registers the information about the {@link InstanceInfo} and replicates * this information to all peer eureka nodes. If this is replication event * from other replica nodes then it is not replicated. * * @param info * the {@link InstanceInfo} to be registered and replicated. * @param isReplication * true if this is a replication event from other replica nodes, * false otherwise. */ @Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
/** * Registers a new instance with a given duration. * * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean) */ public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { // Since the client wants to cancel it, reduce the threshold // (1 // for 30 seconds, 2 for a minute) this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
由于Eureka并沒有數據庫,索引通過map放在數據庫中。
看完上述內容,你們掌握SpringCloud中服務注冊與發現Eureka以及注冊源碼的示例分析的方法了嗎?如果還想學到更多技能或想了解更多相關內容,歡迎關注億速云行業資訊頻道,感謝各位的閱讀!
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。