前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >flink源码分析之TaskManager启动篇

flink源码分析之TaskManager启动篇

作者头像
山行AI
发布于 2020-10-27 08:41:05
发布于 2020-10-27 08:41:05
3K00
代码可运行
举报
文章被收录于专栏:山行AI山行AI
运行总次数:0
代码可运行

在flink社区上关于flink集群的剖析中讲到:flink运行时由两种类型的进程组成,一个jobManager和一个或多个TaskManager。

Flink Cluster组成

一张经典的flink 集群组成图如下:

客户端不是运行时和程序执行的一部分,但用于准备数据流并将其发送到JobManager。之后,客户端可以断开连接(分离模式 detached mode),或者保持连接以接收进度报告(附加模式 attached mode)。客户机可以作为触发执行的Java/Scala程序的一部分运行,也可以在命令行使用./bin/flink run...开启进程中运行。

JobManager和taskmanager可以以各种方式启动:作为独立集群直接在机器上启动,或者在容器中启动,或者由YARN或Mesos等资源框架管理。TaskManagers连接到JobManagers,宣布它们是可用的,并分配工作。

由于篇幅有限,这里我们主要关注下TaskManager的相关内容,关于JobManager的后面再具体来分析。

TaskManager

TaskManager就是执行数据流中任务以及缓冲和交换数据流的worker。必须始终至少有一个TaskManager。任务管理器中资源调度的最小单元是任务槽。任务管理器中的任务槽数表示并发处理任务的数量。注意,多个操作算子可能在一个任务槽中执行。

每个worker (TaskManager)都是一个JVM进程,可以在单独的线程中执行一个子任务。为了控制TaskManager接受多少任务,它有所谓的Task slot(至少一个)。

每个Task slot表示TaskManager的一个固定资源子集。例如,一个有三个插槽的TaskManager,会将其托管内存的1/3分配给每个插槽。对资源进行分槽意味着子任务不会与其他作业的子任务争夺托管内存,而是拥有一定数量的保留托管内存。注意,这里没有发生CPU隔离;当前插槽只分隔任务的托管内存。

通过调整任务槽的数量,用户可以定义子任务如何相互隔离。每个TaskManager有一个槽意味着每个任务组在单独的JVM中运行(例如,JVM可以在单独的容器中启动)。拥有多个槽意味着更多子任务共享同一个JVM。相同JVM中的任务共享TCP连接(通过多路复用)和心跳消息。它们还可以共享数据集和数据结构,从而减少每个任务的开销。

默认情况下,Flink允许子任务共享插槽,即使它们是不同任务的子任务,只要它们来自相同的作业。结果是,一个插槽可以容纳作业的整个管道。允许这种插槽共享有两个主要好处:

1.Flink集群需要的任务槽数与作业中使用的最高并行度相同。不需要计算一个程序总共包含多少任务(具有不同的并行性)2.更容易获得更好的资源利用。如果没有槽共享,非密集的source/map()子任务将阻塞和资源密集的窗口子任务一样多的资源。使用插槽共享,将示例中的基本并行性从2个增加到6个,可以充分利用有插槽的资源,同时确保繁重的子任务在TaskManager中得到公平分配。

关于上面对于flink taskManager的更多介绍,可以自行查阅flink官方文档[1]。下面将进入对TaskManager启动流程的源码分析部分。

TaskManager启动流程分析

这里的源码分析,我们以本地MiniCluster中各组件的启动流程为例。在本地提交一个job时,会启动一个MiniCluster,在这个集群内部分进行各组件的初始化操作,其中也包括TaskManager的启动。

直接来看org.apache.flink.runtime.minicluster.MiniCluster#start方法的部分代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    if (useSingleRpcService) {
                    // we always need the 'commonRpcService' for auxiliary calls
                    // 这里会创建local rpc service
                    commonRpcService = createLocalRpcService(configuration);
                    final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                    taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dispatcherResourceManagreComponentRpcServiceFactory = commonRpcServiceFactory;
                    metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration);
    } else {
        ------
            dispatcherResourceManagreComponentRpcServiceFactory =
            new DedicatedRpcServiceFactory(
            configuration,
            // jobManager地址
            jobManagerExternalAddress,
            jobManagerExternalPortRange,
            jobManagerBindAddress);
        taskManagerRpcServiceFactory =
            new DedicatedRpcServiceFactory(
            configuration,
            // taskManager地址
            taskManagerExternalAddress,
            taskManagerExternalPortRange,
            taskManagerBindAddress);
    }   
 --------------省略部分代码----------------       
