前往小程序,Get更优阅读体验!
立即前往
发布
社区首页 >专栏 >Java学习笔记-微服务(4)-服务熔断和降级

Java学习笔记-微服务(4)-服务熔断和降级

原创
作者头像
咸鱼程序员
发布2025-03-05 23:31:17
发布2025-03-05 23:31:17
740
举报

CircuitBreaker

服务熔断

在分布式系统中,许多以来不可避免的会调用失败,Hystrix 能够保证在一个依赖出问题的情况下,不会导致整体服务失败,以提高分布式的弹性。

为什么不用 Hystrix?因为该组件不再更新。

分布式系统存在一个非常关键的问题,如何防止服务雪崩。服务雪崩是当链路中有服务宕机或无法请求时,链路前的所有系统都会由于大量的请求无法获取信息从而导致接连宕机。

为了解决这个问题,我们需要将有问题的节点快速熔断(返回失败处理或返回默认兜底数据做服务降级)。

断路器本身是一种开关装置,当某个服务发生故障后,通过断路器的故障监控,返回一个符合预期的、可以处理的备用响应而不是返回无法处理的异常。这样就保证了这笔无法成功的线程不会被长时间的不必要的占用,从而避免了故障在系统中的蔓延,从而避免雪崩。

对于一些概念做一些解释

服务熔断:当出现故障时直接拒绝访问。

服务降级:当出现故障时返回一个默认的提示信息。

服务限流:并发请求数过多,对请求做限流。

服务限时:指定时间内可以访问。

服务预热:请求从少到多逐渐允许请求。

CircuitBreaker 断路器,目的是保护分布式系统免受故障和异常,提高系统的可用性和健壮性。

当一个服务出现故障时,CircuitBreaker 会切换模式到 OPEN 状态,阻止请求发送到该服务从而避免更多的请求发送到该服务,减少该服务的负载,防止进一步崩溃。同时 CircuitBreaker 还可以在分布式系统的各个组件之间自动切换,从而避免单点故障。

关键的一点是:CircuitBreaker 只是一套规范和接口,具体的实现者是 Resilence4J,Resilence4J 也是 Spring 用于替换 Hystrix 的新组件。

Resilence4J 提供了几个核心模块,按重要程度排序如下:

CircuitBreaker 断路

ratelimiter 速率限制

bulkhead 舱壁

retry 自动重试

timelimiter 超时处理

cache 结果缓存

CircuitBreaker 状态转换原理

断路器有三个普通状态:关闭 CLOSED、开启 OPEN、半开 HALF_OPEN。

两个特殊状态:禁用 DISABLED、强制开启 FORCED_OPEN。

熔断器工作原理:

当熔断器关闭时,所有的请求都会通过熔断器。如果失败率超过设定阈值,熔断器就会从关闭状态转换为打开状态,此时所有请求都会被拒绝。经过一段时间后,熔断器会从打开状态转换为半开状态,此时仅有一定数量的请求被放入系统,并重新计算失败率。此时失败率仍然超过阈值,则继续转换为打开状态。此时失败率小于阈值,则转换为关闭状态。

熔断器使用滑动窗口来存储和统计调用的结果。可以选择基于调用数量的滑动窗口或基于时间的滑动窗口。

基于访问数量的滑动窗口统计了最近 N 次的调用结果。

基于时间的滑动窗口统计了最近 N 秒的调用结果。

除此之外,熔断器的两种特殊状态不会生成熔断器事件(转换状态除外),并且不会记录事件的成功或失败。退出特殊状态的唯一方法就是触发熔断器转换或重置熔断器。

CircuitBreaker 常用配置参考

failure-rate-threshold:以百分比配置失败率峰值。

sliding-window-type:断路器的滑动窗口类型,基于次数 COUNT_BASED、基于时间 TIME_BASED,默认基于次数。

sliding-window-size:若基于次数,则 50% 失败打开熔断器。若基于时间,则有两个额外的配置 sliding-window-size(N 秒)内 slow-call-rate-threshold(百分比)的请求超过 slow-call-duration-threshold (超过 N 秒)打开熔断器。

slow-call-duration-threshold:配置时间调用的峰值,高于该事件视为慢调用。

permitted-number-of-calls-in-half-open-state:断路器在半开状态下进行 N 次调用,若故障高于阈值则再次进入打开状态。

minimum-number-of-calls:每个滑动窗口的样本数。配置熔断器错误率或慢调用率的最小调用数。例如设置为 5 表示计算故障率之前需要至少调用 5 次

