一、认识 Hystrix
Hystrix 是 Netflix 开源的一款容错框架,包含常用的容错方法:线程池隔离、信号量隔离、熔断、降级回退。在高并发访问下,系统所依赖的服务的稳定性对系统的影响非常大,依赖有很多不可控的因素,比如网络连接变慢,资源突然繁忙,暂时不可用,服务脱机等。我们要构建稳定、可靠的分布式系统,就必须要有这样一套容错方法。
复杂分布式体系结构中的应用程序有几十个依赖项,每个依赖项都不可避免地会在某个时刻失败。如果主机应用程序没有与这些外部故障隔离开来,那么它就有被这些故障摧毁的风险。 例如,对于一个依赖 30 个服务的应用程序,其中每个服务都有 99.99% 的正常运行时间,您可以期待以下内容:
99.9930 = 99.7% 正常运行时间 10 亿次请求中的 0.3% = 3000000 次失败 2 小时以上的停机时间/月,即使所有依赖项都具有良好的正常运行时间。
现实情况通常更糟。 即使所有依赖关系都表现良好,如果不对整个系统进行弹性设计,数十项服务中每项服务的 0.01% 停机时间的总影响也相当于每月可能停机数小时。 当一切正常时,请求流可能如下所示:
当许多后端系统中的一个变得潜在时,它可以阻止整个用户请求:
在高流量的情况下,一个潜在的后端依赖可能会导致所有服务器上的所有资源在几秒钟内饱和。
应用程序中通过网络或进入客户端库可能导致网络请求的每一点都是潜在故障的根源。
比故障更糟糕的是,这些应用程序还可能导致服务之间的延迟增加,从而备份队列、线程和其他系统资源,从而导致系统中更多的级联故障。
当通过第三方客户端执行网络访问时,这些问题会加剧。第三方客户就是一个“黑匣子”,其中实施细节被隐藏,并且可以随时更改,网络或资源配置对于每个客户端库都是不同的,通常难以监视和 更改。
通过的故障包括:
网络连接失败或降级。 服务和服务器失败或变慢。 新的库或服务部署会改变行为或性能特征。 客户端库有错误。
所有这些都代表需要隔离和管理的故障和延迟,以便单个故障依赖关系不能导致整个应用程序或系统的故障。
当您使用 Hystrix 包装每个底层依赖项时,上图所示的体系结构如下图所示。 每个依赖关系彼此隔离,在延迟发生时可以饱和的资源受到限制,迅速执行 fallback 的逻辑,该逻辑决定了在依赖关系中发生任何类型的故障时会做出什么响应:
1.1线程隔离
依赖隔离是 Hystrix 的核心目的。依赖隔离其实就是资源隔离,把对依赖使用的资源隔离起来,统一控制和调度。那为什么需要把资源隔离起来呢?
主要有以下几点:
Hystrix 提供了两种依赖隔离方式:线程池隔离 和 信号量隔离。
如下图,线程池隔离,Hystrix 可以为每一个依赖建立一个线程池,使之和其他依赖的使用资源隔离,同时限制他们的并发访问和阻塞扩张。
每个依赖可以根据权重分配资源(这里主要是线程),每一部分的依赖出现了问题,也不会影响其他依赖的使用资源。
1.2线程池隔离
如果简单的使用异步线程来实现依赖调用会有如下问题:
1.2.1 线程的创建和销毁;
1.2.2 线程上下文空间的切换,用户态和内核态的切换带来的性能损耗。
使用线程池的方式可以解决第一种问题,但是第二个问题计算开销是不能避免的。
Netflix在使用过程中详细评估了使用异步线程和同步线程带来的性能差异,结果表明在 99% 的情况下,异步线程带来的几毫秒延迟的完全可以接受的。
1.3线程池隔离的优缺点
优点:
缺点:
整体流程
2.1 POM依赖
<dependency>
<groupId>de.ahus1.prometheus.hystrix</groupId>
<artifactId>prometheus-hystrix</artifactId>
<version>4.1.0</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-core</artifactId>
<version>1.5.18</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-metrics-event-stream</artifactId>
<version>1.5.18</version>
</dependency>
<dependency>
<groupId>com.netflix.hystrix</groupId>
<artifactId>hystrix-javanica</artifactId>
<version>1.5.18</version>
</dependency>
2.2 Hystrix 生效
2.2.1 HystrixCommonRequestAspect
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PatchMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
@Slf4j
@Aspect
@Order(1)
@Component
public class HystrixCommonRequestAspect {
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(requestMapping)")
public Object requestMappingAround(ProceedingJoinPoint joinPoint, RequestMapping requestMapping) throws Throwable {
return handleRequest(joinPoint, requestMapping);
}
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(getMapping)")
public Object getMappingAround(ProceedingJoinPoint joinPoint, GetMapping getMapping) throws Throwable {
return handleRequest(joinPoint, getMapping);
}
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(postMapping)")
public Object postMappingAround(ProceedingJoinPoint joinPoint, PostMapping postMapping) throws Throwable {
return handleRequest(joinPoint, postMapping);
}
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(putMapping)")
public Object putMappingAround(ProceedingJoinPoint joinPoint, PutMapping putMapping) throws Throwable {
return handleRequest(joinPoint, putMapping);
}
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(deleteMapping)")
public Object putMappingAround(ProceedingJoinPoint joinPoint, DeleteMapping deleteMapping) throws Throwable {
return handleRequest(joinPoint, deleteMapping);
}
@Around(value = "(within(@org.springframework.stereotype.Controller *) || within(@org.springframework.web.bind.annotation.RestController *)) && @annotation(patchMapping)")
public Object putMappingAround(ProceedingJoinPoint joinPoint, PatchMapping patchMapping) throws Throwable {
return handleRequest(joinPoint, patchMapping);
}
private Object handleRequest(ProceedingJoinPoint joinPoint, Annotation mapping) throws Throwable {
if(hasHystrixCommand(joinPoint)){
if(log.isDebugEnabled()){
log.debug("当前请求有自定义的command,使用自定义的command");
}
return joinPoint.proceed();
}else{
if(log.isDebugEnabled()){
log.debug("当前请求没有自定义的command,使用默认的command");
}
HttpProceedCommand proceedCommand = new HttpProceedCommand();
proceedCommand.setJoinPoint(joinPoint);
return proceedCommand.execute();
}
}
public static class HttpProceedCommand extends com.netflix.hystrix.HystrixCommand{
private ProceedingJoinPoint joinPoint;
public ProceedingJoinPoint getJoinPoint() {
return joinPoint;
}
public HttpProceedCommand(){
super(HystrixCommandGroupKey.Factory.asKey("HttpProceedCommand"),HystrixThreadPoolKey.Factory.asKey("HttpProceedCommandThreadPool"));
}
public void setJoinPoint(ProceedingJoinPoint joinPoint) {
this.joinPoint = joinPoint;
}
@Override
protected Object run() throws Exception {
try {
return joinPoint.proceed();
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
}
private boolean hasHystrixCommand(ProceedingJoinPoint joinPoint){
MethodSignature methodSignature = (MethodSignature)joinPoint.getSignature();
Method method = methodSignature.getMethod();
HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
return hystrixCommand != null;
}
}
2.2.2 HystrixCommand 注解
@Configuration
public class HystrixConfig {
@Resource
private CollectorRegistry registry;
/**
* 用来拦截处理 HystrixCommand 注解
* @return
*/
@Bean
public HystrixCommandAspect hystrixCommandAspect() {
HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());
HystrixPrometheusMetricsPublisher.builder().withRegistry(registry).buildAndRegister();
return new HystrixCommandAspect();
}
/**
* 用来向监控中心 Dashboard 发送 stream 信息
* @return
*/
@Bean
public ServletRegistrationBean hystrixMetricsStreamServlet() {
ServletRegistrationBean registration = new ServletRegistrationBean(new HystrixMetricsStreamServlet());
registration.addUrlMappings("/hystrix.stream");
return registration;
}
}
2.2.3 HystrixCommand 注解
参数:
commandKey : 代表了一类 command,一般来说,代表了底层的依赖服务的一个接口
threadPoolKey : 代表使用的线程池 KEY,相同的 threadPoolKey 会使用同一个线程池
ignoreExceptions : 调用服务时,除了 HystrixBadRequestException 之外,其他 @HystrixCommand 修饰的函数抛出的异常均会被Hystrix 认为命令执行失败而触发服务降级的处理逻辑 (调用 fallbackMethod 指定的回调函数),所以当需要在命令执行中抛出不触发降级的异常时来使用它,通过这个参数指定,哪些异常抛出时不触发降级(不去调用 fallbackMethod ),而是将异常向上抛出。
**fallbackMethod **: 降级使用的方法,需要在同一个类中
@PostMapping("/test")
@HystrixCommand(commandKey = "testCommandKey",
threadPoolKey = "testThreadPool",
ignoreExceptions = {RuntimeException.class},
fallbackMethod = "testHystrixFail")
public String test() {
System.out.println("test");
return "测试";
}
降级方法
public String testHystrixFail() {
return "进入降级方法";
}
对重要数据进行缓存
/**
* 首页的查询缓存,缓存 48 个小时;做降级策略使用
* 时间小于 24h 时,进行更新缓存
*/
public static <T> void setUpHystrixCache(T t, String key) {
Long ttl = jedisClientUtil.ttl(key);
Long day = 169200L;
if (ttl <= day) {
jedisClientUtil.set(key, 60 * 60 * 24 * 2, JSONObject.toJSONString(t));
}
}
线程池配置如下
根据实际情况而定
查询活跃线程数方法
top -H -i -b -d 1 -n2 -p 进程号 | awk -v RS= 'END{print $0}' | awk '$1 ~ /[0-9]+/{print $12}' | sed -E 's/[0-9]+/n/g' | sort | uniq -c
arthas
thread --state RUNNABLE
4.1 因为使用的是线程池的模式,请求会在一个新的线程池中拿到线程执行代码,而不是使用 tomcat 的线程所有会存在一个 ThreadLocal 变量获取不到的情况,例如 TraceId
解决方案
/**
* Hystrix 使用线程池的模式会得不到父线程的线程上下文 列如 TraceId 该类就是为了解决可以得到父线程的 ThreadLocal 的变量
*/
public class MyHystrixHook extends HystrixCommandExecutionHook {
private HystrixRequestVariableDefault<String> traceIdVariable = new HystrixRequestVariableDefault<>();
@Override
public <T> void onStart(HystrixInvokable<T> commandInstance) {
HystrixRequestContext.initializeContext();
traceIdVariable.set(TraceIdUtil.getCurrentTraceId());
}
@Override
public <T> Exception onError(HystrixInvokable<T> commandInstance, HystrixRuntimeException.FailureType failureType, Exception e) {
HystrixRequestContext.getContextForCurrentThread().shutdown();
return super.onError(commandInstance, failureType, e);
}
@Override
public <T> void onSuccess(HystrixInvokable<T> commandInstance) {
HystrixRequestContext.getContextForCurrentThread().shutdown();
super.onSuccess(commandInstance);
}
@Override
public <T> void onExecutionStart(HystrixInvokable<T> commandInstance) {
TraceIdUtil.initTraceId(traceIdVariable.get());
}
@Override
public <T> void onFallbackStart(HystrixInvokable<T> commandInstance) {
TraceIdUtil.initTraceId(traceIdVariable.get());
}
}
然后在合适的位置注册:
HystrixPlugins.getInstance().registerCommandExecutionHook(new MyHystrixHook());
4.2 通过注解的方式可以设置降级方法的 **ignoreExceptions 参数,**拦截器的方式无法设置
解决方案
public static class HttpProceedCommand extends com.netflix.hystrix.HystrixCommand{
private ProceedingJoinPoint joinPoint;
public ProceedingJoinPoint getJoinPoint() {
return joinPoint;
}
public HttpProceedCommand(){
super(HystrixCommandGroupKey.Factory.asKey("HttpProceedCommand"),HystrixThreadPoolKey.Factory.asKey("HttpProceedCommandThreadPool"));
}
public void setJoinPoint(ProceedingJoinPoint joinPoint) {
this.joinPoint = joinPoint;
}
@Override
protected Object run() throws Exception {
try {
return joinPoint.proceed();
} catch (Throwable e) {
if (e instanceof RuntimeException) {
throw (Exception) e;
} else {
throw new HystrixBadRequestException(e.getMessage(), e);
}
}
}
}
在合适的地方抛出 HystrixBadRequestException
HystrixBadRequestException 用提供的参数或状态表示错误而不是执行失败的异常。与 HystrixCommand
抛出的所有其他异常不同,这不会触发回退,不会计算故障指标,因此不会触发断路器。
4.3 HystrixRuntimeException: Command fallback execution rejected
执行错误了,本应该去执行 fallback 方法,可是却被 reject 了,为什么呢?
这种情况下,一般来说是 command 已经熔断了,所有请求都进入 fallback 导致的,因为 fallback 默认是有个并发最大处理的限制,fallback.isolation.semaphore.maxConcurrentRequests,默认是10,这个方法及时很简单,处理很快,可是QPS如果很高,还是很容易达到10这个阈值,导致后面的被拒绝。
解决方法也很简单:
4.4 时间较长的接口是否应该进行中断
根据实际场景来判断 可通过
hystrix.command.[command].execution.isolation.thread.interruptOnTimeout = false
来配置关闭
下载请求
上传请求
command 和 pool 和 collapser 的配置参数
HystrixCommandProperties 命令执行相关配置:
hystrix.command.[commandkey].execution.isolation.strategy 隔离策略THREAD或SEMAPHORE 默认HystrixCommands使用THREAD方式 HystrixObservableCommands使用SEMAPHORE
hystrix.command.[commandkey].execution.timeout.enabled 是否开启超时设置,默认true。
hystrix.command.[commandkey].execution.isolation.thread.timeoutInMilliseconds 默认超时时间 默认1000ms
hystrix.command.[commandkey].execution.isolation.thread.interruptOnTimeout是否打开超时线程中断 默认值true
hystrix.command.[commandkey].execution.isolation.thread.interruptOnFutureCancel 当隔离策略为THREAD时,当执行线程执行超时时,是否进行中断处理,即Future#cancel(true)处理,默认为false。
hystrix.command.[commandkey].execution.isolation.semaphore.maxConcurrentRequests 信号量最大并发度 默认值10该参数当使用ExecutionIsolationStrategy.SEMAPHORE策略时才有效。如果达到最大并发请求数,请求会被拒绝。理论上选择semaphore size的原则和选择thread size一致,但选用semaphore时每次执行的单元要比较小且执行速度快(ms级别),否则的话应该用thread。
hystrix.command.[commandkey].fallback.isolation.semaphore.maxConcurrentRequests fallback方法的信号量配置,配置getFallback方法并发请求的信号量,如果请求超过了并发信号量限制,则不再尝试调用getFallback方法,而是快速失败,默认信号量为10。
hystrix.command.[commandkey].fallback.enabled 是否启用降级处理,如果启用了,则在超时或异常时调用getFallback进行降级处理,默认开启。
hystrix.command.[commandkey].circuitBreaker.enabled 是否开启熔断机制,默认为true。
hystrix.command.[commandkey].circuitBreaker.forceOpen 强制开启熔断,默认为false。
hystrix.command.[commandkey].circuitBreaker.forceClosed 强制关闭熔断,默认为false。
hystrix.command.[commandkey].circuitBreaker.sleepWindowInMilliseconds 熔断窗口时间,默认为5s。
hystrix.command.[commandkey].circuitBreaker.requestVolumeThreshold 当在配置时间窗口内达到此数量后的失败,进行短路。默认20个
hystrix.command.[commandkey].circuitBreaker.errorThresholdPercentage 出错百分比阈值,当达到此阈值后,开始短路。默认50%
hystrix.command.[commandkey].metrics.rollingStats.timeInMilliseconds 设置统计滚动窗口的长度,以毫秒为单位。用于监控和熔断器 默认10s
hystrix.command.[commandkey].metrics.rollingStats.numBuckets 设置统计窗口的桶数量 默认10
hystrix.command.[commandkey].metrics.rollingPercentile.enabled 设置执行时间是否被跟踪,并且计算各个百分比,50%,90%等的时间 默认true
hystrix.command.[commandkey].metrics.rollingPercentile.timeInMilliseconds 设置执行时间在滚动窗口中保留时间,用来计算百分比 默认60000ms
hystrix.command.[commandkey].metrics.rollingPercentile.numBuckets 设置rollingPercentile窗口的桶数量 默认6。
hystrix.command.[commandkey].metrics.rollingPercentile.bucketSize 此属性设置每个桶保存的执行时间的最大值 默认100。如果bucket size=100,window=10s,若这10s里有500次执行,只有最后100次执行会被统计到bucket里去。增加该值会增加内存开销以及排序的开销。
hystrix.command.[commandkey].metrics.healthSnapshot.intervalInMilliseconds 记录health 快照(用来统计成功和错误绿)的间隔,默认500ms
hystrix.command.[commandkey].requestCache.enabled 设置是否缓存请求,request-scope内缓存 默认值true
hystrix.command.[commandkey].requestLog.enabled 设置HystrixCommand执行和事件是否打印到HystrixRequestLog中 默认值true
hystrix.command.[commandkey].threadPoolKeyOverride 命令的线程池key,决定该命令使用哪个线程池。
HystrixThreadPoolProperties线程池相关配置:
hystrix.threadpool.[threadkey].coreSize 线程池核心线程数 默认值10;
hystrix.threadpool.[threadkey].maximumSize 线程池最大线程数 默认值10;
hystrix.threadpool.[threadkey].allowMaximumSizeToDivergeFromCoreSize 当线程数大于核心线程数时,是否需要回收。与keepAliveTimeMinutes配合使用。
hystrix.threadpool.[threadkey].keepAliveTimeMinutes 当实际线程数超过核心线程数时,线程存活时间 默认值1min
hystrix.threadpool.[threadkey].maxQueueSize 最大等待队列数 默认不开启使用SynchronousQueue 不可动态调整
hystrix.threadpool.[threadkey].queueSizeRejectionThreshold 允许在队列中的等待的任务数量 默认值5
hystrix.threadpool.[threadkey].metrics.rollingStats.timeInMilliseconds 设置统计滚动窗口的长度,以毫秒为单位 默认值10000。
hystrix.threadpool.[threadkey].metrics.rollingStats.numBuckets 设置统计窗口的桶数量 默认10
HystrixCollapserProperties批处理相关配置:
hystrix.collapser.[collapserKey].maxRequestsInBatch 单次批处理的最大请求数,达到该数量触发批处理,默认Integer.MAX_VALUE
hystrix.collapser.[collapserKey].timerDelayInMilliseconds 触发批处理的延迟,也可以为创建批处理的时间+该值,默认值10
hystrix.collapser.[collapserKey].requestCache.enabled 默认值true
hystrix.collapser.[collapserKey].metrics.rollingStats.timeInMilliseconds 默认值10000
hystrix.collapser.[collapserKey].metrics.rollingStats.numBuckets 默认值10
hystrix.collapser.[collapserKey].metrics.rollingPercentile.enabled 默认值true
hystrix.collapser.[collapserKey].metrics.rollingPercentile.timeInMilliseconds 默认值60000
hystrix.collapser.[collapserKey].metrics.rollingPercentile.numBuckets 默认值6
hystrix.collapser.[collapserKey].metrics.rollingPercentile.bucketSize 默认值100
以上文档是借鉴互联网经验和项目接入经验总结而来,相关配置仅供参考,具体配置请以实际情况而定。
参考 : https://github.com/Netflix/Hystrix/wiki#what