您好,登錄后才能下訂單哦!
這篇文章給大家介紹Ribbon中怎么使用 LoadBalancer 實現負載均衡,內容非常詳細,感興趣的小伙伴們可以參考借鑒,希望對大家能有所幫助。
Ribbon 的負載均衡器是通過 LoadBalancerClient 來實現的,在應用啟動的時候,LoadBalancerClient 默認會從 EurekaClient 獲取服務列表,并將服務注冊列表緩存在本地,當調用 LoadBalancerClient 的 choose() 方法的時候, 根據負載均衡策略 IRule 來選擇一個可用的服務,從而實現負載均衡。
當然,LoadBalancerClient 也可以不從 EurekaClient 中獲取服務列表,這是需要自己維護一個服務注冊列表信息,具體操作如下:
ribbon: eureka: enabled: false stores: ribbon: listOfServers: baidu.com, google.com
主要流程:
1. 當應用啟動的時候,ILoadBalancer 從 EurekaClient 獲取服務列表
2. 然后每 10 秒 向 EurekaClient 發送一次心跳檢測,如果注冊列表發生了變化,則更新獲取重新獲取
3. LoadBalancerClient 調用 choose() 方法來選擇服務的時候,會調用 ILoadBalancer 的 chooseServer() 來獲取一個可以的服務,
4. 在 ILoadBalancer 進行獲取服務的時候,會根據負載均衡策略 IRule 來進行選擇
5. 返回可用的服務
下面就來看看每個類的實現原理
RibbonLoadBalancerClient 它是 Ribbon 負載均衡實現的一個重要的類,最終的負載均衡的請求處理都由它來執行,先來看下它的類圖:
它實現了 LoadBalancerClient 接口,而 LoadBalancerClient 接口實現了 ServiceInstanceChooser 接口:
該接口用來從負載均衡器中獲取一個可用的服務,只有一個方法:
public interface ServiceInstanceChooser { /** * @param serviceId:服務ID * @return 可用的服務實例 */ ServiceInstance choose(String serviceId); }
表示負載均衡的客戶端,是一個接口,繼承了 ServiceInstanceChooser 接口 ,共有三個方法:
public interface LoadBalancerClient extends ServiceInstanceChooser { /** * 執行請求 * @param serviceId :用于查找 LoadBalancer的服務ID * @param request:允許實現執行前后操作 */ <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; /** * 執行請求 * @param serviceId :用于查找 LoadBalancer的服務ID * @param serviceInstance :執行請求的服務 * @param request:允許實現執行前后操作 */ <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException; /** * 創建具有真實主機和端口的正確URI,有些系統使用帶有邏輯服務名的URL作為主機,調用該方法將會使用 host:port 來替換邏輯服務名 * @param instance :用于重建URI的服務實例 * @param original :具有邏輯服務名的URL * @return A reconstructed URI. */ URI reconstructURI(ServiceInstance instance, URI original); }
主要看下從 ServiceInstanceChooser,LoadBalancerClient 中實現的接口方法
public class RibbonLoadBalancerClient implements LoadBalancerClient { // 工廠:主要用來創建客戶端,創建負載均衡器,進行客戶端配置等 // 對于每一個客戶端名稱都會創建一個Spring ApplicationContext,可以從中獲取需要的bean private SpringClientFactory clientFactory; protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } .................. } public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> { // 獲取客戶端 public <C extends IClient<?, ?>> C getClient(String name, Class<C> clientClass) { return getInstance(name, clientClass); } // 獲取負載均衡器 public ILoadBalancer getLoadBalancer(String name) { return getInstance(name, ILoadBalancer.class); } //獲取客戶端配置 public IClientConfig getClientConfig(String name) { return getInstance(name, IClientConfig.class); } // 獲取 RibbonLoadBalancerContext public RibbonLoadBalancerContext getLoadBalancerContext(String serviceId) { return getInstance(serviceId, RibbonLoadBalancerContext.class); } // 獲取對應的bean public <T> T getInstance(String name, Class<T> type) { AnnotationConfigApplicationContext context = getContext(name); ..... return context.getBean(type); ..... } }
該方法主要用來獲取一個可用的服務實例
public ServiceInstance choose(String serviceId, Object hint) { Server server = getServer(getLoadBalancer(serviceId), hint); if (server == null) { return null; } // RibbonServer 實現了 ServiceInstance return new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); } // 根據服務ID獲取負載均衡器,會調用 SpringClientFactory 的方法進行獲取 protected ILoadBalancer getLoadBalancer(String serviceId) { return this.clientFactory.getLoadBalancer(serviceId); } // 根據負載均衡器來獲取可用的服務 protected Server getServer(ILoadBalancer loadBalancer, Object hint) { if (loadBalancer == null) { return null; } return loadBalancer.chooseServer(hint != null ? hint : "default"); }
最后會調用 ILoadBalancer.chooseServer 來獲取可用服務,后面再來說 ILoadBalancer 。
該方法執行請求
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); // 狀態記錄器,記錄著服務的狀態 RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); ........... T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; ........... } // apply 方法調用如下,最終返回 ClientHttpResponse public ListenableFuture<ClientHttpResponse> intercept(final HttpRequest request, final byte[] body, final AsyncClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ListenableFuture<ClientHttpResponse>>() { @Override public ListenableFuture<ClientHttpResponse> apply( final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, AsyncLoadBalancerInterceptor.this.loadBalancer); return execution.executeAsync(serviceRequest, body); } }); }
以上就是負載均衡器流程圖左邊部分的原理,接下來看下右邊的部分
通過上面的分析,負載均衡器獲取一個可用的服務,最終會調用 ILoadBalancer 的 chooseServer 方法,下面就來看下 ILoadBalancer 的實現原理
首先來看下 ILoadBalancer 的整體類圖:
在上面的類圖中,主要的邏輯實在 BaseLoadBalancer 中實現,而 DynamicServerListLoadBalancer 主要提供動態獲取服務列表的能力。
首先來看下 ILoadBalancer,它表示一個負載均衡器接口,
public interface ILoadBalancer { // 添加服務 public void addServers(List<Server> newServers); //獲取服務 public Server chooseServer(Object key); //標記某個服務下線 public void markServerDown(Server server); //獲取狀態為UP的所有可用服務列表 public List<Server> getReachableServers(); //獲取所有服務列表,包括可用的和不可用的 public List<Server> getAllServers(); }
實現 ILoadBalancer 接口,提供一些默認實現
public abstract class AbstractLoadBalancer implements ILoadBalancer { public enum ServerGroup{ALL, STATUS_UP, STATUS_NOT_UP} public Server chooseServer() { return chooseServer(null); } public abstract List<Server> getServerList(ServerGroup serverGroup); // 獲取狀態 public abstract LoadBalancerStats getLoadBalancerStats(); }
客戶端配置
public interface IClientConfigAware { public abstract void initWithNiwsConfig(IClientConfig clientConfig); }
負載均衡器的主要實現邏輯,在該類中,會根據負載均衡策略 IRule 來獲取可用的服務,會通過 IPing 來檢測服務的可用性,此外,該類中從 EurkaClient 獲取到服務列表后,會在該類中保存下來,會維護所有的服務列表和可用的服務列表。
首先來看下它的一些屬性,然后再來看每個對應的方法
public class BaseLoadBalancer extends AbstractLoadBalancer implements PrimeConnections.PrimeConnectionListener, IClientConfigAware { // 默認的負載均衡策略:輪詢選擇服務實例 private final static IRule DEFAULT_RULE = new RoundRobinRule(); protected IRule rule = DEFAULT_RULE; // 默認 ping 的策略,會調用 IPing 來檢測服務是否可用 private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy(); protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY; protected IPing ping = null; // 所有服務列表 protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>()); // 狀態為 up 的服務列表 protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>()); // 鎖 protected ReadWriteLock allServerLock = new ReentrantReadWriteLock(); protected ReadWriteLock upServerLock = new ReentrantReadWriteLock(); // 定時任務,去 ping 服務是否可用 protected Timer lbTimer = null; // ping 的時間間隔,10秒 protected int pingIntervalSeconds = 10; // ping 的最大次數 protected int maxTotalPingTimeSeconds = 5; // 負載均衡器的狀態 protected LoadBalancerStats lbStats; // 客戶端配置 private IClientConfig config; // 服務列表變化監聽器 private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>(); // 服務狀態變化監聽器 private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>(); // 構造方法,使用默認的配置來創建負載均衡器,還有其他重載的構造方法,可用根據需要來創建負載均衡器 public BaseLoadBalancer() { this.name = DEFAULT_NAME; this.ping = null; setRule(DEFAULT_RULE); setupPingTask(); lbStats = new LoadBalancerStats(DEFAULT_NAME); } ..................... }
在上面的屬性中,Ribbon 提供了一些默認的配置:
IClientConfig 表示客戶端的配置,實現類為 DefaultClientConfigImpl,在該類中配置了默認的值,:
public class DefaultClientConfigImpl implements IClientConfig { // ping 的默認策略 DummyPing public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName(); public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule"; public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer"; public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000; public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9; ............................................. }
IRule 表示 負載均衡策略,即如何去選擇服務實例,默認為 RoundRobinRule,即通過輪詢的方式選擇服務。Ribbon 默認提供的有 7 種。
IPing 表示檢測服務是否可用策略,Ribbon 也提供了很多策略,共有 5 種,默認為 DummyPing
關于 IRule 和 IPing 的策略,后面會專門進行研究。
在 BaseLoadBalancer 中,除了提供一個無參的構造方法(使用的是默認的配置)外,還提供了很多重載的構造方法,下面來看下根據客戶端的配置來創建BaseLoadBalancer :
// 根據客戶端配置來創建 BaseLoadBalancer public BaseLoadBalancer(IClientConfig config) { initWithNiwsConfig(config); } @Override public void initWithNiwsConfig(IClientConfig clientConfig) { // 負載均衡策略 String ruleClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName); // ping 策略 String pingClassName = (String) clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingClassName); IRule rule = (IRule) ClientFactory.instantiateInstanceWithClientConfig(ruleClassName, clientConfig); IPing ping = (IPing) ClientFactory.instantiateInstanceWithClientConfig(pingClassName, clientConfig); // 狀態 LoadBalancerStats stats = createLoadBalancerStatsFromConfig(clientConfig); // 初始化配置 initWithConfig(clientConfig, rule, ping, stats); } void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) { this.config = clientConfig; String clientName = clientConfig.getClientName(); this.name = clientName; // ping 的周期 int pingIntervalTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerPingInterval,Integer.parseInt("30"))); // 最大 ping 的次數 int maxTotalPingTime = Integer.parseInt(clientConfig.getProperty(CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,Integer.parseInt("2"))); setPingInterval(pingIntervalTime); setMaxTotalPingTime(maxTotalPingTime); setRule(rule); setPing(ping); setLoadBalancerStats(stats); rule.setLoadBalancer(this); if (ping instanceof AbstractLoadBalancerPing) { ((AbstractLoadBalancerPing) ping).setLoadBalancer(this); } ................. // 注冊監控/可忽略 init(); }
在上面的構造方法中,可用根據客戶端配置的信息來創建一個BaseLoadBalancer,如客戶端可以配置負載均衡策略,ping的策略,ping的時間間隔和最大次數等。
在 Ribbon 中,負載均衡器多久才去更新獲取服務列表呢?在 BaseLoadBalancer 類中,有一個 setupPingTask 方法,在該方法內部,會創建 PingTask 定時任務去檢測服務的可用性,而 PingTask 又會創建 Pinger 對象,在 Pinger 對象的 runPinger() 方法中,會根據ping策略即 pingerStrategy 的 pingServers(ping, allServer) 來獲取服務的可用性,主要邏輯如下:
void setupPingTask() { if (canSkipPing()) { return; } // 如果已經有了定時任務,則取消 if (lbTimer != null) { lbTimer.cancel(); } // 第二個參數為true,表示它是一個deamon線程 lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); // 創建 PingTask, 它繼承于 TimerTask,定時執行 run 方法 lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); ...... } class PingTask extends TimerTask { public void run() { // 默認 pingStrategy = new SerialPingStrategy() new Pinger(pingStrategy).runPinger(); } } public void runPinger() throws Exception { // 如果正在ping,則返回 if (!pingInProgress.compareAndSet(false, true)) { return; // Ping in progress - nothing to do } // 所有的服務,包括不可用的服務 Server[] allServers = null; boolean[] results = null; Lock allLock = null; Lock upLock = null; try { allLock = allServerLock.readLock(); allLock.lock(); allServers = allServerList.toArray(new Server[allServerList.size()]); allLock.unlock(); // 所有服務的數量 int numCandidates = allServers.length; // 所有服務ping的結果 results = pingerStrategy.pingServers(ping, allServers); // 狀態可用的服務列表 final List<Server> newUpList = new ArrayList<Server>(); // 狀態改變的服務列表 final List<Server> changedServers = new ArrayList<Server>(); for (int i = 0; i < numCandidates; i++) { // 最新的狀態 boolean isAlive = results[i]; Server svr = allServers[i]; // 老的狀態 boolean oldIsAlive = svr.isAlive(); // 更新狀態 svr.setAlive(isAlive); // 如果狀態改變了,則放到集合中,會進行重新拉取 if (oldIsAlive != isAlive) { changedServers.add(svr); } // 狀態可用的服務 if (isAlive) { newUpList.add(svr); } } upLock = upServerLock.writeLock(); upLock.lock(); upServerList = newUpList; upLock.unlock(); // 變態改變監聽器 notifyServerStatusChangeListener(changedServers); } finally { // ping 完成 pingInProgress.set(false); } } // 檢測服務的狀態 @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; for (int i = 0; i < numCandidates; i++) { results[i] = false; if (ping != null) { results[i] = ping.isAlive(servers[i]); } } return results; }
在上面的邏輯中,Ribbon 每10秒向 EurekaClient 發送 ping 來判斷服務的可用性,如果服務的可用性發生了改變或服務的數量和之前的不一致,則會更新或重新拉取服務。有了這些服務之后,會根據負載均衡策略 IRule 來選擇一個可用的服務。
在前文說到 Ribbon 客戶端 RibbonLoadBalancerClient 選擇服務的時候,最終會調用 ILoadBalancer.chooseServer 來選擇服務,接下來就來看下這個方法:
public Server chooseServer(Object key) { ....... //rule= new RoundRobinRule() return rule.choose(key); .... }
關于 Ribbon 的負載均衡策略 IRule, Ribbon 提供了 7 種,后面再來分析,現在只需要知道通過 IRule 來選擇服務就可以了。
在上面的分析中,Ribbon 會每10秒定時的去檢測服務的可用性,如果服務狀態發生了變化則重新獲取,之后,再根據負載均衡策略 IRule 來選擇一個可用的服務;但是,在初始化的時候,是從哪里獲取服務列表呢?下面就來分析這個問題
BaseLoadBalancer 有個子類 DynamicServerListLoadBalancer,它具有使用動態源獲取服務器列表的功能。即服務器列表在運行時可能會更改。此外,還可以通過條件來過濾掉不符合所需條件的服務。
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer { // 是否正在進行服務列表的更新 protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false); // 服務列表 volatile ServerList<T> serverListImpl; // 服務過濾器 volatile ServerListFilter<T> filter; }
在 DynamicServerListLoadBalancer 中,有個 restOfInit 方法,在初始化時進行調用,在該方法中,會從 Eureka 客戶端中拉取所有的服務列表:
void restOfInit(IClientConfig clientConfig) { ............. updateListOfServers(); ........ } public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { // 獲取所有服務列表 servers = serverListImpl.getUpdatedListOfServers(); // 根據條件過濾服務 if (filter != null) { servers = filter.getFilteredListOfServers(servers); } } updateAllServerList(servers); } protected void updateAllServerList(List<T> ls) { if (serverListUpdateInProgress.compareAndSet(false, true)) { try { for (T s : ls) { s.setAlive(true); // 狀態設置為可用 } setServersList(ls); super.forceQuickPing(); // 強制檢測服務狀態 } finally { serverListUpdateInProgress.set(false); } } }
獲取所有服務列表 servers = serverListImpl.getUpdatedListOfServers(); 最終會調用 DiscoveryEnabledNIWSServerList 的方法:
servers = serverListImpl.getUpdatedListOfServers(); public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); ........ // 通過 eurekaClient 來獲取注冊的服務列表 EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { ..... DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr); serverList.add(des); } } ...... } } return serverList; }
通過上面方法的分析,Ribbon 最終會通過 EurekaClient 來獲取服務列表的,而 EurekaClient 的實現類是 DiscoveryClient,而在 Eureka 中,DiscoveryClient 類具有服務的注冊,發現,續約,獲取服務列表等功能。
此外,該類中還可以通過過濾器來獲取不符合條件的服務。
以上就是 Ribbon 負載均衡器的一個實現原理。最后再來看下流程圖,加深印象:
關于Ribbon中怎么使用 LoadBalancer 實現負載均衡就分享到這里了,希望以上內容可以對大家有一定的幫助,可以學到更多知識。如果覺得文章不錯,可以把它分享出去讓更多的人看到。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。