最近再调研业界一些计算引擎的 Semi / Anti Join 的实现方式,刚好对 Flink Semi / Anti Join 的实现方式进行了研究,通过对 Flink SemiAntiJoinTest 的单测以及源码的 Debug,目前整体对 Flink 实现 Semi / Anti Join 的原理有一定理解,所以这里整体做一个总结,同时也帮助大家对于 Flink 有个更好的理解。
Flink 最底层由于支持 SemiJoin 或者 AntiJoin 的算子(具体看 SemiHashJoinOperator、NestedLoopJoinCodeGenerator、AntiHashJoinOperator),所以整体上 Flink 支持子查询的场景还是非常多的,除了将常见的 In / Not In、Exists / Not Exists 转换到 SemiJoin/AntiJoin 的场景,还支持 In/ Not In 子查询是关联子查询转换到 SemiJoin/AntiJoin,同时在 In Or Exists 关联子查询中,也支持将有多个关联条件的这种 Case,转换到 SemiJoin/AntiJoin。
Flink 中对于 Filter 中子查询转 SemiJoin/AntiJoin 的条件有着严格的限制,只有当条件都必须是合取范式的情况(谓词都是 AND 链接在一起),才会尝试去做转 SemiJoin / AntiJoin 的逻辑。这样做的原因,我个人理解有两点:
对于 Flink Filter 中 In 子查询(Or Not)或者 Exists 子查询(Or Not)会先转换为如下形式:
LogicalJoin(condition=[xxx], joinType=[anti/semi])
--举一个示例:
SELECT * FROM l WHERE a IN (SELECT d FROM r WHERE l.b > r.e)
--转换的 RelNode 结构
LogicalJoin(condition=[AND(=($0, $3), >($1, $4))], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(inputs=[0..1])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
最后结合 Join condition 条件,以及 JoinType 的类型(Anti / Semi),将其转换为对应的算子实现。由于 Flink 底层自定义了相关 SemiJoin Or AntiJoin 的算子实现,所以在 SemiJoin Or AntiJoin 的 Condition 上,SemiJoin Or AntiJoin,允许有非等值的条件(比如大于),不过在 Calcite 以及 Presto 中的 SemiJoin 实现,Join 条件必须是等值的。
下面是 Flink 一个 Semi Join 的 SQL 示例:
SELECT a FROM l u where exists (select * from r where r.e = u.b)
其转换为 Semi Join 的计划为:
LogicalJoin(condition=[=($3, $1)], joinType=[semi])
:- LogicalTableScan(table=[[default_catalog, default_database, l, source: [TestTableSource(a, b, c)]]])
+- LogicalProject(exprs=[[$1]])
+- LogicalFilter(condition=[true])
+- LogicalTableScan(table=[[default_catalog, default_database, r, source: [TestTableSource(d, e, f)]]])
Flink 中具体使用如下优化规则集合来尝试将子查询转换到 SemiJoin/AntiJoin:
在尝试将 Not Exists Or Not In 子查询转换到Anti Join 的时候,要注意等值条件的 NULL-aware的。下面的是 chatgpt 的回答:
Flink 中选择在子查询消除之前,先尝试将 Filter 的子查询转换到 SemiJoin/AntiJoin,如果子查无法转换到 SemiJoin/AntiJoin,那么之后还会使用 Calcite 中 ( SubQueryRemoveRule 子查询消除 + 子查询解关联RelDecorrelator.decorrelateQuery)来对子查询进行转换。简单来说,就是先尝试将子查询转换到 SemiJoin/AntiJoin,转换不了就用 Calcite 那套子查询消除 + 解关联来对子查询兜底。
下面分别 Flink SEMI_JOIN_RULES
规则集合每条规则的作用进行介绍:
上面 5 个规则,共同组成了 Flink SemiJoin/AntiJoin 的转换规则,接下来重点对 FlinkSubQueryRemoveRule.FILTER 优化规则进行讲解,因为核心转换逻辑在这个柜子中。
SQL 子查询可以出现在 Project、Filter、Join 中,对于 FlinkSubQueryRemoveRule.FILTER
优化规则,主要是匹配 Filter RelNode,然后尝试将 Filter 条件中的子查询转换为 SemiJoin / AntiJoin。
Calcite 从解析到初始 RelNode 转换完成后,会将子查询转换为 RexSubQuery,RexSubQuery 本质是一个 RexCall。Flink 选择在子查询消除之前(在使用 SubQueryRemoveRule 规则之前)尝试对 RelNode 转换到 SemiJoin / AntiJoin。
下面是FlinkSubQueryRemoveRule.FILTER
规则将子查询转换到 Semi/Anti Join 流程图:
上面尝试将子查询转换到 SemiJoin/AntiJoin,如果子查询不能转换到 SemiJoin/AntiJoin,后续会使用 Calcite SubQueryRemoveRule
相关子查询消除规则 + RelDecorrelator.decorrelateQuery,来处理子查询。
使用FlinkSubQueryRemoveRule.FILTER
规则将子查询转换到 SemiJoin 或者 AntiJoin 后,其还是Logical RelNode。以 Flink Batch 为例,最终在决定 Join 的具体物理实现时,比如这里 Join 使用的是 BatchPhysicalHashJoin,BatchPhysicalHashJoin 的 translateToExecNode 方法,用来将 BatchPhysicalRel 转换到 Flink ExecNode 。在 translateToExecNode 方法中,最终会调用到 HashJoinOperator 的 newHashJoinOperator 方法,其会根据 Join 的具体类型,来创建相应的 Join 的 Operator。
对于 SemiJoin/AntiJoin,本质只是 Join 的两种类型,所以底层算子的实现,可以使用 HashJoinOperator 或者 NestedLoopJoinOperator 来实现,当然不同引擎可能也有不同的实现。
Flink 中对于 SemiJoin/AntiJoin 有自己相应的 Operator 的实现,整体上支持的场景会更加广泛。对于 Dremio-oss 来说,本身是没有 SemiJoin 和 AntiJoin 的优化,本质就是使用 Calcite 子查询消除优化规则(Calcite 中 SubQueryRemoveRule) + 解关联(RelDecorrelator.decorrelateQuery)逻辑之后计划来进行运算。
Presto 中主要P只支持 In 子查询是非关联的转换到 SemiJoin,Presto 会使用 TransformUncorrelatedInPredicateSubqueryToSemiJoin
来尝试将 In 子查询转换为 SemiJoin。这里需要注意,Presto SemiJoin 产出的结果,只是对于 Join 左边数据是否出现在右边的一个标记,还需要再上面增加 Filter + Project,根据标记过滤出在右边的数据。
Calcite 当前不支持 AntiJoin 的转换规则,对于 SemiJoin 的转换,能够使用SemiJoinRule
来将符合条件的 Join(Inner、Left)转换为 SemiJoin。
最后,都看到这了,如果对你有帮助的话,帮我点击一下在看和点赞,你的鼓励,是我更新的最大动力。