
针对 datastream api 大家都比较熟悉了,还是那句话,在 datastream 中,你写的代码逻辑是什么样的,它最终的执行方式就是什么样的。
但是对于 flink sql 的执行过程,大家还是不熟悉的。
此节就是窗口聚合章节的第二篇,上节介绍了 1.13 之前的 tumble window 实现,本节介绍 window tvf 下的 tumble window 案例给大家介绍其使用方式和原理。
本节依然从以下几个章节给大家详细介绍 flink sql 的能力。
先说说结论,以下这些结论已经在上节说过了,此处附上上节文章:
TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用 ts AS TO_TIMESTAMP_LTZ(row_time, 3) 将其转换为 TIMESTAMP(3) 类型。关于 flink sql tumble window 一般都会有以下问题。本文的目标也是为大家解答这些问题:
在正式开始聊 tumble window 之前,先看看上节 flink sql 适用场景的结论。让大家先有 flink sql 的一个整体印象以及结论。
不装了,我坦白了,flink sql 其实很适合干的活就是 dwd 清洗,dws 聚合。
此处主要针对实时数仓的场景来说。flink sql 能干 dwd 清洗,dws 聚合,基本上实时数仓的大多数场景都能给覆盖了。
flink sql 牛逼!!!
但是!!!
经过博主使用 flink sql 经验来看,并不是所有的 dwd,dws 聚合场景都适合 flink sql(截止发文阶段来说)!!!
其实这些目前不适合 flink sql 的场景总结下来就是在处理上比 datastream 还是会有一定的损失。
先总结下使用场景:
1. dwd:简单的清洗、复杂的清洗、维度的扩充、各种 udf 的使用
2. dws:各类聚合
然后分适合的场景和不适合的场景来说,因为只这一篇不能覆盖所有的内容,所以本文此处先大致给个结论,之后会结合具体的场景详细描述。
窗口聚合大家都在 datastream api 中很熟悉了,目前在实时数据处理的过程中,窗口计算可以说是最重要、最常用的一种计算方式了。
但是在抛出窗口概念之前,博主有几个关于窗口的小想法说一下。
一个小想法。
先抛结论:窗口会拖慢实时数据的产出,是在目前下游分析引擎能力有限的情况下的一种妥协方案。
站在数据开发以及需求方的世界中,当然希望所有的数据都是实时来的,实时处理的,实时产出的,实时展现的。
举个例子:如果你要满足一个一分钟窗口聚合的 pv,uv,或者其他聚合需求。
olap 数据服务引擎 就可以满足上述的实时来的,实时处理的,实时产出的,实时展现的的场景。flink 消费处理明细数据,产出到 kafka,然后直接导入到 olap 引擎中。查询时直接用 olap 做聚合。这其中是没有任何窗口的概念的。但是整个链路中,要保障端对端精确一次,要保障大数据量情况下 olap 引擎能够秒级查询返回,更何况有一些去重类指标的计算,等等场景。把这些压力都放在 olap 引擎的压力是很大的。
因此在 flink 数据计算引擎中就诞生了窗口的概念。我们可以直接在计算引擎中进行窗口聚合计算,然后等到窗口结束之后直接把结果数据产出。这就出现了博主所说的窗口拖慢了实时数据产出的情况。而且窗口在处理不好的情况下可能会导致数据丢失。
关于上述两种情况的具体优劣选择,都由大家自行选择。上述只是引出博主一些想法。
目前已知的窗口分为以下四种。
1. Tumble Windows2. Hop Windows3. Cumulate Windows4. Session Windows
这些窗口的具体描述直接见官网,有详细的说明。此处不赘述。
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/
此处介绍下 flink 中常常会涉及到的两个容易混淆的概念就是:窗口 + key。这里来形象的说明下。
如下图所示。

1
源码公众号后台回复1.13.2 tumble window 的奇妙解析之路获取。
在介绍 sql tumble window 窗口算子执行案例之前,先看一个 datastream 中的窗口算子案例。其逻辑都是相通的。会对我们了解 sql tumble window 算子有帮助。
我们先看看 datastream 处理逻辑。
以下面这个为例。
public class _04_TumbleWindowTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.addSource(new UserDefinedSource())
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
@Override
public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
return element.f3;
}
})
.keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
@Override
public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
return row.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.sum(2)
.print();
env.execute("1.12.1 DataStream TUMBLE WINDOW 案例");
}
private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {
private volatile boolean isCancel;
@Override
public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {
while (!this.isCancel) {
sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));
Thread.sleep(10L);
}
}
@Override
public void cancel() {
this.isCancel = true;
}
}
}
datastream 生产的具体的 transformation 如下图:

24
其中我们只关注最重要的 WindowOperator 算子。

25
其中 WindowOperator 算子包含的重要属性如下图。

26
来看看 WindowOperator 的执行逻辑。窗口执行的整体详细流程可以参考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

