前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Dinky 实践系列之 Flink Catalog 元数据管理

Dinky 实践系列之 Flink Catalog 元数据管理

作者头像
文末丶
发布2023-02-26 14:18:34
2.3K0
发布2023-02-26 14:18:34
举报
文章被收录于专栏:DataLink数据中台

摘要:本文介绍了韩非老师带来的 Dinky 实践系列之 Flink Catalog 元数据管理的分享。内容包括:

  1. 前言
  2. 环境要求
  3. 所需依赖
  4. 脚本准备
  5. 功能实践
  6. 总结

Tips:历史传送门~

Dinky实践系列之FlinkCDC整库实时入仓入湖

Dinky FlinkCDC 整库入仓 StarRocks

打造 Flink + StarRocks+ Dinky 的极速统一分析平台

Dinky 构建 Flink CDC 整库入仓入湖

GitHub 地址

https://github.com/DataLinkDC/dlink

https://gitee.com/DataLinkDC/Dinky

欢迎大家关注 Dinky 的发展~

一、前言

Flink Catalog 持久化是 Dinky 实践系列的第二篇,通过阅读本文,您将会熟悉 Dinky MySQL Catalog 持久化的用法。这个系列中,我们以 MySQL 做为 Source 端,StarRocks 做为 Sink 端做为演示。

在 Dinky 0.6.5 之前,在编写 FlinkSQL 作业时,FlinkSQL 的 DDL 语句可以采用 FlinkSQLEnv 环境引入。但这种方式对大量表结构进行初始化管理时存在局限性,为提供统一的 Flink 元数据管理能力,Dinky 在 0.6.6 实现了 Flink MySQL Catalog 功能,此功能与 Hive Catalog 相似,相比之前大大降低了表结构的维护成本。

二、环境要求

软件

版本

CDH

6.2.0

Hadoop

3.0.0-cdh6.2.0

Flink

1.13.6

Flink CDC

2.2.1

StarRocks

2.2.0

Dinky

0.6.6

MYSQL

5.7

三、所需依赖

依赖

Mysql Catalog 持久化需要在 Flink下加载周围组件所需要的 Flink connector 即可。依赖如下:

代码语言:javascript
复制
# hadoop依赖
flink-shaded-hadoop-2-uber-3.0.0-cdh6.3.0-7.0.jar
# Flink Starrrocks依赖
flink-connector-starrocks-1.2.2_flink-1.13_2.12.jar
# Dinky hadoop依赖
flink-shaded-hadoop-3-uber-3.1.1.7.2.8.0-224-9.0.jar
# Dinky mysql catalog依赖
dlink-catalog-mysql-1.13-0.6.6.jar
# flink cdc依赖包
flink-sql-connector-mysql-cdc-2.2.1.jar
# mysql 驱动依赖
mysql-connector-java-8.0.21.jar

说明

1.hadoop 依赖包放置 $FLINK_HOME/lib下

2.Flink StarRocks 依赖包放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下

3.Dinky hadoop 依赖包放置 $DINKY_HOME/plugins 下(网盘或者群公告下载)

4.Dinky MySQL Catalog 依赖放置 $FLINK_HOME/lib 下

5.Flink cdc 依赖包放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下

6.MySQL 驱动依赖放置 FLINK_HOME/lib 和 DINKY_HOME/plugins 下

四、脚本准备

MySQL 建表

如下 sql 脚本采用 Flink CDC 官网。

代码语言:javascript
复制
# mysql建表语句(同步到Starocks)
CREATE TABLE bigdata.products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE bigdata.products AUTO_INCREMENT = 101;
INSERT INTO bigdata.products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");
       
CREATE TABLE bigdata.orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO bigdata.orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

表解析

MySQL Catalog 持久化目前默认的 catalog 为 my_catalog,默认的 FlinkSQLEnv 为 DefaultCatalog。目前存储 MySQL Catalog 元数据的表结构如下:

元数据表

表含义

metadata_database

元数据 schema 信息

metadata_table

元数据table信息

metadata_database_property

schema 属性信息

metadata_table_property

table 属性信息

metadata_column

数据列信息

metadata_function

UDF 信息

五、功能实践

作业脚本准备

代码语言:javascript
复制
DROP  TABLE IF EXISTS ods_orders_src;
CREATE  TABLE IF NOT EXISTS ods_orders_src (
    `order_id` int COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,2) COMMENT ''
    , `product_id` int COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED
) COMMENT ''
WITH (
     'connector' = 'mysql-cdc'
    ,'hostname' = '192.168.0.40'
    ,'port' = '4406'
    ,'username' = 'root'
    ,'password' = 'Percona@020*'
    ,'server-time-zone' = 'Asia/Shanghai'
    ,'scan.incremental.snapshot.enabled' = 'true'
    ,'scan.startup.mode'='initial' 
    ,'scan.incremental.snapshot.chunk.size' = '20000'   
    ,'heartbeat.interval' = '120s' 
    ,'database-name' = 'bigdata'
    ,'table-name' = 'orders'
    );
