前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >修复 Spring Cloud Gateway 项目中无法通过 Skywalking 追踪 WebClient 调用的问题

修复 Spring Cloud Gateway 项目中无法通过 Skywalking 追踪 WebClient 调用的问题

原创
作者头像
Zimmem
发布于 2022-03-05 08:54:58
发布于 2022-03-05 08:54:58
7.8K0
举报
文章被收录于专栏:SkywalkingSkywalking

解决 Spring Cloud Gateway 项目中无法追踪 WebClient 调用的问题

问题描述

Skywalking 通过 java agent 的方式为 java 应用带来无侵入的分布式链路采集。

微服务架构中, Spring Cloud Gateway 做为业务网关, 一般需要自定义 Filter ,调用其它服务接口验证用户身份或判断权限。 Gateway 进程配置了 Skywalking Agent(8.8.0) , 但在 Filter 中使用 WebClient 调用远程服务, 可能导致生成多个调用链路, 无法正确跟踪。

Skywalking Jave Agent 采集链路信息原理

排查问题之前, 先了解下 Skywalking Jave Agent 是如何采集链路信息的。

Plugin Development Guide

单进程内同步调用 trace 状态维护

Skywalking Jave Agent 通过 org.apache.skywalking.apm.agent.core.context.ContextManager 来管理 Trace 上下文。

通过 ContextManager#createEntrySpanContextManager#createLocalSpanContextManager#createExistSpan 等方法来创建一个 Span。

  • EntrySpan 表示一次远程被调跨度
  • LocalSpan 表示一次进程内本地跨度
  • ExistSpan 表示一次远程主调跨度

当创建 Span 时, 如果链路上下文 TraceContext 还没有创建, 会先创建 Trace , 并把 TracerContext 存到 ContextManager 管理的 ThreadLocal ContextManager.CONTEXT 中。 新创建的 Span 会使用 TracerContext 的上下文信息。

因为 TracerContext 存在 ThreadLocal 中, 所以在同一个线程中创建的多个 Span 会使用到同一个 TracerContext 串起来。

单进程内异步调用 trace 状态维护

当使用 Spring WebFlux 或 Vert.x 等异步框架时, 一次调用事务的逻辑可能调度在不同的线程中。

因为 ContextManager 使用 ThreadLocal 来维持 TracerContext, 那么在一次调用事务链中每次创建 Span , 可能对应不同的 TracerContext. 最终在 Skyawalking 控制台中出现多个链路。

比如 Spring Mvc 接收到 Http 请求时, 创建了一个 EntrySpan, 在接下来的业务逻辑中需要调用一个远程服务, 那么需要创建一个 ExitSpan , 但在创建 ExitSpan 时由于多次异步调用, 已经切到别的线程上, ContextManager 获取不到原来的 TracerContext, 便新建了一个, 此时便出现 EntrySpan 与 ExitSpan 不属于同一个 Trace 的情况。

针对异步调用, Skywalking Agent 提供了 ContextSnapshot 用于在线程间共享 TracerContext.

在实现异步框架的插件时, 当创建第一个 Span 后, 需要使用 ContextManager.capture() 获取到 ContextSnapshot, 并放置到异步框架本身的上下文来传递。

而后, 再创建后续的 Span 时, 需要从框架的上下文中获取 ContextSnapshot , 再使用 ContextManger.continued 方法把 ContextSnapshot 恢复到当前 Span 中。

跨进程调用 trace 状态传递

Skywalking 根据不同的网络协议或框架(比如 Http Header 或 Kafka Message Header), 来传递链路上下文。 实现步聚如下:

  1. 主调端创建一个 ExitSpan, 通过 ContextManger.inject(ContextCarrier carrier) 把上下文信息注入到 carrier 中, 通过 carrier 可以获取到需要传递的 Hearder 信息, 再把 Header 信息注入到对应调用框架中(比如 HttpRequest)。
  2. 被调方从框架中取得 Header 信息封装成 ContextCarrier, 再使用 carrier 调用 ContextManager#createEntrySpan 来创建 EntrySpan 便能把主调跟被调的 Trace 上下文串起来。

Spring WebFlux Webclient 插件实现逻辑及问题重现。

既然是 WebClient 调用会导致生成多个 Trace , 那么直接查看 spring-webflux-5.x-webclient-plugin 插件的代码(8.8.0 版本)。

