摘要:本文介绍了韩非老师带来的 Dinky 实践系列之 Flink Catalog 元数据管理的分享。内容包括:
Tips:历史传送门~
《Dinky FlinkCDC 整库入仓 StarRocks》
《打造 Flink + StarRocks+ Dinky 的极速统一分析平台》
GitHub 地址
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~
一、前言
Flink Catalog 持久化是 Dinky 实践系列的第二篇,通过阅读本文,您将会熟悉 Dinky MySQL Catalog 持久化的用法。这个系列中,我们以 MySQL 做为 Source 端,StarRocks 做为 Sink 端做为演示。
在 Dinky 0.6.5 之前,在编写 FlinkSQL 作业时,FlinkSQL 的 DDL 语句可以采用 FlinkSQLEnv 环境引入。但这种方式对大量表结构进行初始化管理时存在局限性,为提供统一的 Flink 元数据管理能力,Dinky 在 0.6.6 实现了 Flink MySQL Catalog 功能,此功能与 Hive Catalog 相似,相比之前大大降低了表结构的维护成本。
二、环境要求
软件 | 版本 |
---|---|
CDH | 6.2.0 |
Hadoop | 3.0.0-cdh6.2.0 |
Flink | 1.13.6 |
Flink CDC | 2.2.1 |
StarRocks | 2.2.0 |
Dinky | 0.6.6 |
MYSQL | 5.7 |
三、所需依赖
Mysql Catalog 持久化需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:
# hadoop依赖
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Flink Starrrocks依赖
flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar
# Dinky hadoop依赖
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# Dinky mysql catalog依赖
dlink-catalog-mysql-1.13-0.6.6.jar
# flink cdc依赖包
flink-sql-connector-mysql-cdc-2.2.1.jar
# mysql 驱动依赖
mysql-connector-java-8.0.21.jar
说明
1.hadoop 依赖包放置 $FLINK_HOME/lib下
2.Flink StarRocks 依赖包放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下
3.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)
4.Dinky MySQL Catalog 依赖放置 $FLINK_HOME/lib 下
5.Flink cdc 依赖包放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下
6.MySQL 驱动依赖放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下
四、脚本准备
如下 sql 脚本采用 Flink CDC 官网。
# mysql建表语句(同步到Starocks)
CREATE TABLE bigdata.products (
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
(default,"car battery","12V car battery"),
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
(default,"hammer","12oz carpenter's hammer"),
(default,"hammer","14oz carpenter's hammer"),
(default,"hammer","16oz carpenter's hammer"),
(default,"rocks","box of assorted rocks"),
(default,"jacket","water resistent black wind breaker"),
(default,"spare tire","24 inch spare tire");
CREATE TABLE bigdata.orders (
order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
order_date DATETIME NOT NULL,
customer_name VARCHAR(255) NOT NULL,
price DECIMAL(10, 5) NOT NULL,
product_id INTEGER NOT NULL,
order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;
INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);
MySQL Catalog 持久化目前默认的 catalog 为 my_catalog,默认的 FlinkSQLEnv 为 DefaultCatalog。目前存储 MySQL Catalog 元数据的表结构如下:
元数据表 | 表含义 |
---|---|
metadata_database | 元数据 schema 信息 |
metadata_table | 元数据table信息 |
metadata_database_property | schema 属性信息 |
metadata_table_property | table 属性信息 |
metadata_column | 数据列信息 |
metadata_function | UDF 信息 |
五、功能实践
DROP TABLE IF EXISTS ods_orders_src;
CREATE TABLE IF NOT EXISTS ods_orders_src (
`order_id` int COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,2) COMMENT ''
, `product_id` int COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'mysql-cdc'
,'hostname' = '192.168.0.40'
,'port' = '4406'
,'username' = 'root'
,'password' = 'Percona@020*'
,'server-time-zone' = 'Asia/Shanghai'
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode'='initial'
,'scan.incremental.snapshot.chunk.size' = '20000'
,'heartbeat.interval' = '120s'
,'database-name' = 'bigdata'
,'table-name' = 'orders'
);
DROP TABLE IF EXISTS ods_orders_sink;
CREATE TABLE IF NOT EXISTS ods_orders_sink (
`order_id` int COMMENT ''
, `order_date` timestamp(3) COMMENT ''
, `customer_name` string COMMENT ''
, `price` decimal(12,2) COMMENT ''
, `product_id` int COMMENT ''
, `order_status` tinyint COMMENT ''
,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
'connector' = 'starrocks',
'database-name' = 'qhc_ods',
'table-name' = 'ods_orders',
'password' = '123456',
'load-url' = '192.168.0.5:18035',
'username' = 'devuser',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.max-retries' = '10',
'sink.buffer-flush.interval-ms' = '15000',
'sink.parallelism' = '1'
);
DROP TABLE IF EXISTS ods_products_src;
CREATE TABLE IF NOT EXISTS ods_products_src (
`id` int COMMENT ''
, `name` string COMMENT ''
, `description` string COMMENT ''
,PRIMARY KEY(id ) NOT ENFORCED
) COMMENT ''
WITH (
'connector' = 'mysql-cdc'
,'hostname' = '192.168.0.4'
,'port' = '3306'
,'username' = 'root'
,'password' = '123456'
,'server-time-zone' = 'Asia/Shanghai'
,'scan.incremental.snapshot.enabled' = 'true'
,'scan.startup.mode'='initial'
,'scan.incremental.snapshot.chunk.size' = '20000'
,'heartbeat.interval' = '120s'
,'database-name' = 'bigdata'
,'table-name' = 'products'
);
DROP TABLE IF EXISTS ods_products_sink;
CREATE TABLE IF NOT EXISTS ods_products_sink (
`id` int COMMENT ''
, `name` string COMMENT ''
, `description` string COMMENT ''
,PRIMARY KEY(id ) NOT ENFORCED
) COMMENT ''
WITH (
'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
'connector' = 'starrocks',
'database-name' = 'qhc_ods',
'table-name' = 'ods_products',
'password' = '123456',
'load-url' = '192.168.0.5:18035',
'username' = 'devuser',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.max-retries' = '10',
'sink.buffer-flush.interval-ms' = '15000',
'sink.parallelism' = '1'
);
创建一个 ddl_init 作业,通过 yarn session模式提交,FlinkSQLEnv采用DefaultCatalog,作业如下:
执行作业后,在 dinky 元数据库查询是否表已经存在。
每执行一次初始化DDL,将会覆盖之前的元数据。
说明: 对于所创建的表元信息不能清空,否则会报错如下:
新建一个作业 ods_order_src。
select * from ods_orders_src;
由此可以看到,对于所创建的表其实已经存在与 DefaultCatalog,即保存与 Mysql 元数据中。此时,可以通过创建任意作业去使用DefaultCatalog 中的表。
首先清空 StarRocks 数据。
还是在 ods_order_src 作业中,使用 insert 语句。将数据写入 StarRocks 中。
INSERT INTO ods_orders_sink
select * from ods_orders_src;
对于 MySQL Catalog 除上面用默认的 DefaultCatalog,那么也可以通过 create 创建 catalog,然后在对应数据库下执行 dlinkmysqlcatalog.sql。语法如下:
create catalog my_catalog with(
'type' = 'dlink_mysql',
'username' = 'dlink',
'password' = 'dlink',
'url' = 'jdbc:mysql://192.168.0.4:3306/dlink2?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true'
);
use catalog my_catalog;
六、总结
随着 Dinky 的不断扩大以及在业界的影响力。为方便大家的学习和使用,此次系列文章作为 Dinky 系列文章的第二篇,后期系列文章尽请期待。