Flink 流式写入

最近更新时间:2026-04-16 11:28:52

我的收藏

概述

Setats 支持通过 Flink 持续写入实时数据,使用 setats connector 将 Upsert 数据写入 Setats 主键表。数据写入后会触发 Eager 合并并生成 Changelog,因此能够同时满足秒级可见、后续 CDC 消费和 OLAP 查询等场景。
注意:
1. 以下作业使用 Hive catalog 和 COS 加速桶演示。
2. 如果使用 Hadoop catalog ,需要修改 ‘catalog-type’='hadoop' 。
3. Oceanus 可以不提前建表,Flink Setats Sink 会自动建表。

前提条件

已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
已准备 Flink 运行环境,例如流计算 Oceanus。
已确认 Warehouse 对应的底层存储、Hive Metastore、网络访问和权限配置均已打通。
如果使用 COS 加速桶 作为 Warehouse,建议额外确认:
加速桶已提前创建目标目录,例如 cosn://<bucket>/warehouse。
已为加速桶配置 Oceanus 所在子网的访问权限。
Flink 集群具备访问 Hive Metastore、COS / CHDFS 的网络能力。

Oceanus 高级参数示例

如果作业运行在 Oceanus,且 Warehouse 使用 COS 加速桶,可参考以下高级参数模板:
pipeline.max-parallelism: 2048
containerized.taskmanager.env.HADOOP_USER_NAME: hadoop
containerized.master.env.HADOOP_USER_NAME: hadoop
taskmanager.memory.managed.fraction: 0.1
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_SUCCESS
execution.checkpointing.tolerable-failed-checkpoints: 5
flink.hadoop.fs.cosn.trsf.fs.AbstractFileSystem.ofs.impl: com.qcloud.chdfs.fs.CHDFSDelegateFSAdapter
flink.hadoop.fs.cosn.trsf.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
flink.hadoop.fs.cosn.trsf.fs.ofs.tmp.cache.dir: /tmp/chdfs/
flink.hadoop.fs.cosn.trsf.fs.ofs.user.appid: <YourAppId>
flink.hadoop.fs.cosn.trsf.fs.ofs.bucket.region: <YourRegion>
flink.hadoop.fs.cosn.trsf.fs.ofs.upload.flush.flag: true
flink.hadoop.fs.AbstractFileSystem.cosn.impl: org.apache.hadoop.fs.CosN
flink.hadoop.fs.cosn.impl: org.apache.hadoop.fs.CosFileSystem
flink.hadoop.fs.cosn.bucket.region: <YourRegion>
flink.hadoop.fs.cosn.userinfo.appid: <YourAppId>
flink.hadoop.fs.ofs.impl: com.qcloud.chdfs.fs.CHDFSHadoopFileSystemAdapter
flink.hadoop.fs.ofs.tmp.cache.dir: /tmp/chdfs/
flink.hadoop.fs.ofs.upload.flush.flag: true
flink.hadoop.fs.ofs.user.appid: <YourAppId>
flink.hadoop.fs.ofs.bucket.region: <YourRegion>
说明:
请根据实际环境替换 `AppId`、`Region`、`HADOOP_USER_NAME` 等参数。不同集群环境中可用的参数项可能略有差异,请以实际运行环境校验结果为准。

建表语法

创建 Setats 目标表

