Flink Lookup Join

最近更新时间:2026-04-16 11:28:52

我的收藏

概述

Setats 支持高效的 Lookup Join,可以将 Setats 表作为维表,通过持续消费 changelog 并在本地构建缓存,实现流式事实表与维表的实时关联查询。

前提条件

已创建 Setats 集群并完成 Warehouse 配置。
已获取 manager-url。
维表数据已写入 Setats,且具备可持续消费的 Changelog。
Flink 运行环境已完成相关依赖准备。

维表建表语法

CREATE TABLE `user_dim` (
`id` BIGINT,
`dt` STRING,
`name` STRING,
`age` INT,
`address` STRING,
PRIMARY KEY (`id`, `dt`) NOT ENFORCED
) WITH (
'connector' = 'setats-cdc',
'warehouse' = 'cosn://<bucket>/warehouse',
'catalog-name' = 'setats',
'catalog-type' = 'hive',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'catalog-database' = 'testdb',
'catalog-table' = 'user_dim',
'manager-url' = '<Manager Url>',
'lookup.force-local' = 'true',
'lookup.bucket.discover.interval' = '10s',
'lookup.refresh.async' = 'false',
'lookup.table.store-type' = 'ROCKSDB',
'lookup.table.store-cache-size' = '1024'
);


Lookup Join 参数说明

参数
说明
默认值
lookup.force-local
是否强制本地 Lookup。当前请固定设置为 `true`
true
lookup.continuous.discovery-interval
维表 Changelog 刷新间隔
10s
lookup.bucket.discover.interval
新 Bucket / 分区发现间隔
10s
lookup.refresh.async
是否异步刷新本地维表缓存
false
lookup.table.store-type
本地存储类型,可选 ROCKSDB 或 HEAP
ROCKSDB
lookup.table.store-cache-size
ROCKSDB 模式下额外 Heap Cache 可保存的 Join Key 数量
1024
使用示例:
-- 创建流表
CREATE TABLE `order_stream` (
`order_id` BIGINT,
`user_id` BIGINT,
`dt` STRING,
`amount` DECIMAL(10, 2),
`proctime` AS PROCTIME()
) WITH (
'connector' = 'kafka'
-- 省略其他 Kafka 参数
);
-- 创建 Setats 维表
CREATE TABLE `setats_user_info_lookup` (
`user_id` BIGINT,
`user_name` STRING,
`user_level` INT,
`city` STRING,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'setats-cdc',
'warehouse' = 'cosn://<bucket>/warehouse',
'catalog-name' = 'setats',
'catalog-type' = 'hive',
'uri' = 'thrift://<metastore-host-1>:7004,thrift://<metastore-host-2>:7004',
'catalog-database' = 'testdb',
'catalog-table' = 'user_info',
'manager-url' = '<Manager Url>',
'lookup.force-local' = 'true',
'lookup.bucket.discover.interval' = '10s',
'lookup.refresh.async' = 'false',
'lookup.table.store-type' = 'ROCKSDB',
'lookup.table.store-cache-size' = '1024'
);
-- 执行 Lookup Join
SELECT
o.order_id,
o.amount,
u.user_name,
u.user_level,
u.city
FROM `order_stream` AS o
JOIN `setats_user_info_lookup` FOR SYSTEM_TIME AS OF o.proctime AS u
ON o.user_id = u.user_id;


运行建议

存储类型选择:
ROCKSDB`:适合维表较大、Key 较多的场景。
`HEAP`:适合维表较小、延迟更敏感的场景。