通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.define.WebFluxWebClientInstrumentation 可以看到插件通过 org.apache.skywalking.apm.plugin.spring.webflux.v5.webclient.WebFluxWebClientInterceptor 拦截了 org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction#exchange 方法。

那么继续查看 WebFluxWebClientInterceptor 的代码:

代码语言:java
AI代码解释
复制
@Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        //..

        
        ClientRequest request = (ClientRequest) allArguments[0];
        final ContextCarrier contextCarrier = new ContextCarrier();

        URI uri = request.url();
        final String requestURIString = getRequestURIString(uri);
        final String operationName = requestURIString;
        final String remotePeer = getIPAndPort(uri);
        
        // 直接创建 ExitSpan , 没用使用 ContextManager.continued 来恢复上下文
        AbstractSpan span = ContextManager.createExitSpan(operationName, contextCarrier, remotePeer);

        //...

        //user async interface
        span.prepareForAsync();
        ContextManager.stopSpan();
        context.setContext(span);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        AbstractSpan span = (AbstractSpan) context.getContext();
        return ret1.doOnSuccess(clientResponse -> {
            //...
        }).doOnError(error -> {
            span.log(error);
        }).doFinally(s -> {
            span.asyncFinish();
        });
    }

可以看到 WebFluxWebClientInterceptor#beforeMethod 中直接创建 ExitSpan , 并没有使用 ContextManger.continued 来恢复上下文。 因为在Spring WebFlux 基于 Reactor 异步框架 , 那么创建当前 Span 与前置创建 EntrySpan 不在同个线程中, 两个 Span 属于两个不同的 Trace.

Bug 复现, 创建一个 Spring WebFlux 项目, 编写测试 Controller 如下

代码语言:java
AI代码解释
复制
@SpringBootApplication
@RestController
public class SpringWebfluxProjectApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringWebfluxProjectApplication.class, args);
    }

    @GetMapping("test")
    public Mono<String> hello() {
        return WebClient.create("http://localhost:8080/foo")
                .get()
                .retrieve()
                .bodyToMono(String.class)
                .flatMap(s -> {
                    return WebClient.create("http://localhost:8080/bar")
                            .get()
                            .retrieve()
                            .bodyToMono(String.class);
                });

    }

    @GetMapping("foo")
    public Mono<String> baidu(ServerWebExchange exchange) {
        return Mono.just("foo");

    }

    @GetMapping("bar")
    public Mono<String> qq(ServerWebExchange exchange) throws InterruptedException {
        return Mono.just("bar").delayElement(Duration.ofMillis(100));

    }

}

配置好skywalking agent 相关 JVM 参数, 运行项目, 请求 http://localhost:8080/test , 查看 skywalking 面板, 确实生成了多个 Span , 但每个 Span 的 TraceId 都不一样。

skywalking-webclient-bug.png
skywalking-webclient-bug.png

解决方案

基于上节分析, 根本问题在于在创建 ExitSpan 时没有恢复上下文, 那么需有找到一个方法获取到上游的 ContextSnapshot 并恢复即可。

Spring Webflux 基于 Reactor 框架 , 可以通过 Reactor Context 来传递 ContextSnapshot.

Skywalking 默认插件中包含 mvc-annotation-5.x-plugin , 查看对应代码, 发现该插件通过拦截 Spring Mvc 相关注解方法, 在注解方法前创建 EntrySpan , 使用同步的方式,且拦截方法返回值不一定是 Mono 或 Flux , 难于在这个地方把 ContextSnapshot 放入 Reactor Context 中。 在 optional-plugin 还有 spring-webflux-5.x-plugin 插件, 该插件通过拦截 org.springframework.web.reactive.DispatcherHandler#handle 来创建 EntrySpan, DispatcherHandler#handle 返回 Mono , 可以在这里插入 ContextSnapshot.

具体实现如下:

