Loading [MathJax]/jax/output/CommonHTML/config.js
前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >专栏 >Impala 从 Fragment -> DataSink -> RowBatch 粗讲

Impala 从 Fragment -> DataSink -> RowBatch 粗讲

原创
作者头像
jasong
发布于 2021-12-27 13:44:08
发布于 2021-12-27 13:44:08
67200
代码可运行
举报
文章被收录于专栏:ClickHouseClickHouse
运行总次数:0
代码可运行

Impala

Impala 如果是Fragment 一步一步 Prepare -> Open -> Read -> Close

Fragment -> DataSink --> RowBatch
Fragment -> DataSink --> RowBatch

PlanNode

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Status PlanNode::CreatePlanNode(ObjectPool * pool, const TPlanNode & tnode, PlanNode ** node)
{
    switch (tnode.node_type)
    {
        case TPlanNodeType::HDFS_SCAN_NODE:
            *node = pool->Add(new HdfsScanPlanNode());
            break;
        case TPlanNodeType::HBASE_SCAN_NODE:
        case TPlanNodeType::DATA_SOURCE_NODE:
        case TPlanNodeType::KUDU_SCAN_NODE:
            *node = pool->Add(new ScanPlanNode());
            break;
        case TPlanNodeType::AGGREGATION_NODE:
            *node = pool->Add(new AggregationPlanNode());
            break;
        case TPlanNodeType::HASH_JOIN_NODE:
            *node = pool->Add(new PartitionedHashJoinPlanNode());
            break;
        case TPlanNodeType::NESTED_LOOP_JOIN_NODE:
            *node = pool->Add(new NestedLoopJoinPlanNode());
            break;
        case TPlanNodeType::EMPTY_SET_NODE:
            *node = pool->Add(new EmptySetPlanNode());
            break;
        case TPlanNodeType::EXCHANGE_NODE:
            *node = pool->Add(new ExchangePlanNode());
            break;
        case TPlanNodeType::SELECT_NODE:
            *node = pool->Add(new SelectPlanNode());
            break;
        case TPlanNodeType::SORT_NODE:
            if (tnode.sort_node.type == TSortType::PARTIAL)
            {
                *node = pool->Add(new PartialSortPlanNode());
            }
            else if (tnode.sort_node.type == TSortType::TOPN || tnode.sort_node.type == TSortType::PARTITIONED_TOPN)
            {
                *node = pool->Add(new TopNPlanNode());
            }
            else
            {
                DCHECK(tnode.sort_node.type == TSortType::TOTAL);
                *node = pool->Add(new SortPlanNode());
            }
            break;
        case TPlanNodeType::UNION_NODE:
            *node = pool->Add(new UnionPlanNode());
            break;
        case TPlanNodeType::ANALYTIC_EVAL_NODE:
            *node = pool->Add(new AnalyticEvalPlanNode());
            break;
        case TPlanNodeType::SINGULAR_ROW_SRC_NODE:
            *node = pool->Add(new SingularRowSrcPlanNode());
            break;
        case TPlanNodeType::SUBPLAN_NODE:
            *node = pool->Add(new SubplanPlanNode());
            break;
        case TPlanNodeType::UNNEST_NODE:
            *node = pool->Add(new UnnestPlanNode());
            break;
        case TPlanNodeType::CARDINALITY_CHECK_NODE:
            *node = pool->Add(new CardinalityCheckPlanNode());
            break;
        default:
            map<int, const char *>::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type);
            const char * str = "unknown node type";
            if (i != _TPlanNodeType_VALUES_TO_NAMES.end())
            {
                str = i->second;
            }
            stringstream error_msg;
            error_msg << str << " not implemented";
            return Status(error_msg.str());
    }
    return Status::OK();
}