// 启动QueryService,在MetricRegistryImpl中,会启动一个rpc server
metricRegistry.startQueryService(metricQueryServiceRpcService, null);
// 执行io操作的线程池
ioExecutor = Executors.newFixedThreadPool(
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
// 高可用服务
haServices = createHighAvailabilityServices(configuration, ioExecutor);
// blobServer用于管理一些jar包等
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
// 心跳服务
heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
// blob缓存服务,用于BroadCast的缓存管理等
blobCacheService = new BlobCacheService(
configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort())
);
// 启动taskManager,会启动taskExecutor,也会启动一个rpcServer
startTaskManagers();

关于rpcService加载的部分,会根据是否使用单例的rpcService来决定是创建CommonRpcServiceFactory还是DedicatedRpcServiceFactory。CommonRpcServiceFactory产生的是一个单例的rpcService,而DedicatedRpcServiceFactory每次产生的是不同的rpcService。

CommonRpcServiceFactory

如果使用CommonRpcServiceFactory,它rpcService产生的方法为:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
protected RpcService createLocalRpcService(Configuration configuration) throws Exception {
        return AkkaRpcServiceUtils.localServiceBuilder(configuration)
            .withCustomConfig(AkkaUtils.testDispatcherConfig())
            .createAndStart();
    }

这里会创建一个CommonRpcServiceFactory中使用的单例的akkaRpcService,对于JobManager的一些组件来说使用的也是这个akkaRpcService。

DedicatedRpcServiceFactory

而如果使用DedicatedRpcServiceFactory,它会调用org.apache.flink.runtime.minicluster.MiniCluster.DedicatedRpcServiceFactory#createRpcService方法创建RemoteRpcService:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
        @Override
        public RpcService createRpcService() throws Exception {
            final RpcService rpcService = MiniCluster.this.createRemoteRpcService(
                configuration, externalAddress, externalPortRange, bindAddress);
            synchronized (lock) {
                rpcServices.add(rpcService);
            }
            return rpcService;
        }
        protected RpcService createRemoteRpcService(
            Configuration configuration,
            String externalAddress,
            String externalPortRange,
            String bindAddress) throws Exception {
            return AkkaRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, externalPortRange)
                // 需要传入绑定地址
                .withBindAddress(bindAddress)
                .withCustomConfig(AkkaUtils.testDispatcherConfig())
                .createAndStart();
    }

参考上文DedicatedRpcServiceFactory的使用部分可以看出:对于jobManager和TaskManager来说会有不同的DedicatedRpcServiceFactory来产生相应的RpcService,它们会绑定不同的外部地址。

RpcService

这里我们主要来分析下AkkaRpcService的相关内容。

创建过程

它的创建过程发生在AkkaRpcServiceUtils#createAndStart方法中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public AkkaRpcService createAndStart() throws Exception {
        ------------------省略部分---------------
            final ActorSystem actorSystem;
            if (externalAddress == null) {
                // create local actor system
                // 创建本地的actor system,不需要外部地址,所有服务共用同一套actorSystem
                actorSystem = BootstrapTools.startLocalActorSystem(
                    configuration,
                    actorSystemName,
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
            } else {
                // create remote actor system
                // 因为是与外部地址绑定的,所以对于jobManager和TaskManager来说各自有一套actkorSystem
                actorSystem = BootstrapTools.startRemoteActorSystem(
                    configuration,
                    actorSystemName,
                    externalAddress,
                    externalPortRange,
                    bindAddress,
                    Optional.ofNullable(bindPort),
                    logger,
                    actorSystemExecutorConfiguration,
                    customConfig);
            }
            return new AkkaRpcService(actorSystem, AkkaRpcServiceConfiguration.fromConfiguration(configuration));
        }

这里对于actorSystem需要关注下面两点:

•创建本地的actor system,不需要外部地址,所有服务共用同一套actorSystem•因为是与外部地址绑定的,所以对于jobManager和TaskManager来说各自有一套actkorSystem

构造方法
代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@VisibleForTesting
    public AkkaRpcService(final ActorSystem actorSystem, final AkkaRpcServiceConfiguration configuration) {
        this.actorSystem = checkNotNull(actorSystem, "actor system");
        this.configuration = checkNotNull(configuration, "akka rpc service configuration");
        // 需要传入一个actorSystem
        Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
        if (actorSystemAddress.host().isDefined()) {
            address = actorSystemAddress.host().get();
        } else {
            address = "";
        }
        if (actorSystemAddress.port().isDefined()) {
            port = (Integer) actorSystemAddress.port().get();
        } else {
            port = -1;
        }
        captureAskCallstacks = configuration.captureAskCallStack();
        // 内部调度执行器
        internalScheduledExecutor = new ActorSystemScheduledExecutorAdapter(actorSystem);
        terminationFuture = new CompletableFuture<>();
        stopped = false;
        // actor体系中的监督者,主要用于服务重启,对于flink的重启机制发挥作用的地方,它有自己的executorService
        supervisor = startSupervisorActor();
    }

