首页
学习
活动
专区
圈层
工具
发布
社区首页 >专栏 >使用 Dinky 可视化构建 Apache Doris 数据开发同步任务

使用 Dinky 可视化构建 Apache Doris 数据开发同步任务

作者头像
苏奕嘉
发布2025-07-14 18:29:56
发布2025-07-14 18:29:56
4970
举报

引言

上一篇我们用命令行提交任务的方式演示了如何快速同步 MySQL 库表结构至 Doris,并利用自带的 JDBC-Catalog 能力将 OLTP 库数据同步至 Doris 中。

但对于一些经常做常规开发的同学而言,一个 POC 测试的方案还不足以满足业务日常开发的完整诉求。

这类完整诉求包含了功能完整度、运维难度、开发难度、管理难度等一系列考虑点在内。

故此本篇我们一起来探讨学习如何使用 Dinky 这款开源Flink任务管理平台来完成 OLTP 库至 Apache Doris 的任务构建的。

话不多说,开搞!

Dinky 简介

Dinky (https://www.dinky.org.cn/) 是一个以 Apache Flink 为基础、开箱即用的一站式实时计算平台,连接数据湖仓等众多框架,致力于流批一体和湖仓一体的建设与实践。

图片
图片

同时具备以下几方面的核心特性:

图片
图片

有兴趣的同学可至官网看更详细的文档内容,这里不做更多赘述。

部署 Dinky 环境

安装 Dinky

Dinky 部署版本:1.16-1.2.0【前者是 Flink 版本号,后者是 Dinky 版本号】

Dinky 需要具备内置的 Flink 环境,将相关依赖放置在指定目录下 本篇的演示 DEMO 依旧以 MySQL -> Doris 为例

下载安装包
代码语言:javascript
复制
# 下载 Dinky 二进制包
https://github.com/DataLinkDC/dinky/releases/download/v1.2.0/dinky-release-1.16-1.2.0.tar.gz
# 解压缩安装包
tar -zxvf dinky-release-1.16-1.2.0.tar.gz
# 重命名 Dinky 目录
mv dinky-release-1.16-1.2. dinky-1.2.
# 进入根目录
cd dinky-1.20
图片
图片
MySQL 配置及 Conf 文件修改【可选】

本篇为缩减篇幅不显臃肿,采用默认 h2 数据库作为元数据管理组件 若要生产使用,请使用 MySQL 或 PostgreSQL 作为元数据管理数据库 使用 h2 数据库无法提交 Flink yarn application 任务

【重要】h2 数据库当前默认不提供持久化,所以一旦重启,所有元数据将丢失!(每次重启,都是崭新的人生!)

若有生产配置需求,请访问 Dinky 官网依据部署教程修改和配置相应参数,本篇不做赘述。

开放端口

Dinky 的 WebUI 端口默认为 8888,可在 Conf 文件进行修改。

整库同步方案

Dinky 平台有多种整库同步功能,无论是 Pipeline 模式还是 Doris-Connnector 实现的整库同步方案,均可完成本篇的任务目的,故此我们将同步采用这两种模式做以演示,供各位看官老爷自行选择。

Doris-Connector 整库同步

使用 Doris-Connector 做整库同步,可不依赖 Flink 进程,只需要三个 Jar 包即可完成 Standalone 模式的整库同步任务构建。

同时使用 Doris-Connector 构建时,若只需同步库表结构,不需要构建CDC任务时,则可以不考虑非主键模式下的库表同步问题,是整库同步库表结构的一大利器!

添加 lib 依赖

依赖添加的方式有两种:

  • • 在Dinky-配置中心,设置Resource的目录地址,将三个依赖包移至该目录下
图片
图片
  •   然后同步目录结构刷新资源
图片
图片
  • • 在Dinky-注册中心-资源栏目,通过客户端上传三个依赖包到服务端
图片
图片

需要放置的 Jar 包有如下几个:

  • • Flink-Doris-Connector
  • • Flink-SQL-Connector-MySQL-CDC
  • • MySQL-JDBC-Driver
代码语言:javascript
复制
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-mysql-cdc/3.2.1/flink-sql-connector-mysql-cdc-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/doris/flink-doris-connector-1.16/24.1.0/flink-doris-connector-1.16-24.1.0.jar
构建 Jar 任务
  • • 创建作业
图片
图片
  • • 选择 Jar 任务
图片
图片
  • • 填写任务信息,按引导框填写内容构建任务,如下图:
图片
图片

任务构建内容如下:

代码语言:javascript
复制
  # 程序路径
  rs:/flink-doris-connector-1.16-24.1.0.jar
  # 程序运行类
  org.apache.doris.flink.tools.cdc.CdcTools
  # 程序运行参数
  mysql-sync-database
--database doris_database_name
--mysql-conf hostname=127.0.0.1
--mysql-conf port=
--mysql-conf username=admin
--mysql-conf password=
--mysql-conf database-name=ssb
--sink-conf fenodes=127.0.0.1:
--sink-conf username=root
--sink-conf password=
--sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030
--sink-conf sink.label-prefix=label
--table-conf replication_num=
--create-table-only=true

请注意,最后一个配置项 --create-table-only=true 意为只同步表结构,不做CDC数据同步。

  • • 若需要构建完表结构后继续做CDC数据同步
  • • 左边窗口填写 Checkpoint 的 SET 语句
代码语言:javascript
复制
# checkpoint设置,不写会出现数据无法写入情况
SET 'execution.checkpointing.interval' = '10s';
图片
图片
  • • 如果 MySQL 表有非主键表,则需按表同步需求设置同步 Key 字段
代码语言:javascript
复制
# 同步的库表中含有非主键表时
# 必须设置 scan.incremental.snapshot.chunk.key-column 参数
# 且只能选择非空类型的一个字段。
# 不同的库表列之间用,隔开,例如:
scan.incremental.snapshot.chunk.key-column=database.table:column,database.table1:column...
# 完整在程序运行参数中的格式为
--mysql-conf scan.incremental.snapshot.chunk.key-column=database.table:column
  • • 运行任务查看执行状态
图片
图片
  • • 可以在运维中心查看任务状态
图片
图片
  • • Doris WebUI 查看库表是否正确同步
图片
图片

Dinky Pipeline 整库同步

添加 Dinky 依赖

我们需要放置的 Jar 依赖包有如下四个:

  • • Flink-CDC-3.2.1-BIN
  • • Flink-CDC-Pipeline-Connector-MySQL-3.2.1
  • • Flink-CDC-Pipeline-Connector-Doris-3.2.1
  • • MySQL-JDBC-Driver-8.0.28
代码语言:javascript
复制
# 进入依赖目录
cd extends/flink1.
# Flink CDC
wget https://archive.apache.org/dist/flink/flink-cdc-3.2.1/flink-cdc-3.2.1-bin.tar.gz
# MySQL
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-mysql/3.2.1/flink-cdc-pipeline-connector-mysql-3.2.1.jar
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
# Doris
wget https://repo1.maven.org/maven2/org/apache/flink/flink-cdc-pipeline-connector-doris/3.2.1/flink-cdc-pipeline-connector-doris-3.2.1.jar
处理冲突依赖
  • • 处理 Dist 依赖

如果直接在 Dinky 使用 flink-cdc-dist-3.2.1.jar 会有 java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList; 错误,需预处理一遍

代码语言:javascript
复制
# 解压 flink-cdc-3.2.1-bin.tar.gz
tar -zxvf flink-cdc-3.2.-bin.tar.gz       
cd flink-cdc-3.2./lib/
# 解压jar文件·
jar -xvf flink-cdc-dist-3.2.1.jar
# 删除冲突包
rm -rf org/apache/calcite
# 删除原 Jar 包
rm -rf flink-cdc-dist-3.2.1.jar
# 重新打包
jar -cvf flink-cdc-dist-3.2.1.jar * 
# 移动 Jar 包
mv flink-cdc-dist-3.2.1.jar ../../
  • • 处理 table-planner 依赖
代码语言:javascript
复制
# 进入依赖目录
cd extends/flink1./
# Flink
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
  • • 调整依赖
代码语言:javascript
复制
# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz
# 移动 lib
mv flink-1.16./lib/* ./
# 移动 table-planner 依赖
mv flink-1.16.3/opt/flink-table-planner_2.12-1.16.3.jar ./
# 删除解压缩目录
rm -rf flink-1.16.3
# 删除冲突 table-planner 依赖
rm -rf flink-table-planner-loader-1.16.3.jar
  • • 清理多余内容
代码语言:javascript
复制
rm -rf flink-cdc-3.2./ flink-cdc-3.2.-bin.tar.gz flink-1.16.-bin-scala_2.12.tgz 
启动 Dinky
代码语言:javascript
复制
bash bin/auto.sh start
图片
图片
图片
图片
Flink Standalone 启动

请注意,如果需要除本地机器外访问 8081 端口,请在 conf/flink-conf.yaml 文件中调整 rest.addressrest.bind-address 为 0.0.0.0

代码语言:javascript
复制
# 进入指定目录
cd /opt/software/
# 下载 Flink 二进制包
wget https://archive.apache.org/dist/flink/flink-1.16.3/flink-1.16.3-bin-scala_2.12.tgz
# 解压缩
tar -zxvf flink-1.16.-bin-scala_2.12.tgz
cd flink-1.16./
# 拷贝 Dinky-app Jar 包至 flink/lib 目录下
cp ../dinky-1.2./jar/dinky-app-1.16-1.2.-jar-with-dependencies.jar ./lib/
# 拷贝在 Dinky 调整好的 dist、table-planner 依赖替换 flink/lib 目录下依赖
cp ../dinky-1.2./extends/flink1./flink-dist-1.16.3.jar./lib/
cp ../dinky-1.2./extends/flink1./flink-table-planner_2.-1.16.3.jar./lib/
rm -rf ./lib/flink-table-planner-loader-1.16.3.jar
# 拷贝 MySQL-JDBC-Driver
cp ../dinky-1.2./extends/flink1./mysql-connector-java-8.0.28.jar./lib/
# 启动 Flink
./bin/start_cluster.sh
图片
图片
Dinky 注册 Flink 实例
图片
图片
配置任务

我们以同步一张非主键的 MySQL 表 customer 为例,选择默认 Catalog 和 Flink-Standalone 资源:

图片
图片
代码语言:javascript
复制
SET 'execution.checkpointing.interval' = '10s';
EXECUTE PIPELINE WITHYAML (
source:
  type: mysql
  scan.incremental.snapshot.chunk.key-column: ssb_test.customer:c_name
  hostname: localhost
  port: 
  username: admin
  password: 'Syj123456'
  tables: ssb_test.customer
  server-id: -
sink:
  type: doris
  fenodes: 192.168..88:
  username: root
  password: 'Doris2024'
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 
pipeline:
  name: SyncMySQLDatabasetoDoris
  parallelism: 
)
运行任务
图片
图片

运维中心观察任务状态

图片
图片
校验数据

MySQL 端:

图片
图片

Doris 端:

图片
图片

使用建议

最后,给准备深度使用的同学给一个建议,如果要添加诸如 MySQL、PG、Oracle 等 CDC Pipeline 等依赖包时,建议使用 Resource 资源管理器和 ADD JAR 或 ADD CUSTOMJAR 语法来按任务添加相关依赖,一方面可以做到依赖冲突的隔离,另一方面可以按需加载,无需每次重启 Dinky 服务。

小节

这一套折腾的确累人,本来预想两天搞完,没想到搞了小一周,一方面时间有限,只能下班后加班搞搞,另一方面对 Dinky 的确不熟悉,依赖冲突处理花费了很多时间。

同时为了满足每一个希望照着可以完全复现的同学都可以成功部署,这个方案的环境是铲了部署,部署后再铲,前前后后从零部署环境到实验,大概做了五次,直到后续三次依照文档可完全满足照搬即可的既定目标后,我才放心定稿。

在这里特别鸣谢 Dinky 社区的老朱、凯哥、高哥、洲哥四位巨佬的鼎力相助,还有老朱的小号也得鸣谢一下😄,四位大佬在这篇文章助我很多,也希望 Doris 和 Dinky 两社区继续长长久久~

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-12-27,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Apache Doris 补习班 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 引言
  • Dinky 简介
  • 部署 Dinky 环境
    • 安装 Dinky
      • 下载安装包
      • MySQL 配置及 Conf 文件修改【可选】
      • 开放端口
  • 整库同步方案
    • Doris-Connector 整库同步
      • 添加 lib 依赖
      • 构建 Jar 任务
    • Dinky Pipeline 整库同步
      • 添加 Dinky 依赖
      • 处理冲突依赖
      • 启动 Dinky
      • Flink Standalone 启动
      • Dinky 注册 Flink 实例
      • 配置任务
      • 运行任务
      • 校验数据
    • 使用建议
  • 小节
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档