DataSinkConfig 创建

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Status DataSinkConfig::CreateConfig(
    const TDataSink & thrift_sink, const RowDescriptor * row_desc, FragmentState * state, DataSinkConfig ** data_sink)
{
    ObjectPool * pool = state->obj_pool();
    *data_sink = nullptr;
    switch (thrift_sink.type)
    {
        case TDataSinkType::DATA_STREAM_SINK:
            if (!thrift_sink.__isset.stream_sink)
                return Status("Missing data stream sink.");
            // TODO: figure out good buffer size based on size of output row
            *data_sink = pool->Add(new KrpcDataStreamSenderConfig());
            break;
        case TDataSinkType::TABLE_SINK:
            if (!thrift_sink.__isset.table_sink)
                return Status("Missing table sink.");
            switch (thrift_sink.table_sink.type)
            {
                case TTableSinkType::HDFS:
                    *data_sink = pool->Add(new HdfsTableSinkConfig());
                    break;
                case TTableSinkType::KUDU:
                    RETURN_IF_ERROR(CheckKuduAvailability());
                    *data_sink = pool->Add(new KuduTableSinkConfig());
                    break;
                case TTableSinkType::HBASE:
                    *data_sink = pool->Add(new HBaseTableSinkConfig());
                    break;
                default:
                    stringstream error_msg;
                    map<int, const char *>::const_iterator i = _TTableSinkType_VALUES_TO_NAMES.find(thrift_sink.table_sink.type);
                    const char * str = i != _TTableSinkType_VALUES_TO_NAMES.end() ? i->second : "Unknown table sink";
                    error_msg << str << " not implemented.";
                    return Status(error_msg.str());
            }
            break;
        case TDataSinkType::PLAN_ROOT_SINK:
            *data_sink = pool->Add(new PlanRootSinkConfig());
            break;
        case TDataSinkType::HASH_JOIN_BUILDER: {
            *data_sink = pool->Add(new PhjBuilderConfig());
            break;
        }
        case TDataSinkType::NESTED_LOOP_JOIN_BUILDER: {
            *data_sink = pool->Add(new NljBuilderConfig());
            break;
        }
        default:
            stringstream error_msg;
            map<int, const char *>::const_iterator i = _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
            const char * str = i != _TDataSinkType_VALUES_TO_NAMES.end() ? i->second : "Unknown data sink type ";
            error_msg << str << " not implemented.";
            return Status(error_msg.str());
    }
    RETURN_IF_ERROR((*data_sink)->Init(thrift_sink, row_desc, state));
    return Status::OK();
}

ScanNode

