首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >数据库自动化采集与数据库层别的建立

数据库自动化采集与数据库层别的建立

原创
作者头像
用户11196524
修改2024-11-06 10:59:53
修改2024-11-06 10:59:53
28500
代码可运行
举报
运行总次数:0
代码可运行

目标:从Oracle抽取数据到HIVE中创建ODS层和DWD层数据库,并完成在当天的对应总共100张表的创建与数据载入

HQL语句预设

1:创建ODS层和DWD层

代码语言:sql
复制
create database if not exists 【DBname】;

2.创建表格

代码语言:sql
复制
-- 只需要从下列【关键字】替换后即可获取所需的完整建表语句
create external table if not exists 【Ⅰ. TBLname】
comment 【Ⅱ. TBL_Cmmt】
【Ⅲ. [columns_info_list]】
partitioned by 【Ⅳ. partition_info】
ROW FORMAT 【Ⅴ. delimitered | serde】
STORED AS 【Ⅵ. store_type】
location 【Ⅶ. path】
TBLPROPERTIES 【Ⅷ. optional】
  • 【Ⅰ. TBLname】表名

将需要建立的表格名存入在txt中,读取表格获取

代码语言:sql
复制
-- 如果是整个数据库移植,可以通过查询目标数据库的表名获取
show tables;
  • 【Ⅱ. TBL_Cmmt】/ 【Ⅷ. optional】表备注
  • 【Ⅲ. [columns_info_list]】 / 【Ⅷ. optional】列的元数据描述部分:列名,数据类型,列备注

表或者列的描述信息可以通过读取Oracle中元数据表格获取,或者TBLPROPERTIES【Ⅷ. optional】

通过读取schema文件补充元数据信息。

  • 【Ⅳ. partition_info】

由于是创建当天的表格,可以选用日期作为分桶依据

  • 【Ⅴ. delimitered | serde】/ 【Ⅵ. store_type】

首先需要使用sqoop从Oracle中采集数据到HIVE,完成ODS层的采集,再根据这层数据导入到DWD层。因此,为了避免换行符与分隔符与表格内容的冲突,ODS层可以使用sqoop,Oracle以及HIVE都兼容的AVRO格式存储原始数据

而DWD层需要多次查询的情况下使用ORC格式

  • 【Ⅶ. path】

表格存储在HDFS上的路径,最后一个文件夹的命名可以不单用表格名,可以加前缀以表示其他信息

3.分区声明

代码语言:sql
复制
alter table 表名 add if not exists partition 【partition_set】
location 【partition_path_on_HDFS】

数据采集部分

代码语言:shell
复制
#!/usr/bin/env bash
# /bin/bash

# partition名设置
biz_date=20210101
# 条件判断用的日期
biz_fmt_date=2021-01-01
# 目标文件夹
dw_parent_dir=/target_path/……
# 工作目录
workhome=/workhome_path/……
# 需要采集的oracle表名list
oracle_tables=${workhome}/oracle_tables.txt
# 连接Oracle配置:主机IP 端口 SID 用户 用户密码
orcl_srv=hostname
orcl_port=1521
orcl_sid=sid
orcl_user=orcl_user_name
orcl_pwd=****

# 创建日志文件夹
mkdir ${workhome}/log

# sqoop条件命令拼接变量设置:
# 作业的用户类路径优先避免类冲突
sqoop_condition_params="--where \"'${biz_fmt_date}'=to_char(CREATE_TIME,'yyyy-mm-dd')\""
sqoop_import_params="sqoop import -Dmapreduce.job.user.classpath.first=true --outdir ${workhome}/java_code --as-avrodatafile"
# sqoop连接Oracle的命令
sqoop_jdbc_params="--connect jdbc:oracle:thin:@${orcl_srv}:${orcl_port}:${orcl_sid} --username ${orcl_user} --password ${orcl_pwd}"

# 加载hadoop/sqoop环境配置
source /etc/profile

while read p; do
    # 删除旧文件,覆盖写入
    hdfs fs -rm -r ${dw_parent_dir}/${p}/${biz_date}
    # 拼接sqoop语句
    ${sqoop_import_params} ${sqoop_jdbc_params} --target-dir ${dw_parent_dir}/${p}/${biz_date} --table ${p^^} ${sqoop_condition_params} -m 1 &
    # 语句执行时间预留
    sleep 30
    