主要执行一些属性的初始化操作,并且会创建并启动对应actorSystem的监督者。

org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer方法

AkkaRpcService#startServer方法的调用位置在TaskExecutor的父类RpcEndpoint中,在执行TaskExecutor的构造方法时会调用RpcEndpoint的构造方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

我们聚焦到 rpcService.startServer(this),我们先关注下这个this,在下面会有比较重要的作用。

我们先来看下startServer方法代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Override
    public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
        checkNotNull(rpcEndpoint, "rpc endpoint");
        // 注册akkaRpcActor,TaskExecutor不是FencedRpcEndpoint类型的,JobMaster是FencedRpcEndpoint类型的
        final SupervisorActor.ActorRegistration actorRegistration = registerAkkaRpcActor(rpcEndpoint);
        // 获取注册后得到的actor对象
        final ActorRef actorRef = actorRegistration.getActorRef();
        // TerminationFuture,可以在里面执行一些中止后的操作
        final CompletableFuture<Void> actorTerminationFuture = actorRegistration.getTerminationFuture();
        LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
        // 获得对应actor对象在对应的actorSystem中的地址
        final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
        final String hostname;
        Option<String> host = actorRef.path().address().host();
        if (host.isEmpty()) {
            hostname = "localhost";
        } else {
            hostname = host.get();
        }
        Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
        // 为代理接口中添加RpcServer接口
        implementedRpcGateways.add(RpcServer.class);
        // 添加AkkaBasedEndpoint接口
        implementedRpcGateways.add(AkkaBasedEndpoint.class);
        // 用于触发代理类方法的invocationHandler
        final InvocationHandler akkaInvocationHandler;
        // 如果是FencedRpcEndpoint,这里对应的akkaInvocationHandler为FencedAkkaInvocationHandler类型,JobMaster返回的是这种类型的
        if (rpcEndpoint instanceof FencedRpcEndpoint) {
            // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
            akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken,
                captureAskCallstacks);
           implementedRpcGateways.add(FencedMainThreadExecutable.class);
        } else {// 如果不是FencedRpcEndpoint,这里返回的是AkkaInvocationHandler,TaskExecutor返回的是这种类型的
            akkaInvocationHandler = new AkkaInvocationHandler(
                akkaAddress,
                hostname,
                actorRef,
                configuration.getTimeout(),
                configuration.getMaximumFramesize(),
                actorTerminationFuture,
                captureAskCallstacks);
        }
        // Rather than using the System ClassLoader directly, we derive the ClassLoader
        // from this class . That works better in cases where Flink runs embedded and all Flink
        // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
        ClassLoader classLoader = getClass().getClassLoader();
        // jdk的动态代理,对implementedRpcGateways里维护的接口实现进行代理,会在akkaInvocationHandler的invoke方法中反射调用接口中的一些方法
        @SuppressWarnings("unchecked")
        RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
            akkaInvocationHandler);
        return server;
    }

根据流程来看,主要分为以下几步:

•注册akkaRpcActor,TaskExecutor不是FencedRpcEndpoint类型的,注册的是AkkaRpcActor类型的actor,JobMaster是FencedRpcEndpoint类型的,注册的是FencedAkkaRpcActor类型的actor。这点有疑惑的可以参考下下面这个类图:

•FencedRpcEndpoint类型的会创建FencedAkkaInvocationHandler类型的invocationHandler,否则会创建AkkaInvocationHandler类型的invocationHandler。为RpcServer和AkkaBasedEndpoint接口创建动态代理,这是一个jdk的动态代理,对implementedRpcGateways里维护的接口实现进行代理,会在akkaInvocationHandler的invoke方法中反射调用接口中的一些方法。

可以看出在这里返回的RpcServer是一个动态代理。也就是说在TaskExecutor的父类RpcEndpoint中维护的属性RpcServer rpcServer是一个动态代理,这时我们再回过头来看taskExecutor.start方法的调用链:

代码链截图如下:

