前言
这是个有趣的Flink资源管理的使用案例。随着业务需求的遍地开花和数据量的成倍增长,集团内部一个兄弟部门(姑且称为客户吧)现有的技术构架有点陈旧,已经无法支撑日常工作。经过调研论证,准备搭建大数据平台,计算引擎定为Flink。
我们都知道,通常情况下,线上生产环境的集群资源肯定是统一管理的。对于Flink来说,可以选择运行在Yarn集群或者k8s集群。如果没有必须使用k8s的需求,通常使用Yarn集群。其中,运行模式可以选择Application模式或者Session模式。
问题描述
客户没有必须使用k8s集群的需求,但是他们也不打算使用已经部署的大数据平台中的Yarn集群。而是要求提供一个或者多个Standalone集群。悲催的是,大数据平台并不提供Standalone类型集群的资源管理方案。
经深入沟通,客户给出不使用Yarn集群的如下理由:
1.Application模式属于作业级别集群,集群生命周期与作业生命周期强绑定。这种模式并不适合客户批作业居多的场景,毕竟集群的启动与销毁存在时间开销,也就是JobManager进程和TaskManager进程的启动与停止的耗时。
2.Session模式虽然JobManager进程常驻、支持多作业共享集群资源。但是TaskManager进程的生命周期还是与作业的生命周期强绑定,相应的启动与停止同样存在时间开销。
3.最关键一点:两种模式集群,作业异常或者正常退出,无法查看TaskManager日志。
所以,他们选择使用Standalone集群,该模式JobManager和TaskManager进程均常驻,没有启动与停止的时间开销,也能够查看TaskManager日志。
问题分析
首先,我们大数据平台是不可能支持Stanalone集群资源管理的。那么,我们只能考虑Session集群部署能否适配下来满足客户需求。其实,客户问题可以转化为另一个问题:Session集群TaskManager进程能否常驻?答案肯定是可以的。既然Session集群能够触发停止TaskManager,那肯定是因为有判断条件来决定是否触发。找到触发条件,不让触发,即可。
通过启动Session集群、提交批任务,任务结束,观察JobManager日志,如下:
[root@felixzh bin]# ./yarn-session.sh –d
[root@felixzh bin]# ./flink run ../examples/batch/WordCount.jar
如图所示,ActiveResourceManager的日志need release 1 workers就是触发停止TaskManager的记录。
阅读源码,找到该日志所属方法为checkResourceDeclarations,梳理出一条调用链路如下:
ActiveResourceManager.checkResourceDeclarations <-
ActiveResourceManager.declareResourceNeeded <-
ActiveResourceManager.declareResourceNeeded <-
FineGrainedSlotManager. declareNeededResources <-
FineGrainedSlotManager.declareNeededResourcesWithDelay <-
FineGrainedSlotManager .releaseIdleTaskExecutor <-
FineGrainedSlotManager .releaseIdleTaskExecutorIfPossible <-
FineGrainedSlotManager .checkTaskManagerTimeouts <-
FineGrainedSlotManager.start
反向梳理出调用链之后,我们正向解读下流程入口,就可以找到触发条件,分析如下:
FineGrainedSlotManager是细粒度Slot管理类,通过ResourceManager类startResourceManagerServices方法调用FineGrainedSlotManager.start方法启动。ResourceManager就是官网Flink Architecture章节所描述的JobManager中的一个内部组件,这里就不深究了。
继续回到FineGrainedSlotManager.start,该方法通过线程池周期性地执行checkTaskManagerTimeouts方法,周期时间为参数resourcemanager.taskmanager-timeout(单位是ms),继而判断是否需要触发停止TaskManager。
至此,我们可以得出结论:将周期参数设置足够大即可。
实践验证
1.启动Seesion集群,设置超时参数1H。
[root@felixzh bin]# ./yarn-session.sh -Dresourcemanager.taskmanager-timeout=3600000 –d
2.提交批任务
[root@felixzh bin]# ./flink run ../examples/batch/WordCount.jar
3.任务结束,过几分钟,观察FlinkUI,TaskManager未退出,达到预期效果
JobManager内部组件ResourceManager提供TaskManager进程管理的相关参数,上述参数resourcemanager.taskmanager-timeout就是其一:
除了上述参数,还有一个有意思的参数如下:
slotmanager.redundant-taskmanager-num表示任务结束之后,集群会保留TaskManager个数。测试如下:
[root@felixzh bin]# ./yarn-session.sh -Dslotmanager.redundant-taskmanager-num=2 –d
[root@felixzh bin]# ./flink run ../examples/batch/WordCount.jar
该任务结束之后,集群剩余两个TaskManager,如下:
至此,本文提供给客户的最终解决方案:yarn-session集群+定制化参数。如果大家有类似有趣的场景需求,可以尝试调试Flink ResourceManager其他定制化参数。