前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Flink SQL 算子生成固定 ID 的方法总结

Flink SQL 算子生成固定 ID 的方法总结

原创
作者头像
KyleMeow
发布于 2023-06-14 09:48:28
发布于 2023-06-14 09:48:28
2.1K00
代码可运行
举报
运行总次数:0
代码可运行

背景知识

Flink 的作业的算子拓扑结构,由一系列算子组成的运行图来描述,如下图所示:

含有多个算子的 Flink 运行图
含有多个算子的 Flink 运行图

运行图中的每个节点有自己的 ID,也可以有自己的状态(State)。当 Flink 做快照时,会保存算子 ID 和状态的对应关系。因此,我们从快照恢复作业时,如果每个算子 ID 都和之前的算子一一对应,就可以精确还原之前快照时的运行状态。

如果用户没有显式指定算子的 ID,Flink 会根据拓扑结构,自动为算子生成自己的 ID

问题描述

我们通过 SQL 或者 Table API 的方式来编写 Flink 作业时,由于需要经过 Calcite 翻译、优化才可以得到最终的 Flink 算子,用户侧很难直接干预算子的生成逻辑。

例如,用户稍微修改了一下 SQL 代码,或者升级了 Flink 版本,都可能导致运行图发生变化,自动生成的算子 ID 不再与之前的保持一致,从而造成快照无法恢复的后果。

那么问题来了:如何能够固定算子的 ID,即后续无论做了什么修改,只要这个算子还是他自己,那么它的 ID 永远都不变呢?

原理介绍

在 DataStream API 编程模式下,Flink 确实提供了固定算子 ID 的方式:我们可以通过 uid() 方法,显式为算子设置一个字符串 ID,随后 Flink 就会把这个 uid 进行 hash 处理,最终映射为唯一的算子 ID。

例如我们可以在 Flink 的测试代码里找到如下的例子:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
env.addSource(new StatefulSource(false, finalCheckpointLatch))
        .uid(SOURCE_UID)
        .setParallelism(NUM_SOURCES)
        .sinkTo(sink)
        .setParallelism(NUM_SINKS)
        .uid(SINK_UID);

在这个示例中,用户为 Source 和 Sink 算子显式声明了 uid。这样后续无论再在中间加入多少其他算子,都不影响 Source 和 Sink 的状态匹配。

这个 uid() 方法底层是调用 Transformation#setUid() 方法来设置 uid 的,因此这里的突破口就是:如何找到 Flink SQL 生成的 Transformation 对象,并为它设置唯一的 uid。

我们已经可以从很多文章中了解到 Flink 的 SQL 代码转换为 Transformation 的步骤,例如这篇文章,因此本文不再复述其中的细节。

简而言之,Flink 的 SQL 作业要经过 SQL 代码 → SqlNode AST 语法树 → Operation 抽象层 → RelNode 逻辑树 → RelNode 物理树 → ExecNodeGraph 执行图 → Transformations → StreamGraph → JobGraph → ExecutionGraph,才能最终提交执行。

固定 UID 方法一: 链路追溯

既然我们知道只要给 Transformation 设置 uid 即可保证后续的算子 ID 固定化,那可以反向思考:只要在它的前体 ExecNode 中保存 uid,那么在 ExecNodeBase#translateToPlanInternal 方法里,我们就可以根据这个保存的 uid 来设置 Transformation 的 uid。

例如我们新建一个 StreamExecCalc 的子类,名为 EnhancedStreamExecCalc,覆盖 translateToPlanInternal 方法:

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
@Override
protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
    Transformation<RowData> transformation = super.translateToPlanInternal(planner);
    transformation.setUid(uid);
    return transformation;
}

这样生成的 Transformation 就会一路把我们的 uid 传递下去。

按照这个思路,我们继续反向追溯,在 ExecNode 的前体,即 StreamPhysicalRel 中,也加入 uid 字段。这样它生成的 ExecNode 就可以带上 uid 信息。

例如,我们可以继续新建一个 StreamPhysicalCalc 的子类,覆盖 translateToExecNode 方法,在这里生成刚刚提到的有 uid 的 EnhancedStreamExecCalc 对象。

随后,我们新增 Planner Rule,根据算子的特征(例如名字、参数),编写匹配规则,将 RelNode 替换成我们增强后的版本。

总结:这个方法对每个类型的都需要做定制,较为繁琐,适合逻辑较为特化的场景。

固定 UID 方法二: 官方增强

由于上述提到的“单点式”增强方式通用性不够,Flink 社区在 FLIP-190: Support Version Upgrades for Table API & SQL Programs 提案里,对这类因 ID 变化而导致不兼容的问题有了一个系统化的方案(目前还不成熟)。

该提案的核心技术点仍然是根据一定规则来生成 Transformation 的 uid,从而保证运行图算子 ID 的固定化。这里新增了一个 TransformationMetadata 类,用于在刚刚提到的 translateToPlanInternal 方法里记录某个算子的名称、uid、描述等元数据。

