MERGE INTO target_table t
USING source_table s
ON s.id = t.id //这里是JOIN的关联条件
WHEN MATCHED AND s.opType = 'delete' THEN DELETE // WHEN条件是对当前行进行打标的匹配条件
WHEN MATCHED AND s.opType = 'update' THEN UPDATE SET id = s.id, name = s.name
WHEN NOT MATCHED AND s.opType = 'insert' THEN INSERT (key, value) VALUES (key, value)
如上是一条MERGE INTO语句,经过Spark Analyzer解析时,会发现它是MERGE INTO
命令,因此将解析target_table
对应生成的SparkTable实例
封装成RowLevelOperationTable
的实例,它会绑定一个SparkCopyOnWriteOperation
的实例,并且实现了创建ScanBuilder
和WriteBuilder
的方法。
ScanBuilder和WriteBuilder是Spark中定义的接口,分别用于构建读数据器(Scan)和写数据器(BatchWrite)。
Iceberg基于Spark 3.x提供的外部Catalog及相关的读写接口,实现了对于Iceberg表(存储格式)的数据读写。
下面以SparkCopyOnWriteOperation
跟踪分析如何利用Spark写出数据为Iceberg表格式。
Iceberg行级更新的操作,目前支持UPDATE / DELETE / MERGE INTO三个语法。
public class SparkTable
implements org.apache.spark.sql.connector.catalog.Table, // 继承自Spark的接口
SupportsRead,
SupportsWrite,
SupportsDelete, // 支持删除
SupportsRowLevelOperations, // 支持行级的数据更新
SupportsMetadataColumns {
private final Table icebergTable;
private final Long snapshotId;
private final boolean refreshEagerly;
private final Set<TableCapability> capabilities;
private String branch;
private StructType lazyTableSchema = null;
private SparkSession lazySpark = null;
public SparkTable(Table icebergTable, Long snapshotId, boolean refreshEagerly) {
this.icebergTable = icebergTable;
this.snapshotId = snapshotId;
this.refreshEagerly = refreshEagerly;
boolean acceptAnySchema =
PropertyUtil.propertyAsBoolean(
icebergTable.properties(),
TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA,
TableProperties.SPARK_WRITE_ACCEPT_ANY_SCHEMA_DEFAULT);
this.capabilities = acceptAnySchema ? CAPABILITIES_WITH_ACCEPT_ANY_SCHEMA : CAPABILITIES;
}
/**
* 该表支持读取,因此实现了此方法返回一个ScanBuilder实例
*/
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
if (options.containsKey(SparkReadOptions.FILE_SCAN_TASK_SET_ID)) {
// skip planning the job and fetch already staged file scan tasks
// 如果设置了此参数,则会在读取数据后,将此次生成的Iceberg ScanTasks缓存在本地进程中的ScanTaskSetManager实例里,
// 后面再对同相同的FileSet集合(或scan file的任务集合)构建时,可以避免重复构建任务集,
// 起到缓存的作用
return new SparkFilesScanBuilder(sparkSession(), icebergTable, options);
}
if (options.containsKey(SparkReadOptions.SCAN_TASK_SET_ID)) {
// 作用同上
return new SparkStagedScanBuilder(sparkSession(), icebergTable, options);
}
if (refreshEagerly) {
icebergTable.refresh();
}
// 可以支持基于branch或是基于SnapshotId创建SparkTable
// 如果基于SnapshotID,则需要显示地解析SnapshotId归属的branch
CaseInsensitiveStringMap scanOptions =
branch != null ? options : addSnapshotId(options, snapshotId);
return new SparkScanBuilder(
sparkSession(), icebergTable, branch, snapshotSchema(), scanOptions);
}
/**
* 该表支持写,因此实现了此方法返回一个WriteBuilder实例
*/
@Override
public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
Preconditions.checkArgument(
snapshotId == null, "Cannot write to table at a specific snapshot: %s", snapshotId);
return new SparkWriteBuilder(sparkSession(), icebergTable, branch, info);
}
假设我们有如下配置,定义了一个新的用户自定义的catalog,其name为iceberg
。并通过spark.sql.catalog.iceberg
指定了这个catalog的实现类org.apache.iceberg.spark.SparkCatalog
,其类型为hive
(意味着会在Iceberg的侧使用HiveCatalog
解析库、表),meta存储地址为thrift://metastore-host:port
。
spark.sql.catalog.iceberg = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.iceberg.type = hive
spark.sql.catalog.iceberg.uri = thrift://metastore-host:port
当我们执行SELECT * FROM iceberg.test_tbl
时,在SQL解析过程中,会通过如下的过程来解析CatalogName和TableName,并创建对应的Catalog
和Table
实例,即对应Iceberg中的实现类SparkCatalog
和SparkTable
。
/**
* 如果当前Plan是还未解析的表视图或是表,或是INSERT INTO语句,则应用此Rule,
* 查看绑定的目标表名是否是SQL层级的临时视图或是Session级别的全局视图表名,最终返回一个新的SubqueryAlias的实例
*/
object ResolveTempViews extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
case u @ UnresolvedRelation(ident) =>
lookupTempView(ident).getOrElse(u)
case i @ InsertIntoStatement(UnresolvedRelation(ident), _, _, _, _) =>
lookupTempView(ident)
.map(view => i.copy(table = view))
.getOrElse(i)
case u @ UnresolvedTable(ident) =>
lookupTempView(ident).foreach { _ =>
u.failAnalysis(s"${ident.quoted} is a temp view not table.")
}
u
case u @ UnresolvedTableOrView(ident) =>
lookupTempView(ident).map(_ => ResolvedView(ident.asIdentifier)).getOrElse(u)
}
def lookupTempView(identifier: Seq[String]): Option[LogicalPlan] = {
// Permanent View can't refer to temp views, no need to lookup at all.
if (isResolvingView) return None
identifier match {
case Seq(part1) => v1SessionCatalog.lookupTempView(part1)
case Seq(part1, part2) => v1SessionCatalog.lookupGlobalTempView(part1, part2)
case _ => None
}
}
}
/**
* Resolve table relations with concrete relations from v2 catalog.
*
* [[ResolveRelations]] still resolves v1 tables.
*/
object ResolveTables extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = ResolveTempViews(plan).resolveOperatorsUp {
case u: UnresolvedRelation =>
lookupV2Relation(u.multipartIdentifier)
.map { rel =>
val ident = rel.identifier.get
SubqueryAlias(rel.catalog.get.name +: ident.namespace :+ ident.name, rel)
}.getOrElse(u)
case u @ UnresolvedTable(NonSessionCatalogAndIdentifier(catalog, ident)) =>
// NonSessionCatalogAndIdentifier的unapply方法,会尝试解析catalog,并通过Spark.CatalogManager::catalog(name)解析并构建Catalog实例
// 这里就是一个Iceberg中定义的SparkCatalog实现类,然后通过工具类的方法加载表,并创建SparkTable实例。
CatalogV2Util.loadTable(catalog, ident)
.map(ResolvedTable(catalog.asTableCatalog, ident, _))
.getOrElse(u)
case u @ UnresolvedTableOrView(NonSessionCatalogAndIdentifier(catalog, ident)) =>
CatalogV2Util.loadTable(catalog, ident)
.map(ResolvedTable(catalog.asTableCatalog, ident, _))
.getOrElse(u)
case i @ InsertIntoStatement(u: UnresolvedRelation, _, _, _, _) if i.query.resolved =>
lookupV2Relation(u.multipartIdentifier)
.map(v2Relation => i.copy(table = v2Relation))
.getOrElse(i)
case alter @ AlterTable(_, _, u: UnresolvedV2Relation, _) =>
CatalogV2Util.loadRelation(u.catalog, u.tableName)
.map(rel => alter.copy(table = rel))
.getOrElse(alter)
case u: UnresolvedV2Relation =>
CatalogV2Util.loadRelation(u.catalog, u.tableName).getOrElse(u)
}
/**
* Performs the lookup of DataSourceV2 Tables from v2 catalog.
*/
private def lookupV2Relation(identifier: Seq[String]): Option[DataSourceV2Relation] =
expandRelationName(identifier) match {
case NonSessionCatalogAndIdentifier(catalog, ident) =>
CatalogV2Util.loadTable(catalog, ident) match {
case Some(table) =>
Some(DataSourceV2Relation.create(table, Some(catalog), Some(ident)))
case None => None
}
case _ => None
}
}
SparkCatalog
由于继承自TableCatalog
,因此拥有 Table loadTable(Identifier ident) throws NoSuchTableException
方法,在Spark内部进行SQL解析时,可以调用此方法,生成用户自定义的Table实例。
SparkCatalog定位于Spark与Iceberg之前的桥梁,最终的实现效果是将某个Catalog的解析并创建表的任务,路由给Iceberg中的Catalog实现,例如HiveCatalog
。
/**
* 实现了Spark中的如下接口:
* public interface TableCatalog extends CatalogPlugin
* 可以在SQL解析过程时,应用`ResolveTables`规则时,通过Catalog + Identifier创建
*/
public class SparkCatalog extends BaseCatalog {
/**
* 由于继承自CatalogPlugin接口类,因此需要重写initialize(...)方法,以初始化SparkCatalog实例
*/
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
this.cacheEnabled =
PropertyUtil.propertyAsBoolean(
options, CatalogProperties.CACHE_ENABLED, CatalogProperties.CACHE_ENABLED_DEFAULT);
long cacheExpirationIntervalMs =
PropertyUtil.propertyAsLong(
options,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS,
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT);
// An expiration interval of 0ms effectively disables caching.
// Do not wrap with CachingCatalog.
if (cacheExpirationIntervalMs == 0) {
this.cacheEnabled = false;
}
// 创建Iceberg支持的catalog实例,一共支持如下几个
// public static final String ICEBERG_CATALOG_TYPE_HADOOP = "hadoop";
// public static final String ICEBERG_CATALOG_TYPE_HIVE = "hive";
// public static final String ICEBERG_CATALOG_TYPE_REST = "rest";
// public static final String ICEBERG_CATALOG_HADOOP = "org.apache.iceberg.hadoop.HadoopCatalog";
// public static final String ICEBERG_CATALOG_HIVE = "org.apache.iceberg.hive.HiveCatalog";
// public static final String ICEBERG_CATALOG_REST = "org.apache.iceberg.rest.RESTCatalog";
// 默认情况下,我们创建的是HiveCatalog,后续调用loadTable(...)方法创建SparkTable时,则通过HiveCatalog::loadTable(name)方法生成
Catalog catalog = buildIcebergCatalog(name, options);
this.catalogName = name;
SparkSession sparkSession = SparkSession.active();
this.useTimestampsWithoutZone =
SparkUtil.useTimestampWithoutZoneInNewTables(sparkSession.conf());
this.tables =
new HadoopTables(SparkUtil.hadoopConfCatalogOverrides(SparkSession.active(), name));
this.icebergCatalog =
cacheEnabled ? CachingCatalog.wrap(catalog, cacheExpirationIntervalMs) : catalog;
// 支持通过参数的方式,指定默认的namespace,默认值为default
if (catalog instanceof SupportsNamespaces) {
this.asNamespaceCatalog = (SupportsNamespaces) catalog;
if (options.containsKey("default-namespace")) {
this.defaultNamespace =
Splitter.on('.').splitToList(options.get("default-namespace")).toArray(new String[0]);
}
}
EnvironmentContext.put(EnvironmentContext.ENGINE_NAME, "spark");
EnvironmentContext.put(
EnvironmentContext.ENGINE_VERSION, sparkSession.sparkContext().version());
EnvironmentContext.put(CatalogProperties.APP_ID, sparkSession.sparkContext().applicationId());
}
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
// 基于标识符创建SparkTable实例
try {
return load(ident);
} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(ident);
}
}
@Override
public Table loadTable(Identifier ident, String version) throws NoSuchTableException {
Table table = loadTable(ident);
// ...
}
@Override
public Table loadTable(Identifier ident, long timestamp) throws NoSuchTableException {
Table table = loadTable(ident);
// ...
}
}
public static Catalog buildIcebergCatalog(String name, Map<String, String> options, Object conf) {
String catalogImpl = options.get(CatalogProperties.CATALOG_IMPL);
if (catalogImpl == null) {
String catalogType =
PropertyUtil.propertyAsString(options, ICEBERG_CATALOG_TYPE, ICEBERG_CATALOG_TYPE_HIVE);
switch (catalogType.toLowerCase(Locale.ENGLISH)) {
case ICEBERG_CATALOG_TYPE_HIVE:
catalogImpl = ICEBERG_CATALOG_HIVE;
break;
case ICEBERG_CATALOG_TYPE_HADOOP:
catalogImpl = ICEBERG_CATALOG_HADOOP;
break;
case ICEBERG_CATALOG_TYPE_REST:
catalogImpl = ICEBERG_CATALOG_REST;
break;
default:
throw new UnsupportedOperationException("Unknown catalog type: " + catalogType);
}
} else {
String catalogType = options.get(ICEBERG_CATALOG_TYPE);
Preconditions.checkArgument(
catalogType == null,
"Cannot create catalog %s, both type and catalog-impl are set: type=%s, catalog-impl=%s",
name,
catalogType,
catalogImpl);
}
return CatalogUtil.loadCatalog(catalogImpl, name, options, conf);
}
这里以Hive为例,解析如何加载Custom Catalog
public static Catalog loadCatalog(
String impl, String catalogName, Map<String, String> properties, Object hadoopConf) {
// impl = ICEBERG_CATALOG_HIVE
// catalogName = hive
// properties = spark.sql.catalog.[catalogName].x
// 其中properties指的是catalogName对应的配置选项,是从Spark.SQLConf解析得到的
Preconditions.checkNotNull(impl, "Cannot initialize custom Catalog, impl class name is null");
DynConstructors.Ctor<Catalog> ctor;
try {
// 通过默认的impl名字,通过Refect机制,调用无参的构造函数,生成对应的类的实例
ctor = DynConstructors.builder(Catalog.class).impl(impl).buildChecked();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize Catalog implementation %s: %s", impl, e.getMessage()),
e);
}
Catalog catalog;
try {
catalog = ctor.newInstance();
} catch (ClassCastException e) {
throw new IllegalArgumentException(
String.format("Cannot initialize Catalog, %s does not implement Catalog.", impl), e);
}
configureHadoopConf(catalog, hadoopConf);
// 通过properties,来助力catalog对象的初始化过程
catalog.initialize(catalogName, properties);
return catalog;
}
@Override
public void initialize(String inputName, Map<String, String> properties) {
this.catalogProperties = ImmutableMap.copyOf(properties);
this.name = inputName;
if (conf == null) {
LOG.warn("No Hadoop Configuration was set, using the default environment Configuration");
this.conf = new Configuration();
}
// 解析指定的Hive metastore地址
if (properties.containsKey(CatalogProperties.URI)) {
this.conf.set(HiveConf.ConfVars.METASTOREURIS.varname, properties.get(CatalogProperties.URI));
}
// 解析指定的metastore的工作目录
if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
this.conf.set(
HiveConf.ConfVars.METASTOREWAREHOUSE.varname,
LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION)));
}
this.listAllTables =
Boolean.parseBoolean(properties.getOrDefault(LIST_ALL_TABLES, LIST_ALL_TABLES_DEFAULT));
// 解析指定的读写文件的 接口实现类,如果不指定则默认为HadoopFileIO
// 否则加载用户自定义的实现类
String fileIOImpl = properties.get(CatalogProperties.FILE_IO_IMPL);
this.fileIO =
fileIOImpl == null
? new HadoopFileIO(conf)
: CatalogUtil.loadFileIO(fileIOImpl, properties, conf);
// 初始化元数据交互的客户端,使用默认使用HiveClientPool
this.clients = new CachedClientPool(conf, properties);
}
在生成SparkCatalog
时,会根据spark.sql.catalog.iceberg.type
这个配置,知道我们要创建的表在Iceberg中的类型是hive
,因此需要通过HiveCatalog
加载表。
@Override
public Table loadTable(TableIdentifier identifier) {
Table result;
if (isValidIdentifier(identifier)) {
// 对于HiveCatalog来说,所有的identifier都是合法的,因此会通过下面的方法得到对应类型的TableOperations实例
// 在Iceberg世界中,实际上是不存在表的,只是利用表的概念,将TableMetadata进行了抽象,
// 因此Iceberg中的Table都必须绑定一个TableOperations实例,来读取TableMetadata数据
// 例如在这里会对应生成HiveTableOperations
TableOperations ops = newTableOps(identifier);
if (ops.current() == null) {
// the identifier may be valid for both tables and metadata tables
if (isValidMetadataIdentifier(identifier)) {
result = loadMetadataTable(identifier);
} else {
throw new NoSuchTableException("Table does not exist: %s", identifier);
}
} else {
//
result = new BaseTable(ops, fullTableName(name(), identifier), metricsReporter());
}
} else if (isValidMetadataIdentifier(identifier)) {
result = loadMetadataTable(identifier);
} else {
throw new NoSuchTableException("Invalid table identifier: %s", identifier);
}
LOG.info("Table loaded by catalog: {}", result);
return result;
}
MERGE INTO重写时,会为目标表生成一个DataSourceV2Relation的逻辑计划实例,以读取目标表中的相关数据,因此在过滤表达式下推优化时,会同时构建
COW
模式下的Scan
实例。
Scan是Spark中定义的读取数据的接口。
object V2ScanRelationPushDown extends Rule[LogicalPlan] {
import DataSourceV2Implicits._
override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
case ScanOperation(project, filters, relation: DataSourceV2Relation) =>
// 调用这里的Table是一个RowLevelOperationTable的实例,同时它绑定了一个SparkCopyOnWriteOperation实例
// 因此底层实际上调用的是SparkCopyOnWriteOperation::newScanBuilder方法
val scanBuilder = relation.table.asReadable.newScanBuilder(relation.options)
val normalizedFilters = DataSourceStrategy.normalizeExprs(filters, relation.output)
val (normalizedFiltersWithSubquery, normalizedFiltersWithoutSubquery) =
normalizedFilters.partition(SubqueryExpression.hasSubquery)
// `pushedFilters` will be pushed down and evaluated in the underlying data sources.
// `postScanFilters` need to be evaluated after the scan.
// `postScanFilters` and `pushedFilters` can overlap, e.g. the parquet row group filter.
val (pushedFilters, postScanFiltersWithoutSubquery) = PushDownUtils.pushFilters(
scanBuilder, normalizedFiltersWithoutSubquery)
val postScanFilters = postScanFiltersWithoutSubquery ++ normalizedFiltersWithSubquery
val normalizedProjects = DataSourceStrategy
.normalizeExprs(project, relation.output)
.asInstanceOf[Seq[NamedExpression]]
// 列裁剪,同时调用scanBuilder.build()方法,生成一个读取具体数据源的Scan实例
val (scan, output) = PushDownUtils.pruneColumns(
scanBuilder, relation, normalizedProjects, postScanFilters)
logInfo(
s"""
|Pushing operators to ${relation.name}
|Pushed Filters: ${pushedFilters.mkString(", ")}
|Post-Scan Filters: ${postScanFilters.mkString(",")}
|Output: ${output.mkString(", ")}
""".stripMargin)
val wrappedScan = scan match {
case v1: V1Scan =>
val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
V1ScanWrapper(v1, translated, pushedFilters)
case _ => scan
}
// 这里生成一个读取数据的逻辑计划
val scanRelation = DataSourceV2ScanRelation(relation.table, wrappedScan, output)
val projectionOverSchema = ProjectionOverSchema(output.toStructType)
val projectionFunc = (expr: Expression) => expr transformDown {
case projectionOverSchema(newExpr) => newExpr
}
val filterCondition = postScanFilters.reduceLeftOption(And)
val newFilterCondition = filterCondition.map(projectionFunc)
val withFilter = newFilterCondition.map(Filter(_, scanRelation)).getOrElse(scanRelation)
val withProjection = if (withFilter.output != project) {
val newProjects = normalizedProjects
.map(projectionFunc)
.asInstanceOf[Seq[NamedExpression]]
Project(newProjects, withFilter)
} else {
withFilter
}
// 返回最终的逻辑计划
withProjection
}
}
从前面我们知道,在Spark进行Filter Pushdown优化时,会调用Table::newScanBuilder
方法构建一个具体的数据描述器(Scan),实际上是会最终调用Iceberg中如下的方法:
class SparkCopyOnWriteOperation implements RowLevelOperation {
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
if (lazyScanBuilder == null) {
lazyScanBuilder =
new SparkScanBuilder(spark, table, branch, options) {
@Override
public Scan build() {
// 构建COW模式的Scan实例
Scan scan = super.buildCopyOnWriteScan();
SparkCopyOnWriteOperation.this.configuredScan = scan;
return scan;
}
};
}
return lazyScanBuilder;
}
}
如下是对SparkCopyOnWriteOperation::buildCopyOnWriteScan
方法的完整定义:
public Scan buildCopyOnWriteScan() {
// table变量,是一个BaseTable实例,因为从Spark的代码流转到Iceberg侧时,使用的都是Iceberg中定义的类
// 这里是从当前表找到最新的Snapshot
Snapshot snapshot = SnapshotUtil.latestSnapshot(table, readConf.branch());
if (snapshot == null) {
return new SparkCopyOnWriteScan(
spark, table, readConf, schemaWithMetadataColumns(), filterExpressions);
}
Schema expectedSchema = schemaWithMetadataColumns();
// Snapshot存在,说明有数据,因此需要生成Scan实例
// default BatchScan newBatchScan() {
// return new BatchScanAdapter(newScan());
// }
// 由于这里的table类型为BaseTable,因此会调用newScan()方法生成DataTableScan的实例,而BatchScan则是一个代理类
BatchScan scan =
table
.newBatchScan()
.useSnapshot(snapshot.snapshotId())
.ignoreResiduals()
.caseSensitive(caseSensitive)
.filter(filterExpression())
.project(expectedSchema);
scan = configureSplitPlanning(scan);
// 返回一个实现了Spark中的Scan接口的实例
return new SparkCopyOnWriteScan(
spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions);
}
我们知道SparkCopyOnWriteScan实现的Spark中的Scan接口,而Scan是一个逻辑上的数据读取器,就像逻辑计划那样,因此还需要通过它的Scan::toBatch方法,创建一个直接可执行的实体类对象
class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
@Override
public Batch toBatch() {
// 返回一个Spark可操作的Batch实例,负责对待读取的数据划分Batches
// 注意这里在创建SparkBatch实例时,taskGroups()的调用,这个方法实际上是调用Iceberg的接口,搜索此次Scan任务需要读取的所有数据。
return new SparkBatch(
sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode());
}
}
public abstract class SnapshotScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
@Override
public CloseableIterable<T> planFiles() {
// 获取要读取的Snapshot
Snapshot snapshot = snapshot();
if (snapshot == null) {
LOG.info("Scanning empty table {}", table());
return CloseableIterable.empty();
}
LOG.info(
"Scanning table {} snapshot {} created at {} with filter {}",
table(),
snapshot.snapshotId(),
DateTimeUtil.formatTimestampMillis(snapshot.timestampMillis()),
ExpressionUtil.toSanitizedString(filter()));
Listeners.notifyAll(new ScanEvent(table().name(), snapshot.snapshotId(), filter(), schema()));
List<Integer> projectedFieldIds = Lists.newArrayList(TypeUtil.getProjectedIds(schema()));
List<String> projectedFieldNames =
projectedFieldIds.stream().map(schema()::findColumnName).collect(Collectors.toList());
Timer.Timed planningDuration = scanMetrics().totalPlanningDuration().start();
return CloseableIterable.whenComplete(
doPlanFiles(), // doPlanFiles()方法会通过Iceberg的接口,搜索所有要读取的data文件和delete文件
() -> {
planningDuration.stop();
Map<String, String> metadata = Maps.newHashMap(context().options());
metadata.putAll(EnvironmentContext.get());
ScanReport scanReport =
ImmutableScanReport.builder()
.schemaId(schema().schemaId())
.projectedFieldIds(projectedFieldIds)
.projectedFieldNames(projectedFieldNames)
.tableName(table().name())
.snapshotId(snapshot.snapshotId())
.filter(ExpressionUtil.sanitize(filter()))
.scanMetrics(ScanMetricsResult.fromScanMetrics(scanMetrics()))
.metadata(metadata)
.build();
context().metricsReporter().report(scanReport);
});
}
}
SparkBatch继承自Spark中的Batch接口
class SparkBatch implements Batch {
private final JavaSparkContext sparkContext;
private final Table table;
private final String branch;
private final SparkReadConf readConf;
private final Types.StructType groupingKeyType;
// 保存了由SparkCopyOnWriteScan::taskGroups()方法生成的所有要读取的Iceberg管理的data文件和delete文件,
// 这些文件按对应的分区数据进行分组,并且一个分区的数据文件可能被划分到多个groups
private final List<? extends ScanTaskGroup<?>> taskGroups;
private final Schema expectedSchema;
private final boolean caseSensitive;
private final boolean localityEnabled;
private final int scanHashCode;
@Override
public InputPartition[] planInputPartitions() {
// 负责对要读取的数据进行分区
// broadcast the table metadata as input partitions will be sent to executors
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
String expectedSchemaString = SchemaParser.toJson(expectedSchema);
// 一个Group就对应Spark中的一个Partition
InputPartition[] partitions = new InputPartition[taskGroups.size()];
Tasks.range(partitions.length)
.stopOnFailure()
.executeWith(localityEnabled ? ThreadPools.getWorkerPool() : null)
.run(
index ->
partitions[index] =
new SparkInputPartition(
groupingKeyType, // 一个taskGroup包含的文件拥有相同的Grouping key
taskGroups.get(index),
tableBroadcast,
branch,
expectedSchemaString,
caseSensitive,
localityEnabled));
return partitions;
}
@Override
public PartitionReaderFactory createReaderFactory() {
// 负责创建读取数据的Reader,支持列式读取和行式读取
if (useParquetBatchReads()) {
int batchSize = readConf.parquetBatchSize();
return new SparkColumnarReaderFactory(batchSize);
} else if (useOrcBatchReads()) {
int batchSize = readConf.orcBatchSize();
return new SparkColumnarReaderFactory(batchSize);
} else {
return new SparkRowReaderFactory();
}
}
}
前文提到的Spark中有关数据的读写接口,都是由DataSourceV2中定义的,因此对于数据读取的逻辑计划(DataSourceV2ScanRelation),会先转换成物理执行计划BatchScanExec。
case class BatchScanExec(
output: Seq[AttributeReference],
@transient scan: Scan) extends DataSourceV2ScanExecBase {
// scan,对应于Iceberg中的SparkCopyOnWriteScan
// 因此batch变量是一个SparkBatch实例
@transient lazy val batch = scan.toBatch
// TODO: unify the equal/hashCode implementation for all data source v2 query plans.
override def equals(other: Any): Boolean = other match {
case other: BatchScanExec => this.batch == other.batch
case _ => false
}
override def hashCode(): Int = batch.hashCode()
// 调用SparkBatch::planInputPartitions生成partitions信息
@transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
// 调用SparkBatch::createReaderFactory生成Reader工厂对象
override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
// 执行时,净当前的物理执行计划,转换为一个RDD,并传递给所有的RDD以及Reader工厂
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}
override def doCanonicalize(): BatchScanExec = {
this.copy(output = output.map(QueryPlan.normalizeExpressions(_, output)))
}
}
这里需要重点关注的是compute
方法,在Spark中,每一个Partition都对应一个Task,这个Task负责最终调用compute
方法,触发当前分区上的计算逻辑。
// columnar scan.
class DataSourceRDD(
sc: SparkContext,
@transient private val inputPartitions: Seq[InputPartition],
partitionReaderFactory: PartitionReaderFactory,
columnarReads: Boolean)
extends RDD[InternalRow](sc, Nil) {
override protected def getPartitions: Array[Partition] = {
inputPartitions.zipWithIndex.map {
case (inputPartition, index) => new DataSourceRDDPartition(index, inputPartition)
}.toArray
}
private def castPartition(split: Partition): DataSourceRDDPartition = split match {
case p: DataSourceRDDPartition => p
case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
}
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
// partition对应于Iceberg中的一个TaskGroup,而一个TaskGroup的数据文件拥有相同的Partition data
val inputPartition = castPartition(split).inputPartition
val (iter, reader) = if (columnarReads) {
// 列读
// batchReader实际上是一个BatchDataReader的实例
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader))
(iter, batchReader)
} else {
// 行读
val rowReader = partitionReaderFactory.createReader(inputPartition)
val iter = new MetricsRowIterator(new PartitionIterator[InternalRow](rowReader))
(iter, rowReader)
}
context.addTaskCompletionListener[Unit](_ => reader.close())
// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
}
override def getPreferredLocations(split: Partition): Seq[String] = {
castPartition(split).inputPartition.preferredLocations()
}
}
BatchDataReader继承自Spark中的
PartitionReader<ColumnarBatch>
接口
class BatchDataReader extends BaseBatchReader<FileScanTask>
implements PartitionReader<ColumnarBatch> {
// 返回一个迭代器,可以在FileScanTask包含的所有data文件和delete文件,
@Override
protected CloseableIterator<ColumnarBatch> open(FileScanTask task) {
String filePath = task.file().path().toString();
LOG.debug("Opening data file {}", filePath);
// update the current file for Spark's filename() function
InputFileBlockHolder.set(filePath, task.start(), task.length());
Map<Integer, ?> idToConstant = constantsMap(task, expectedSchema());
InputFile inputFile = getInputFile(filePath);
Preconditions.checkNotNull(inputFile, "Could not find InputFile associated with FileScanTask");
// 创建一个SparkDeleteFilter实例,它负责收集等值删除文件 和 位置删除文件,并建立删除数据记录的索引,
// 如此在每遍历一个data file时,就可以根据索引信息,确定当前的record是不是存活的。
SparkDeleteFilter deleteFilter =
task.deletes().isEmpty()
? null
: new SparkDeleteFilter(filePath, task.deletes(), counter());
// newBatchIterable()方法会根据inputFile的类型,创建相应的文件读取器,例如为Parquet创建VectorizedParquetReader
return newBatchIterable(
inputFile,
task.file().format(),
task.start(),
task.length(),
task.residual(),
idToConstant,
deleteFilter)
.iterator();
}
}
不论是Parquet/Orc文件,最底层都是通过ColumnarBatchReader负责真正的数据读取与过滤
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
private final boolean hasIsDeletedColumn;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
this.hasIsDeletedColumn =
readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
}
@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
super.setRowGroupInfo(pageStore, metaData, rowPosition);
this.rowStartPosInBatch = rowPosition;
}
public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
this.deletes = deleteFilter;
}
@Override
public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
if (reuse == null) {
closeVectors();
}
// 通过内部类ColumnBatchLoader代理完成数据的读取与结果转换
ColumnarBatch columnarBatch = new ColumnBatchLoader(numRowsToRead).loadDataToColumnBatch();
rowStartPosInBatch += numRowsToRead;
return columnarBatch;
}
private class ColumnBatchLoader {
private final int numRowsToRead;
// the rowId mapping to skip deleted rows for all column vectors inside a batch, it is null when
// there is no deletes
private int[] rowIdMapping;
// the array to indicate if a row is deleted or not, it is null when there is no "_deleted"
// metadata column
private boolean[] isDeleted;
/**
* Build a row id mapping inside a batch, which skips deleted rows. Here is an example of how we
* delete 2 rows in a batch with 8 rows in total. [0,1,2,3,4,5,6,7] -- Original status of the
* row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted array Position
* delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num records to 6]
* [F,F,T,F,F,F,T,F] -- After applying position deletes
*
* @param deletedRowPositions a set of deleted row positions
* @return the mapping array and the new num of rows in a batch, null if no row is deleted
*/
Pair<int[], Integer> buildPosDelRowIdMapping(PositionDeleteIndex deletedRowPositions) {
if (deletedRowPositions == null) {
return null;
}
int[] posDelRowIdMapping = new int[numRowsToRead];
int originalRowId = 0;
int currentRowId = 0;
while (originalRowId < numRowsToRead) {
if (!deletedRowPositions.isDeleted(originalRowId + rowStartPosInBatch)) {
posDelRowIdMapping[currentRowId] = originalRowId;
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[originalRowId] = true;
}
deletes.incrementDeleteCount();
}
originalRowId++;
}
if (currentRowId == numRowsToRead) {
// there is no delete in this batch
return null;
} else {
return Pair.of(posDelRowIdMapping, currentRowId);
}
}
int[] initEqDeleteRowIdMapping() {
int[] eqDeleteRowIdMapping = null;
if (hasEqDeletes()) {
eqDeleteRowIdMapping = new int[numRowsToRead];
for (int i = 0; i < numRowsToRead; i++) {
eqDeleteRowIdMapping[i] = i;
}
}
return eqDeleteRowIdMapping;
}
/**
* Filter out the equality deleted rows. Here is an example, [0,1,2,3,4,5,6,7] -- Original
* status of the row id mapping array [F,F,F,F,F,F,F,F] -- Original status of the isDeleted
* array Position delete 2, 6 [0,1,3,4,5,7,-,-] -- After applying position deletes [Set Num
* records to 6] [F,F,T,F,F,F,T,F] -- After applying position deletes Equality delete 1 <= x <=
* 3 [0,4,5,7,-,-,-,-] -- After applying equality deletes [Set Num records to 4]
* [F,T,T,T,F,F,T,F] -- After applying equality deletes
*
* @param columnarBatch the {@link ColumnarBatch} to apply the equality delete
*/
void applyEqDelete(ColumnarBatch columnarBatch) {
Iterator<InternalRow> it = columnarBatch.rowIterator();
int rowId = 0;
int currentRowId = 0;
while (it.hasNext()) {
InternalRow row = it.next();
if (deletes.eqDeletedRowFilter().test(row)) {
// the row is NOT deleted
// skip deleted rows by pointing to the next undeleted row Id
rowIdMapping[currentRowId] = rowIdMapping[rowId];
currentRowId++;
} else {
if (hasIsDeletedColumn) {
isDeleted[rowIdMapping[rowId]] = true;
}
deletes.incrementDeleteCount();
}
rowId++;
}
columnarBatch.setNumRows(currentRowId);
}
}
}
从前面的章节可以看到,在构建Scan的过程中,会同时搜集data files和delete files,因此在调用Reader实例读取每一个TaskGroup
中的数据文件时,同时会应用DeleteFilter
,来过滤掉那些被删除的记录。
这个过程实际上就是一个\Merge On Read的过程。
而MERGE INTO的Write过程,在我之前的文章有解析,大体的思路就是将从target_table
Scan得到的、经过删除过滤后的数据集,与source_table
中的数据JOIN;从而产生带有变更标记的结果数据集(每个被标记为INSERT/UPDATE/DELETE)
;在写出数据到文件时,就可以根据每一行的标记确定写出行为,最终只会产生Data Files
,数据文件更加干净。