这几天在看 Flink SQL 内置优化参数的功能和原理,虽然网上会有一些文章介绍,这里还是自己做一个整体的总结和思考,方便自己以后的回顾。
Flink SQL 内置的优化参数是 Blink Planner 里面的功能,也就是 1.9 以后 Blink Planner 自带功能,从 Flink 1.11 开始,Blink Planner 已经成为 Flink 默认的 Planner,目前聚合优化参数是针对无界流非窗口类聚合,窗口类聚合优化参数未来会进行支持。下面开始讲解一下 Flink SQL 优化参数的功能以及其适用场景,官网相关参考:Streaming Aggregation。
默认情况下,在无界流聚合场景下,每来一条记录,会经历下面三个步骤:
当数据量非常大时,由于每条记录都需要经过上面三个步骤,同时还涉及到序列化和反序列化,所以此时这种场景下,实时作业的吞吐量以及 RocksDB StateBackend 负载都会受到很大的影响,所以在 Blink Planer 中就引入了 MiniBatch 功能。
MiniBatch 的本质还是在内存中缓存一批数据,通过周期性时间或者缓存的记录数到达预设值时,会触发计算。简单理解,会将记录存储在一个 HashMap 中,Key 就是业务聚合 Key,Value 是这个 Key 的消息记录集合,之后会遍历内存的数据(通过 Key),先获取该 Key 之前的状态值,将内存中缓存的数据参与到状态计算,最终写入到状态后端中。通过对数据攒批处理后,降低对于状态后端的操作,从而提升实时作业的吞吐量。Mini Batch 功能是 Flink 在吞吐量以及延迟之间做的权衡。
开启 Mini Bathch 功能有三个参数:
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
个人认为 Mini Batch 参数开启的适用场景有两点:
应用场景为无界流非窗口聚合时,而且实时任务的数据量非常大,业务方能够允许实时作业有一定延迟,这种情况下,你可以牺牲一点点延迟,来换取更大的实时任务的吞吐量。
对于可撤回流的二次聚合,引入该参数,尽可能降低聚合值突然变小而后又恢复正常值的抖动。比如下面统计最新 word 的次数:
select word,count(*) as
cnt from (select name,last_value(word) as new_word from source group name) as t group word
由于内层逻辑是一个聚合场景,同时实时数据也可能一直在变,所以内层结果存在撤回情况。当外层聚合逻辑遇到撤回记录时,会减去撤回消息记录 key 的相关结果值,然后在根据新发送的记录进行统计,所以就可能导致结果抖动,尤其在大促期间,大屏实时统计类任务,这种会造成业务方的疑问和担心,为什么结果值变小了,数据会不会丢失等等。引入 Mini Batch 参数,可以对一批数据进行计算后,在进行结果更新,尽可能减少这种数据抖动的情形。
Local-Global 聚合参数主要解决非 Distinct 聚合场景下,比如 SUM, COUNT, MAX, MIN, AVG,数据倾斜问题。Flink 在进行 keyBy 时,相同的 Key 肯定会到同一 TaskManager 中,所以如果某类 Key 数据量过多时,会造成某个 TaskManager 负载过高,极端情况可能会导致实时作业反压,Checkpoint 超时失败等问题。
Flink Local-Global 聚合类似 Hadoop MapReduce 任务的 Combine,先在上游将结果本地聚合好,在发送聚合后的数据到下游,大大降低了发送到下游的数据量(将明细数据转换成聚合后数据),从而解决数据倾斜问题。下面是 Flink Local-Global 聚合示意图:
使用 Local-Global 聚合优化的前提,需要开启 Mini Batch 功能,下面是代码使用 Local-Global 功能:
// instantiate table environment
TableEnvironment tEnv = ...
// access flink configuration
Configuration configuration = tEnv.getConfig().getConfiguration();
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true"); // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
Local-Global 聚合类参数,能够解决非 Distinct 类的聚合场景数据倾斜问题,却无法解决 Distinct 类聚合场景,因为 Distinct 需要记住之前的原始数据,进行去重。下面是可能存在 Distinct 类数据倾斜聚合的 SQL 语句:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
由于 day 一般是当天的日期,所以这种情况,day 相同的数据都会到同一个 TaskManager 上面去,最终造成实时任务热点。Flink 内置的 Distinct 聚合优化参数table.optimizer.distinct-agg.split.enabled
,通过将 Key 相同的记录,分到不同的 BUCKET(桶) 中去,BUCKET 默认数量为 1024,可以通过参数table.optimizer.distinct-agg.split.bucket-num
配置,配置 Split Distinct 聚合优化参数后,上面 SQL 会被转成:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
在 day 相同的情况下,通过对 user_Id hash 取模,尽可能把消息打散到多个桶中,多个桶有分散在不同的 TaskManager,可以确定的是,user_id 相同的记录肯定会到同一 TaskManager 上面进行进行聚合。
下图是使用 Local Global 聚合参数和Split Distinct 聚合优化参数示意图: