Flink CDC流式读取

最近更新时间:2026-04-16 11:28:52

我的收藏

概述

Setats 支持通过 Flink 以 CDC 方式流式读取表的 Changelog 数据。数据写入 Setats 后,系统会基于主键合并结果生成变更日志,下游可通过 setats-cdc connector 持续消费新增、更新和删除事件。
该能力适用于以下场景:
实时订阅 Setats 中的主键变更数据
将 Setats 作为统一增量数据源,分发给下游 Flink 作业
基于 Changelog 构建维表缓存、实时聚合或消息分发链路

前提条件

已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
已有上游任务持续向目标 Setats 表写入数据。

建表语法

CREATE TABLE `setats_cdc_source` (
`id` STRING,
`dt` STRING,
`name` STRING,
`address` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) WITH (
'connector' = 'setats-cdc',
'warehouse' = 'cosn://<bucket>/warehouse',
'catalog-type' = 'hive',
'catalog-name' = 'setats',
'catalog-database' = 'testdb',
'catalog-table' = 'demo_setats_table1',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'manager-url' = '<Manager Url>',
'bucket.discovery-interval' = '30s',
'startup.mode' = 'earliest'
);


关键参数说明

参数
必填
说明
默认值
connector
固定为 setats
(无,必须填写)
manager-url
Setats Manager 地址
(无,必须填写)
warehouse
Setats Warehouse 路径
(无,必须填写)
catalog-type
Catalog 类型,常用 Hive 或 Hadoop
(无,必须填写)
catalog-name
Catalog 名称
(无,必须填写)
catalog-database
数据库名称
default
catalog-table
目标表名,与实际表名保持一致
uri
Hive catalog 必填
Hive metastore 地址
(无,Hive catalog 下必须填写)
bucket
目标表的桶数量,建议结合数据规模和并发写入能力配置
(无,必须填写)
bucket-key
分桶键,用于决定数据写入时落到哪个 Bucket。建议选择分布较均匀、相对稳定的字段,通常可使用主键字段或主键子集
默认主键,建议显示设置合理业务字段作为 bucket-key
bucket.discovery-interval
动态监测新增的 BucketShadingGroup
300s
startup.mode
latest: 从最新读
earliest:从最早读
from-timestamp: 指定时间戳读
Scan.timestamp-millis: 在 "timestamp" 启动模式下,启动时间的毫秒时间戳
latest
scan.timestamp-millis
scan.timestamp-millis 模式下,设置启动时间戳。示例:1767196800000

使用示例

-- 创建 Setats CDC 源表
CREATE TABLE `demo_setats_table1` (
`id` STRING,
`dt` STRING,
`name` STRING,
`address` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) WITH (
'connector' = 'setats-cdc',
'warehouse' = 'cosn://<bucket>/warehouse',
'catalog-name' = 'setats',
'catalog-type' = 'hive',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'catalog-database' = 'testdb',
'catalog-table' = 'demo_setats_table1',
'manager-url' = '<Manager Url>',
'bucket.discovery-interval' = '30s',
'startup.mode' = 'earliest'
);
-- 创建调试 Sink 表
CREATE TABLE `logger_sink_table` (
`id` STRING,
`dt` STRING,
`name` STRING,
`address` STRING
) WITH (
'connector' = 'logger',
'all-changelog-mode' = 'true',
'print-identifier' = 'DebugData'
);
-- 执行 CDC 读取
INSERT INTO `logger_sink_table`
SELECT `id`, `dt`, `name`, `address`
FROM `demo_setats_table1`;


查看消费结果

logger connector 会将 CDC 结果打印到 TaskManager 日志中。发布作业后,可在 Flink UI / Oceanus 控制台查看对应日志,确认是否已持续收到变更事件。

注意事项

setats-cdc 读取的是 Changelog ,而不是静态快照。
Changelog 的保留时间由集群参数 changelog.retention.ms 控制,默认 24 小时;需要确保下游消费延迟不超过该时长。