阅读本文前,请先阅读:
如上图,sql text 到物理执行计划主要分几个阶段:
上例中生成的 java code 见下文
该过程主要封装在 OLAPToEnumerableConverter#implement
中,主要流程如下:
implementOLAP、implementRewrite、implementEnumerable 为 OLAPRel 接口的方法,每个 OLAPRel 实现类都要有自己的实现,虽然各个实现不同,但可以进行一些归纳:
void implementOLAP(OLAPImplementor implementor)
:
void implementRewrite(RewriteImplementor rewriter)
:
EnumerableRel implementEnumerable(List<EnumerableRel> inputs)
:
EnumerableRel#implement
方法返回的 Result 用来生成该物理节点对应的 java code我们以概览中的 sql 来作为示例来对生成物理执行计划的过程进行分析
我们对以下几个被修改的实例进一步说明:
由于 firstTableScan 会被当做是 factTable,与概览中的 sql 同义的下面这条 sql 查询时会报 No realization found
的异常,这是因为 Kylin 很不智能的把 left table 作为 firstTableScan(及对应 factTable),但在 Kylin 中没有用以 KYLIN_SALES 为事实表的 model/cube:
SELECT KYLIN_SALES.TRANS_ID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_ACCOUNT
INNER JOIN KYLIN_SALES ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10;
注①:为什么 OLAPTableScan 除了自身的 tableColumns 外,还会包含 metricColumns ?
metricsColumns 命名规则:
COUNT
,返回 _KY_COUNT_
COUNT (DISTINCT KYLIN_SALES.TRANS_ID)
,返回_KY_COUNT_DISTINCT_1_3c0c94b7_TRANS_ID_
SUM(KYLIN_SALES.PRICE)
,返回 _KY_SUM_1_3c0c94b7_PRICE_
其中 1_3c0c94b7 是 KYLIN_SALES 的别名,别名的目的是为了防止出现计算的 SUM(KYLIN_.SALESPRICE)
和 SUM(KYLIN_SALES.PRICE)
的 metricsColumn name 一样的问题
我们对以下几个被修改的实例进一步说明:
this.hasJoin && !this.afterJoin
,则 OLAPProjectRel 不会进行 rewrite(visitChild 除外)。这是因为 OLAPProjectRel rewrite 干的事情主要是增加 projectList,增加的是对维度做 agg 的度量列,OLAPAggregateRel 使用该新增的度量列进行 aggregation 部分的 rewrite(比如 OLAPProjectRel rewrite 增加了 Count 的 metrics 列,OLAPAggregateRel 会对该 metrics 列做 SUM 来替换对相应维度列的 COUNT)仅支持最内层的 agg 出现 count distinct 的一个示例如下
SELECT COUNT(DISTINCT TID)
FROM (
SELECT KYLIN_SALES.TRANS_ID AS TID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
) a
报错图片上传失败...(image-2b7d64-1558959393134)其实这里可以做个优化,对于这种情况的外层 COUNT DISTINCT 其实可以先对 subQuery 使用预计算
整个过程封装在 RealizationChooser#``selectRealization
中,分为几步来讲
model -> ``Set<``IRealization``>
进行排序,得到 modelMap: Map<DataModelDesc, Set<IRealization>>
如果 modelMap 为空,则抛 No model found for ...
异常
遍历 modelMap: Map<DataModelDesc, Set<IRealization>> 每一个 entry:
IRealization selectRealization(OLAPContext olapContext, Set<IRealization> realizations)
逻辑如下:
kylin.query.realization-filter
过滤的如上,主要分两步:
若 context.rewriteFields 不为空,则说明后续 OLAPAggregateRel#implementRewrite part2 会需要把对源表列的 agg 操作重写为对 cube metrics 列的 agg,这这里需要准备好 OLAPAggregateRel#implementRewrite part2 需要的 metrics 列
下面流程图按下标遍历 aggCalls 中的每个元素 aggCall,下标为 i
把对源表列的 agg 操作重写为对 cube metrics 列的 agg,其中如果是 COUNT 操作,需要重写为 SUM。需要注意的是,在这些 OLAPRel 中,columnRowType 各个 col 主要是通过在 input.columnRowType 中的 index 来引用,而不是直接使用 name(当然也会包含 name)
本例中:
SUM(KYLIN_SALES.PRICE)
重写为 SUM(_KY_SUM_1_3c0c94b7_PRICE_)
_KY_SUM_1_3c0c94b7_PRICE_
在 input.columnRowType 中 index 为 4COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
重写为 SUM(_KY_COUNT__)
_KY_COUNT__
在 input.columnRowType 中 index 3由于 Calcite 各个物理节点及 code gen 涉及代码及模块非常多,暂不在这里展开
每个EnumerableRel#implement
方法返回的 Result 都会生成一段 java code,parent EnumerableRel 生成的 java code 还会包含 child 生成的 java code,最终最顶层的 EnumerableRel 生成的 java code 就是完整的。
在 Kylin 中,OLAPJoinRel 对应的物理节点还是其自身,当 OLAPJoinRel#implement 生成用于生成 java code 的 Result 时,并不会使用到其 children,而是直接使用 OLAPContext.firstTableScan 作为事实表来获取其对应的 OLAPQuery 实例,如本例中的 join 生成的最终代码如下
return ((org.apache.kylin.query.schema.OLAPTable) root.getRootSchema()
.getSubSchema("DEFAULT").getTable("KYLIN_SALES")).executeOLAPQuery(root, 0);
事实上,虽然 OLAPJoinRel#implement 没有直接使用 children 生成的代码,但其 left OLAPTableScan#implement 得到的 Result 生成的代码也是
return ((org.apache.kylin.query.schema.OLAPTable) root.getRootSchema()
.getSubSchema("DEFAULT").getTable("KYLIN_SALES")).executeOLAPQuery(root, 0);
另外,OLAPToEnumerableConverter 也继承了 EnumerableRel,实现了自己的 implement 物化方法,也就是触发了本文中所有:
上述例子生成的 java 代码如下:
// _inputEnumerable 为 OLAPQuery 类型,OLAPQuery
final org.apache.calcite.linq4j.Enumerable<java.lang.Object[]> _inputEnumerable = ((org.apache.kylin.query.schema.OLAPTable) root.getRootSchema().getSubSchema("DEFAULT").getTable("KYLIN_SALES")).executeOLAPQuery(root, 0);
final org.apache.calcite.linq4j.AbstractEnumerable child = new org.apache.calcite.linq4j.AbstractEnumerable(){
public org.apache.calcite.linq4j.Enumerator<Object[]> enumerator() {
return new org.apache.calcite.linq4j.Enumerator<Object[]>(){
// 类型,OLAPQuery.enumerator() 得到的 inputEnumerator 为 OLAPEnumerator 类型
// inputEnumerator 会调用 StorageEngine 去 HBase 中查询指定 cube、指定 cuboid(及可能的 filter 下推)数据
public final org.apache.calcite.linq4j.Enumerator<Object[]> inputEnumerator = _inputEnumerable.enumerator();
public void reset() {
inputEnumerator.reset();
}
public boolean moveNext() {
while (inputEnumerator.moveNext()) {
final Integer inp4_ = (Integer) ((Object[]) inputEnumerator.current())[4];
if (inp4_ != null && inp4_.intValue() != 1000) {
return true;
}
}
return false;
}
public void close() {
inputEnumerator.close();
}
public Object current() {
final Object[] current = (Object[]) inputEnumerator.current();
return new Object[] {
current[0],
current[5],
current[13],
current[11],
current[10]};
}
};
}
};
return child.groupBy(new org.apache.calcite.linq4j.function.Function1() {
public Long apply(Object[] a0) {
return (Long) a0[0];
}
public Object apply(Object a0) {
return apply(
(Object[]) a0);
}
}
, new org.apache.calcite.linq4j.function.Function0() {
public Object apply() {
java.math.BigDecimal a0s0;
boolean a0s1;
a0s1 = false;
a0s0 = new java.math.BigDecimal(0L);
long a1s0;
a1s0 = 0;
Record3_0 record0;
record0 = new Record3_0();
record0.f0 = a0s0;
record0.f1 = a0s1;
record0.f2 = a1s0;
return record0;
}
}
, new org.apache.calcite.linq4j.function.Function2() {
public Record3_0 apply(Record3_0 acc, Object[] in) {
final java.math.BigDecimal inp4_ = in[4] == null ? (java.math.BigDecimal) null : org.apache.calcite.runtime.SqlFunctions.toBigDecimal(in[4]);
if (inp4_ != null) {
acc.f1 = true;
acc.f0 = acc.f0.add(inp4_);
}
acc.f2 = acc.f2 + org.apache.calcite.runtime.SqlFunctions.toLong(in[3]);
return acc;
}
public Record3_0 apply(Object acc, Object in) {
return apply(
(Record3_0) acc,
(Object[]) in);
}
}
, new org.apache.calcite.linq4j.function.Function2() {
public Object[] apply(Long key, Record3_0 acc) {
return new Object[] {
key,
acc.f1 ? acc.f0 : (java.math.BigDecimal) null,
acc.f2};
}
public Object[] apply(Object key, Object acc) {
return apply(
(Long) key,
(Record3_0) acc);
}
}
).orderBy(new org.apache.calcite.linq4j.function.Function1() {
public Long apply(Object[] v) {
return (Long) v[0];
}
public Object apply(Object v) {
return apply(
(Object[]) v);
}
}
, org.apache.calcite.linq4j.function.Functions.nullsComparator(false, false)).take(10);
我们可以看到,整个计算过程迭代的读取指定 cube、指定 cuboid 数据,并执行相应的计算逻辑,是一个基于内存的单机计算过程
SELECT KYLIN_SALES.TRANS_ID * 6, SUM(KYLIN_SALES.PRICE) + 1, COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10;
通过多加了一层 Project 来实现
SELECT KYLIN_SALES.TRANS_ID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID + 100)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10;
SELECT COUNT(DISTINCT TID)
FROM (
SELECT KYLIN_SALES.TRANS_ID AS TID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
) a
报错
其实这里可以做个优化,对于这种情况的外层 COUNT DISTINCT 其实可以先对 subQuery 使用预计算
SELECT KYLIN_SALES.TRANS_ID, SUM(KYLIN_SALES.PRICE + 100), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_SALES
INNER JOIN KYLIN_ACCOUNT ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10
报错
SELECT KYLIN_SALES.TRANS_ID, SUM(KYLIN_SALES.PRICE), COUNT(KYLIN_ACCOUNT.ACCOUNT_ID)
FROM KYLIN_ACCOUNT
INNER JOIN KYLIN_SALES ON KYLIN_SALES.BUYER_ID = KYLIN_ACCOUNT.ACCOUNT_ID
WHERE KYLIN_SALES.LSTG_SITE_ID != 1000
GROUP BY KYLIN_SALES.TRANS_ID
ORDER BY TRANS_ID
LIMIT 10
报错
Kylin 机械的将 join 坐表作为 factTable
SELECT SUM(KYLIN_SALES.PRICE) FROM KYLIN_SALES
查询成功
SELECT SUM(PRICE) FROM (
SELECT * FROM KYLIN_SALES LIMIT 1000
) A
报错
Kylin 是怎么做到 grouping 和 agg 补偿的?答:在计算哪个 cuboid 可满足 query 的时候,会优先根据 grouping cols、agg cols、filter cols 来计算一个 cuboid id:
CuboidScheduler#findBestMatchCuboid
中,比如当 cuboid id 为 001000000000000100
的 cuboid 不存在,会使用 id 为 111111111111111111
的 cuboid上述使用替代的 cuboid 与 grouping 补偿和 agg 补偿原理一致,均是通过更细粒度的 grouping 或 agg 来实现
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有