上一篇说了实时数仓并写了一个简单的例子,这些主要来说离线数仓,数据到达kafka后,走了实时和离线两条路,离线条路线的主要流程是采集kafka的数据HDFS中,然后使用Hive进行数仓的建设,因为我们数据来源可能是第三方API,IOT还有其他一些渠道,还有直接从数据库同步过来,那么数据库的数据我们离线这边可能直接使用DataX这种工具同步到HDFS了,就不经过Kafka了,而其他的数据才经过kafka,然后再使用采集程序将数据采集到HDFS。
数据在kafka中,我们就需要将数据采集到HDFS中,不同的数据形式有不同的采集方案,比如可以采用flume作为采集程序,也可能需要我们在代码里面及进行处理后再将数据保存到HDFS,为了保证原始数据的完整性,在以后的数据流转中具有可溯性,我们直接将数据原封不动的保存到HDFS,一般为json格式,然后通过相应的符号进行数据分割,比如一条数据就为一行,方便后面进行解析。
如下使用Java程序的HDFS API对文件进行操作,因为数据是以天为单位来存,今天的数据明天再进行计算,所以就以日期为单位来进行存储,因为数据是从kafka源源不断地过来,所以HDFS采用append方式往文件中追加内容。
数仓建模分为关系建模和维度建模,关系建模就是按照三范式规范来进行表的设计,我们知道3NF通过实体和关系进行连接,降低了数据的冗余,但是很多时候我们为了效率,不会完全遵守3NF,而维度模型以数据分析作为出发点,不遵循3NF,数据存在冗余。维度模型是直接面向业务的,将业务用事实表和维度表呈现出来。表结构简单,所以查询效率高,查询简单,因为如果完全遵循3NF,就会存在大量的连接查询,比较复杂,效率也不高。
数仓设计是分层设计,分为了ODS层,DWD层,DWS层,ADS层等,不过对于层次的划分,我觉得完全是根据业务需求去划分,不可能去照搬,可能所有层次都需要用到,也可能只用到其中几层,甚至可能数据到了ODS层后,直接使用它,这完全都是需要根据实际业务来出发的,如果为了所谓的“规范”而去把所有东西都搞上,那么其实是不合理的,其实不同层次之间,无非就是写写Hive SQL,将数据从一个层面转换到另外一个层面。
比如ODS层我们保存的是原始数据,那么建立DWD层就是将ODS层的数据作一次清洗和转换,比如去除空值数据,去除脏数据等,后面的层次再按照相应的需求进行建模。
hive表分为内部表和外部表,需要根据数据的性质来选择使用哪一种表,
管理表也称内部表,我们创建的表默认就为管理表,我们创建管理表后,默认会在hive-site.xml配置文件hive.metastore.warehouse.dir
配置的HDFS目录下创建目录,如/hive/warehouse
是配置的目录,当我们创建了一个表user
,那么就表的位置就为/hive/warehouse/user
,不过管理表的生命周期会受到hive的影响,当我们删除表的时候,mysql中的元数据会被删除,表在HDFS中对应的文件目录也会被删除。
外部表与内部表的区别就是外部表被删除后,只会删除元数据,并不会删除HDFS中的文件。
❝create external table user(name string);
因为内部表删除元数据后HDFS中对应的数据也会被删除,所以为了安全起见,我们创建表的时候创建为内部表。
前端埋点,或者物联网的一些数据传输过来,一般会封装为一个json字符串,我们解析这个json,然后对应到相应的表里面。
下面进行简单地数据建模。
原始json数据如下,这就是原始数据,是一个患者的简单信息,包括姓名,症状,医院这些信息,我们直接存储在HDFS。
{"name":"steakliu","id_card":"2897732442","age":97,"gender":"男","address":"成都 ","symptom":"症状","hospital_id":"1","doctor_id":1763}
建立一个hive表来存原始数据,不作任何处理,就一个json字符串,为了保证数据的可溯源特性,以及后续如果数据出现问题需要重新计算数据,我们不对原始数据作任何改动,原封不动地保存,按照每天的日期进行分区存储。
create table hospital_patient_resource_log(message_json_str string)
partitioned by (day string)
row format delimited fields terminated by '\n'
上面的原始json数据比较简单,真实的业务场景json会比较大,一个json字符串包含很多内容,可以对应很多表,我们这里只简单地将这个json字符串中地字段解析出来成为一个表结构进行存储,采用医院ID和每天的日期作为分区条件,数据存储格式为SNAPPY,压缩格式为orc。
create table ods_hospital_patient_info
(
name string,
id_card string,
age string,
gender string,
address string,
symptom string,
hospital_id string,
doctor_id string
) partitioned by (hospital string, dt string)
row format delimited fields terminated by '\t'
stored as orc
tblproperties("orc.compress"="SNAPPY");
使用get_json_object函数取出json字符串中的每一个字段,然后插入主题表中。
insert into table 只是插入数据 insert overwrite table 会覆盖数据
比如现在表里面有1000条数据,如果使用overwrite插入一条数据,那么所有的数据都会被覆盖,此刻表里面就只有一条数据了。
insert into table ods_hospital_patient_info
select get_json_object(json_str, '$.name') as name,
get_json_object(json_str, '$.id_card') as id_card,
get_json_object(json_str, '$.age') as age,
get_json_object(json_str, '$.gender') as gender,
get_json_object(json_str, '$.address') as address,
get_json_object(json_str, '$.symptom') as symptom,
get_json_object(json_str, '$.hospital_id') as hospital_id,
get_json_object(json_str, '$.doctor_id') as doctor_id,
get_json_object(json_str, '$.hospital_id') as hospital,
'2022-11-30' as dt
from ods_hospital_patient_resource_log where dt = '2022-11-30'
上面就将原始json数据转换成ODS层的数据表,这一层没有作任何的数据清洗,只是将数据进行转换,以提供给下一层使用。
❝后面的每一层我们根据需求进行建模,然后写Hive SQL将数据导过去。
整个过程分为很多个步骤,从Mysql同步到HDFS,HDFS同步到Hive,Hive各个数据层之间的转换,这些步骤之间都是相互依赖的,当前节点依赖于先上一个节点的数据,所以必须得等到上一节点完成后,下一节点才能执行,于是使用海豚调度(DS)进行任务编排。
上面的各个任务形成一个DAG,然后选择对应的调度策略进行任务的调度,一般任务都是到了第二天凌晨执行。
❝上述就完成了一个简单的示例,不过在实际的开发中还是比较复杂的,建的表比较多,类型也多,有一些场景可能还需要我们自己编写UDF函数,任务调度也比较复杂,
❝今天的分享就到这里,感谢你的观看,下期见!