概述
Setats 支持高效的 Lookup Join,可以将 Setats 表作为维表,通过持续消费 changelog 并在本地构建缓存,实现流式事实表与维表的实时关联查询。
前提条件
已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
维表数据已写入 Setats,且具备可持续消费的 Changelog。
Flink 运行环境已完成相关依赖准备。
维表建表语法
CREATE TABLE `user_dim` (`id` BIGINT,`dt` STRING,`name` STRING,`age` INT,`address` STRING,PRIMARY KEY (`id`, `dt`) NOT ENFORCED) WITH ('connector' = 'setats-cdc','warehouse' = 'cosn://<bucket>/warehouse','catalog-name' = 'setats','catalog-type' = 'hive','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','catalog-database' = 'testdb','catalog-table' = 'user_dim','manager-url' = '<Manager Url>','lookup.force-local' = 'true','lookup.bucket.discover.interval' = '10s','lookup.refresh.async' = 'false','lookup.table.store-type' = 'ROCKSDB','lookup.table.store-cache-size' = '1024');
Lookup Join 参数说明
参数 | 说明 | 默认值 |
lookup.force-local | 是否强制本地 Lookup。当前请固定设置为 `true` | true |
lookup.continuous.discovery-interval | 维表 Changelog 刷新间隔 | 10s |
lookup.bucket.discover.interval | 新 Bucket / 分区发现间隔 | 10s |
lookup.refresh.async | 是否异步刷新本地维表缓存 | false |
lookup.table.store-type | 本地存储类型,可选 ROCKSDB 或 HEAP | ROCKSDB |
lookup.table.store-cache-size | ROCKSDB 模式下额外 Heap Cache 可保存的 Join Key 数量 | 1024 |
使用示例:
-- 创建流表CREATE TABLE `order_stream` (`order_id` BIGINT,`user_id` BIGINT,`dt` STRING,`amount` DECIMAL(10, 2),`proctime` AS PROCTIME()) WITH ('connector' = 'kafka'-- 省略其他 Kafka 参数);-- 创建 Setats 维表CREATE TABLE `setats_user_info_lookup` (`user_id` BIGINT,`user_name` STRING,`user_level` INT,`city` STRING,PRIMARY KEY (`user_id`) NOT ENFORCED) WITH ('connector' = 'setats-cdc','warehouse' = 'cosn://<bucket>/warehouse','catalog-name' = 'setats','catalog-type' = 'hive','uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004','catalog-database' = 'testdb','catalog-table' = 'user_info','manager-url' = '<Manager Url>','lookup.force-local' = 'true','lookup.bucket.discover.interval' = '10s','lookup.refresh.async' = 'false','lookup.table.store-type' = 'ROCKSDB','lookup.table.store-cache-size' = '1024');-- 执行 Lookup JoinSELECTo.order_id,o.amount,u.user_name,u.user_level,u.cityFROM `order_stream` AS oJOIN `setats_user_info_lookup` FOR SYSTEM_TIME AS OF o.proctime AS uON o.user_id = u.user_id;
运行建议
存储类型选择:
ROCKSDB`:适合维表较大、Key 较多的场景。
`HEAP`:适合维表较小、延迟更敏感的场景。