前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >湖仓一体电商项目(二十二):实时任务执行流程

湖仓一体电商项目(二十二):实时任务执行流程

原创
作者头像
Lansonli
发布2022-10-25 00:13:29
3510
发布2022-10-25 00:13:29
举报
文章被收录于专栏:Lansonli技术博客

​实时任务执行流程

目前暂时将项目在本地执行,执行顺序如下:

一、准备环境

这里默认HDFS、Hive、HBase、Kafka环境已经准备,启动maxwell组件监控mysql业务库数据:

代码语言:javascript
复制
#在Kafka中创建好对应的kafka topic(已创建的topic,可忽略,避免重复创建)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-USER-LOG-DATA --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3


./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-BROWSE-LOG-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-BROWSE-LOG-WIDE-TOPIC --partitions 3 --replication-factor 3

#启动maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin
[root@node3 bin]#  maxwell --config ../config.properties

#在Hive中创建好需要的Iceberg各层的表
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

代码语言:javascript
复制
CREATE TABLE ODS_PRODUCT_CATEGORY (
id string,
p_id string,
name string,
pic_url string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_CATEGORY/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_PRODUCT_INFO (
product_id string,
category_id string,
product_name string,
gmt_create string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_PRODUCT_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWD_BROWSELOG  (
 log_time string,
 user_id string,
 user_ip string,
 front_product_url string,
 browse_product_url string,
 browse_product_tpcode string,
 browse_product_code string,
 obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_BROWSELOG/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWS_BROWSE_INFO (
log_time string,
user_id string,
user_ip string,
product_name string,
front_product_url string,
browse_product_url string,
first_category_name string,
second_category_name string,
obtain_points string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWS_BROWSE_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

代码语言:javascript
复制
#启动Clickhouse
[root@node1 ~]# service clickhouse-server start

代码语言:javascript
复制
#在Clickhouse中创建好对应表
create table dm_product_visit_info(
 current_dt String,
 window_start String,
 window_end String,
 first_cat String,
 second_cat String,
 product String,
 product_cnt UInt32
) engine = MergeTree() order by current_dt;

二、启动Flink代码

依次启动如下Flink代码:”ProduceKafkaDBDataToODS.scala”、“ProduceKafkaLogDataToODS.scala”、“DimDataToHBase.scala”、“ProduceKafkaODSDataToDWD.scala”、“ProduceBrowseLogToDWS.scala”、“ProcessBrowseLogInfoToDM.scala”代码。各个代码中Kafka Connector属性“scan.startup.mode”设置为“latest-offset”,从最新位置消费数据。

注意:代码执行时可以设置使用内存参数:-Xmx500m -Xms500m

三、启动数据采集接口代码

启动项目“LakeHouseDataPublish”发布数据。

四、启动模拟数据代码

启动项目“LakeHouseMockData”中模拟向数据库中生产数据代码“RTMockDBData.java”,此代码中只需要向MySQL生产用户登录数据即可。

启动项目“LakeHouseMockData”中向日志采集接口生产日志的代码“RTMockUserLogData.java”。

这里如果想和第一个业务一起运行还需要将第一个业务“ProduceUserLogInToDWS.scala”、“ProcessUserLoginInfoToDM.scala”两个代码。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ​实时任务执行流程
    • 一、准备环境
      • 二、启动Flink代码
        • 三、启动数据采集接口代码
          • 四、启动模拟数据代码
          相关产品与服务
          数据湖计算 DLC
          数据湖计算DLC(Data Lake Compute,DLC)提供了敏捷高效的数据湖分析与计算服务。服务采用无服务器架构(Serverless),开箱即用。使用标准SQL语法即可完成数据处理、多源数据联合计算等数据工作,有效降低用户数据分析服务搭建成本及使用成本,提高企业数据敏捷度。
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档