概述
Setats 支持通过 Flink 持续写入实时数据,使用 setats connector 将 Upsert 数据写入 Setats 主键表。数据写入后会触发 Eager 合并并生成 Changelog,因此能够同时满足秒级可见、后续 CDC 消费和 OLAP 查询等场景。
注意:
1. 以下作业使用 Hive catalog 和 COS 加速桶演示。
2. 如果使用 Hadoop catalog ,需要修改 ‘catalog-type’='hadoop' 。
3. Oceanus 可以不提前建表,Flink Setats Sink 会自动建表。
前提条件
已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
已准备 Flink 运行环境,例如流计算 Oceanus。
已确认 Warehouse 对应的底层存储、Hive Metastore、网络访问和权限配置均已打通。
如果使用 COS 加速桶 作为 Warehouse,建议额外确认:
加速桶已提前创建目标目录,例如 cosn://<bucket>/warehouse。
已为加速桶配置 Oceanus 所在子网的访问权限。
Flink 集群具备访问 Hive Metastore、COS / CHDFS 的网络能力。
Oceanus 高级参数示例
如果作业运行在 Oceanus,且 Warehouse 使用 COS 加速桶,可参考以下高级参数模板:
pipeline.max-parallelism: 2048containerized.taskmanager.env.HADOOP_USER_NAME: hadoopcontainerized.master.env.HADOOP_USER_NAME: hadooptaskmanager.memory.managed.fraction: 0.1execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_SUCCESSexecution.checkpointing.tolerable-failed-checkpoints: 5flink.hadoop.fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapterflink.hadoop.fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterflink.hadoop.fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/flink.hadoop.fs.cosn.trsf.fs.ofs.user.appid: <YourAppId>flink.hadoop.fs.cosn.trsf.fs.ofs.bucket.region: <YourRegion>flink.hadoop.fs.cosn.trsf.fs.ofs.upload.flush.flag: trueflink.hadoop.fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosNflink.hadoop.fs.cosn.impl: org.apache.hadoop.fs.CosFileSystemflink.hadoop.fs.cosn.bucket.region: <YourRegion>flink.hadoop.fs.cosn.userinfo.appid: <YourAppId>flink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapterflink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/flink.hadoop.fs.ofs.upload.flush.flag: trueflink.hadoop.fs.ofs.user.appid: <YourAppId>flink.hadoop.fs.ofs.bucket.region: <YourRegion>
说明:
请根据实际环境替换 `AppId`、`Region`、`HADOOP_USER_NAME` 等参数。不同集群环境中可用的参数项可能略有差异,请以实际运行环境校验结果为准。
建表语法
创建 Setats 目标表
以下示例以 Hive Catalog 为例:
CREATE TABLE `setats_sink` (`id` STRING,`dt` STRING,`name` STRING,`address` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) PARTITIONED BY (`dt`) WITH ('connector' = 'setats','manager-url' = '<Manager Url>','catalog-type' = 'hive','catalog-name' = 'setats','catalog-database' = 'testdb','catalog-table' = 'setats_sink','warehouse' = 'cosn://<bucket>/warehouse','location' = 'cosn://<bucket>/warehouse/testdb/setats_sink','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','bucket' = '10','bucket-key' = 'id','merge-engine' = 'PARTIAL_UPDATE','setats.sink.flush-interval' = '5s');
关键参数说明
参数 | 必填 | 说明 | 默认值 |
connector | 是 | 固定为 setats | (无,必须填写) |
manager-url | 是 | Setats Manager 地址 | (无,必须填写) |
warehouse | 是 | Setats Warehouse 路径 | (无,必须填写) |
catalog-type | 是 | Catalog 类型,常用 Hive 或 Hadoop | (无,必须填写) |
catalog-name | 是 | Catalog 名称 | (无,必须填写) |
catalog-database | 是 | 数据库名称 | default |
catalog-table | 建议填写 | 目标表名,建议与实际表名保持一致 | (无) |
location | 建议填写 | 表的物理存储路径,使用 COS / HDFS 时建议显式指定 | (无) |
uri | Hive Catalog 必填 | Hive Metastore 地址 | (无,必须填写) |
bucket | 否 | 目标表的桶数量,建议结合数据规模和并发写入能力配置 | -1 |
bucket-key | 否 | 分桶键,用于决定数据写入时落到哪个 Bucket。建议选择分布较均匀、相对稳定的字段,通常可使用主键字段或主键子集 | (无) |
merge-engine | 否 | 合并策略: DEDUPLICATE:合并取最后一条 PARTIAL_UPDATE:部分列更新,取每个字段最后一个不为空的值 | DEDUPLICATE |
setats.sink.flush-interval | 否 | Sink 刷新间隔,例如 5s | 5s |
写入示例
-- 创建测试数据源CREATE TABLE `test_source_datagen` (`id` STRING,`name` STRING,`address` STRING) WITH ('connector' = 'datagen','rows-per-second' = '1','fields.id.start' = '0','fields.id.end' = '100000','fields.id.kind' = 'sequence','fields.name.length' = '5','fields.address.length' = '10');-- 创建 Setats Sink 表CREATE TABLE `demo_setats_table1` (`id` STRING,`dt` STRING,`name` STRING,`address` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) PARTITIONED BY (`dt`) WITH ('connector' = 'setats','manager-url' = '<Manager Url>','catalog-type' = 'hive','catalog-name' = 'setats','catalog-database' = 'testdb','catalog-table' = 'demo_setats_table1','warehouse' = 'cosn://<bucket>/warehouse','location' = 'cosn://<bucket>/warehouse/testdb/demo_setats_table1','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','bucket' = '10','merge-engine' = 'PARTIAL_UPDATE','setats.sink.flush-interval' = '5s');-- 执行写入INSERT INTO `demo_setats_table1`SELECT`id`,DATE_FORMAT(NOW(), 'yyyyMMdd') AS `dt`,`name`,`address`FROM `test_source_datagen`;
部分列更新
Setats 主键表支持部分列更新,Flink 写入任务中可以只声明需要更新的列,未出现在写入记录中的其他列会保留原值。对于订单、画像、用户标签等“按主键增量补列”的业务场景,建议结合 merge-engine = 'PARTIAL_UPDATE' 使用。
示例:
CREATE TABLE `setats_partial_sink` (`id` STRING,`dt` STRING,`name` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) PARTITIONED BY (`dt`) WITH ('connector' = 'setats','manager-url' = '<Manager Url>','catalog-type' = 'hive','catalog-name' = 'setats','catalog-database' = 'testdb','catalog-table' = 'demo_setats_table1','warehouse' = 'cosn://<bucket>/warehouse','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','merge-engine' = 'PARTIAL_UPDATE');
发布与运行
在 Oceanus 等托管环境中发布 SQL 作业时,建议按以下顺序检查:
1. 依赖 JAR 是否已上传并关联到当前空间。
2. Hive 配置 JAR 是否已随作业一并引用。
3. Warehouse、location、uri、manager-url 是否已替换为真实值。
4. 加速桶场景下,相关高级参数是否已正确配置。
注意事项
dt 等分区字段建议结合业务查询模式设计,避免过细分区带来额外管理成本。
bucket 数量建议结合写入并发、数据规模和 Worker 数量综合评估。
manager-url、warehouse、location 等参数必须与集群配置保持一致。