本篇重点是针对销售订单示例创建并测试数据装载的Kettle作业和转换。在此之前,先简要介绍数据清洗的概念,并说明如何使用Kettle完成常见的数据清洗工作。由于本示例中Kettle在Hadoop上的ETL实现依赖于Hive,所以之后对Hive做一个概括的介绍,包括它的体系结构、工作流程和优化。最后用完整的的Kettle作业演示如何实现销售订单数据仓库的数据转换与装载。
对大多数用户来说,ETL的核心价值在“T”所代表的转换部分。这个阶段要做很多工作,数据清洗就是其中一项重点任务。数据清洗是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。
数据仓库中的数据是面向某一主题数据的集合,这些数据从多个业务系统中抽取而来,并且包含历史变化,因此就不可避免地出现某些数据是错误的,或者数据相互之间存在冲突的情况。这些错误的或有冲突的数据显然不是我们想要的,被称为“脏数据”。要按照一定的规则处理脏数据,这个过程就是数据清洗。数据清洗的任务是过滤那些不符合要求的数据,将过滤的结果交给业务部门,确认是直接删除掉,还是修正之后再进行抽取。不符合要求的数据主要包括不完整的数据、错误的数据、重复的数据、不一致的数据四大类。
来自操作型数据源的数据如果含有不洁成分或不规范的格式,将对数据仓库的建立和维护,特别是对联机分析处理的使用,造成很多问题和麻烦。这时必须在ETL过程中加以处理,不同类型的数据,处理的方式也不尽相同。对于残缺数据,ETL将这类数据过滤出来,按缺失的内容向业务数据的所有者提交,要求在规定的时间内补全,之后才写入数据仓库。对于错误数据,一般的处理方式是通过数据库查询的方式找出来,并将脏数据反馈给业务系统用户,由业务用户确定是抛弃这些数据,还是修改后再次进行抽取,修改的工作可以是业务系统相关人员配合ETL开发者来完成。对于重复数据的处理,ETL系统本身应该具有自动查重去重的功能。而差异数据,则需要协调ETL开发者与来自多个不同业务系统的人员共同确认参照标准,然后在ETL系统中建立一系列必要的方法和手段实现数据一致性和标准化。
保障数据清洗处理顺利进行的原则是优先对数据清洗处理流程进行分析和系统化的设计,针对数据的主要问题和特征,设计一系列数据对照表和数据清洗程序库的有效组合,以便面对不断变化的、形形色色的数据清洗问题。数据清洗流程通常包括如下内容:
身份证号码格式校验是很多系统在数据集成时的一个常见需求,这里以18位身份证为例,使用一个Kettle转换实现身份证号码的合法性验证。该转换执行的结果是将所有合规与不合规的身份证号码写入相应的输出文件。按以下身份证号码的定义规则建立转换。
身份证18位分别代表的含义,从左到右方分别表示: 1-2 省级行政区代码。 3-4 地级行政区划分代码。 5-6 县区行政区分代码。 7-10 11-12 13-14 出生年、月、日。 15-17 顺序码,同一地区同年、同月、同日出生人的编号,奇数是男性,偶数是女性。 18 校验码,如果是0-9则用0-9表示,如果是10则用X(罗马数字10)表示。
身份证校验码的计算方法:
总的Kettle转换如图6-1所示。
图6-1 校验身份证号码的Kettle转换
这是本专题到目前为止步骤最多的一个转换。虽然有些复杂,但条理还比较清楚。下面具体说明每个步骤的定义和作用。
“自定义常量数据”定义8条身份证号码模拟数据作为输入,其中包括各种不合规的情况及一条完全合规的号码:
cardid
110102197203270816
1101021972032708161
11010219720327081
a00102197203270816
000102197203270816
110102197302290816
110102197202290816
110102197202300816
第一层的四个步骤校验号码位数,18位的数据流向第二层,其它数据输出到一个错误文件。“字符串操作”步骤作用是去除字符串两边空格(Trim type选择both),并将字符串转为大写(Lower/Upper选择upper)。“计算器”步骤返回一个表示字符串长度的新字段length(“计算”选择“Return the length of a string A”,“字段A”选择“cardid”,“值类型”选择“String”)。“过滤记录”步骤中的“条件”为“length = 18”,为真时流向下面的步骤,为假时输出错误文件。
第二层的四个步骤校验号码的前17位均为数字,合规的数据流向第三层,其它数据输出到一个错误文件。“剪切字符串”步骤将18位字符串分隔成21个字段,字符串下标从0开始,如图6-2所示。province取前两位,用于后面验证省份代码。year取第7到第10位,用于后面计算是否闰年。p17取前17位,用于判断是否纯数字。s1-s18表示每一位,用于后面计算检验位。
图6-2 剪切字符串
“JavaScript代码”步骤中只有一行脚本,用isNaN函数判断前17位的字符串是否纯数字,并返回Boolean类型的字段“notnumber”:
notnumber=isNaN(p17)
“过滤记录 2”步骤中的“条件”为“notnumber = N”,为真时流向下面的步骤,为假时输出错误文件。
第三层的两个步骤校验两位省份代码,合规的数据流向第四层,其它数据输出到一个错误文件。“过滤记录 3”步骤中的“条件”为:
province IN LIST 11;12;13;14;15;21;22;23;31;32;33;34;35;36;37;41;42;43;44;45;46;50;51;52;53;54;61;62;63;64;65;71;81;82;91
“计算器 2”步骤定义如图6-3所示,先定义三个常数400、100、4,然后计算year除以这三个常数的余数,用于后面判断是否闰年。
图6-3 计算闰年
“JavaScript代码 2”步骤中的脚本如下,按闰年定义进行判断,返回Boolean类型的字段isleapyear表示year是否为闰年。
var isleapyear;
if( y1==0 || y2>0 && y3==0 )
{
isleapyear = true;
}
else
{
isleapyear = false;
}
“Switch / case”定义如图6-4所示,根据isleapyear的值,闰年与平年走不同数据校验分支。
图6-4 闰年与平年走不同分支
“数据校验”步骤验证闰年的日期规则,“要检验的字段名”选择cardid,在“合法数据的正则表达式”中填写:
^[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|[1-2][0-9]))[0-9]{3}[0-9X]$
“数据校验2”步骤验证平年的日期规则,“要检验的字段名”选择cardid,在“合法数据的正则表达式”中填写:
^[1-9][0-9]{5}19[0-9]{2}((01|03|05|07|08|10|12)(0[1-9]|[1-2][0-9]|3[0-1])|(04|06|09|11)(0[1-9]|[1-2][0-9]|30)|02(0[1-9]|1[0-9]|2[0-8]))[0-9]{3}[0-9X]$
没有通过两个校验步骤的数据分别输出到一个错误文件,通过的数据流向“公式”步骤。该步骤输出一个新字段s,公式如下,即将前17位数字和系数相乘的结果相加。
([s1]*7+[s2]*9+[s3]*10+[s4]*5+[s5]*8+[s6]*4+[s7]*2+[s8]*1+[s9]*6+[s10]*3+[s11]*7+[s12]*9+[s13]*10+[s14]*5+[s15]*8+[s16]*4+[s17]*2)
“计算器 3”步骤定义如图6-5所示,计算s除以11的余数。
图6-5 计算余数
“JavaScript代码 3”步骤中的脚本如下,计算校验位,返回Boolean类型的字段valid表示校验位是否正确。
var v_str='10X98765432'.substring(v2,v2+1);
if(s18==v_str)
{valid=true}
else
{valid=false}
“过滤记录 4”步骤中的“条件”为“valid = Y”,为真时将通过验证的数据输出到文件,为假时输出错误文件。
执行转换后,各错误文件和正确输出文件内容如下:
[root@localhost 6]# cat err1.txt
cardid;length
1101021972032708161;19
11010219720327081;17
[root@localhost 6]# cat err2.txt
cardid;p17
A00102197203270816;A0010219720327081
[root@localhost 6]# cat err3.txt
cardid;province
000102197203270816;00
[root@localhost 6]# cat err4.txt
cardid
110102197202300816
[root@localhost 6]# cat err5.txt
cardid
110102197302290816
[root@localhost 6]# cat err6.txt
cardid
110102197202290816
[root@localhost 6]# cat valid.txt
cardid
110102197203270816
[root@localhost 6]#
有两种意义上的重复记录,一是完全重复的记录,即所有字段均都重复,二是部分字段重复的记录。发生第一种重复的原因主要是表设计不周,通过给表增加主键或唯一索引列即可避免。对于第二类重复问题,通常要求查询出重复记录中的任一条记录。Kettle转换中有“去除重复记录”和“唯一行(哈希值)”两个步骤用于实现去重操作。“去除重复记录”步骤前,应该按照去除重列进行排序,否则可能返回错误的结果。“唯一行(哈希值)”步骤则不需要事先对数据进行排序。图6-6所示为一个Kettle去重的例子。
图6-6 Kettle去除重复数据
“自定义常量数据”步骤定义5条记录:
id name
1 a
2 b
1 b
3 a
3 b
“去除重复记录”步骤中“用于比较的字段”选择id,即按id字段去重。因为没有排序,该步骤输出为4条记录,id=1仍然有两条记录:
id name
1 a
2 b
1 b
3 a
“去除重复记录 2”步骤的定义与“去除重复记录”步骤相同,但前置了一个“排序记录”步骤,在其中定义按id和name字段排序,因此去重输出为:
id name
1 a
2 b
3 a
“唯一行(哈希值)”步骤的输出同上,该步骤不需先排序即可按预期去重。
这是一个真实数据仓库项目中的案例。某公司要建立一个员工数据仓库,需要从多个业务系统集成员工相关的信息。由于历史的原因,该公司现存的四个业务系统中都包含员工数据,这四个业务系统是HR、OA、考勤和绩效考核系统。这些系统是彼此独立的,有些是采购的商业软件,有些是公司自己开发的。每个系统中都有员工和组织机构表,存储员工编号、姓名、所在部门等属性。各个系统的员工数据并不一致。例如,员工入职或离职时,HR系统会更新员工数据,但OA系统的更新可能会滞后很长时间。项目的目标是建立一个全公司唯一的、一致的人员信息库。 我们的思路是利用一系列经过仔细定义的参照表或转换表取代那些所谓硬编码的转换程序。其优点是很明显的:转换功能动态化,并能适应多变的环境。对于建立在许多不同数据源之上的数据仓库来说,这是一项非常重要的基础工作。具体方案如下:
下面的问题是确定标准值的来源。从业务的角度看,HR系统的数据相对来说是最准确的,因为员工或组织机构的变化,最先反应到该系统的数据更新中。以HR系统中的员工表数据为标准是比较合适的选择。有了标准值后,还要建立一个映像表,把其它系统的员工数据和标准值对应起来。比方说有一个员工的编号在HR系统中为101,在其它三个系统中的编号分别是102、103、104,我们建立的映像表应该与表6-1类似。
DW条目名称 | DW标准值 | 业务系统 | 数据来源 | 源值 |
---|---|---|---|---|
员工编号 | 101 | HR | HR库.表名.列名 | 101 |
员工编号 | 101 | OA | OA库.表名.列名 | 102 |
员工编号 | 101 | 考勤 | 考勤库.表名.列名 | 103 |
员工编号 | 101 | 绩效 | 绩效库.表名.列名 | 104 |
表6-1 标准值映像表
这张表建立在数据仓库模式中,人员数据从各个系统抽取来以后,与标准值映像表关联,从而形成统一的标准数据。映像表被其它源数据引用,是数据一致性的关键,其维护应该与HR系统同步。因此在ETL过程中应该首先处理HR表和映像表。
数据清洗在实际ETL开发中是不可缺少的重要一步。即使为了降低复杂度,在我们的销售订单示例中没有涉及数据清洗,读者还是应该了解相关内容,这会对实际工作有所启发。
让我们回到实践中来。在“Kettle构建Hadoop ETL实践(四):建立ETL示例模型”中,我们建立了Hive库表以存储销售订单示例的过渡区和数据仓库数据,并介绍了Hive支持的文件格式、表类型以及如何支持事务处理。Kettle处理Hadoop ETL依赖于Hive,因此有必要系统了解一下Hive的基本概念及其体系结构。
Hive是Hadoop生态圈的数据仓库软件,使用类似于SQL的语言读、写、管理分布式存储上的大数据集。它建立在Hadoop之上,具有以下功能和特点:
Hive提供标准的SQL功能,包括2003以后的标准和2011标准中的分析特性。Hive中的SQL还可以通过用户定义的函数(UDFs)、用户定义的聚合函数(UDAFs)、用户定义的表函数(UDTFs)进行扩展。Hive内建连接器支持CSV文本文件、Parquet、ORC等多种数据格式,用户也可以扩展支持其它格式的连接器。Hive被设计成一个可扩展的、高性能的、容错的、与输入数据格式松耦合的系统,适合于数据仓库中的汇总、分析、批处理查询等任务,而不适合联机事务处理的工作场景。Hive包括HCatalog和WebHCat两个组件。HCatalog是Hadoop的表和存储管理层,允许使用Pig和MapReduce等数据处理工具的用户更容易读写集群中的数据。WebHCat提供了一个服务,可以使用HTTP接口执行MapReduce(或YARN)、Pig、Hive作业或元数据操作。
Hive的体系结构如图6-7所示。
图6-7 Hive体系结构
Hive建立在Hadoop的分布式文件系统(HDFS)和MapReduce之上。上图中显示了Hadoop 1和Hadoop 2中的两种MapReduce组件。在Hadoop 1中,Hive查询被转化成MapReduce代码,并且使用第一版的MapReduce框架执行,如JobTracker和TaskTracker。在Hadoop 2中,YARN将资源管理和调度从MapReduce框架中解耦。Hive查询仍然被转化为MapReduce代码并执行,但使用的是YARN框架和第二版的MapReduce。
为了更好地理解Hive如何与Hadoop的基本组件一起协同工作,可以把Hadoop看做一个操作系统,HDFS和MapReduce是这个操作系统的组成部分,而象Hive、HBase这些组件,则是操作系统的上层应用或功能。Hadoop生态圈的通用底层架构是,HDFS提供分布式存储,MapReduce为上层功能提供并行处理能力。
在HDFS和MapReduce之上,图中显示了Hive驱动程序和元数据存储。Hive驱动程序及其编译器负责编译、优化和执行HiveQL。依赖于具体情况,Hive驱动程序可能选择在本地执行Hive语句或命令,也可能是产生一个MapReduce作业。Hive驱动程序把元数据存储在数据库中。
缺省配置下,Hive在内建的Derby关系数据库系统中存储元数据,这种方式被称为嵌入模式。在这种模式下,Hive驱动程序、元数据存储和Derby全部运行在同一个Java虚拟机中(JVM)。这种配置适合于学习目的,它只支持单一Hive会话,所以不能用于多用户的生产环境。Hive还允许将元数据存储于本地或远程的外部数据库中,这种设置可以更好地支持Hive的多会话生产环境。并且,可以配置任何与JDBC API兼容的关系数据库系统存储元数据,如MySQL、Oracle等。
对应用支持的关键组件是Hive Thrift服务,它允许一个富客户端集访问Hive,开源的SQuirreL SQL客户端被作为示例包含其中。任何与JDBC兼容的应用,都可以通过绑定的JDBC驱动访问Hive。与ODBC兼容的客户端,如Linux下典型的unixODBC和isql应用程序,可以从远程Linux客户端访问Hive。如果在客户端安装了相应的ODBC驱动,甚至可以从微软的Excel访问Hive。通过Thrift还可以用Java以外的程序语言,如PHP或Python访问Hive。就像JDBC、ODBC一样,Thrift客户端通过Thrift服务器访问Hive。
架构图的最上面包括一个命令行接口(CLI),可以在Linux终端窗口向Hive驱动程序直接发出查询或管理命令。还有一个简单的Web界面,通过它可以从浏览器访问Hive管理表及其数据。
从接收到发自命令行或是应用程序的查询命令,到把结果返回给用户,期间Hive的工作流程(第一版的MapReduce)如图6-8所示。
图6-8 Hive工作流程
表6-2说明Hive如何与Hadoop的基本组件进行交互。从中不难看出,Hive的执行过程与关系数据库的非常相似,只不过是使用分布式计算框架来实现。
步骤 | 操作 |
---|---|
1 | 执行查询 从Hive的CLI或Web UI发查询命令给驱动程序(任何JDBC、ODBC数据库驱动)执行。 |
2 | 获得计划 驱动程序请求查询编译器解析查询、检查语法、生成查询计划或者查询所需要的资源。 |
3 | 获取元数据 编译器向元数据存储数据库发送元数据请求。 |
4 | 发送元数据 作为响应,元数据存储发向编译器发送元数据。 |
5 | 发送计划 编译器检查需要的资源,并把查询计划发送给驱动程序。至此,查询解析完成。 |
6 | 执行计划 驱动程序向执行引擎发送执行计划。 |
7 | 执行作业 执行计划的处理是一个MapReduce作业。执行引擎向Name node上的JobTracker进程发送作业,JobTracker把作业分配给Data node上的TaskTracker进程。此时,查询执行MapReduce作业。 |
7.1 | 操作元数据 执行作业的同时,执行引擎可能会执行元数据操作,如DDL语句等。 |
8 | 取回结果 执行引擎从Data node接收结果。 |
9 | 发送结果 执行引擎向驱动程序发送合成的结果值。 |
10 | 发送结果 驱动程序向Hive接口(CLI或Web UI)发送结果。 |
表6-2 Hive执行流程
我们在https://wxy0327.blog.csdn.net/article/details/108341342#2.%20%E8%BF%9E%E6%8E%A5Hive中已经提到过Hive有HiveServer和HiveServer2两版服务器,并指出了两个版本的主要区别,这里再对HiveServer2做一些深入的补充说明。
HiveServer2(后面简称HS2)是从Hive 0.11版本开始引入的,它提供了一个服务器接口,允许客户端在Hive中执行查询并取回查询结果。当前的实现是一个HiveServer的改进版本,它基于Thrift RPC,支持多客户端身份认证和并发操作,其设计对JDBC、ODBC这样的开放API客户端提供了更好的支持。
HS2使用单一进程提供两种服务,分别是基于Thrift的Hive服务和一个Jetty Web服务器。基于Thrift的Hive服务是HS2的核心,它对Hive查询,例如从Beeline里发出的查询语句做出响应。Hive通过Thrift提供Hive元数据存储的服务。通常来说,用户不能够调用元数据存储方法来直接对元数据进行修改,而应该通过HiveQL语言让Hive来执行这样的操作。用户应该只能通过只读方式来获取表的元数据信息。
不同版本的HS2,配置属性可能会有所不同。最基本的配置是在hive-site.xml文件中设置如下属性:
除了在hive-site.xml配置文件中设置属性,还可以使用环境变量设置相关信息。环境变量的优先级别要高于配置文件,相同的属性如果在环境变量和配置文件中都有设置,则会使用环境变量的设置,就是说环境变量或覆盖掉配置文件里的设置。可以配置如下环境变量:
HS2支持通过HTTP协议传输Thrift RPC消息(Hive 0.13以后的版本),这种方式特别用于支持客户端和服务器之间存在代理层的情况。当前HS2可以运行在TCP模式或HTTP模式下,但是不能同时使用两种模式。使用下面的属性设置启用HTTP模式:
可以配置hive.server2.global.init.file.location属性指定一个全局初始化文件的位置(Hive 0.14以后版本),它或者是初始化文件本身的路径,或者是一个名为“.hiverc”的文件所在的目录。在这个初始化文件中可以包含的一系列命令,这些命令会在HS2实例中运行,例如注册标准的JAR包或函数等。
如下参数配置HS2的操作日志:
缺省情况下,HS2以连接服务器的用户的身份处理查询,但是如果将下面的属性设置为false,那么查询将以运行HS2进程的用户身份执行。当遇到无法创建临时表一类的错误时,可以尝试设置此属性。
为了避免不安全的内存溢出,可以通过将以下参数设置为true,禁用文件系统缓存。
HS2允许配置临时目录,这些目录被Hive用于存储中间临时输出。临时目录相关的配置属性如下。
HS2的Web界面提供配置、日志、度量和活跃会话等信息,其使用的缺省端口是10002。可以设置hive-site.xml文件中的hive.server2.webui.host、hive.server2.webui.port、hive.server2.webui.max.threads等属性配置Web接口。Web界面如图6-9所示。
图6-9 HiveServer2的Web界面
可以使用两种方法查看Hive版本。
Hive的执行依赖于底层的MapReduce作业,因此对Hadoop作业的优化或者对MapReduce作业的调整是提高Hive性能的基础。大多数情况下,用户不需要了解Hive内部是如何工作的。但是当对Hive具有越来越多的经验后,学习一些Hive的底层实现细节和优化知识,会让用户更加高效地使用Hive。如果没有适当的调整,那么即使查询Hive中的一个小表,有时也会耗时数分钟才得到结果。也正是因为这个原因,Hive对于OLAP类型的应用有很大的局限性,它不适合需要立即返回查询结果的场景。然而,通过实施下面一系列的调优方法,Hive查询的性能会有大幅提高。
(1)启用压缩
压缩可以使磁盘上存储的数据量变小,例如,文本文件格式能够压缩40%甚至更高比例,这样可以通过降低I/O来提高查询速度。除非产生的数据用于外部系统,或者存在格式兼容性问题,建议总是启用压缩。压缩与解压缩会消耗CPU资源,但Hive产生的MadReduce作业往往是I/O密集型的,因此CPU开销通常不是问题。 为了启用压缩,需要查出所使用的Hive版本支持的压缩编码方式,下面的set命令列出可用的编解码器(CDH 5.7.0中的Hive)。
hive> set io.compression.codecs;
io.compression.codecs=org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DeflateCodec,org.apache.hadoop.io.compress.SnappyCodec,org.apache.hadoop.io.compress.Lz4Codec
hive>
一个复杂的Hive查询在提交后,通常被转换为一系列中间阶段的MapReduce作业,Hive引擎将这些作业串联起来完成整个查询。可以将这些中间数据进行压缩。这里所说的中间数据指的是上一个MapReduce作业的输出,这些输出将被下一个MapReduce作业作为输入数据使用。我们可以在hive-site.xml文件中设置hive.exec.compress.intermediate属性以启用中间数据压缩。
<property>
<name>hive.exec.compress.intermediate</name>
<value>true</value>
<description/>
</property>
<property>
<name>hive.intermediate.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
<description/>
</property>
<property>
<name>hive.intermediate.compression.type</name>
<value>BLOCK</value>
<description/>
</property>
也可以在Hive客户端中使用set命令设置这些属性。
hive> set hive.exec.compress.intermediate=true;
hive> set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> set hive.intermediate.compression.type=BLOCK;
当Hive将输出写入到表中时,输出内容同样可以进行压缩。我们可以设置hive.exec.compress.output属性启用最终输出压缩。
<property>
<name>hive.exec.compress.output</name>
<value>true</value>
<description/>
</property>
或者
hive> set hive.exec.compress.output=true;
hive> set mapreduce.output.fileoutputformat.compress=true;
hive> set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
hive> set mapreduce.output.fileoutputformat.compress.type=BLOCK;
(2)优化连接 可以通过配置Map连接和倾斜连接的相关属性提升连接查询的性能。
当连接一个大表和一个小表时,自动Map连接是一个非常有用的特性。如果启用了该特性,小表将保存在每个节点的本地缓存中,并在Map阶段与大表进行连接。开启自动Map连接提供了两个好处。首先,将小表装进缓存将节省每个数据节点上读取时间。其次,它避免了Hive查询中的倾斜连接,因为每个数据块的连接操作已经在Map阶段完成了。设置下面的属性启用自动Map连接属性。
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>10000000</value>
</property>
<property>
<name>hive.auto.convert.join.use.nonstaged</name>
<value>true</value>
</property>
属性说明: hive.auto.convert.join:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。 hive.auto.convert.join.noconditionaltask:是否启用基于输入文件的大小,将普通连接转化为Map连接的优化机制。假设参与连接的表(或分区)有N个,如果打开这个参数,并且有N-1个表(或分区)的大小总和小于hive.auto.convert.join.noconditionaltask.size参数指定的值,那么会直接将连接转为Map连接。 hive.auto.convert.join.noconditionaltask.size:如果hive.auto.convert.join.noconditionaltask是关闭的,则本参数不起作用。否则,如果参与连接的N个表(或分区)中的N-1个的总大小小于这个参数的值,则直接将连接转为Map连接。缺省值为10MB。 hive.auto.convert.join.use.nonstaged:对于条件连接,如果从一个小的输入流可以直接应用于join操作而不需要过滤或者投影,那么不需要通过MapReduce的本地任务在分布式缓存中预存。当前该参数在vectorization或tez执行引擎中不工作。
两个大表连接时,会先基于连接键分别对两个表进行排序,然后连接它们。Mapper将特定键值的所有行发送给同一个Reducer。例如,表A的id列有1、2、3、4四个值,表B的id列有1、2、3三个值。查询语句如下:
select A.id from A join B on A.id = B.id
一系列Mapper读取表中的数据并基于键值发送给Reducer。如id=1行进入Reducer R1,id=2的行进入Reducer R2等等。这些Reducer产生A、B的交集并输出。Reducer R4只从A获取行,不会产生查询结果。 现在假设id=1的数据行是高度倾斜的,则R2和R3会很快完成,而R1需要很长时间,将成为整个查询的瓶颈。配置倾斜连接的相关属性可以有效优化倾斜连接。
<property>
<name>hive.optimize.skewjoin</name>
<value>true</value>
</property>
<property>
<name>hive.skewjoin.key</name>
<value>100000</value>
</property>
<property>
<name>hive.skewjoin.mapjoin.map.tasks</name>
<value>10000</value>
</property>
<property>
<name>hive.skewjoin.mapjoin.min.split</name>
<value>33554432</value>
</property>
属性说明: hive.optimize.skewjoin:是否为连接表中的倾斜键创建单独的执行计划。它基于存储在元数据中的倾斜键。在编译时,Hive为倾斜键和其它键值生成各自的查询计划。 hive.skewjoin.key:决定如何确定连接中的倾斜键。在连接操作中,如果同一键值所对应的数据行数超过该参数值,则认为该键是一个倾斜连接键。 hive.skewjoin.mapjoin.map.tasks:指定倾斜连接中,用于Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.min.split一起使用,执行细粒度的控制。 hive.skewjoin.mapjoin.min.split:通过指定最小split的大小,确定Map连接作业的任务数。该参数应该与hive.skewjoin.mapjoin.map.tasks一起使用,执行细粒度的控制。
如果连接中使用的表是按特定列分桶的,可以开启桶Map连接提升性能。
<property>
<name>hive.optimize.bucketmapjoin</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>true</value>
</property>
属性说明: hive.optimize.bucketmapjoin:是否尝试桶Map连接。 hive.optimize.bucketmapjoin.sortedmerge:是否尝试在Map连接中使用归并排序。
(3)避免使用order by全局排序 Hive中使用order by子句实现全局排序。order by只用一个Reducer产生结果,对于大数据集,这种做法效率很低。如果不需要全局有序,则可以使用sort by子句,该子句为每个reducer生成一个排好序的文件。如果需要控制一个特定数据行流向哪个reducer,可以使用distribute by子句,例如:
select id, name, salary, dept from employee
distribute by dept sort by id asc, name desc;
属于一个dept的数据会分配到同一个reducer进行处理,同一个dept的所有记录按照id、name列排序。最终的结果集是全局有序的。
(4)启用Tez执行引擎 使用Tez执行引擎代替传统的MapReduce引擎会大幅提升Hive查询的性能。在安装好Tez后,配置hive.execution.engine属性指定执行引擎。
<property>
<name>hive.execution.engine</name>
<value>tez</value>
<description/>
</property>
(5)优化limit操作 缺省时limit操作仍然会执行整个查询,然后返回限定的行数。在有些情况下这种处理方式很浪费,因此可以通过设置下面的属性避免此行为。
<property>
<name>hive.limit.optimize.enable</name>
<value>true</value>
</property>
<property>
<name>hive.limit.row.max.size</name>
<value>100000</value>
</property>
<property>
<name>hive.limit.optimize.limit.file</name>
<value>10</value>
</property>
<property>
<name>hive.limit.optimize.fetch.max</name>
<value>50000</value>
</property>
属性说明: . hive.limit.optimize.enable:是否启用limit优化。当使用limit语句时,对源数据进行抽样。 . hive.limit.row.max.size:在使用limit做数据的子集查询时保证的最小行数据量。 . hive.limit.optimize.limit.file:在使用limit做数据子集查询时,采样的最大文件数。 . hive.limit.optimize.fetch.max:使用简单limit数据抽样时,允许的最大行数。
(6)启用并行执行 每条HiveQL语句都被转化成一个或多个执行阶段,可能是一个MapReduce阶段、采样阶段、归并阶段、限制阶段等。缺省时,Hive在任意时刻只能执行其中一个阶段。如果组成一个特定作业的多个执行阶段是彼此独立的,那么它们可以并行执行,从而整个作业得以更快完成。通过设置下面的属性启用并行执行。
<property>
<name>hive.exec.parallel</name>
<value>true</value>
</property>
<property>
<name>hive.exec.parallel.thread.number</name>
<value>8</value>
</property>
属性说明: . hive.exec.parallel:是否并行执行作业。 . hive.exec.parallel.thread.number:最多可以并行执行的作业数。
(7)启用MapReduce严格模式 Hive提供了一个严格模式,可以防止用户执行那些可能产生负面影响的查询。通过设置下面的属性启用MapReduce严格模式。
<property>
<name>hive.mapred.mode</name>
<value>strict</value>
</property>
严格模式禁止3种类型的查询:
(8)使用单一Reduce执行多个Group By 通过为group by操作开启单一reduce任务属性,可以将一个查询中的多个group by操作联合在一起发送给单一MapReduce作业。
<property>
<name>hive.multigroupby.singlereducer</name>
<value>true</value>
<description/>
</property>
(9)控制并行Reduce任务 Hive通过将查询划分成一个或多个MapReduce任务达到并行的目的。确定最佳的mapper个数和reducer个数取决于多个变量,例如输入的数据量以及对这些数据执行的操作类型等。如果有太多的mapper或reducer任务,会导致启动、调度和运行作业过程中产生过多的开销,而如果设置的数量太少,那么就可能没有充分利用好集群内在的并行性。对于一个Hive查询,可以设置下面的属性来控制并行reduce任务的个数。
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>256000000</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>1009</value>
</property>
属性说明: hive.exec.reducers.bytes.per.reducer:每个reducer的字节数,缺省值为256M。Hive是按照输入的数据量大小来确定reducer个数的。例如,如果输入的数据是1G,将使用4个reducer。 hive.exec.reducers.max:将会使用的最大reducer个数。
(10)启用向量化 向量化特性在Hive 0.13.1版本中被首次引入。通过查询执行向量化,使Hive从单行处理数据改为批量处理方式,具体来说是一次处理1024行而不是原来的每次只处理一行,这大大提升了指令流水线和缓存的利用率,从而提高了表扫描、聚合、过滤和连接等操作的性能。可以设置下面的属性启用查询执行向量化。
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.groupby.enabled</name>
<value>true</value>
</property>
属性说明: hive.vectorized.execution.enabled:如果该标志设置为true,则开启查询执行的向量模式,缺省值为false。 hive.vectorized.execution.reduce.enabled:如果该标志设置为true,则开启查询执行reduce端的向量模式,缺省值为true。 hive.vectorized.execution.reduce.groupby.enabled:如果该标志设置为true,则开启查询执行reduce端group by操作的向量模式,缺省值为true。
(11)启用基于成本的优化器 Hive 0.14版本开始提供基于成本优化器(CBO)特性。使用过Oracle数据库的读者对CBO一定不会陌生。与Oracle类似,Hive的CBO也可以根据查询成本制定执行计划,例如,确定表连接的顺序,以何种方式执行连接,使用的并行度等等。设置下面的属性启用基于成本优化器。
<property>
<name>hive.cbo.enable</name>
<value>true</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>true</value>
</property>
<property>
<name>hive.stats.fetch.partition.stats</name>
<value>true</value>
</property>
<property>
<name>hive.stats.fetch.column.stats</name>
<value>true</value>
</property>
属性说明: hive.cbo.enable:控制是否启用基于成本的优化器,缺省值是true。Hive的CBO使用Apache Calcite框架实现。 hive.compute.query.using.stats:该属性的缺省值为false。如果设置为true,Hive在执行某些查询时,例如select count(1),只利用元数据存储中保存的状态信息返回结果。为了收集基本状态信息,需要将hive.stats.autogather属性配置为true。为了收集更多的状态信息,需要运行analyze table查询命令。 hive.stats.fetch.partition.stats:该属性的缺省值为true。操作树中所标识的统计信息,需要分区级别的基本统计,如每个分区的行数、数据量大小和文件大小等。分区统计信息从元数据存储中获取。如果存在很多分区,要为每个分区收集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取分区统计。当该标志设置为false时,Hive从文件系统获取文件大小,并根据表结构估算行数。 hive.stats.fetch.column.stats:该属性的缺省值为false。操作树中所标识的统计信息,需要列统计。列统计信息从元数据存储中获取。如果存在很多列,要为每个列收集统计信息可能会消耗大量的资源。这个标志可被用于禁止从元数据存储中获取列统计。
可以使用HiveQL的analyze table语句收集一个表中所有列相关的基本统计信息,例如下面的语句收集sales_order_fact表的统计信息。 analyze table sales_order_fact compute statistics for columns; analyze table sales_order_fact compute statistics for columns order_number, customer_sk;
(12)使用ORC文件格式 ORC文件格式可以有效提升Hive查询的性能。图6-10由Hortonworks公司提供,显示了Hive不同文件格式的大小对比。
图6-10 Hive文件格式与大小对比
对Hive的服务器结构一定了解后,我们开始使用Kettle创建销售订单示例数据仓库数据装载的作业和转换。在数据仓库可以使用前,需要装载历史数据。这些历史数据是导入进数据仓库的第一个数据集合。首次装载被称为初始装载,一般是一次性工作。由最终用户来决定有多少历史数据进入数据仓库。例如,数据仓库使用的开始时间是2020年3月1日,而用户希望装载两年的历史数据,那么应该初始装载2018年3月1日到2020年2月29日之间的源数据。在2020年3月2日装载2020年3月1日的数据(假设执行频率是每天一次),之后周期性地每天装载前一天的数据。在装载事实表前,必须先装载所有的维度表。因为事实表需要引用维度的代理键。这不仅针对初始装载,也针对定期装载。本节说明执行初始装载的步骤,包括标识源数据、维度历史的处理、创建相关Kettle作业和转换,以及验证初始装载过程。
设计开发初始装载步骤前需要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据,还要了解数据源的特性,例如文件类型、记录结构和可访问性等。表8-3显示的是销售订单示例数据仓库需要的源数据的关键信息,包括源数据表、对应的数据仓库目标表等属性。这类表格通常称作数据源对应图,因为它反应了每个从源数据到目标数据的对应关系。在本示例中,客户和产品的源数据直接与其数据仓库里的目标表,customer_dim和product_dim表相对应,而销售订单事务表是多个数据仓库表的数据源。
源数据 | 源数据类型 | 文件名/表名 | 数据仓库中的目标表 |
---|---|---|---|
客户 | MySQL表 | customer | customer_dim |
产品 | MySQL表 | product | product_dim |
销售订单 | MySQL表 | sales_order | order_dim、sales_order_fact |
表8-3 销售订单数据源映射
标识出了数据源,现在要考虑维度历史的处理。大多数维度值是随着时间改变的,如客户改变了姓名,产品的名称或分类变化等。当一个维度改变,比如当一个产品有了新的分类时,有必要记录维度的历史变化信息。在这种情况下,product_dim表里必须既存储产品老的分类,也存储产品当前的分类。并且,老的销售订单里的产品引用老的分类。渐变维(SCD)即是一种在多维数据仓库中实现维度历史的技术。有三种不同的SCD技术:SCD 类型1(SCD1),SCD类型2(SCD2),SCD类型3(SCD3):
同一个维度表中的不同字段可以有不同的变化处理方式。在本示例中,客户维度历史的客户名称使用SCD1,客户地址使用SCD2,产品维度的两个属性,产品名称和产品类型都使用SCD2保存历史变化数据。
多维数据仓库中的维度表和事实表一般都需要有一个代理键,作为这些表的主键,代理键一般由单列的自增数字序列构成。Hive没有关系数据库中的自增列,但它也有一些对自增序列的支持,通常有两种方法生成代理键:使用row_number()窗口函数或者使用一个名为UDFRowSequence的用户自定义函数(UDF)。 假设有维度表tbl_dim和过渡表tbl_stg,现在要将tbl_stg的数据装载到tbl_dim,装载的同时生成维度表的代理键。
insert into tbl_dim
select row_number() over (order by tbl_stg.id) + t2.sk_max, tbl_stg.*
from tbl_stg
cross join (select coalesce(max(sk),0) sk_max from tbl_dim) t2;
上面语句中,先查询维度表中已有记录最大的代理键值,如果维度表中还没有记录,利用coalesce函数返回0。然后使用cross join连接生成过渡表和最大代理键值的笛卡尔集,最后使用row_number()函数生成行号,并将行号与最大代理键值相加的值,作为新装载记录的代理键。
add jar hdfs:///user/hive-contrib-2.0.0.jar;
create temporary function row_sequence as 'org.apache.hadoop.hive.contrib.udf.udfrowsequence';
insert into tbl_dim
select row_sequence() + t2.sk_max, tbl_stg.*
from tbl_stg
cross join (select coalesce(max(sk),0) sk_max from tbl_dim) t2;
hive-contrib-2.0.0.jar中包含一个生成记录序号的自定义函数udfrowsequence。上面的语句先加载JAR包,然后创建一个名为row_sequence()的临时函数作为调用UDF的接口,这样可以为查询的结果集生成一个自增伪列。之后就和row_number()写法类似了,只不过将窗口函数row_number()替换为row_sequence()函数。
因为窗口函数的方法比较通用,而且无需引入额外的JAR包,所以我们在示例中使用row_number()函数生成代理键。初始装载Kettle作业如图6-11所示。
图6-11 初始装载作业
初始装载作业流程描述如下:
系统初始化部分包括“SQL_init_cdc_time”和“设置系统日期”两个作业项。“SQL_init_cdc_time”作业项中执行的SQL语句如下,用于数据初始化,以便测试或排错后重复执行,实现幂等操作。
truncate table dw.customer_dim;
truncate table dw.product_dim;
update rds.cdc_time set last_load='1970-01-01', current_load='1970-01-01';
“设置系统日期”作业项调用一个如图6-12所示的转换,用于获取当前系统日期,更新时间戳表rds.cdc_time,并设置相关变量。
图6-12 设置系统日期转换
“自定义常量步骤”设置一个Date类型的常量max_date,格式为yyyy-MM-dd,数据为2200-01-01。该值用于设置渐变维的初始过期日期。“获取系统信息”步骤中用两个字段cur_date和pre_date表示当前日期和前一天的日期。当前日期用于获得需要处理的数据,前一天日期用于设置变量,在后续步骤中构成文件名。该步骤定义如下,两个字段将被以复制方式发送到“字段选择”和“插入/更新”步骤。
名称 类型
cur_date 今天 00:00:00
pre_date 昨天 00:00:00
“字段选择”步骤用于将pre_date字段格式化为“yyyy-MM-dd”形式。在该步骤的“元数据”标签中进行如下定义:
字段名称 类型 格式
pre_date Date yyyy-MM-dd
“设置变量”步骤设置两个变量PRE_DATE、MAX_DATE,变量值从pre_date和max_date数据流字段获得。“变量活动类型”选择“Valid in the root job”,使得作业中涉及的所有子作业或转换都可以使用这两个变量。
“插入/更新”步骤定义如图6-13所示。该步骤的功能类似于SQL中replace into或merge into。当rds.cdc_time表字段current_load为NULL时执行插入操作,否则更新该字段的值,插入或更新的值为数据流字段cur_date的值。
图6-13 更新rds.cdc_time表字段current_load的值
“装载过渡区”作业项调用的是一个子作业,如图6-14所示。
图6-14 装载过渡区作业
该作业包括“Sqoop import customer”、“Sqoop import product”、“load_sales_order”三个作业项。前两个Sqoop作业的命令行定义如下,其含义与功能在前一篇中已经详细讲解,这里不再赘述。
--connect jdbc:mysql://node3:3306/source --delete-target-dir --password 123456 --table customer --target-dir /user/hive/warehouse/rds.db/customer --username root
--connect jdbc:mysql://node3:3306/source --delete-target-dir --password 123456 --table product --target-dir /user/hive/warehouse/rds.db/product --username root
“load_sales_order”作业项调用的是一个装载事实表的转换,如图6-15所示。
图6-15 初始装载rds.sales_order表
“表输入”步骤执行下面的SQL,查询出当前日期与最后装载日期,本例中分别为“2020-10-07”和“1971-01-01”。
select id, last_load, current_load from rds.cdc_time;
“数据库连接”步骤的定义如图6-16所示。该步骤将前一步骤输出的last_load和current_load字段作为参数,查询出源数据中sales_order表的全部数据。
图6-16 查询source.sales_order表的全部数据
最后的“Hadoop file output”步骤将sales_order源数据以文本文件的形式,存储到rds.sales_order表对应的HDFS目录下。在该步骤的“文件”标签页中,“Folder/File”属性输入“/user/hive/warehouse/rds.db/sales_order/sales_order”,“扩展名”属性输入“txt”。“内容”标签页中,“分隔符”为“,”,“编码”选择“UTF-8”。字段标签页的定义表6-4所示。注意由于性能原因,对于Hive表不能使用普通的“表输出”步骤为其装载数据。
名称 | 类型 | 格式 | 长度 | 精度 |
---|---|---|---|---|
order_number | Integer | 9 | 0 | |
customer_number | Integer | 9 | 0 | |
product_code | Integer | 9 | 0 | |
order_date | Date | yyyy-MM-dd HH:mm:ss | 0 | |
entry_date | Date | yyyy-MM-dd HH:mm:ss | 0 | |
order_amount | Number | 00000000.00 | 10 | 2 |
表6-4 sales_order.txt文件字段定义
“装载维度表”作业项调用一个如图6-17所示的转换。
图6-17 初始装载维度表的转换
“装载客户维度”执行下面的SQL语句:
use dw;
insert into customer_dim
select row_number() over (order by t1.customer_number) + t2.sk_max,
t1.customer_number, t1.customer_name, t1.customer_street_address,
t1.customer_zip_code, t1.customer_city, t1.customer_state, 1,
'2020-03-01', '2200-01-01'
from rds.customer t1
cross join (select coalesce(max(customer_sk),0) sk_max from customer_dim) t2;
“装载产品维度”执行下面的SQL语句:
use dw;
insert into product_dim
select row_number() over (order by t1.product_code) + t2.sk_max,
t1.product_code, t1.product_name, t1.product_category, 1,
'2020-03-01', '2200-01-01'
from rds.product t1
cross join (select coalesce(max(product_sk),0) sk_max from product_dim) t2;
说明:
订单维度表的装载当然也可以使用类似的“执行SQL语句”步骤,但订单维度与客户维度或产品维度不同。在前一篇中曾提到,它的数据是单向递增的,不涉及数据更新,因此这里使用“表输入”、“增加序列”、“ORC output”三个步骤装载订单维度数据。
“表输入”步骤中执行以下查询:
select order_number,
1 version,
date_format(order_date,'yyyy-mm-dd') effective_date,
'2200-01-01' expiry_date
from rds.sales_order order by order_number;
因为不会更新,订单维度的版本号恒为1,而其生效日期显然就是订单生成的日期(order_date字段)。为了使所有维度表具有相同的粒度,使用date_format函数将订单维度的生效日期字段只保留到日期,忽略时间部分。“增加序列”步骤生成代理键,将“值的名称”定义为order_sk。“ORC output”步骤的定义如图6-18所示。与装载过渡区的rds.sales_order表类似,这里也是将数据以文件形式上传到Hive表所对应的HDFS目录。dw库中的维度表是ORC格式,因此将“Hadoop file output”步骤替换为“ORC output”步骤。
图6-18 用“ORC output”步骤装载dw.order_dim表
“装载事实表”作业项调用一个如图6-19所示的转换。
图6-19 初始装载事实表的转换
该转换比较简单,只有“表输入”和“ORC output”两个步骤。“表输入”步骤执行下面的查询,销售订单事实表的外键列引用维度表的代理键。date_dim维度表的数据已经预生成,日期从2018年1月1日到2022年12月31日,参见“https://wxy0327.blog.csdn.net/article/details/108408821#%E5%9B%9B%E3%80%81%E8%A3%85%E8%BD%BD%E6%97%A5%E6%9C%9F%E7%BB%B4%E5%BA%A6%E6%95%B0%E6%8D%AE”。这里说的外键只是逻辑上的外键,Hive并不支持创建表的物理主键或外键。
select order_sk, customer_sk, product_sk, date_sk, order_amount
from rds.sales_order a, dw.order_dim b, dw.customer_dim c, dw.product_dim d, dw.date_dim e
where a.order_number = b.order_number
and a.customer_number = c.customer_number
and a.product_code = d.product_code
and to_date(a.order_date) = e.dt;
“ORC output”与上一步装载dw.order_dim表的步骤相同,只是将“Folder/File name”属性值改为:
hdfs://nameservice1/user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact
初始装载的最后一个作业项是“SQL”,执行下面的语句,将最后装载日期更新为当前装载日期。对于时间戳表的详细使用说明参见“https://wxy0327.blog.csdn.net/article/details/108866976#1.%20%E5%9F%BA%E4%BA%8E%E6%BA%90%E6%95%B0%E6%8D%AE%E7%9A%84CDC”。
update rds.cdc_time set last_load=current_load;
成功执行初始装载作业后,可以在Hive中执行下面的查询验证数据正确性。
use dw;
select order_number,customer_name,product_name,dt,order_amount amount
from sales_order_fact a, customer_dim b, product_dim c, order_dim d, date_dim e
where a.customer_sk = b.customer_sk
and a.product_sk = c.product_sk
and a.order_sk = d.order_sk
and a.order_date_sk = e.date_sk
order by order_number;
初始装载只在开始数据仓库使用前执行一次,然而,必须要按时调度定期执行装载源数据的过程。与初始装载不同,定期装载一般都是增量的,并且需要捕获和记录数据的变化历史。本节说明执行定期装载的步骤,包括识别源数据与装载类型、创建Kettle作业和转换实现定期增量装载过程并执行验证。
定期装载首先要识别数据仓库的每个事实表和每个维度表用到的并且是可用的源数据。然后要决定适合装载的抽取模式和维度历史装载类型。表6-5汇总了本示例的这些信息。
数据源 | 源数据存储 | 数据仓库 | 抽取模式 | 维度历史装载类型 |
---|---|---|---|---|
customer | customer | customer_dim | 整体、拉取 | address列上SCD2,name列上SCD1 |
product | product | product_dim | 整体、拉取 | 所有属性均为SCD2 |
sales_order | sales_order | order_dim | CDC(每天)、拉取 | 唯一订单号 |
sales_order_fact | CDC(每天)、拉取 | N/A | ||
N/A | N/A | date_dim | N/A | 预装载 |
表6-5 销售订单定期装载
order_dim维度表和sales_order_fact事实表使用基于时间戳的CDC装载模式。时间戳表rds.cdc_time用于关联查询增量数据。定期装载Kettle作业如图6-20所示。
图6-20 定期装载作业
定期装载作业流程描述如下:
“设置系统日期”作业项调用一个如图6-12所示的转换,用于获取当前系统日期,更新时间戳表rds.cdc_time,并设置相关变量。每个步骤的定义已经在前面“初始转载”部分说明。该作业项的输出中,last_load为最后装载日期,current_load为当前日期。用select * from source.sales_order where entry_date >= last_load and entry_date < current_load即可查询出增量数据。
“装载过渡区”作业项调用的子作业与图6-14所示的初始装载过渡区只有一点不同:“load_sales_order”作业项调用的转换中,“Hadoop file output”步骤生成的文件,其文件名中带有装载日期,这通过在“Folder/File”属性输入/user/hive/warehouse/rds.db/sales_order/sales_order_{PRE_DATE}实现。{PRE_DATE}引用的是前一作业项“设置系统日期”中所设置的变量,值为当前日期前一天。过渡区的rds.sales_order表存储全部销售订单数据,因此需要向表所对应的HDFS目录中新增文件,而不能覆盖已有文件。
“装载维度表”作业项调用一个如图6-21所示的转换。
图6-21 定期装载维度表的转换
这个转换貌似很简单,只有三个执行SQL脚本的步骤。正如你所想到的,实现渐变维使用的就是Hive提供的行级更新功能。与单纯用shell执行SQL相比,Kettle转换一个明显的好处是这三个步骤可以并行以提高性能。 “装载客户维度表”步骤中的SQL脚本如下:
use dw;
update customer_dim
set expiry_date = '${PRE_DATE}'
where customer_dim.customer_sk in
(select a.customer_sk
from (select customer_sk,customer_number,customer_street_address
from customer_dim where expiry_date = '${MAX_DATE}') a left join
rds.customer b on a.customer_number = b.customer_number
where b.customer_number is null or a.customer_street_address <> b.customer_street_address);
insert into customer_dim
select
row_number() over (order by t1.customer_number) + t2.sk_max,
t1.customer_number,
t1.customer_name,
t1.customer_street_address,
t1.customer_zip_code,
t1.customer_city,
t1.customer_state,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
t2.customer_number customer_number,
t2.customer_name customer_name,
t2.customer_street_address customer_street_address,
t2.customer_zip_code,
t2.customer_city,
t2.customer_state,
t1.version + 1 version,
'${PRE_DATE}' effective_date,
'${MAX_DATE}' expiry_date
from customer_dim t1
inner join rds.customer t2
on t1.customer_number = t2.customer_number
and t1.expiry_date = '${PRE_DATE}'
left join customer_dim t3
on t1.customer_number = t3.customer_number
and t3.expiry_date = '${MAX_DATE}'
where t1.customer_street_address <> t2.customer_street_address and t3.customer_sk is null) t1
cross join
(select coalesce(max(customer_sk),0) sk_max from customer_dim) t2;
drop table if exists tmp;
create table tmp as
select
a.customer_sk,
a.customer_number,
b.customer_name,
a.customer_street_address,
a.customer_zip_code,
a.customer_city,
a.customer_state,
a.version,
a.effective_date,
a.expiry_date
from customer_dim a, rds.customer b
where a.customer_number = b.customer_number and (a.customer_name <> b.customer_name);
delete from customer_dim where customer_dim.customer_sk in (select customer_sk from tmp);
insert into customer_dim select * from tmp;
insert into customer_dim
select
row_number() over (order by t1.customer_number) + t2.sk_max,
t1.customer_number,
t1.customer_name,
t1.customer_street_address,
t1.customer_zip_code,
t1.customer_city,
t1.customer_state,
1,
'${PRE_DATE}',
'${MAX_DATE}'
from
(
select t1.* from rds.customer t1 left join customer_dim t2 on t1.customer_number = t2.customer_number
where t2.customer_sk is null) t1
cross join
(select coalesce(max(customer_sk),0) sk_max from customer_dim) t2;
客户维度表的customer_street_addresses字段值变化时采用SCD2,需要新增版本,customer_name字段值变化时采用SCD1,直接覆盖更新。如果一个表的不同字段有的采用SCD2,有的采用SCD1,就像客户维度表这样,那么是先处理SCD2,还是先处理SCD1呢?为了回答这个问题,我们看一个简单的例子。假设有一个维度表包含c1,c2、c3、c4四个字段,c1是代理键,c2是业务主键,c3使用SCD1,c4使用SCD2。源数据从1、2、3变为1、3、4。如果先处理SCD1,后处理SCD2,则维度表的数据变化过程是先从1、1、2、3变为1、1、3、3,再新增一条记录2、1、3、4。此时表中的两条记录是1、1、3、3和2、1、3、4。如果先处理SCD2,后处理SCD1,则数据的变化过程是先新增一条记录2、1、2、4,再把1、1、2、3和2、1、2、4两条记录变为1、1、3、3和2、1、3、4。可以看出,无论谁先谁后,最终的结果是一样的,而且结果中都会出现一条实际上从未存在过的记录:1、1、3、3。因为SCD1本来就不保存历史变化,所以单从c2字段的角度看,任何版本的记录值都是正确的,没有差别。而对于c3字段,每个版本的值是不同的,需要跟踪所有版本的记录。我们从这个简单的例子可以得出以下结论:SCD1和SCD2的处理顺序不同,但最终结果是相同的,并且都会产生实际不存在的临时记录。因此从功能上说,SCD1和SCD2的处理顺序并不关键,只需要记住对SCD1的字段,任意版本的值都正确,而SCD2的字段需要跟踪所有版本。但在性能上看,先处理SCD1应该更好些,因为更新的数据行更少。本示例我们先处理SCD2。
第一句的update语句设置已删除记录和customer_street_addresses列上scd2的过期。该语句将老本的过期时间列从‘2200-01-01’更新为执行装载的前一天。内层的查询获取所有当前版本的数据。外层查询使用一个左外连接查询出地址列发生变化的记录的代理键,然后在update语句的where子句中用IN操作符,更新这些记录的过期时间列。left join的逻辑查询处理顺序是:
第二句的insert语句处理customer_street_addresses列上scd2的新增行。这条语句插入SCD2的新增版本行。子查询中用inner join获取当期版本号和源数据信息。left join连接是必要的,否则如果多次执行该语句,会生成多条重复的记录。最后用row_number()方法生成新纪录的代理键。新记录的版本号加1,开始日期为执行时的前一天,过期日期为‘2200-01-01’。
后面的四条SQL语句处理customer_name列上的scd1,因为scd1本身就不保存历史数据,所以这里更新维度表里的所有customer_name改变的记录,而不是仅仅更新当前版本的记录。在关系数据库中,SCD1非常好处理,如在MySQL中使用类似如下的语句即可:
update customer_dim a, customer_stg b set a.customer_name = b.customer_name
where a.customer_number = b.customer_number and a.customer_name <> b.customer_name ;
但是hive里不能在update后跟多个表,也不支持在set子句中使用子查询,它只支持SET column = value的形式,其中value只能是一个具体的值或者是一个标量表达式。所以这里使用了一个临时表存储需要更新的记录,然后将维度表和这个临时表关联,用先delete再insert代替update。为简单起见也不考虑并发问题(典型数据仓库应用的并发操作基本都是只读的,很少并发写,而且ETL通常是一个单独在后台运行的程序,如果用SQL实现,并不存在并发执行的情况,所以并发导致的问题并不像OLTP那样严重)。
最后的insert语句处理新增的customer记录。内层子查询使用rds.customer和dw.customer_dim的左外链接获取新增的数据。新数据的版本号为1,开始日期为执行时的前一天,过期日期为‘2200-01-01’。同样使用row_number()方法生成代理键。到这里,客户维度表的装载处理代码已完成。 “装载产品维度表”步骤中的SQL脚本如下:
use dw;
update product_dim
set expiry_date = '${PRE_DATE}'
where product_dim.product_sk in
(select a.product_sk
from (select product_sk,product_code,product_name,product_category
from product_dim where expiry_date = '${MAX_DATE}') a left join
rds.product b on a.product_code = b.product_code
where b.product_code is null or (a.product_name <> b.product_name or a.product_category <> b.product_category));
insert into product_dim
select
row_number() over (order by t1.product_code) + t2.sk_max,
t1.product_code,
t1.product_name,
t1.product_category,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
t2.product_code product_code,
t2.product_name product_name,
t2.product_category product_category,
t1.version + 1 version,
'${PRE_DATE}' effective_date,
'${MAX_DATE}' expiry_date
from product_dim t1
inner join rds.product t2
on t1.product_code = t2.product_code
and t1.expiry_date = '${PRE_DATE}'
left join product_dim t3
on t1.product_code = t3.product_code
and t3.expiry_date = '${MAX_DATE}'
where (t1.product_name <> t2.product_name or t1.product_category <> t2.product_category) and t3.product_sk is null) t1
cross join
(select coalesce(max(product_sk),0) sk_max from product_dim) t2;
insert into product_dim
select
row_number() over (order by t1.product_code) + t2.sk_max,
t1.product_code,
t1.product_name,
t1.product_category,
1,
'${PRE_DATE}',
'${MAX_DATE}'
from
(
select t1.* from rds.product t1 left join product_dim t2 on t1.product_code = t2.product_code
where t2.product_sk is null) t1
cross join
(select coalesce(max(product_sk),0) sk_max from product_dim) t2;
产品维度表的所有属性都使用SCD2,处理方法和客户表类似。 “装载订单维度表”步骤中的SQL脚本如下:
use dw;
insert into order_dim
select
row_number() over (order by t1.order_number) + t2.sk_max,
t1.order_number,
t1.version,
t1.effective_date,
t1.expiry_date
from
(
select
order_number order_number,
1 version,
order_date effective_date,
'2200-01-01' expiry_date
from rds.sales_order, rds.cdc_time
where entry_date >= last_load and entry_date < current_load ) t1
cross join
(select coalesce(max(order_sk),0) sk_max from order_dim) t2;
订单维度表的装载比较简单,因为不涉及维度历史变化,只要将新增的订单号插入rds.order_dim表就可以了。上面语句的子查询中,将过渡区库的订单表和时间戳表关联,用时间戳表中的两个字段值作为时间窗口区间的两个端点,用entry_date >= last_load AND entry_date < current_load条件过滤出上次执行定期装载的日期到当前日期之间的所有销售订单,装载到order_dim维度表。
“装载事实表”作业项调用一个如图6-22所示的转换。
图6-22 定期装载事实表的转换
为了装载dw.sales_order_fact事实表,需要关联rds.sales_order与dw库中的四个维度表,获取维度表的代理键和源数据的度量值。这里只有销售金额字段order_amount一个度量。和订单维度一样,也要关联时间戳表,获取时间窗口作为确定新增数据的过滤条件。
“表输入”步骤中的SQL查询语句如下,输出时间区间的两端日期:
select last_load, current_load from rds.cdc_time;
“销售订单事务数据”是一个数据库连接步骤,定义如图6-23所示,输出过渡区中销售订单表的增量数据。
图6-23 查询增量数据的数据库连接步骤
“获取日期代理键”使用的数据库查询步骤定义如图6-24所示。该步骤关联表字段date_dim.dt与流字段order_date查询出日期代理键date_dim.date_sk。
图6-24 使用数据库查询步骤获取日期代理键
“获取客户代理键”、“获取产品代理键”、“获取订单代理键”使用的都是“维度查询/更新”步骤,它们的定义除表名和字段名外完全相同。例如“获取客户代理键”中的“目标模式”选择dw,“目标表”选择customer_dim,其定义如图6-25所示。
图6-25 使用维度查询/更新步骤获取代理键
该步骤通过关联维度表和数据流中的业务主键字段customer_number,查询出订单日期order_date在生效日期effective_date与过期日期expiry_date区间内的客户维度代理键customer_sk,功能等价于下面的SQL查询:
select customer_sk
from rds.sales_order a, customer_dim c
where a.customer_number = c.customer_number
and a.order_date >= c.effective_date
and a.order_date < c.expiry_date;
“获取产品代理键”、“获取订单代理键”步骤类似地获得产品代理键product_sk和订单代理键order_sk。
最后的“ORC output”步骤定义如图6-26所示,将事实表数据以文件形式存储到相应的HDFS目录中,文件名中带有日期。
图6-26 使用ORC output步骤增量装载事实表数据
与初始装载一样,最后一个“SQL”作业项执行下面的语句,将最后装载日期更新为当前装载日期。
update rds.cdc_time set last_load=current_load;
下面进行一些测试,验证数据装载的正确性。 测试步骤: 1. 在MySQL的source源数据库中准备客户、产品和销售订单测试数据。
use source;
/*** 客户数据的改变如下:
客户6的街道号改为7777 ritter rd。(原来是7070 ritter rd)
客户7的姓名改为distinguished agencies。(原来是distinguished partners)
新增第八个客户。
***/
update customer set customer_street_address = '7777 ritter rd.' where customer_number = 6 ;
update customer set customer_name = 'distinguished agencies' where customer_number = 7 ;
insert into customer (customer_name, customer_street_address, customer_zip_code, customer_city, customer_state)
values ('subsidiaries', '10000 wetline blvd.', 17055, 'pittsburgh', 'pa') ;
/*** 产品数据的改变如下:
产品3的名称改为flat panel。(原来是lcd panel)
新增第四个产品。
***/
update product set product_name = 'flat panel' where product_code = 3 ;
insert into product (product_name, product_category)
values ('keyboard', 'peripheral') ;
/*** 新增订单日期为2020年10月7日的16条订单。 ***/
drop table if exists temp_sales_order_data;
create table temp_sales_order_data as select * from sales_order where 1=0;
set @start_date := unix_timestamp('2020-10-07');
set @end_date := unix_timestamp('2020-10-08');
set @customer_number := floor(1 + rand() * 8);
set @product_code := floor(1 + rand() * 4);
set @order_date := from_unixtime(@start_date + rand() * (@end_date - @start_date));
set @amount := floor(1000 + rand() * 9000);
insert into temp_sales_order_data values (1,@customer_number,@product_code,@order_date,@order_date,@amount);
... 插入16条数据 ...
insert into sales_order
select @a:=@a+1, customer_number, product_code, order_date, entry_date, order_amount
from temp_sales_order_data t1,(select @a:=102) t2 order by order_date;
commit ;
2. 执行定期装载Kettle作业。
3. 验证结果。
use dw;
select * from customer_dim;
查询的部分结果如下:
...
6 6 loyal clients 7070 ritter rd. 17055 pittsburgh pa 1 2020-03-01 2020-10-07
8 6 loyal clients 7777 ritter rd. 17055 pittsburgh pa 2 2020-10-07 2200-01-01
7 7 distinguished agencies 9999 scott st. 17050 mechanicsburg pa 1 2020-03-01 2200-01-01
9 8 subsidiaries 10000 wetline blvd. 17055 pittsburgh pa 1 2020-10-07 2200-01-01
可以看到,客户6因为地址变更新增了一个版本,而客户7的姓名变更直接覆盖了原来的值,新增了客户8。注意客户6第一个版本的到期日期和第二个版本的生效日期同为‘2020-10-07’,这是因为任何一个SCD的有效期是一个“左闭右开”的区间,以客户6为例,其第一个版本的有效期大于等于‘2020-03-01’,小于‘2020-10-07’,即为‘2020-03-01’到‘2020-10-06’。
select * from product_dim;
查询的部分结果如下:
...
3 3 lcd panel monitor 1 2020-03-01 2020-10-07
4 3 flat panel monitor 2 2020-10-07 2200-01-01
5 4 keyboard peripheral 1 2020-10-07 2200-01-01
可以看到,产品3的名称变更使用SCD2增加了一个版本,新增了产品4的记录。
select * from order_dim;
查询的部分结果如下:
...
111 111 1 2020-10-07 2200-01-01
112 112 1 2020-10-07 2200-01-01
113 113 1 2020-10-07 2200-01-01
114 114 1 2020-10-07 2200-01-01
115 115 1 2020-10-07 2200-01-01
116 116 1 2020-10-07 2200-01-01
117 117 1 2020-10-07 2200-01-01
118 118 1 2020-10-07 2200-01-01
Time taken: 0.146 seconds, Fetched: 118 row(s)
现在有118个订单,102个是“初始导入”装载的,16个是本次定期装载的。
select * from sales_order_fact;
查询的部分结果如下:
...
110 8 5 1011 7791.00
111 3 1 1011 6711.00
112 7 1 1011 5570.00
113 1 2 1011 4722.00
114 1 5 1011 7330.00
115 3 1 1011 7214.00
116 9 4 1011 9160.00
117 9 5 1011 8382.00
118 3 1 1011 4956.00
Time taken: 0.135 seconds, Fetched: 118 row(s)
可以看到,2020年10月7日的16个销售订单被添加,产品3的代理键是4而不是3,客户6的代理键是8而不是6。
select * from rds.cdc_time;
查询结果如下:
1 2020-10-08 2020-10-08
Time taken: 0.117 seconds, Fetched: 1 row(s)
可以看到,两个字段值都已更新为当前日期。
查看销售订单过渡区表和事实表所对应的的HDFS文件如下,不带日期的文件是初始装载作业所生成,带日期的文件为定期装载作业所生成。
[hdfs@manager~]$hdfs dfs -ls /user/hive/warehouse/rds.db/sales_order/*
-rw-r--r-- 3 root hive 6012 2020-10-08 20:28 /user/hive/warehouse/rds.db/sales_order/sales_order.txt
-rw-r--r-- 3 root hive 960 2020-10-08 20:39 /user/hive/warehouse/rds.db/sales_order/sales_order_2020-10-07.txt
[hdfs@manager~]$hdfs dfs -ls /user/hive/warehouse/dw.db/sales_order_fact/*
-rw-r--r-- 3 root hive 1625 2020-10-08 20:31 /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact
-rw-r--r-- 3 root hive 770 2020-10-08 21:06 /user/hive/warehouse/dw.db/sales_order_fact/sales_order_fact_2020-10-07
[hdfs@manager~]$
以上示例说明了如何用Kettle实现Hadoop数据仓库的初始装载和定期装载。需要指出的一点是,就本示例的环境和数据量而言装载执行速度很慢,一次定期装载就需要二十多分钟,比关系数据库慢多了。但考虑到Hadoop本身就只适合大数据量的批处理任务,再加上Hive的性能问题一直就被诟病,也就不必再吐槽了。至此,ETL过程已经实现,下一篇将介绍如何定期自动执行这个过程。
数据清洗是转换过程的一个重要步骤,是对数据进行重新审查和校验的过程,目的在于删除重复信息、纠正存在的错误,并提供数据一致性。Hive是Hadoop生态圈的数据仓库软件,使用类似于SQL的语言读、写、管理分布式存储上的大数据集。使用row_number()窗口函数或者使用一个名为UDFRowSequence的用户自定义函数可以生成代理键。Kettle作业和转换能够实现Hadoop数据仓库的初始装载和定期装载。