wait-duration-open-state:从 OPEN 到 HALF_OPEN 需要等待的时间。

CircuitBreaker 演示

基于前文的基础,我们新增一个入口用于测试服务熔断及降级(非上文的 OpenFeign 调用模块)

代码语言:java
复制
@RestController
public class PayCircuitController {

    @PostMapping("/pay/circuit")
    public ResultVO circuit(@RequestBody PayDTO payDTO) throws Exception {
        // id给-4时输出异常
        if (payDTO.getId() == -4) {
            throw new Exception("id 不能为负");
        }
        // id给9999时停止5s
        if (payDTO.getId() == 9999) {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        // 其余情况直接放行
        return new ResultVO<>(200, "成功", payDTO);
    }
}

在 api 模块中新增一个访问(如有疑问请看前文)

代码语言:java
复制
    /**
     * 验证 circuit
     * @param payDTO
     * @return
     */
    @PostMapping("/pay/circuit")
    ResultVO circuit(@RequestBody PayDTO payDTO) throws Exception;

之后在前文的 OpenFeign 测试模块完成所有需要的配置

  • 修改请求配置,添加 pom 依赖;
  • 修改 application.yml 超时配置 8s 更改为 20s 便于演示 circuit;
  • 在spring-cloud-openfeign 配置下新增 circuitbreaker 配置;
  • 在根配置下新增 resilience4j 配置;
  • 新建请求 Controller 并配置熔断相关;

示例代码如下

代码语言:xml
复制
        <!--circuitBreaker-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
        </dependency>
        <!--aop-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>
代码语言:yml
复制
client:
        config:
          default:
            # 连接超时时间
            connect-timeout: 20000
            # 读取超时时间,8s的原因是有调查发现,当响应时间超过8s时,绝大部分用户都会关闭请求
            read-timeout: 20000
代码语言:yml
复制
      circuitbreaker:
        # 开启 circuitbreaker
        enabled: true
        # 分组开启时,分组策略默认为精确优先,如果配置了多个分组,则优先使用分组策略
        # 精确优先就是每个服务调用都将根据其所属的熔断器分组进行处理
        group:
          enabled: true
代码语言:yml
复制
resilience4j:
  circuitbreaker:
    configs:
      default:
        # 设置百分之50的失败率,超过打开熔断器
        failure-rate-threshold: 50
        # 设置滑动窗口为次数
        sliding-window-type: COUNT_BASED
        # 滑动窗口大小 6个请求
        sliding-window-size: 6
        # 开始计算失败率之前的最小样本数
        minimum-number-of-calls: 6
        # 是否开启熔断器自动过渡到半开状态标志
        automatic-transition-from-open-to-half-open-enabled: true
        # 过渡所需的时间
        wait-duration-in-open-state: 5s
        # 半开状态的最大请求数 默认为10
        permitted-number-of-calls-in-half-open-state: 2
        # 熔断器开启后,记录异常的类
        record-exceptions:
          - java.lang.Exception
    instances:
      # 指定服务名
      cloud-payment-service:
        # 使用default配置
        base-config:  default
代码语言:java
复制
@RestController
public class OrderCircuitController {

    @Resource
    private OpenFeignApi openFeignApi;

    /**
     * 服务熔断降级demo
     * 新增熔断降级注解,name为服务名,fallbackMethod为降级方法
     * @param payDTO
     * @return
     */
    @PostMapping("/feign/pay/circuit")
    @CircuitBreaker(name = "cloud-payment-service", fallbackMethod = "circuitFallback")
    public ResultVO circuit(@RequestBody PayDTO payDTO) throws Exception {
        return openFeignApi.circuit(payDTO);
    }

