概述
Setats 支持通过 Flink 以 CDC 方式流式读取表的 Changelog 数据。数据写入 Setats 后,系统会基于主键合并结果生成变更日志,下游可通过 setats-cdc connector 持续消费新增、更新和删除事件。
该能力适用于以下场景:
实时订阅 Setats 中的主键变更数据
将 Setats 作为统一增量数据源,分发给下游 Flink 作业
基于 Changelog 构建维表缓存、实时聚合或消息分发链路
前提条件
已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
已有上游任务持续向目标 Setats 表写入数据。
建表语法
CREATE TABLE `setats_cdc_source` (`id` STRING,`dt` STRING,`name` STRING,`address` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) WITH ('connector' = 'setats-cdc','warehouse' = 'cosn://<bucket>/warehouse','catalog-type' = 'hive','catalog-name' = 'setats','catalog-database' = 'testdb','catalog-table' = 'demo_setats_table1','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','manager-url' = '<Manager Url>','bucket.discovery-interval' = '30s','startup.mode' = 'earliest');
关键参数说明
参数 | 必填 | 说明 | 默认值 |
connector | 是 | 固定为 setats | (无,必须填写) |
manager-url | 是 | Setats Manager 地址 | (无,必须填写) |
warehouse | 是 | Setats Warehouse 路径 | (无,必须填写) |
catalog-type | 是 | Catalog 类型,常用 Hive 或 Hadoop | (无,必须填写) |
catalog-name | 是 | Catalog 名称 | (无,必须填写) |
catalog-database | 是 | 数据库名称 | default |
catalog-table | 是 | 目标表名,与实际表名保持一致 | 无 |
uri | Hive catalog 必填 | Hive metastore 地址 | (无,Hive catalog 下必须填写) |
bucket | 是 | 目标表的桶数量,建议结合数据规模和并发写入能力配置 | (无,必须填写) |
bucket-key | 是 | 分桶键,用于决定数据写入时落到哪个 Bucket。建议选择分布较均匀、相对稳定的字段,通常可使用主键字段或主键子集 | 默认主键,建议显示设置合理业务字段作为 bucket-key |
bucket.discovery-interval | 否 | 动态监测新增的 BucketShadingGroup | 300s |
startup.mode | 否 | latest: 从最新读 earliest:从最早读 from-timestamp: 指定时间戳读 Scan.timestamp-millis: 在 "timestamp" 启动模式下,启动时间的毫秒时间戳 | latest |
scan.timestamp-millis | 否 | scan.timestamp-millis 模式下,设置启动时间戳。示例:1767196800000 | 无 |
使用示例
-- 创建 Setats CDC 源表CREATE TABLE `demo_setats_table1` (`id` STRING,`dt` STRING,`name` STRING,`address` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) WITH ('connector' = 'setats-cdc','warehouse' = 'cosn://<bucket>/warehouse','catalog-name' = 'setats','catalog-type' = 'hive','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','catalog-database' = 'testdb','catalog-table' = 'demo_setats_table1','manager-url' = '<Manager Url>','bucket.discovery-interval' = '30s','startup.mode' = 'earliest');-- 创建调试 Sink 表CREATE TABLE `logger_sink_table` (`id` STRING,`dt` STRING,`name` STRING,`address` STRING) WITH ('connector' = 'logger','all-changelog-mode' = 'true','print-identifier' = 'DebugData');-- 执行 CDC 读取INSERT INTO `logger_sink_table`SELECT `id`, `dt`, `name`, `address`FROM `demo_setats_table1`;
查看消费结果
logger connector 会将 CDC 结果打印到 TaskManager 日志中。发布作业后,可在 Flink UI / Oceanus 控制台查看对应日志,确认是否已持续收到变更事件。
注意事项
setats-cdc 读取的是 Changelog ,而不是静态快照。
Changelog 的保留时间由集群参数 changelog.retention.ms 控制,默认 24 小时;需要确保下游消费延迟不超过该时长。