CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
目前 Flink SQL 支持下列 CREATE 语句:
此节重点介绍建表,建数据库、视图和 UDF 会在后面的扩展章节进行介绍。
下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
物理列是数据库中所说的常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。
其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
举一个仅包含常规列的表的案例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
元数据列是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由 METADATA
关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka'
...
);
元数据列可以用于后续数据的处理,或者写入到目标表中。
举例:
INSERT INTO MyTable
SELECT
user_id
, name
, record_time + INTERVAL '1' SECOND
FROM MyTable;
如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx
子句是可以被省略的。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 读取 kafka 本身自带的时间戳
`timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
'connector' = 'kafka'
...
);
关于 Flink SQL 的每种 Connector 都提供了哪些 metadata 字段,详细可见官网文档 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/
如果自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致的话,程序运行时会自动 cast 强转。但是这要求两种数据类型是可以强转的。举例如下:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);
默认情况下,Flink SQL planner 认为 metadata 列是可以 读取
也可以 写入
的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。
那么在往一个表写入的场景下,我们就可以使用 VIRTUAL
关键字来标识某个元数据列不写入到外部存储中(不持久化)。
以 Kafka 举例:
CREATE TABLE MyTable (
-- sink 时会写入
`timestamp` BIGINT METADATA,
-- sink 时不写入
`offset` BIGINT METADATA VIRTUAL,
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
在上面这个案例中,Kafka 引擎的 offset
是只读的。所以我们在把 MyTable
作为数据源(输入)表时,schema 中是包含 offset
的。在把 MyTable
作为数据汇(输出)表时,schema 中是不包含 offset
的。如下:
-- 当做数据源(输入)的 schema
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
-- 当做数据汇(输出)的 schema
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
所以这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset
列,否则 Flink SQL 任务会直接报错。
计算列其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。
举例:
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity
`cost` AS price * quanitity,
) WITH (
'connector' = 'kafka'
...
);
注意!!! 计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。
小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?
结论:没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:
PROCTIME()
函数来定义处理时间列注意!!!和虚拟 metadata 列是类似的,计算列也是只能读不能写的。
也就是说,我们在把 MyTable
作为数据源(输入)表时,schema 中是包含 cost
的。
在把 MyTable
作为数据汇(输出)表时,schema 中是不包含 cost
的。举例:
-- 当做数据源(输入)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
-- 当做数据汇(输出)的 schema
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
Watermark 是在 Create Table
中进行定义的。具体 SQL 语法标准是 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
。
其中:
rowtime_column_name
:表的事件时间属性字段。该列必须是 TIMESTAMP(3)
、TIMESTAMP_LTZ(3)
类,这个时间可以是一个计算列。watermark_strategy_expression
:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name
列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。注意:
pipeline.auto-watermark-interval
进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,然如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。Flink SQL 提供了几种 WATERMARK 生产策略:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
。此类策略就可以用于设置最大乱序时间,假如设置为 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
,则生成的是运行 5s 延迟的 Watermark。。一般都用这种 Watermark 生成策略
,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。WATERMARK FOR rowtime_column AS rowtime_column
。一般基本不用这种方式
。如果你能保证你的数据源的时间戳是严格升序的,那就可以使用这种方式。严格升序代表 Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
。一般基本不用这种方式
。如果设置此类,则允许有相同的时间戳出现。先看一个案例:
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。
一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。
注意:
回到上述案例中,With 声明了以下几项信息:
'connector' = 'kafka'
:声明外部存储是 Kafka'topic' = 'user_behavior'
:声明 Flink SQL 任务要连接的 Kafka 表的 topic 是 user_behavior'properties.bootstrap.servers' = 'localhost:9092'
:声明 Kafka 的 server ip 是 localhost:9092'properties.group.id' = 'testGroup'
:声明 Flink SQL 任务消费这个 Kafka topic,会使用 testGroup 的 group id 去消费'scan.startup.mode' = 'earliest-offset'
:声明 Flink SQL 任务消费这个 Kafka topic 会从最早位点开始消费'format' = 'csv'
:声明 Flink SQL 任务读入或者写出时对于 Kafka 消息的序列化方式是 csv 格式从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。
Like 子句是 Create Table 子句的一个延伸。举例:
下面定义了一张 Orders
表:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like
子句定义一张带 Watermark 的新表:
CREATE TABLE Orders_with_watermark (
-- 1. 添加了 WATERMARK 定义
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数
'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;
上面这个语句的效果就等同于:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去官网参考具体注意事项:
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/create/#like
-- 语法糖+1
WITH orders_with_total AS (
SELECT
order_id
, price + tax AS total
FROM Orders
)
SELECT
order_id
, SUM(total)
FROM orders_with_total
GROUP BY
order_id;
INSERT INTO target_table
SELECT * FROM Orders
INSERT INTO target_table
SELECT order_id, price + tax FROM Orders
INSERT INTO target_table
-- 自定义 Source 的数据
SELECT order_id, price FROM (VALUES (1, 2.0), (2, 3.1)) AS t (order_id, price)
INSERT INTO target_table
SELECT price + tax FROM Orders WHERE id = 10
-- 使用 UDF 做字段标准化处理
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
-- 过滤条件
Where id > 3
SQL 语义
:其实理解一个 SQL 最后生成的任务是怎样执行的,最好的方式就是理解其语义。
以下面的 SQL 为例,我们来介绍下其在离线中和在实时中执行的区别,对比学习一下,大家就比较清楚了
INSERT INTO target_table
SELECT PRETTY_PRINT(order_id) FROM Orders
Where id > 3
这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
数据源算子
(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 过滤和字段标准化算子
过滤和字段标准化算子
(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子发的一条一条的数据,然后判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,一条一条将计算结果数据发给下游 数据汇算子
数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中可以看到这个实时任务的所有算子是以一种 pipeline 模式运行的,所有的算子在同一时刻都是处于 running 状态的,24 小时一直在运行,实时任务中也没有离线中常见的分区概念。
select & where
关于看如何看一段 Flink SQL 最终的执行计划: 最好的方法就如上图,看 Flink web ui 的算子图,算子图上详细的标记清楚了每一个算子做的事情。以上图来说,我们可以看到主要有三个算子:
table=[[default_catalog, default_database, Orders]
,字段为 select=[order_id, name, CAST(CURRENT_TIMESTAMP()) AS row_time]
,Watermark 策略为 rowtime=[row_time], watermark=[(row_time - 5000:INTERVAL SECOND)]
。where=[(order_id > 3)]
,结果字段为 select=[order_id, name, row_time]
table=[default_catalog.default_database.target_table]
,表字段为 fields=[order_id, name, row_time]
可以看到 Flink SQL 具体执行了哪些操作是非常详细的标记在算子图上。所以小伙伴萌一定要学会看算子图,这是掌握 debug、调优前最基础的一个技巧。
那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个类似的算子(虽然实际可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),离线和实时任务的执行方式完全不同:
数据源算子
(From Order):数据源从 Order Hive 表(通常都是读一天、一小时的分区数据)中一次性读取所有的数据,然后将读到的数据全部发给下游 过滤和字段标准化算子
,然后 数据源算子
就运行结束了,释放资源了过滤和字段标准化算子
(Where id > 3 和 PRETTY_PRINT(order_id)):接收到上游算子的所有数据,然后遍历所有数据判断 id > 3?将判断结果为 true 的数据执行 PRETTY_PRINT UDF 后,将所有数据发给下游 数据汇算子
,然后 过滤和字段标准化算子
就运行结束了,释放资源了数据汇算子
(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 表中,然后整个任务就运行结束了,整个任务的资源也就都释放了可以看到离线任务的算子是分阶段(stage)进行运行的,每一个 stage 运行结束之后,然后下一个 stage 开始运行,全部的 stage 运行完成之后,这个离线任务就跑结束了。
注意: 很多小伙伴都是之前做过离线数仓的,熟悉了离线的分区、计算任务定时调度运行这两个概念,所以在最初接触 Flink SQL 时,会以为 Flink SQL 实时任务也会存在这两个概念,这里博主做一下解释
在实时任务中,是没有分区的概念的
,实时任务的上游、下游都是无限的数据流。在实时任务中,是没有定时调度的概念的
,实时任务一旦运行起来就是 24 小时不间断,不间断的处理上游无限的数据,不简单的产出数据给到下游。flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?
INSERT into target_table
SELECT
DISTINCT id
FROM Orders
SQL 语义
:也是拿离线和实时做对比。
这个 SQL 对应的实时任务,假设 Orders 为 kafka,target_table 也为 Kafka,在执行时,会生成三个算子:
数据源算子
(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 去重算子
去重算子
(DISTINCT id):接收到上游算子发的一条一条的数据,然后判断这个 id 之前是否已经来过了,判断方式就是使用 Flink 中的 state 状态,如果状态中已经有这个 id 了,则说明已经来过了,不往下游算子发,如果状态中没有这个 id,则说明没来过,则往下游算子发,也是一条一条发给下游 数据汇算子
数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中select distinct
注意: 对于实时任务,计算时的状态可能会无限增长。 状态大小取决于不同 key(上述案例为 id 字段)的数量。为了防止状态无限变大,我们可以设置状态的 TTL。但是这可能会影响查询结果的正确性,比如某个 key 的数据过期从状态中删除了,那么下次再来这么一个 key,由于在状态中找不到,就又会输出一遍。
那么如果这个 SQL 放在 Hive 中执行时,假设其中 Orders 为 Hive 表,target_table 也为 Hive 表,其也会生成三个相同的算子(虽然可能会被优化为一个算子,这里为了方便对比,划分为三个进行介绍),但是其和实时任务的执行方式完全不同:
数据源算子
(From Order):数据源从 Order Hive 表(通常都有天、小时分区限制)中一次性读取所有的数据,然后将读到的数据全部发给下游 去重算子
,然后 数据源算子
就运行结束了,释放资源了去重算子
(DISTINCT id):接收到上游算子的所有数据,然后遍历所有数据进行去重,将去重完的所有结果数据发给下游 数据汇算子
,然后 去重算子
就运行结束了,释放资源了数据汇算子
(INSERT INTO target_table):接收到上游的所有数据,将所有数据都写到 target_table Hive 中,然后整个任务就运行结束了,整个任务的资源也就都释放了由于窗口涉及到的知识内容比较多,所以博主先为大家说明介绍下面内容时的思路,大家跟着思路走。思路如下:
首先来看看 Flink SQL 中支持的 4 种窗口的运算。
tumble window
那么上面这个案例的 SQL 要咋写呢?
关于滚动窗口,在 1.13 版本之前和 1.13 及之后版本有两种 Flink SQL 实现方式,分别是:
博主这里两种方法都会介绍:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
select
dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
tumble(row_time, interval '1' minute)
可以看到 Group Window Aggregation 滚动窗口的 SQL 语法就是把 tumble window 的声明写在了 group by 子句中,即 tumble(row_time, interval '1' minute)
,第一个参数为事件时间的时间戳;第二个参数为滚动窗口大小。
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
count(distinct user_id) as uv
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND))
,包含三部分参数。
第一个参数 TABLE source_table
声明数据源表;第二个参数 DESCRIPTOR(row_time)
声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND
声明滚动窗口大小为 1 min。
可以直接在公众号后台回复1.13.2 最全 flink sql获取源代码。所有的源码都开源到 github 上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。
SQL 语义
:由于离线没有相同的时间窗口聚合概念,这里就直接说实时场景 SQL 语义,假设 Orders 为 kafka,target_table 也为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
数据源算子
(From Order):连接到 Kafka topic,数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 窗口聚合算子
窗口聚合算子
(TUMBLE 算子):接收到上游算子发的一条一条的数据,然后将每一条数据按照时间戳划分到对应的窗口中(根据事件时间、处理时间的不同语义进行划分),上述案例为事件时间,事件时间中,滚动窗口算子接收到上游的 Watermark 大于窗口的结束时间时,则说明当前这一分钟的滚动窗口已经结束了,将窗口计算完的结果发往下游算子(一条一条发给下游 数据汇算子
)数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
注意: 事件时间中滚动窗口的窗口计算触发是由 Watermark 推动的。
hop window
依然是 Group Window Aggregation、Windowing TVF 两种方案:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT dim,
UNIX_TIMESTAMP(CAST(hop_start(row_time, interval '1' minute, interval '5' minute) AS STRING)) * 1000 as window_start,
count(distinct user_id) as uv
FROM source_table
GROUP BY dim
, hop(row_time, interval '1' minute, interval '5' minute)
可以看到 Group Window Aggregation 滚动窗口的写法就是把 hop window 的声明写在了 group by 子句中,即 hop(row_time, interval '1' minute, interval '5' minute)
。其中:
第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
count(distinct user_id) as bucket_uv
FROM TABLE(HOP(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
GROUP BY window_start,
window_end,
dim
可以看到 Windowing TVF 滚动窗口的写法就是把 hop window 的声明写在了数据源的 Table 子句中,即 TABLE(HOP(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES, INTERVAL '5' MINUTES))
,包含四部分参数:
第一个参数 TABLE source_table
声明数据源表;第二个参数 DESCRIPTOR(row_time)
声明数据源的时间戳;第三个参数 INTERVAL '1' MINUTES
声明滚动窗口滑动步长大小为 1 min。第四个参数 INTERVAL '5' MINUTES
声明滚动窗口大小为 5 min。
SQL 语义
:滑动窗口语义和滚动窗口类似,这里不再赘述。
flink sql 知其所以然(八):flink sql tumble window 的奇妙解析之路
flink sql 知其所以然(九):window tvf tumble window 的奇思妙解
session window
目前 1.13 版本中 Flink SQL 不支持 Session 窗口的 Window TVF,所以这里就只介绍 Group Window Aggregation 方案:
-- 数据源表,用户购买行为记录表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT, -- 购买商品数量
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
dim,
UNIX_TIMESTAMP(CAST(session_start(row_time, interval '5' minute) AS STRING)) * 1000 as window_start,
count(1) as pv
FROM source_table
GROUP BY dim
, session(row_time, interval '5' minute)
注意: 上述 SQL 任务是在整个 Session 窗口结束之后才会把数据输出。Session 窗口即支持
处理时间
也支持事件时间
。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。
可以看到 Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中,即 session(row_time, interval '5' minute)
。其中:
第一个参数为事件时间的时间戳;第二个参数为 Session gap 间隔。
SQL 语义
:Session 窗口语义和滚动窗口类似,这里不再赘述。
可以直接在公众号后台回复1.13.2 最全 flink sql获取源代码。所有的源码都开源到 github 上面了。里面包含了非常多的案例。可以直接拿来在本地运行的!!!肥肠的方便。
固定窗口间隔内提前触发的的滚动窗口
,其实就是 Tumble Window + early-fire
的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。如下图所示:cumulate window
明细输入数据:
time | id | money |
---|---|---|
2021-11-01 00:01:00 | A | 3 |
2021-11-01 00:01:00 | B | 5 |
2021-11-01 00:01:00 | A | 7 |
2021-11-01 00:02:00 | C | 3 |
2021-11-01 00:03:00 | C | 10 |
预期经过渐进式窗口计算的输出数据:
time | count distinct id | sum money |
---|---|---|
2021-11-01 00:01:00 | 2 | 15 |
2021-11-01 00:02:00 | 3 | 18 |
2021-11-01 00:03:00 | 3 | 28 |
转化为折线图长这样:
当日累计
可以看到,其特点就在于,每一分钟的输出结果都是当天零点累计到当前的结果。
渐进式窗口目前只有 Windowing TVF 方案支持:
-- 数据源表
CREATE TABLE source_table (
-- 用户 id
user_id BIGINT,
-- 用户
money BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
window_end bigint,
window_start bigint,
sum_money BIGINT,
count_distinct_id bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
window_start,
sum(money) as sum_money,
count(distinct id) as count_distinct_id
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end
可以看到 Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY))
,其中包含四部分参数:
第一个参数 TABLE source_table
声明数据源表;第二个参数 DESCRIPTOR(row_time)
声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND
声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL '1' DAY
声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计。
SQL 语义
:渐进式窗口语义和滚动窗口类似,这里不再赘述。
具体应用场景:实际的案例场景中,经常会有多个维度进行组合(cube)计算指标的场景。如果把每个维度组合的代码写一遍,然后 union all 起来,这样写起来非常麻烦,而且会导致一个数据源读取多遍。
这时,有离线 Hive SQL 使用经验的小伙伴萌就会想到,如果有了 Grouping Sets,我们就可以直接用 Grouping Sets 将维度组合写在一条 SQL 中,写起来方便并且执行效率也高。当然,Flink 支持这个功能。
但是目前 Grouping Sets 只在 Window TVF 中支持,不支持 Group Window Aggregation。
来一个实际案例感受一下,计算每日零点累计到当前这一分钟的分汇总、age、sex、age+sex 维度的用户数。
-- 用户访问明细表
CREATE TABLE source_table (
age STRING,
sex STRING,
user_id BIGINT,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.age.length' = '1',
'fields.sex.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000'
);
CREATE TABLE sink_table (
age STRING,
sex STRING,
uv BIGINT,
window_end bigint
) WITH (
'connector' = 'print'
);
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '5' SECOND
, INTERVAL '1' DAY))
GROUP BY
window_start,
window_end,
-- grouping sets 写法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
小伙伴萌这里需要注意下!!!
Flink SQL 中 Grouping Sets 的语法和 Hive SQL 的语法有一些不同,如果我们使用 Hive SQL 实现上述 SQL 的语义,其实现如下:
insert into sink_table
SELECT
UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000 as window_end,
if (age is null, 'ALL', age) as age,
if (sex is null, 'ALL', sex) as sex,
count(distinct user_id) as bucket_uv
FROM source_table
GROUP BY
age
, sex
-- hive sql grouping sets 写法
GROUPING SETS (
()
, (age)
, (sex)
, (age, sex)
)
按颜色分 key(横向)
就是 Group 聚合,按窗口划分(纵向)
就是窗口聚合。tumble window + key
那么这时候,小伙伴萌就会问到,我其实可以把窗口聚合的写法也转换为 Group 聚合,只需要把 Group 聚合的 Group By key 换成时间就行,那这两个聚合的区别到底在哪?
首先来举一个例子看看怎么将窗口聚合转换为 Group 聚合。假如一个窗口聚合是按照 1 分钟的粒度进行聚合,如下 SQL:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
UNIX_TIMESTAMP(CAST(tumble_start(row_time, interval '1' minute) AS STRING)) * 1000 as window_start
from source_table
group by
dim,
-- 按照 Flink SQL tumble 窗口写法划分窗口
tumble(row_time, interval '1' minute)
转换为 Group 聚合的写法如下:
-- 数据源表
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
);
-- 数据汇表
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
);
-- 数据处理逻辑
insert into sink_table
select dim,
count(*) as pv,
sum(price) as sum_price,
max(price) as max_price,
min(price) as min_price,
-- 计算 uv 数
count(distinct user_id) as uv,
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint) as window_start
from source_table
group by
dim,
-- 将秒级别时间戳 / 60 转化为 1min
cast((UNIX_TIMESTAMP(CAST(row_time AS STRING))) / 60 as bigint)
确实没错,上面这个转换是一点问题都没有的。
但是窗口聚合和 Group by 聚合的差异在于:
时间
绑定的,窗口聚合其中窗口的计算结果触发都是由时间(Watermark)推动的。Group by 聚合完全由数据推动触发计算,新来一条数据去根据这条数据进行计算出结果发出;由此可见两者的实现方式也大为不同。SQL 语义
也是拿离线和实时做对比,Orders 为 kafka,target_table 为 Kafka,这个 SQL 生成的实时任务,在执行时,会生成三个算子:
数据源算子
(From Order):数据源算子一直运行,实时的从 Order Kafka 中一条一条的读取数据,然后一条一条发送给下游的 Group 聚合算子
,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个 SubTask(并发) 中Group 聚合算子
(group by key + sum\count\max\min):接收到上游算子发的一条一条的数据,去状态 state 中找这个 key 之前的 sum\count\max\min 结果。如果有结果 oldResult
,拿出来和当前的数据进行 sum\count\max\min
计算出这个 key 的新结果 newResult
,并将新结果 [key, newResult]
更新到 state 中,在向下游发送新计算的结果之前,先发一条撤回上次结果的消息 -[key, oldResult]
,然后再将新结果发往下游 +[key, newResult]
;如果 state 中没有当前 key 的结果,则直接使用当前这条数据计算 sum\max\min 结果 newResult
,并将新结果 [key, newResult]
更新到 state 中,当前是第一次往下游发,则不需要先发回撤消息,直接发送 +[key, newResult]
。数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Kafka 中这个实时任务也是 24 小时一直在运行的,所有的算子在同一时刻都是处于 running 状态的。
特别注意:
运行时参数
小节。如果这个 SQL 放在 Hive 中执行时,其中 Orders 为 Hive,target_table 也为 Hive,其也会生成三个相同的算子,但是其和实时任务的执行方式完全不同:
数据源算子
(From Order):数据源算子从 Order Hive 中读取到所有的数据,然后所有数据发送给下游的 Group 聚合算子
,向下游发送数据的 shuffle 策略是根据 group by 中的 key 进行发送,相同的 key 发到同一个算子中,然后这个算子就运行结束了,释放资源了Group 聚合算子
(group by + sum\count\max\min):接收到上游算子发的所有数据,然后遍历计算 sum\count\max\min 结果,批量发给下游 数据汇算子
,这个算子也就运行结束了,释放资源了数据汇算子
(INSERT INTO target_table):接收到上游发的一条一条的数据,写入到 target_table Hive 中,整个任务也就运行结束了,整个任务的资源也就都释放了Group 聚合也支持 Grouping sets
、Rollup
、Cube
举一个 Grouping sets
的案例:
SELECT
supplier_id
, rating
, product_id
, COUNT(*)
FROM (VALUES
('supplier1', 'product1', 4),
('supplier1', 'product2', 3),
('supplier2', 'product3', 3),
('supplier2', 'product4', 4))
AS Products(supplier_id, product_id, rating)
GROUP BY GROUPING SET (
( supplier_id, product_id, rating ),
( supplier_id, product_id ),
( supplier_id, rating ),
( supplier_id ),
( product_id, rating ),
( product_id ),
( rating ),
( )
)
那这里我们拿 Over 聚合
与 窗口聚合
做一个对比,其之间的最大不同之处在于:
注意: 其实在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?
SELECT order_id, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM Orders
Over 聚合的语法总结如下:
SELECT
agg_func(agg_col) OVER (
[PARTITION BY col1[, col2, ...]]
ORDER BY time_col
range_definition),
...
FROM ...
其中:
按照行数聚合
,第二种为 按照时间区间聚合
。如下案例所示:a. ⭐ 时间区间聚合:
按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。
CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '10',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 1 小时的数据
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
结果如下:
+I[2, 2021-12-24T22:08:26.583, 7, 73]
+I[2, 2021-12-24T22:08:27.583, 7, 80]
+I[2, 2021-12-24T22:08:28.583, 4, 84]
+I[2, 2021-12-24T22:08:29.584, 7, 91]
+I[2, 2021-12-24T22:08:30.583, 8, 99]
+I[1, 2021-12-24T22:08:31.583, 9, 138]
+I[2, 2021-12-24T22:08:32.584, 6, 105]
+I[1, 2021-12-24T22:08:33.584, 7, 145]
b. ⭐ 行数聚合:
按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。
CREATE TABLE source_table (
order_id BIGINT,
product BIGINT,
amount BIGINT,
order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.order_id.min' = '1',
'fields.order_id.max' = '2',
'fields.amount.min' = '1',
'fields.amount.max' = '2',
'fields.product.min' = '1',
'fields.product.max' = '2'
);
CREATE TABLE sink_table (
product BIGINT,
order_time TIMESTAMP(3),
amount BIGINT,
one_hour_prod_amount_sum BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT product, order_time, amount,
SUM(amount) OVER (
PARTITION BY product
ORDER BY order_time
-- 标识统计范围是一个 product 的最近 5 行数据
ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
) AS one_hour_prod_amount_sum
FROM source_table
预跑结果如下:
+I[2, 2021-12-24T22:18:19.147, 1, 9]
+I[1, 2021-12-24T22:18:20.147, 2, 11]
+I[1, 2021-12-24T22:18:21.147, 2, 12]
+I[1, 2021-12-24T22:18:22.147, 2, 12]
+I[1, 2021-12-24T22:18:23.148, 2, 12]
+I[1, 2021-12-24T22:18:24.147, 1, 11]
+I[1, 2021-12-24T22:18:25.146, 1, 10]
+I[1, 2021-12-24T22:18:26.147, 1, 9]
+I[2, 2021-12-24T22:18:27.145, 2, 11]
+I[2, 2021-12-24T22:18:28.148, 1, 10]
+I[2, 2021-12-24T22:18:29.145, 2, 10]
当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:
SELECT order_id, order_time, amount,
SUM(amount) OVER w AS sum_amount,
AVG(amount) OVER w AS avg_amount
FROM Orders
-- 使用下面子句,定义 Over Window
WINDOW w AS (
PARTITION BY product
ORDER BY order_time
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)
Flink 也支持了非常多的数据 Join 方式,主要包括以下三种:
细分 Flink SQL 支持的 Join:
L
作为左流中的数据标识,R
作为右流中的数据标识):+[L, R]
+[L, R]
,没 Join 到输出 +[L, null]
),如果右流之后数据到达之后,发现左流之前输出过没有 Join 到的数据,则会发起回撤流,先输出 -[L, null]
,然后输出 +[L, R]
+[L, R]
,没 Join 到输出 +[null, R]
;对左流来说:Join 到输出 +[L, R]
,没 Join 到输出 +[L, null]
)。如果一条流的数据到达之后,发现之前另一条流之前输出过没有 Join 到的数据,则会发起回撤流(左流数据到达为例:回撤 -[null, R]
,输出 +[L, R]
,右流数据到达为例:回撤 -[L, null]
,输出 +[L, R]
)。下面这个案例为 Inner Join 案例
:
-- 曝光日志数据
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '100'
);
-- 点击日志数据
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
-- 流的 INNER JOIN,条件为 log_id
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[5, d, 5, f]
+I[5, d, 5, 8]
+I[5, d, 5, 2]
+I[3, 4, 3, 0]
+I[3, 4, 3, 3]
...
如果为 Left Join
案例:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '3',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
LEFT JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[5, f3c, 5, c05]
+I[5, 6e2, 5, 1f6]
+I[5, 86b, 5, 1f6]
+I[5, f3c, 5, 1f6]
-D[3, 4ab, null, null]
-D[3, 6f2, null, null]
+I[3, 4ab, 3, 765]
+I[3, 6f2, 3, 765]
+I[2, 3c4, null, null]
+I[3, 4ab, 3, a8b]
+I[3, 6f2, 3, a8b]
+I[2, c03, null, null]
...
如果为 Full Join
案例:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING
) WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '2',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table
FULL JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id;
输出结果如下:
+I[null, null, 7, 6]
+I[6, 5, null, null]
-D[1, c, null, null]
+I[1, c, 1, 2]
+I[3, 1, null, null]
+I[null, null, 7, d]
+I[10, 0, null, null]
+I[null, null, 2, 6]
-D[null, null, 7, 6]
-D[null, null, 7, d]
...
关于 Regular Join 的注意事项:
等值 join
。等值 join
和 非等值 join
区别在于,等值 join
数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join
数据 shuffle 策略是 Global,所有数据发往一个并发,按照非等值条件进行关联SQL 语义
:详细的 SQL 语义案例可以参考:
flink sql 知其所以然(十二):流 join 很难嘛???(上)
flink sql 知其所以然(十三):流 join 很难嘛???(下)
L
作为左流中的数据标识,R
作为右流中的数据标识):+[L, R]
+[L, R]
。事件时间中随着 Watermark 的推进(也支持处理时间)。如果发现发现左流 State 中的数据过期了,就把左流中过期的数据从 State 中删除,然后输出 +[L, null]
,如果右流 State 中的数据过期了,就直接从 State 中删除。+[L, R]
。事件时间中随着 Watermark 的推进(也支持处理时间),发现 State 中的数据能够过期了,就将这些数据从 State 中删除并且输出(左流过期输出 +[L, null]
,右流过期输出 -[null, R]
)可以发现 Inner Interval Join 和其他三种 Outer Interval Join 的区别在于,Outer 在随着时间推移的过程中,如果有数据过期了之后,会根据是否是 Outer 将没有 Join 到的数据也给输出。
下面为 Inner Interval Join
:
CREATE TABLE show_log_table (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log_table (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log_table.log_id as s_id,
show_log_table.show_params as s_params,
click_log_table.log_id as c_id,
click_log_table.click_params as c_params
FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id = click_log_table.log_id
AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL '4' HOUR AND click_log_table.row_time;
输出结果如下:
6> +I[2, a, 2, 6]
6> +I[2, 6, 2, 6]
2> +I[4, 1, 4, 5]
2> +I[10, 8, 10, d]
2> +I[10, 7, 10, d]
2> +I[10, d, 10, d]
2> +I[5, b, 5, d]
6> +I[1, a, 1, 7]
如果是 Left Interval Join
:
CREATE TABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
输出结果如下:
+I[6, e, 6, 7]
+I[11, d, null, null]
+I[7, b, null, null]
+I[8, 0, 8, 3]
+I[13, 6, null, null]
如果是 Full Interval Join
:
CREATE TABLE show_log (
log_id BIGINT,
show_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.show_params.length' = '1',
'fields.log_id.min' = '5',
'fields.log_id.max' = '15'
);
CREATE TABLE click_log (
log_id BIGINT,
click_params STRING,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.click_params.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
s_id BIGINT,
s_params STRING,
c_id BIGINT,
c_params STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
show_log.log_id as s_id,
show_log.show_params as s_params,
click_log.log_id as c_id,
click_log.click_params as c_params
FROM show_log LEFT JOIN click_log ON show_log.log_id = click_log.log_id
AND show_log.row_time BETWEEN click_log.row_time - INTERVAL '5' SECOND AND click_log.row_time + INTERVAL '5' SECOND;
输出结果如下:
+I[6, 1, null, null]
+I[7, 3, 7, 8]
+I[null, null, 6, 6]
+I[null, null, 4, d]
+I[8, d, null, null]
+I[null, null, 3, b]
关于 Interval Join 的注意事项: ⭐ 实时 Interval Join 可以不是
等值 join
。等值 join
和非等值 join
区别在于,等值 join
数据 shuffle 策略是 Hash,会按照 Join on 中的等值条件作为 id 发往对应的下游;非等值 join
数据 shuffle 策略是 Global,所有数据发往一个并发,然后将满足条件的数据进行关联输出
关于详细的 SQL 语义可以参考。
flink sql 知其所以然(十三):流 join 很难嘛???(下)
拉链快照表
,使用一个明细表去 join 这个 拉链快照表
的 join 方式就叫做 Temporal Join。而 Flink SQL 中也有对应的概念,表叫做 Versioned Table
,使用一个明细表去 join 这个 Versioned Table
的 join 操作就叫做 Temporal Join。Temporal Join 中,Versioned Table
其实就是对同一条 key(在 DDL 中以 primary key 标记同一个 key)的历史版本(根据时间划分版本)做一个维护,当有明细表 Join 这个表时,可以根据明细表中的时间版本选择 Versioned Table
对应时间区间内的快照数据进行 join。Versioned Table
。-- 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
-- PRIMARY KEY 定义方式
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
-- 定义一个 append-only 的数据源表
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
-- 将数据源表按照 Deduplicate 方式定义为 Versioned Table
CREATE VIEW versioned_rates AS
SELECT currency, conversion_rate, update_time -- 1. 定义 `update_time` 为时间字段
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 `currency` 为主键
ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列
) AS rownum
FROM currency_rates)
WHERE rownum = 1;
以 事件时间
任务举例:
-- 1. 定义一个输入订单表
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- 2. 定义一个汇率 versioned 表,其中 versioned 表的概念下文会介绍到
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time,
FROM orders
-- 3. Temporal Join 逻辑
-- SQL 语法为:FOR SYSTEM_TIME AS OF
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
结果如下,可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率:
order_id price 货币 汇率 order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
注意:
还是相同的案例,如果是 处理时间
语义:
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
-- 10:42 时,Euro 的汇率从 114 变为 116
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== 从 114 变为 116
Yen 1
-- 从 Orders 表查询数据
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== 在处理时间 10:15 到达的一条数据
1 US Dollar <== 在处理时间 10:30 到达的一条数据
2 Euro <== 在处理时间 10:52 到达的一条数据
-- 执行关联查询
SELECT
o.amount, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
-- 结果如下:
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== 在处理时间 10:15 到达的一条数据
1 US Dollar 102 102 <== 在处理时间 10:30 到达的一条数据
2 Euro 116 232 <== 在处理时间 10:52 到达的一条数据
可以发现处理时间就比较好理解了,因为处理时间语义中是根据左流数据到达的时间决定拿到的汇率值。Flink 就只为 LatestRates 维护了最新的状态数据,不需要关心历史版本的数据。
来一波输入数据:
曝光用户日志流(show_log)数据(数据存储在 kafka 中):
log_id timestamp user_id
1 2021-11-01 00:01:03 a
2 2021-11-01 00:03:00 b
3 2021-11-01 00:05:00 c
4 2021-11-01 00:06:00 b
5 2021-11-01 00:07:00 c
用户画像维表(user_profile)数据(数据存储在 redis 中):
user_id(主键) age sex
a 12-18 男
b 18-24 女
c 18-24 男
注意: redis 中的数据结构存储是按照 key,value 去存储的。其中 key 为 user_id,value 为 age,sex 的 json。
具体 SQL:
CREATE TABLE show_log (
log_id BIGINT,
`timestamp` as cast(CURRENT_TIMESTAMP as timestamp(3)),
user_id STRING,
proctime AS PROCTIME()
)
WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.length' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE user_profile (
user_id STRING,
age STRING,
sex STRING
) WITH (
'connector' = 'redis',
'hostname' = '127.0.0.1',
'port' = '6379',
'format' = 'json',
'lookup.cache.max-rows' = '500',
'lookup.cache.ttl' = '3600',
'lookup.max-retries' = '1'
);
CREATE TABLE sink_table (
log_id BIGINT,
`timestamp` TIMESTAMP(3),
user_id STRING,
proctime TIMESTAMP(3),
age STRING,
sex STRING
) WITH (
'connector' = 'print'
);
-- lookup join 的 query 逻辑
INSERT INTO sink_table
SELECT
s.log_id as log_id
, s.`timestamp` as `timestamp`
, s.user_id as user_id
, s.proctime as proctime
, u.sex as sex
, u.age as age
FROM show_log AS s
LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u
ON s.user_id = u.user_id
输出数据如下:
log_id timestamp user_id age sex
1 2021-11-01 00:01:03 a 12-18 男
2 2021-11-01 00:03:00 b 18-24 女
3 2021-11-01 00:05:00 c 18-24 男
4 2021-11-01 00:06:00 b 18-24 女
5 2021-11-01 00:07:00 c 18-24 男
注意: 实时的 lookup 维表关联能使用
处理时间
去做关联。
详细 SQL 语义及案例可见:
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
其实,Flink 官方并没有提供 redis 的维表 connector 实现。
没错,博主自己实现了一套。关于 redis 维表的 connector 实现,直接参考下面的文章。都是可以从 github 上找到源码拿来用的!
注意:
再说说维表常见的性能问题及优化思路。
所有的维表性能问题都可以总结为:高 qps 下访问维表存储引擎产生的任务背压,数据产出延迟问题。
举个例子:
0.1 ms
,那么并行度为 1 的任务的吞吐可以达到 1 query / 0.1 ms = 1w qps
。2 ms
,那么一条数据从输入 Flink 任务到输出 Flink 任务的时延就会变成 2.1 ms
,那么同样并行度为 1 的任务的吞吐只能达到1 query / 2.1 ms = 476 qps
。两者的吞吐量相差 21 倍
。这就是为什么维表 join 的算子会产生背压,任务产出会延迟。
那么当然,解决方案也是有很多的。抛开 Flink SQL 想一下,如果我们使用 DataStream API,甚至是在做一个后端应用,需要访问外部存储时,常用的优化方案有哪些?这里列举一下:
博主认为上述优化效果中,最好用的是 1 + 3,2 相比 3 还是一条一条发请求,性能会差一些。
既然 DataStream 可以这样做,Flink SQL 必须必的也可以借鉴上面的这些优化方案。具体怎么操作呢?看下文骚操作
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join(附源码)
CREATE TABLE show_log_table (
log_id BIGINT,
show_params ARRAY<STRING>
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.log_id.min' = '1',
'fields.log_id.max' = '10'
);
CREATE TABLE sink_table (
log_id BIGINT,
show_param STRING
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT
log_id,
t.show_param as show_param
FROM show_log_table
-- array 炸开语法
CROSS JOIN UNNEST(show_params) AS t (show_param)
show_log_table 原始数据:
+I[7, [a, b, c]]
+I[5, [d, e, f]]
输出结果如下所示:
-- +I[7, [a, b, c]] 一行转为 3 行
+I[7, a]
+I[7, b]
+I[7, b]
-- +I[5, [d, e, f]] 一行转为 3 行
+I[5, d]
+I[5, e]
+I[5, f]
public class TableFunctionInnerJoin_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
String sql = "CREATE FUNCTION user_profile_table_func AS 'flink.examples.sql._07.query._06_joins._06_table_function"
+ "._01_inner_join.TableFunctionInnerJoin_Test$UserProfileTableFunction';\n"
+ "\n"
+ "CREATE TABLE source_table (\n"
+ " user_id BIGINT NOT NULL,\n"
+ " name STRING,\n"
+ " row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),\n"
+ " WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '10',\n"
+ " 'fields.name.length' = '1',\n"
+ " 'fields.user_id.min' = '1',\n"
+ " 'fields.user_id.max' = '10'\n"
+ ");\n"
+ "\n"
+ "CREATE TABLE sink_table (\n"
+ " user_id BIGINT,\n"
+ " name STRING,\n"
+ " age INT,\n"
+ " row_time TIMESTAMP(3)\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ");\n"
+ "\n"
+ "INSERT INTO sink_table\n"
+ "SELECT user_id,\n"
+ " name,\n"
+ " age,\n"
+ " row_time\n"
+ "FROM source_table,\n"
// Table Function Join 语法对应 LATERAL TABLE
+ "LATERAL TABLE(user_profile_table_func(user_id)) t(age)";
Arrays.stream(sql.split(";"))
.forEach(flinkEnv.streamTEnv()::executeSql);
}
public static class UserProfileTableFunction extends TableFunction<Integer> {
public void eval(long userId) {
// 自定义输出逻辑
if (userId <= 5) {
// 一行转 1 行
collect(1);
} else {
// 一行转 3 行
collect(1);
collect(2);
collect(3);
}
}
}
}
执行结果如下:
-- <= 5,则只有 1 行结果
+I[3, 7, 1, 2021-05-01T18:23:42.560]
-- > 5,则有行 3 结果
+I[8, e, 1, 2021-05-01T18:23:42.560]
+I[8, e, 2, 2021-05-01T18:23:42.560]
+I[8, e, 3, 2021-05-01T18:23:42.560]
-- <= 5,则只有 1 行结果
+I[4, 9, 1, 2021-05-01T18:23:42.561]
-- > 5,则有行 3 结果
+I[8, c, 1, 2021-05-01T18:23:42.561]
+I[8, c, 2, 2021-05-01T18:23:42.561]
+I[8, c, 3, 2021-05-01T18:23:42.561]
集合操作支持 Batch\Streaming 任务。
union
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) UNION (SELECT s FROM t2);
+---+
| s|
+---+
| c|
| a|
| b|
| d|
| e|
+---+
Flink SQL> (SELECT s FROM t1) UNION ALL (SELECT s FROM t2);
+---+
| c|
+---+
| c|
| a|
| b|
| b|
| c|
| d|
| e|
| a|
| b|
| b|
+---+
Flink SQL> create view t1(s) as values ('c'), ('a'), ('b'), ('b'), ('c');
Flink SQL> create view t2(s) as values ('d'), ('e'), ('a'), ('b'), ('b');
Flink SQL> (SELECT s FROM t1) INTERSECT (SELECT s FROM t2);
+---+
| s|
+---+
| a|
| b|
+---+
Flink SQL> (SELECT s FROM t1) INTERSECT ALL (SELECT s FROM t2);
+---+
| s|
+---+
| a|
| b|
| b|
+---+
Flink SQL> (SELECT s FROM t1) EXCEPT (SELECT s FROM t2);
+---+
| s |
+---+
| c |
+---+
Flink SQL> (SELECT s FROM t1) EXCEPT ALL (SELECT s FROM t2);
+---+
| s |
+---+
| c |
| c |
+---+
上述 SQL 在流式任务中,如果一条左流数据先来了,没有从右流集合数据中找到对应的数据时会直接输出,当右流对应数据后续来了之后,会下发回撤流将之前的数据給撤回。这也是一个回撤流。
SELECT user, amount
FROM Orders
WHERE product IN (
SELECT product FROM NewProducts
)
上述 SQL 的 In 子句其实就和之前介绍到的 Inner Join 类似。并且 In 子查询也会涉及到大状态问题,大家注意设置 State 的 TTL。
支持 Batch\Streaming,但在实时任务中一般用的非常少。
实时任务中,Order By 子句中必须要有时间属性字段,并且时间属性必须为升序时间属性,即 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
或者 WATERMARK FOR rowtime_column AS rowtime_column
。
举例:
CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Order By row_time, user_id desc
支持 Batch\Streaming,但实时场景一般不使用,但是此处依然举一个例子:
CREATE TABLE source_table_1 (
user_id BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10'
);
CREATE TABLE sink_table (
user_id BIGINT
) WITH (
'connector' = 'print'
);
INSERT INTO sink_table
SELECT user_id
FROM source_table_1
Limit 3
结果如下,只有 3 条输出:
+I[5]
+I[9]
+I[4]
某个排序
条件,计算某个分组
下的排行榜数据SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name)
WHERE rownum <= N [AND conditions]
ROW_NUMBER()
:标识 TopN 排序子句PARTITION BY col1[, col2...]
:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序取 topN,比如下述案例中的 partition by key
,就是根据需求中的搜索关键词(key)做为分区ORDER BY col1 [asc|desc][, col2 [asc|desc]...]
:标识 TopN 的排序规则,是按照哪些字段、顺序或逆序进行排序WHERE rownum <= N
:这个子句是一定需要的,只有加上了这个子句,Flink 才能将其识别为一个 TopN 的查询,其中 N 代表 TopN 的条目数[AND conditions]
:其他的限制条件也可以加上输入数据为搜索词条数据的搜索热度数据,当搜索热度发生变化时,会将变化后的数据写入到数据源的 Kafka 中:
数据源 schema:
-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 数据汇 schema:
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
`timestamp` TIMESTAMP(3)
) WITH (
...
);
-- DML 逻辑
INSERT INTO sink_table
SELECT key, name, search_cnt, row_time as `timestamp`
FROM (
SELECT key, name, search_cnt, row_time,
-- 根据热搜关键词 key 作为 partition key,然后按照 search_cnt 倒排取前 100 名
ROW_NUMBER() OVER (PARTITION BY key
ORDER BY search_cnt desc) AS rownum
FROM source_table)
WHERE rownum <= 100
输出结果:
-D[关键词1, 词条1, 4944]
+I[关键词1, 词条1, 8670]
+I[关键词1, 词条2, 1735]
-D[关键词1, 词条3, 6641]
+I[关键词1, 词条3, 6928]
-D[关键词1, 词条4, 6312]
+I[关键词1, 词条4, 7287]
可以看到输出数据是有回撤数据的,为什么会出现回撤,我们来看看 SQL 语义。
上面的 SQL 会翻译成以下三个算子:
数据源
:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,按照 partition key 将数据进行 hash 分发到下游排序算子,相同的 key 数据将会发送到一个并发中排序算子
:为每个 Key 维护了一个 TopN 的榜单数据,接受到上游的一条数据后,如果 TopN 榜单还没有到达 N 条,则将这条数据加入 TopN 榜单后,直接下发数据,如果到达 N 条之后,经过 TopN 计算,发现这条数据比原有的数据排序靠前,那么新的 TopN 排名就会有变化,就变化了的这部分数据之前下发的排名数据撤回(即回撤数据),然后下发新的排名数据数据汇
:接收到上游的数据之后,然后输出到外部存储引擎中上面三个算子也是会 24 小时一直运行的。
SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER (PARTITION BY window_start, window_end [, col_key1...]
ORDER BY col1 [asc|desc][, col2 [asc|desc]...]) AS rownum
FROM table_name) -- windowing TVF
WHERE rownum <= N [AND conditions]
输入表字段:
-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- timestamp 消费词条时间戳
CREATE TABLE source_table (
name BIGINT NOT NULL,
search_cnt BIGINT NOT NULL,
key BIGINT NOT NULL,
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
WATERMARK FOR row_time AS row_time
) WITH (
...
);
-- 输出表字段:
-- 字段名 备注
-- key 搜索关键词
-- name 搜索热度名称
-- search_cnt 热搜消费热度(比如 3000)
-- window_start 窗口开始时间戳
-- window_end 窗口结束时间戳
CREATE TABLE sink_table (
key BIGINT,
name BIGINT,
search_cnt BIGINT,
window_start TIMESTAMP(3),
window_end TIMESTAMP(3)
) WITH (
...
);
-- 处理 sql:
INSERT INTO sink_table
SELECT key, name, search_cnt, window_start, window_end
FROM (
SELECT key, name, search_cnt, window_start, window_end,
ROW_NUMBER() OVER (PARTITION BY window_start, window_end, key
ORDER BY search_cnt desc) AS rownum
FROM (
SELECT window_start, window_end, key, name, max(search_cnt) as search_cnt
-- window tvf 写法
FROM TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '1' MINUTES))
GROUP BY window_start, window_end, key, name
)
)
WHERE rownum <= 100
输出结果:
+I[关键词1, 词条1, 8670, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条2, 6928, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条3, 1735, 2021-1-28T22:34, 2021-1-28T22:35]
+I[关键词1, 词条4, 7287, 2021-1-28T22:34, 2021-1-28T22:35]
...
可以看到结果是符合预期的,其中没有回撤数据。
数据源
:数据源即最新的词条下面的搜索词的搜索热度数据,消费到 Kafka 中数据后,将数据按照窗口聚合的 key 通过 hash 分发策略发送到下游窗口聚合算子窗口聚合算子
:进行窗口聚合计算,随着时间的推进,将窗口聚合结果计算完成发往下游窗口排序算子窗口排序算子
:这个算子其实也是一个窗口算子,只不过这个窗口算子为每个 Key 维护了一个 TopN 的榜单数据,接受到上游发送的窗口结果数据进行排序,随着时间的推进,窗口的结束,将排序的结果输出到下游数据汇算子。数据汇
:接收到上游的数据之后,然后输出到外部存储引擎中SELECT [column_list]
FROM (
SELECT [column_list],
ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
ORDER BY time_attr [asc|desc]) AS rownum
FROM table_name)
WHERE rownum = 1
其中:
ROW_NUMBER()
:标识当前数据的排序值PARTITION BY col1[, col2...]
:标识分区字段,代表按照这个 col 字段作为分区粒度对数据进行排序ORDER BY time_attr [asc|desc]
:标识排序规则,必须为时间戳列,当前 Flink SQL 支持处理时间、事件时间,ASC 代表保留第一行,DESC 代表保留最后一行WHERE rownum = 1
:这个子句是一定需要的,而且必须为 rownum = 1博主这里举两个案例:
星星
,月亮
,太阳
的用户数分别有多少。-- 数据源:当每一个用户的等级初始化及后续变化的时候的数据,即用户等级变化明细数据。
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
level STRING COMMENT '用户等级',
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)) COMMENT '事件时间戳',
WATERMARK FOR row_time AS row_time
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.level.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '1000000'
);
-- 数据汇:输出即每一个等级的用户数
CREATE TABLE sink_table (
level STRING COMMENT '等级',
uv BIGINT COMMENT '当前等级用户数',
row_time timestamp(3) COMMENT '时间戳'
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select
level
, count(1) as uv
, max(row_time) as row_time
from (
SELECT
user_id,
level,
row_time,
row_number() over(partition by user_id order by row_time) as rn
FROM source_table
)
where rn = 1
group by
level
输出结果:
+I[等级 1, 6928, 2021-1-28T22:34]
-I[等级 1, 6928, 2021-1-28T22:34]
+I[等级 1, 8670, 2021-1-28T22:34]
-I[等级 1, 8670, 2021-1-28T22:34]
+I[等级 1, 77287, 2021-1-28T22:34]
...
可以看到其有回撤数据。
其对应的 SQL 语义如下:
数据源
:消费到 Kafka 中数据后,将数据按照 partition by 的 key 通过 hash 分发策略发送到下游去重算子Deduplication 去重算子
:接受到上游数据之后,根据 order by 中的条件判断当前的这条数据和之前数据时间戳大小,以上面案例来说,如果当前数据时间戳大于之前数据时间戳,则撤回之前向下游发的中间结果,然后将最新的结果发向下游(发送策略也为 hash,具体的 hash 策略为按照 group by 中 key 进行发送),如果当前数据时间戳小于之前数据时间戳,则不做操作。次算子产出的结果就是每一个用户的对应的最新等级信息。Group by 聚合算子
:接受到上游数据之后,根据 Group by 聚合粒度对数据进行聚合计算结果(每一个等级的用户数),发往下游数据汇算子数据汇
:接收到上游的数据之后,然后输出到外部存储引擎中-- 数据源:原始日志明细数据
CREATE TABLE source_table (
user_id BIGINT COMMENT '用户 id',
name STRING COMMENT '用户姓名',
server_timestamp BIGINT COMMENT '用户访问时间戳',
proctime AS PROCTIME()
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.name.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.server_timestamp.min' = '1',
'fields.server_timestamp.max' = '100000'
);
-- 数据汇:根据 user_id 去重的第一条数据
CREATE TABLE sink_table (
user_id BIGINT,
name STRING,
server_timestamp BIGINT
) WITH (
'connector' = 'print'
);
-- 处理逻辑:
INSERT INTO sink_table
select user_id,
name,
server_timestamp
from (
SELECT
user_id,
name,
server_timestamp,
row_number() over(partition by user_id order by proctime) as rn
FROM source_table
)
where rn = 1
输出结果:
+I[1, 用户 1, 2021-1-28T22:34]
+I[2, 用户 2, 2021-1-28T22:34]
+I[3, 用户 3, 2021-1-28T22:34]
...
可以看到这个处理逻辑是没有回撤数据的。其对应的 SQL 语义如下:
数据源
:消费到 Kafka 中数据后,将数据按照 partition by 的 key 通过 hash 分发策略发送到下游去重算子Deduplication 去重算子
:处理时间语义下,如果是当前 key 的第一条数据,则直接发往下游,如果判断(根据 state 中是否存储过改 key)不是第一条,则直接丢弃数据汇
:接收到上游的数据之后,然后输出到外部存储引擎中注意: 在 Deduplication 关于是否会出现回撤流,博主总结如下:
可能会有
比当前事件时间还大的数据可能会有
比当前事件时间还小的数据可能会有
比当前处理时间还大的数据不可能会有
比当前处理时间还小的数据EXPLAIN PLAN FOR <query_statement_or_insert_statement>
public class Explain_Test {
public static void main(String[] args) throws Exception {
FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);
flinkEnv.env().setParallelism(1);
String sql = "CREATE TABLE source_table (\n"
+ " user_id BIGINT COMMENT '用户 id',\n"
+ " name STRING COMMENT '用户姓名',\n"
+ " server_timestamp BIGINT COMMENT '用户访问时间戳',\n"
+ " proctime AS PROCTIME()\n"
+ ") WITH (\n"
+ " 'connector' = 'datagen',\n"
+ " 'rows-per-second' = '1',\n"
+ " 'fields.name.length' = '1',\n"
+ " 'fields.user_id.min' = '1',\n"
+ " 'fields.user_id.max' = '10',\n"
+ " 'fields.server_timestamp.min' = '1',\n"
+ " 'fields.server_timestamp.max' = '100000'\n"
+ ");\n"
+ "\n"
+ "CREATE TABLE sink_table (\n"
+ " user_id BIGINT,\n"
+ " name STRING,\n"
+ " server_timestamp BIGINT\n"
+ ") WITH (\n"
+ " 'connector' = 'print'\n"
+ ");\n"
+ "\n"
+ "EXPLAIN PLAN FOR\n"
+ "INSERT INTO sink_table\n"
+ "select user_id,\n"
+ " name,\n"
+ " server_timestamp\n"
+ "from (\n"
+ " SELECT\n"
+ " user_id,\n"
+ " name,\n"
+ " server_timestamp,\n"
+ " row_number() over(partition by user_id order by proctime) as rn\n"
+ " FROM source_table\n"
+ ")\n"
+ "where rn = 1";
/**
* 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
* -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
*/
for (String innerSql : sql.split(";")) {
TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);
tableResult.print();
}
}
}
上述代码执行结果如下:
1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])
+- LogicalFilter(condition=[=($3, 1)])
+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER() OVER (PARTITION BY $0 ORDER BY PROCTIME() NULLS FIRST)])
+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])
2. 优化后的物理计划
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
3. 优化后的执行计划
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.sink_table], fields=[user_id, name, server_timestamp])
+- Calc(select=[user_id, name, server_timestamp])
+- Deduplicate(keep=[FirstRow], key=[user_id], order=[PROCTIME])
+- Exchange(distribution=[hash[user_id]])
+- Calc(select=[user_id, name, server_timestamp, PROCTIME() AS $3])
+- TableSourceScan(table=[[default_catalog, default_database, source_table]], fields=[user_id, name, server_timestamp])
USE CATALOG catalog_name
USE MODULES module_name1[, module_name2, ...]
USE db名称
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// | catalog name |
// +-----------------+
// | default_catalog |
// | cat1 |
// +-----------------+
// change default catalog
tEnv.executeSql("USE CATALOG cat1");
tEnv.executeSql("SHOW DATABASES").print();
// databases are empty
// +---------------+
// | database name |
// +---------------+
// +---------------+
// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();
// +---------------+
// | database name |
// +---------------+
// | db1 |
// +---------------+
// change default database
tEnv.executeSql("USE db1");
// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name | used |
// +-------------+-------+
// | hive | true |
// | core | false |
// +-------------+-------+
SHOW CATALOGS:展示所有 Catalog
SHOW CURRENT CATALOG:展示当前的 Catalog
SHOW DATABASES:展示当前 Catalog 下所有 Database
SHOW CURRENT DATABASE:展示当前的 Database
SHOW TABLES:展示当前 Database 下所有表
SHOW VIEWS:展示所有视图
SHOW FUNCTIONS:展示所有的函数
SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();
// +-----------------+
// | catalog name |
// +-----------------+
// | default_catalog |
// +-----------------+
// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();
// +----------------------+
// | current catalog name |
// +----------------------+
// | default_catalog |
// +----------------------+
// show databases
tEnv.executeSql("SHOW DATABASES").print();
// +------------------+
// | database name |
// +------------------+
// | default_database |
// +------------------+
// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();
// +-----------------------+
// | current database name |
// +-----------------------+
// | default_database |
// +-----------------------+
// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");
// show tables
tEnv.executeSql("SHOW TABLES").print();
// +------------+
// | table name |
// +------------+
// | my_table |
// +------------+
// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");
// show views
tEnv.executeSql("SHOW VIEWS").print();
// +-----------+
// | view name |
// +-----------+
// | my_view |
// +-----------+
// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// | mod |
// | sha256 |
// | ... |
// +---------------+
// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");
// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();
// +---------------+
// | function name |
// +---------------+
// | f1 |
// | ... |
// +---------------+
// show modules
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// | core |
// +-------------+
// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();
// +-------------+-------+
// | module name | used |
// +-------------+-------+
// | core | true |
// | hive | false |
// +-------------+-------+
-- 加载
LOAD MODULE module_name [WITH ('key1' = 'val1', 'key2' = 'val2', ...)]
-- 卸载
UNLOAD MODULE module_name
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();
// +-------------+
// | module name |
// +-------------+
// | core |
// | hive |
// +-------------+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();
// 结果啥 Moudle 都没有了
SET (key = value)?
RESET (key)?
启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:
Flink SQL> SET table.planner = blink;
[INFO] Session property has been set.
Flink SQL> SET;
table.planner=blink;
Flink SQL> RESET table.planner;
[INFO] Session property has been reset.
Flink SQL> RESET;
[INFO] All session properties have been set to their default values.
latest-offset
select 一些数据出来预览,其元数据已经存储在 Hive MetaStore 中,但是 Hive MetaStore 中存储的配置中的 scan.startup.mode
是 earliest-offset
,通过 SQL Hints,用户可以在 DML 语句中将 scan.startup.mode
改为 latest-offset
查询,因此可以看出 SQL Hints 常用语这种比较临时的参数修改,比如 Ad-hoc 这种临时查询中,方便用户使用自定义的新的表参数而不是 Catalog 中已有的表参数。以下 DML SQL 中的 /*+ OPTIONS(key=val [, key=val]*) */
就是 SQL Hints。
SELECT *
FROM table_path /*+ OPTIONS(key=val [, key=val]*) */
启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:
CREATE TABLE kafka_table1 (id BIGINT, name STRING, age INT) WITH (...);
CREATE TABLE kafka_table2 (id BIGINT, name STRING, age INT) WITH (...);
-- 1. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;
-- 2. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.mode
select * from
kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
join
kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
on t1.id = t2.id;
-- 3. 使用 'sink.partitioner'='round-robin' 覆盖原来的 Sink 表的 sink.partitioner
insert into kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */ select * from kafka_table2;