感谢阅读「美图数据技术团队」的第 19 篇原创文章,关注我们持续获取美图最新数据技术动态。
大数据部门经常会有一些任务需要离线定时触发,例如需要每天执行的定时统计任务、定时更新算法模型等。当场景比较简单的时候可以通过一些 crontab、quartz 等方式来实现定时调度,但是当场景变复杂了呢?
#多个任务之间可能有依赖,并且各自有自己的执行时间
#任务需要重跑
#任务需要在多个节点执行
。。。。。。。
此时就需要一个成熟的调度系统来解决这些问题。根据我们的调研结果显示,现有的调度系统如下所示:
调度分类 | 特点 | 现有的实现 |
---|---|---|
DAG(有向无环图)工作流调度系统 | 关注于任务之间的依赖处理 | oozie(针对 Hadoop) 、zeus(阿里开源 ,资料少)、azkaban(不支持跨周期调度) |
定时分片类任务调度系统 | 关注于把任务分散到多个节点执行 | elastic-job、SchedulerX |
正如上表所示,调度系统目前市面上没有比较统一、全面的解决方案,而以上两种方案也没有办法满足美图特殊的业务需求,因此美图大数据团队决定量身定制一个适合美图业务的调度系统。
如上图所示是任务调度系统的架构设计,Agent 端是负责执行任务的客户端,Dispatcher 端是用来进行实例创建和调度相关的模块,它设计成多节点,各个节点可以自动负责某些实例的调度,并且节点下线之后也能够正常切换。Client 端是封装的 SDK,给第三方使用。对于Client 的请求会转发到 Manager 端进行处理响应,它也是设计成多节点的。
通常一个完整的调度系统需要实现任务周期性调度、任务依赖关系、任务调度、任务重跑等服务。
周期性调度一般支持分钟、小时、天、周、月、年调度级别,目前不支持自定义 CRON 表达式,具体表达如下图所示。
任务的组成及其依赖关系
WORKFLOW 是一系列相关的 TASK 的集合,其中 TASK 的 DAG 结构如下图所示:
而总体上而言,任务之间存在 5 大类依赖关系:
对于这些任务之间的依赖关系我们需要解决 2 个问题:「依赖关系怎么去维护」以及「依赖关系是否需要限制」。
1.依赖关系怎么去维护?
当我们创建实例之后,只要能够维护各个实例之间的依赖关系,那么就能够保证任务的正确运行,那么怎么维护呢?如上图所示可以通过 CRON 表达式去找出对应时间的上游实例信息。
2.依赖关系是否需要限制?
如下图所示,每小时 20 分运行产生的实例到底要依赖上一个小时 30 分的实例还是说之后的 30 分的实例呢?
为了避免这种歧义,我们决定给依赖关系加约束。目前可以分成 3 类通过添加约束的方式来确定依赖关系:
1.同周期,保证上游执行的时间不晚于下游的时间
2.大周期依赖小周期,根据大周期的上一个实例运行时间和当前这个实例的运行时间作为基准,确定出这个时间范围内的小周期,设置依赖关系,当且仅当小周期都成功下游才会执行 。
3.小周期依赖大周期,约束比较大,目前使用的场景偏少。
要维护实例之间的关系,则首先要有实例,那么实例要怎么提前生成呢?我们需要有专门的定时任务去生成第二天要产生的实例信息,并且维护它们之间的依赖关系;对于在大于定时任务执行时间点之后上线的任务,需要手动生成实例并维护依赖关系。
当然我们还需要保证有效地生成实例。多个 WORKFLOW 之间的依赖关系可以组成一个 DAG 图,这一个图可以用一个「DAG_WORKFLOW_GROUP ID」来表示,通过这个 ID 以及加锁的形式可以方便我们在多节点进行 WORKFLOW 实例生成、上下线等操作。
任务调度
任务调度有两个前置条件:「时间约束满足」以及「上游实例依赖都完成」。
我们将调度模块设计成多节点的形式,同时我们希望每个调度节点能够尽量去调度某些固定的实例,而不是通过竞争资源的形式来进行调度。因此调度节点会有一个心跳线程去更新自己的状态以及获取当前各个调度节点的状态,如果有节点失效,那么就会把它剔除掉。调度节点根据节点数和自己所属的下标这 2 个信息通过加锁的形式进行实例调度,这样可以减少锁的资源竞争,同时每个节点又可以固定调度一些实例。
跟对应执行任务节点进行通信主要有两种方式:1.AGENT EXEC 作为一个单独的服务存在,支持在对应的执行节点执行 SHELL 脚本进行任务调度,通过 SHELLE 的退出码来做判断任务的执行情况,规定退出码为 0 是正常退出,其余都是异常退出;2.AGENT SDK 作为一个 jar 包可以内嵌到业务方的代码中,业务方必须实现我们预先定义的几个接口。每个客户端通过 IP+名字的形式作为唯一标识。
整个任务调度的执行逻辑如下:在预先创建完实例之后会创建一个可以运行的调度记录,调度线程扫描处于这种状态的记录。当运行对应的记录的时候,会创建相应的 task 实例信息并执行对应的 task 实例,根据之前的配置信息,发送命令到对应的客户端中。接着客户端上报任务执行的结果,如果任务运行完成会相应唤醒下游的任务,当所有的任务都运行完毕之后 workflow 就运行完毕,如果说在这个期间有任务运行失败,那么最终这个 workflow 也是失败的。
任务重跑
既然创建了实例就应该能够支持实例的重跑,重跑传递的方式有两种:上游重跑支持,配置依赖关系的时候可以选择上游的重跑是否能够导致下游的重跑;重跑时选择可以触发对应的下游节点。
我们定义当实例处于运行或待运行状态时不允许重跑,那么什么情况下能重跑呢?根据不同的重跑类型判断标准也有所不同:
在判断状态的时候,我们需要进行加锁处理, 如果只是加单个全局锁其范围太大,因此希望通过多锁的形式来做。任务重跑的依赖关系是由历史关系所确定的,因此重跑的调度也会判断对应的上游是否满足调度条件之后才会执行。调度线程处理逻辑也是找出可执行的记录,此时不创建新的 task 实例信息,而是利用历史的实例信息进行调度。
接着介绍几个比较常见的任务操作。任务上线会判断当前实例在当天是否创建过实例。如果没有创建,那么就根据调度频率创建今天这个时间之后可以执行的实例;如果创建了,那么看调度频率有没有发生变化,如果没有重新设置那些实例的状态为可以正常调度的类型,否则就删除那些实例,然后重新创建实例,设置依赖关系。任务下线时不能对它进行调度,只要把下线点之后的实例设置成下线状态即可。值得一提的是,在当前系统中我们是不允许在未下线的状态编辑任务信息。
未来规划
之后的系统规划主要分为两个方向:
1. 支持更多的业务场景使用
目前基本能够覆盖大部分的业务场景,之后会在继续扩展业务支撑;
2. 调度加入对资源相关的考虑
目前调度逻辑并没有考虑到对应资源的考虑,而且接受到任务信息的节点也不一定是任务执行节点,有可能会提交到集群中去跑任务。因此,我们可以通过主动采集节点信息或者是业务方定时上报集群信息的方式来对资源进行一定的调度限制。