代码语言:javascript
代码运行次数:0
运行
AI代码解释
复制
Status ScanPlanNode::CreateExecNode(RuntimeState * state, ExecNode ** node) const
{
    ObjectPool * pool = state->obj_pool();
    switch (tnode_->node_type)
    {
        case TPlanNodeType::HBASE_SCAN_NODE:
            *node = pool->Add(new HBaseScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::DATA_SOURCE_NODE:
            *node = pool->Add(new DataSourceScanNode(pool, *this, state->desc_tbl()));
            break;
        case TPlanNodeType::KUDU_SCAN_NODE:
            if (tnode_->kudu_scan_node.use_mt_scan_node)
            {
                DCHECK_GT(state->query_options().mt_dop, 0);
                *node = pool->Add(new KuduScanNodeMt(pool, *this, state->desc_tbl()));
            }
            else
            {
                DCHECK(state->query_options().mt_dop == 0 || state->query_options().num_scanner_threads == 1);
                *node = pool->Add(new KuduScanNode(pool, *this, state->desc_tbl()));
            }
            break;
        default:
            DCHECK(false) << "Unexpected scan node type: " << tnode_->node_type;
    }
    return Status::OK();
}

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。

如有侵权,请联系 cloudcommunity@tencent.com 删除。

评论
登录后参与评论
暂无评论
推荐阅读
编辑精选文章
换一批
impala be query plan 3 prepare->open->close
为特定查询创建的所有后端执行状态的中心类(例如:各个片段实例的FragmentInstanceStates)。此类包含或使可访问状态在片段实例之间共享;相反,片段实例特定的状态收集在FragmentInstanceState中。QueryState的生存期由引用计数决定。代表查询执行并访问其任何状态的任何线程都必须获取对相应QueryState的引用,并至少在该访问期间保持该引用。通过QueryExecMgr::Get-/ReleaseQueryState()或QueryState::ScopedRef(后者用于仅限于单个函数或块范围的引用)获取和发布引用。只要引用计数大于0,查询的所有控制结构(包含在该类中或可通过该类访问,如FragmentInstanceStates)都保证是活动的。
jasong
2022/09/28
6100
Impala Plan Schedule
数据库中一个逻辑查询计划生成后, 需要进行ToPhysical Plan 转化为物理的查询计划, 本文主要讲解 Scan算子 是怎么初始化和分发的, 其他算子可类推
jasong
2024/09/23
1710
Impala 疑问
C++ 指针传递是有问题吗? 为啥语言传达不了信息1. 调用前后 指针的地址是没有变化的2. eeeeeeeeeeeeStatus PlanNode::CreateTreeHelper(FragmentState* state, const std::vector<TPlanNode>& tnodes, PlanNode* parent, int* node_idx, PlanNode** root) { // propagate error case if (*node_idx >= tn
jasong
2022/08/09
3590
Impala be query plan2 - AdmissionController
AdmissionController 用于根据在一个或多个资源池中配置的可用集群资源限制请求(例如查询、DML)。请求将被允许立即执行、排队等待稍后执行或拒绝(立即或排队后)。资源池可以配置为具有最大并发查询数、最大集群范围内存、最大队列大小、每个查询的最大和最小每主机内存限制,并设置mem_limit查询选项是否会被前面提到的最大/最小每主机限制限制限制。如果执行的查询太多或可用内存不足,查询将排队。一旦队列达到最大队列大小,传入的查询将被拒绝。队列中的请求将在可配置的超时后超时。
jasong
2022/09/28
3610
Apache Impala RowBatch/Tuple/Row/Slot
RowBatch 即对一批行(TupleRow)的封装, 每一行有多个元祖Tuple组成, 最大的行数在构建时是固定的, RowBatch 算子之间交互的最小的单位
jasong
2024/04/07
2090
Impala ImpalaServer QueryHander, ClientRequestState, Coordinator, Scheduler 关系
jasong
2024/06/06
1670
Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency
在Impala的HDFS_SCAN_NODE中有一个counter,叫AverageHdfsReadThreadConcurrency,其相关解释如下所示:
skyyws
2022/05/20
4270
Impala HDFS_SCAN_NODE之AverageHdfsReadThreadConcurrency
PHP与Memcached服务器交互的分布式实现源码分析
前段 时间,因为一个 项目的关系,研究了php通过调用 memcache和 memcached PECL扩展库的接口存储到分布式缓存 服务器的机制,在此做我根据他们各自的 源码进行分析,希望能对这方面感兴趣的人有些帮助。 本篇文章我会针对php和memcache扩展库的交互根据源码展开分析。 PHP调用memcache的接口通常会是如下过程:
黄规速
2022/04/14
6930
Impala 3.4 SQL查询之ScanRange详解(四)
在上篇文章中,我们主要介绍了ScanRange的构造,以及在FE和BE端的一些处理流程。同时,我们还介绍了IO thead处理模型中一个比较重要的对象RequestContext::PerDiskState,以及部分成员变量的含义,在本篇文章中,我们将介绍其中一个比较重要的成员:unstarted_scan_ranges_。
skyyws
2022/05/20
4190
Impala 3.4 SQL查询之ScanRange详解(四)
【Android 逆向】ART 脱壳 ( DexClassLoader 脱壳 | oat_file_assistant.cc 中涉及的 oat 文件生成流程 )
在上一篇博客 【Android 逆向】ART 脱壳 ( DexClassLoader 脱壳 | DexClassLoader 构造函数 | 参考 Dalvik 的 DexClassLoader 类加载流程 ) 中 , 分析了 ART 虚拟机下 DexClassLoader 类加载器加载 dex 文件的 Java 层流程 , 与 Dalvik 虚拟机下基本一致 , 从 native 层开始不一致 , 本篇博客开始分析 native 层的类加载流程 ;
韩曙亮
2023/03/30
8600
Impala 3.4 SQL查询之ScanRange详解(三)
我们在本系列的前两篇文章中,简单介绍了SQL查询的整个流程以及重写的相关知识。在接下来的这几篇中,会跟大家一起详细学习ScanRange的知识。由于涉及到的内容非常多,因此会分成几篇来讲解,主要会涉及到HDFS_SCAN_NODE、IO thread等知识。由于现在相关的文档比较少,这些文章都是笔者根据代码和实际调试结果整理出来的,如有错误,欢迎指正。默认情况下,本文涉及到的测试表都是HDFS上的parquet表,并且是以天为分区。
skyyws
2022/05/20
5900
Impala 3.4 SQL查询之ScanRange详解(三)
万字整理 | 深入理解工作队列
伟林,中年码农,从事过电信、手机、安全、芯片等行业,目前依旧从事Linux方向开发工作,个人爱好Linux相关知识分享,个人微博CSDN pwl999,欢迎大家关注! 1.1 worker_pool 1.1.1 normal worker_pool 1.1.2 unbound worker_pool 1.2 worker 1.2.1 worker处理work 1.2.2 worker_pool动态管理worker 1.2.3 cpu hotplug处理 1.3 workqueue 1.3.1 系统workq
刘盼
2022/08/26
1.9K0
万字整理 | 深入理解工作队列
Ceph df分析
GLOBAL中的RAW USED :34901G, AVAIL:26383G POOLS 中USED:11603G3 + (43928k/1024/1024)3 = 34809.123G MAX AVAIL:20793G 发现问题没,pools使用的跟global有偏差,少了一部分数据。
Lucien168
2020/07/20
3K0
Ceph df分析
Impala:Impalad impala-server beeswax 调用过程及关系图
Main->ImapadMain->ImpalaServer->ThriftServer
jasong
2022/03/07
9260
Impala profile相关参数介绍(一)
Impala原生提供了每个SQL执行过程中的profile信息,profile里面有很多的参数可以供我们参考,来排查SQL执行过程中遇到的各种问题。由于目前官方没有对这些参数进行一一解释,因此本文旨在通过阅读代码的方式,来介绍一些在实际使用过程中碰到的参数,希望对大家有所帮助。首先要介绍的是如下所示的几个参数:
skyyws
2022/05/20
8230
AudioRecord源码解读(4)
本篇介绍下AudioRecord的线程运行,以及startRecording,stop,pause等流程。
一只小虾米
2022/10/25
1.9K0
读 NebulaGraph源码 | 查询语句 LOOKUP 的一生
LOOKUP 是图数据库 NebulaGraph 的一个查询语句。它依赖索引,可以查询点或者边的信息。在本文,我将着重从源码的角度解析一下 LOOKUP 语句的一生是如何度过的。
NebulaGraph
2023/01/05
1.5K0
读 NebulaGraph源码 | 查询语句 LOOKUP 的一生
Help - hack the box
To be honest, Help is not a difficult box. But there are some rabbit holes in the box. And in some case, you may come across some very strange situations. May you should step back, find if there is something wrong. For the PrivEsc of root, never give up trying the most basic method.
madneal
2019/11/28
1.2K0
一次Impala upsert kudu执行缓慢问题排查总结
BI同学会用Impala在Kudu表上跑一些ETL任务,最近,BI同学反馈一个Kudu表的ETL任务突然变慢,执行时间从原来的不到1分钟到现在的7分钟。
Fayson
2020/03/10
3.8K0
一次Impala upsert kudu执行缓慢问题排查总结
Impala HDFS_SCAN_NODE之IO threads模型
本文主要从代码出发,跟大家一起分享下Impala HDFS_SCAN_NODE中的IO threads模型。首先,在Impala中,有几个io threads相关的配置,通过对这几个参数进行配置,我们就可以增加处理io的线程数,相关的几个配置如下所示:
skyyws
2022/05/20
6520
Impala HDFS_SCAN_NODE之IO threads模型
相关推荐
impala be query plan 3 prepare->open->close
更多 >
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档
本文部分代码块支持一键运行,欢迎体验
本文部分代码块支持一键运行,欢迎体验