前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Impala 3.4 SQL查询之ScanRange详解(五)

Impala 3.4 SQL查询之ScanRange详解(五)

作者头像
skyyws
发布2022-05-20 08:41:21
5210
发布2022-05-20 08:41:21
举报

在上篇文章中,我们介绍了PerDiskState的unstarted_scan_ranges_这个队列的更新逻辑,主要就是成员的入队和出队。总结下来就是:HdfsScanNode会获取每个文件的footer ScanRange,然后入队;IO thread会通过RequestContext获取对应的PerDiskState,然后出队,并设置到next_scan_range_to_start_成员,同时入队到RequestContext的ready_to_start_ranges_队列。IO thead并不会直接从unstarted_scan_ranges_获取对象,进行scan操作,而是会从另外一个队列in_flight_ranges_中获取对象,返回并进行后续的操作。在本文中,我们同样会结合代码,一起学习下,in_flight_ranges_队列是如何更新的。

ScanRange分配buffer

首先,我们来看下ScanRange的buffer分配问题。在将ScanRange放到in_flight_ranges_队列之前,需要先给ScanRange分配buffer,只有当分配了buffer之后,IO thread才能进行实际的scan操作。Buffer分配的主要处理就是在AllocateBuffersForRange函数中。我们先来看下主要的处理逻辑:

代码语言:javascript
复制
// DiskIoMgr::AllocateBuffersForRange()
  vector<unique_ptr<BufferDescriptor>> buffers;
  for (int64_t buffer_size : ChooseBufferSizes(range->bytes_to_read(), max_bytes)) {
    BufferPool::BufferHandle handle;
    status = bp->AllocateBuffer(bp_client, buffer_size, &handle);
    if (!status.ok()) goto error;
    buffers.emplace_back(new BufferDescriptor(range, bp_client, move(handle)));
  }
  
// DiskIoMgr::ChooseBufferSizes()
// 删除了部分代码,只保留了关键的部分
vector<int64_t> DiskIoMgr::ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes) {
  while (bytes_allocated < scan_range_len) {
    int64_t bytes_remaining = scan_range_len - bytes_allocated;
    int64_t next_buffer_size;
    if (bytes_remaining >= max_buffer_size_) {
      next_buffer_size = max_buffer_size_;
    } else {
      next_buffer_size =
          max(min_buffer_size_, BitUtil::RoundUpToPowerOfTwo(bytes_remaining));
    }
    if (next_buffer_size + bytes_allocated > max_bytes) {
      if (bytes_allocated > 0) break;
      next_buffer_size = BitUtil::RoundDownToPowerOfTwo(max_bytes);
    }
    buffer_sizes.push_back(next_buffer_size);
    bytes_allocated += next_buffer_size;
  }
  return buffer_sizes;
}

这里主要涉及到两个参数:bytes_to_read_,表示这个ScanRange需要read的字节数;max_bytes,是一个阈值,我们这里先不展开它的获取方式,后面再介绍。接着,在ChooseBufferSizes函数中,会根据这个两个参数,来循环构造buffer,所有的buffer都放到一个vector中。这里的max_buffer_size_对应的就是read_size参数,默认是8M;min_buffer_size_对应的是min_buffer_size参数,默认是8K。代码的主要逻辑就是:

  1. 如果待分配字节数(初始就是range的bytes_to_read_)大于max_buffer_size_,则直接分配一个max_buffer_size_大小的buffer,加入到vector中;如果小于max_buffer_size_,则取待分配字节数和min_buffer_size_较大的,保证分配的buffer不会小于min_buffer_size_;
  2. 如果分配的buffer总大小超过了max_bytes限制,则结束此次分配,也就是说,给ScanRange一次分配的buffer数量,不一定能够保证所有的bytes_to_read_都足够读取,必须小于max_bytes;

当获取了需要的buffer之后,我们根据这些buffer,构造BufferDescriptor,更新ScanRange的unused_iomgr_buffer_bytes_和unused_iomgr_buffers_成员。然后IO thread就会获取buffer,进行后续的scan操作。

IO thread处理ScanRange流程

当IO thread获取到ScanRange的对象之后,就会进行实际的scan操作。整个ScanRange的处理流程如下所示:

这里有几点需要注意:

  1. 需要先获取buffer,才能进行scan操作,如果没有可用的buffer,则直接返回,需要scanner线程分配buffer之后,才能继续;
  2. 如果本次操作完成之后,当前的ScanRange还没有读完,需要放回in_flight_range队列,等待再次处理;
  3. 保存数据的buffer,会更新到ScanRange的ready_buffers_成员,后续scanner线程会获取ready_buffers_中的buffer,进行处理;
Impala处理parquet格式文件

接着我们再来看下Impala对于parquet格式的文件是如何处理的。这个对于后面Impala处理ScanRange的介绍有一定的帮助。首先简单看下parquet的文件结构:

一个parquet文件主要包括三个部分:header和footer以及中间的数据区,数据区由多个RowGroup组成,每个RowGroup包含一批数据;每个RowGroup又分为多个ColumnChunk,每个ColumnChunk表示一个列的数据;ColumnChunk又包含多个DataPage,这是数据存储的最小单元。

为了读取parquet文件的数据,针对上述文件结构,Impala也设计了相应的类进行处理,如下所示:

结合上述的UML,我们将处理流程归纳为如下几点:

  1. 对于每一个split,executor都会构造一个HdfsParquetScanner(如果是其他的文件格式,则是其他的scanner对象);
  2. HdfsParquetScanner会根据SQL中涉及列,来构造ParquetColumnReader,或者是其子类BaseScalarColumnReader,每一个reader负责处理一个列的数据;
  3. 一个split,可能会包含多个RowGroup,Impala会根据RowGroup中的ColumnChunk信息,来初始化BaseScalarColumnReader中的ParquetColumnChunkReader对象,ParquetColumnChunkReader主要负责从data pages中读取数据、解压、数据buffer的拷贝等;
  4. 在初始化ParquetColumnChunkReader的时候,会一并初始化的它的一个成员ParquetPageReader,ParquetPageReader就是最终实际去读page headers和data pages。

需要注意的是,上面的这些操作,都是在executor上,由scanner线程进行处理的,而真正的ScanRange的扫描操作,是由IO thread进行的。

in_flight_ranges_的出队操作

介绍了一些前置基础知识,接下来我们看下in_flight_ranges_队列的更新操作。其实在Impala 3.4 SQL查询之ScanRange详解(四)一文中,已经有in_flight_ranges_的出现了,主要是在RequestContext::GetNextRequestRange函数中,先对unstarted_scan_ranges_进行了出队操作,然后再判断in_flight_ranges_是否为空,不为空的话直接弹出队头成员,否则直接返回空,相关函数如下:

代码语言:javascript
复制
  if (request_disk_state->in_flight_ranges()->empty()) {
    // There are no inflight ranges, nothing to do.
    request_disk_state->DecrementDiskThread(request_lock, this);
    return nullptr;
  }
  DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
  RequestRange* range = request_disk_state->in_flight_ranges()->Dequeue();
  DCHECK(range != nullptr);

因此,我们可以知道,IO thread实际每次是取in_flight_ranges_队列的队首成员返回进行处理的。出队操作比较简单,入队操作相对比较复杂。

in_flight_ranges_的入队操作

关于in_flight_ranges_的入队操作,涉及到的情况比较多,因此我们将相关的代码调用整理成了一张图,如下所示:

图中每个方框表示相应的函数或者函数调用栈,最下面的方框就是最终的in_flight_ranges_的入队。黄色方框表示的是,当满足该条件时,才会插入到in_flight_ranges_队列。下面我们就结合代码来看看不同场景下,in_flight_ranges_的入队操作。

Footer ScanRange的处理

Impala 3.4 SQL查询之ScanRange详解(四)一文中,我们提到过:对于parquet格式的文件,会针对每个split(一个文件的一个block,会对应一个HdfsFileSplit),构造一个footer ScanRange,大小是100KB,并且保存着原始的split信息,主要是offset、len等。这些footer ScanRange会先被入队到unstarted_scan_ranges_队列中,然后在RequestContext::GetNextUnstartedRange()函数中出队,那么在这里就是通过图中的第二条路径:

代码语言:javascript
复制
StartNextScanRange(hdfs-scan-node-base.cc):679
-GetNextUnstartedRange(request-context.cc):467

上面处理会将footer ScanRange从unstarted_scan_ranges_队列弹出,然后由于该ScanRange的tag是NO_BUFFER,所以不会直接入队到in_flight_ranges_中,而是经由第三条路径中处理,通过scanner线程加入到in_flight_ranges_队列中。关于ScanRange::ExternalBufferTag::NO_BUFFER我们后面会再提到,这里先不展开。为了防止大家混淆,我们将第三条路径单独拎出来,如下所示:

代码语言:javascript
复制
ScannerThread(hdfs-scan-node.cc):403
-StartNextScanRange(hdfs-scan-node-base.cc):692
--AllocateBuffersForRange(disk-io-mgr.cc):399
---AddUnusedBuffers(scan-range.cc):147
----ScheduleScanRange(request-context.cc):797
-----state.in_flight_ranges()->Enqueue(range)

首先需要先对这些ScanRange分配buffer,然后再将这个ScanRange加入到in_flight_ranges_队列中。对照上面的ScanRange分配buffer的逻辑来看,scan_range_len参数对应初始的footer ScanRange大小,是100KB,而max_bytes参数的大小,来自于FE端的计算,表示处理一个ScanRange需要的最小内存,以HdfsScanNode为例,相关函数调用如下所示:

代码语言:javascript
复制
doCreateExecRequest(Frontend.java):1600
-getPlannedExecRequest(Frontend.java):1734
--createExecRequest(Frontend.java):1420
---computeResourceReqs(Planner.java):435
----computeResourceProfile(PlanFragment.java):263
-----computeRuntimeFilterResources(PlanFragment.java):327
------computeNodeResourceProfile(HdfsScanNode.java):1609
-------computeMinMemReservation(HdfsScanNode.java)

最终,在computeMinMemReservation函数中,会计算出一个值,通过TBackendResourceProfile结构体的min_reservation成员保存,并传到BE端。一般情况下,这个值是大于100KB的,因此,对于footer ScanRange,处理之后会分配1个buffer,大小是128KB(通过函数BitUtil::RoundUpToPowerOfTwo()向上取到2的整数次幂),最后将footer ScanRange加到in_flight_ranges_队列。之后IO thread就可以通过in_flight_ranges_队列取到这些footer ScanRange,根据上面的ScanRange处理流程进行处理。也就是说,对于每一个split,都会先构造一个footer ScanRange,该footer ScanRange处理完成之后,才能继续进行后面的数据扫描处理。

Data ScanRange的处理

前面我们提到了对于每个split,Impala都会构造一个footer ScanRange。只有先解析出footer的信息,我们才能知道parquet文件的元数据信息,进而构造data ScanRange,扫描真正的数据。我们将data ScanRange的处理流程进行了梳理,如下所示:

整个处理流程同样是通过scanner线程进行处理的,主要分为如下几个部分:

  1. 最左边红色的方框,就是scanner线程读取footer ScanRange的buffer中的元数据信息。通过ScanRange::GetNext函数,就可以获取ready_buffers_中的buffer成员,进行后续的解析操作。需要注意的是,此时footer ScanRange是已经被IO thead处理完成,如果没有处理完成的话,scanner线程会一直等待;
  2. 中间蓝色的方框,就是HdfsParquetScanner在获取到元数据之后,构造相应的column reader成员,这里主要就是根据SQL中涉及到的column进行构造,详细构造过程不展开;
  3. 左下角黄色的方框,就是计算每个ScanRange分配的最大字节数,也就是我们在上面提到的max_bytes。最终,在给ScanRange分配buffer的时候,分配的总字节数不会超过这个max_bytes。这个地方的计算与column reader包含的ScanRange的bytes_to_read_以及read_size和min_buffer_size参数有关系,核心实现逻辑在HdfsParquetScanner::DivideReservationBetweenColumnsHelper函数中,这块的计算也相对比较复杂,感兴趣的同学可以自行学习;
  4. 最后是右边的绿色方框,就是根据这些column reader构造对应的data ScanRange,然后分配buffer,并添加到in_flight_ranges_队列。此时IO thread就可以获取这些data ScanRange进行实际的scan操作了。