代码语言:java
AI代码解释
复制
//org.apache.skywalking.apm.plugin.spring.webflux.v5.DispatcherHandlerHandleMethodInterceptor#afterMethod
 @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];

        AbstractSpan span = (AbstractSpan) exchange.getAttributes().get("SKYWALKING_SPAN");
        Mono<Void> monoReturn = (Mono<Void>) ret;

        
        // add skywalking context snapshot to reactor context.
        EnhancedInstance instance = getInstance(allArguments[0]);
        if (instance != null && instance.getSkyWalkingDynamicField() != null) {
            monoReturn = monoReturn.subscriberContext(
                    c -> c.put("SKYWALKING_CONTEXT_SNAPSHOT", instance.getSkyWalkingDynamicField()));
        }

        return monoReturn.doFinally(s -> {

            if (span != null) {
                maybeSetPattern(span, exchange);
                try {

                    HttpStatus httpStatus = exchange.getResponse().getStatusCode();
                    // fix webflux-2.0.0-2.1.0 version have bug. httpStatus is null. not support
                    if (httpStatus != null) {
                        Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                        if (httpStatus.isError()) {
                            span.errorOccurred();
                        }
                    }
                } finally {
                    span.asyncFinish();
                }
            }
        });
    }

个性 WebFluxWebClientInterceptor 从 Reactor Context 中获取 ContextSnapshot :

代码语言:java
AI代码解释
复制
public class WebFluxWebClientInterceptor implements InstanceMethodsAroundInterceptorV2 {

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInvocationContext context) throws Throwable {
        // before method 中无法获取 Reactor 上下文 , 原逻辑直接删除掉
        // ExchangeFunctions$DefaultExchangeFunction 中只是构建 Reactor 链条, 并末真正执行, 所以原来逻辑可以推迟到 subscriberContext 中获取上下文后再执行。
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, Object ret, MethodInvocationContext context) throws Throwable {
        // fix the problem that allArgument[0] may be null
        if (allArguments[0] == null) {
            return ret;
        }
        Mono<ClientResponse> ret1 = (Mono<ClientResponse>) ret;
        // 从 Reactor Context 中获取 ContextSnapshot 
        return Mono.subscriberContext().flatMap(ctx -> {

            ClientRequest request = (ClientRequest) allArguments[0];
            URI uri = request.url();
            final String operationName = getRequestURIString(uri);
            final String remotePeer = getIPAndPort(uri);
            AbstractSpan span = ContextManager.createExitSpan(operationName, remotePeer);

            // get ContextSnapshot from reactor context,  the snapshot is set to reactor context by any other plugin
            // such as DispatcherHandlerHandleMethodInterceptor in spring-webflux-5.x-plugin
            final Optional<Object> optional = ctx.getOrEmpty("SKYWALKING_CONTEXT_SNAPSHOT");
            optional.ifPresent(snapshot -> ContextManager.continued((ContextSnapshot) snapshot));

            //set components name
            span.setComponent(ComponentsDefine.SPRING_WEBCLIENT);
            Tags.URL.set(span, uri.toString());
            Tags.HTTP.METHOD.set(span, request.method().toString());
            SpanLayer.asHttp(span);

            final ContextCarrier contextCarrier = new ContextCarrier();
            ContextManager.inject(contextCarrier);
            if (request instanceof EnhancedInstance) {
                ((EnhancedInstance) request).setSkyWalkingDynamicField(contextCarrier);
            }

            //user async interface
            span.prepareForAsync();
            ContextManager.stopSpan();
            return ret1.doOnSuccess(clientResponse -> {
                HttpStatus httpStatus = clientResponse.statusCode();
                if (httpStatus != null) {
                    Tags.HTTP_RESPONSE_STATUS_CODE.set(span, httpStatus.value());
                    if (httpStatus.isError()) {
                        span.errorOccurred();
                    }
                }
            }).doOnError(span::log).doFinally(s -> {
                span.asyncFinish();
            });
        });
    }
    
}

重新编译插件后把 spring-webflux-5.x-plugin 及 spring-webflux-5.x-webclient-plugin 两个插件拷到 Skywalking Agent plugin 目录下, 重新运行测试代码, 可以发现问题解决, 所有调用都串起来。

webclient_trace_fixed.png
webclient_trace_fixed.png

