问题导读
Flink基于Akka来实现内部各组件(ResourceManager、Dispatcher、JobMaster、TaskExecutor等)间的RPC通信。本篇着重分析Flink的RPC设计,如何封装Actor模型,RPC的创建和调用流程。
阅读说明: 源码版本:Flink release-1.14.4 阅读前提:了解Akka Actor基础知识 1). 先聊Flink的RPC设计,理清RpcGateway、RpcEndpoint、RpcService、RpcServer概念,也就弄明白了Flink如何封装Akka来实现RPC机制。有兴趣可继续往下阅读。 2). 结合代码,看问题1、2、3,进一步熟悉RPC的创建与交互过程。(重点关注AkkaRpcService、AkkaInvocationHandler、AkkaRpcActor类) 3). 第4个问题进一步延伸,主要是理清各组件间谁与谁会建立通信连接,先后顺序是怎样的,由此建立起整个RPC通信网络。在组件切leader、重启或者心跳超时等异常情况时,是否有容错机制重新建连。
结合下面RpcGateway、RpcEndpoint、RpcService、RpcServer概念看这两张图
是Rpc服务端自身的代理对象,设计上是供服务端调用自身非Rpc方法。
注:这里借用网上画的一张图
RPC服务端是一个代理对象。
入口
RpcEndpoint.java
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService, "rpcService");
this.endpointId = checkNotNull(endpointId, "endpointId");
// 生成代理对象
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
代理对象生成过程如下:
1. 以Ask方式向SupervisorActor发送StartAkkaRpcActor消息,SupervisorActor收到消息后根据消息里RpcEndpoint的配置信息创建Actor,并以tell方式回复创建成功。
AkkaRpcService.java
startServer(C rpcEndpoint)
registerAkkaRpcActor(rpcEndpoint)
{
// 创建Actor
SupervisorActor.startAkkaRpcActor(
supervisor.getActor(),
actorTerminationFuture ->
Props.create(
akkaRpcActorType,
rpcEndpoint,
actorTerminationFuture,
getVersion(),
configuration.getMaximumFramesize(),
flinkClassLoader),
rpcEndpoint.getEndpointId());
// 在RpcService中保存ActorRef与RpcEndpoint引用关系
actors.put(actorRegistration.getActorRef(), rpcEndpoint)
}
SupervisorActor.java
// 1) 发送消息
public static StartAkkaRpcActorResponse startAkkaRpcActor(
ActorRef supervisor, StartAkkaRpcActor.PropsFactory propsFactory, String endpointId) {
// 以Ask方式发送消息并等待结果
// Ask在实现上实际上是会创建一个Actor等待响应结果,成功或者超时时,销毁Actor
return Patterns.ask(
supervisor,
createStartAkkaRpcActorMessage(propsFactory, endpointId),
RpcUtils.INF_DURATION)
.toCompletableFuture()
.thenApply(SupervisorActor.StartAkkaRpcActorResponse.class::cast)
.join();
}
// 2) 处理消息
private void createStartAkkaRpcActorMessage(StartAkkaRpcActor startAkkaRpcActor) {
final String endpointId = startAkkaRpcActor.getEndpointId();
final Props akkaRpcActorProps = ...
...
try {
// 创建Actor
final ActorRef actorRef = getContext().actorOf(akkaRpcActorProps, endpointId);
registeredAkkaRpcActors.put(actorRef, akkaRpcActorRegistration);
// 回复消息
getSender().tell(
StartAkkaRpcActorResponse.success(...),
getSelf()
);
} catch (AkkaException akkaException) {
getSender().tell(StartAkkaRpcActorResponse.failure(akkaException), getSelf());
}
}
2. 准备代理对象要实现的接口
AkkaRpcService.java
startServer(C rpcEndpoint)
{
...
// 服务端对象实现了RpcGateway接口
Set<Class<?>> implementedRpcGateways =
new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
// 服务端对象是一个RpcServer
implementedRpcGateways.add(RpcServer.class);
// 服务端对象是Akka endpoint,可以获取到ActorRef引用
implementedRpcGateways.add(AkkaBasedEndpoint.class);
if (rpcEndpoint instanceof FencedRpcEndpoint){
implementedRpcGateways.add(FencedMainThreadExecutable.class);
}
}
3. 生成代理对象
AkkaRpcService.java
startServer(C rpcEndpoint)
{
...
RpcServer server =
(RpcServer)
Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(
new Class<?>[implementedRpcGateways.size()]),
akkaInvocationHandler);
return server;
}
RPC客户端是一个代理对象。
入口:RpcService的connect(String address, Class<C> clazz)方法。
AkkaRpcService.java
connect(String address, Class<C> clazz)
{
...
// 1) 使用ActorSystem.actorSelection(address).resolveOne的方式来获取Actor的引用ActorRef(ActorRef可以用来向服务端Actor发送消息)
final CompletableFuture<ActorRef> actorRefFuture = resolveActorAddress(address);
// 2) ActorRef创建完成后,使用ask的方式向服务端发送一条握手消息(用来验证Client和Server彼此版本一致)
final CompletableFuture<HandshakeSuccessMessage> handshakeFuture =
actorRefFuture.thenCompose(
(ActorRef actorRef) ->
AkkaFutureUtils.toJava(
Patterns.ask(
actorRef,
new RemoteHandshakeMessage(
clazz, getVersion()),
configuration.getTimeout().toMilliseconds())
.<HandshakeSuccessMessage>mapTo(
ClassTag$.MODULE$
.<HandshakeSuccessMessage>apply(
HandshakeSuccessMessage
.class))));
// 3) 以上2个事都做完后,异步创建代理对象并返回
final CompletableFuture<C> gatewayFuture =
actorRefFuture.thenCombineAsync(
handshakeFuture,
(ActorRef actorRef, HandshakeSuccessMessage ignored) -> {
// invocationHandlerFactory.apply(actorRef) = new AkkaInvocationHandler 或 new 或 FencedAkkaInvocationHandler
InvocationHandler invocationHandler =
invocationHandlerFactory.apply(actorRef);
ClassLoader classLoader = getClass().getClassLoader();
C proxy =
(C)
Proxy.newProxyInstance(
classLoader,
new Class<?>[] {clazz},
invocationHandler);
return proxy;
},
actorSystem.dispatcher());
return guardCompletionWithContextClassLoader(gatewayFuture, flinkClassLoader);
}
1. 通过客户端代理对象调用RpcGateway的方法会交由invoke方法执行。
2. invoke将方法、参数信息封装为RpcInvocation对象,并通过ActorRef将消息发送给服务端Actor。
如果执行的方法有返回值就使用Akka ask方式,否则以tell方式发送消息。 通过连接的服务端的地址可以判断出服务端在远程还是本地。 如果在远程,消息类型为RemoteRpcInvocation,实现了序列化接口,对象可序列化传输。(会判断methodName + parameterTypes + args序列化后的字节数是否超时指定的值,见参数akka.remote.netty.tcp.maximum-frame-size) 如果在本地,消息类型为LocalRpcInvocation。
3. 服务端Actor收到RpcInvocation消息,会从中获取到方法名、方法参数等相关信息,在主线程中通过反射的方式调用代理对象对应方法执行业务逻辑,如果方法有返回值,还会以tell方法告知客户端结果。
客户端相关代码如下:
class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, RpcServer{
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
Class<?> declaringClass = method.getDeclaringClass();
Object result;
// 非Rpc方法,直接本地执行。这个是服务端通过自己的代理对象RpcServer调用自己非Rpc方法时走的逻辑
if (declaringClass.equals(AkkaBasedEndpoint.class)
|| declaringClass.equals(Object.class)
|| declaringClass.equals(RpcGateway.class)
|| declaringClass.equals(StartStoppable.class)
|| declaringClass.equals(MainThreadExecutable.class)
|| declaringClass.equals(RpcServer.class)) {
result = method.invoke(this, args);
} else if (declaringClass.equals(FencedRpcGateway.class)) { // 支持HA的见FencedAkkaInvocationHandler
throw new UnsupportedOperationException(...);
} else {
// RPC方法,指RpcGateway子接口中定义的方法
// 接口:ResourceManagerGateway、DispatcherGateway、JobMasterGateway、MetricQueryServiceGateway、TaskExecutorGateway
result = invokeRpc(method, args);
}
return result;
}
private Object invokeRpc(Method method, Object[] args) throws Exception {
...
// 1) 封装消息
final RpcInvocation rpcInvocation =
createRpcInvocationMessage(methodName, parameterTypes, args);
// 2) 借助akka发送消息,进行RPC调用
Class<?> returnType = method.getReturnType();
final Object result;
if (Objects.equals(returnType, Void.TYPE)) {
// 无返回值,用akka tell模式
tell(rpcInvocation);
result = null;
} else {
...
// 有返回值,用akka ask模式
final CompletableFuture<?> resultFuture = ask(rpcInvocation, futureTimeout);
...
}
return result;
}
}
服务端相关代码如下:
AkkaRpcActor.java
private void handleRpcInvocation(RpcInvocation rpcInvocation) {
Method rpcMethod = null;
try {
String methodName = rpcInvocation.getMethodName();
Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
rpcMethod = lookupRpcMethod(methodName, parameterTypes);
} catch (Exception e) {
...
getSender().tell(new Status.Failure(rpcException), getSelf());
}
if (rpcMethod != null) {
try {
rpcMethod.setAccessible(true);
final Method capturedRpcMethod = rpcMethod;
// 1) 无返回值
if (rpcMethod.getReturnType().equals(Void.TYPE)) {
runWithContextClassLoader(
() -> capturedRpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()),
flinkClassLoader);
} else {
// 2) 有返回值
final Object result;
try {
result =
runWithContextClassLoader(
() ->
capturedRpcMethod.invoke(
rpcEndpoint, rpcInvocation.getArgs()),
flinkClassLoader);
} catch (InvocationTargetException e) {
...
getSender().tell(new Status.Failure(e.getTargetException()), getSelf());
return;
}
final String methodName = rpcMethod.getName();
if (result instanceof CompletableFuture) {
final CompletableFuture<?> responseFuture = (CompletableFuture<?>) result;
sendAsyncResponse(responseFuture, methodName);
} else {
sendSyncResponse(result, methodName);
}
}
} catch (Throwable e) {
...
getSender().tell(new Status.Failure(e), getSelf());
}
}
}
哪些组件之间会建立RPC连接,什么时候会建立,连接又是如何建立起来的?
1). Rpc服务端作为服务端提供了Rpc服务,其内部也有其他Rpc服务的客户端
JobMaster主动连接ResourceManager,ResourceManager回连JobMaster
TaskManager主动连接ResourceManager,ResourceManager回连TaskManager
TaskManager主动连接JobMaster,JobMaster回连TaskManager
2). Dispatcher会连接ResourceManager的Rpc服务
是通过GatewayRetriever来发现ResourceManager的Rpc地址信息
3). WebMonitorEndpoint会连接Dispatcher和ResourceManager的Rpc服务
是通过GatewayRetriever来发现Dispatcher和ResourceManager的Rpc地址信息
WebMonitorEndpoint是一个基于netty实现的rest服务,非Rpc服务端
4). WebMonitorEndpoint会连接JobManager进程中的MetricQueryService Rpc服务和TaskManager进程中的MetricQueryService Rpc服务
步骤:
5). JobClient会连接Dispatcher的Rpc服务
下面重点说明JobMaster、ResourceManager、TaskManager之间的连接建立过程,为了描述方便,JM指JobMaster,RM指ResourceManager,TM指TaskManager
1). 连接建立过程(JM主动连接RM,RM回连JM)
入口是JobMaster的reconnectToResourceManager方法
调用链使用伪码表示如下:
JobMaster.java
reconnectToResourceManager
tryConnectToResourceManager
connectToResourceManager
{
// 这里用resourceManagerAddress地址重新构建一个连接
// RM地址发生切换时,resourceManagerAddress值也会得到更新
resourceManagerConnection = new ResourceManagerConnection(..., resourceManagerAddress, ...)
resourceManagerConnection.start
}
RegisteredRpcConnection.java
start
newRegistration.startRegistration
RetryingRegistration.java
startRegistration
// 1). 建立与RM的连接
rpcService.connect
// 2). 向RM注册自己
register
JobMaster.ResourceManagerConnection.invokeRegistration
gateway.registerJobManager
{
// 3). RM方法内部回连JM
getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class)
}
2). JM如何获取RM地址?
通过LeaderRetrievalService获取
分两种情况
RM地址、Dispatcher地址、WebMonitor地址都保存在StandaloneHaServices对象中,也就是内存中,地址不会发生变化。
地址为Akka地址,格式:protocolPrefix://flink@hostname:port/user/rpc/endpointName,hostname和port为JobManager的rpc host和port,三者仅在最后的endpointName上有区别。
可以是ZK或者K8S
ZK是通过NodeCache监听了一个节点的数据变化,这个节点上保存了leader信息
K8S是Watch了一个ConfigMap
JobMaster.java
// LeaderRetrievalService用于发现leader,启用HA时start方法会创建一个具体的LeaderRetrievalDriver
// driver上的leader切换事件最后会通知到LeaderRetrievalListener的notifyLeaderAddress上
// ResourceManagerLeaderListener实现了LeaderRetrievalListener接口
LeaderRetrievalService resourceManagerLeaderRetriever = highAvailabilityServices.getResourceManagerLeaderRetriever()
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener())
ResourceManagerLeaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID)
3). 什么时候与RM建立连接?
如1)所述,入口是reconnectToResourceManager,该方法在三处地方被调用
JM发现自己与RM心跳超时,JM会重连RM
JobMaster.java
ResourceManagerHeartbeatListener.notifyHeartbeatTimeout
reconnectToResourceManager
JM发现RM切leader,JM会重连新的RM
JobMaster.java
ResourceManagerLeaderListener.notifyLeaderAddress
notifyOfNewResourceManagerLeader
createResourceManagerAddress
reconnectToResourceManager
RM发现自己与JM心跳超时,RM会通知JM去重连RM
ResourceManager.java
JobManagerHeartbeatListener.notifyHeartbeatTimeout
closeJobManagerConnection
jobMasterGateway.disconnectResourceManager(getFencingToken(), cause)
{
// JobMaster disconnectResourceManager方法内部
if (isConnectingToResourceManager(resourceManagerId)) {
reconnectToResourceManager(cause);
}
}
private boolean isConnectingToResourceManager(ResourceManagerId resourceManagerId) {
return resourceManagerAddress != null
&& resourceManagerAddress.getResourceManagerId().equals(resourceManagerId);
}
1). 连接建立过程(TM主动连接RM,RM回连TM)
入口是TaskExecutor的reconnectToResourceManager方法
调用过程同上述1中JM连接RM类似,不再多述
2). TM如何获取RM地址?
通过LeaderRetrievalService获取,同上述1中JM获取RM地址一致
3). 什么时候与RM建立连接?
1). 连接建立过程(TM主动连接JM,JM回连TM)
入口是TaskExecutor的disconnectAndTryReconnectToJobManager方法
调用过程同上述1中JM连接RM类似,只是这里将JM地址的切换感知放在了JobLeaderService中,默认实现类是DefaultJobLeaderService,不再多述。
2). TM如何获取JM地址?
通过LeaderRetrievalService获取,一个TM中是可以跑多个Job的Task的,也就会连多个JM
见DefaultJobLeaderService属性Map<JobID, Tuple2<LeaderRetrievalService, DefaultJobLeaderService.JobManagerLeaderListener>> jobLeaderServices。
3). 什么时候与JM建立连接?
在notifyLeaderAddress方法中。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。