主要思路是通过增加 COMPILE PLAN 语句,把给定的 SQL 查询逻辑变成一个 JSON 描述 Plan 文件(见 示例文件),随后用户可以通过 EXECUTE PLAN 语句,执行这个 JSON 格式的 Plan 文件。只要 Plan 文件的格式是兼容的,算子 ID 的生成规则固定,就可以保障最后的运行图算子的逻辑和 ID 的稳定性。

用户只需要在 Flink 参数里设置 table.exec.uid.generationPLAN_ONLY(默认值),即可开启该功能。对于所有用 COMPILE PLAN 语句包含的逻辑,Flink 都会为每个算子根据规则(由 table.exec.uid.format 参数控制)生成唯一的 ID。

总结

本文讲解了 Flink 算子 ID 的用途、生成逻辑,以及不匹配的后果,并从流程上分析了如何显式给 SQL 语句生成的各项结构设置固定的 uid,随后还介绍了 Flink 社区对此问题的应对思路,希望能给大家带来一些启发。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
Flink SQL 优化实战 - 维表 JOIN 优化
作者:龙逸尘,腾讯 CSIG 高级工程师 背景介绍 维表(Dimension Table)是来自数仓建模的概念。在数仓模型中,事实表(Fact Table)是指存储有事实记录的表,如系统日志、销售记录等,而维表是与事实表相对应的一种表,它保存了事实表中指定属性的相关详细信息,可以跟事实表做关联;相当于将事实表上经常重复出现的属性抽取、规范出来用一张表进行管理。 在实际生产中,我们经常会有这样的需求,以原始数据流作为基础,关联大量的外部表来补充一些属性。例如,在订单数据中希望能获取订单收货人所在市区的名称。一
腾讯云大数据
2022/05/30
4K0
Flink SQL 优化实战 - 维表 JOIN 优化
一文搞懂Flink SQL执行过程
学习了 apache calcite,基本上把 apache calcite 的官网看了一遍,也写了几个小例子,现在该分析一下 Flink SQL 的执行过程了,其中关于 apache calcite 的部分不深究,因为 apache calcite 有些复杂,真的要了解清楚需要大量时间,本次还是聚焦 Flink.
shengjk1
2021/03/17
2.1K0
一文搞懂Flink SQL执行过程
[源码分析] 带你梳理 Flink SQL / Table API内部执行流程
本文将简述Flink SQL / Table API的内部实现,为大家把 "从SQL语句到具体执行" 这个流程串起来。并且尽量多提供调用栈,这样大家在遇到问题时就知道应该从什么地方设置断点,对整体架构理解也能更加深入。
罗西的思考
2020/09/07
3.3K0
【Flink】第二十九篇:源码分析 Blink Planner
【Flink】第四篇:【迷思】对update语义拆解D-、I+后造成update原子性丢失
章鱼carl
2022/03/31
2.4K0
【Flink】第二十九篇:源码分析 Blink Planner
从一个诡异的 Bug 来看 Flink 快照和状态读取的流程
流计算 Oceanus 平台支持以 SQL 的方式提交作业,独享集群支持最新的 Flink 1.10 提供的新版 Blink Planner 语法。有一位客户写了一段代码,用到了 SQL 的 TopN 功能,语句类似于:
KyleMeow
2020/06/30
3.4K0
从一个诡异的 Bug 来看 Flink 快照和状态读取的流程
个推基于Flink SQL建设实时数仓实践
作为一家数据智能企业,个推在服务垂直行业客户的过程中,会涉及到很多数据实时计算和分析的场景,比如在服务开发者时,需要对App消息推送的下发数、到达数、打开率等后效数据进行实时统计;在服务政府单位时,需要对区域内实时人口进行统计和画像分析。为了更好地支撑大数据业务发展,个推也建设了自己的实时数仓。相比Storm、Spark等实时处理框架,Flink不仅具有高吞吐、低延迟等特性,同时还支持精确一次语义(exactly once)、状态存储等特性,拥有很好的容错机制,且使用门槛低、易上手、开发难度小。因此,个推主要基于Flink SQL来解决大部分的实时作业需求。
个推
2022/03/28
1.3K1
个推基于Flink SQL建设实时数仓实践
flink sql源码分析一之执行流程梳理
我们在梳理flink sql 执行流程时以sql解析、sql校验、sql转化及sql优化的顺序来展开,本篇主要是对过程的梳理,不会涉及过多的代码部分,后面会针对各环节进行逐一分析。
山行AI
2020/12/14
2K0
flink sql源码分析一之执行流程梳理
深入分析 Flink SQL 工作机制
摘要:本文整理自 Flink Forward 2020 全球在线会议中文精华版,由 Apache Flink PMC 伍翀(云邪)分享,社区志愿者陈婧敏(清樾)整理。旨在帮助大家更好地理解 Flink SQL 引擎的工作原理。文章主要分为以下四部分:
Spark学习技巧
2021/03/05
2K0
深入分析 Flink SQL 工作机制
flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)
全网第一个 flink sql 实战,本文主要介绍 flink sql 与 calcite 之间的关系。flink sql 的解析主要依赖 calcite。
公众号:大数据羊说
2022/04/04
2.5K0
flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)
Flink进行Paimon写入源码分析
Paimon的前身是Flink-Table-Store,希望提供流批一体的存储,提供一定的OLAP查询能力(基于列式存储),做到毫秒级别的实时流式读取。Flink-Table-Store希望能够支持Flink SQL的全部概念,能够结合Flink SQL提供DB级别体验,并且支持大规模的更新。Flink-Table-Store希望能够结合Flink,实现完整的流批一体体验(计算+存储),同时拓展Flink-Table-Store的生态,升级为Paimon,来支持更多大数据引擎的查询/写入。如果我们希望深度使用Paimon,并充分利用Paimon的特性,那么了解Flilnk写入Paimon的过程十分重要,本文希望通过源码分析的方式带大家充分了解Flink写入Paimon的完整过程。
wenly
2023/08/15
2.7K0
Flink进行Paimon写入源码分析
Flink优化器与源码解析系列--让Flink飞奔起来这篇文章就够啦(一)
ApacheFlink是一个框架和分布式处理引擎,用于在无限和有界数据流上进行有状态计算。Flink被设计成在所有常见的集群环境中运行,以内存速度和任何规模执行计算。
用户7600169
2022/04/25
1.1K0
Flink优化器与源码解析系列--让Flink飞奔起来这篇文章就够啦(一)
Flink面试通关手册
2019 年是大数据实时计算领域最不平凡的一年,2019 年 1 月阿里巴巴 Blink (内部的 Flink 分支版本)开源,大数据领域一夜间从 Spark 独步天下走向了两强争霸的时代。Flink 因为其天然的流式计算特性以及强大的处理性能成为炙手可热的大数据处理框架。
大数据真好玩
2019/12/09
1.4K0
Flink面试通关手册
Flink SQL代码生成与UDF重复调用的优化
代码生成(code generation)是当今各种数据库和数据处理引擎广泛采用的物理执行层技术之一。通过代码生成,可以将原本需要解释执行的算子逻辑转为编译执行(二进制代码),充分利用JIT编译的优势,克服传统Volcano模型虚函数调用过多、对寄存器不友好的缺点,在CPU-bound场景下可以获得大幅的性能提升。
Spark学习技巧
2022/03/14
1.7K0
Flink SQL代码生成与UDF重复调用的优化
Apache Calcite 功能简析及在 Flink 的应用
• Apache Calcite 是一个动态数据的管理框架,可以用来构建数据库系统的语法解析模块
KyleMeow
2018/09/02
8K0
Apache Calcite 功能简析及在 Flink 的应用
Flink 面试题
Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。并且 Flink 提供了数据分布、容错机制以及资源管理等核心功能。
Tim在路上
2020/08/05
1.5K0
从头分析flink源码第四篇之channel selector
•void setup(int numberOfChannels):用输出通道的数量来对channel selector进行初始化操作,主要用于数据输出时使用;•selectChannel(T record):返回逻辑channel index,给定记录应写入该索引。broadcast模式的 channel selectors对应的这个方法不应该被调用,在实现时可以抛出UnsupportedOperationException。•isBroadcast() 方法:在broadcast模式下会选择所有的channel,这个方法用来标识是否是broadcast方法。
山行AI
2021/08/18
1.2K0
Flink
  1)Flink 是标准的实时处理引擎,基于事件驱动。而 Spark Streaming 是微批(Micro-Batch)的模型;
挽风
2023/10/17
6220
Flink
[源码分析] 从FlatMap用法到Flink的内部实现
本文将从FlatMap概念和如何使用开始入手,深入到Flink是如何实现FlatMap。希望能让大家对这个概念有更深入的理解。
罗西的思考
2020/09/07
1.8K0
Flink SQL流式聚合Mini-Batch优化原理浅析
流式聚合(streaming aggregation)是我们编写实时业务逻辑时非常常见的场景,当然也比较容易出现各种各样的性能问题。Flink SQL使得用户可以通过简单的聚合函数和GROUP BY子句实现流式聚合,同时也内置了一些优化机制来解决部分case下可能遇到的瓶颈。本文对其中常用的Mini-Batch做个简要的介绍,顺便从源码看一看它的实现思路。
大数据真好玩
2021/04/21
3.3K0
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join
flink sql 知其所以然(十四):维表 join 的性能优化之路(上)附源码
公众号:大数据羊说
2022/04/04
1.2K0
flink sql 知其所以然(十五):改了改源码,实现了个 batch lookup join
相关推荐
Flink SQL 优化实战 - 维表 JOIN 优化
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验