修复代码已合并到 skywalking-java 主干(#114), 预计将在 8.10.0 版本中发布。

注意1: 因为 spring-webflux-5.x-plugin 是在 optional-plugins 目录中, 需要手工拷到 plugins 目录。

而 Spring Cloud Gateway 工程需要手工拷 gateway-3.x-plugin。

注意2: Srping MVC 插件 apm-springmvc-annotation-5.x-plugin 默认生效, 当与 spring-webflux-5.x-plugin 同时存在时, 一次调用会生成两个 EntrySpan, 而且 mvc 插件生成 EntrySpan 虽然与 Webclient 生成的 ExitSpan 能用同个 TraceId 串起来了, 但仍然没有 Parent/Child 关系, 介意的话在 Spring Webflux 工程中把 spring-webflux-5.x-plugin 移出 ${agetn/path}/plugin 目录。


参考

  1. Plugin Development Guide
  2. Reactor Context

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
实现分布式锁
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。那具体什么是分布式锁,分布式锁应用在哪些业务场景、如何来实现分布式锁呢。
HLee
2021/04/16
3980
实现分布式锁
分布式
在单机场景下,可以使用语言的内置锁来实现进程同步。但是在分布式场景下,需要同步的进程可能位于不同的节点上,那么就需要使用分布式锁。
全栈程序员站长
2021/04/07
4220
分布式服务接口请求顺序性保证
服务A调用服务B,先插入再删除。俩请求过去了,落在不同机器上,可能插入请求因为某些原因执行慢一些,导致删除请求先执行了,此时因为没数据所以啥效果也没有;结果这个时候插入请求过来了,好,数据插入进去。
JavaEdge
2021/12/07
4501
万字总结 分布式系统的38个知识点
天天说分布式分布式,那么我们是否知道什么是分布式,分布式会遇到什么问题,有哪些理论支撑,有哪些经典的应对方案,业界是如何设计并保证分布式系统的高可用呢?
一灰灰blog
2022/08/23
6880
分布式服务接口请求的顺序性如何保证?
分布式系统接口的调用顺序一般来说是不用保证顺序的。但是有的时候可能确实是需要严格的顺序保证。
JavaEdge
2022/11/30
3380
基于 Redis 的分布式锁实现
很久之前有讲过并发编程中的锁「并发编程的锁机制:synchronized和lock」。在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。
CG国斌
2020/05/19
5120
分布式锁实现
我们知道在同一个JVM中,可以通过Volatile、Synchronized、ReentrantLock 三个关键字来实现线程的安全。那么在分布式系统中这些是无法保证的,所以要通过分布式锁来实现。
春哥大魔王
2019/10/09
6280
进阶分布式系统架构系列(十三):Zookeeper 分布式锁原理与实现
前面介绍了 Zookeeper 集群 ZAB 协议、配置中心、注册中心、数据与存储、会话与事务管理等相关的知识点,今天我将详细的为大家介绍 zookeeper 分布式锁相关知识,希望大家能够从中收获多多!如有帮助,请点在看、转发支持一波!!!
民工哥
2023/09/09
1.8K0
进阶分布式系统架构系列(十三):Zookeeper 分布式锁原理与实现
基于redis的分布式锁实现
关于分布式锁 很久之前有讲过并发编程中的锁并发编程的锁机制:synchronized和lock。在单进程的系统中,当存在多个线程可以同时改变某个变量时,就需要对变量或代码块做同步,使其在修改这种变量时能够线性执行消除并发修改变量。而同步的本质是通过锁来实现的。为了实现多个线程在一个时刻同一个代码块只能有一个线程可执行,那么需要在某个地方做个标记,这个标记必须每个线程都能看到,当标记不存在时可以设置该标记,其余后续线程发现已经有标记了则等待拥有标记的线程结束同步代码块取消标记后再去尝试设置标记。 分布式环境
aoho求索
2018/04/03
1.1K0
【JavaP6大纲】Dubbo篇:分布式服务接口请求的顺序性如何保证?
服务 A 调用服务 B,先插入再删除。好,结果俩请求过去了,落在不同机器上,可能插入请求因为某些原因执行慢了一些,导致删除请求先执行了,此时因为没数据所以啥效果也没有;结果这个时候插入请求过来了,好,数据插入进去了,那就尴尬了。本来应该是 “先插入 -> 再删除”,这条数据应该没了,结果现在 “先删除 -> 再插入”,数据还存在。
Java廖志伟
2021/04/12
3600
大厂都聊分布式系统,面试不知道分布式锁如何聊下去
公众号[JavaQ]原创,专注分享Java基础原理分析、实战技术、微服务架构、分布式系统构建,诚邀点赞关注!
JavaQ
2020/04/26
7260
分布式锁简单入门以及三种实现方式介绍
很多小伙伴在学习Java的时候,总是感觉Java多线程在实际的业务中很少使用,以至于不会花太多的时间去学习,技术债不断累积!等到了一定程度的时候对于与Java多线程相关的东西就很难理解,今天需要探讨的东西也是一样的和Java多线程相关的!
Java后端技术
2018/08/09
7620
分布式锁简单入门以及三种实现方式介绍
什么是接口幂等性?为什么会产生接口幂等性问题?如何保证接口幂等性?
博主负责的项目报了一个问题,用户操作回退失效。我们的设计里,操作回退是回到操作前的状态。经过查看日志发现,用户之前的操作做了两次,也就是说提交操作的接口被调用了两次,导致之用户上一次的状态和这一次的状态是一样的,所以操作回退是没有问题的,问题出在了操作的接口被调用了两次。
三分恶
2020/12/01
1.5K0
什么是接口幂等性?为什么会产生接口幂等性问题?如何保证接口幂等性?
Java面试——架构设计与分布式
LRU(Least Recently Used:最近最少使用):简单的说,就是保证基本的 Cache容量,如果超过容量则必须丢掉最不常用的缓存数据,再添加最新的缓存。每次读取缓存都会改变缓存的使用时间,将缓存的存在时间重新刷新。其实,就是清理缓冲的一种策略。 我们可以通过双向链表的数据结构实现 LRU Cache,链表头(head)保存最新获取和存储的数据值,链表尾(tail)既为最不常使用的值,当需要清理时,清理链表的 tail 即可,并将前一个元素设置为tail。结构图如下:
Java架构师必看
2021/04/25
7030
Java面试——架构设计与分布式
一文搞懂分布式锁的原理与实现
对于锁,大家应该都不陌生,手机上可以加锁,想用时候解锁,不用的时候上锁,那自行车、房门同样可以加把锁,道理属于类似的情况。
架构精进之路
2021/04/21
7.8K0
一文搞懂分布式锁的原理与实现
分布式系统互斥性与幂等性问题的分析与解决
前言 随着互联网信息技术的飞速发展,数据量不断增大,业务逻辑也日趋复杂,对系统的高并发访问、海量数据处理的场景也越来越多。如何用较低成本实现系统的高可用、易伸缩、可扩展等目标就显得越发重要。为了解决这一系列问题,系统架构也在不断演进。传统的集中式系统已经逐渐无法满足要求,分布式系统被使用在更多的场景中。 分布式系统由独立的服务器通过网络松散耦合组成。在这个系统中每个服务器都是一台独立的主机,服务器之间通过内部网络连接。分布式系统有以下几个特点: 可扩展性:可通过横向水平扩展提高系统的性能和吞吐量。 高可靠性
美团技术团队
2018/03/12
1.5K0
分布式系统互斥性与幂等性问题的分析与解决
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式系统通过副本控制协议,使得从系统外部读取系统内部各个副本的数据在一定的约束条件下相同,称之为副本一致性(consistency)。副本一致性是针对分布式系统而言的,不是针对某一个副本而言。
奥耶可乐冰
2024/01/11
1K0
【分布式详解】一致性算法、全局唯一ID、分布式锁、分布式事务、 分布式缓存、分布式任务、分布式会话
分布式事务与分布式锁的区别
随着互联网的快速发展,分布式系统已经成为了大型应用的标配。在分布式系统中,分布式事务和分布式锁是两个核心概念。本文将重点探讨分布式事务与分布式锁的区别,并提供相关的代码示例。
疯狂的KK
2023/07/23
1.4K1
分布式事务与分布式锁的区别
分布式理论
网络分区(脑裂):网络之间不连通,导致分布式系统出现局部小集群,小集群间网络异常,小集群内部网路正常。
IT大咖说
2021/04/23
4050
分布式理论
接口实现幂等的几种方式
一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。这些函数不会影响系统状态,也不用担心重复执行会对系统造成改变。
ppxai
2023/11/18
3650
推荐阅读
相关推荐
实现分布式锁
更多 >
LV.1
JAVA工程师
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档