摘要:本文由韩公子老师带了 Dinky 实时计算平台从 checkpoint 与 savepoint 自动恢复整库同步作业的实操过程分享。内容包括:
Tips:历史传送门~
《Dinky 实践系列之 Flink Catalog 元数据管理》
《Dinky FlinkCDC 整库入仓 StarRocks》
GitHub 地址
https://github.com/DataLinkDC/dlink
https://gitee.com/DataLinkDC/Dinky
欢迎大家关注 Dinky 的发展~
一、场景
使用 Dinky 自动 savepoint、checkpoint 恢复整库同步作业。
组件 | 版本 |
---|---|
Flink | 1.14.4 |
Flink-mysql-cdc | 2.2.1 |
Mysql | 5.7+ |
Dinky | 0.6.6 |
温馨提示: 由于 Fink 自身 bug,Dinky 暂时不支持 Flink1.15.x 版本做 savepoint 处理, 请等待后续更新支持,或者使用小于 Flink1.15 的版本。
二、Dinky 提交作业
将 flink-sql-connector-mysql-cdc-2.2.1.jar 添加到 dinky 根目录 plugins 和 hdfs 集群配置路径上。
依赖图:
Mysql 数据源准备
create database emp_1;
use emp_1;
CREATE TABLE IF NOT EXISTS `employees_1` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into employees_1 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_1 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
FlinkSQL 准备
-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;
EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop102',
'port' = '3306',
'username' = 'root',
'password' = '000000',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'emp_1\.employees_[0-9]+',
'sink.connector' = 'print',
)
补充说明:
Flink 需要开启 checkpoint,并配置好状态后端参数。
配置 SavePoint 策略
SavePoint 策略选择最近一次。
任务提交
因为作业是第一次运行,之前没有做过savepoint,所以作业是一个新的程序,消费两条数据。
Flink WebUI
TaskManager 输出
三、自动 savepoint 恢复
查看作业详情栏, 如下图右上角所示, 他们的含义分别为:
名称 | 含义 |
---|---|
智能停止 | 触发一次 SavePoint,并停止作业 |
SavePoint 触发 | 只触发一次 SavePoint |
SavePoint 暂停 | 触发一次 SavePoint,并暂停作业 |
SavePoint 停止 | 触发一次 SavePoint,并停止作业 |
点击 '智能停止' 或者 'Savepoint停止',触发一次Savepoint,并停止作业。
运维中心查看作业 SavePoint 记录
等作业停止后,在作业快照 Savepoint 栏中,查看到刚刚成功保存的Savepoint 记录。
在dlink数据库中,也可以查看到保存的Savepoint元数据。
同时,在'数据开发' 面板对应的作业中,右边栏也可以查看到savepoint记录。
接下来,往表中插入一条新的数据。
insert into employees_1 VALUES ("55", "2020-09-15", "huang", "meiji", "F", "2022-04-12");
作业会自动从之前保存的savepoint处启动。
观察到作业,成功做到断点续传,只消费到一条记录。
Taskmanager 成功输出一条记录。
四、自动 CheckPoint 恢复
Dinky 的 checkpoint 恢复功能使用非常方便,只需要点击一个按钮即可恢复,整体过程如下所示:
create database emp_2;
use emp_2;
CREATE TABLE IF NOT EXISTS `employees_2` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(50) NOT NULL,
`last_name` varchar(50) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- flink sql
-- 测试参数, 生成环境不需设置
SET pipeline.operator-chaining = false;
SET table.local-time-zone = Asia/Shanghai;
SET execution.runtime-mode = streaming;
SET execution.checkpointing.interval = 6000;
SET execution.checkpointing.tolerable-failed-checkpoints = 10;
SET execution.checkpointing.timeout =600000;
SET execution.checkpointing.externalized-checkpoint-retention = RETAIN_ON_CANCELLATION;
SET execution.checkpointing.mode = EXACTLY_ONCE;
SET execution.checkpointing.unaligned = true;
SET execution.checkpointing.max-concurrent-checkpoints = 1;
SET state.checkpoints.num-retained = 3;
SET restart-strategy = fixed-delay;
SET restart-strategy.fixed-delay.attempts = 10 ;
SET restart-strategy.fixed-delay.delay = 20s;
SET table.exec.source.cdc-events-duplicate = true;
SET table.sql-dialect = default;
--SET pipeline.name = hive_catalog_cdc_orders;
SET jobmanager.memory.process.size = 1600m;
SET taskmanager.memory.process.size = 1780m;
SET taskmanager.memory.managed.size = 200m;
SET taskmanager.numberOfTaskSlots=1;
SET yarn.application.queue= default;
EXECUTE CDCSOURCE dinkySource_mysqlCDC_to_Doris WITH (
'connector' = 'mysql-cdc',
'hostname' = 'hadoop102',
'port' = '3306',
'username' = 'root',
'password' = '000000',
'checkpoint' = '3000',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'emp_2\.employees_[0-9]+',
'sink.connector' = 'print',
)
插入数据
insert into employees_2 VALUES ("10", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
insert into employees_2 VALUES ("11", "1990-09-15", "wang", "meimei", "F", "2021-04-12");
点击 '普通停止',不做savepoint,从checkpoint 处启动。
运维中心查看 checkpoint 信息
停止之后,我们可以从 '作业快照'中,查看到作业保存的checkpoint记录。
这跟hdfs 上保存的checkpoint记录 是一致的。
hdfs 的 checkpoint
恢复最新的 checkpoint
重启后插入一条数据
insert into employees_2 VALUES ("13", "1992-09-12", "cai", "kunkun", "M", "2022-09-22");
断点续传
温馨提示
运行 perjob、 app 模式的作业,如果作业被强行kill掉、内部错误等原因导致集群实例销毁, 会导致 Dinky 无法访问 JobManager 来获取 checkpoint 信息,可能存在 dinky 保存的 checkpoint 记录,跟 hdfs 上保存的记录不一致,有可能缺失最新的 checkpoint,所以线上作业恢复 checkpoint 时,需要查看 hdfs 上保存的最新 checkpoint 记录与 dinky 作比较。
五、手动指定 checkpoint 恢复
在上一个步骤中,点击 '此处恢复' 之后,作业能 '断点续传',实际原理是dinky 将 checkpoint 的记录填充到了作业的右边栏,选项为 '指定一次' 然后运行的
所以,dinky也是支持手动指定某处checkpoint 恢复,只需 'SavePoin策略' 选择 '指定一次',将ck路径粘贴到 'SavePointPath',运行即可恢复checkpoint。
运行完毕,如查看到成功恢复ck之后,还请将 'SavePoin策略' 还原回 '最近一次',避免后续从这个检查点再次恢复。
六、总结
优点: 使用dinky,简化了线上作业的部署、运维、作业恢复等操作,增强了flink作业的健壮性。
不足: 如果线上作业过多,'运维中心' 找到指定的作业会比较费力,所以期待 '运维中心',增加能按照 '数据开发' 面板的分目录、分层级查看作业的功能,这样就能快速找到对应的作业。