以下示例以 Hive Catalog 为例:
CREATE TABLE `setats_sink` (
`id` STRING,
`dt` STRING,
`name` STRING,
`address` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`) WITH (
'connector' = 'setats',
'manager-url' = '<Manager Url>',
'catalog-type' = 'hive',
'catalog-name' = 'setats',
'catalog-database' = 'testdb',
'catalog-table' = 'setats_sink',
'warehouse' = 'cosn://<bucket>/warehouse',
'location' = 'cosn://<bucket>/warehouse/testdb/setats_sink',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'bucket' = '10',
'bucket-key' = 'id',
'merge-engine' = 'PARTIAL_UPDATE',
'setats.sink.flush-interval' = '5s'
);

关键参数说明

参数
必填
说明
默认值
connector
固定为 setats
(无,必须填写)
manager-url
Setats Manager 地址
(无,必须填写)
warehouse
Setats Warehouse 路径
(无,必须填写)
catalog-type
Catalog 类型,常用 Hive 或 Hadoop
(无,必须填写)
catalog-name
Catalog 名称
(无,必须填写)
catalog-database
数据库名称
default
catalog-table
建议填写
目标表名,建议与实际表名保持一致
(无)
location
建议填写
表的物理存储路径,使用 COS / HDFS 时建议显式指定
(无)
uri
Hive Catalog 必填
Hive Metastore 地址
(无,必须填写)
bucket
目标表的桶数量,建议结合数据规模和并发写入能力配置
-1
bucket-key
分桶键,用于决定数据写入时落到哪个 Bucket。建议选择分布较均匀、相对稳定的字段,通常可使用主键字段或主键子集
(无)
merge-engine
合并策略:
DEDUPLICATE:合并取最后一条
PARTIAL_UPDATE:部分列更新,取每个字段最后一个不为空的值
DEDUPLICATE
setats.sink.flush-interval
Sink 刷新间隔,例如 5s
5s

写入示例

-- 创建测试数据源

CREATE TABLE `test_source_datagen` (
`id` STRING,
`name` STRING,
`address` STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.id.start' = '0',
'fields.id.end' = '100000',
'fields.id.kind' = 'sequence',
'fields.name.length' = '5',
'fields.address.length' = '10'
);
-- 创建 Setats Sink 表

CREATE TABLE `demo_setats_table1` (
`id` STRING,
`dt` STRING,
`name` STRING,
`address` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`) WITH (
'connector' = 'setats',
'manager-url' = '<Manager Url>',
'catalog-type' = 'hive',
'catalog-name' = 'setats',
'catalog-database' = 'testdb',
'catalog-table' = 'demo_setats_table1',
'warehouse' = 'cosn://<bucket>/warehouse',
'location' = 'cosn://<bucket>/warehouse/testdb/demo_setats_table1',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'bucket' = '10',
'merge-engine' = 'PARTIAL_UPDATE',
'setats.sink.flush-interval' = '5s'
);
-- 执行写入

INSERT INTO `demo_setats_table1`
SELECT
`id`,
DATE_FORMAT(NOW(), 'yyyyMMdd') AS `dt`,
`name`,
`address`
FROM `test_source_datagen`;

部分列更新

Setats 主键表支持部分列更新,Flink 写入任务中可以只声明需要更新的列,未出现在写入记录中的其他列会保留原值。对于订单、画像、用户标签等“按主键增量补列”的业务场景,建议结合 merge-engine = 'PARTIAL_UPDATE' 使用。
示例:
CREATE TABLE `setats_partial_sink` (
`id` STRING,
`dt` STRING,
`name` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) PARTITIONED BY (`dt`) WITH (
'connector' = 'setats',
'manager-url' = '<Manager Url>',
'catalog-type' = 'hive',
'catalog-name' = 'setats',
'catalog-database' = 'testdb',
'catalog-table' = 'demo_setats_table1',
'warehouse' = 'cosn://<bucket>/warehouse',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'merge-engine' = 'PARTIAL_UPDATE'
);

发布与运行

在 Oceanus 等托管环境中发布 SQL 作业时,建议按以下顺序检查:
1. 依赖 JAR 是否已上传并关联到当前空间。
2. Hive 配置 JAR 是否已随作业一并引用。
3. Warehouse、location、uri、manager-url 是否已替换为真实值。
4. 加速桶场景下,相关高级参数是否已正确配置。

注意事项

dt 等分区字段建议结合业务查询模式设计,避免过细分区带来额外管理成本。
bucket 数量建议结合写入并发、数据规模和 Worker 数量综合评估。
manager-url、warehouse、location 等参数必须与集群配置保持一致。