整个data ScanRange的处理流程就in_flight_ranges_队列图的第四条路径,也就是最右边的那个绿色方框。需要注意的是,如果分配给ScanRange的buffer不能一次读完所有的字节数,那么当IO thread用完分配的buffer之后,scanner线程会重新分配buffer,等待后续IO thead再次处理。

IO thread的处理

最左边的红色方框代表的路径表示:IO thread在处理完对应的ScanRange时,会更新相应的bytes_read、unused_iomgr_buffers_等成员。处理完成之后,会判断当前这个ScanRange是否处理完成,如果处理完成的话,则直接将num_remaining_ranges_成员减1,表示这个ScanRange已经处理完成。如果处理的结果是ReadOutcome::SUCCESS_NO_EOSR,则表示这个ScanRange还没有处理完成,会将这个ScanRange再次放回到in_flight_ranges_队列。这样其他的IO thread可以再次获取这个ScanRange进行处理。

非ExternalBufferTag::NO_BUFFER

对于图中的第二条路径,主要是针对非remote HDFS的情况。在Impala 3.4 SQL查询之ScanRange详解(四)中介绍BE端的ScanRange的时候,我们提到会根据FE端的文件信息来构造ScanRange,此时会构造一个buffer tag,如下所示:

代码语言:javascript
复制
// HdfsScanNodeBase::Prepare()
    int cache_options = BufferOpts::NO_CACHING;
    if (params.__isset.try_hdfs_cache && params.try_hdfs_cache) {
      cache_options |= BufferOpts::USE_HDFS_CACHE;
    }
    if ((!expected_local || FLAGS_always_use_data_cache) && !IsDataCacheDisabled()) {
      cache_options |= BufferOpts::USE_DATA_CACHE;
    }

对于remote HDFS,这里最终cache_options的值就是4,即NO_CACHING|USE_DATA_CACHE。接着,在RequestContext::GetNextUnstartedRange函数中,会使用该tag进行判断,如下所示:

代码语言:javascript
复制
// RequestContext::GetNextUnstartedRange()
      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag();
      if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
        // We can't schedule this range until the client gives us buffers. The context
        // must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
        // refilled.
        disk_states_[disk_id].ScheduleContext(lock, this, disk_id);
        (*range)->SetBlockedOnBuffer();
        *needs_buffers = true;
      } else {
        ScheduleScanRange(lock, *range);
      }

只有当tag不是NO_BUFFER的时候,才会将ScanRange加入in_flight_ranges_队列。也就是说,对于remote HDFS的scan操作,不是直接将ScanRange加入到in_flight_ranges_队列,而是在其他的地方进行处理。由于笔者手头的测试环境都是remote HDFS,因此,对于这种情况,目前暂不展开说明。

小结

到这里,关于in_flight_ranges_队列的更新,我们就基本介绍完毕了,当然这不是全部的情况,目前还有一些其他的情况我们没有展示在这篇文章当中。由于篇幅原因,本文也省略了很多细节的地方。总结一下,在这篇文章当中,我们首先介绍了ScanRange分配buffer,也就是说对于每个ScanRange,都需要先通过scanner线程来分配buffer,之后才能通过IO thread进行实际的scan操作。接着,我们介绍了IO thread处理ScanRange流程和Impala处理parquet格式文件。最后我们看到了in_flight_ranges_队列是如何更新,最重要的部分就是footer ScanRange和data ScanRange的处理,这个Impala的IO模型比较关键的地方。本文所有的代码都是基于3.4.0分支,都是笔者个人结合调试结果,分析得出,如有错误,欢迎指正。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。
原始发表:2021-04-28,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 作者个人站点/博客 前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • ScanRange分配buffer
  • IO thread处理ScanRange流程
  • Impala处理parquet格式文件
  • in_flight_ranges_的出队操作
  • in_flight_ranges_的入队操作
    • Footer ScanRange的处理
      • Data ScanRange的处理
        • IO thread的处理
          • 非ExternalBufferTag::NO_BUFFER
          • 小结
          相关产品与服务
          大数据
          全栈大数据产品,面向海量数据场景,帮助您 “智理无数,心中有数”!
          领券
          问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档