通过上一篇 分析,我们已经对Spring Cloud如何使用Ribbon有了基本的了解。虽然Spring Cloud中定义了LoadBalancerClient作为负载均衡的通用接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是他在具体实现客户端负载均衡时,是通过Ribbon的ILoadBalancer接口实现的。下面我们根据ILoadBalancer接口的实现做个看看它是如何实现客户端负载均衡的。
AbstractLoadBalancer
AbstractLoadBalancer 是ILoadBalancer接口的抽象实现,在该抽象类中定义一个关于微服务实例的枚举类ServerGroup,它包含三种不同类型。
1、ALL:所有服务实例。
2、STATUS_UP:正常服务的实例
3、STATUS_NOT_UP:停止服务的实例
另外还实现了chooseServer()方法,该方法通过调用接口中的chooseServer(Object key) 实现,其中参数为null,表示在选择具体服务实例时忽略key的条件判断。
最后还定义了两个抽象函数,getServerList(ServerGroup serverGroup)定义了根据分组类型来获取不同的服务实例列表,getLoadBalancerStats()定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。
<code>package com.netflix.loadbalancer;
import java.util.List;
public abstract class AbstractLoadBalancer implements ILoadBalancer {
public AbstractLoadBalancer() {
}
public Server chooseServer() {
return this.chooseServer((Object)null);
}
\t\t// 定义了根据分组类型来获取不同的服务实例列表
public abstract List<server> getServerList(AbstractLoadBalancer.ServerGroup var1);
//getLoadBalancerStats()定义了获取LoadBalancerStats对象的方法,
//LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,
//我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。
public abstract LoadBalancerStats getLoadBalancerStats();
public static enum ServerGroup {
ALL,
STATUS_UP,
STATUS_NOT_UP;
private ServerGroup() {
}
}
}/<server>/<code>
BaseLoadBalancer
BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义很多关于均衡负载器相关的基础内容:
- 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。
<code>@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<server> allServerList = Collections
.synchronizedList(new ArrayList<server>()); // 所有服务实例
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<server> upServerList = Collections
.synchronizedList(new ArrayList<server>()); // 正常服务实例/<server>/<server>/<server>/<server>/<code>
<code> private static class SerialPingStrategy implements IPingStrategy {
@Override public boolean[] pingServers(IPing ping, Server[] servers) {
int numCandidates = servers.length;
boolean[] results = new boolean[numCandidates];
logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates);
for (int i = 0; i < numCandidates; i++) {
results[i] = false;
try {
if (ping != null) {
\t\tresults[i] = ping.isAlive(servers[i]);
\t}
}
} catch (Exception e) {
logger.error("Exception while pinging Server: '{}'", servers[i], e);
}
}
return results;
}
}
}/<code>
- 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancer中chooseServer(Object key)的实现源码,我们可以知道负载均衡器实际进行服务实例选择任务是委托给了IRule实例中的choose函数来实现。而在这里,默认初始化了RoundRobinRule为IRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。
<code>public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key);
} catch (Throwable t) {
return null;
}
}
}/<code>
- addServers(List newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是对setServersList函数的重写来实现的。
<code> public void addServer(Server newServer) {
if (newServer != null) {
try {
ArrayList<server> newList = new ArrayList();
newList.addAll(this.allServerList);
newList.add(newServer);
this.setServersList(newList);
} catch (Exception var3) {
logger.error("LoadBalancer [{}]: Error adding newServer {}", new Object[]{this.name, newServer.getHost(), var3});
}
}
}/<server>/<code>
- chooseServer(Object key):挑选一个具体的服务实例,在上面介绍IRule的时候,已经做了说明,这里不再赘述。
<code>public Server chooseServer(Object key) {
if (this.counter == null) {
this.counter = this.createCounter();
}
this.counter.increment();
if (this.rule == null) {
return null;
} else {
try {
return this.rule.choose(key);
} catch (Exception var3) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", new Object[]{this.name, key, var3});
return null;
}
}
}/<code>
- markServerDown(Server server):标记某个服务实例暂停服务。
<code> public void markServerDown(Server server) {
if (server != null && server.isAlive()) {
logger.error("LoadBalancer [{}]: markServerDown called on [{}]", this.name, server.getId());
server.setAlive(false);
this.notifyServerStatusChangeListener(Collections.singleton(server));
}
}/<code>
- getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。
<code> public List<server> getReachableServers() {
return Collections.unmodifiableList(this.upServerList);
}
/<server>/<code>
- getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以也直接返回它即可。
<code>public List<server> getAllServers() {
return Collections.unmodifiableList(this.allServerList);
}/<server>/<code>
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单的在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说我们可以通过过滤器来选择性的获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容:
ServerList:
从DynamicServerListLoadBalancer的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象:ServerList
<code>public interface ServerList{ /<code>
\t\t// 获取初始化的服务清单
public ListgetInitialListOfServers();
/**
* Return updated list of servers. This is called say every 30 secs
* (configurable) by the Loadbalancer's Ping cycle
* 获取更新的服务实例清单
*/
public ListgetUpdatedListOfServers();
}
它定义了两个抽象方法:getInitialListOfServers用于获取初始化的服务实例清单,而getUpdatedListOfServers用于获取更新的服务实例清单。那么该接口的实现有哪些呢?通过搜索源码,我们可以整出如下图的结构:
从图中我们可以看到有很多个ServerList的实现类,那么在DynamicServerListLoadBalancer中的ServerList默认配置到底使用了哪个具体实现呢?既然在该负载均衡器中需要实现服务实例的动态更新,那么势必需要ribbon具备访问eureka来获取服务实例的能力,所以我们从Spring Cloud整合ribbon与eureka的包org.springframework.cloud.netflix.ribbon.eureka下探索,可以找到配置类EurekaRibbonClientConfiguration,在该类中可以找到看到下面创建ServerList实例的内容:
<code>@Bean
@ConditionalOnMissingBean
public ServerList> ribbonServerList(IClientConfig config) {
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}/<code>
可以看到,这里创建的是一个DomainExtractingServerList实例,从下面它的源码中我们可以看到在它内部还定义了一个ServerList list。同时,DomainExtractingServerList类中对getInitialListOfServers和getUpdatedListOfServers的具体实现,其实委托给了内部定义的ServerList list对象,而该对象是通过创建DomainExtractingServerList时候,由构造函数传入的DiscoveryEnabledNIWSServerList实现的。
<code>public class DomainExtractingServerList implements ServerList<discoveryenabledserver> {
\tprivate ServerList<discoveryenabledserver> list;
\tprivate final RibbonProperties ribbon;
\tprivate boolean approximateZoneFromHostname;
\tpublic DomainExtractingServerList(ServerList<discoveryenabledserver> list,
\t\t\tIClientConfig clientConfig, boolean approximateZoneFromHostname) {
\t\tthis.list = list;
\t\tthis.ribbon = RibbonProperties.from(clientConfig);
\t\tthis.approximateZoneFromHostname = approximateZoneFromHostname;
\t}
\t@Override
\tpublic List<discoveryenabledserver> getInitialListOfServers() {
\t\tList<discoveryenabledserver> servers = setZones(this.list
\t\t\t\t.getInitialListOfServers());
\t\treturn servers;
\t}
\t@Override
\tpublic List<discoveryenabledserver> getUpdatedListOfServers() {
\t\tList<discoveryenabledserver> servers = setZones(this.list
\t\t\t\t.getUpdatedListOfServers());
\t\treturn servers;
\t}
\tprivate List<discoveryenabledserver> setZones(List<discoveryenabledserver> servers) {
\t\tList<discoveryenabledserver> result = new ArrayList<>();
\t\tboolean isSecure = this.ribbon.isSecure(true);
\t\tboolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
\t\tfor (DiscoveryEnabledServer server : servers) {
\t\t\tresult.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
\t\t\t\t\tthis.approximateZoneFromHostname));
\t\t}
\t\treturn result;
\t}
}/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>
那么DiscoveryEnabledNIWSServerList是如何实现这两个服务实例的获取的呢?我们从源码中可以看到这两个方法都是通过该类中的一个私有函数obtainServersViaDiscovery来通过服务发现机制来实现服务实例的获取。
<code>
@Override
public List<discoveryenabledserver> getInitialListOfServers(){
return obtainServersViaDiscovery();
}
@Override
public List<discoveryenabledserver> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}/<discoveryenabledserver>/<discoveryenabledserver>/<code>
而obtainServersViaDiscovery的实现逻辑如下,主要依靠EurekaClient从服务注册中心中获取到具体的服务实例InstanceInfo列表(EurekaClient的具体实现,我们在分析Eureka的源码时已经做了详细的介绍,这里传入的vipAddress可以理解为逻辑上的服务名,比如“USER-SERVICE”)。接着,对这些服务实例进行遍历,将状态为“UP”(正常服务)的实例转换成DiscoveryEnabledServer对象,最后将这些实例组织成列表返回。
<code>private List<discoveryenabledserver> obtainServersViaDiscovery() {
List<discoveryenabledserver> serverList = new ArrayList<discoveryenabledserver>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<discoveryenabledserver>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
List<instanceinfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(
vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
// 省略了一些实例信息的加工逻辑
DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
des.setZone(DiscoveryClient.getZone(ii));
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break;
}
}
}
return serverList;
}/<instanceinfo>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>
在DiscoveryEnabledNIWSServerList中通过EurekaClient从服务注册中心获取到最新的服务实例清单后,返回的List到了DomainExtractingServerList类中,将继续通过setZones函数进行处理,而这里的处理具体内容如下,主要完成将DiscoveryEnabledNIWSServerList返回的List列表中的元素,转换成内部定义的DiscoveryEnabledServer的子类对象DomainExtractingServer,在该对象的构造函数中将为服务实例对象设置一些必要的属性,比如id、zone、isAliveFlag、readyToServe等信息。
<code>private List<discoveryenabledserver> setZones(List<discoveryenabledserver> servers) {
List<discoveryenabledserver> result = new ArrayList<>();
boolean isSecure = this.clientConfig.getPropertyAsBoolean(
CommonClientConfigKey.IsSecure, Boolean.TRUE);
boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean(
CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE);
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}/<discoveryenabledserver>/<discoveryenabledserver>/<discoveryenabledserver>/<code>
ServerListUpdater
通过上面的分析我们已经知道了Ribbon与Eureka整合后,如何实现从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看DynamicServerListLoadBalancer中的实现内容,我们可以很容易的找到下面定义的关于ServerListUpdater的内容:
<code>protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
updateListOfServers();
}
};
protected volatile ServerListUpdater serverListUpdater;/<code>
根据该接口的命名,我们基本就能猜到,这个对象实现的是对ServerList的更新,所以可以称它为“服务更新器”,从下面的源码中可以看到,在ServerListUpdater内部还定义了一个UpdateAction接口,上面定义的updateAction对象就是以匿名内部类的方式创建了一个它的具体实现,其中doUpdate实现的内容就是对ServerList的具体更新操作。除此之外,“服务更新器”中还定义了一系列控制它和获取它一些信息的操作。
<code>public interface ServerListUpdater {
public interface UpdateAction {
void doUpdate();
}
// 启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。
void start(UpdateAction updateAction);
// 停止服务更新器
void stop();
// 获取最近的更新时间戳
String getLastUpdate();
// 获取上一次更新到现在的时间间隔,单位为毫秒
long getDurationSinceLastUpdateMs();
// 获取错过的更新周期数
int getNumberMissedCycles();
// 获取核心线程数
int getCoreThreads();
}/<code>
而ServerListUpdater的实现类不多,具体下图所示。
根据两个类的注释,我们可以很容易的知道它们的作用分别是:
- PollingServerListUpdater:动态服务列表更新的默认策略,也就是说DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。
- EurekaNotificationServerListUpdater:该更新器也可服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。
下面我们来详细看看它默认实现的PollingServerListUpdater。先从用于启动“服务更新器”的start函数源码看起,具体如下。我们可以看到start函数的实现内容验证了之前提到的:以定时任务的方式进行服务列表的更新。它先创建了一个Runnable的线程实现,在该实现中调用了上面提到的具体更新服务实例列表的方法updateAction.doUpdate(),最后再为这个Runnable的线程实现启动了一个定时任务来执行。
<code>@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}/<code>
继续看PollingServerListUpdater的其他内容,我们可以找到用于启动定时任务的2个重要参数initialDelayMs和refreshIntervalMs的默认定义分别为1000和30*1000,单位为毫秒。也就是说更新服务实例在初始化之后延迟1秒后开始执行,并以30秒为周期重复执行。除了这些内容之外,我们还能看到它还会记录最后更新时间、是否存活等信息,同时也实现了ServerListUpdater中定义的一些其他操作内容,这些操作相对比较简单,这里不再具体说明,有兴趣的读者可以自己查看源码了解其实现原理。
ServerListFilter
在了解了更新服务实例的定时任务是如何启动的之后,我们继续回到updateAction.doUpdate()调用的具体实现位置,在DynamicServerListLoadBalancer中,它的实际实现委托给了updateListOfServers函数,具体实现如下:
<code>public void updateListOfServers() {
Listservers = new ArrayList /<code>();
if (serverListImpl != null) {
servers = serverListImpl.getUpdatedListOfServers();
LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
getIdentifier(), servers);
}
}
updateAllServerList(servers);
}
可以看到,这里终于用到了我们之前提到的ServerList的getUpdatedListOfServers,通过之前的介绍我们已经可以知道这一步实现了从Eureka Server中获取服务可用实例的列表。在获得了服务实例列表之后,这里又将引入一个新的对象filter,追朔该对象的定义,我们可以找到它是ServerListFilter定义的。
ServerListFilter接口非常简单,该接口中之定义了一个方法List getFilteredListOfServers(List servers),主要用于实现对服务实例列表的过滤,通过传入的服务实例清单,根据一些规则返回过滤后的服务实例清单。该接口的实现如下图所示:
其中,除了ZonePreferenceServerListFilter的实现是Spring Cloud Netflix中对Ribbon的扩展实现外,其他均是Netflix Ribbon中的实现类。我们可以分别看看这些过滤器实现都有什么特点:
- AbstractServerListFilter:这是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象LoadBalancerStats,我们在之前介绍过的,该对象存储了关于负载均衡器的一些属性和统计信息等。
<code>public abstract class AbstractServerListFilterimplements ServerListFilter /<code>{
private volatile LoadBalancerStats stats;
public void setLoadBalancerStats(LoadBalancerStats stats) {
this.stats = stats;
}
public LoadBalancerStats getLoadBalancerStats() {
return stats;
}
}
ZoneAffinityServerListFilter:该过滤器基于“区域感知(Zone Affinity)”的方式实现服务实例的过滤,也就是说它会根据提供服务的实例所处区域(Zone)与消费者自身的所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例。
<code>public ListgetFilteredListOfServers(List /<code>servers) {
if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){
ListfilteredServers = Lists.newArrayList(Iterables.filter(
servers, this.zoneAffinityPredicate.getServerOnlyPredicate()));
if (shouldEnableZoneAffinity(filteredServers)) {
return filteredServers;
} else if (zoneAffinity) {
overrideCounter.increment();
}
}
return servers;
}
从上面的源码中我们可以看到,对于服务实例列表的过滤是通过Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,其中判断依据由ZoneAffinityPredicate实现服务实例与消费者的Zone比较。而在过滤之后,这里并不会马上返回过滤的结果,而是通过shouldEnableZoneAffinity函数来判断是否要启用“区域感知”的功能,从下面shouldEnableZoneAffinity的实现中,我们可以看到,它使用了LoadBalancerStats的getZoneSnapshot方法来获取这些过滤后的同区域实例的基础指标(包含了:实例数量、断路器断开数、活动请求数、实例平均负载等),根据一系列的算法求出下面的几个评价值并与设置的阈值对比(下面的为默认值),若有一个条件符合,就不启用“区域感知”过滤的服务实例清单。这一算法实现对于集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。同时,通过这里的介绍,我们也可以关联着来理解之前介绍Eureka的时候提到的对于区域分配设计来保证跨区域故障的高可用问题。
- blackOutServerPercentage:故障实例百分比(断路器断开数 / 实例数量) >= 0.8
- activeReqeustsPerServer:实例平均负载 >= 0.6
- availableServers:可用实例数(实例数量 - 断路器断开数) < 2
<code>private boolean shouldEnableZoneAffinity(Listfiltered) { /<code>
if (!zoneAffinity && !zoneExclusive) {
return false;
}
if (zoneExclusive) {
return true;
}
LoadBalancerStats stats = getLoadBalancerStats();
if (stats == null) {
return zoneAffinity;
} else {
logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered);
ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered);
double loadPerServer = snapshot.getLoadPerServer();
int instanceCount = snapshot.getInstanceCount();
int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount();
if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get()
|| loadPerServer >= activeReqeustsPerServerThreshold.get()
|| (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) {
logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}",
new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount});
return false;
} else {
return true;
}
}
}
a. 服务实例的并发连接数超过客户端配置的值,默认为0,配置参数为:<clientname>.<namespace>.ServerListSubsetFilter.eliminationConnectionThresold
b. 服务实例的失败数超过客户端配置的值,默认为0,配置参数为:<clientname>.<namespace>.ServerListSubsetFilter.eliminationFailureThresold
c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为:<clientname>.<namespace>.ServerListSubsetFilter.forceEliminatePercent。那么就先对剩下的实例列表进行健康排序,再开始从最不健康实例进行剔除,直到达到配置的剔除百分比。 在完成剔除后,清单已经少了至少10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,而默认的实例子集数量为20,其配置参数为:<clientname>.<namespace>.ServerListSubsetFilter.size。/<namespace>/<clientname>/<namespace>/<clientname>/<namespace>/<clientname>/<namespace>/<clientname>
<code>@Override
public List<server> getFilteredListOfServers(List<server> servers) {
List<server> output = super.getFilteredListOfServers(servers);
if (this.zone != null && output.size() == servers.size()) {
List<server> local = new ArrayList<server>();
for (Server server : output) {
if (this.zone.equalsIgnoreCase(server.getZone())) {
local.add(server);
}
}
if (!local.isEmpty()) {
return local;
}
}
return output;
}/<server>/<server>/<server>/<server>/<server>/<code>
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在DynamicServerListLoadBalancer中,我们可以看到它并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用在BaseLoadBalancer中实现的算法,使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以它会把所有实例视为一个Zone下的节点来看待,这样就会周期性的产生跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的情况下会有一定的性能问题,而该负载均衡器则可以避免这样的问题。那么它是如何实现的呢?
首先,在ZoneAwareLoadBalancer中,我们可以发现,它并没有重写setServersList,说明实现服务实例清单的更新主逻辑没有修改。但是我们可以发现它重写了这个函数:setServerListForZones(Map<string>> zoneServersMap)。看到这里可能会有一些陌生,因为它并不是接口中定义的必备函数,所以我们不妨去父类DynamicServerListLoadBalancer中寻找一下该函数,我们可以找到下面的定义了:/<string>
<code>public void setServersList(List lsrv) {
super.setServersList(lsrv);
ListserverList = (List /<code>) lsrv;
Map<string>> serversInZones = new HashMap<string>>();
...
setServerListForZones(serversInZones);
}
protected void setServerListForZones(Map<string>> zoneServersMap) {
LOGGER.debug("Setting server list for zones: {}", zoneServersMap);
getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);
}/<string>/<string>/<string>
setServerListForZones函数的调用位于更新服务实例清单函数setServersList的最后,同时从其实现内容来看,它在父类DynamicServerListLoadBalancer中的作用是根据按区域Zone分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneStatsMap集合中,每一个区域Zone会对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。
在ZoneAwareLoadBalancer中对setServerListForZones的重写如下:
<code>protected void setServerListForZones(Map<string>> zoneServersMap) {
super.setServerListForZones(zoneServersMap);
if (balancers == null) {
balancers = new ConcurrentHashMap<string>();
}
for (Map.Entry<string>> entry: zoneServersMap.entrySet()) {
String zone = entry.getKey().toLowerCase();
getLoadBalancer(zone).setServersList(entry.getValue());
}
for (Map.Entry<string> existingLBEntry: balancers.entrySet()) {
if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) {
existingLBEntry.getValue().setServersList(Collections.emptyList());
}
}
}/<string>/<string>/<string>/<string>/<code>
可以看到,在该实现中创建了一个ConcurrentHashMap()类型的balancers对象,它将用来存储每个Zone区域对应的负载均衡器,而具体的负载均衡器的创建则是通过下面的第一个循环中调用getLoadBalancer函数来完成,同时在创建负载均衡器的时候会创建它的规则(如果当前实现中没有IRULE的实例,就创建一个AvailabilityFilteringRule规则;如果已经有具体实例,就clone一个),在创建完负载均衡器后又马上调用setServersList函数为其设置对应Zone区域的实例清单。而第二个循环则是对Zone区域中实例清单的检查,看看是否有Zone区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。
在了解了该负载均衡器是如何扩展服务实例清单的实现后,我们来具体看看它是如何挑选服务实例,来实现对区域的识别的:
<code>public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
Server server = null;
try {
LoadBalancerStats lbStats = getLoadBalancerStats();
Map<string> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);
logger.debug("Zone snapshots: {}", zoneSnapshot);
...
Set<string> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());
logger.debug("Available zones: {}", availableZones);
if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) {
String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones);
logger.debug("Zone chosen: {}", zone);
if (zone != null) {
BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone);
server = zoneLoadBalancer.chooseServer(key);
}
}
} catch (Throwable e) {
logger.error("Unexpected exception when choosing server using zone aware logic", e);
}
if (server != null) {
return server;
} else {
logger.debug("Zone avoidance logic is not invoked.");
return super.chooseServer(key);
}
}/<string>/<string>/<code>
从源码中我们可以看到,只有当负载均衡器中维护的实例所属Zone区域个数大于1的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域个数大于1个的时候,它的实现步骤主要如下:
- 调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats),为当前负载均衡器中所有的Zone区域分别创建快照,保存在Map zoneSnapshot中,这些快照中的数据将用于后续的算法。
- 调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选。 首先它会剔除符合这些规则的Zone区域:所属实例数为零的Zone区域;Zone区域内实例平均负载小于零,或者实例故障率(断路器断开次数/实例数)大于等于阈值(默认为0.99999)。 然后根据Zone区域的实例平均负载来计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域。 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阈值(默认为20%),就直接返回所有Zone区域为可用区域。否则,从最坏Zone区域集合中随机的选择一个,将它从可用Zone区域集合中剔除。
- 当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机的选择一个Zone区域。
- 在确定了某个Zone区域后,则获取对应Zone区域的服务均衡器,并调用chooseServer来选择具体的服务实例,而在chooseServer中将使用IRule接口的choose函数来选择具体的服务实例。在这里IRule接口的实现会使用ZoneAvoidanceRule来挑选出具体的服务实例。
閱讀更多 JAVA破局之路 的文章