其中org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler#start方法代码如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
    public void start() {
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

这里会通过rpcEndpoint向actorSystem发送一条ControlMessages.START类型的消息,关于这条消息有什么用,怎么形成的闭环,请继续看下面的分析内容。

继续来看启动流程

当然,org.apache.flink.runtime.minicluster.MiniCluster#start方法会加载一系列的服务和JobManager的几个关键组件ResourceManager、Dispatcher、JobMaster等,这里我们主要关注TaskManager的启动流程。

进入到org.apache.flink.runtime.minicluster.MiniCluster#startTaskManagers方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@GuardedBy("lock")
private void startTaskManagers() throws Exception {
    final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();
    LOG.info("Starting {} TaskManger(s)", numTaskManagers);
    for (int i = 0; i < numTaskManagers; i++) {
        startTaskExecutor();
    }
}

在这里会去获取配置中需要启动的taskManager的个数,然后启动指定个数的TaskManager。

继续来看org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (lock) {
            final Configuration configuration = miniClusterConfiguration.getConfiguration();
            // 创建TaskExecutor
            final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                // 这个用于在ResourceManager中标识TaskExecutor的使用资源
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));
            // 启动TaskExecutor
            taskExecutor.start();
            // 将taskExecutor加入到taskMangers列表中
            taskManagers.add(taskExecutor);
        }
    }

这里会创建并启动taskExecutor,并将启动了的taskExecutor放入到taskManagers列表中。

我们再来分析下org.apache.flink.runtime.taskexecutor.TaskManagerRunner#startTaskManager方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static TaskExecutor startTaskManager(
            Configuration configuration,
            ResourceID resourceID,
            RpcService rpcService,
            HighAvailabilityServices highAvailabilityServices,
            HeartbeatServices heartbeatServices,
            MetricRegistry metricRegistry,
            BlobCacheService blobCacheService,
            boolean localCommunicationOnly,
            ExternalResourceInfoProvider externalResourceInfoProvider,
            FatalErrorHandler fatalErrorHandler) throws Exception {
   -----------------省略部分代码------------------------------------
        String externalAddress = rpcService.getAddress();
        // 声明运行TaskExecutor所需要的资源
        final TaskExecutorResourceSpec taskExecutorResourceSpec = TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
        // TaskManager服务配置
        TaskManagerServicesConfiguration taskManagerServicesConfiguration =
            TaskManagerServicesConfiguration.fromConfiguration(
                configuration,
                resourceID,
                externalAddress,
                localCommunicationOnly,
                taskExecutorResourceSpec);
        // taskManager的Metric组,用于metric管理
        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup = MetricUtils.instantiateTaskManagerMetricGroup(
            metricRegistry,
            externalAddress,
            resourceID,
            taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
        // taskExecutor的执行线程池,它的个数是根据cluster.io-pool.size参数配置,如果没有配置会使用cpu核数的四倍
        final ExecutorService ioExecutor = Executors.newFixedThreadPool(
            taskManagerServicesConfiguration.getNumIoThreads(),
            new ExecutorThreadFactory("flink-taskexecutor-io"));
        // TaskManager的一系列服务
        TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
            taskManagerServicesConfiguration,
            blobCacheService.getPermanentBlobService(),
            taskManagerMetricGroup.f1,
            ioExecutor,
            fatalErrorHandler);
             ------------------省略部分代码---------------------
        return new TaskExecutor(
            rpcService,
            taskManagerConfiguration,
            highAvailabilityServices,
            taskManagerServices,
            externalResourceInfoProvider,
            heartbeatServices,
            taskManagerMetricGroup.f0,
            metricQueryServiceAddress,
            blobCacheService,
            fatalErrorHandler,
            new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
            createBackPressureSampleService(configuration, rpcService.getScheduledExecutor()));         
    }

在这个方法中会依次声明运行TaskExecutor所需要的资源,然后创建taskExecutor的执行线程池,它的个数是根据cluster.io-pool.size参数配置,如果没有配置会使用cpu核数的四倍,接着会创建TaskManager运行需要的一系列服务,最终会依据这些属性创建TaskExecutor实例并返回。

