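Introduction | Flink has become the direction stream computing is heading, and many large companies already run it at scale. I have recently been studying the Flink source code and wanted to share my learning process, in the hope that it helps like-minded readers. Anyone starting to read the source presumably already knows Flink's basic concepts, so I will not reintroduce Flink here. As the first chapter of this series, this article walks through Flink's project directory layout: only once you know what each module does can you locate the right code when you hit a problem, and then dig deeper.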
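Opening the Flink project in IDEA shows the subproject directories in Figure 1. This section briefly introduces what each module does, skipping directories such as example and test. The two most important modules, flink-runtime and flink-table, are covered in Sections 2 and 3 respectively.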
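flink-runtime is one of Flink's core modules and implements the Flink runtime framework. Figure 3 [1] and Figure 4 [2] show the Flink runtime framework and the job scheduling framework, respectively. Almost every component labeled in those figures has a corresponding implementation under flink-runtime, for example JobManager, TaskManager, ResourceManager, Scheduler, and Checkpoint Coordinator. Some directories are self-explanatory from their names, but others cannot be matched to any component in the figures, and the official documentation does not explain these terms either. This section focuses on those directories.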
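The Flink documentation does not describe a TaskExecutor module; it only says that the TaskManager is the service that executes tasks and manages runtime resources. In fact, the TaskManager in Figure 4 is implemented by the code in two directories, taskexecutor and taskmanager, and most of the functionality lives in taskexecutor. For that reason, I think this TaskManager might as well simply be called TaskExecutor.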
In the taskexecutor directory, org.apache.flink.runtime.taskexecutor.TaskManagerRunner is the entry point of the TaskManager process.
package org.apache.flink.runtime.taskexecutor;

public class TaskManagerRunner implements FatalErrorHandler, AutoCloseableAsync {

    public static void main(String[] args) throws Exception {
        // startup checks and logging
        EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
        SignalHandler.register(LOG);
        JvmShutdownSafeguard.installAsShutdownHook(LOG);

        long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
        if (maxOpenFileHandles != -1L) {
            LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
        } else {
            LOG.info("Cannot determine the maximum number of open file descriptors");
        }

        final Configuration configuration = loadConfiguration(args);
        FileSystem.initialize(configuration, PluginUtils.createPluginManagerFromRootFolder(configuration));
        SecurityUtils.install(new SecurityConfiguration(configuration));

        try {
            SecurityUtils.getInstalledContext().runSecured(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    runTaskManager(configuration, ResourceID.generate());
                    return null;
                }
            });
        } catch (Throwable t) {
            final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
            LOG.error("TaskManager initialization failed.", strippedThrowable);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }
    }
}
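Reading main(), the process runs its startup checks, loads the configuration (flink-conf.yaml), and then calls runTaskManager(). That function eventually calls rpcServer.start(), which in effect starts a background service process that waits for the JobManager to assign tasks to it.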
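To make that startup order easier to follow, here is a minimal sketch that mimics it without any Flink dependency. Everything in it is an assumption made for illustration: ToyRpcServer, loadConfiguration, runSecured, startTaskManager, and the "--slots" argument are hypothetical names, not Flink APIs, and the real runTaskManager() does considerably more work.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;

// Hypothetical, simplified model of the startup order; none of these types are Flink classes.
class StartupSketch {

    /** Stand-in for an RPC endpoint that waits for work once it has been started. */
    static final class ToyRpcServer {
        private final int numSlots;
        private boolean running;

        ToyRpcServer(int numSlots) {
            this.numSlots = numSlots;
        }

        void start() {
            running = true;
        }

        boolean isRunning() {
            return running;
        }

        int numSlots() {
            return numSlots;
        }
    }

    /** Toy replacement for reading flink-conf.yaml: parses "--key value" argument pairs. */
    static Map<String, String> loadConfiguration(String[] args) {
        Map<String, String> conf = new HashMap<>();
        for (int i = 0; i + 1 < args.length; i += 2) {
            if (args[i].startsWith("--")) {
                conf.put(args[i].substring(2), args[i + 1]);
            }
        }
        return conf;
    }

    /** Stand-in for a security context; a real one could switch credentials around the call. */
    static <T> T runSecured(Callable<T> action) throws Exception {
        return action.call();
    }

    /** Same order as main(): load the configuration, then start the server in a secured call. */
    static ToyRpcServer startTaskManager(String[] args) {
        final Map<String, String> conf = loadConfiguration(args);
        try {
            return runSecured(() -> {
                int slots = Integer.parseInt(conf.getOrDefault("slots", "1"));
                ToyRpcServer server = new ToyRpcServer(slots);
                server.start();
                return server;
            });
        } catch (Exception e) {
            // The real main() logs the failure and exits; this sketch rethrows instead.
            throw new IllegalStateException("TaskManager initialization failed.", e);
        }
    }

    public static void main(String[] args) {
        ToyRpcServer server = startTaskManager(new String[] {"--slots", "4"});
        System.out.println("running=" + server.isRunning() + ", slots=" + server.numSlots());
    }
}
```

The point of the sketch is only the ordering: configuration first, then a secured call that brings up a long-lived server object.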
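The most important member of TaskManagerRunner is taskManager, which is actually an object of type TaskExecutor. Looking at the members of TaskExecutor, there is a TaskSlotTable, which holds the multiple TaskSlots that each TaskManager maintains in Figure 4; a TaskManagerServices, which includes the MemoryManager and IOManager; and various Connection objects that maintain the links to other components. So the TaskManager really corresponds to the TaskExecutor class.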
package org.apache.flink.runtime.taskexecutor;

/**
 * TaskExecutor implementation. The task executor is responsible for the execution of multiple
 * {@link Task}.
 */
public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {

    public static final String TASK_MANAGER_NAME = "taskmanager";

    /** The access to the leader election and retrieval services. */
    private final HighAvailabilityServices haServices;

    private final TaskManagerServices taskExecutorServices;

    /** The task manager configuration. */
    private final TaskManagerConfiguration taskManagerConfiguration;

    private final HeartbeatServices heartbeatServices;

    /** The fatal error handler to use in case of a fatal error. */
    ...

    // --------- TaskManager services --------

    /** The connection information of this task manager. */
    private final TaskManagerLocation taskManagerLocation;

    private final TaskManagerMetricGroup taskManagerMetricGroup;

    /** The state manager for this task, providing state managers per slot. */
    private final TaskExecutorLocalStateStoresManager localStateStoresManager;

    /** The network component in the task manager. */
    private final ShuffleEnvironment<?, ?> shuffleEnvironment;

    /** The kvState registration service in the task manager. */
    private final KvStateService kvStateService;

    // --------- job manager connections -----------

    private final Map<ResourceID, JobManagerConnection> jobManagerConnections;

    // --------- task slot allocation table -----------

    private final TaskSlotTable taskSlotTable;

    private final JobManagerTable jobManagerTable;

    private final JobLeaderService jobLeaderService;
    ...

    // --------- resource manager --------

    @Nullable
    private ResourceManagerAddress resourceManagerAddress;

    @Nullable
    private EstablishedResourceManagerConnection establishedResourceManagerConnection;

    @Nullable
    private TaskExecutorToResourceManagerConnection resourceManagerConnection;
    ...
}
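The role of the slot table can be pictured with a small standalone sketch: a fixed number of slots, each either free or held by one job. This is only an illustration under my own assumptions. SlotTableSketch and its methods (allocateSlot, freeSlotsOf, freeSlotCount) are hypothetical names and do not reflect the real TaskSlotTable API, which also tracks allocation IDs, timeouts, and the tasks running in each slot.

```java
import java.util.OptionalInt;

// Hypothetical toy model of a per-TaskManager slot table; not the Flink TaskSlotTable.
class SlotTableSketch {

    /** slotOwners[i] is the id of the job holding slot i, or null if the slot is free. */
    private final String[] slotOwners;

    SlotTableSketch(int numberOfSlots) {
        if (numberOfSlots <= 0) {
            throw new IllegalArgumentException("numberOfSlots must be positive");
        }
        this.slotOwners = new String[numberOfSlots];
    }

    /** Gives the first free slot to the job; returns empty when every slot is taken. */
    OptionalInt allocateSlot(String jobId) {
        for (int i = 0; i < slotOwners.length; i++) {
            if (slotOwners[i] == null) {
                slotOwners[i] = jobId;
                return OptionalInt.of(i);
            }
        }
        return OptionalInt.empty();
    }

    /** Releases every slot held by the job and returns how many were freed. */
    int freeSlotsOf(String jobId) {
        int freed = 0;
        for (int i = 0; i < slotOwners.length; i++) {
            if (jobId.equals(slotOwners[i])) {
                slotOwners[i] = null;
                freed++;
            }
        }
        return freed;
    }

    /** Counts the slots that are currently not assigned to any job. */
    int freeSlotCount() {
        int free = 0;
        for (String owner : slotOwners) {
            if (owner == null) {
                free++;
            }
        }
        return free;
    }

    public static void main(String[] args) {
        SlotTableSketch table = new SlotTableSketch(2);
        System.out.println("job-a got slot " + table.allocateSlot("job-a"));
        System.out.println("job-b got slot " + table.allocateSlot("job-b"));
        System.out.println("job-c got slot " + table.allocateSlot("job-c"));
        System.out.println("freed for job-a: " + table.freeSlotsOf("job-a"));
        System.out.println("free slots now: " + table.freeSlotCount());
    }
}
```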
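Figure 4 shows that the Client, TaskManager, and JobManager in Flink are all separate processes. The entry points of the Client and the TaskManager have been covered above, so where is the entry point of the JobManager?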
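The AM in Figure 3 is actually a standalone process whose entry point is org.apache.flink.runtime.entrypoint.ClusterEntrypoint in the entrypoint directory. This class holds a DispatcherResourceManagerComponent object, which corresponds to the Dispatcher and ResourceManager modules in the AM. The Dispatcher holds a JobManagerRunnerFactory object; when a user submits a job to the AM through the client, the Dispatcher creates a JobManagerRunner, which contains the JobGraph and a JobMasterService and is used to create the JobMaster at runtime. The small components shown inside the JobManager in Figure 4 actually live in the JobMaster class, including JobGraph, Scheduler, and CheckpointCoordinator (from the JobMasterGateway interface).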
package org.apache.flink.runtime.jobmaster;

public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {

    /** Default names for Flink's distributed components. */
    public static final String JOB_MANAGER_NAME = "jobmanager";

    // ------------------------------------------------------------------------

    private final JobMasterConfiguration jobMasterConfiguration;
    private final ResourceID resourceId;
    private final JobGraph jobGraph;
    private final Time rpcTimeout;
    private final HighAvailabilityServices highAvailabilityServices;
    private final BlobWriter blobWriter;
    private final HeartbeatServices heartbeatServices;
    private final JobManagerJobMetricGroupFactory jobMetricGroupFactory;
    private final ScheduledExecutorService scheduledExecutorService;
    private final OnCompletionActions jobCompletionActions;
    private final FatalErrorHandler fatalErrorHandler;
    private final ClassLoader userCodeLoader;
    private final SlotPool slotPool;
    private final Scheduler scheduler;
    private final SchedulerNGFactory schedulerNGFactory;

    // --------- BackPressure --------

    private final BackPressureStatsTracker backPressureStatsTracker;

    // --------- ResourceManager --------

    private final LeaderRetrievalService resourceManagerLeaderRetriever;

    // --------- TaskManagers --------

    private final Map<ResourceID, Tuple2<TaskManagerLocation, TaskExecutorGateway>> registeredTaskManagers;
    private final ShuffleMaster<?> shuffleMaster;
    ...
}
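The submission path described above (the Dispatcher uses a factory to create one runner per submitted job) can be sketched in a few lines. This is a simplified model under stated assumptions, not the Flink implementation: ToyJobGraph, ToyJobManagerRunner, ToyRunnerFactory, submitJob, and runningJobCount are hypothetical names, and the real Dispatcher works asynchronously over RPC with leader election and fault handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical toy model of "Dispatcher creates a runner per job"; not Flink classes.
class DispatcherSketch {

    /** Stand-in for a JobGraph: only the job id matters for this sketch. */
    static final class ToyJobGraph {
        final String jobId;

        ToyJobGraph(String jobId) {
            this.jobId = jobId;
        }
    }

    /** Stand-in for a JobManagerRunner: owns the job graph and can be started. */
    static final class ToyJobManagerRunner {
        final ToyJobGraph jobGraph;
        private boolean started;

        ToyJobManagerRunner(ToyJobGraph jobGraph) {
            this.jobGraph = jobGraph;
        }

        void start() {
            started = true;
        }

        boolean isStarted() {
            return started;
        }
    }

    /** Stand-in for the runner factory held by the dispatcher. */
    interface ToyRunnerFactory {
        ToyJobManagerRunner createRunner(ToyJobGraph jobGraph);
    }

    private final ToyRunnerFactory runnerFactory;
    private final Map<String, ToyJobManagerRunner> runners = new LinkedHashMap<>();

    DispatcherSketch(ToyRunnerFactory runnerFactory) {
        this.runnerFactory = runnerFactory;
    }

    /** Creates and starts one runner for the submitted job; rejects duplicate job ids. */
    ToyJobManagerRunner submitJob(ToyJobGraph jobGraph) {
        if (runners.containsKey(jobGraph.jobId)) {
            throw new IllegalStateException("Job already submitted: " + jobGraph.jobId);
        }
        ToyJobManagerRunner runner = runnerFactory.createRunner(jobGraph);
        runner.start();
        runners.put(jobGraph.jobId, runner);
        return runner;
    }

    int runningJobCount() {
        return runners.size();
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch(ToyJobManagerRunner::new);
        ToyJobManagerRunner runner = dispatcher.submitJob(new ToyJobGraph("job-1"));
        System.out.println(runner.jobGraph.jobId + " started=" + runner.isStarted());
        System.out.println("running jobs=" + dispatcher.runningJobCount());
    }
}
```

Passing the factory in through the constructor mirrors why a factory is used at all: the dispatcher does not need to know how a runner is built, only that it gets one per job.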
The flink-table module belongs to Flink's upper-level API. It includes the Java and Scala versions of the Table API, as well as SQL parsing and SQL execution.
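Figure 6 [3] shows the architecture of the submodules in flink-table. Before Flink 1.9, Flink had only one table planner (the flink-table-planner module) for translating SQL into stream-processing jobs, and the Table API was not unified across streaming and batch. There were therefore two runtime environments, StreamTableEnvironment and BatchTableEnvironment, which handled streaming and batch jobs respectively.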
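As Flink SQL gained importance, flink-table was moved out of flink-libraries and became a top-level directory of its own. In Flink 1.9, Alibaba open-sourced blink-planner, so flink-table now contains two planners. In the long run, unifying streaming and batch is the trend, so blink-planner uses only the StreamTableEnvironment-related APIs and not BatchTableEnvironment: it treats a batch as a bounded stream, aiming to unify streaming and batch in this way. Because blink-planner supports this unification better and also performs better, it may well replace flink-table-planner entirely in a future release, and flink-table-planner may eventually be removed.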
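The idea of treating a batch as a bounded stream can be shown with a toy example that has nothing to do with the planner's actual code: one record-at-a-time operator, driven by a source that happens to end. RunningWordCount and runToCompletion are hypothetical names I made up for this sketch; no Flink API is used.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of "batch = bounded stream"; unrelated to the planner's real code.
class BoundedStreamSketch {

    /** A streaming-style operator: it sees one record at a time and keeps running counts. */
    static final class RunningWordCount {
        private final Map<String, Long> counts = new TreeMap<>();

        void processElement(String word) {
            counts.merge(word, 1L, Long::sum);
        }

        /** Returns a copy of the counts accumulated so far. */
        Map<String, Long> currentCounts() {
            return new TreeMap<>(counts);
        }
    }

    /**
     * Feeds the operator until the source is exhausted. With a bounded source this returns
     * the batch result; with an unbounded source the loop would simply never finish.
     */
    static Map<String, Long> runToCompletion(Iterator<String> source) {
        RunningWordCount operator = new RunningWordCount();
        while (source.hasNext()) {
            operator.processElement(source.next());
        }
        return operator.currentCounts();
    }

    public static void main(String[] args) {
        Iterator<String> boundedSource = Arrays.asList("flink", "table", "flink").iterator();
        System.out.println(runToCompletion(boundedSource));
    }
}
```

The same operator code serves both cases; only the source decides whether the job is "batch" or "streaming".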
flink-table also implements a command-line SQL Client that makes learning and debugging easier for developers. org.apache.flink.table.client.SqlClient is the entry point of this client.
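This article gave a rough overview of what each module in the Flink project does and took a closer look at the flink-runtime and flink-table modules. I hope it helps readers find their way around the Flink code more quickly. Finally, I recommend a GitHub repository with many video lectures on Flink, https://github.com/flink-china/flink-training-course , which I hope will be helpful as well.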
[1] 高赟 (Gao Yun), Flink Runtime 核心机制剖析 (An Analysis of the Core Mechanisms of the Flink Runtime), https://files.alicdn.com/tpsservice/7bb8f513c765b97ab65401a1b78c8cb8.pdf.
[2] Flink docs, https://ci.apache.org/projects/flink/flink-docs-release-1.9/concepts/runtime.html.
[3] 贺小令 (He Xiaoling), 深度探索 Flink SQL (A Deep Dive into Flink SQL), https://files.alicdn.com/tpsservice/6f069ca9f2930400a577cc90323de326.pdf.
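Original content notice: this article is published on the Tencent Cloud Developer Community with the author's authorization and may not be reproduced without permission.
In case of infringement, please contact cloudcommunity@tencent.com for removal.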