在上篇文章中,我们主要介绍了ScanRange的构造,以及在FE和BE端的一些处理流程。同时,我们还介绍了IO thead处理模型中一个比较重要的对象RequestContext::PerDiskState,以及部分成员变量的含义,在本篇文章中,我们将介绍其中一个比较重要的成员:unstarted_scan_ranges_。
在上篇文章中,我们提到,在FE端的ScanRange信息,主要通过TScanRange传到BE端,然后构造为TPlanFragmentInstanceCtx中的TScanRangeParams,传到各个executor进行实际的扫描操作,那么当各个executor接收到请求之后,就会根据这些信息,构造相应的ScanRange类。ScanRange是继承RequestRange这个类的,另外WriteRange也是继承了RequestRange对象的。从名字就可以看出,WriteRange主要是针对写入的情况,这里我们不展开介绍,主要看下ScanRange对象。首先,RequestRange主要包含了file、offset、len这些基本信息。而ScanRange则增加了一些额外的信息,如下所示:
class ScanRange : public RequestRange {
struct SubRange {
int64_t offset;
int64_t length;
};
DiskIoMgr* io_mgr_ = nullptr;
RequestContext* reader_ = nullptr;
bool read_in_flight_ = false;
int64_t bytes_read_ = 0;
std::vector<SubRange> sub_ranges_;
......
}
关于这些成员变量的含义,我们这里先不一一介绍了,后面在相应的场景下,我们再一一展开说明。 当我们将TPlanFragmentInstanceCtx的信息传到对应的executor的时候,对应的executor节点就会构造相应的HdfsScanNode,然后在HdfsScanNodeBase::Prepare函数中,会循环遍历每个TScanRangeParams,然后初始化下面的这个map成员:
// hdfs-scan-node-base.h
/// This is a pair for partition ID and filename
typedef pair<int64_t, std::string> PartitionFileKey;
/// partition_id, File path => file descriptor (which includes the file's splits)
typedef std::unordered_map<PartitionFileKey, HdfsFileDesc*, pair_hash> FileDescMap;
FileDescMap file_descs_;
struct HdfsFileDesc {
hdfsFS fs;
std::string filename;
int64_t file_length;
int64_t mtime
THdfsCompression::type file_compression;
bool is_erasure_coded;
std::vector<io::ScanRange*> splits;
};
file_descs_是一个map,用分区id和文件名来作为map的key,value是一个HdfsFileDesc对象。当循环遍历TScanRangeParams对象的时候,Impala会用其中包含的THdfsFileSplit对象的信息,来构造一个HdfsFileDesc对象,填充其中的fs、filename等信息,关键代码如下:
for (const TScanRangeParams& params: *scan_range_params_) {
const THdfsFileSplit& split = params.scan_range.hdfs_file_split;
partition_ids_.insert(split.partition_id);
HdfsPartitionDescriptor* partition_desc =
hdfs_table_->GetPartition(split.partition_id);
filesystem::path file_path(partition_desc->location());
file_path.append(split.relative_path, filesystem::path::codecvt());
const string& native_file_path = file_path.native();
auto file_desc_map_key = make_pair(partition_desc->id(), native_file_path);
HdfsFileDesc* file_desc = NULL;
FileDescMap::iterator file_desc_it = file_descs_.find(file_desc_map_key);
if (file_desc_it == file_descs_.end()) {
// Add new file_desc to file_descs_ and per_type_files_
file_descs_[file_desc_map_key] = file_desc;
// 省略其余代码
file_desc = runtime_state_->obj_pool()->Add(new HdfsFileDesc(native_file_path));
per_type_files_[partition_desc->file_format()].push_back(file_desc);
} else {
// File already processed
file_desc = file_desc_it->second;
}
file_desc->splits.push_back(
AllocateScanRange(file_desc->fs, file_desc->filename.c_str(), split.length,
split.offset, split.partition_id, params.volume_id, expected_local,
file_desc->is_erasure_coded, file_desc->mtime, BufferOpts(cache_options)));
}
我们删除了部分代码,只保留了关键的部分。可以看到,当file_descs_中,不存在指定key时,我们构造新的key和value,加入到map中。这里关注下对于splits这个vector的处理。对于分区的某个指定文件,在map中会有一条记录,如果这个文件对应多个TScanRangeParams,那么这个map的value对应的splits则会有多个成员,但是这条key-value记录只有一条。我们前面说过了,一个ScanRange在HDFS_SCAN_NODE代表一个block,所以如果文件跨越了多个block,那么就会分成多个ScanRange,此时map的value,HdfsFileDesc对象的splits就会存在多个成员;反之,如果文件只存在于1个block中,那么HdfsFileDesc的splits对象,则只会有1个成员。 除了file_descs_之外,还有一个成员也需要关注下:per_type_files_,这个成员变量的定义如下所示:
// hdfs-scan-node-base.h
/// File format => file descriptors.
typedef std::map<THdfsFileFormat::type, std::vector<HdfsFileDesc*>>
FileFormatsMap;
FileFormatsMap per_type_files_;
可以看到,这个per_type_files_保存的就是文件格式和HdfsFileDesc的集合,在上述处理file_descs_的代码中,我们也可以看到对per_type_files_的处理,根据当前这个文件所属分区的格式,加入到map value的vector中。
上面我们介绍完了BE端的ScanRange对象,接下来我们来看一下PerDiskState中的unstarted_scan_ranges_成员,以及它是如何更新的。首先,我们还是先看下这个成员变量的定义:
/// Queue of ranges that have not started being read. This list is exclusive
/// with in_flight_ranges.
InternalQueue<ScanRange> unstarted_scan_ranges_;
从注释我们可以看到,unstarted_scan_ranges_表示是还没有开始进行scan操作的ScanRange,这个解释比较空泛,我们接着看下unstarted_scan_ranges这个成员更新的相关函数调用(当前是针对parquet格式的表进行梳理):
ExecFInstance(query-state.cc):697
-Exec(fragment-instance-state.cc):98
--ExecInternal(fragment-instance-state.cc):383
---GetNext(hdfs-scan-node.cc):91
----IssueInitialScanRanges(hdfs-scan-node-base.cc):636
-----IssueInitialRanges(hdfs-parquet-scanner.cc):82
------IssueFooterRanges(hdfs-scanner.cc):837
-------AddDiskIoRanges(hdfs-scan-node.cc):212
--------AddScanRanges(request-context.cc):404
---------AddRangeToDisk(request-context.cc):357
----------unstarted_scan_ranges()->Enqueue
---------AddRangeToDisk(request-context.cc):362
----------num_unstarted_scan_ranges_.Add(1)
---------AddRangeToDisk(request-context.cc):366
----------next_scan_range_to_start()=null ScheduleContext(request-context.cc)
---------AddRangeToDisk(request-context.cc):379
----------num_remaining_ranges_++
在HdfsScanNodeBase::IssueInitialScanRanges函数中,我们通过per_type_files_成员,获取所有的PARQUET格式的HdfsFileDesc集合,然后在HdfsScanner::IssueFooterRanges函数中,循环构造初始的ScanRange(不同的文件格式,这里的处理流程有所不同),由于当前是PARQUET文件,所以会构造每个文件footer的ScanRange,这里我们摘取一些主要的步骤看下(忽略其他的一些特殊情况):
//这里FOOTER_SIZE是一个常量,为1024*100
int64_t footer_size = min(FOOTER_SIZE, files[i]->file_length);
int64_t footer_start = files[i]->file_length - footer_size;
ScanRange* footer_split = FindFooterSplit(files[i]);
for (int j = 0; j < files[i]->splits.size(); ++j) {
ScanRange* split = files[i]->splits[j];
if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
ScanRangeMetadata* split_metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
ScanRange* footer_range;
if (footer_split != nullptr) {
footer_range = scan_node->AllocateScanRange(files[i]->fs,
files[i]->filename.c_str(), footer_size, footer_start,
split_metadata->partition_id, footer_split->disk_id(),
footer_split->expected_local(), files[i]->is_erasure_coded, files[i]->mtime,
BufferOpts(footer_split->cache_options()), split);
}
footer_ranges.push_back(footer_range);
}
// The threads that process the footer will also do the scan.
if (footer_ranges.size() > 0) {
RETURN_IF_ERROR(scan_node->AddDiskIoRanges(footer_ranges, EnqueueLocation::TAIL));
}
return Status::OK();
}
我们删除了其他的一些代码和注释,关注下主要的处理步骤,首先获取footer_size和footer_start,然后利用FindFooterSplit函数获取该file的footer split,判断逻辑就是从splits成员中找到:split.len+split.offset=file.len,可以理解为文件的最后一个split成员对象。然后遍历splits集合,当找到与footer_split对应的split时,我们就用这个footer_split和file的相关信息来构造一个ScanRange,作为footer ScanRange。这里需要注意的是一个file对应多个split(即多个block)的情况,此时在遍历某个file对应的split集合的时候,当满足如下的条件时候,我们就会用对应的split来构造foot ScanRange,如下所示:
// HdfsScanner::IssueFooterRanges()
// If there are no materialized slots (such as count(*) over the table), we can
// get the result with the file metadata alone and don't need to read any row
// groups. We only want a single node to process the file footer in this case,
// which is the node with the footer split. If it's not a count(*), we create a
// footer range for the split always.
if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
也就是说,当满足条件时,我们对于一个file的多个split,我们会分别构造一个footer ScanRange,而不是1个。但是这些footer ScanRange的len、offset、file信息都是一样的,唯一不同的就是meta_data_,该成员类型是void*,但是实际会被赋值为ScanRangeMetadata。meta_data_中的original_split会保存原始的split对应的ScanRange信息,也就是原始的len、offset。 当处理完成所有的文件之后,我们最终通过RequestContext::AddRangeToDisk函数,将这些footer的ScanRange加入到unstarted_scan_ranges_对象中,同时,每入队一个ScanRange对象,我们会将num_unstarted_scan_ranges_这个成员加1。也就是说,这个unstarted_scan_ranges_最终存放的是所有file文件的footer ScanRange。 上面我们介绍了unstarted_scan_ranges_这个队列的入队流程,接着我们看下出队的操作。在前面的文章中,我们提到了,IO thread会从RequestContext队列的头部取出一个RequestContext对象,然后通过该RequestContext对象获取一个ScanRange进行处理,相关处理函数如下:
RequestRange* RequestContext::GetNextRequestRange(int disk_id) {
PerDiskState* request_disk_state = &disk_states_[disk_id];
unique_lock<mutex> request_lock(lock_);
if (request_disk_state->next_scan_range_to_start() == nullptr &&
!request_disk_state->unstarted_scan_ranges()->empty()) {
ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
num_unstarted_scan_ranges_.Add(-1);
ready_to_start_ranges_.Enqueue(new_range);
request_disk_state->set_next_scan_range_to_start(new_range);
}
if (request_disk_state->in_flight_ranges()->empty()) {
request_disk_state->DecrementDiskThread(request_lock, this);
return nullptr;
}
RequestRange* range = request_disk_state->in_flight_ranges()->Dequeue();
request_disk_state->ScheduleContext(request_lock, this, disk_id);
return range;
}
同样我们删除了一些代码,方便阅读。首选获取对应的PerDiskState对象,然后将unstarted_scan_ranges_队列的头部对象出队,并将num_unstarted_scan_ranges_加1,同时入队到ready_to_start_ranges_中,这两个变量都是RequestContext的成员,这里我们先不展开说明。接着将出队的ScanRange对象设置到next_scan_range_to_start_成员,关于这个成员的用处,我们也在后面展开说明。 紧接着,会判断in_flight_ranges_队列是否为空,是则直接返回null,表示这次IO thead没取到ScanRange;否则,从in_flight_ranges_弹出头部的ScanRange对象,返回进行处理。
前面我们提到了IO thread并不会直接获取unstarted_scan_ranges_队列上的ScanRange进行处理。先将unstarted_scan_ranges_的头部出队,然后入队到ready_to_start_ranges_队列中,同时设置到next_scan_range_to_start_成员。然后再从in_flight_ranges_队列中取出头部对象,进行后续的处理。由于这里涉及到的成员变量很多,我们将RequestContext和PerDiskState的成员进行了归纳,如下所示:
这里我们简单说明一下,RequestContex对象会包含多个PerDiskState对象,每一个PerDiskState对象表示一种disk queue,例如remote HDFS、S3等,所以RequestContex对象的这些成员,统计的是所有PerDiskState的相应成员的累加和,比如num_unstarted_scan_ranges_这个成员,统计的就是该RequestContex对象上的所有PerDiskState的unstarted_scan_ranges_的总和。这点需要注意。 下面我们来看下ready_to_start_ranges_和next_scan_range_to_start_的相关处理,函数调用如下所示:
由于这里涉及到了不同的调用路径,因此我们使用了上述图片的方式。可以看到,主要分为两条路径:左边路径的主要处理逻辑就是在HdfsScanNode的Open函数中,将回调函数ThreadTokenAvailableCb绑定到线程池;右边路径则会通过回调函数ThreadTokenAvailableCb启动专门的scanner线程来处理unstarted_scan_ranges。 最终在GetNextUnstartedRange函数中,会对next_scan_range_to_start_和ready_to_start_ranges_进行处理,关键代码如下所示:
// RequestContext::GetNextUnstartedRange()
*range = ready_to_start_ranges_.Dequeue();
int disk_id = (*range)->disk_id();
disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
可以看到在GetNextUnstartedRange函数中,先将ready_to_start_ranges_队列中的头部对象弹出,然后将该ScanRange对应的PerDiskState的next_scan_range_to_start_对象设置为空,然后再继续后续的处理,这里省略了后续处理代码。关于回调函数和scanner线程,后面我们讲到in_flight_ranges_的时候,会再详细说明,这里简单了解下这个处理过程即可。
到这里,关于unstarted_scan_ranges_的相关处理流程我们就介绍的差不多了。回顾一下,我们在本文中,首先介绍了BE端的ScanRange,相较于thrift的TScanRange结构体,ScanRange对象主要是在每个executor上进行实际scan操作时,需要用到的类。除此之外,我们还介绍了一个关键的对象:unstarted_scan_ranges_,这是一个ScanRange的队列,我们通过代码,一步一步了解了这个队列的更新情况,包括入队和出队,这个对象对于整个IO thread模型是比较重要的。现在读者看下来这两篇文章可能觉得比较琐碎,后面笔者会将各个成员串起来,整体看下Impala的这个IO thread的处理。