上一篇我们用命令行提交任务的方式演示了如何快速同步 MySQL 库表结构至 Doris,并利用自带的 JDBC-Catalog 能力将 OLTP 库数据同步至 Doris 中。
但对于一些经常做常规开发的同学而言,一个 POC 测试的方案还不足以满足业务日常开发的完整诉求。
这类完整诉求包含了功能完整度、运维难度、开发难度、管理难度等一系列考虑点在内。
故此本篇我们一起来探讨学习如何使用 Dinky 这款开源Flink任务管理平台来完成 OLTP 库至 Apache Doris 的任务构建的。
话不多说,开搞!
Dinky (https://www.dinky.org.cn/) 是一个以 Apache Flink 为基础、开箱即用的一站式实时计算平台,连接数据湖仓等众多框架,致力于流批一体和湖仓一体的建设与实践。
同时具备以下几方面的核心特性:
有兴趣的同学可至官网翻看更详细的文档内容,这里不做更多赘述。
Dinky 部署版本:1.16-1.2.0【前者是 Flink 版本号,后者是 Dinky 版本号】
Dinky 需要具备内置的 Flink 环境,将相关依赖放置在指定目录下 本篇的演示 DEMO 依旧以 MySQL -> Doris 为例
# 下载 Dinky 二进制包
https://github.com/DataLinkDC/dinky/releases/download/v1.2.0/dinky-release-1.16-1.2.0.tar.gz
# 解压缩安装包
tar -zxvf dinky-release-1.16-1.2.0.tar.gz
# 重命名 Dinky 目录
mv dinky-release-1.16-1.2. dinky-1.2.
# 进入根目录
cd dinky-1.20本篇为缩减篇幅不显臃肿,采用默认 h2 数据库作为元数据管理组件 若要生产使用,请使用 MySQL 或 PostgreSQL 作为元数据管理数据库 使用 h2 数据库无法提交 Flink yarn application 任务
【重要】h2 数据库当前默认不提供持久化,所以一旦重启,所有元数据将丢失!(每次重启,都是崭新的人生!)
若有生产配置需求,请访问 Dinky 官网依据部署教程修改和配置相应参数,本篇不做赘述。
Dinky 的 WebUI 端口默认为 8888,可在 Conf 文件进行修改。
Dinky 平台有多种整库同步功能,无论是 Pipeline 模式还是 Doris-Connnector 实现的整库同步方案,均可完成本篇的任务目的,故此我们将同步采用这两种模式做以演示,供各位看官老爷自行选择。
使用 Doris-Connector 做整库同步,可不依赖 Flink 进程,只需要三个 Jar 包即可完成 Standalone 模式的整库同步任务构建。
同时使用 Doris-Connector 构建时,若只需同步库表结构,不需要构建CDC任务时,则可以不考虑非主键模式下的库表同步问题,是整库同步库表结构的一大利器!
依赖添加的方式有两种:
需要放置的 Jar 包有如下几个:
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar任务构建内容如下:
# 程序路径
rs:/flink-doris-connector-1.16-24.1.0.jar
# 程序运行类
org.apache.doris.flink.tools.cdc.CdcTools
# 程序运行参数
mysql-sync-database
--database doris_database_name
--mysql-conf hostname=127.0.0.1
--mysql-conf port=
--mysql-conf username=admin
--mysql-conf password=
--mysql-conf database-name=ssb
--sink-conf fenodes=127.0.0.1:
--sink-conf username=root
--sink-conf password=
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030
--sink-conf sink.label-prefix=label
--table-conf replication_num=
--create-table-only=true请注意,最后一个配置项 --create-table-only=true 意为只同步表结构,不做CDC数据同步。
# checkpoint设置,不写会出现数据无法写入情况
SET 'execution.checkpointing.interval' = '10s';# 同步的库表中含有非主键表时
# 必须设置 scan.incremental.snapshot.chunk.key-column 参数
# 且只能选择非空类型的一个字段。
# 不同的库表列之间用,隔开,例如:
scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...
# 完整在程序运行参数中的格式为
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table:column我们需要放置的 Jar 依赖包有如下四个:
# 进入依赖目录
cd extends/flink1.
# Flink CDC
wget https://archive.apache.org/dist/flink/flink-cdc-3.2.1/flink-cdc-3.2.1-bin.tar.gz
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.2.1/flink-cdc-pipeline-connector-mysql-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.2.1/flink-cdc-pipeline-connector-doris-3.2.1.jar如果直接在 Dinky 使用 flink-cdc-dist-3.2.1.jar 会有 java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList; 错误,需预处理一遍
# 解压 flink-cdc-3.2.1-bin.tar.gz
tar -zxvf flink-cdc-3.2.-bin.tar.gz
cd flink-cdc-3.2./lib/
# 解压jar文件·
jar -xvf flink-cdc-dist-3.2.1.jar
# 删除冲突包
rm -rf org/apache/calcite
# 删除原 Jar 包
rm -rf flink-cdc-dist-3.2.1.jar
# 重新打包
jar -cvf flink-cdc-dist-3.2.1.jar *
# 移动 Jar 包
mv flink-cdc-dist-3.2.1.jar ../../# 进入依赖目录
cd extends/flink1./
# Flink
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz
# 移动 lib
mv flink-1.16./lib/* ./
# 移动 table-planner 依赖
mv flink-1.16.3/opt/flink-table-planner_2.12-1.16.3.jar ./
# 删除解压缩目录
rm -rf flink-1.16.3
# 删除冲突 table-planner 依赖
rm -rf flink-table-planner-loader-1.16.3.jarrm -rf flink-cdc-3.2./ flink-cdc-3.2.-bin.tar.gz flink-1.16.-bin-scala_2.12.tgz bash bin/auto.sh start请注意,如果需要除本地机器外访问 8081 端口,请在 conf/flink-conf.yaml 文件中调整 rest.address、rest.bind-address 为 0.0.0.0。
# 进入指定目录
cd /opt/software/
# 下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz
cd flink-1.16./
# 拷贝 Dinky-app Jar 包至 flink/lib 目录下
cp ../dinky-1.2./jar/dinky-app-1.16-1.2.-jar-with-dependencies.jar ./lib/
# 拷贝在 Dinky 调整好的 dist、table-planner 依赖替换 flink/lib 目录下依赖
cp ../dinky-1.2./extends/flink1./flink-dist-1.16.3.jar./lib/
cp ../dinky-1.2./extends/flink1./flink-table-planner_2.-1.16.3.jar./lib/
rm -rf ./lib/flink-table-planner-loader-1.16.3.jar
# 拷贝 MySQL-JDBC-Driver
cp ../dinky-1.2./extends/flink1./mysql-connector-java-8.0.28.jar./lib/
# 启动 Flink
./bin/start_cluster.sh我们以同步一张非主键的 MySQL 表 customer 为例,选择默认 Catalog 和 Flink-Standalone 资源:
SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
type: mysql
scan.incremental.snapshot.chunk.key-column: ssb_test.customer:c_name
hostname: localhost
port:
username: admin
password: 'Syj123456'
tables: ssb_test.customer
server-id: -
sink:
type: doris
fenodes: 192.168..88:
username: root
password: 'Doris2024'
table.create.properties.light_schema_change: true
table.create.properties.replication_num:
pipeline:
name: SyncMySQLDatabasetoDoris
parallelism:
)运维中心观察任务状态
MySQL 端:
Doris 端:
最后,给准备深度使用的同学给一个建议,如果要添加诸如 MySQL、PG、Oracle 等 CDC Pipeline 等依赖包时,建议使用 Resource 资源管理器和 ADD JAR 或 ADD CUSTOMJAR 语法来按任务添加相关依赖,一方面可以做到依赖冲突的隔离,另一方面可以按需加载,无需每次重启 Dinky 服务。
这一套折腾的确累人,本来预想两天搞完,没想到搞了小一周,一方面时间有限,只能下班后加班搞搞,另一方面对 Dinky 的确不熟悉,依赖冲突处理花费了很多时间。
同时为了满足每一个希望照着可以完全复现的同学都可以成功部署,这个方案的环境是铲了部署,部署后再铲,前前后后从零部署环境到实验,大概做了五次,直到后续三次依照文档可完全满足照搬即可的既定目标后,我才放心定稿。
在这里特别鸣谢 Dinky 社区的老朱、凯哥、高哥、洲哥四位巨佬的鼎力相助,还有老朱的小号也得鸣谢一下😄,四位大佬在这篇文章助我很多,也希望 Doris 和 Dinky 两社区继续长长久久~
本文分享自 Apache Doris 补习班 微信公众号,前往查看
如有侵权,请联系 cloudcommunity@tencent.com 删除。
本文参与 腾讯云自媒体同步曝光计划 ,欢迎热爱写作的你一起参与!