HashJoin是关联查询中最重要的算子,对于计算密集型应用,关联查询的性能瓶颈主要在于HashJoin算子Probe阶段的Hash查找和Data Shuffle上。RuntimeFilter是用于运行时优化HashJoin性能的一种常见方法,RuntimeFilter对于INNER JOIN, Right Join, Semi Join等都有显著的性能提升效果。目前RuntimeFilter技术已经在很多数据库中得以应用,比如SnowFlake(BloomJoins), Impala,EMR Spark,Apache doris,Starrocks,PolarDB-X等。
分析型数据库中星型模型是常见的建模方法。比较有代表性的测试集就是SSB(Star Schema Benchmark)。星型模型主要分为事实表和维度表,事实表一般是大表,比如SSB测试集中的lineorder表,维度表一般为小表,比如SSB测试集中的customer,date等。这里的大表小表准确来说是指表的Distinct记录数。RuntimeFilter对于这类星型的数据模型下的复杂查询有非常大的提升作用。
HashJoin实现上是通过内表(一般为较小的表)构建Hash Table,然后遍历外表(一般为大表)数据查找Hash Table,根据不同的Join类型输出匹配的结果。HashJoin中Hash table probe算子一般是最为耗时的过程,另外一个耗时的过程就是数据的Shuffle过程,一般它们也就是性能的瓶颈点。那么对HashJoin性能优化最朴素的思想是减少probe遍历的数据量或者减少数据移动的大小从而提升性能。RuntimeFilter的原理正是将probe操作push down到外表的Scan算子,使用更快的非精确查找算法(MINMAX,BloomFilter)或者更快的精确查找算法(HashSet)来提前过滤数据,从而提升整个查询的性能。RuntimeFilter与SemiJoin相比,不同在于RuntimeFilter需要下推到Scan,实现更少的数据移动。类似的优化思路比如mysql中的pickup join,通过内表过滤后的结果集,通过索引计算左表的值,如此便不需要计算hash,这对于内表非常小的场景效果最佳。
RuntimeFilter是在优化器的CBO阶段之后插入物理计划中的。具体来说,首先需要从根节点遍历整个查询计划树,找到HashJoinNode节点,然后找到该HashJoinNode的等值表达式,将Join右孩子节点的条件下推到左孩子节点是Scan的节点上。举例来说对于下面的Join查询,首先生成如下的物理计划,遍历第一个Join节点,将 t3.a加入到RuntimeFilters中,编号为RuntimeFilterId=0,然后继续遍历左子树,遇到Join节点,将t2.a加入到RuntimeFilters中,编号为RuntimeFilterId=1。然后遇到ScanNode节点时,将RuntimeFilters上的所有RuntimeFilter下推到Scan t1节点上。当然,在论文https://dl.acm.org/doi/pdf/10.1145/3318464.3389769也提到将RuntimeFilter纳入到CBO的代价估算中可能获得更优的执行计划。
并非所有的RuntimeFilter都可以下推,比如对于下面的查询
select count(*) from store t1 left outer join store t2 on t1.s_store_sk = t2.s_store_sk where coalesce(t2.s_store_sk + 100, 100) in (select ifnull(100, s_store_sk) from store);
按照上面的思路生成的查询计划是这样的
Query plan:
| 4:HASH JOIN
| | join op: LEFT SEMI JOIN (BROADCAST)
| | equal join conjunct: coalesce(`t2`.`s_store_sk` + 100, 100) = ifnull(100, `s_store_sk`)
| | runtime filters: RF000[in] <- ifnull(100, `s_store_sk`)
| |----7:EXCHANGE
| 3:HASH JOIN
| | join op: LEFT OUTER JOIN
| | equal join conjunct: `t1`.`s_store_sk` = `t2`.`s_store_sk`
| |----1:OlapScanNode
| | TABLE: store
| | runtime filters: RF000[in] -> coalesce(`t2`.`s_store_sk` + 100, 100)
| 0:OlapScanNode
| TABLE: store
但实际上正确的结果应该是0.所以在以上这种情况下,RuntimeFilter就不能下推。
除此之外,RuntimeFilter对于Left Outer Join,Anti Join,Full Outer Join等也不适用,这是因为外连接没有找到对应的数据时需要补NULL,而不能直接过滤掉。目前关于RuntimeFilter的限制可以参考https://doris.apache.org/docs/advanced/join-optimization/runtime-filter/
BloomFilter 中衡量 BloomFilter 的误判率称为 false positives。 false positives 可以通过以下公式计算得到 \epsilon = (1-(1-1/m)^{kn})^{k} \approx(1-e^{-kn/m})^{k}
所以对于给定的误判率,我们可以估算 BloomFilter 位数组的大小,m 即 BloomFilter 的位数组大小。 m=\frac {-kn}{ln(1-\epsilon^{1/k})}
k表示BloomFilter中hash函数的个数。这里非常重要的一点就是n,需要通过估算 number distinct value 值得到。
所以精确的统计信息有助于更高效的实现BloomFilter。
多个Filter在实现上有两种方式。
一种是短路计算,short-circuit 的优势在于逐步减少向下传递的数据量,所以过滤效果越好的过滤条件需要提前。所以依赖于优化器利用CBO提前准备好过滤顺序。这种方法在过滤效果较好时性能更好。
另一种是避免利用Filter的提前过滤,将多个Filter的计算移动到最后的AND或OR中,充分利用位运算加速最后的与或运算。这种方式稳定性较好,不受过滤效果的影响。
动态调整RuntimeFilter的顺序对提高查询性能会有一定的帮助。
RuntimeFilter一般是在HashJoin构建HashTable的时候构建出来的,主流的RuntimeFilterType有三种,In,BloomFilter,MinMax。在我们的实现中,可以根据HashTable读取的数据多少来决定使用哪种Type。一般来说In更适合数据量较少的场景。BloomFilter适合数据量较多的场景,MinMax通常可以与BloomFilter一起使用,在实现上统一抽象为一个RuntimeFilter。
分布式数据库中,HashJoin最常使用的有两种分布方式,BoardCastJoin和RedistirbuteJoin。如下图3中的Join(BC)代表的是BoardCastJoin,表示将小表广播到所有的节点。Join(Re)代表的是RedistributeJoin,表示根据JoinKey对大小表数据进行重分布。从实现上来看,BoardCastJoin更适合小表比较少的场景下,在每个Backend上,BoardCastJoin得到的RuntimeFilter都是完整的,可以直接下推到外表。而RedistributeJoin得到的RuntimeFilter需要先Merge后再Shuffle后,才能下推到外表。RedistributeJoin由于右表构建的RuntimeFilter是不完整的,如果将不完整的RuntimeFilter下推到Scan层,则有可能会漏掉部分数据,所以需要将所有RuntimeFilter合并后才可以下推到外表。多个RuntimeFilter可以通过And的逻辑运算同时过滤数据。
RuntimeFilterBuilder: 用于构建RuntimeFilter并更新RuntimeFilterMgr中对应的RuntimeFilterMerger中的RuntimeFilter。
RuntimeFilterMgr:Backend上全局唯一的数据结构,维护所有queryid到RuntimeFilterMerger的映射关系
RuntimeFilterMerger:维护当前执行query的所有RuntimeFilter结构,每个FilterId对应唯一的RuntimeFilterMergerEntity,而RuntimeFilterKey唯一对应FilterId, 多个RuntimeFilterKey可以对应同一个FilterId。FilterId是在优化器生成计划时生成的。
假设有两张表,每张表有两个col,t1(col1, col2), t2(col1, col2)。以查询select count(*) from t1 inner join t2 on t1.col1 = t2.col1 and t1.col1 = t2.col2为例,此时有两个RuntimeFilterKey , FilterId, RuntimeFilter的对应关系则如下所示。
在分布式数据库的并行执行框架中有两种方式,一种是算子间并行,一种是算子内并行。Pipeline是实现算子间并行的最好的方式,Pipeline在很多领域都有提及,比如CPU的pipeline流水线。Pipeline的核心在于调度,每个算子只做一件事,但不同算子可以并行执行,在一个Pipeline内部不必等待上一个算子完全执行完毕。算子内并行是将算子逻辑拆分为多个子算子,子算子执行同样的工作但处理不同的数据。在MPP执行引擎的并行执行框架中两者是同时存在的,从而实现最大的并行度,获得最大的性能收益。
在ClickHouse的Pipeline的实现中,Scan算子要处理的数据块Granule是在Pipelien生成阶段确定的。而RuntimeFilter是在PipeLine执行时才可以确定,所以在Pipeline上实现RuntimeFilter的简单的 方式是通过插入Filter operator(Function)算子的方式,具体实现的是通过Internal Function函数实现的。Internal Function在执行时会通过RuntimeFilterMgr获取对应的RuntimeFilter,如果可以拿到则使用RuntimeFilter来过滤数据,如果没有则直接返回当前数据,由于runtimefilter一般执行足够快,一般在10ms-100ms内,所以这里不会成为瓶颈。
实际上ClickHouse中可以通过主键过滤和PreWhere来提前过滤数据,从而减少IO,这就需要等待RuntimeFilter生成。当RuntimeFilter对应的key是主键索引或者二级索引时,等待RuntimeFilter可以获得更优的性能。
Pipeline中实现的RuntimeFilter具有以下优势:
但是RuntimeFilter也是双刃剑,
目前在Pipeline上实现的效果如下,下图是 RuntimeFilter开启前后SSB Join 100GB测试集的性能对比结果。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。