23
介绍到 tumble window 的语义,总要有对比的去介绍。这里的参照物就是 datastream api。
在 datastream api 中。tumble window 一般用作以下两种场景。
但是在 sql api 中。tumble window 是聚合(group by)语义,聚合在 sql 标准中的数据处理逻辑是多条输入,在窗口触发时就输出一条数据的语义。而上面的常常用在 datastream 中的优化场景是多对多的场景。因此和 sql 语义不符合。所以 flink sql tumble window 一般都是用于计算聚合运算值来使用。
滚动窗口的特性就是会将无限流进行纵向划分成一个一个的窗口,每个窗口都是相同的大小,并且不重叠。

22
来,在介绍原理之前,总要先用起来,我们就以下面这个例子展开。
1.(flink 1.13.2)场景:简单且常见的分维度分钟级别同时在线用户数、总销售额
数据源表:
CREATE TABLE source_table (
-- 维度数据
dim STRING,
-- 用户 id
user_id BIGINT,
-- 用户
price BIGINT,
-- 事件时间戳
row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
-- watermark 设置
WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'datagen',
'rows-per-second' = '10',
'fields.dim.length' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '100000',
'fields.price.min' = '1',
'fields.price.max' = '100000'
)
Notes - 关于 watermark 容易踩得坑:sql 的 watermark 类型必须要设置为
TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用ts AS TO_TIMESTAMP_LTZ(row_time, 3)将其转换为TIMESTAMP(3)类型。
数据汇表:
CREATE TABLE sink_table (
dim STRING,
pv BIGINT,
sum_price BIGINT,
max_price BIGINT,
min_price BIGINT,
uv BIGINT,
window_start bigint
) WITH (
'connector' = 'print'
)
数据处理逻辑:
可以看下下面语法,窗口聚合的写法有专门的 tumble(row_time, interval '1' minute) 写法,这就是与平常我们写的 hive sql,mysql 等不一样的地方。
insert into sink_table
select dim,
sum(bucket_pv) as pv,
sum(bucket_sum_price) as sum_price,
max(bucket_max_price) as max_price,
min(bucket_min_price) as min_price,
sum(bucket_uv) as uv,
max(window_start) as window_start
from (
SELECT dim,
UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 as window_start,
window_end,
count(*) as bucket_pv,
sum(price) as bucket_sum_price,
max(price) as bucket_max_price,
min(price) as bucket_min_price,
-- 计算 uv 数
count(distinct user_id) as bucket_uv
FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
GROUP BY window_start,
window_end,
dim,
-- 按照用户 id 进行分桶,防止数据倾斜
mod(user_id, 1024)
)
group by dim,
window_start
2.运行:可以看到,其实在 flink sql 任务中,其会把对应的处理逻辑给写到算子名称上面。
Notes - 观察 flink sql 技巧 1:这个其实就是我们观察 flink sql 任务的第一个技巧。如果你想知道你的 flink 任务在干啥,第一反应是去 flink webui 看看这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑
先看一下整个算子图,如下图。从左到右总共分为四个算子。

3
整体描述一下:

29
来看看每一个算子具体做了什么事情。
第一个算子:

4
第二个算子:
Notes:

5
第三个算子:

6
第四个算子:

7
3.(flink 1.13.2)结果:
1> +U[7, 36403, 1824202613, 99999, 2, 30498, 1632136920000]
2> -U[a, 37001, 1857079208, 99999, 3, 30857, 1632136920000]
2> +U[a, 37037, 1858977218, 99999, 3, 30886, 1632136920000]
1> -U[7, 36403, 1824202613, 99999, 2, 30498, 1632136920000]
1> +U[7, 36428, 1825407205, 99999, 2, 30523, 1632136920000]
1> -U[2, 36970, 1848722634, 99999, 6, 30876, 1632136920000]
2> -U[6, 36911, 1856162742, 99998, 2, 30801, 1632136920000]
...
4.(flink 1.13.2)原理:
关于 sql 开始运行的机制见上一节详述。
此处只介绍相比前一节新增内容。可以看到上述代码的具体 transformation 如下图。

8

9
整体处理逻辑如下图。
这里处理每一条数据时,主要是把数据放入到 local buffer 中。

1
涉及到 local combiner 处理计算时,就是第 3 点,跟进代码 windowBuffer.advanceProgress(currentWatermark)。

12

13

14
这里看下具体 combine 流程。总共四步,如下图。

15

16
其实 local agg 的处理逻辑很简单,基本和上节说的 1.12 实现一致。都是代码生成之后做 sum,count,count distinct 的计算。

27
依然如下图:

30
先看看 transformation 中包含什么内容:

10
整体处理逻辑如下:

17
也是在处理 watermark 时,进行聚合计算。

18

19

20

21
这里有一个重点,就是 global agg udf 是执行 merge 操作进行聚合的。其逻辑就是将上游 combiner 的结果数据聚合。

22

23
在窗口触发时,将结果输出。

24

25

26

28
其实 global agg 和 local agg 逻辑基本一致,这里不再赘述。
本文主要介绍了 window tvf 实现的 tumble window 聚合类指标的常见场景案例以及其底层运行原理。
而且也介绍了在查看 flink sql 任务时的一些技巧:
TIMESTAMP(3)。如果你的数据源时间戳类型是 13 位 bigint 类型时间戳,可以用 ts AS TO_TIMESTAMP_LTZ(row_time, 3) 将其转换为 TIMESTAMP(3) 类型。