这两天在搞Ribbon负载均衡策略,写了个倍权策略和服务标签策略,给大家分享分享
首先创建一个spring 配置类 ConfigBean
import com.dhc.springcloud.myrule.RobinRule;
import com.netflix.loadbalancer.IRule;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;
/**
 * Spring configuration class that declares the two beans used for
 * client-side load balancing in this example: a {@link RestTemplate}
 * annotated with {@code @LoadBalanced}, and the custom {@link IRule}.
 */
@Configuration
public class ConfigBean {

    /**
     * Declares the {@link RestTemplate} bean, annotated with
     * {@code @LoadBalanced}.
     *
     * @return a new {@code RestTemplate} instance
     */
    @Bean
    @LoadBalanced
    public RestTemplate getRestTemplate() {
        final RestTemplate template = new RestTemplate();
        return template;
    }

    /**
     * Declares the {@link IRule} bean, backed by the custom
     * {@link RobinRule}, which switches strategy at runtime.
     *
     * <p>NOTE(review): an {@code IRule} bean declared in a component-scanned
     * configuration is presumably shared by every Ribbon client; confirm
     * against the Spring Cloud Netflix documentation that this is intended.
     *
     * @return a new {@code RobinRule}
     */
    @Bean
    public IRule myRule() {
        final IRule rule = new RobinRule();
        return rule;
    }
}
在这里用自己编写的规则类来注入IRule。
自定义的规则类要继承AbstractLoadBalancerRule这个父类:
import com.dhc.springcloud.Consts;
import com.netflix.client.config.IClientConfig;
import com.netflix.loadbalancer.AbstractLoadBalancerRule;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Ribbon rule that picks a delegate strategy on every call, based on the
 * current value of {@code Consts.ruleType}:
 * <ul>
 *   <li>1 - {@code MyProbabilityRandomRule}</li>
 *   <li>2 - {@code MyRoundRobinRule} (weighted selection)</li>
 *   <li>3 - {@code MyTagRandomRule} (tag-based selection)</li>
 *   <li>any other value - falls back to {@code MyProbabilityRandomRule}</li>
 * </ul>
 *
 * @author yangxiaoxiao
 * @version V1.0 (2019-03-21)
 */
public class RobinRule extends AbstractLoadBalancerRule {

    // Logger is bound to this class so its output is attributed to, and
    // level-controlled by, the RobinRule category rather than Netflix's
    // RoundRobinRule category.
    private static final Logger log = LoggerFactory.getLogger(RobinRule.class);

    private final MyProbabilityRandomRule myProbabilityRandomRule = new MyProbabilityRandomRule();
    private final MyRoundRobinRule myRoundRobinRule = new MyRoundRobinRule();
    private final MyTagRandomRule myTagRandomRule = new MyTagRandomRule();

    /** Creates a rule with no load balancer attached yet. */
    public RobinRule() {
    }

    /**
     * Creates a rule bound to the given load balancer.
     *
     * @param lb the load balancer to attach via {@code setLoadBalancer}
     */
    public RobinRule(ILoadBalancer lb) {
        this();
        setLoadBalancer(lb);
    }

    /**
     * Chooses a server by delegating to the strategy currently selected by
     * {@code Consts.ruleType}.
     *
     * @param lb  the load balancer supplying candidate servers
     * @param key the load-balancing key, passed through to the delegate
     * @return whatever the selected delegate returns (may be {@code null})
     */
    public Server choose(ILoadBalancer lb, Object key) {
        // Read once so the log line and the delegate always agree.
        final int ruleType = Consts.ruleType.get();
        switch (ruleType) {
            case 1:
                log.info("进入随机轮询中");
                return myProbabilityRandomRule.choose(lb, key);
            case 2:
                log.info("进入倍权轮询中");
                return myRoundRobinRule.choose(lb, key);
            case 3:
                log.info("进入tag轮询中");
                return myTagRandomRule.choose(lb, key);
            default:
                log.info("没有找到轮询机制,默认使用随机机制");
                return myProbabilityRandomRule.choose(lb, key);
        }
    }

    /**
     * Chooses a server using the load balancer attached to this rule.
     *
     * @param key the load-balancing key
     * @return the chosen server, or {@code null} if the delegate found none
     */
    @Override
    public Server choose(Object key) {
        return choose(getLoadBalancer(), key);
    }

    /**
     * Intentionally empty: this rule reads no settings from the client config.
     *
     * @param clientConfig ignored
     */
    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
    }
}
实现这个类之后,可以通过改变Consts.ruleType中的值,在每次调用时动态选择负载均衡策略。其中倍权和tag轮询策略是我根据上述的随机轮询策略编写的。Consts类中保存了各个策略所需要的中间变量,后续可以根据实际情况去改变里面的值:
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Holder for the shared, mutable settings that the custom load-balancing
 * rules read on every call. All fields are public and static so they can be
 * changed at runtime.
 *
 * @author zhangzeux
 * @version V1.0 (2019-03-18)
 */
