前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Flink源码分析之RPC通信

Flink源码分析之RPC通信

原创
作者头像
楞头青
修改2022-07-12 13:23:25
1.5K2
修改2022-07-12 13:23:25
举报
文章被收录于专栏:流计算

问题导读

  1. RPC服务端创建过程
  2. RPC客户端创建过程
  3. RPC调用流程
  4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证

简介

Flink基于Akka来实现内部各组件(ResourceManager、Dispatcher、JobMaster、TaskExecutor等)间的RPC通信。本篇着重分析Flink的RPC设计,如何封装Actor模型,RPC的创建和调用流程。

阅读说明: 源码版本:Flink release-1.14.4 阅读前提:了解Akka Actor基础知识 1). 先聊Flink的RPC设计,理清RpcGateway、RpcEndpoint、RpcService、RpcServer概念,也就弄明白了Flink如何封装Akka来实现RPC机制。有兴趣可继续往下阅读。 2). 结合代码,看问题1、2、3,进一步熟悉RPC的创建与交互过程。(重点关注AkkaRpcService、AkkaInvocationHandler、AkkaRpcActor类) 3). 第4个问题进一步延伸,主要是理清各组件间谁与谁会建立通信连接,先后顺序是怎样的,由此建立起整个RPC通信网络。在组件切leader、重启或者心跳超时等异常情况时,是否有容错机制重新建连。

接口设计

结合下面RpcGateway、RpcEndpoint、RpcService、RpcServer概念看这两张图

JobManager RpcService
JobManager RpcService
TaskManager RpcService
TaskManager RpcService

RpcGateway

  1. 用于定义RPC协议,是客户端和服务端沟通的桥梁。
  2. 服务端实现了RPC协议,即实现了接口中定义的方法,做具体的业务逻辑处理
  3. 客户端实现了RPC协议,客户端是Proxy生成的代理对象,将对RpcGateway接口方法的调用转为Akka的消息发送。
  4. 关注其5个子接口:DispatcherGateway、ResourceManagerGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway。

RpcEndpoint

  1. RPC服务端的抽象,实现了该接口即为Rpc服务端,是Akka中Actor的封装。
  2. Actor收到ActorRef发送的消息(消息被封装为RpcInvocation对象),会通过RpcInvocation对象中的方法、参数等信息以反射的方式调用RpcGateway接口对应的方法。
  3. 关注其5个实现类:Dispatcher、ResourceManager、JobMaster、MetricQueryService、TaskExecutor。其中Dispatcher、ResourceManager、JobMaster是JobManager进程中的Rpc服务,TaskExecutor是TaskManager进程中的Rpc服务,MetricQueryService在JobManager和TaskManager进程中都有。

RpcService

  1. 是 RpcEndpoint 的运行时环境,是Akka中ActorSystem的封装
  2. 一个ActorSystem系统中有多个Actor,同样在Flink中一个RpcService中有多个RpcEndpoint,即多个Rpc服务。
  3. Flink中RpcService也有多套,JobManager和TaskManager进程中都有两套RpcService。
  4. RpcService 提供了启动Rpc服务(startServer)、停止Rpc服务(stopServer)、连接远端Rpc服务等方法
  5. 实现类是AkkaRpcService,内有属性ActorSystem actorSystem,Map<ActorRef, RpcEndpoint> actors。

RpcServer

是Rpc服务端自身的代理对象,设计上是供服务端调用自身非Rpc方法。

类关系图

注:这里借用网上画的一张图

Flink RPC详解
Flink RPC详解

问题

1. RPC服务端创建过程

RPC服务端是一个代理对象。

入口

代码语言:java
复制
RpcEndpoint.java
  
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
				
  			// 生成代理对象
        this.rpcServer = rpcService.startServer(this);

        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

代理对象生成过程如下:

1. 以Ask方式向SupervisorActor发送StartAkkaRpcActor消息,SupervisorActor收到消息后根据消息里RpcEndpoint的配置信息创建Actor,并以tell方式回复创建成功。

