作者:温昂展
前面在过载保护章节中已提及了负载均衡,顾名思义,本节要探究的对象为负载/请求。负载均衡除了起到过载预防的作用,本质上是提高了系统的吞吐量,最小化响应时间,到达资源利用最大化。
目前,负载均衡算法不管是在学术研究还是在工程实现上都已比较成熟,算法大体可分以下几种:
非自适应类算法不要求从服务节点实时获得负载的更新,而自适应算法通常需要负载均衡器能感知节点的负载变化动态调整分发策略,提高负载平衡的效果。也因此,两类算法在实现上架构设计也有较大的差别。
那么节点的负载如何衡量呢? 根据不同类型的系统应用,通常会定义不同的度量方式,如:CPU资源、内存资源、当前进程数、吞吐量、成功率、响应时间等指标都可以作为负载度量的指标,各个指标的重要程度根据实际情况有所不同。
由于这里探讨的是软负载均衡技术,一些基于硬负载均衡的架构设计不做具体涉及。软负载均衡架构大体可以分为:
除了基于客户端方式,其他架构上实际上都是引入一个均衡服务器做控制或代理,或是返回请求的调度结果,亦或是直接对请求做转发。结合TAF的设计理念和架构做思考,负载均衡器其实可以放到主控侧来实现,但是我认为由于负载均衡策略的复杂性,单是对调度决策数据分析已经非常繁重,因此负载均衡通常还是单独引入一个接入层(统一接入网关)来实现,比如目前在使用的 TGW、哈雷接入、LVS(四层)、GSLB/LB(七层)、L5、CMLB 等都是此类接入层系统的实现方案。
负载均衡器周期性计算出每个被调机器的权重,再使用高效的配额算法分配各个主调机器的访问路由,主调机器上的业务进程通过API来取得这些路由,调用结束时通过API来反馈路由的好与坏。
而TAF的实现,我们把重点放在客户端。
根据前面对Client调用的分析可知,客户端会采用选定的负载均衡策略选择一个Invoker,对请求进行路由分发。若采用静态策略,显然在客户端做选择更为直接;若是考虑根据负载调整动态权重,和容错一样,客户端直接统计(成功率、响应时间等)更为精确。
首先根据前面提到的容错策略,初始化服务节点列表如下:
//使用不带权重的hash
List<Invoker<T>> sortedInvokers = sortedInvokersCache;
if (sortedInvokers == null || sortedInvokers.isEmpty()) {
throw new NoInvokerException("no such active connection invoker");
}
List<Invoker<T>> list = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : sortedInvokers) {
if (!invoker.isAvailable()) {
/**
* 屏敝后尝试重新调用
*/
ServantInvokerAliveStat stat = ServantnvokerAliveChecker.get(invoker.getUrl());
if (stat.isAlive() || (stat.getLastRetryTime() + (config.getTryTimeInterval() * 1000)) < System.currentTimeMillis()) {
list.add(invoker);
}
} else {
list.add(invoker);
}
}
// TODO 如果全死,是否需要随机取一个尝试?
if (list.isEmpty()) {
throw new NoInvokerException(config.getSimpleObjectName() + " try to select active invoker, size=" + sortedInvokers.size() + ", no such active connection invoker");
}
目前支持的负载均衡策略有Round-Robin轮询、带权重轮询、Hash、带权重Hash、一致性Hash; 默认使用的策略是Round-Robin轮询,若客户端调用时在请求上下文中(JceContext对应的map)有设置对应的hash参数则优先使用相应的hash策略,策略选用的优先级为: 一致性hash > hash > round-robin。其中,
相当于简单随机化,适用于各个节点无差别的情况。
// 省略服务节点列表初始化
Invoker<T> invoker = list.get((sequence.getAndIncrement() & Integer.MAX_VALUE) % list.size());
根据请求上下文的hash值对服务节点列表大小取模运行,进行路由分发,在系统正常运行情况下,可以保证相同hash值路由到同一服务节点。
Invoker<T> invoker = list.get((int) (hash % list.size()));
根据一致性hash算法生成一定空间(虚拟节点)大小的调用序列哈希环,整个空间按顺时针方式组织,将对应的服务节点根据哈希映射到环上,在发生移除/添加操作时,它能尽可能小的改变已存在的Key值映射关系。构造算法如下:
public static <T> TreeMap<Long, Invoker<T>> buildConsistentHashCircle(Collection<Invoker<T>> invokers,
ServantProxyConfig config) {
List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT_TYPE, 0) != 1) {
weightInvokers.clear();
break;
}
if (invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT, 0) > 0) weightInvokers.add(invoker);
}
TreeMap<Long, Invoker<T>> result = new TreeMap<Long, Invoker<T>>();
try {
boolean staticWeight = !weightInvokers.isEmpty();
Collection<Invoker<T>> srcInvokers = staticWeight ? weightInvokers : invokers;
for (Invoker<T> invoker : srcInvokers) {
int replicaNumber = staticWeight ? invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT, 0) : config.getDefaultConHashVirtualNodes();
if (replicaNumber > config.getDefaultConHashVirtualNodes())
replicaNumber = config.getDefaultConHashVirtualNodes();
replicaNumber = replicaNumber / 4 <= 0 ? 1 : replicaNumber / 4;
for (int i = 0; i < replicaNumber; i++) {
byte[] digest = md5(invoker.getUrl().toIdentityString() + i);
for (int h = 0; h < 4; h++) {
long m = hash(digest, h);
result.put(m, invoker);
}
}
}
} catch (Exception e) {
ClientLogger.getLogger().error("build consistent hash circle err. ", e);
return null;
}
return result;
}
上述算法的简单实现都假定每个服务节点都是完全对等的,而实际上每个服务节点的处理能力不尽相同,因此我们可以给每个节点根据处理能力的大小分配对应的权值。权重大的节点处理更多的请求,降低权重小的节点的系统负载。实现上,根据权重序列计算出节点调用序列,权重越大的节点在调用序列中出现的次数越多。生成调用序列算法如下:
public static <T> List<Invoker<T>> buildStaticWeightList(Collection<Invoker<T>> invokers,
ServantProxyConfig config) {
List<Invoker<T>> weightInvokers = new ArrayList<Invoker<T>>();
for (Invoker<T> invoker : invokers) {
if (invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT_TYPE, 0) != 1) {
return null;
}
if (invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT, 0) > 0) weightInvokers.add(invoker);
}
if (weightInvokers.isEmpty()) {
return null;
}
if (ClientLogger.getLogger().isDebugEnabled()) {
ClientLogger.getLogger().debug("[buildStaticWeightList]: weightInvokers size: " + weightInvokers.size());
}
int minWeight = Integer.MAX_VALUE;
int maxWeight = Integer.MIN_VALUE;
for (Invoker<T> invoker : weightInvokers) {
int tmpWeight = invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT, 0);
if (tmpWeight > maxWeight) maxWeight = tmpWeight;
if (tmpWeight < minWeight) minWeight = tmpWeight;
}
int maxRange = maxWeight / minWeight;
if (maxRange < config.getMinStaticWeightLimit()) maxRange = config.getMinStaticWeightLimit();
if (maxRange > config.getMaxStaticWeightLimit()) maxRange = config.getMaxStaticWeightLimit();
Comparator<Pair<Integer, Invoker<T>>> comparator = new WeightToInvokerComparator<T>();
TreeSet<Pair<Integer, Invoker<T>>> weightToInvoker = new TreeSet<Pair<Integer, Invoker<T>>>(comparator);
Map<Invoker<T>, Integer> invokerToWeight = new HashMap<Invoker<T>, Integer>();
int totalWeight = 0;
for (Invoker<T> invoker : weightInvokers) {
int weight = (invoker.getUrl().getParameter(Constants.TAF_CLIENT_WEIGHT, 0) * maxRange) / maxWeight;
totalWeight += weight;
weightToInvoker.add(new Pair<Integer, Invoker<T>>(weight, invoker));
invokerToWeight.put(invoker, weight);
if (ClientLogger.getLogger().isDebugEnabled()) {
ClientLogger.getLogger().debug("[buildStaticWeightList]: invoker: " + invoker.hashCode() + ", weight: " + weight + ", host: " + invoker.getUrl().getHost() + ", port: " + invoker.getUrl().getPort());
}
}
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
for (int i = 0; i < totalWeight; i++) {
boolean first = true;
TreeSet<Pair<Integer, Invoker<T>>> weightToInvokerTmp = new TreeSet<Pair<Integer, Invoker<T>>>(comparator);
Iterator<Pair<Integer, Invoker<T>>> it = weightToInvoker.descendingIterator();
while (it.hasNext()) {
Pair<Integer, Invoker<T>> pair = it.next();
if (first) {
first = false;
result.add(pair.second);
weightToInvokerTmp.add(new Pair<Integer, Invoker<T>>(pair.first - totalWeight + invokerToWeight.get(pair.second), pair.second));
} else {
weightToInvokerTmp.add(new Pair<Integer, Invoker<T>>(pair.first + invokerToWeight.get(pair.second), pair.second));
}
}
weightToInvoker = weightToInvokerTmp;
}
return result;
}
目前TAF还没有提供基于动态权重的方案实现,但是设计上可扩展性较强,之后可以通过继承LoadBalance接口来做更多策略的实现。
感谢阅读,有错误之处还请不吝赐教。
本系列暂时告一段落,以后有时间再继续补充
再次感谢实习以来Leader和导师的指导,感谢TAF学习上terry浩哥、kahn哥、菠菜哥、jeff哥等小伙伴的热情帮助
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。