done < ${oracle_tables}

运行结果

target-dir参数下的,每个……/tablename/20210101下得到对应的.avro表数据文件

outdir参数下的的文件为:

  • 每个表格生成一个.java文件记录导入和导出数据操作的Java代码
  • 一个记录表格schema的.avsc文件

上传schema文件

代码语言:shell
复制
#!/usr/bin/env bash
# /bin/bash

workhome=/workhome_path/……
hdfs_schema_dir=/store_path/……/avsc
biz_date=20210101
biz_fmt_date=2021-01-01
local_schema_backup_filename=schema_${biz_date}.tar.gz
hdfs_schema_backup_filename=${hdfs_schema_dir}/avro_schema_${biz_date}.tar.gz
log_file=${workhome}/log/upload_avro_schema_${biz_fmt_date}.log

source /etc/profile
cd ${workhome}

hdfs fs -test -e ${hdfs_schema_dir} > /dev/null

if [ $? != 0 ]; then
    hdfs fs -mkdir -p ${hdfs_schema_dir}
fi

hdfs fs -test -e ${hdfs_schema_dir}/CISS4_CISS_BASE_AREAS.avsc.avsc > /dev/null
if [ $? != 0 ]; then
    hdfs fs -put ${workhome}/java_code/*.avsc ${hdfs_schema_dir}
fi

if [ ! -e ${local_schema_backup_filename} ]; then
    tar -czf ${local_schema_backup_filename} ./java_code/*.avsc
fi

hdfs fs -test -e ${hdfs_schema_backup_filename} > /dev/null
if [ $? != 0 ]; then
    hdfs fs -put ${local_schema_backup_filename} ${hdfs_schema_backup_filename}

代码实现部分

建立连接

代码语言:javascript
代码运行次数:0
运行
复制
import configparser
import cx_Oracle
from pyhive import hive
​
config = configparser.ConfigParser()
config.read(r'config_path\……\config.txt')
config.read(r'D:\Py_code\……)
ORACLE_HOST=config.get('OracleConn','oracleHost')
# …… 略 
HIVE_HOST=config.get('OracleConn','hiveHost')
# …… 略
 
dsn = cx_Oracle.makedsn(ORACLE_HOST, ORACLE_PORT, service_name=ORACLE_SID)
# 这里建立了连接Oracle的对象
oracleConn = cx_Oracle.connect(ORACLE_USER, ORACLE_PASSWORD, dsn)
​
 
HiveConn = hive.Connection(host=HIVE_HOST, 
                                port=HIVE_PORT, 
                                username=HIVE_UNAME, 
                                auth='CUSTOM',
                                password=SPARK_HIVE_PASSWORD)

获取元数据信息

代码语言:sql
复制
with t1 as (
    select  -- 获取列表数据类型
           column_name columnName, 
           data_type dataType,
           DATA_SCALE dataScale,
           DATA_PRECISION dataPercision,
           TABLE_NAME
    from all_tab_cols where '【TBLname】' = TABLE_NAME
),
t2 as(  -- 获取表格备注
        select
               comments tableComment,TABLE_NAME
        from all_tab_comments WHERE '【TBLname】' = TABLE_NAME
    ),
t3 as(  -- 获取列表备注
        select comments columnComment, COLUMN_NAME
        from all_col_comments WHERE '【TBLname】'= TABLE_NAME
    )
select
columnName, dataType, dataScale, dataPercision, columnComment, tableComment
from t1
    left join t2 on t1.TABLE_NAME = t2.TABLE_NAME
    left join t3 on t1.columnName = t3.COLUMN_NAME

以其中一张表为例,在Oracle中查询结果的前5行如下

每一行对应其中一列的元数据信息,分别为【列名】【数据类型】【数据长度】【数据精度】【列备注】【表备注】

读取到python的表格中

代码语言:sql
复制
# 读取sql语句
GetOracleMetaData=open(r"\……\GetOracleMetaData.txt")
oracle_cmd=GetOracleMetaData.read()
GetOracleMetaData.close()
# 读取目标表名            
fr = open(r"\……\tablenames.txt")
tableNameList = []
for line in fr.readlines():
    curLine = line.rstrip('\n')
    tableNameList.append(curLine)
    
# 将所有表的元数据信息存放在列表中
MetaDataLit=dict()
for tbl in tableNameList:
    oracle_cmd.replace('【TBLname】',tbl.upper())
    cursor.execute(oracleSql)
    resultSet = cursor.fetchall()
    MetaDataLit[tbl]=resultSet

ODS层表格创建

因为使用sqoop将Oracle表导入到HIVE时生成了*.avsc ,因此列的元数据信息可以通过tblproperties配置schema文件指定

HQL

代码语言:sql
复制
# ods层数据库的建表语句create_table.txt
create external table if not exists [DB_TBLname]
comment '[COMMT]'
partitioned by[partition_info]
ROW FORMAT SERDE 
	'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS 
	INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
	OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
location '[TBL_HDFS_path]'
tblproperties('avro.schema.url'='[AVSC_folder_path][AVSC_filename].avsc';

实现

代码语言:python
代码运行次数:0
运行
复制
command_file=open(r"D:\Py_code\tmp\create_table.txt")
tble_create_cmd=command_file.read()
command_file.close()
cmd=tble_create_cmd
replace_dict=dict()

for tbl in tableNameList:
    replace_dict['[DB_TBLname]']='ods.'tbl
    replace_dict['[COMMT]']=MetaDataLit[tbl][0][5]
    replace_dict['[partition_info]']='(dt string)'
    replace_dict['[TBL_HDFS_path]']='hdfs:///workhome/ods/'+tbl
    replace_dict['[AVSC_folder_path]']='hdfs:///workhome/dw/ods/avsc/'
    replace_dict['[AVSC_filename]']=tbl.upper()
    for k,v in replace_dict.items():
        cmd=cmd.replace(k,v)
    HIVE_cursor.execute(cmd)
    cmd=tble_create_cmd
    replace_dict=dict()

修改分区信息

HQL

代码语言:sql
复制
# ods层表分区语句add_partition.txt
alter table [DB_TBLname] add if not exists partition ([partition_set])
location '[TBL_HDFS_path][partition_path]'

实现

代码语言:python
代码运行次数:0
运行
复制
command_file=open(r"D:\Py_code\tmp\add_partition.txt")
tble_create_cmd=command_file.read()
command_file.close()
cmd=tble_create_cmd
partition_dict=dict()
for tbl in tableNameList:
    partition_dict['[partition_set]']='dt="20210101"'
    partition_dict['[partition_path]']='/20210101'
    partition_dict['[DB_TBLname]']=tbl
    partition_dict['[TBL_HDFS_path]']='hdfs:///workhome/ods/'+tbl
    for k,v in partition_dict.items():
        cmd=cmd.replace(k,v)
    HIVE_cursor.execute(cmd)
    cmd=add_partition_cmd
    partition_dict=dict()

DWD层表格创建

HQL语句

代码语言:sql
复制
# dwd层表格创建语句create_dwd_tbl.txt
create table if not exists [TBLname](
[columns_list]
) 
comment '[TBLComment]'
partitioned by(dt string)
stored as orc
location '[TBLpath][TBLname]'

实现

[columns_list]中所需的 【列名】 【列的数据格式】【类的备注】

由于【列的数据格式】从Oracle中抽取,需要更改为与HIVE共有或兼容的格式,需要做以下的数据类型转换:

timestamp => long , number => bigint | dicimal ,other => String

将以上内容将通过oracal2Hive函数处理后以字典的格式返回

oracal2Hive

代码语言:python
代码运行次数:0
运行
复制
# columnName 列名
# dataType   列的数据类型
# dataScale  类列的长度
# dataScope  小数类列的精度
# columnComment 列的备注
    
def oracal2Hive(columnName,dataType,dataScale,dataScope,columnComment):
    col_dict=dict()
    col_dict['columnName']=columnName
    col_dict['columnComment']=columnComment
    
    if dataType.lower()=='timestamp':
        col_dict['columnType']='long'
    # 如果Oracle中为数值类型
    elif dataType.lower()=='number':
        # 如果长度为None或者长度小于1
        if dataScale is None or dataScale < 1:
            # 整数类型,返回bigint
            col_dict['columnType']= 'bigint'
        # 为数值,但是有小数点
        else:
            # 返回dicimal类型
            col_dict['columnType']= f'decimal({dataScope}, {dataScale})'
    # 其他类型全部返回String类型
    else:
        col_dict['columnType'] = 'string'

    return col_dict

具体转换

代码语言:python
代码运行次数:0
运行
复制
create_dwd_tbl=open(r"\……\create_dwd_tbl.txt")
dwd_tbl_create_cmd=create_dwd_tbl.read()
create_dwd_tbl.close()
col_info_list=[]
replace_dict=dict()
for tbl in tableNameList:
    TBLComment=MetaDataLit[tbl][0][5]
    if TBLComment is None:
        TBLComment=''
    for col_info in MetaDataLit[tbl]:
        columnName= col_info[0]        # 获取表元素据-列的名称
        dataType = col_info[1]          # 获取表元素据-列的类型
        dataScale = col_info[2]         # 获取表元素据-列值长度
        dataScope = col_info[3]         # 获取表元素据-列值精度
        columnComment = col_info[4]     # 获获取表元素据-列的注释
        if dataScale is None:
            dataScale=0
        if dataScope is None:
            dataScope=0
        # Oracle->hive类型转换
        col_info_list.append(oracal2Hive(columnName,dataType,dataScale,dataScope,columnComment))
    cols_cmd=[]
    # 每一列按照建表语法格式存储在列表中
    for i in col_info_list:
        cols_cmd.append(f"\t`{i.get('columnName')}` {i.get('columnType')} comment'{i.get('columnComment')}',")
    # 拼接列表,并去除最后的逗号,
    columns_list='\n'.join(cols_cmd)[:-1] 
    
    cmd=dwd_tbl_create_cmd
    replace_dict['[TBLname]']='dwd.'+tbl
    replace_dict['[columns_list]']=columns_list
    replace_dict['[partition_info]']='(dt string)'
    replace_dict['[TBLComment]']=TBLComment
    replace_dict['[TBLpath]']='/workhome/dwd/'+tbl'
    for k,v in replace_dict.items():
        cmd=cmd.replace(k,v)
        
    HIVE_cursor.execute(cmd)
    cmd=dwd_tbl_create_cmd
    replace_dict=dict()
    col_info_list=[]

DWD层数据导入

HQL语句

代码语言:sql
复制
# dwd数据插入语句insert_dwd_tbl.txt
insert overwrite table [Target_TBL] partition([partition_set])
select
[select_cols]
from [TBLname]
where [where_set]

实现

代码语言:python
代码运行次数:0
运行
复制
insert_dwd_tbl=open(r"\……\insert_dwd_tbl.txt")
dwd_tbl_insert_cmd=insert_dwd_tbl.read()
insert_dwd_tbl.close()
select_list=[]
insert_dict=dict()
for tbl in tableNameList:
    for col_info in MetaDataLit[tbl]:
        select_list.append(f'\t`{col_info[0]},`')
        
    select_list='\n'.join(select_list)[:-1]
    cmd=dwd_tbl_insert_cmd
    
    insert_dict['[Target_TBL]']='dwd.'+tbl
    insert_dict['[TBLname]']='ods.'+tbl
    insert_dict['[partition_set]']='dt="20210101"'
    insert_dict['[select_cols]']=select_list
    insert_dict['[where_set]']='dt="20210101"'
    
    for k,v in insert_dict.items():
        cmd=cmd.replace(k,v)
        
    HIVE_cursor.execute(cmd)
    select_list=[]
    cmd=dwd_tbl_insert_cmd

.

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • HQL语句预设
  • 数据采集部分
  • 代码实现部分
    • 建立连接
    • 获取元数据信息
    • ODS层表格创建
    • 修改分区信息
    • DWD层表格创建
    • DWD层数据导入
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档