我们进入到org.apache.flink.runtime.taskexecutor.TaskManagerServices#fromConfiguration方法中来看下具体代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public static TaskManagerServices fromConfiguration(
            TaskManagerServicesConfiguration taskManagerServicesConfiguration,
            PermanentBlobService permanentBlobService,
            MetricGroup taskManagerMetricGroup,
            ExecutorService ioExecutor,
            FatalErrorHandler fatalErrorHandler) throws Exception {
        // pre-start checks
        checkTempDirs(taskManagerServicesConfiguration.getTmpDirPaths());
        // 类似于一个even source模型,内部维护着一个分区与TaskEventHandler之间关系的map,其他线程可以
        // 执行subscribe进行事件订阅,在有事件发生时TaskEventDispatcher会执行publish方法进行通知,主要为了达到某个地方发生改变后,关注的位置也相应作出反应。
        final TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        // start the I/O manager, it will create some temp directories.
        // 异步的io管理器,主要用于控制io操作
        final IOManager ioManager = new IOManagerAsync(taskManagerServicesConfiguration.getTmpDirPaths());
         // shuffle环境,在这个方法内部会有一个SPI的拓展点,会从配置的shuffle-service-factory.class中加载用户定义的shuffle-service-factory。内部提供了一个NettyShuffleServiceFactory的实现,会创建NettyShuffleEnvironment对象
        final ShuffleEnvironment<?, ?> shuffleEnvironment = createShuffleEnvironment(
            taskManagerServicesConfiguration,
            taskEventDispatcher,
            taskManagerMetricGroup,
            ioExecutor);
        // 启动shuffleEnvironment,如果使用的是NettyShuffleServiceFactory这里会启动一个netty server和netty client,并和dubbo 协议类似用一个ConnectionId维护一个PartitionRequestClient,关于这个内部的细节后续会详细来分析
        final int listeningDataPort = shuffleEnvironment.start();
        // 创建kvState服务
        final KvStateService kvStateService = KvStateService.fromConfiguration(taskManagerServicesConfiguration);
           // 启动kvState服务
        kvStateService.start();
        // 未能解析的taskManager的位置
        final UnresolvedTaskManagerLocation unresolvedTaskManagerLocation = new UnresolvedTaskManagerLocation(
            taskManagerServicesConfiguration.getResourceID(),
            taskManagerServicesConfiguration.getExternalAddress(),
            // we expose the task manager location with the listening port
            // iff the external data port is not explicitly defined
            taskManagerServicesConfiguration.getExternalDataPort() > 0 ?
                taskManagerServicesConfiguration.getExternalDataPort() :
                listeningDataPort);
        // 广播变量管理器
        final BroadcastVariableManager broadcastVariableManager = new BroadcastVariableManager();
        // 任务槽table
        final TaskSlotTable<Task> taskSlotTable = createTaskSlotTable(
            taskManagerServicesConfiguration.getNumberOfSlots(),
            taskManagerServicesConfiguration.getTaskExecutorResourceSpec(),
            taskManagerServicesConfiguration.getTimerServiceShutdownTimeout(),
            taskManagerServicesConfiguration.getPageSize(),
            ioExecutor);
        // 任务table
        final JobTable jobTable = DefaultJobTable.create();
        // 任务leader服务
        final JobLeaderService jobLeaderService = new DefaultJobLeaderService(unresolvedTaskManagerLocation, taskManagerServicesConfiguration.getRetryingRegistrationConfiguration());
        // state根目录
        final String[] stateRootDirectoryStrings = taskManagerServicesConfiguration.getLocalRecoveryStateRootDirectories();
        final File[] stateRootDirectoryFiles = new File[stateRootDirectoryStrings.length];
        for (int i = 0; i < stateRootDirectoryStrings.length; ++i) {
            stateRootDirectoryFiles[i] = new File(stateRootDirectoryStrings[i], LOCAL_STATE_SUB_DIRECTORY_ROOT);
        }
        // taskState管理器
        final TaskExecutorLocalStateStoresManager taskStateManager = new TaskExecutorLocalStateStoresManager(
            taskManagerServicesConfiguration.isLocalRecoveryEnabled(),
            stateRootDirectoryFiles,
            ioExecutor);
         // 当jvm的元数据空间内存溢出时的错误
        final boolean failOnJvmMetaspaceOomError =
taskManagerServicesConfiguration.getConfiguration().getBoolean(CoreOptions.FAIL_ON_USER_CLASS_LOADING_METASPACE_OOM);
        // 库缓存管理器
        final LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
            permanentBlobService,
            BlobLibraryCacheManager.defaultClassLoaderFactory(
                taskManagerServicesConfiguration.getClassLoaderResolveOrder(),
                taskManagerServicesConfiguration.getAlwaysParentFirstLoaderPatterns(),
                failOnJvmMetaspaceOomError ? fatalErrorHandler : null));
        // 返回TaskManager服务
        return new TaskManagerServices(
            unresolvedTaskManagerLocation,
            taskManagerServicesConfiguration.getManagedMemorySize().getBytes(),
            ioManager,
            shuffleEnvironment,
            kvStateService,
            broadcastVariableManager,
            taskSlotTable,
            jobTable,
            jobLeaderService,
            taskStateManager,
            taskEventDispatcher,
            ioExecutor,
            libraryCacheManager);
    }