    /**
     * 熔断降级方法
     * @param payDTO
     * @param t
     * @return
     */
    public ResultVO circuitFallback(PayDTO payDTO, Throwable t){
        return new ResultVO(200, "触发服务降级", null);
    }
}

连通性测试完毕后,将请求的入参 id 更改为 -4,手动触发异常以模拟服务降级。可以从返回结果中观察到,返回结果为熔断降级方法的 ResultVO。

BulkHead 隔离

BulkHead 一般用于限制对于下游服务的最大并发数量的限制

Resilience4j 提供了两种隔离的实现方式,可以限制并发执行的数量:

实现 SemaphoreBulkhead (信号量舱壁)

实现 FixedThreadPoolBulkhead (固定线程池舱壁)

SemaphoreBulkhead 信号量舱壁

当信号量存在空闲时,进入系统的请求会直接获取信号量并开始业务处理。当信号量全被占用时,接下来的请求会进入阻塞状态,SemaphoreBulkhead 提供了一个阻塞计时器。

如果阻塞状态的请求在阻塞计时内无法获取到信号量则系统会拒绝这些请求。若请求在阻塞计时内获取到了信号量,那将直接获取信号量并执行相应的业务处理。

我们在服务方模块中,新增一个方法并在 api 模块中添加访问入口

代码语言:java
复制
@PostMapping("pay/bulkhead")
    public ResultVO bulkhead(@RequestBody PayDTO payDTO) throws Exception {
        if (payDTO.getId() == -4) {
            throw new Exception("id 不能为负");
        }
        if (payDTO.getId() == 9999) {
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return new ResultVO<>(200, "成功", payDTO);
    }
代码语言:java
复制
    /**
     * 验证 bulkhead
     * @param payDTO
     * @return
     */
    @PostMapping("pay/bulkhead")
    ResultVO bulkhead(@RequestBody PayDTO payDTO) throws Exception;

在调用方模块中添加依赖

代码语言:xml
复制
        <!--bulkhead-->
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-bulkhead</artifactId>
        </dependency>

在 applicatin.yml resilience4j 下添加关于 bulkhead 的相关配置

代码语言:yml
复制
  bulkhead:
    configs:
      default:
        # 隔离允许线程并发执行的最大数量
        max-concurrent-calls: 2
        # 当到达并发线程数量时,新的线程的阻塞时间,超时进入fallback
        max-wait-duration: 1000ms
    instances:
      cloud-payment-service:
        base-config: default

  timelimiter:
    configs:
      default:
        timeout-duration: 20000ms

在调用方模块中添加请求

代码语言:java
复制
/**
     * bulkhead舱壁demo
     * 新增熔断降级注解,name为服务名,fallbackMethod为降级方法
     * @param payDTO
     * @return
     */
    @PostMapping("feign/pay/bulkhead")
    @Bulkhead(name = "cloud-payment-service", fallbackMethod = "myBulkheadFallBack", type = Bulkhead.Type.SEMAPHORE)
    public ResultVO bulkhead(@RequestBody PayDTO payDTO) throws Exception {
        return openFeignApi.bulkhead(payDTO);
    }
    public ResultVO myBulkheadFallBack(PayDTO payDTO, Throwable t){
        return new ResultVO(200, "触发服务降级", null);
    }

连通性测试完毕后,多线程发起请求,在同一时间发送超过 max-concurrent-calls 个线程数量的请求,可以观察到超出的请求被转入 fallback 方法。

FixedThreadPoolBulkhead 固定线程池舱壁

FixedThreadPoolBulkhead 和 SemaphoreBulkhead 一样也适用于限制并发执行次数的,但是二者的实现原理存在差别且表现效果也存在不同。FixedThreadPoolBulkhead 使用一个固定线程池和等待队列来实现舱壁。

当线程池中存在空闲时,此时进入系统的请求将直接进入线程池开启新线程或使用空闲线程来处理请求。

当线程池中没有空闲时,接下来的请求会进入等待队列。如果等待队列仍然没有空间则该请求会被直接拒绝。等待队列中的请求会在线程池出现空闲时进入线程池进行业务处理。

此外,ThreadPoolBulkhead 只对 CompletableFuture 方法有效,我们必须创建返回 CompletableFuture 类型的方法。

首先我们添加 application.yml resilience4j 下的相关配置。(pom 和信号量舱壁一致无需重复引入)

代码语言:yml
复制
  # 采用新线程和主线程脱离时,关闭circuitbreaker的group配置
  thread-pool-bulkhead:
    configs:
      default:
        # 线程池核心线程数
        core-thread-pool-size: 1
        # 线程池最大线程数
        max-thread-pool-size: 1
        # 线程池队列大小
        queue-capacity: 1
    instances:
      cloud-payment-service:
        base-config: default
代码语言:yml
复制
    openfeign:
      circuitbreaker:
        # 开启 circuitbreaker
        enabled: true
        # 分组开启时,分组策略默认为精确优先,如果配置了多个分组,则优先使用分组策略
        # 精确优先就是每个服务调用都将根据其所属的熔断器分组进行处理
        group:
          enabled: false # 该项改为false

调用方模块新增内容,注意和前文不同的是,bulkhead 注解的 type 被修改为了 threadpool

代码语言:java
复制
/**
     * FixedThreadPoolBulkhead demo
     * 新增熔断降级注解,name为服务名,fallbackMethod为降级方法
     * @param payDTO
     * @return
     */
    @PostMapping("feign/pay/poolBulkhead")
    @Bulkhead(name = "cloud-payment-service", fallbackMethod = "myBulkheadPoolFallBack", type = Bulkhead.Type.THREADPOOL)
    public CompletableFuture<ResultVO> fixedThreadPoolBulkhead(@RequestBody PayDTO payDTO) throws Exception {
        // 休眠 1s
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName() + " start");
        try {
            Thread.sleep(3000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " end");
        return CompletableFuture.supplyAsync(() -> {
            try {
                return openFeignApi.bulkhead(payDTO);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
    public CompletableFuture<ResultVO> myBulkheadPoolFallBack(PayDTO payDTO, Throwable t){
        return CompletableFuture.supplyAsync(() -> new ResultVO(200, "触发服务降级", null));
    }

连通性测试无误后,根据上文提供的配置,最大线程数和等待队列都为 1 的情况下,同时有三个请求进行访问时,前两个会访问成功,第三个请求会访问失败触发服务降级。

RateLimiter 限流

限流就是频率的控制,系统所能提供的最大并发是有限的,同时请求过多就需要限流。

常见的限流算法:

漏斗算法(Leaky Bucket):一个固定容量的桶,按照设定常量固定速率进行限流。若请求超出设定值,多余的被丢弃。由于漏斗算法的速率是固定的,所以漏斗算法对于突发性的流量缺乏效率。

令牌桶算法(Token Bucket):以固定速率添加令牌,当请求进入时,从令牌桶中获取令牌,多余的进入等待队列。若令牌桶内无令牌,则丢弃该请求。

令牌桶算法是 Spring Cloud 的默认算法。

滚动时间窗(Tumbling time window):允许固定数量个请求进入,超过数量的就拒绝或等待下一个时间窗口进入。由于是一个时间间隔内进行限制,如果用户在上个时间间隔结束前请求,同时在当前时间间隔开始时请求,就会出现问题(时间临界点缺陷)。

滑动时间窗(Sliding time window):解决了时间临界点缺陷的算法。

我们在服务方模块中新增代码

代码语言:java
复制
    @PostMapping("pay/ratelimit")
    public ResultVO rateLimit(@RequestBody PayDTO payDTO){
        return new ResultVO<>(200, "rateLimit input", payDTO);
    }

api 模块中新增访问入口

代码语言:java
复制
    /**
     * 验证 rateLimit 限流
     * @param payDTO
     * @return
     */
    @PostMapping("pay/ratelimit")
    public ResultVO rateLimit(@RequestBody PayDTO payDTO);

调用方模块新增依赖

代码语言:xml
复制
        <dependency>
            <groupId>io.github.resilience4j</groupId>
            <artifactId>resilience4j-ratelimiter</artifactId>
        </dependency>

application.yml 的 resilience4j 下新增配置

代码语言:yml
复制
  ratelimiter:
    configs:
      default:
        # 一次刷新周期内的允许执行的最大请求数
        limit-for-period: 2
        # 刷新周期,吸纳六七每隔limitRefreshPeriod刷新一次
        limit-refresh-period: 1s
        # 线程等待权限的默认等待时间
        timeout-duration: 1s
    instances:
      cloud-payment-service:
        base-config: default

Controller 新增如下代码(注意注解为 @RateLimiter):

代码语言:java
复制
   /**
     * bulkhead舱壁demo
     * 新增熔断降级注解,name为服务名,fallbackMethod为降级方法
     * @param payDTO
     * @return
     */
    @PostMapping("feign/pay/bulkhead")
    @RateLimiter(name = "cloud-payment-service", fallbackMethod = "myRatelimiterFallBack")
    public ResultVO rateLimiter(@RequestBody PayDTO payDTO) throws Exception {
        return openFeignApi.bulkhead(payDTO);
    }
    public ResultVO myRatelimiterFallBack(PayDTO payDTO, Throwable t){
        return new ResultVO(200, "触发服务降级", null);
    }

测试过程和前文一致,不断请求该接口,访问速度超出限流规定数值即可。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • CircuitBreaker
    • 服务熔断
    • CircuitBreaker 状态转换原理
    • CircuitBreaker 常用配置参考
    • CircuitBreaker 演示
    • BulkHead 隔离
    • SemaphoreBulkhead 信号量舱壁
    • FixedThreadPoolBulkhead 固定线程池舱壁
    • RateLimiter 限流
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档