最近在使用 Paimon 的时候遇到了一件很有意思的事情,写的 SQL 居然读取的数据不下推,明明是分区表,但是却全量扫描了。
目前使用的版本信息如下:
Spark 3.5.0
Paimon 0.6.0
paimon的建表语句如下:
CREATE TABLE `table_demo`(
`user_id` string COMMENT 'from deserializer'
)
PARTITIONED BY (
`dt` string COMMENT '日期, yyyyMMdd',
`hour` string COMMENT '小时, HH')
ROW FORMAT SERDE
'org.apache.paimon.hive.PaimonSerDe'
STORED BY
'org.apache.paimon.hive.PaimonStorageHandler'
WITH SERDEPROPERTIES (
'serialization.format'='1')
LOCATION
'xxxx'
TBLPROPERTIES (
'bucket'='50',
'bucketing_version'='2',
'bukect-key'='user_id',
'file.format'='parquet',
'merge-engine'='partial-update',
'partial-update.ignore-delete'='true',
'primary-key'='user_id',
'transient_lastDdlTime'='1701679855',
'write-only'='false')
查询的SQL如下:
select * from
table_demo
where dt =20231212
and hour =10
limit 100;
注意我们这里写的dt是整数类型
,而表中定义的是字符串类型
具体的原因是Spark DSv2中的规则 V2ScanRelationPushDown.pushDownFilters
对于 Cast类型转换表达式
不会传递到DataSource
端,所以只会在读取完Source转换进行过滤,
这种情况下,对于文件的读取IO会增大,但是对于shuffle等操作是不会有性能的影响的。
对于分区字段来说,我们在写SQL对分区字段进行过滤的时候,保持和分区字段类型一致
针对于错误的写法,也就是导致读取全量数据的写法,我们分析一下,首先是类型转换阶段,在Spark中,对于类型不匹配的问题,spark会用规则进行转换,具体的规则是
CombinedTypeCoercionRule
,
在日志中可以看到:
=== Applying Rule org.apache.spark.sql.catalyst.analysis.TypeCoercionBase$CombinedTypeCoercionRule ===
'GlobalLimit 100 'GlobalLimit 100
+- 'LocalLimit 100 +- 'LocalLimit 100
+- 'Project [*] +- 'Project [*]
! +- 'Filter ((dt#520 = 20231212) AND (hour#521 = 10)) +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
+- SubqueryAlias spark_catalog.default.table_demo +- SubqueryAlias spark_catalog.default.table_demo
+- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo +- RelationV2[user_id#497,dt#520, hour#521] spark_catalog.default.table_demo
通过以上规则我们可以看到 过滤条件(dt#520 = 20231212) AND (hour#521 = 10)
转换为了 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)
接着再经过以下规则:V2ScanRelationPushDown
的洗礼,我们可以看到如下日志:
12-13 13:52:58 763 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour)
Post-Scan Filters: (cast(dt#520 as int) = 20231212),(cast(hour#521 as int) = 10)
12-13 13:52:58 723 INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour)
12-13 13:52:58 823 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Output: user_id#497, dt#520, hour#521
12-13 13:52:58 837 INFO (org.apache.spark.sql.catalyst.rules.PlanChangeLogger:60) -
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===
InsertIntoHadoopFsRelationCommand ], Overwrite, [user_id, dt, hour] InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles +- WriteFiles
+- Repartition 1, true +- Repartition 1, true
+- GlobalLimit 100 +- GlobalLimit 100
+- LocalLimit 100 +- LocalLimit 100
! +- Filter ((isnotnull(dt#520) AND isnotnull(hour#521)) AND ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))) +- Filter ((cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10))
! +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo +- RelationV2[user_id#497, dt#520, hour#521] spark_catalog.default.table_demo table_demo
这里只有过滤条件 isnotnull(dt#520) AND isnotnull(hour#521)
被下推到了 DataSource。
从现象来看,确实分区的过滤条件没有推到DataSource端, 我们来分析一下该规则的数据流:
V2ScanRelationPushDown.pushDownFilters
||
\/
PushDownUtils.pushFilters
||
\/
DataSourceStrategy.translateFilterWithMappin
||
\/
translateLeafNodeFilter
具体到translateLeafNodeFilter
方法:
private def translateLeafNodeFilter(
predicate: Expression,
pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
Some(sources.EqualTo(name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
Some(sources.EqualTo(name, convertToScala(v, t)))
...
case _ => None
这里没有对Cast
表达式进行处理,所以说最后返回的就是不能下推的处理,而 Paimon datasouce
那边,具体的类为PaimonBaseScanBuilder
:
override def pushFilters(filters: Array[Filter]): Array[Filter] = {
这里传进来的filters实参 就不存在 (cast(dt#520 as int) = 20231212) AND (cast(hour#521 as int) = 10)
这个过滤条件,所以就不会下推到Paimon中去
其实不仅仅是对于Paimon Source
, 其他的source
也会有这个问题。
正确的SQL如下:
select * from
table_demo
where dt ='20231212'
and hour ='10'
limit 100;
运行如上SQL,我们可以看到如下日志:
12-14 14:22:42 328 INFO (org.apache.paimon.spark.PaimonScanBuilder:62) - pushFilter log: IsNotNull(dt),IsNotNull(hour),EqualTo(dt,20231212),EqualTo(hour,10)
12-14 14:22:42 405 INFO (org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown:60) -
Pushing operators to table_demo
Pushed Filters: IsNotNull(dt), IsNotNull(hour), EqualTo(dt,20231212), EqualTo(hour,10)
Post-Scan Filters:
=== Applying Rule org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown ===
InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour] InsertIntoHadoopFsRelationCommand xxx, false, Parquet, [path=xxx], Overwrite, [user_id, dt, hour]
+- WriteFiles +- WriteFiles
+- Repartition 1, true +- Repartition 1, true
+- GlobalLimit 100 +- GlobalLimit 100
+- LocalLimit 100 +- LocalLimit 100
! +- Filter ((isnotnull(dt#1330) AND isnotnull(hour#1331)) AND ((dt#1330 = 20231212) AND (hour#1331 = 10))) +- RelationV2[user_id#1307, dt#1330, hour#1331] table_demo
! +- RelationV2[user_id#1307, dt#1330, hour#1331] spark_catalog.ad_dwd.table_demo table_demo
可以看到经过了规则转换 所有的过滤条件都下推到了DataSource
了,但是具体的下推还得在DataSource进一步处理才能保证真正的下推