代码语言:java
复制
AkkaRpcService.java
    startServer(C rpcEndpoint)
        registerAkkaRpcActor(rpcEndpoint)
   		{
     			 	// 创建Actor
             SupervisorActor.startAkkaRpcActor(
               supervisor.getActor(),
               actorTerminationFuture ->
               Props.create(
                 akkaRpcActorType,
                 rpcEndpoint,
                 actorTerminationFuture,
                 getVersion(),
                 configuration.getMaximumFramesize(),
                 flinkClassLoader),
               rpcEndpoint.getEndpointId());  
   
             // 在RpcService中保存ActorRef与RpcEndpoint引用关系
             actors.put(actorRegistration.getActorRef(), rpcEndpoint)    
   		}
代码语言:java
复制
SupervisorActor.java
     
   		// 1) 发送消息
       public static StartAkkaRpcActorResponse startAkkaRpcActor(
               ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
     
       			// 以Ask方式发送消息并等待结果
     				// Ask在实现上实际上是会创建一个Actor等待响应结果,成功或者超时时,销毁Actor
           return Patterns.ask(
                           supervisor,
                           createStartAkkaRpcActorMessage(propsFactory, endpointId),
                           RpcUtils.INF_DURATION)
                   .toCompletableFuture()
                   .thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
                   .join();
       } 
   
   		// 2) 处理消息
       private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
           final String endpointId = startAkkaRpcActor.getEndpointId();
           final Props akkaRpcActorProps = ...
           ...
   
           try {
             	// 创建Actor
               final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);
   
               registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);
   
             	// 回复消息
               getSender().tell(
                               	StartAkkaRpcActorResponse.success(...),
                               	getSelf()
               								);
           } catch (AkkaException akkaException) {
               getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
           }
       }

2. 准备代理对象要实现的接口

代码语言:java
复制
AkkaRpcService.java
    startServer(C rpcEndpoint)
   	{
   		...
        // 服务端对象实现了RpcGateway接口
        Set<Class<?>> implementedRpcGateways =
        new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
            
            // 服务端对象是一个RpcServer
        implementedRpcGateways.add(RpcServer.class);
            // 服务端对象是Akka endpoint,可以获取到ActorRef引用
        implementedRpcGateways.add(AkkaBasedEndpoint.class); 
    
        if (rpcEndpoint instanceof FencedRpcEndpoint){
            implementedRpcGateways.add(FencedMainThreadExecutable.class);
        }
   	}

3. 生成代理对象

代码语言:java
复制
AkkaRpcService.java
   	startServer(C rpcEndpoint)
   	{
     	...
        RpcServer server =
                   (RpcServer)
                           Proxy.newProxyInstance(
                                   classLoader,
                                   implementedRpcGateways.toArray(
                                           new Class<?>[implementedRpcGateways.size()]),
                                   akkaInvocationHandler);
   
         return server;        
   	}

2. RPC客户端创建过程

RPC客户端是一个代理对象。

入口:RpcService的connect(String address, Class<C> clazz)方法。

代码语言:java
复制
AkkaRpcService.java
  connect(String address, Class<C> clazz)
		{
  			...
        // 1) 使用ActorSystem.actorSelection(address).resolveOne的方式来获取Actor的引用ActorRef(ActorRef可以用来向服务端Actor发送消息)
        final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
  
  			// 2) ActorRef创建完成后,使用ask的方式向服务端发送一条握手消息(用来验证Client和Server彼此版本一致)
        final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
                actorRefFuture.thenCompose(
                        (ActorRef actorRef) ->
                                AkkaFutureUtils.toJava(
                                        Patterns.ask(
                                                        actorRef,
                                                        new RemoteHandshakeMessage(
                                                                clazz, getVersion()),
                                                        configuration.getTimeout().toMilliseconds())
                                                .<HandshakeSuccessMessage>mapTo(
                                                        ClassTag$.MODULE$
                                                                .<HandshakeSuccessMessage>apply(
                                                                        HandshakeSuccessMessage
                                                                                .class))));  
  
  			// 3) 以上2个事都做完后,异步创建代理对象并返回
        final CompletableFuture<C> gatewayFuture =
                actorRefFuture.thenCombineAsync(
                        handshakeFuture,
                        (ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
                          	// invocationHandlerFactory.apply(actorRef) = new AkkaInvocationHandler 或 new 或 FencedAkkaInvocationHandler
                            InvocationHandler invocationHandler =
                                    invocationHandlerFactory.apply(actorRef);

                            ClassLoader classLoader = getClass().getClassLoader();

                            C proxy =
                                    (C)
                                            Proxy.newProxyInstance(
                                                    classLoader,
                                                    new Class<?>[] {clazz},
                                                    invocationHandler);

                            return proxy;
                        },
                        actorSystem.dispatcher());

        return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);  	
		}

