一旦数据仓库开始使用,就需要不断从源系统给数据仓库提供新数据。为了确保数据流的稳定,需要使用所在平台上可用的任务调度器来调度ETL定期执行。调度模块是ETL系统必不可少的组成部分,它不但是数据仓库的基本需求,也对项目的成功起着举足轻重的作用。
操作系统一般都为用户提供调度作业的功能,如Windows的“计划任务”和UNIX/Linux的cron系统服务。绝大多数Hadoop系统都运行在Linux之上,因此本片详细讨论两种Linux上定时自动执行ETL作业的方案。一种是经典的crontab,这是操作系统自带的功能,二是Hadoop生态圈中的Oozie组件。Kettle的Start作业项也提供了定时调度作业执行的功能。为了演示Kettle对数据仓库的支持能力,我们的示例将使用Start作业项实现ETL执行自动化。
上一篇我们已经创建好用于定期装载的Kettle作业,将其保存为regular_etc.kjb文件。这里建立一个内容如下的shell脚本文件regular_etl.sh,调用Kettle的命令行工具kitchen.sh执行此作业,并将控制台的输出或错误重定向到一个文件名中带有当前日期的日志文件中:
#!/bin/bash
cd /root/pdi-ce-8.3.0.0-371
# 需要清理Kettle缓存,否则会报 No suitable driver found for jdbc:hive2 错误
rm -rf ./system/karaf/caches/*
# 执行作业
./kitchen.sh -file ~/kettle_hadoop/6/regular_etc.kjb 1>~/kettle_hadoop/6/regular_etc_`date +%Y%m%d`.log 2>&1
可以很容易地用crontab命令创建一个任务,定期运行此脚本。
crontab -e
# 添加如下一行,指定每天2点执行定期装载作业,然后保存退出。
0 2 * * * /root/regular_etl.sh
这就可以了,需要用户做的就是如此简单,其它的事情交给cron系统服务去完成。提供cron服务的进程名为crond,这是Linux下一个用来周期性执行某种任务或处理某些事件的守护进程。当安装完操作系统后,会自动启动crond进程,它每分钟会定期检查是否有要执行的任务,如果有则自动执行该任务。
Linux下的任务调度分为两类,系统任务调度和用户任务调度。
Linux系统使用一对allow/deny文件组合判断用户是否具有执行crontab的权限。如果用户名出现在/etc/cron.allow文件中,则该用户允许执行crontab命令。如果此文件不存在,那么如果用户名没有出现在/etc/cron.deny文件中,则该用户允许执行crontab命令。如果只存在cron.deny文件,并且该文件是空的,则所有用户都可以使用crontab命令。如果这两个文件都不存在,那么只有root用户可以执行crontab命令。allow/deny文件由每行一个用户名构成。
通过crontab 命令,我们可以在固定间隔的时间点执行指定的系统指令或shell脚本。时间间隔的单位可以是分钟、小时、日、月、周及以上的任意组合。crontab 命令格式如下:
crontab [-u user] file
crontab [-u user] [ -e | -l | -r | -i ]
参数说明:
注意,如果不经意地输入了不带任何参数的crontab命令,不要使用Control-d退出,因为这会删除用户所对应的crontab文件中的所有条目。代替的方法是用Control-c退出。
用户所建立的crontab文件中,每一行都代表一项任务,每行的每个字段代表一项设置。它的格式共分为六个字段,前五段是时间设定段,第六段是要执行的命令段,格式如下: .---------------- 分钟(0 - 59) | .------------- 小时(0 - 23) | | .---------- 日期(1 - 31) | | | .------- 月份(1 - 12) | | | | .---- 星期(0 - 6,代表周日到周一) | | | | | * * * * * 要执行的命令,可以是系统命令,也可以是自己编写的脚本文件。
在以上各个时间字段中,还可以使用如下特殊字符:
注意,“日期”和“星期”字段都可以指定哪天执行,如果两个字段都设置了,则执行的日期是两个字段的并集。
# 每1分钟执行一次command
* * * * * command
# 每小时的第3和第15分钟执行
3,15 * * * * command
# 在上午8点到11点的第3和第15分钟执行
3,15 8-11 * * * command
# 每隔两天的上午8点到11点的第3和第15分钟执行
3,15 8-11 */2 * * command
# 每个星期一的上午8点到11点的第3和第15分钟执行
3,15 8-11 * * 1 command
# 每晚的21:30执行
30 21 * * * command
# 每月1、10、22日的4:45执行
45 4 1,10,22 * * command
# 每周六、周日的1:10执行
10 1 * * 6,0 command
# 每天18:00至23:00之间每隔30分钟执行
0,30 18-23 * * * command
# 每星期六的晚上11:00执行
0 23 * * 6 command
# 每一小时执行一次
* */1 * * * command
# 晚上11点到早上7点之间,每隔一小时执行一次
* 23-7/1 * * * command
# 每月的4号与每周一到周三的11点执行
0 11 4 * 1-3 command
# 一月一号的4点执行
0 4 1 1 * command
# 每小时执行/etc/cron.hourly目录内的脚本。run-parts会遍历目标文件夹,执行第一层目录下具有可执行权限的文件。
01 * * * * root run-parts /etc/cron.hourly
有时我们创建了一个crontab任务,但是这个任务却无法自动执行,而手动执行脚本却没有问题,这种情况一般是由于在crontab文件中没有配置环境变量引起的。cron从用户所在的主目录,使用shell调用需要执行的命令。cron为每个shell提供了一个缺省的环境,Linux下的定义如下:
SHELL=/bin/bash
PATH=/sbin:/bin:/usr/sbin:/usr/bin
MAILTO=用户名
HOME=用户主目录
在crontab文件中定义多个调度任务时,需要特别注意的一个问题就是环境变量的设置,因为我们手动执行某个脚本时,是在当前shell环境下进行的,程序能找到环境变量,而系统自动执行任务调度时,除了缺省的环境,是不会加载任何其它环境变量的。因此就需要在crontab文件中指定任务运行所需的所有环境变量。
不要假定cron知道所需要的特殊环境,它其实并不知道。所以用户要保证在shell脚本中提供所有必要的路径和环境变量,除了一些自动设置的全局变量。以下三点需要注意:
缺省时,每条任务调度执行完毕,系统都会将任务输出信息通过电子邮件的形式发送给当前系统用户。这样日积月累,日志信息会非常大,可能会影响系统的正常运行。因此,将每条任务进行重定向处理非常重要。可以在crontab文件中设置如下形式,忽略日志输出:
0 */3 * * * /usr/local/myscript.sh >/dev/null 2>&1
“>/dev/null 2>&1”表示先将标准输出重定向到/dev/null,然后将标准错误重定向到标准输出。由于标准输出已经重定向到了/dev/null,因此标准错误也会重定向到/dev/null,这样日志输出问题就解决了。
可以将crontab执行任务的输出信息重定向到一个自定义的日志文件中,例如:
30 8 * * * rm /home/someuser/tmp/* > /home/someuser/cronlogs/clean_tmp_dir.log
除了利用操作系统提供的功能以外,Hadoop生态圈的工具也可以完成同样的调度任务,而且更灵活,这个组件就是Oozie。Oozie是一个管理Hadoop作业、可伸缩、可扩展、可靠的工作流调度系统,它内部定义了三种作业:工作流作业、协调器作业和Bundle作业。工作流作业是由一系列动作构成的有向无环图(DAGs),协调器作业是按时间频率周期性触发Oozie工作流的作业,Bundle管理协调器作业。Oozie支持的用户作业类型有Java map-reduce、Streaming map-reduce、Pig、 Hive、Sqoop和Distcp,及其Java程序和shell脚本或命令等特定的系统作业。
Oozie项目经历了三个主要阶段。第一版Oozie是一个基于工作流引擎的服务器,通过执行Hadoop MapReduce和Pig作业的动作运行工作流作业。第二版Oozie是一个基于协调器引擎的服务器,按时间和数据触发工作流执行。它可以基于时间(如每小时执行一次)或数据可用性(如等待输入数据完成后再执行)连续运行工作流。第三版Oozie是一个基于Bundle引擎的服务器。它提供更高级别的抽象,批量处理一系列协调器应用。用户可以在bundle级别启动、停止、挂起、继续、重做协调器作业,这样可以更好地简化操作控制。
使用Oozie主要基于以下两点原因:
Oozie的体系结构如图7-1所示。
图7-1 Oozie体系结构
Oozie是一种Java Web应用程序,它运行在Java Servlet容器、即Tomcat中,并使用数据库来存储以下内容:
Oozie工作流是放置在DAG(有向无环图 Direct Acyclic Graph)中的一组动作,例如,Hadoop的Map/Reduce作业、Pig作业等。DAG控制动作的依赖关系,指定了动作执行的顺序。Oozie使用hPDL这种XML流程定义语言来描述这个图。
hPDL是一种很简洁的语言,它只会使用少数流程控制节点和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end和fail节点)以及控制工作流执行路径的机制(decision、fork和join节点)。动作节点是实际执行操作的部分,通过它们工作流会触发执行计算或者处理任务。Oozie为以下类型的动作提供支持:Hadoop MapReduce、Hadoop HDFS、Pig、Java和Oozie的子工作流。而SSH动作已经从Oozie schema 0.2之后的版本中移除了。
所有由动作节点触发的计算和处理任务都不在Oozie中运行。它们是由Hadoop的MapReduce框架执行的。这种低耦合的设计方法让Oozie可以有效利用Hadoop的负载平衡、灾难恢复等机制。这些任务主要是串行执行的,只有文件系统动作例外,它是并行处理的。这意味着对于大多数工作流动作触发的计算或处理任务类型来说,在工作流操作转换到工作流的下一个节点之前都需要等待,直到前面节点的计算或处理任务结束了之后才能够继续。Oozie可以通过两种不同的方式来检测计算或处理任务是否完成,这就是回调和轮询。当Oozie启动了计算或处理任务时,它会为任务提供唯一的回调URL,然后任务会在完成的时候发送通知给这个特定的URL。在任务无法触发回调URL的情况下(可能是因为任何原因,比方说网络闪断),或者当任务的类型无法在完成时触发回调URL的时候,Oozie有一种机制,可以对计算或处理任务进行轮询,从而能够判断任务是否完成。
Oozie工作流可以参数化,例如在工作流定义中使用像${inputDir}之类的变量等。在提交工作流操作的时候,我们必须提供参数值。如果经过合适地参数化,比如使用不同的输出目录,那么多个同样的工作流操作可以并发执行。
一些工作流是根据需要触发的,但是大多数情况下,我们有必要基于一定的时间段、数据可用性或外部事件来运行它们。Oozie协调系统(Coordinator system)让用户可以基于这些参数来定义工作流执行计划。Oozie协调程序让我们可以用谓词的方式对工作流执行触发器进行建模,谓词可以是时间条件、数据条件、内部事件或外部事件。工作流作业会在谓词得到满足的时候启动。不难看出,这里的谓词,其作用和SQL语句的WHERE子句中的谓词类似,本质上都是在满足某些条件时触发某种事件。
有时,我们还需要连接定时运行、但时间间隔不同的工作流操作。多个以不同频率运行的工作流的输出会成为下一个工作流的输入。把这些工作流连接在一起,会让系统把它作为数据应用的管道来引用。Oozie协调程序支持创建这样的数据应用管道。
CDH 6.3.1中,Oozie的版本是5.1.0。在安装CDH时,我们配置使用MySQL数据库存储Oozie元数据。关于示例环境CDH的安装参见“基于Hadoop生态圈的数据仓库实践 —— 环境搭建(二)”。关于CDH 6.3.1中Oozie的属性,参考以下链接: https://docs.cloudera.com/documentation/enterprise/6/properties/6.3/topics/cm_props_cdh630_oozie.html。
对于刚接触Oozie的用户来说,前面介绍的概念过于抽象,不易理解,那么就让我们一步步创建销售订单示例ETL的工作流,在实例中学习Oozie的特性和用法。
Oozie运行需要使用较高的内存资源,因此要将以下两个YARN参数的值调大:
如果分配的内存不足,在执行工作流作业时会报类似下面的错误:
org.apache.oozie.action.ActionExecutorException: JA009: org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, requested memory < 0, or requested memory > max configured, requestedMemory=1536, maxMemory=1500
我们的实验环境中,每个Hadoop节点所在虚拟机的总物理内存为8G,所以我们把这两个参数都设置为2G。在Cloudera Manager中修改,yarn.nodemanager.resource.memory-mb参数在YARN服务的NodeManager范围里,yarn.scheduler.maximum-allocation-mb参数在YARN服务的ResourceManager范围里,修改后需要保存更改并重启Hadoop集群。
定期装载工作流需要用Oozie调用Sqoop执行,这需要开启Sqoop的元数据共享存储,命令如下:
sqoop metastore > /tmp/sqoop_metastore.log 2>&1 &
metastore工具配置Sqoop作业的共享元数据信息存储,它会在当前主机启动一个内置的HSQLDB共享数据库实例。客户端可以连接这个metastore,这样允许多个用户定义并执行metastore中存储的Sqoop作业。metastore库文件的存储位置由sqoop-site.xml中的sqoop.metastore.server.location属性配置,它指向一个本地文件。如果不设置这个属性,Sqoop元数据默认存储在~/.sqoop目录下。
如果碰到用Oozie工作流执行Sqoop命令是成功的,但执行Sqoop作业却失败的情况,可以参考“Oozie系列(3)之解决Sqoop Job无法运行的问题”这篇文章。该文中对这个问题对有很详细的分析,并提供了解决方案,其访问地址是:http://www.lamborryan.com/oozie-sqoop-fail/。
建立一个增量抽取sales_order表数据的Sqoop作业,并将其元数据存储在shared metastore里。
sqoop job \
--meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop \
--create myjob_incremental_import \
-- \
import \
--connect "jdbc:mysql://node3:3306/source?useSSL=false&user=root&password=123456" \
--table sales_order \
--columns "order_number, customer_number, product_code, order_date, entry_date, order_amount" \
--hive-import \
--hive-table rds.sales_order \
--fields-terminated-by , \
--incremental append \
--check-column order_number \
--last-value 0
通过--meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop选项将作业元数据存储到HSQLDB数据库文件中。metastore的缺省端口是16000,可以用sqoop.metastore.server.port属性设置为其它端口号。创建作业前,可以使用--delete参数先删除已经存在的同名作业。
sqoop job --meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop --delete myjob_incremental_import
sqoop作业还包含以下常用命令:
# 查看sqoop作业列表
sqoop job --meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop --list
# 查看一个sqoop作业的属性
sqoop job --meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop --show myjob_incremental_import
# 执行一个sqoop作业
sqoop job --meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop --exec myjob_incremental_import
建立内容如下的workflow.xml文件:
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.1" name="regular_etl">
<start to="fork-node"/>
<fork name="fork-node">
<path start="sqoop-customer" />
<path start="sqoop-product" />
<path start="sqoop-sales_order" />
</fork>
<action name="sqoop-customer">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://node3:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>root</arg>
<arg>--password</arg>
<arg>123456</arg>
<arg>--table</arg>
<arg>customer</arg>
<arg>--delete-target-dir</arg>
<arg>--target-dir</arg>
<arg>/user/hive/warehouse/rds.db/customer</arg>
<file>/tmp/hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-product">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://node3:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>root</arg>
<arg>--password</arg>
<arg>123456</arg>
<arg>--table</arg>
<arg>product</arg>
<arg>--delete-target-dir</arg>
<arg>--target-dir</arg>
<arg>/user/hive/warehouse/rds.db/product</arg>
<file>/tmp/hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-sales_order">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>job --exec myjob_incremental_import --meta-connect jdbc:hsqldb:hsql://node2:16000/sqoop</command>
<file>/tmp/hive-site.xml</file>
<archive>/tmp/mysql-connector-java-5.1.38-bin.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="hive-node"/>
<action name="hive-node">
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<job-xml>/tmp/hive-site.xml</job-xml>
<script>/tmp/regular_etl.sql</script>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app>
这个工作流的DAG如图7-2所示。
图7-2 定期装载DAG
上面的XML文件使用hPDL的语法定义了一个名为regular_etl的工作流。该工作流包括9个节点,其中有5个控制节点,4个动作节点:工作流的起点start、终点end、失败处理节点fail,两个执行路径控制节点fork-node和joining,三个并行处理的Sqoop动作节点sqoop-customer、sqoop-product、sqoop-sales_order用作数据抽取,一个Hive动作节点hive-node用作数据转换与装载。
Oozie的工作流节点分为控制节点和动作节点两类。控制节点控制着工作流的开始、结束和作业的执行路径。动作节点触发计算或处理任务的执行。节点的名字必须符合[a-zA-Z][\-_a-zA-Z0-0]*这种正则表达式模式,并且不能超过20个字符。
控制节点 控制节点又可分成两种,一种定义工作流的开始和结束,这种节点使用start、end和kill三个标签。另一种用来控制工作流的执行路径,使用decision、fork和join标签。
start节点是一个工作流作业的入口,是工作流作业的第一个节点。当工作流开始时,它会自动转到start标签所标识的节点。每一个工作流定义必须包含一个start节点。end节点是工作流作业的结束,它表示工作流作业成功完成。当工作流到达这个节点时就结束了。如果在到达end节点时,还有一个或多个动作正在执行,这些动作将被kill,这种场景也被认为是执行成功。每个工作流定义必须包含一个end节点。kill节点允许一个工作流作业将自己kill掉。当工作流作业到达kill节点时,表示作业以失败结束。如果在到达kill节点时,还有一个或多个动作正在执行,这些动作将被kill。一个工作流定义中可以没有kill节点,也可以包含一个或多个kill节点。
decision节点能够让工作流选择执行路径,其行为类似于一个switch-case语句,即按不同情况走不同分支。我们刚定义的工作流中没有decision节点。fork节点将一个执行路径分裂成多个并发的执行路径。直到所有这些并发执行的路径都到达join节点后,工作流才会继续往后执行。fork与join节点必须成对出现。实际上join节点将多条并发执行路径视作同一个fork节点的子节点。
动作节点 动作节点是实际执行操作的部分。Oozie支持很多种动作节点,包括Hive脚本、Hive Server2脚本、Pig脚本、Spark程序、Java程序、Sqoop1命令、MapReduce作业、shell脚本、HDFS命令等等。我们的ETL工作流中使用了Sqoop和Hive两种。ok和error是动作节点预定义的两个XML元素,它们通常被用来指定动作节点执行成功或失败时的下一步跳转节点。这些元素在Oozie中被称为转向元素。arg元素包含动作节点的实际参数。sqoop-customer和sqoop-product动作节点中使用arg元素指定Sqoop命令行参数。command元素表示要执行一个shell命令。在sqoop-sales_order动作节点中使用command元素指定执行Sqoop作业的命令。file和archive元素用于为执行MapReduce作业提供有效的文件和包。为了避免不必要的混淆,最好使用HDFS的绝对路径。我们的三个Sqoop动作节点使用这两个属性为Sqoop指定Hive的配置文件和MySQL JDBC驱动包的位置。必须包含这两个属性Sqoop动作节点才能正常执行。script元素包含要执行的脚本文件,这个元素的值可以被参数化。我们在hive-node动作节点中使用script元素指定需要执行的定期装载SQL脚本文件。regular_etl.sql文件内容如下:
use dw;
-- 设置scd的生效时间和过期时间
set hivevar:cur_date = current_date();
set hivevar:pre_date = date_add(${hivevar:cur_date},-1);
set hivevar:max_date = cast('2200-01-01' as date);
-- 设置cdc的上限时间
update rds.cdc_time set current_load = ${hivevar:cur_date};
-- 装载customer维度
-- 设置已删除记录和customer_street_addresses列上scd2的过期
update customer_dim
set expiry_date = ${hivevar:pre_date}
where customer_dim.customer_sk in
(select a.customer_sk
from (select customer_sk,customer_number,customer_street_address
from customer_dim where expiry_date = ${hivevar:max_date}) a left join
rds.customer b on a.customer_number = b.customer_number
where b.customer_number is null or a.customer_street_address <> b.customer_street_address);
-- 处理customer_street_addresses列上scd2的新增行
insert into customer_dim
select
row_number() over (order by t1.customer_number) + t2.sk_max,
t1.customer_number,
t1.customer_name,
t1.customer_street_address,
t1.customer_zip_code,
t1.customer_city,
t1.customer_state,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
t2.customer_number customer_number,
t2.customer_name customer_name,
t2.customer_street_address customer_street_address,
t2.customer_zip_code,
t2.customer_city,
t2.customer_state,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
from customer_dim t1
inner join rds.customer t2
on t1.customer_number = t2.customer_number
and t1.expiry_date = ${hivevar:pre_date}
left join customer_dim t3
on t1.customer_number = t3.customer_number
and t3.expiry_date = ${hivevar:max_date}
where t1.customer_street_address <> t2.customer_street_address and t3.customer_sk is null) t1
cross join
(select coalesce(max(customer_sk),0) sk_max from customer_dim) t2;
-- 处理customer_name列上的scd1
-- 因为scd1本身就不保存历史数据,所以这里更新维度表里的
-- 所有customer_name改变的记录,而不是仅仅更新当前版本的记录
drop table if exists tmp;
create table tmp as
select
a.customer_sk,
a.customer_number,
b.customer_name,
a.customer_street_address,
a.customer_zip_code,
a.customer_city,
a.customer_state,
a.version,
a.effective_date,
a.expiry_date
from customer_dim a, rds.customer b
where a.customer_number = b.customer_number and (a.customer_name <> b.customer_name);
delete from customer_dim where customer_dim.customer_sk in (select customer_sk from tmp);
insert into customer_dim select * from tmp;
-- 处理新增的customer记录
insert into customer_dim
select
row_number() over (order by t1.customer_number) + t2.sk_max,
t1.customer_number,
t1.customer_name,
t1.customer_street_address,
t1.customer_zip_code,
t1.customer_city,
t1.customer_state,
1,
${hivevar:pre_date},
${hivevar:max_date}
from
(
select t1.* from rds.customer t1 left join customer_dim t2 on t1.customer_number = t2.customer_number
where t2.customer_sk is null) t1
cross join
(select coalesce(max(customer_sk),0) sk_max from customer_dim) t2;
-- 装载product维度
-- 设置已删除记录和product_name、product_category列上scd2的过期
update product_dim
set expiry_date = ${hivevar:pre_date}
where product_dim.product_sk in
(select a.product_sk
from (select product_sk,product_code,product_name,product_category
from product_dim where expiry_date = ${hivevar:max_date}) a left join
rds.product b on a.product_code = b.product_code
where b.product_code is null or (a.product_name <> b.product_name or a.product_category <> b.product_category));
-- 处理product_name、product_category列上scd2的新增行
insert into product_dim
select
row_number() over (order by t1.product_code) + t2.sk_max,
t1.product_code,
t1.product_name,
t1.product_category,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
t2.product_code product_code,
t2.product_name product_name,
t2.product_category product_category,
t1.version + 1 version,
${hivevar:pre_date} effective_date,
${hivevar:max_date} expiry_date
from product_dim t1
inner join rds.product t2
on t1.product_code = t2.product_code
and t1.expiry_date = ${hivevar:pre_date}
left join product_dim t3
on t1.product_code = t3.product_code
and t3.expiry_date = ${hivevar:max_date}
where (t1.product_name <> t2.product_name or t1.product_category <> t2.product_category) and t3.product_sk is null) t1
cross join
(select coalesce(max(product_sk),0) sk_max from product_dim) t2;
-- 处理新增的product记录
insert into product_dim
select
row_number() over (order by t1.product_code) + t2.sk_max,
t1.product_code,
t1.product_name,
t1.product_category,
1,
${hivevar:pre_date},
${hivevar:max_date}
from
(
select t1.* from rds.product t1 left join product_dim t2 on t1.product_code = t2.product_code
where t2.product_sk is null) t1
cross join
(select coalesce(max(product_sk),0) sk_max from product_dim) t2;
-- 装载order维度
insert into order_dim
select
row_number() over (order by t1.order_number) + t2.sk_max,
t1.order_number,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
order_number order_number,
1 version,
order_date effective_date,
'2200-01-01' expiry_date
from rds.sales_order, rds.cdc_time
where entry_date >= last_load and entry_date < current_load ) t1
cross join
(select coalesce(max(order_sk),0) sk_max from order_dim) t2;
-- 装载销售订单事实表
insert into sales_order_fact
select
order_sk,
customer_sk,
product_sk,
date_sk,
order_amount
from
rds.sales_order a,
order_dim b,
customer_dim c,
product_dim d,
date_dim e,
rds.cdc_time f
where
a.order_number = b.order_number
and a.customer_number = c.customer_number
and a.order_date >= c.effective_date
and a.order_date < c.expiry_date
and a.product_code = d.product_code
and a.order_date >= d.effective_date
and a.order_date < d.expiry_date
and to_date(a.order_date) = e.dt
and a.entry_date >= f.last_load and a.entry_date < f.current_load ;
-- 更新时间戳表的last_load字段
update rds.cdc_time set last_load=current_load;
工作流参数化 工作流定义中可以使用形式参数。当工作流被Oozie执行时,所有形参都必须提供具体的值。参数定义使用JSP 2.0的语法,参数不仅可以是单个变量,还支持函数和复合表达式。参数可以用于指定动作节点和decision节点的配置值、XML属性值和XML元素值,但是不能在节点名称、XML属性名称、XML元素名称和节点的转向元素中使用参数。我们的工作流中使用了{jobTracker}和{nameNode}两个参数,分别指定YARN资源管理器的主机/端口和HDFS NameNode的主机/端口。
表达式语言函数 Oozie的工作流作业本身还提供了丰富的内建函数,Oozie将它们统称为表达式语言函数(Expression Language Functions,简称EL函数)。通过这些函数可以对动作节点和decision节点的谓词进行更复杂的参数化。我们的工作流中使用了wf:errorMessage和wf:lastErrorNode两个内建函数。wf:errorMessage函数返回特定节点的错误消息,如果没有错误则返回空字符串。错误消息常被用于排错和通知的目的。wf:lastErrorNode函数返回最后出错的节点名称,如果没有错误则返回空字符串。
这里所说的部署就是把相关文件上传到HDFS的对应目录中。需要上传工作流定义文件,还要上传file、archive、script元素中指定的文件。可以使用hdfs dfs -put命令将本地文件上传到HDFS,-f参数的作用是,如果目标位置已经存在同名的文件,则用上传的文件覆盖已存在的文件。
hdfs dfs -put -f workflow.xml /user/root/
hdfs dfs -put -f /etc/hive/conf.cloudera.hive/hive-site.xml /tmp/
hdfs dfs -put -f /root/mysql-connector-java-5.1.38-bin.jar /tmp/
hdfs dfs -put -f /root/regular_etl.sql /tmp/
到现在为止我们已经定义了工作流,也将运行工作流所需的所有文件上传到了HDFS的指定位置。但是,仍然无法运行工作流,因为还缺少关键的一步:必须定义作业的某些属性,并将这些属性值提交给Oozie。在本地目录中,需要创建一个作业属性文件,这里命名为名为job.properties,其中的内容如下:
nameNode=hdfs://nameservice1
jobTracker=manager:8032
queueName=default
oozie.use.system.libpath=true
oozie.wf.application.path=${nameNode}/user/${user.name}
注意,此文件不需要上传到HDFS。这里稍微解释一下每一行的含义。nameNode和jobTracker是工作流定义里面的两个形参,分别指示NameNode服务地址和YARN资源管理器的主机名/端口号。工作流定义里使用的形参,必须在作业属性文件中赋值。queueName是MapReduce作业的队列名称,用于给一个特定队列命名。缺省时,所有的MR作业都进入“default”队列。queueName主要用于给不同目的作业队列赋予不同的属性集来保证优先级。为了让工作流能够使用Oozie的共享库,要在作业属性文件中设置oozie.use.system.libpath=true。oozie.wf.application.path属性设置应用工作流定义文件的路径,在它的赋值中,{nameNode}是引用第一行的变量,{user.name}系统变量引用的是Java环境的user.name属性,通过该属性可以获得当前登录的操作系统用户名。
经过一连串的配置,现在已经万事俱备,可以运行定期装载工作流了。下面的命令用于运行工作流作业。oozie是Oozie的客户端命令,job表示指定作业属性,-oozie参数指示Oozie服务器实例的URL,-config参数指示作业属性配置文件,-run告诉Oozie运行作业。
oozie job -oozie http://node3:11000/oozie -config /root/job.properties -run
此时从Oozie Web控制台可以看到正在运行的作业,如图7-3所示。
图7-3 运行的作业
点击“Active Jobs”标签,会看到表格中只有一行,就是我们刚运行的工作流作业。Job Id是系统生成的作业号,它唯一标识一个作业。Name是我们在workflow.xml文件中定义的工作流名称,Status为RUNNING,表示正在运行。页面中还会显示执行作业的用户名、作业创建时间、开始时间、最后修改时间、结束时间等作业属性。
点击作业所在行,可以打开作业的详细信息窗口,如图7-4所示。
图7-4 作业详细信息
这个页面有上下两部分组成。上面是以纵向方式显示作业属性,内容和图7-3所示的一行相同。下面是动作的信息。在这个表格中会列出我们定义的工作流节点。从图中可以看到节点的名称和类型,分别对应workflow.xml文件中节点定义的属性和元素,Transition表示转向的节点,对应工作流定义文件中“to”属性的值。从Status列可以看到节点执行的状态,图中表示正在运行sqoop-customer动作节点,前面的start、fork-node、sqoop-sales_order、sqoop-product都以已执行成功,后面的joining、hive-node、end节点还没有执行到,所以图中没有显示。这个表格中只会显示已经执行或正在执行的节点。表格中还有StartTime和EndTime两列,分别表示节点的开始和结束时间,fork节点中的三个Sqoop动作是并行执行的,因此起止时间上有所交叉。
点击动作所在行,可以打开动作的详细信息窗口,如图7-5所示。
图7-5 动作详细信息
这个窗口中显示一个节点的13个相关属性。从上图中可以看到正在运行的hive-node节点的属性。从YARN服务的HistoryServer Web UI界面中,可以看到真正执行动作的MapReduce作业的跟踪页面,如图7-6所示。Oozie中定义的动作,实际上是作为MapReduce之上的应用来执行的。从这个页面可以看到相关MapReduce作业的属性,包括作业ID、总的Map/Reduce数、已完成的Map/Reduce数、Map和Reduce的处理进度等信息。
图7-6 执行动作的MapReduce作业
当Oozie作业执行完,可以在图7-3所示页面的“All Jobs”标签页看到,Status列已经从RUNNING变为SUCCEEDED,如下图7-7所示。
图7-7 完成的工作流
可以看到,整个工作流执行了将近31分钟。细心的读者可能发现了,显示的结束时间点是07:26:28。这个时间比较奇怪,它和我们手工执行工作流的时间相差了八个小时。造成这个问题的原因稍后再做解释。
工作流作业通常都是以一定的时间间隔定期执行的,例如定期装载ETL作业需要在每天2点执行一次。Oozie的协调器作业能够在满足谓词条件时触发工作流作业的执行。现在的谓词条件可以定义为数据可用、时间或外部事件,将来还可能扩展为支持其它类型的事件。协调器作业还有一种使用场景,就是需要关联多个周期性运行工作流作业。它们运行的时间间隔不同,前面所有工作流的输出一起成为下一个工作流的输入。例如,有五个工作流,前四个顺序执行,每隔15分钟运行一个,第五个工作流每隔60分钟运行一次,前面四个工作流的输出共同构成第五个工作流的输入。这种工作流链有时被称为数据应用管道。Oozie协调器系统允许用户定义周期性执行的工作流作业,还可以定义工作流之间的依赖关系。和工作流作业类似,定义协调器作业也要创建配置文件和属性文件。
建立内容如下的coordinator.xml文件:
<coordinator-app name="regular_etl-coord" frequency="${coord:days(1)}" start="${start}" end="${end}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.1">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
在上面的XML文件中,我们定义了一个名为regular_etl-coord的协调器作业。coordinator-app元素的frequency属性指定工作流运行的频率。我们用Oozie提供的${coord:days(int n)} EL函数给它赋值,该函数返回‘n’天的分钟数,示例中的n为1,也就是每隔1440分钟运行一次工作流。start属性指定起始时间,end属性指定终止时间,timezone属性指定时区。这三个属性都赋予形参,在属性文件中定义参数值。xmlns属性值是常量字符串“uri:oozie:coordinator:0.1”。${workflowAppUri}形参指定应用的路径,就是工作流定义文件所在的路径。${jobTracker}、${nameNode}和${queueName}形参与前面workflow.xml工作流文件中的含义相同。
建立内容如下的job-coord.properties文件:
nameNode=hdfs://nameservice1
jobTracker=manager:8032
queueName=default
oozie.use.system.libpath=true
oozie.coord.application.path=${nameNode}/user/${user.name}
timezone=UTC
start=2020-10-16T07:40Z
end=2020-12-31T07:15Z
workflowAppUri=${nameNode}/user/${user.name}
这个文件定义协调器作业的属性,并给协调器作业定义文件中的形参赋值。该文件的内容与工作流作业属性文件的内容类似。oozie.coord.application.path参数指定协调器作业定义文件所在的HDFS路径。需要注意的是,start、end变量的赋值与时区有关。Oozie默认的时区是UTC,而且即便在属性文件中设置了timezone=GMT+0800也不起作用。我们给出的起始时间点是2020-10-16T07:40Z,实际要加上8个小时,才是我们所在时区真正的运行时间,即15:40(为了便于及时验证运行效果,设置这个时间点)。因此在定义时间点时一定要注意时间的计算问题,这也就是在前面的工作流演示中,控制台页面里看到的时间是7点的原因,真实时间是上午15点。
执行下面的命令将coordinator.xml文件上传到oozie.coord.application.path参数指定的HDFS目录中。
hdfs dfs -put -f coordinator.xml /user/root/
执行下面的命令运行协调器作业:
oozie job -oozie http://node3:11000/oozie -config /root/job-coord.properties -run
此时从Oozie Web 控制台可以看到准备运行的协调器作业,作业的状态为RUNNING,如图7-8所示。
图7-8 提交协调器作业
点击作业所在行,可以打开协调器作业的详细信息窗口,如图7-9所示。Status为WAITING,表示正在等待执行工作流。当时间到达15:40,该状态值变为RUNNING,表示已经开始执行。
图7-9 协调器作业详细信息
点击动作所在行,可以打开调用的工作流作业的详细信息窗口,如图7-10所示。这个页面和图7-4所示的是同一个页面,但这时在“Parent Coord”字段显示了协调器作业的Job Id。
图7-10 工作流作业详细信息
Kettle提供的“Oozie job executor”作业项用于执行Oozie作业。如图7-11所示的作业中,CDH631是已经建好的Hadoop集群连接。“Enable Blocking”选项将阻止转换的其余部分执行,直到选中Oozie作业完成为止。“Polling Interval(ms)”设置间检查Oozie工作流的时间间隔。“Workflow Properties”设置工作流属性文件。此路径是必需的,并且必须是有效的作业属性文件。
图7-11 “Oozie job executor”作业项
执行该Kettle作业,日志中出现以下错误:
Oozie job executor - ERROR (version 8.3.0.0-371, build 8.3.0.0-371 from 2019-06-11 11.09.08 by buildguy) : 2020-10-16 11:03:00,386 INFO org.apache.oozie.command.coord.CoordSubmitXCommand: SERVER[node3] USER[root] GROUP[-] TOKEN[] APP[regular_etl-coord] JOB[0000006-201015145511008-oozie-oozi-C] ACTION[-] ENDED Coordinator Submit jobId=0000006-201015145511008-oozie-oozi-C
对于协调器作业,可以忽略此报错,因为此时已经将协调器作业提交至Oozie,剩下的工作交由Oozie完成。如果执行的是一个工作流作业,如这里的“Workflow Properties”设置为“file:///root/kettle_hadoop/7/job.properties”,则会正常执行Oozie工作流作业。关于“Oozie Job Executor”作业项的说明,参见https://wiki.pentaho.com/pages/viewpage.action?pageId=25045116。
Oozie本身并不真正运行工作流中的动作,它在执行工作流中的动作节点时,会先启动一个发射器(Launcher)。发射器类似于一个YARN作业,由一个AppMaster和一个Mapper组成,只负责运行一些基本的命令,如为执行Hive CLI胖客户端的“hive”、Hive Beeline瘦客户端的“hive2”、Pig CLI、Sqoop、Spark Driver、Bash shell等等。然后,由这些命令产生一系列真正执行工作流动作的YARN作业。值得注意的是,YARN并不知道发射器和它所产生的作业之间的依赖关系,这在“hive2”动作中表现得尤为明显。“hive2”动作的发射器连接到HiveServer2,然后HiveServer2产生动作相关的作业。
知道了Oozie的运行机制,就可以有针对性地优化Oozie工作流的执行了。下面以Hive动作为例进行说明。
(1)减少给发射器作业分配的资源 发射器作业只需要一个很小的调度(记住只有一个Mapper),因此它的AppMaster所需资源参数值应该设置得很低,以避免因消耗过多内存阻碍了后面工作流队列的执行。可以通过配置以下动作属性值修改发射器使用的资源。
(2)减少给“hive2”发射器作业分配的资源 类似地,配置以下动作属性值:
(3)利用YARN队列名 如果能够获得更高级别的YARN队列名称,可以为发射器配置oozie.launcher.mapreduce.job.queuename属性。对于实际的Hive查询,可以如下配置:
(4)设置Hive查询的AppMaster资源 如果缺省的AppMaster资源对于实际的Hive查询来说太大了,可以修改它们的大小:
注意,对于上面的(1)、(2)、(4)条,不能配置低于yarn.scheduler.minimum-allocation-mb的值。
(5)合并HiveQL脚本 可以将某些步骤合并到同一个HiveQL脚本中,这会降低Oozie轮询YARN的开销。Oozie会向YARN询问一个查询是否结束,如果是就启动另一个发射器,然后该发射器启动另一个Hive会话。当然,对于出现查询出错的情况,这种合并做法的控制粒度较粗,可能在重新启动动作前需要做一些手工清理的工作。
(6)并行执行多个步骤 在拥有足够YARN资源的前提下,尽量将可以并行执行的步骤放置到Oozie Fork/Join的不同分支中。
(7)使用Tez计算框架 在很多场景下,Tez计算框架比MapReduce效率更高。例如,Tez会为Map和Reduce步骤重用同一个YARN容器,这对于连续的查询将降低YARN的开销,同时减少中间处理的磁盘I/O。
Kettle的start作业项具有定时调度作业执行功能。如图7-12所示的属性定义作业每天2点执行一次。
图7-12 start作业项的定时调度
下面验证一下start的调度功能。执行下面的语句在MySQL源表中新增两条2020年10月15日销售订单数据。
use source;
drop table if exists temp_sales_order_data;
create table temp_sales_order_data as select * from sales_order where 1=0;
set @start_date := unix_timestamp('2020-10-15');
set @end_date := unix_timestamp('2020-10-16');
set @customer_number := floor(1 + rand() * 8);
set @product_code := floor(1 + rand() * 4);
set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (1,@customer_number,@product_code,@order_date,@order_date,@amount);
set @customer_number := floor(1 + rand() * 8);
set @product_code := floor(1 + rand() * 4);
set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (1,@customer_number,@product_code,@order_date,@order_date,@amount);
insert into sales_order
select @a:=@a+1, customer_number, product_code, order_date, entry_date, order_amount
from temp_sales_order_data t1,(select @a:=102) t2 order by order_date;
commit ;
然后执行定期装载Kettle作业,到了start作业项中定义的时间,作业就会自动执行,事实表中将会增加两条记录。这种方式的调度设置简单明了,缺点是在作业执行后可以关闭job标签页,但不能关闭Spoon窗口,否则无法执行。