本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。
在文章Impala3.4源码阅读笔记(七)——解析ScanNode(上)及其后续文章中,我们分析了ScanNode
的继承与派生关系,以Kudu扫描为例子具体分析了扫描结点和扫描器的主要代码,这里再次贴出一些前置知识,首先Impala执行一条SQL的主要流程包括:
PlanNode
的计划树PlanTree
;Fragment
,以便在分布式集群上分配调度;Fragment
创建若干执行实例Instance
,在一个Instance
中Fragment
的每个PlanNode
对应生成一个执行结点ExecNode
;RowBatch
的形式在结点间传递;在这一流程中,各个执行结点各司其职、逻辑解耦的设计使得Impala的开发与优化变得清晰与方便。在各类执行结点中,扫描结点ScanNode
作为一个大类包括了各种数据源的扫描结点派生,负责了各种数据源的扫描,是Impala中最重要的结点之一。Impala支持多种数据源,每种数据都对应了一种扫描结点,这些扫描结点都派生自ScanNode
类,具体派生关系如图所示:
图中ExecNode
是所有执行结点的基类,主要定义了Prepare
、Open
、GetNext
和Close
四个接口,所有的执行结点都需要实现这些方法,完成准备、开启、获取下一批数据和关闭四种逻辑,整个执行树ExecTree
的开关与执行也正是由根结点到叶结点地调用这些方法。
图中ScanNode
作为所有扫描结点的基类,直接继承了ExecNode
并在其基础之上增加了ScanRange
、runtime filters和许多扫描性能相关的计时器、计数器,另外还有一个负责多线程扫描使用的内部类ScannerThreadState
。
如图所示,ScanNode
又进一步派生出四个类,分别对应了自定义数据源(DataSource)、HBase、Kudu和Hdfs。其中Kudu和Hdfs都支持了MT_DOP功能(Impala中提升查询并发度的功能,可以手动指定运行多个实例来提升性能),所以还包括了MT和非MT两个版本的扫描结点。
各个扫描结点为了完成对应数据源的扫描工作,可能还会包含各自的扫描器类,如KuduScanner
包括了连接Kudu、物化数据等逻辑。而HdfsScanner
更加复杂,根据数据储存格式又分为了文本格式扫描器HdfsTextScanner
、列存格式扫描器HdfsCoulumnarScanner
等,HdfsCoulumnarScanner
又进一步派生出了ORC格式、Parquet格式对应的扫描器。执行过程中,一个ScanNode
负责一张表的扫描,这张表的底层数据是可以由不同压缩格式的数据文件组成的。在ScanNode
的工作过程中,每个Scanner负责处理一个ScanRange
,ScanRange
可以理解为一个数据文件的一个片段,当然也可能是整个文件,取决于执行计划如何规划的。
本文要解析的是负责扫描文本数据的文本格式扫描器HdfsTextScanner
。当HdfsScanNode
需要扫描文本格式的数据文件时会创建HdfsTextScanner
,关于HdfsScanNode
,它的主体逻辑和KuduScanNode
相差不大,可以参看前文,此处不在赘述。
文本格式作为大数据存储格式其实并不算好用,它是行存的,读写和存储效率都不高,更不像Orc、Parquet等格式一样支持一些高级特性。但是它却是Impala的默认建表格式(至少4.1版本还是默认STORED AS TEXTFILE),那它肯定还是有一些优点的,至少有以下几点:
虽然文本格式本身很简单,比如csv就是简单的逗号分隔字段、换行符分隔行,但是要实现一个通用的、高性能的文本格式扫描器还是有一定难度的。在Impala中,HdfsTextScanner
支持了并发地扫描文本格式文件(即一个文件可以随意切分为任意个Block由多个扫描器并发读取),也支持扫描各种压缩后的文本文件,还使用了Codegen、SSE等技术提升性能。而相较于HdfsParquetScanner
等,它又相对简单,不涉及过多的格式特性。所以如果要研究Impala的Scanner或者自己实现一个Scanner,那么阅读HdfsTextScanner
的代码是一个很好的开始。
HdfsTextScanner
的结构扫描文件格式文件中的数据并转换为Impala的RowBatch
就是HdfsTextScanner
的全部任务了,这一过程又可以具体分为四步:读取数据、解析数据、转换数据和物化数据。读取数据即从数据文件中读取字节流,这一步是由Impala的IO层支撑的,但是为了分块读取的正确性,扫描器需要有自己的读取规划,这个后文会详细说明。解析数据即对原始的字节流进行分析,找到每行数据的边界、每个字段的位置,另外还要处理分块读取导致的不完整的数据行或字段。转换数据和物化数据实际上是一起完成的,根据之前解析得到的字段位置和字段类型,由转换器将文本数据转换为对应的数据类型然后写入RowBatch。在这四步中,转换数据和物化数据的逻辑是很多扫描器中通用的,分别由TextConverter
类和HdfsScanner
类实现了,解析数据也由单独的DelimitedTextParser
类负责,实际上HdfsTextScanner
本体只需要负责进行数据读取的规划、相关API的调用和一些准备及收尾工作就行了。
因此,我们可以把HdfsTextScanner
分为四块来看:其本身的代码、DelimitedTextParser
类的部分、TextConverter
类的部分和继承自HdfsScanner
类的部分,四部分分别对应了扫描的四个步骤。本篇文章我们只分析HdfsTextScanner
本身的代码,将其他部分当作提供特定功能的黑盒,有助于我们梳理脉络。
一个ScanNode
负责一张表的扫描,一张表可能包括多个数据文件,一个文件也可能包括多个ScanRange
,每个ScanRange
都需要一个Scanner处理。对于同一张表各个扫描器,它们有一些准备工作中是共通的,包括向IO层发布ScanRange
和进行Codegen,具体由两个静态成员函数完成,以下是相关代码和解析(略去部分代码):
// IssueInitialRanges负责将ScanRange发布到IO层,发布到IO层后就扫描器可以通过相关API读取文件数据
Status HdfsTextScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
vector<ScanRange*> compressed_text_scan_ranges;
map<string, vector<HdfsFileDesc*>> plugin_text_files;
// 循环遍历每个文件,根据文件的压缩类型,有不同的IO方式
for (int i = 0; i < files.size(); ++i) {
THdfsCompression::type compression = files[i]->file_compression;
switch (compression) {
case THdfsCompression::NONE:
// 对于未压缩的文件,我们可以分块并发读取,所以调用AddDiskIoRanges将该文件的所有ScanRagne发布到IO层
RETURN_IF_ERROR(scan_node->AddDiskIoRanges(files[i], EnqueueLocation::TAIL));
break;
case THdfsCompression::GZIP:
case THdfsCompression::SNAPPY:
case THdfsCompression::SNAPPY_BLOCKED:
case THdfsCompression::ZSTD:
case THdfsCompression::BZIP2:
case THdfsCompression::DEFLATE:
// 对于压缩的文件,我们一般无法分块并发读取,因为需要从头开始才能解压文件,没法从文件中间位置开始解压
// 因此我们需要遍历文件的所有ScanRange,也就是splits,将其“合并”为一个
for (int j = 0; j < files[i]->splits.size(); ++j) {
ScanRange* split = files[i]->splits[j];
// 如果ScanRange的offset为0,那么它要扫描的范围就是从文件头开始的,否则则是从中部开始的
if (split->offset() != 0) {
// 一般来说压缩文件应该只有一个ScanRange,范围是整个文件,但是有多个也不算错误
// 这种情况汇报一个警告,然后直接标记这个ScanRange已完成即可
scan_node->runtime_state()->LogError(ErrorMsg(
TErrorCode::COMPRESSED_FILE_MULTIPLE_BLOCKS,
files[i]->filename, split->offset()));
scan_node->RangeComplete(THdfsFileFormat::TEXT, compression);
continue;
}
// 根据第一个ScanRange(offset为0的),调用AllocateScanRange申请一个范围是整个文件的ScanRange
// 然后将其添加到compressed_text_scan_ranges中,之后统一发布
DCHECK_GT(files[i]->file_length, 0);
ScanRangeMetadata* metadata =
static_cast<ScanRangeMetadata*>(split->meta_data());
ScanRange* file_range = scan_node->AllocateScanRange(files[i]->GetFileInfo(),
files[i]->file_length, 0, metadata->partition_id, split->disk_id(),
split->expected_local(), BufferOpts(split->cache_options()));
compressed_text_scan_ranges.push_back(file_range);
scan_node->max_compressed_text_file_length()->Set(files[i]->file_length);
}
break;
default: {
// 对于其他impala没有内置支持的压缩类型,impala会尝试寻找有无相应的插件可以处理
auto it = _THdfsCompression_VALUES_TO_NAMES.find(compression);
if (it == _THdfsCompression_VALUES_TO_NAMES.end()) {
return Status(Substitute(
"Unexpected compression enum value: $0", static_cast<int>(compression)));
}
plugin_text_files[it->second].push_back(files[i]);
}
}
}
// 将刚才处理得到的压缩文件的ScanRange统一发布到IO层
if (compressed_text_scan_ranges.size() > 0) {
RETURN_IF_ERROR(scan_node->AddDiskIoRanges(compressed_text_scan_ranges,
EnqueueLocation::TAIL));
}
// 对于插件支持的压缩类型,由插件自行负责发布ScanRange
for (const auto& entry : plugin_text_files) {
DCHECK_GT(entry.second.size(), 0) << "List should be non-empty";
RETURN_IF_ERROR(HdfsPluginTextScanner::IssueInitialRanges(
scan_node, entry.second, entry.first));
}
return Status::OK();
}
// CodeGen是Impala的主要性能优化手段了,其基本思路就是使用运行时才能确定的信息来在运行时编译一个特化的函数版本
// 特化的函数版本能够利用运行时信息来避免很多分支判断、循环语句和虚函数调用等操作,在大数据量的情况下能够显著提升性能
// Codegen会在Fragment Instance Open时进行,具体是在FragmentState::InvokeCodegen中调用
// Codegen作为性能优化手段,即使不开启也不影响查询执行,对我们分析扫描器的工作流程也不是必要的,因此本文不深入分析
// 不过其中使用的很多技术如llvm、CallCodegendOrInterpreted的无缝切换函数版本还是很值得学习的,以后有机会可以再深入研究
Status HdfsTextScanner::Codegen(HdfsScanPlanNode* node, FragmentState* state,
llvm::Function** write_aligned_tuples_fn);
RowBatch
的结构在介绍扫描器之前,有必要先介绍一下RowBatch
。如前文所述,RowBatch
是负责在执行结点间传递数据的重要对象,作为数据的源头,扫描器的任务就是将数据源的数据转换为RowBatch
。为了分析扫描器的工作流程,我有必要先看看RowBatch
的结构。
如字面含义RowBatch
即行批,其封装了一批行对象TupleRow
,每行TupleRow
可以由多个元组Tuple
组成,实际上TupleRow
就是一个Tuple
指针数组(目前的Impala每行都只有一个元组,即指针数组长度固定为1,所以TupleRow
和Tuple*
差不多是同义词,因而在分析扫描器的过程中,元组Tuple
和行Row
在逻辑上差不多也是同义词)。Tuple
可以理解为以连续字节的形式存储的一行数据,在运行时的大部分时间里被当作无类型内存,当需要时通过reinterpret_cast转换为Tuple
。其中包含固定数量的大小固定的槽位Slot
,以及一个位向量用于表示各个槽位是否为null值。在扫描器中,Slot
可以理解为一行数据中储存各个字段的位置,所谓扫描就是根据数据填充这些槽位。
行批本身可以持有数据,也可以引用其他位置的内存,在扫描器中行批基本上是自己持有数据的,即行批内部的内存池申请了内存用于实际存放Tuple
,通过行对象TupleRow
指向这些数据进行引用,这样扫描器在工作时需要将实际数据写入Tuple
对象中,然后更新TupleRow
对象来引用Tuple
。下游消费者结点可以通过RowBatch
提供的迭代器等接口方便地读取行批中的数据。上面提到的这些类都提供了很多成员函数帮助我们方便地申请内存、写入数据,具体我们在扫描器的工作流程中可以看见。
和ScanNode
等ExecNode
类似,Scanner主要提供Open
、GetNext
、ProcessSplit
和Close
接口。值得一提的是GetNext
和ProcessSplit
功能类似,都是给ScanNode
提供RowBatch
的,区别是前者给HdfsScanNodeMT
等MT结点(单线程结点)调用,后者给多线程结点调用,因为前者直接返回RowBatch
、后者会将RowBatch
放入RowBatch
队列中。对于Scanner的子类基本上只需要实现Open
、GetNextInternal
和Close
即可,GetNext
、ProcessSplit
会通过调用GetNextInternal
来得到实际数据。
接下来我们逐个分析HdfsTextScanner
的这几个函数,首先是Open
,它会在扫描器运行前调用,执行开启的逻辑,具体到代码(略去部分代码):
Status HdfsTextScanner::Open(ScannerContext* context) {
// 调用父类HdfsScanner的Open函数,其中包括一些HdfsScanner的通用逻辑
// 如设置数据流、准备谓词计算器和创建模板元组等,模板元组即已经填充好了部分槽位的元组
// 比如对于带有分区列的扫描,同一个文件每行数据的分区列值都相同,是根据文件目录决定的
// 这样的列不是从数据文件中读取到的,而是模板元组实现物化的
RETURN_IF_ERROR(HdfsScanner::Open(context));
// 初始化用于计时解析耗时的计时器
parse_delimiter_timer_ = ADD_TIMER(scan_node_->runtime_profile(), "DelimiterParseTime");
// 为一些成员变量分配内存
field_locations_.resize(state_->batch_size() * scan_node_->materialized_slots().size());
row_end_locations_.resize(state_->batch_size());
RETURN_IF_ERROR(InitNewRange());
return Status::OK();
}
Status HdfsTextScanner::InitNewRange() {
DCHECK_EQ(scan_state_, CONSTRUCTED);
...
// 从元数据中拿到文本格式使用的分隔符号
HdfsPartitionDescriptor* hdfs_partition = context_->partition_descriptor();
char field_delim = hdfs_partition->field_delim();
char collection_delim = hdfs_partition->collection_delim();
// 对于没有物化槽的扫描可以进行优化,所谓物化槽可以理解为需要扫描器从数据文件中读取并填充的槽位
// 类似于count(*)或者只扫描分区列的查询,都是没有物化槽的,因为它们不需要读取具体的数据内容
if (scan_node_->materialized_slots().size() == 0) {
field_delim = '\0';
collection_delim = '\0';
}
// 根据扫描参数创建相应的解析器和文本转换器
delimited_text_parser_.reset(new TupleDelimitedTextParser(
scan_node_->hdfs_table()->num_cols(), scan_node_->num_partition_keys(),
scan_node_->is_materialized_col(), hdfs_partition->line_delim(),
field_delim, collection_delim, hdfs_partition->escape_char()));
text_converter_.reset(new TextConverter(hdfs_partition->escape_char(),
scan_node_->hdfs_table()->null_column_value(), true,
state_->strict_mode()));
// 重置扫描器,然后推进扫描状态scan_state_为SCAN_RANGE_INITIALIZED
// 扫描状态包括CONSTRUCTED, SCAN_RANGE_INITIALIZED, FIRST_TUPLE_FOUND, PAST_SCAN_RANGE, DONE
// 状态随着扫描进程而推进,CONSTRUCTED是初始状态,SCAN_RANGE_INITIALIZED标识已经初始化完成,扫描器就绪
RETURN_IF_ERROR(ResetScanner());
scan_state_ = SCAN_RANGE_INITIALIZED;
return Status::OK();
}
调用完Open
之后,就可以通过GetNextInternal
来填充RowBatch
了,代码如下:
Status HdfsTextScanner::GetNextInternal(RowBatch* row_batch) {
DCHECK(!eos_);
DCHECK_GE(scan_state_, SCAN_RANGE_INITIALIZED);
DCHECK_NE(scan_state_, DONE);
// 执行Open后第一次调用GetNextInternal时会进入该if,由于支持了分块扫描,我们可能是从文件中部开始读取数据的
// 因此需要先跳过头部不完整的数据行,从第一行完整的数据开始扫描,不完整的数据行会由之前的ScanRange负责处理
if (scan_state_ == SCAN_RANGE_INITIALIZED) {
// 调用FindFirstTuple找到第一个完整行,如果找到了会推进scan_state_为FIRST_TUPLE_FOUND
// 寻找第一个完整行的基本逻辑是找到第一个行分隔符line_delim之后的位置,这样可以避免重复扫描某行数据
// 因为扫描器约定了ScanRange中第一个行分隔符之前的数据都由上一个扫描器处理(第一个ScanRange除外)
// 对于第一个ScanRange(即offset == 0的),直接从头开始扫描即可
RETURN_IF_ERROR(FindFirstTuple(row_batch->tuple_data_pool()));
// 如果没有找到第一个元组,说明整个ScanRange都是一行数据或其中一部分,这种情况下之前的ScanRange会处理这行
// 本扫描器的没有更多任务需要处理了,设则eos(end of scan)并推进scan_state_为DONE标识扫描完成
if (scan_state_ != FIRST_TUPLE_FOUND) {
eos_ = true;
scan_state_ = DONE;
return Status::OK();
}
}
// 接下来就是主要的数据扫描部分了,在扫描之前先在RowBatch中申请内存用来之后储存Tuple数据
int64_t tuple_buffer_size;
RETURN_IF_ERROR(
row_batch->ResizeAndAllocateTupleBuffer(state_, &tuple_buffer_size, &tuple_mem_));
tuple_ = reinterpret_cast<Tuple*>(tuple_mem_);
// scan_state_为FIRST_TUPLE_FOUND说明扫描器正在正常扫描的过程中,调用ProcessRange来填充行批
// ProcessRange是HdfsTextScanner的核心函数之一,负责实际的扫描数据填充RowBatch,后面再详细分析
if (scan_state_ == FIRST_TUPLE_FOUND) {
int num_tuples;
RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples));
}
// 检查是否达到了ScanNode的limit限制,若是说明数据已经足够,不需要扫描更多数据了
if (scan_node_->ReachedLimitShared()) {
eos_ = true;
scan_state_ = DONE;
return Status::OK();
}
// 上文提到扫描器约定了ScanRange中第一个行分隔符之前的数据都由上一个扫描器处理,就是这里处理的
// 当ProcessRange处理完本ScanRange内所有数据后,scan_state_会被推进到PAST_SCAN_RANGE状态
// 这表示本ScanRange的数据已经处理完毕,还需要读取下一个ScanRange的数据,完成这“最后一行”的处理
if (scan_state_ == PAST_SCAN_RANGE && !row_batch->AtCapacity()) {
// FinishScanRange函数负责完成处理最后一行,后面再详细分析
RETURN_IF_ERROR(FinishScanRange(row_batch));
DCHECK_EQ(scan_state_, DONE);
eos_ = true;
}
return Status::OK();
}
当扫描完成,扫描器就可以关闭了,通过Close
函数完成关闭的逻辑(略去部分代码):
void HdfsTextScanner::Close(RowBatch* row_batch) {
...
// 根据调用方的使用方法,有时扫描器关闭时,最后一个RowBatch还需要使用,
// 此时需要调用AcquireData函数,将扫描器中内存池内的数据的所有权转移给RowBatch
// 因此RowBatch可能引用这些池中的内存,若不转移所有权会导致后续非法访问内存
if (row_batch != nullptr) {
row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);
row_batch->tuple_data_pool()->AcquireData(data_buffer_pool_.get(), false);
if (scan_node_->HasRowBatchQueue()) {
static_cast<HdfsScanNode*>(scan_node_)->AddMaterializedRowBatch(
unique_ptr<RowBatch>(row_batch));
}
} else {
// 没有提供最后一个RowBatch说明调用方不需要,此时可以直接释放内存池申请的内存
template_tuple_pool_->FreeAll();
data_buffer_pool_->FreeAll();
}
// context_是ScannerContext对象,是从IO层获取数据而使用的,扫描完成后也可从释放其资源
context_->ReleaseCompletedResources(true);
...
// CloseInternal是HdfsScanner的函数,其包括了通用的扫描器关闭逻辑
CloseInternal();
}
以上就是扫描器整体的工作流程,总的来说就是调用Open
开启,重复调用GetNextInternal
获取新一批数据,扫描器指示eos之后扫描完成,调用Close
将其关闭。当然这些只是对扫描器调用者来说看见的流程,其内部还有许多工作要做,除去一些细枝末节的东西后,大体还有几个部分,下面我们继续分析。
ScannerContext
对象为扫描器提供了从IO层获取自身负责的ScanRange
中数据的接口,为了配合文本扫描的特点,HdfsTextScanner
还进行了额外的封装,也就是FillByteBufferWrapper
函数,代码如下:
// FillByteBufferWrapper是对FillByteBuffer简单包装,它额外保存缓冲区最后一个字符用于处理特殊情况
Status HdfsTextScanner::FillByteBufferWrapper(
MemPool* pool, bool* eosr, int num_bytes) {
// pool是内存池,解压时可能用到,eosr(end of scan range)标识ScanRange是否读完,num_bytes要读的字节数
RETURN_IF_ERROR(FillByteBuffer(pool, eosr, num_bytes));
// FillByteBuffer读取数据后会更新byte_buffer_ptr_, byte_buffer_end_和byte_buffer_read_size_
// 分别是数据缓冲区的头尾指针和长度,读取了至少一字节就会更新byte_buffer_last_byte_
if (byte_buffer_read_size_ > 0) {
byte_buffer_filled_ = true;
byte_buffer_last_byte_ = byte_buffer_end_[-1];
}
return Status::OK();
}
// 扫描器读取数据使用,从IO获取数据来更新Buffer指针,如果暂时没有数据会阻塞等待
// 如果扫描的是压缩文件,该函数还会负责解压数据,提供解压后的数据缓冲区
Status HdfsTextScanner::FillByteBuffer(MemPool* pool, bool* eosr, int num_bytes) {
*eosr = false;
// 读取数据分为三个分支,非压缩的、支持流式解压的和不支持流式解压的
if (decompressor_.get() == nullptr) {
Status status;
// 对于非压缩文件,可以调用ScannerContext提供的流stream_来直接获取文件数据
if (num_bytes > 0) {
// 如果指定了要读取的数据量,则通过GetBytes接口获取数据,更新byte_buffer_ptr_和byte_buffer_read_size_
// 对于HdfsTextScanner来说,只有在处理下一个ScanRange的数据时会指定num_bytes为一个较小的固定值,否则不指定
// 因为我们只需要跨ScanRange读取一行数据,不需要扫描太多字节
if (!stream_->GetBytes(num_bytes,
reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), &byte_buffer_read_size_,
&status)) {
DCHECK(!status.ok());
return status;
}
} else {
// 大部分情况下,调用GetBuffer借口获取数据,数据量由IO buffer大小确定
DCHECK_EQ(num_bytes, 0);
RETURN_IF_ERROR(stream_->GetBuffer(false,
reinterpret_cast<uint8_t**>(&byte_buffer_ptr_), &byte_buffer_read_size_));
}
*eosr = stream_->eosr();
} else if (decompressor_->supports_streaming()) {
// 对于支持流式解压的压缩格式,调用FillByteBufferCompressedStream进行流式解压读取解压数据
DCHECK_EQ(num_bytes, 0);
RETURN_IF_ERROR(FillByteBufferCompressedStream(pool, eosr));
} else {
// 对于不支持流式解压的压缩格式,调用FillByteBufferCompressedFile解压完整的文件读取解压数据
DCHECK_EQ(num_bytes, 0);
RETURN_IF_ERROR(FillByteBufferCompressedFile(eosr));
}
// 最后更新一下缓冲区尾指针
byte_buffer_end_ = byte_buffer_ptr_ + byte_buffer_read_size_;
return Status::OK();
}
扫描的主体逻辑是由刚才在GetNextInternal
函数中看见的ProcessRange
完成的,这是一个很长也很复杂的函数:
Status HdfsTextScanner::ProcessRange(RowBatch* row_batch, int* num_tuples) {
DCHECK(scan_state_ == FIRST_TUPLE_FOUND || scan_state_ == PAST_SCAN_RANGE);
MemPool* pool = row_batch->tuple_data_pool();
// stream_->eosr()为true说明当前ScanRange的数据已经全部读取完了
// scan_state_ == PAST_SCAN_RANGE说明当前正在准备跨ScanRange处理最后一行数据
bool eosr = stream_->eosr() || scan_state_ == PAST_SCAN_RANGE;
// 扫描数据的主循环
while (true) {
// !eosr说明我们正在正常的扫描数据中
// byte_buffer_ptr_ == byte_buffer_end_说明当前缓冲区数据已经消费完毕
// 这种情况我们调用FillByteBufferWrapper获取下一个数据缓冲区
if (!eosr && byte_buffer_ptr_ == byte_buffer_end_) {
RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
}
// AddRow函数会返回行批已有行数再加1,也就是下一行的行号
// 调用GetRow获取下一个TupleRow的写入位置,注意区别与Tuple的写入位置不是一回事
// TupleRow当前可以简单的理解为Tuple*,并不保存具体数据,而是指向保存具体数据的Tuple
TupleRow* tuple_row_mem = row_batch->GetRow(row_batch->AddRow());
// 使用行批容量capacity减去已有行数num_rows,计算行批还能容纳的行数max_tuples
int max_tuples = row_batch->capacity() - row_batch->num_rows();
// 如果是PAST_SCAN_RANGE,也就是准备跨ScanRange处理最后一行数据的状态
// 那么最大行数直接设置为1,同时eosr肯定已经达到了
if (scan_state_ == PAST_SCAN_RANGE) {
// 这种情况下我们肯定是从FinishScanRange函数中调用的ProcessRange
// 数据缓冲区已经在FinishScanRange中获取过了
max_tuples = 1;
eosr = true;
}
// 重置已读取读取行数num_tuples和字段数num_fields
*num_tuples = 0;
int num_fields = 0;
DCHECK_GT(max_tuples, 0);
// 记录这一批数据开始的位置和下一个字段开始的位置
batch_start_ptr_ = byte_buffer_ptr_;
char* col_start = byte_buffer_ptr_;
{
// 调用DelimitedTextParser解析字段位置,解析结果被存入field_locations_
// field_locations_的类型是vector<FieldLocation>,FieldLocation类记录了一个字段的位置
// 具体包括字段的起始地址和长度,这会在后面的转换和物化过程中使用
SCOPED_TIMER(parse_delimiter_timer_);
RETURN_IF_ERROR(delimited_text_parser_->ParseFieldLocations(max_tuples,
byte_buffer_end_ - byte_buffer_ptr_, &byte_buffer_ptr_,
row_end_locations_.data(), field_locations_.data(), num_tuples,
&num_fields, &col_start));
}
// 完成解析后接下来就是将数据物化进行批了,也就是根据字段位置信息将数据从数据缓冲区转换复制到Tuple中
// 这一阶段还要进行谓词计算过滤掉不满足条件的行,最后得到的物化行数num_tuples_materialized才是写入RowBatch的行数量
int num_tuples_materialized = 0;
if (scan_node_->materialized_slots().size() != 0 &&
(num_fields > 0 || *num_tuples > 0)) {
// 进入该if分支说明扫描是有物化槽的,且我们至少解析出了一行或一个字段,此时需要根据解析结果物化数据
DCHECK_LE(*num_tuples, num_fields + 1);
if (!boundary_column_.IsEmpty()) {
// boundary_column_是用于保存不完整的字段数据的一个字符串缓冲区,当上一次解析有不完整的字段时会将其保存
// 因此若其中有数据,需要调用CopyBoundaryField将其拷贝到这次解析的字段中的第一个字段前,形成完整的字段
RETURN_IF_ERROR(CopyBoundaryField(field_locations_.data(), pool));
boundary_column_.Clear();
}
// 调用WriteFields将解析的数据写入Tuple,然后进行谓词计算评估,通过评估的Tuple会被TupleRow引用从而加入RowBatch
num_tuples_materialized = WriteFields(num_fields, *num_tuples, pool, tuple_row_mem);
DCHECK_GE(num_tuples_materialized, 0);
// 检查WriteFields调用过程中是否出现了文本数据转换错误
RETURN_IF_ERROR(parse_status_);
if (*num_tuples > 0) {
// 与boundary_column_类似,boundary_row_是用于保存不完整的行数据的一个字符串缓冲区,在处理最后一行时可能用到
// 这里如果我们解析出了至少一行(*num_tuples > 0),说明之前的不完整的行已经处理了,需要清空boundary_row_
boundary_row_.Clear();
}
} else if (*num_tuples != 0) {
// 进入该分支说明扫描没有物化槽,且我们至少解析出了一行,此时我们不需要物化解析结果的数据
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
boundary_row_.Clear();
// 调用WriteTemplateTuples来使用模板元组填充行批,WriteTemplateTuples是HdfsScanner提供的
// 模板元组在Open行数中已经介绍过了,可以理解为只有分区列数据的元组
// 类似count(*)等没有物化槽的查询的情况下,我们只需要让行批中每个TupleRow都引用模板元组就行了
num_tuples_materialized = WriteTemplateTuples(tuple_row_mem, *num_tuples);
}
// 使用num_tuples更新一下总读取行数的计数器
COUNTER_ADD(scan_node_->rows_read_counter(), *num_tuples);
// DelimitedTextParser解析过程中会推进col_start和byte_buffer_ptr_,分别是列起始位置和当前解析到的位置
// 解析完一个列或者说字段时,col_start就会推进到byte_buffer_ptr_的位置
// 当它们不相等时,说明解析停在了一个列的中间,也就是说该列跨过了数据缓冲区边界,有一部分在下一个缓冲区中
// 若ReturnCurrentColumn为true,说明这个不完整的列是扫描所需的(物化槽之一),此时需要保存这些边界数据
if (col_start != byte_buffer_ptr_ && delimited_text_parser_->ReturnCurrentColumn()) {
DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
// 调用Append将边界字段的数据追加到边界列缓冲区
RETURN_IF_ERROR(boundary_column_.Append(col_start, byte_buffer_ptr_ - col_start));
// 然后我们还需要确定最后一行的起始位置,因为有不完整的列,所以最后一行肯定也是不完整的,需要保存
char* last_row = nullptr;
if (*num_tuples == 0) {
// 如果一行都没有解析完成,那最后一行起始位置就是数据缓冲区的起始位置
last_row = batch_start_ptr_;
} else {
// 若有解析完成的行,那最后一行起始位置就是最后一完整行结束位置的下一个字符位置
// row_end_locations_记录了每一个完整行的结束位置,由DelimitedTextParser解析时维护
last_row = row_end_locations_[*num_tuples - 1] + 1;
}
// 调用Append将边界行的数据追加到边界行缓冲区
RETURN_IF_ERROR(boundary_row_.Append(last_row, byte_buffer_ptr_ - last_row));
}
// 这一批数据缓冲区中的数据处理完成,调用CommitRows将物化的行数提交,RowBatch会推进行计数
RETURN_IF_ERROR(CommitRows(num_tuples_materialized, row_batch));
// scan_state_ == PAST_SCAN_RANGE说明我们正在处理最后一行,且已经处理完成,可以退出循环
if (scan_state_ == PAST_SCAN_RANGE) break;
// 如果过程中已当前ScanRange内数据已经全部处理完毕,则推进scan_state_状态为PAST_SCAN_RANGE
// 然后可以退出循环,准备跨ScanRange处理最后一行
if (byte_buffer_ptr_ == byte_buffer_end_ && eosr) {
scan_state_ = PAST_SCAN_RANGE;
break;
}
// 当前RowBatch容量已满,或者已经达到ScanNode的limit限制时,都可以停止解析退出循环
if (row_batch->AtCapacity() || scan_node_->ReachedLimitShared()) break;
}
return Status::OK();
}
自此扫描的主体逻辑就分析完了,代码比较多也比较绕,但是再结合一下边界行的处理逻辑一起看就会清晰不少。
为了实现文件的分块并行扫描,扫描器必须跳过自身负责的ScanRange
范围内头部很可能出现的不完整的数据行,当然也有可能恰好是完整的(这里说的完整还包括行末尾的行分隔符),但是扫描器在理论上是无法区分第一行是否完整的。因此,约定除了负责第一个ScanRange
之外的每个扫描器都从第一个行分隔符之后开始扫描,跳过这之前的数据。同时,扫描器除了处理本身负责的ScanRange
内的数据之外还需要跨ScanRange
处理一行数据,这个约定能够保证文件中的每一行数据都不遗漏不重复的扫描。这部分代码主要由两个函数实现,其实之前已经在GetNextInternal
中见过了:
// FindFirstTuple负责跳过ScanRange范围内头部可能出现的不完整的数据行
Status HdfsTextScanner::FindFirstTuple(MemPool* pool) {
DCHECK_EQ(scan_state_, SCAN_RANGE_INITIALIZED);
bool tuple_found = true;
// scan_range的offset为0说明是第一个ScanRange,此时我们只需要跳过所有header line(若有)
// 否则就像之前说的,跳过一行,也就是跳到第一个行分隔符之后
int num_rows_to_skip = stream_->scan_range()->offset() == 0
? scan_node_->skip_header_line_count() : 1;
if (num_rows_to_skip > 0) {
int num_skipped_rows = 0;
bool eosr = false;
tuple_found = false;
do {
// 循环处理直到找到第一个元组的开始位置或ScanRange结束了,首先获取一批数据
RETURN_IF_ERROR(FillByteBufferWrapper(nullptr, &eosr));
delimited_text_parser_->ParserReset();
SCOPED_TIMER(parse_delimiter_timer_);
int64_t next_tuple_offset = 0;
int64_t bytes_left = byte_buffer_read_size_;
while (num_skipped_rows < num_rows_to_skip) {
// 循环调用DelimitedTextParser的FindFirstInstance来寻找下一个元组的开始位置
// 直到我们跳过了足够多的行,FindFirstInstance未找到元组时会返回-1
next_tuple_offset = delimited_text_parser_->FindFirstInstance(byte_buffer_ptr_,
bytes_left);
if (next_tuple_offset == -1) break;
byte_buffer_ptr_ += next_tuple_offset;
bytes_left -= next_tuple_offset;
++num_skipped_rows;
}
if (next_tuple_offset != -1) tuple_found = true;
} while (!tuple_found && !eosr);
// 这里处理一个特殊情况,如果我们找到了第一个元组,同时恰好消费完了这批数据
// 那说明行分隔符恰好位于当前缓冲区的末尾,则那它可能是一个\r\n分隔符,需要额外处理
if (tuple_found && byte_buffer_ptr_ == byte_buffer_end_) {
bool split_delimiter;
// 调用CheckForSplitDelimiter检查是否有"\r\n"分隔符跨缓冲区的情况
// 它会利用FillByteBufferWrapper保存的缓冲区最后一个字节和下一个字节做判断
// 当然如果文件使用的行分隔符不是'\n'的话,split_delimiter会始终返回false
RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
if (split_delimiter) {
if (eosr) {
// 这种情况下"\r\n"分隔符跨过当前ScanRange末尾,根据约定我们认为它属于下一个ScanRange
// 所以此处我们报告没有找到元组
tuple_found = false;
} else {
// "\r\n"分隔符跨缓冲区但是没有跨过ScanRange,这种情况下我们读取下一个缓冲区并跳过一个字符
// 即跳过下一个缓冲区开头的'\n'
RETURN_IF_ERROR(FillByteBufferWrapper(pool, &eosr));
DCHECK_GT(byte_buffer_read_size_, 0);
DCHECK_EQ(*byte_buffer_ptr_, '\n');
byte_buffer_ptr_ += 1;
}
}
}
// 一种罕见的情况是消费了整个ScanRange也没跳完头部行,可能是设置了过小的max_scan_range_length
if (num_rows_to_skip > 1 && num_skipped_rows != num_rows_to_skip) {
DCHECK(!tuple_found);
stringstream ss;
ss << "Could only skip " << num_skipped_rows << " header lines in first scan range "
<< "but expected " << num_rows_to_skip << ". Try increasing "
<< "max_scan_range_length to a value larger than the size of the file's header.";
return Status(ss.str());
}
}
// 顺利找到第一个元组开始的位置,推进scan_state_到FIRST_TUPLE_FOUND
if (tuple_found) scan_state_ = FIRST_TUPLE_FOUND;
DCHECK(delimited_text_parser_->AtTupleStart());
return Status::OK();
}
// FinishScanRange负责跨ScanRange再处理一行数据
Status HdfsTextScanner::FinishScanRange(RowBatch* row_batch) {
DCHECK(!row_batch->AtCapacity());
DCHECK_EQ(byte_buffer_ptr_, byte_buffer_end_);
// 同样先调用CheckForSplitDelimiter检查是否有"\r\n"分隔符跨缓冲区的情况
bool split_delimiter;
RETURN_IF_ERROR(CheckForSplitDelimiter(&split_delimiter));
if (split_delimiter) {
// 这种情况下"\r\n"分隔符恰好跨过ScanRange末尾,根据约定在这之后的数据由下一ScanRange负责
// 因此最后一行仅有一个'\n'跨过了ScanRange末尾,其他数据已经在之前处理完毕了
// 此处只需要推进scan_state_到DONE和做一些DEBUG的检查
DCHECK(!delimited_text_parser_->HasUnfinishedTuple());
DCHECK(partial_tuple_ == nullptr);
DCHECK(boundary_column_.IsEmpty());
DCHECK(boundary_row_.IsEmpty());
scan_state_ = DONE;
return Status::OK();
}
// 循环直到我们处理完最后一行
while (true) {
DCHECK_EQ(scan_state_, PAST_SCAN_RANGE);
bool eosr = true;
Status status = Status::OK();
byte_buffer_read_size_ = 0;
// 非压缩文件且还没到文件尾部(end of file)的情况下,我们读取一小段数据
// NEXT_BLOCK_READ_SIZE是固定的长度64KB,一般来说足够覆盖临界行了
// 不够的话会在下一个循环再读取同样的长度
if (decompressor_.get() == nullptr && !stream_->eof()) {
status =
FillByteBufferWrapper(row_batch->tuple_data_pool(), &eosr, NEXT_BLOCK_READ_SIZE);
}
// 接下来的if有很长一段代码,是处理数据读取失败或没有读取到更多数据的情况的,篇幅有限就此省略
if (!status.ok() || byte_buffer_read_size_ == 0) {
...
}
DCHECK(eosr);
// 调用ProcessRange处理了一行数据后就完成任务了,可以退出循环
int num_tuples;
RETURN_IF_ERROR(ProcessRange(row_batch, &num_tuples));
if (num_tuples == 1) break;
DCHECK_EQ(num_tuples, 0);
}
DCHECK(boundary_column_.IsEmpty()) << "Must finish processing boundary column";
scan_state_ = DONE;
return Status::OK();
}
至此,最复杂的边界行处理的逻辑就分析完成了,整个HdfsTextScanner
的代码也分析得差不多了,我们最后再看看它到底具体是怎么物化数据的。
在扫描的主题逻辑中,我们调用了WriteFields
来根据解析器解析出的字段位置信息FieldLocation
将数据缓冲区中的数据物化到RowBatch
。这也是一个十分关键的函数,我们具体分析一下:
// WriteFields也是一个比较复杂的函数,它的任务是就是根据字段位置信息将数据缓冲区中的数据物化到RowBatch
int HdfsTextScanner::WriteFields(int num_fields, int num_tuples, MemPool* pool,
TupleRow* row) {
SCOPED_TIMER(scan_node_->materialize_tuple_timer());
DCHECK(boundary_column_.IsEmpty());
FieldLocation* fields = field_locations_.data();
int num_tuples_processed = 0;
int num_tuples_materialized = 0;
// slot_idx_记录了下一个要物化的槽的索引,它不为0说明上一次的物化有不完整的行,我们需要继续处理
if (slot_idx_ != 0) {
DCHECK(tuple_ != nullptr);
// 计算一下本次在不完整行中可以物化的字段数
int num_partial_fields = scan_node_->materialized_slots().size() - slot_idx_;
num_partial_fields = min(num_partial_fields, num_fields);
// 调用WritePartialTuple物化不完整的行,代码不多,我们在下文分析它
WritePartialTuple(fields, num_partial_fields);
// slot_idx_等于物化槽位数量说明不完整行的所有所需字段已经物化完整
// num_tuples大于0,说明这批物化数据中至少有一个行分隔符,也就是说不完整行所有的字段也解析完毕了
// 也就是这一行已经就绪可以加入RowBatch了
if (slot_idx_ == scan_node_->materialized_slots().size() && num_tuples > 0) {
// 检查一下物化过程中有无错误
if (UNLIKELY(error_in_row_)) {
if (state_->abort_on_error()) {
parse_status_ = Status(state_->ErrorLog());
} else {
LogRowParseError();
}
if (!parse_status_.ok()) return 0;
error_in_row_ = false;
}
// 调用CopyAndClearPartialTuple将刚处理完的PartialTuple的数据拷贝到tuple_中
CopyAndClearPartialTuple(pool);
// 令RowBatch的当前TupleRow引用或者说指向tuple_
row->SetTuple(0, tuple_);
// 重置slot_idx_,更新一下计数器
slot_idx_ = 0;
++num_tuples_processed;
--num_tuples;
// 调用EvalConjuncts对刚才物化的行进行谓词评估,通过评估则物化行数正式+1
// 同时更新tuple_和row到下一个写入位置
if (EvalConjuncts(row)) {
++num_tuples_materialized;
tuple_ = next_tuple(tuple_byte_size_, tuple_);
row = next_row(row);
}
}
// 更新一下剩余待物化字段数和下一个字段位置的指针
num_fields -= num_partial_fields;
fields += num_partial_fields;
}
// 然后处理完整的行(若还有)
if (num_tuples > 0) {
// 如果字符串槽可能引用原始的I/O数据缓冲区,那需要进行字符串复制
// 压缩文件有压缩数据缓冲区由扫描器本身持有,所以扫描压缩文件不需要字符串复制
const bool copy_strings = !string_slot_offsets_.empty() &&
stream_->file_desc()->file_compression == THdfsCompression::NONE;
// 计算一下最大物化的行数,受到现有行数和扫描结点的limit限制
int max_added_tuples = (scan_node_->limit() == -1) ?
num_tuples :
scan_node_->limit() - scan_node_->rows_returned_shared();
if (write_tuples_fn_ != nullptr) {
// write_tuples_fn_是Codegen版本的WriteAlignedTuples函数
// 如果有支持转移字符的字符串槽则不支持Codegen,此处做一个Debug检查
DCHECK(scan_node_->tuple_desc()->string_slots().empty() ||
delimited_text_parser_->escape_char() == '\0');
}
// WriteAlignedTuplesCodegenOrInterpret是一个用来实现无缝切换Codegen函数调用的工具函数
// 由于Codegen编译是异步的,在其完成之前这里会先调用HdfsScanner::WriteAlignedTuples函数
// Codegen完成后这里改为调用Codegen版本的WriteAlignedTuples函数write_tuples_fn_
// WriteAlignedTuples函数作用是批量写入对齐的字段到物化槽,也就是这里没有不完整行的情况
// 也是物化数据过程中主要的执行路径,函数会返回物化且通过谓词评估的行的数量
// 它会不断调用WriteCompleteTuple写入完整的行,直到达到限制或无完整行
int tuples_returned = WriteAlignedTuplesCodegenOrInterpret(pool, row, fields,
num_tuples, max_added_tuples, scan_node_->materialized_slots().size(),
num_tuples_processed, copy_strings);
// 一批数据写完后进行检查并更新一下计数器和指针
if (tuples_returned == -1) return 0;
DCHECK_EQ(slot_idx_, 0);
num_tuples_materialized += tuples_returned;
num_fields -= num_tuples * scan_node_->materialized_slots().size();
fields += num_tuples * scan_node_->materialized_slots().size();
}
// 到了这一步,我们可能还剩下一些尾部不完整行的部分字段,进行一下检查
DCHECK_GE(num_fields, 0);
DCHECK_LE(num_fields, scan_node_->materialized_slots().size());
// 物化这些不完整行的字段,产生部分物化的元组也就是PartialTuple,它会在下一次WriteFields中处理
if (num_fields != 0) {
DCHECK(tuple_ != nullptr);
// 创建一个部分物化的元组,并用模板元组初始化它
partial_tuple_ = Tuple::Create(tuple_byte_size_, boundary_pool_.get());
InitTuple(template_tuple_, partial_tuple_);
// 调用WritePartialTuple部分物化这一不完整的行
WritePartialTuple(fields, num_fields);
}
DCHECK_LE(slot_idx_, scan_node_->materialized_slots().size());
return num_tuples_materialized;
}
void HdfsTextScanner::WritePartialTuple(FieldLocation* fields, int num_fields) {
// 循环处理字段
for (int i = 0; i < num_fields; ++i) {
// 在FieldLocation中规定,字段长度的正负号标识是否需要字符串转义
bool need_escape = false;
int len = fields[i].len;
if (len < 0) {
len = -len;
need_escape = true;
}
// 获取下一个要物化的槽的描述符SlotDescriptor和列的字符串编码类型AuxColumnType
const SlotDescriptor* slot_desc = scan_node_->materialized_slots()[slot_idx_];
const AuxColumnType& aux_type =
scan_node_->hdfs_table()->GetColumnDesc(slot_desc).auxType();
// 调用TextConverter的WriteSlot来写入一个槽,其中涉及了文本数据解析和类型转换
// WriteCompleteTuple实际上也是通过这个函数完成物化槽的写入的,也是一个十分核心的函数
if (!text_converter_->WriteSlot(slot_desc, &aux_type, partial_tuple_,
fields[i].start, len, true, need_escape, boundary_pool_.get())) {
// 有写入错误时报告错误
ReportColumnParseError(slot_desc, fields[i].start, len);
error_in_row_ = true;
}
// 移动slot_idx_到下一个待写入槽
++slot_idx_;
}
}
分析完了WriteFields
,整个HdfsTextScanner
的代码也就还剩一些压缩格式的处理和一些辅助函数了,篇幅有限本文就不再过多赘述。
只是分析完HdfsTextScanner
本身的代码(还略去不少),本文的篇幅就已经足够长了,在很多通用的扫描逻辑的支持下HdfsTextScanner
本身的大部分代码实际上是在实现分块扫描这一能力,如果不支持这一能力而让一个扫描器负责一整个文件的话能实现起来会简单很多(性能也差很多就是了)。不过想要完整地理解文本数据整个扫描过程,我们至少还需要再学习了解一下DelimitedTextParser
类、TextConverter
类和HdfsScanner
类。管中窥豹可见一斑,Impala代码里还有很多东西值得学习,仅是一个文本的扫描器就已经有很多门道了,有机会的话我们后续再深入分析其他代码。