前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >用户投稿 | Dinky 从保存点恢复 FlinkSQL 作业

用户投稿 | Dinky 从保存点恢复 FlinkSQL 作业

作者头像
文末丶
发布2023-10-24 19:07:15
6710
发布2023-10-24 19:07:15
举报
文章被收录于专栏:DataLink数据中台

导读:本文来自用户投稿,介绍了 Dinky 如何通过 SavePoint 来恢复 FlinkSQL 作业。

社区公告:

问题反馈、代码提交、文章投稿与社区贡献请移步 Github issue。

Github issue #66 登记企业或组织生产使用,可邀请至企业支持群,获取团队技术支持与其他企业用户的经验分享。

社区团队正在构建 Dinky 1.0 全新版本,带来更多创新实用功能,欢迎参与贡献,共建共赢。

GitHub 地址

https://github.com/DataLinkDC/dinky

https://gitee.com/DataLinkDC/Dinky

欢迎大家为 Dinky 送上小星星~

一、运行环境

说明项

内容

hadoop 版本

hadoop-3.1.4

Flink 任务执行模式

Yarn Session

Flink 版本

Flink-1.17.0

Dinky 版本

dlink-release-0.7.4

Kafka 版本

kafka_2.12-3.5.1

Kafka 运行模式

kraft

Mysql 版本

5.7.28

HDFS 集群、YARN 集群、Dinky 环境的搭建和启动,这里略过,假设已经完成。

Dinky 所需 Jar 包

在本用例中,以 Kafka 作为 source,以 MySQL 作为 sink;

把 Kafka 的依赖包放到 dlink-release-0.7.4/plugins/flink1.17 下,另外还增加:

  • flink-connector-jdbc-1.17.0.jar
  • flink-sql-connector-kafka-1.17.0.jar

Flink 配置中指定 Savepoint 存储路径

修改Flink家目录下 flink/conf/flink-conf.yaml 文件,指定savepoint目录位置。

代码语言:javascript
复制
# Default target directory for savepoints, optional.
#
# state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
state.savepoints.dir: hdfs://bd171:8020/sp

二、在 Dinky 中恢复 FlinkSQL 作业

创建 Yarn Session 集群

在 Flink 根目录下执行以下命令向 Yarn 集群申请资源,开启一个Yarn 会话,启动 Flink 集群:

代码语言:javascript
复制
./bin/yarn-session.sh -d -nm ww

可以在 Yarn Web UI 中看到我们新启动的 Yarn 会话:

参数说明:

  • -d:分离模式,如果你不想让Flink YARN客户端一直前台运行,可以使用这个参数,即使关掉当前对话窗口,YARN session也可以后台运行。
  • -nm(--name):配置在YARN UI界面上显示的任务名。

编写 FlinkSQL 作业

在编辑器中输入以下内容:

代码语言:javascript
复制
SET pipeline.operator-chaining = false;
DROP TABLE IF EXISTS employees_kafka;
CREATE TABLE IF NOT EXISTS employees_kafka (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'flink-cdc-kafka',
    'properties.bootstrap.servers' = 'bd171:19092,bd172:19092,bd173:19092',
    'properties.group.id' = 'flink-cdc-kafka-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
CREATE TABLE IF NOT EXISTS employees_sink (
    `emp_no` INT NOT NULL,
    `birth_date` DATE,
    `first_name` STRING,
    `last_name` STRING,
    `gender` STRING,
    `hire_date` DATE,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'jdbc-newtec',
    'url' = 'jdbc:mysql://mysql201:3306/employees?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'employees_kafka_sink',
    'driver' = 'com.mysql.cj.jdbc.Driver', 
    'username' = 'root', 
    'password' = '****' 
    );
insert into
    employees_sink
select
    emp_no,
    birth_date,
    first_name,
    last_name,
    gender,
    hire_date
from
    employees_kafka;

同时注意右边 SavePoint 策略,选择 “最近一次”,然后运行这个作业:

此时我们向kafka相关topic插入300条记录,随后这些数据写到了MySQL数据库的相关表里:

SavePoint 停止 FlinkSQL 作业

点击 Dinky 的运维中心菜单,在任务列表里点击上面运行的这个任务进入任务详情页面,在页面右上角点击三个点的省略号按钮,弹出框中点击 “SavePoint停止”:

在 HDFS 中可以看到相关的 SavePoint 保存记录:

点击链接查看:

在Dinky 的运维中心,任务列表,任务详情页面,作业快照sheet下面的SavePoint 这个 Sheet 下,也可以看到 SavePoint 保存的路径信息:

在 Dinky 的数据开发的作业中, 右边“保存点”栏也可以查看到 savepoint 记录:

向 Kafka 相关 topic 写入 300 条数据

FlinlSQL 作业当前是停止状态,此时,向 Kafka 相关 Topic 写入300条记录。

重启作业

在 Dinky 的运维中心,任务列表,任务详情页面,重启任务;任务重启完成后,可以看到,FlinlSQL 作业实现了从 SavePoint 中的状态恢复,找到 Kafka 的正确偏移,在任务停止期间进行 Kafka 相关 Topic 中的数据,被 FlinkSQL 作业找到并读到到,最终写到了任务的 Sink,MySQL 数据库的相关表里:

三、结论

Dinky 这个图形化的 FlinkSQL 开发工具,不仅简化了 FlinkSQL 的开发调试,还集成了对从 SavePoint 恢复作业运行的支持,非常方便。

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-10-24,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 Dinky开源 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • HDFS 集群、YARN 集群、Dinky 环境的搭建和启动,这里略过,假设已经完成。
  • Dinky 所需 Jar 包
  • Flink 配置中指定 Savepoint 存储路径
  • 创建 Yarn Session 集群
相关产品与服务
云数据库 MySQL
腾讯云数据库 MySQL(TencentDB for MySQL)为用户提供安全可靠,性能卓越、易于维护的企业级云数据库服务。其具备6大企业级特性,包括企业级定制内核、企业级高可用、企业级高可靠、企业级安全、企业级扩展以及企业级智能运维。通过使用腾讯云数据库 MySQL,可实现分钟级别的数据库部署、弹性扩展以及全自动化的运维管理,不仅经济实惠,而且稳定可靠,易于运维。
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档