public class Consts {

    /** Tag currently in effect; used as the lookup key into {@link #tagList}. Starts as "prod". */
    public static volatile String tag = "prod";

    /**
     * Selects the active strategy in {@code RobinRule}. Starts at 1;
     * {@code RobinRule} treats 2 as weighted and 3 as tag-based, and
     * falls back to its default for any other value.
     */
    public static AtomicInteger ruleType = new AtomicInteger(1);

    /**
     * Weighted server list: each entry is a server id in "host:port" form,
     * and an id's weight is the number of times it appears.
     */
    // MICROSERVICECLOUD-DEPT
    public static List<String> serviceList = new CopyOnWriteArrayList<String>();

    /** Maps a tag name to the server ids ("host:port") that may serve it. */
    public static Map<String, CopyOnWriteArrayList<String>> tagList =
            new ConcurrentHashMap<String, CopyOnWriteArrayList<String>>();
}
Consts类包含四个变量。首先是serviceList,它是线程安全的List集合(CopyOnWriteArrayList),存储各个服务调用节点的倍权关系,倍权关系如下:
// Example weight setup: a node's weight is the number of times its
// "host:port" id is added to the list (here 8002 : 8001 : 8003 = 1 : 4 : 1).
Consts.serviceList.add("172.20.10.2:8002"); // 8002 appears once
Consts.serviceList.add("172.20.10.2:8001"); // 8001 appears four times
Consts.serviceList.add("172.20.10.2:8001");
Consts.serviceList.add("172.20.10.2:8001");
Consts.serviceList.add("172.20.10.2:8001");
Consts.serviceList.add("172.20.10.2:8003"); // 8003 appears once
上面的配置表示8002、8001、8003三个节点的访问权重为1比4比1,是一种概率上的访问关系。
ruleType,修改此值可以改变要使用的负载均衡策略。
tagList用来保存每个tag标签对应的可访问服务节点;tag表示当前要使用的标签,可以自己提供接口去动态修改。存储的标签关系如下:
// Example tag setup: the "prod" tag maps to nodes 8002 and 8001.
CopyOnWriteArrayList<String> copy = new CopyOnWriteArrayList<>();
copy.add("172.20.10.2:8002");
copy.add("172.20.10.2:8001");
Consts.tagList.put("prod",copy);
// The "dev" tag maps to node 8003 only.
CopyOnWriteArrayList<String> copy1 = new CopyOnWriteArrayList<>();
copy1.add("172.20.10.2:8003");
Consts.tagList.put("dev",copy1);
下面是对应的倍权算法(MyRoundRobinRule):
import com.dhc.springcloud.Consts;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Random;
/**
 * Weighted random server selection driven by {@code Consts.serviceList}.
 *
 * <p>Each entry of {@code Consts.serviceList} is a server id; an id that
 * appears more often is proportionally more likely to be picked, because one
 * entry is drawn uniformly at random per attempt.
 *
 * @author zhangzeux
 * @version V1.0 (2019-03-18)
 */
public class MyRoundRobinRule {
    private static final Logger log = LoggerFactory.getLogger(MyRoundRobinRule.class);

    /** Maximum number of selection attempts before giving up. */
    private static final int MAX_ATTEMPTS = 10;

    /** Shared generator; hoisted so one is not allocated on every attempt. */
    private final Random random = new Random();

    public MyRoundRobinRule() { }