这个方法的主要功能是创建TaskManager中的一系列服务,按照流程它的操作主要如下:

•创建一个类似于even source的模型,内部维护着一个分区与TaskEventHandler之间关系的map,其他线程可以执行subscribe进行事件订阅,在有事件发生时TaskEventDispatcher会执行publish方法进行通知。•创建异步的io管理器,主要用于控制io操作。•创建ShuffleEnvironment,在这个方法内部会有一个SPI的拓展点,会从配置的shuffle-service-factory.class中加载用户定义的shuffle-service-factory。内部提供了一个NettyShuffleServiceFactory的实现,会创建NettyShuffleEnvironment对象。•启动shuffleEnvironment,如果使用的是NettyShuffleServiceFactory这里会启动一个netty server和netty client,并和dubbo 协议类似用一个ConnectionId维护一个PartitionRequestClient,关于这个内部的细节后续会详细来分析。•创建kvState服务。•启动kvState服务•创建 广播变量管理器•初始化任务槽table,里面维护着任务槽和task manager之间的关系•创建JobTable,jobTable中维护的信息如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
  private final Map<JobID, JobOrConnection> jobs;
      private final Map<ResourceID, JobID> resourceIdJobIdIndex;
      private DefaultJobTable() {
          this.jobs = new HashMap<>();
          this.resourceIdJobIdIndex = new HashMap<>();
     }

维护着任务容器和resourceId与jobId关系的容器。

•创建JobLeaderService•创建TaskExecutor的state管理器•创建库文件缓存管理器。

TashExecutor的启动

启动的代码在org.apache.flink.runtime.minicluster.MiniCluster#startTaskExecutor方法中:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
taskExecutor.start();

它会调用org.apache.flink.runtime.rpc.RpcEndpoint#start方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
public final void start() {
        rpcServer.start();
    }

那么rpcServer来自哪里呢?

我们看下RpcEndpoint的构造方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
        this.rpcService = checkNotNull(rpcService, "rpcService");
        this.endpointId = checkNotNull(endpointId, "endpointId");
        this.rpcServer = rpcService.startServer(this);
        this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
    }

可见rpcServer来自于rpcService,其中RpcEndpoint是TaskExecutor的父类,我们此时回过头来看下TaskExecutor的构造方法有下面这一行代码:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));

那么对于rpcService我们上文有提到过,它是AkkaRpcService类型的。

那么上面RpcEndpoint中rpcService.startServer(this)调用的就是org.apache.flink.runtime.rpc.akka.AkkaRpcService#startServer方法,会返回一个RpcServer代理,包装的是AkkaInvocationHandler对象。

AkkaInvocationHandler中的start方法如下:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
    public void start() {
        rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
    }

也就是说,在调用taskExecutor.start()方法时最后会调用上面这个AkkaInvocationHandler.start()方法,这里注意下它发出的消息是一个ControlMessages.START格式的,这对于接下来非常有用。

我们来反推一下:org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart方法在org.apache.flink.runtime.rpc.RpcEndpoint#internalCallOnStart中会回调,而internalCallOnStart方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor.StoppedState#start中调用。StoppedState#start方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor#handleControlMessage方法中调用,而handleControlMessage方法会在org.apache.flink.runtime.rpc.akka.AkkaRpcActor#createReceive方法中模式匹配:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
public Receive createReceive() {
    return ReceiveBuilder.create()
        .match(RemoteHandshakeMessage.class, this::handleHandshakeMessage)
        .match(ControlMessages.class, this::handleControlMessage)
        .matchAny(this::handleMessage)
        .build();
}

在Actor编程中Actor 类需要继承AbstractActor类实现createReceive方法,绑定各类actor收到不同类型消息对应处理不同业务逻辑。createReceive方法内部就是一些模式匹配的逻辑,我们可以看到当match的消息类型为ControlMessages类型时会进入handleControlMessage方法来处理,然后会调用StoppedState#start方法,接下来回调AkkaRpcActor.StoppedState#start方法,接下来回调RpcEndpoint#internalCallOnStart方法,然后回调TaskExecutor#onStart方法,我们来看一下org.apache.flink.runtime.taskexecutor.TaskExecutor#onStart方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    @Override
    public void onStart() throws Exception {
        try {
            // 启动相关服务
            startTaskExecutorServices();
        } catch (Exception e) {
            final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
            onFatalError(exception);
            throw exception;
        }
        // 处理注册超时的问题
        startRegistrationTimeout();
    }