3. RPC调用流程

1. 通过客户端代理对象调用RpcGateway的方法会交由invoke方法执行。

2. invoke将方法、参数信息封装为RpcInvocation对象,并通过ActorRef将消息发送给服务端Actor。

如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName + parameterTypes + args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。

3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。

客户端相关代码如下:

代码语言:java
复制
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{
  
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        Class<?> declaringClass = method.getDeclaringClass();

        Object result;

      	// 非Rpc方法,直接本地执行。这个是服务端通过自己的代理对象RpcServer调用自己非Rpc方法时走的逻辑
        if (declaringClass.equals(AkkaBasedEndpoint.class)
                || declaringClass.equals(Object.class)
                || declaringClass.equals(RpcGateway.class)
                || declaringClass.equals(StartStoppable.class)
                || declaringClass.equals(MainThreadExecutable.class)
                || declaringClass.equals(RpcServer.class)) {
            result = method.invoke(this, args);
        } else if (declaringClass.equals(FencedRpcGateway.class)) { // 支持HA的见FencedAkkaInvocationHandler
            throw new UnsupportedOperationException(...);
        } else {
          	// RPC方法,指RpcGateway子接口中定义的方法
          	// 接口:ResourceManagerGateway、DispatcherGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway
            result = invokeRpc(method, args);
        }

        return result;
    }
  
    private Object invokeRpc(Method method, Object[] args) throws Exception {
				...

        // 1) 封装消息
        final RpcInvocation rpcInvocation =
                createRpcInvocationMessage(methodName, parameterTypes, args);

        // 2) 借助akka发送消息,进行RPC调用
        Class<?> returnType = method.getReturnType();

        final Object result;

        if (Objects.equals(returnType, Void.TYPE)) {
          	// 无返回值,用akka tell模式
            tell(rpcInvocation);

            result = null;
        } else {
						...
            // 有返回值,用akka ask模式
            final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
						...
        }

        return result;
    }
  
}

服务端相关代码如下:

代码语言:java
复制
AkkaRpcActor.java
  
    private void handleRpcInvocation(RpcInvocation rpcInvocation) {
        Method rpcMethod = null;

        try {
            String methodName = rpcInvocation.getMethodName();
            Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();

            rpcMethod = lookupRpcMethod(methodName, parameterTypes);
        } catch (Exception e) {
          	...
            getSender().tell(new Status.Failure(rpcException), getSelf());
        }

        if (rpcMethod != null) {
            try {
                rpcMethod.setAccessible(true);
                final Method capturedRpcMethod = rpcMethod;
              
              	// 1) 无返回值
                if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                    runWithContextClassLoader(
                            () -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
                            flinkClassLoader);
                } else {
                  	// 2) 有返回值
                    final Object result;
                    try {
                        result =
                                runWithContextClassLoader(
                                        () ->
                                                capturedRpcMethod.invoke(
                                                        rpcEndpoint, rpcInvocation.getArgs()),
                                        flinkClassLoader);
                    } catch (InvocationTargetException e) {
												...
                        getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
                        return;
                    }

                    final String methodName = rpcMethod.getName();

                    if (result instanceof CompletableFuture) {
                        final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
                        sendAsyncResponse(responseFuture, methodName);
                    } else {
                        sendSyncResponse(result, methodName);
                    }
                }
            } catch (Throwable e) {
              	...
                getSender().tell(new Status.Failure(e), getSelf());
            }
        }
    }  

4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证

总述

哪些组件之间会建立RPC连接,什么时候会建立,连接又是如何建立起来的?

1). Rpc服务端作为服务端提供了Rpc服务,其内部也有其他Rpc服务的客户端

JobMaster主动连接ResourceManager,ResourceManager回连JobMaster

