目标:从Oracle抽取数据到HIVE中创建ODS层和DWD层数据库,并完成在当天的对应总共100张表的创建与数据载入
1:创建ODS层和DWD层
create database if not exists 【DBname】;
2.创建表格
-- 只需要从下列【关键字】替换后即可获取所需的完整建表语句
create external table if not exists 【Ⅰ. TBLname】
comment 【Ⅱ. TBL_Cmmt】
【Ⅲ. [columns_info_list]】
partitioned by 【Ⅳ. partition_info】
ROW FORMAT 【Ⅴ. delimitered | serde】
STORED AS 【Ⅵ. store_type】
location 【Ⅶ. path】
TBLPROPERTIES 【Ⅷ. optional】
将需要建立的表格名存入在txt中,读取表格获取
-- 如果是整个数据库移植,可以通过查询目标数据库的表名获取
show tables;
表或者列的描述信息可以通过读取Oracle中元数据表格获取,或者TBLPROPERTIES【Ⅷ. optional】
通过读取schema文件补充元数据信息。
由于是创建当天的表格,可以选用日期作为分桶依据
首先需要使用sqoop从Oracle中采集数据到HIVE,完成ODS层的采集,再根据这层数据导入到DWD层。因此,为了避免换行符与分隔符与表格内容的冲突,ODS层可以使用sqoop,Oracle以及HIVE都兼容的AVRO格式存储原始数据
而DWD层需要多次查询的情况下使用ORC格式
表格存储在HDFS上的路径,最后一个文件夹的命名可以不单用表格名,可以加前缀以表示其他信息
3.分区声明
alter table 表名 add if not exists partition 【partition_set】
location 【partition_path_on_HDFS】
#!/usr/bin/env bash
# /bin/bash
# partition名设置
biz_date=20210101
# 条件判断用的日期
biz_fmt_date=2021-01-01
# 目标文件夹
dw_parent_dir=/target_path/……
# 工作目录
workhome=/workhome_path/……
# 需要采集的oracle表名list
oracle_tables=${workhome}/oracle_tables.txt
# 连接Oracle配置:主机IP 端口 SID 用户 用户密码
orcl_srv=hostname
orcl_port=1521
orcl_sid=sid
orcl_user=orcl_user_name
orcl_pwd=****
# 创建日志文件夹
mkdir ${workhome}/log
# sqoop条件命令拼接变量设置:
# 作业的用户类路径优先避免类冲突
sqoop_condition_params="--where \"'${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')\""
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
# sqoop连接Oracle的命令
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"
# 加载hadoop/sqoop环境配置
source /etc/profile
while read p; do
# 删除旧文件,覆盖写入
hdfs fs -rm -r ${dw_parent_dir}/${p}/${biz_date}
# 拼接sqoop语句
${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
# 语句执行时间预留
sleep 30
done < ${oracle_tables}
运行结果
target-dir参数下的,每个……/tablename/20210101下得到对应的.avro表数据文件
outdir参数下的的文件为:
上传schema文件
#!/usr/bin/env bash
# /bin/bash
workhome=/workhome_path/……
hdfs_schema_dir=/store_path/……/avsc
biz_date=20210101
biz_fmt_date=2021-01-01
local_schema_backup_filename=schema_${biz_date}.tar.gz
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
log_file=${workhome}/log/upload_avro_schema_${biz_fmt_date}.log
source /etc/profile
cd ${workhome}
hdfs fs -test -e ${hdfs_schema_dir} > /dev/null
if [ $? != 0 ]; then
hdfs fs -mkdir -p ${hdfs_schema_dir}
fi
hdfs fs -test -e ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc.avsc > /dev/null
if [ $? != 0 ]; then
hdfs fs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
fi
if [ ! -e ${local_schema_backup_filename} ]; then
tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
fi
hdfs fs -test -e ${hdfs_schema_backup_filename} > /dev/null
if [ $? != 0 ]; then
hdfs fs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}
import configparser
import cx_Oracle
from pyhive import hive
config = configparser.ConfigParser()
config.read(r'config_path\……\config.txt')
config.read(r'D:\Py_code\……)
ORACLE_HOST=config.get('OracleConn','oracleHost')
# …… 略
HIVE_HOST=config.get('OracleConn','hiveHost')
# …… 略
dsn = cx_Oracle.makedsn(ORACLE_HOST, ORACLE_PORT, service_name=ORACLE_SID)
# 这里建立了连接Oracle的对象
oracleConn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWORD, dsn)
HiveConn = hive.Connection(host=HIVE_HOST,
port=HIVE_PORT,
username=HIVE_UNAME,
auth='CUSTOM',
password=SPARK_HIVE_PASSWORD)
with t1 as (
select -- 获取列表数据类型
column_name columnName,
data_type dataType,
DATA_SCALE dataScale,
DATA_PRECISION dataPercision,
TABLE_NAME
from all_tab_cols where '【TBLname】' = TABLE_NAME
),
t2 as( -- 获取表格备注
select
comments tableComment,TABLE_NAME
from all_tab_comments WHERE '【TBLname】' = TABLE_NAME
),
t3 as( -- 获取列表备注
select comments columnComment, COLUMN_NAME
from all_col_comments WHERE '【TBLname】'= TABLE_NAME
)
select
columnName, dataType, dataScale, dataPercision, columnComment, tableComment
from t1
left join t2 on t1.TABLE_NAME = t2.TABLE_NAME
left join t3 on t1.columnName = t3.COLUMN_NAME
以其中一张表为例,在Oracle中查询结果的前5行如下
每一行对应其中一列的元数据信息,分别为【列名】【数据类型】【数据长度】【数据精度】【列备注】【表备注】
读取到python的表格中
# 读取sql语句
GetOracleMetaData=open(r"\……\GetOracleMetaData.txt")
oracle_cmd=GetOracleMetaData.read()
GetOracleMetaData.close()
# 读取目标表名
fr = open(r"\……\tablenames.txt")
tableNameList = []
for line in fr.readlines():
curLine = line.rstrip('\n')
tableNameList.append(curLine)
# 将所有表的元数据信息存放在列表中
MetaDataLit=dict()
for tbl in tableNameList:
oracle_cmd.replace('【TBLname】',tbl.upper())
cursor.execute(oracleSql)
resultSet = cursor.fetchall()
MetaDataLit[tbl]=resultSet
因为使用sqoop将Oracle表导入到HIVE时生成了*.avsc ,因此列的元数据信息可以通过tblproperties配置schema文件指定
HQL
# ods层数据库的建表语句create_table.txt
create external table if not exists [DB_TBLname]
comment '[COMMT]'
partitioned by[partition_info]
ROW FORMAT SERDE
'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
location '[TBL_HDFS_path]'
tblproperties('avro.schema.url'='[AVSC_folder_path][AVSC_filename].avsc';
实现
command_file=open(r"D:\Py_code\tmp\create_table.txt")
tble_create_cmd=command_file.read()
command_file.close()
cmd=tble_create_cmd
replace_dict=dict()
for tbl in tableNameList:
replace_dict['[DB_TBLname]']='ods.'tbl
replace_dict['[COMMT]']=MetaDataLit[tbl][0][5]
replace_dict['[partition_info]']='(dt string)'
replace_dict['[TBL_HDFS_path]']='hdfs:///workhome/ods/'+tbl
replace_dict['[AVSC_folder_path]']='hdfs:///workhome/dw/ods/avsc/'
replace_dict['[AVSC_filename]']=tbl.upper()
for k,v in replace_dict.items():
cmd=cmd.replace(k,v)
HIVE_cursor.execute(cmd)
cmd=tble_create_cmd
replace_dict=dict()
HQL
# ods层表分区语句add_partition.txt
alter table [DB_TBLname] add if not exists partition ([partition_set])
location '[TBL_HDFS_path][partition_path]'
实现
command_file=open(r"D:\Py_code\tmp\add_partition.txt")
tble_create_cmd=command_file.read()
command_file.close()
cmd=tble_create_cmd
partition_dict=dict()
for tbl in tableNameList:
partition_dict['[partition_set]']='dt="20210101"'
partition_dict['[partition_path]']='/20210101'
partition_dict['[DB_TBLname]']=tbl
partition_dict['[TBL_HDFS_path]']='hdfs:///workhome/ods/'+tbl
for k,v in partition_dict.items():
cmd=cmd.replace(k,v)
HIVE_cursor.execute(cmd)
cmd=add_partition_cmd
partition_dict=dict()
HQL语句
# dwd层表格创建语句create_dwd_tbl.txt
create table if not exists [TBLname](
[columns_list]
)
comment '[TBLComment]'
partitioned by(dt string)
stored as orc
location '[TBLpath][TBLname]'
实现
[columns_list]中所需的 【列名】 【列的数据格式】【类的备注】
由于【列的数据格式】从Oracle中抽取,需要更改为与HIVE共有或兼容的格式,需要做以下的数据类型转换:
timestamp => long , number => bigint | dicimal ,other => String
将以上内容将通过oracal2Hive函数处理后以字典的格式返回
oracal2Hive
# columnName 列名
# dataType 列的数据类型
# dataScale 类列的长度
# dataScope 小数类列的精度
# columnComment 列的备注
def oracal2Hive(columnName,dataType,dataScale,dataScope,columnComment):
col_dict=dict()
col_dict['columnName']=columnName
col_dict['columnComment']=columnComment
if dataType.lower()=='timestamp':
col_dict['columnType']='long'
# 如果Oracle中为数值类型
elif dataType.lower()=='number':
# 如果长度为None或者长度小于1
if dataScale is None or dataScale < 1:
# 整数类型,返回bigint
col_dict['columnType']= 'bigint'
# 为数值,但是有小数点
else:
# 返回dicimal类型
col_dict['columnType']= f'decimal({dataScope}, {dataScale})'
# 其他类型全部返回String类型
else:
col_dict['columnType'] = 'string'
return col_dict
具体转换
create_dwd_tbl=open(r"\……\create_dwd_tbl.txt")
dwd_tbl_create_cmd=create_dwd_tbl.read()
create_dwd_tbl.close()
col_info_list=[]
replace_dict=dict()
for tbl in tableNameList:
TBLComment=MetaDataLit[tbl][0][5]
if TBLComment is None:
TBLComment=''
for col_info in MetaDataLit[tbl]:
columnName= col_info[0] # 获取表元素据-列的名称
dataType = col_info[1] # 获取表元素据-列的类型
dataScale = col_info[2] # 获取表元素据-列值长度
dataScope = col_info[3] # 获取表元素据-列值精度
columnComment = col_info[4] # 获获取表元素据-列的注释
if dataScale is None:
dataScale=0
if dataScope is None:
dataScope=0
# Oracle->hive类型转换
col_info_list.append(oracal2Hive(columnName,dataType,dataScale,dataScope,columnComment))
cols_cmd=[]
# 每一列按照建表语法格式存储在列表中
for i in col_info_list:
cols_cmd.append(f"\t`{i.get('columnName')}` {i.get('columnType')} comment'{i.get('columnComment')}',")
# 拼接列表,并去除最后的逗号,
columns_list='\n'.join(cols_cmd)[:-1]
cmd=dwd_tbl_create_cmd
replace_dict['[TBLname]']='dwd.'+tbl
replace_dict['[columns_list]']=columns_list
replace_dict['[partition_info]']='(dt string)'
replace_dict['[TBLComment]']=TBLComment
replace_dict['[TBLpath]']='/workhome/dwd/'+tbl'
for k,v in replace_dict.items():
cmd=cmd.replace(k,v)
HIVE_cursor.execute(cmd)
cmd=dwd_tbl_create_cmd
replace_dict=dict()
col_info_list=[]
HQL语句
# dwd数据插入语句insert_dwd_tbl.txt
insert overwrite table [Target_TBL] partition([partition_set])
select
[select_cols]
from [TBLname]
where [where_set]
实现
insert_dwd_tbl=open(r"\……\insert_dwd_tbl.txt")
dwd_tbl_insert_cmd=insert_dwd_tbl.read()
insert_dwd_tbl.close()
select_list=[]
insert_dict=dict()
for tbl in tableNameList:
for col_info in MetaDataLit[tbl]:
select_list.append(f'\t`{col_info[0]},`')
select_list='\n'.join(select_list)[:-1]
cmd=dwd_tbl_insert_cmd
insert_dict['[Target_TBL]']='dwd.'+tbl
insert_dict['[TBLname]']='ods.'+tbl
insert_dict['[partition_set]']='dt="20210101"'
insert_dict['[select_cols]']=select_list
insert_dict['[where_set]']='dt="20210101"'
for k,v in insert_dict.items():
cmd=cmd.replace(k,v)
HIVE_cursor.execute(cmd)
select_list=[]
cmd=dwd_tbl_insert_cmd
.
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。