在startTaskExecutorServices方法内部会启动与TaskExecutor相关的一些组件:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
    private void startTaskExecutorServices() throws Exception {
        try {
            // start by connecting to the ResourceManager
            resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
            // tell the task slot table who's responsible for the task slot actions
            taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());

            // start the job leader service
            jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());

            fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
        } catch (Exception e) {
            handleStartTaskExecutorServicesException(e);
        }
    }

这里主要涉及如下几种操作:

•通过连接ResourceManager的方式(注册监听器)启动resourceManagerLeaderRetriever。•taskSlotTable.start方法的目的是设置taskSlotTable的相关属性,告诉taskSlotTable谁负责任务槽操作。•启动job leader service•创建文件缓存

好了,到这里TaskExecutor的启动过程的源码分析就结束了,接下来就可以通过org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway#submitTask方法来调用org.apache.flink.runtime.taskexecutor.TaskExecutor#submitTask方法来提交任务了(org.apache.flink.runtime.executiongraph.Execution#deploy方法中调用RpcTaskManagerGateway#submitTask方法时RpcTaskManagerGateway也使用了代理模式)。由于篇幅有限,本篇就不再继续分析了。

总结

本篇主要分析了TaskExecutor启动的一系列流程,其中包括它的RpcService的构建、dispatcher、ioManager、shuffleEnvironment、kvStateService、广播变量管理器、任务槽table、taskState管理器的创建及启动过程分析。主要分析了RpcService中的actorSystem构建流程及其中的代理模式的使用,另外对shuffleEnvironment内部的nettyShuffleEnvironment及其中的nettyServer、nettyClient的工作模式进行了走马观花式的分析。本文的重点是理一下TaskManger的启动脉络,后面有时间的话会对其中的一些细节进行具体分析,敬请期待。

References