TaskManager主动连接ResourceManager,ResourceManager回连TaskManager

TaskManager主动连接JobMaster,JobMaster回连TaskManager

2). Dispatcher会连接ResourceManager的Rpc服务

是通过GatewayRetriever来发现ResourceManager的Rpc地址信息

3). WebMonitorEndpoint会连接Dispatcher和ResourceManager的Rpc服务

是通过GatewayRetriever来发现Dispatcher和ResourceManager的Rpc地址信息

WebMonitorEndpoint是一个基于netty实现的rest服务,非Rpc服务端

4). WebMonitorEndpoint会连接JobManager进程中的MetricQueryService Rpc服务和TaskManager进程中的MetricQueryService Rpc服务

步骤:

  1. 其内部有MetricFetcher,MetricFetcher通过GatewayRetriever发现并连接Dispatcher
  2. 调用Dispatcher的requestMultipleJobDetails方法获取Job统计信息
  3. 调用Dispatcher的requestMetricQueryServiceAddresses方法获取JobManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到JobManager上的MetricQueryService Rpc服务,查询并获取JobManager的metric数据
  4. 调用Dispatcher的requestTaskManagerMetricQueryServiceAddresses方法获取所有TaskManager的Rpc服务地址,通过MetricQueryServiceRetriever的retrieveService方法连接到TaskManager上的MetricQueryService Rpc服务,查询并获取TaskManager的metric数据

5). JobClient会连接Dispatcher的Rpc服务

下面重点说明JobMaster、ResourceManager、TaskManager之间的连接建立过程,为了描述方便,JM指JobMaster,RM指ResourceManager,TM指TaskManager

JM连接RM,RM回连JM

1). 连接建立过程(JM主动连接RM,RM回连JM)

入口是JobMaster的reconnectToResourceManager方法

调用链使用伪码表示如下:

代码语言:java
复制
JobMaster.java
     reconnectToResourceManager
     	tryConnectToResourceManager
     		connectToResourceManager
   			{
     			// 这里用resourceManagerAddress地址重新构建一个连接
     			// RM地址发生切换时,resourceManagerAddress值也会得到更新
   				resourceManagerConnection = new ResourceManagerConnection(..., resourceManagerAddress, ...)  
                resourceManagerConnection.start	
   			}
     
RegisteredRpcConnection.java
    start
    	newRegistration.startRegistration
     
RetryingRegistration.java
    startRegistration
     	// 1). 建立与RM的连接
     	rpcService.connect	
     	// 2). 向RM注册自己
     	register
     		JobMaster.ResourceManagerConnection.invokeRegistration
     			gateway.registerJobManager	
   				{
     				// 3). RM方法内部回连JM
     				getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class)
   				}

2). JM如何获取RM地址?

通过LeaderRetrievalService获取

分两种情况

  • 没有启用HA

RM地址、Dispatcher地址、WebMonitor地址都保存在StandaloneHaServices对象中,也就是内存中,地址不会发生变化。

地址为Akka地址,格式:protocolPrefix://flink@hostname:port/user/rpc/endpointName,hostname和port为JobManager的rpc host和port,三者仅在最后的endpointName上有区别。

  • 启用了HA

可以是ZK或者K8S

ZK是通过NodeCache监听了一个节点的数据变化,这个节点上保存了leader信息

K8S是Watch了一个ConfigMap

代码语言:java
复制
JobMaster.java
   
  // LeaderRetrievalService用于发现leader,启用HA时start方法会创建一个具体的LeaderRetrievalDriver
  // driver上的leader切换事件最后会通知到LeaderRetrievalListener的notifyLeaderAddress上
  // ResourceManagerLeaderListener实现了LeaderRetrievalListener接口
  LeaderRetrievalService resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever()
  resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
  ResourceManagerLeaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID)  

3). 什么时候与RM建立连接?

如1)所述,入口是reconnectToResourceManager,该方法在三处地方被调用

JM发现自己与RM心跳超时,JM会重连RM

代码语言:java
复制
JobMaster.java
    ResourceManagerHeartbeatListener.notifyHeartbeatTimeout
        reconnectToResourceManager

JM发现RM切leader,JM会重连新的RM