    /**
     * Picks a reachable server according to the configured weights.
     *
     * @param lb  the load balancer supplying the server lists
     * @param key the load-balancing key (not used by this strategy)
     * @return a server that is alive and ready to serve, or {@code null} when
     *         {@code lb} is null, no server is reachable, the weight list is
     *         empty, or no usable server was found within the attempt limit
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
        Server server = null;
        int count = 0;
        while (server == null && count++ < MAX_ATTEMPTS) {
            List<Server> reachableServers = lb.getReachableServers();
            log.info("reachableServers:{}", reachableServers);
            List<Server> allServers = lb.getAllServers();
            log.info("allServers:{}", allServers);
            if (reachableServers.isEmpty() || allServers.isEmpty()) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Drop weight entries whose server is no longer reachable, so a
            // node that went offline cannot be drawn. Removing while iterating
            // is safe only because Consts.serviceList is initialised as a
            // CopyOnWriteArrayList (iteration works on a snapshot).
            // NOTE(review): nothing visible here re-adds an entry when the
            // node comes back - confirm that happens elsewhere.
            for (String service : Consts.serviceList) {
                if (!reachableServers.contains(new Server(service))) {
                    Consts.serviceList.remove(service);
                }
            }
            log.info("Consts.serviceList:{}", Consts.serviceList);

            // Snapshot the weights: size and element are then read from the
            // same array, so a concurrent prune cannot make the index invalid.
            final String[] weights = Consts.serviceList.toArray(new String[0]);
            if (weights.length == 0) {
                // Random.nextInt(0) would throw IllegalArgumentException.
                log.warn("No weighted servers left in Consts.serviceList for load balancer: " + lb);
                return null;
            }

            // Every index is equally likely, so an id's selection probability
            // equals its share of the entries - this realises the weighting.
            final int nextServerCyclicCounter = random.nextInt(weights.length);
            log.info("weight{}:nextServerCyclicCounter{}", weights.length, nextServerCyclicCounter);
            final String chosenId = weights[nextServerCyclicCounter];

            for (Server se : reachableServers) {
                log.info("{}", se.getId());
                if (se.getId().equals(chosenId)) {
                    server = se;
                    break;
                }
            }
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            if (server.isAlive() && (server.isReadyToServe())) {
                log.info("本次调用的服务为{},地址{}", server.getMetaInfo().getAppName(), server.getId());
                return (server);
            }
            // Next.
            server = null;
        }
        if (count >= MAX_ATTEMPTS) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
}
标签轮询算法
import com.dhc.springcloud.Consts;
import com.netflix.loadbalancer.ILoadBalancer;
import com.netflix.loadbalancer.Server;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Tag-based round-robin selection: cycles through the server ids registered
 * in {@code Consts.tagList} under the tag currently held in
 * {@code Consts.tag}, and returns the matching reachable server.
 */
public class MyTagRandomRule {
    private static final Logger log = LoggerFactory.getLogger(MyTagRandomRule.class);

    /** Maximum number of selection attempts before giving up. */
    private static final int MAX_ATTEMPTS = 10;

    /** Cyclic position within the tagged server list, shared by all callers. */
    private final AtomicInteger nextServerCyclicCounter;

    public MyTagRandomRule() {
        nextServerCyclicCounter = new AtomicInteger(0);
    }

    /**
     * Picks the next reachable server registered for the current tag.
     *
     * @param lb  the load balancer supplying the server lists
     * @param key the load-balancing key (not used by this strategy)
     * @return a server that is alive and ready to serve, or {@code null} when
     *         {@code lb} is null, no server is reachable, the current tag has
     *         no servers registered, or no usable server was found within the
     *         attempt limit
     */
    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
        Server server = null;
        int count = 0;
        while (server == null && count++ < MAX_ATTEMPTS) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            if (reachableServers.isEmpty() || allServers.isEmpty()) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            // Read the volatile tag once; ConcurrentHashMap rejects null keys.
            final String currentTag = Consts.tag;
            final List<String> tagged = (currentTag == null) ? null : Consts.tagList.get(currentTag);
            // Snapshot so the modulo and the element read use the same length
            // even if the shared list is modified concurrently.
            final String[] tags = (tagged == null) ? new String[0] : tagged.toArray(new String[0]);
            if (tags.length == 0) {
                // Without this guard an unknown tag causes a NullPointerException
                // and an empty list causes a division by zero in the modulo.
                log.warn("No servers registered for tag '" + currentTag + "' in Consts.tagList");
                return null;
            }

            final int nextServerIndex = incrementAndGetModulo(tags.length);
            final String service = tags[nextServerIndex];
            for (Server s : reachableServers) {
                if (service.equals(s.getId())) {
                    server = s;
                    break;
                }
            }
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
            if (server.isAlive() && (server.isReadyToServe())) {
                log.info("本次调用的服务为{},地址{}", server.getMetaInfo().getAppName(), server.getId());
                return (server);
            }
            // Next.
            server = null;
        }
        if (count >= MAX_ATTEMPTS) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

    /**
     * Atomically advances the cyclic counter and returns its new value.
     * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
     *
     * @param modulo The modulo to bound the value of the counter; must be positive.
     * @return The next value, in the range {@code [0, modulo)}.
     */
    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextServerCyclicCounter.get();
            int next = (current + 1) % modulo;
            if (nextServerCyclicCounter.compareAndSet(current, next)) {
                return next;
            }
        }
    }
}
因为在这里都是高并发,所以都要使用线程安全的类,这里是服务调用,算法的时间复杂度很重要,好的负载均衡策略可以大大减少服务调用之间消耗的时间。
欢迎大家有更好的方法可以一起讨论学习,完整的代码可以发邮箱获取 zhangzexu.1995@qq.com