[1] flink官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.11

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2020-10-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 开发架构二三事 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
暂无评论
推荐阅读
聊一下深/浅克隆的那些要点
Clone,克隆的意思,即通过克隆来获取与源体一样的新个体。还记得初中生物课本上的多利羊吗?亦或是科幻电影中BOSS们害怕自己死亡,而存放在培养皿中的那些克隆体。所以说在Java中我们可以通过Clone方法来创建一个新的对象,这里让我们来回顾一下创建新对象的几种创建方式:
东边的大西瓜
2022/05/05
3860
聊一下深/浅克隆的那些要点
Java对象的深克隆与浅克隆(对象复制)
该语句将stu1的引用赋值给stu2,这样,stu1和stu2指向内存堆中同一对象。
JavaEdge
2020/05/27
3.2K0
Java对象的深克隆与浅克隆(对象复制)
Java对象复制的方法
在实际编程过程中,我们常常要遇到这种情况:有一个对象A,在某一时刻A中已经包含了一些有效值,此时可能 会需要一个和A完全相同新对象B,并且此后对B任何改动都不会影响到A中的值,也就是说,A与B是两个独立的对象,但B的初始值是由A对象确定的。例如下面程序展示的情况:
用户1205080
2019/05/14
1.5K0
javacloneable接口_comparable
一个类实现了Cloneable接口 指向@link java.lang.Object#clone()} 方法是合法的 使得一个 field-for-field copy的类的实例的拷贝
全栈程序员站长
2022/11/04
2500
javacloneable接口_comparable
深入浅出| java中的clone方法
clone,有人称之为克隆,有人称之为复制,其实都是同一个东西 本文称之为"克隆",毕竟人家方法名叫"clone"
KEN DO EVERTHING
2019/01/17
9840
深浅克隆面试题汇总——附详细答案
可以看出,如果使用等号复制时,对于值类型来说,彼此之间的修改操作是相对独立的,而对于引用类型来说,因为复制的是引用对象的内存地址,所以修改其中一个值,另一个值也会跟着变化,原理如下图所示:
磊哥
2019/09/19
4490
深浅克隆面试题汇总——附详细答案
设计模式(四):创建型之原型模式
冬天vs不冷
2025/01/21
470
设计模式(四):创建型之原型模式
原型模式的用法
用一个已经创建的实例作为原型,通过**复制**该原型对象来创建一个和原型对象**相同的新对象**。
忆愿
2024/08/04
920
原型模式的用法
Java的克隆
说到克隆,本质都是使用一个已经实例化完成的对象的副本。 对于基本类型比较简单。比方说我们想复制一个变量,
PhoenixZheng
2018/08/07
5780
克隆一个对象——原型模式深入解析
原型模式也是创建型的设计模式,字面意思其实很简单,就是复制一个对象,这里面有什么学问呢? 用原型实例指定创建对象的种类,并且通过拷贝这些原型,创建新的对象。 按照惯例,先讲故事。 我们都知道苹果有刻字服务,也就是假如你买了一款iPhone手机,你可以花一点钱让厂商给你刻上你想刻的字,这样体现了这款产品的独一无二性,很有意思。 那么现在,有甲乙丙都来买iPhone手机,并且都想刻上自己的名字,我们假设新手机是完全一模一样的。 常规的解决办法: public class Apple { pr
文彬
2018/05/08
7560
Java设计模式---创建型模式
本章Java设计模式的创建型模式的介绍,是通过学习视频记录的笔记,欢迎留言指出错误点
用户11010370
2024/03/12
1420
设计模式-原型模式示例
在这个示例代码中,我们定义了一个原型接口 Prototype 和一个具体的学生类 Student,该类实现了原型接口并重写了 clone() 方法。在客户端代码中,我们创建了一个原型对象 stu1,并通过克隆原型对象来生成一个新的学生对象 stu2,然后修改 stu2 对象的属性并打印结果。由于 stu1 和 stu2 对象是互相独立的,因此修改 stu2 的属性不会影响 stu1 对象的属性。
堕落飞鸟
2023/05/03
2010
【Java编程进阶之路 05】深入探索:Java中的浅克隆与深克隆的原理与实现
在Java编程中,克隆(Cloning)是一个重要的概念,它允许创建并操作对象的副本。克隆可以分为两种类型:浅克隆(Shallow Cloning)和深克隆(Deep Cloning)。这两种克隆方式在处理对象及其引用的成员变量时有所不同。下面,将详细讨论它们之间的区别,并提供实现方法。
夏之以寒
2024/03/05
6600
面试官:你知道对象的克隆原理吗?
我们把前面创建的UserDto对象引用复制给userDto了,然后又对对象中的两个属性进行重新赋值,userDto1也会随着赋值的。这就是所谓的浅克隆(浅复制)。
田维常
2020/12/14
4950
浅析克隆 顶
一、在浅克隆中,如果原型对象的属性是值类型(如int,double,byte,boolean,char等),将复制一份给克隆对象;如果原型对象的属性是引用类型(如类,接口,数组,集合等复杂数据类型),则将引用对象的地址复制一份给克隆对象,也就是说原型对象和克隆对象的属性指向相同的内存地址。简单来说,在浅克隆中,当原型对象被复制时只复制它本身和其中包含的值类型的属性,而引用类型的属性并没有复制。
算法之名
2019/08/20
5130
一篇文章带你了解设计模式——创建者模式
下面我们来介绍第一种设计模式,创建型模式的主要关注点是怎样创建对象,它的主要特点是“将对象的创建与使用分离”。
秋落雨微凉
2023/03/08
7640
一篇文章带你了解设计模式——创建者模式
object中提供的clone默认为浅克隆_接口的定义形式
cloneable其实就是一个标记接口,只有实现这个接口后,然后在类中重写Object中的clone方法,然后通过类调用clone方法才能克隆成功,如果不实现这个接口,则会抛出CloneNotSupportedException(克隆不被支持)异常。Object中clone方法:
全栈程序员站长
2022/11/04
5490
object中提供的clone默认为浅克隆_接口的定义形式
一文读懂深克隆与浅克隆的关系
在Java提供的API中,不需要手动创建抽象原型接口,因为Java已经内置了Cloneable抽象原型接口,自定义的类型只需实现该接口并重写Object.clone()方法即可完成本类的复制。
Tom弹架构
2021/11/02
6460
设计模式——原型模式
设计模式中,单例模式应该是大家最为熟悉的了,那如果我们需要对一个对象进行多次复制的话,大家会用什么呢?这就要用到今天要讲的原型模式了。
健程之道
2019/11/02
4250
java 通过Object的clone复制对象
对象的克隆是指创建一个新的对象,且新的对象的状态与原始对象的状态相同。当对克隆的新对象进行修改时,不会影响原始对象的状态。 常规实现
六月的雨在Tencent
2024/03/28
1370
java 通过Object的clone复制对象
相关推荐
聊一下深/浅克隆的那些要点
更多 >
LV.1
中国地质大学(武汉)研究生
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档