代码语言:java
复制
JobMaster.java
    ResourceManagerLeaderListener.notifyLeaderAddress
        notifyOfNewResourceManagerLeader
        	createResourceManagerAddress
        	reconnectToResourceManager

RM发现自己与JM心跳超时,RM会通知JM去重连RM

代码语言:java
复制
ResourceManager.java
    JobManagerHeartbeatListener.notifyHeartbeatTimeout
        closeJobManagerConnection
        	jobMasterGateway.disconnectResourceManager(getFencingToken(), cause)
      		{
        		// JobMaster disconnectResourceManager方法内部
        		if (isConnectingToResourceManager(resourceManagerId)) {
                  	reconnectToResourceManager(cause);
              	}
      		}
      
      
    private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
        return resourceManagerAddress != null
            && resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
    }

TM连接RM,RM回连TM

1). 连接建立过程(TM主动连接RM,RM回连TM)

入口是TaskExecutor的reconnectToResourceManager方法

调用过程同上述1中JM连接RM类似,不再多述

2). TM如何获取RM地址?

通过LeaderRetrievalService获取,同上述1中JM获取RM地址一致

3). 什么时候与RM建立连接?

  1. TM发现自己与RM心跳超时,TM会重连RM
  2. TM发现RM切leader时,TM会重连RM
  3. RM发现自己与TM心跳超时,RM会通知TM去重连RM

TM连接JM,JM回连TM

1). 连接建立过程(TM主动连接JM,JM回连TM)

入口是TaskExecutor的disconnectAndTryReconnectToJobManager方法

调用过程同上述1中JM连接RM类似,只是这里将JM地址的切换感知放在了JobLeaderService中,默认实现类是DefaultJobLeaderService,不再多述。

2). TM如何获取JM地址?

通过LeaderRetrievalService获取,一个TM中是可以跑多个Job的Task的,也就会连多个JM

见DefaultJobLeaderService属性Map<JobID, Tuple2<LeaderRetrievalService, DefaultJobLeaderService.JobManagerLeaderListener>> jobLeaderServices。

3). 什么时候与JM建立连接?

  1. TM发现自己与JM心跳超时,TM会重连JM
  2. TM发现JM切leader,TM会重连新的JM
  3. JM发现自己与TM心跳超时,JM会通知TM去重连JM

JM主动连RM,TM主动连RM和JM,心跳超时或者切leader时会发生重连,那第一次建立连接在哪?

在notifyLeaderAddress方法中。

为什么是JM主动连接RM、TM主动连接RM和JM?

  1. Flink集群中先有ResourceManager和Dispatcher,有任务需要运行时Dispatcher才创建JobMaster(Dispatcher可创建多个JobMaster,一个JobGraph对应一个JobMaster)。ResourceManager地址是已知的, JobMaster连接上ResourceManager后调用ResourceManager的registerJobManager注册自己,ResourceManager再回连JobMaster。
  2. ResourceManager和Dispatcher在JobManager进程中,Flink集群是先启动JobManager进程后启动TaskManager进程,TaskManager进程可以有多个。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 简介
  • 接口设计
    • RpcGateway
      • RpcEndpoint
        • RpcService
          • RpcServer
          • 类关系图
          • 问题
            • 1. RPC服务端创建过程
              • 2. RPC客户端创建过程
                • 3. RPC调用流程
                  • 4. 在Flink集群中整个RPC通信网络是如何一步步建立起来的,连接容错又如何保证
                    • 总述
                    • JM连接RM,RM回连JM
                    • TM连接RM,RM回连TM
                    • TM连接JM,JM回连TM
                    • JM主动连RM,TM主动连RM和JM,心跳超时或者切leader时会发生重连,那第一次建立连接在哪?
                    • 为什么是JM主动连接RM、TM主动连接RM和JM?
                相关产品与服务
                流计算 Oceanus
                流计算 Oceanus 是大数据产品生态体系的实时化分析利器,是基于 Apache Flink 构建的企业级实时大数据分析平台,具备一站开发、无缝连接、亚秒延时、低廉成本、安全稳定等特点。流计算 Oceanus 以实现企业数据价值最大化为目标,加速企业实时化数字化的建设进程。
                领券
                问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档