DROP  TABLE IF EXISTS ods_orders_sink;
CREATE  TABLE IF NOT EXISTS ods_orders_sink (
     `order_id` int COMMENT ''
    , `order_date` timestamp(3) COMMENT ''
    , `customer_name` string COMMENT ''
    , `price` decimal(12,2) COMMENT ''
    , `product_id` int COMMENT ''
    , `order_status` tinyint COMMENT ''
    ,PRIMARY KEY(order_id) NOT ENFORCED  
) COMMENT ''
WITH (    
   'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
   'connector' = 'starrocks',
   'database-name' = 'qhc_ods',
   'table-name' = 'ods_orders',
   'password' = '123456',
   'load-url' = '192.168.0.5:18035',
   'username' = 'devuser',
   'sink.properties.format' = 'json',
   'sink.properties.strip_outer_array' = 'true',
   'sink.max-retries' = '10',
   'sink.buffer-flush.interval-ms' = '15000',
   'sink.parallelism' = '1'
   );

DROP  TABLE IF EXISTS ods_products_src;
CREATE  TABLE IF NOT EXISTS ods_products_src (
     `id` int COMMENT ''
    , `name` string COMMENT ''
    , `description` string COMMENT ''
    ,PRIMARY KEY(id ) NOT ENFORCED
) COMMENT ''
WITH (
     'connector' = 'mysql-cdc'
    ,'hostname' = '192.168.0.4'
    ,'port' = '3306'
    ,'username' = 'root'
    ,'password' = '123456'
    ,'server-time-zone' = 'Asia/Shanghai'
    ,'scan.incremental.snapshot.enabled' = 'true'
    ,'scan.startup.mode'='initial' 
    ,'scan.incremental.snapshot.chunk.size' = '20000'   
    ,'heartbeat.interval' = '120s' 
    ,'database-name' = 'bigdata'
    ,'table-name' = 'products'
    );
DROP  TABLE IF EXISTS ods_products_sink;
CREATE  TABLE IF NOT EXISTS ods_products_sink (
     `id` int COMMENT ''
    , `name` string COMMENT ''
    , `description` string COMMENT ''
    ,PRIMARY KEY(id ) NOT ENFORCED  
) COMMENT ''
WITH (    
   'jdbc-url' = 'jdbc:mysql://192.168.0.5:19035',
   'connector' = 'starrocks',
   'database-name' = 'qhc_ods',
   'table-name' = 'ods_products',
   'password' = '123456',
   'load-url' = '192.168.0.5:18035',
   'username' = 'devuser',
   'sink.properties.format' = 'json',
   'sink.properties.strip_outer_array' = 'true',
   'sink.max-retries' = '10',
   'sink.buffer-flush.interval-ms' = '15000',
   'sink.parallelism' = '1'
   );

创建初始化脚本作业

创建一个 ddl_init 作业,通过 yarn session模式提交,FlinkSQLEnv采用DefaultCatalog,作业如下:

执行作业后,在 dinky 元数据库查询是否表已经存在。

查看元数据表

每执行一次初始化DDL,将会覆盖之前的元数据。

说明: 对于所创建的表元信息不能清空,否则会报错如下:

查询 Source 的数据

新建一个作业 ods_order_src。

代码语言:javascript
复制
select * from ods_orders_src;

由此可以看到,对于所创建的表其实已经存在与 DefaultCatalog,即保存与 Mysql 元数据中。此时,可以通过创建任意作业去使用DefaultCatalog 中的表。

首先清空 StarRocks 数据。

插入 Sink 表

还是在 ods_order_src 作业中,使用 insert 语句。将数据写入 StarRocks 中。

代码语言:javascript
复制
INSERT INTO ods_orders_sink
select * from ods_orders_src;

提交 Flink 作业

查看 StarRocks 数据

说明

对于 MySQL Catalog 除上面用默认的 DefaultCatalog,那么也可以通过 create 创建 catalog,然后在对应数据库下执行 dlinkmysqlcatalog.sql。语法如下:

代码语言:javascript
复制
create catalog my_catalog with(
   'type' = 'dlink_mysql', 
   'username' = 'dlink', 
   'password' = 'dlink', 
   'url' = 'jdbc:mysql://192.168.0.4:3306/dlink2?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true' 
   );

use catalog my_catalog;

六、总结

随着 Dinky 的不断扩大以及在业界的影响力。为方便大家的学习和使用,此次系列文章作为 Dinky 系列文章的第二篇,后期系列文章尽请期待。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2022-08-02,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Dinky开源 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 依赖
  • MySQL 建表
  • 表解析
  • 作业脚本准备
  • 创建初始化脚本作业
  • 查看元数据表
  • 查询 Source 的数据
  • 插入 Sink 表
  • 提交 Flink 作业
  • 查看 StarRocks 数据
  • 说明
相关产品与服务
云数据库 SQL Server
腾讯云数据库 SQL Server (TencentDB for SQL Server)是业界最常用的商用数据库之一,对基于 Windows 架构的应用程序具有完美的支持。TencentDB for SQL Server 拥有微软正版授权,可持续为用户提供最新的功能,避免未授权使用软件的风险。具有即开即用、稳定可靠、安全运行、弹性扩缩等特点。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档