Adaptive Query Execution
自适应查询执行(AQE)是Spark SQL中的一种优化技术,利用运行时的统计信息来选择最高效的查询执行计划。
- spark3.2起,默认启用。可以通过设置spark.sql.adaptive.enabled为true启用。
- spark3,AQE主要有以下功能:通过合并减少shuffle分区数、将sore-merge join转换为broadcast join、优化数据倾斜
Coalescing Post Shuffle Partitions
简单来讲该特性用于调整shuffle分区数量,用户不需要精心为数据集设置合适的shuffle分区数量。只需设置一个较大的初始shuffle分区数量,但AQE会基于实际map阶段的输出统计来合并shuffle分区,从而选择一个合适的分区数量。
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.coalescePartitions.enabled | true | AQE启用后,将该参数置为true,spark将根据advisoryPartitionSizeInBytes参数指定的大小自动合并相邻的shuffle分区,防止出现太多的小task | 3.0.0 |
spark.sql.adaptive.coalescePartitions.parallelismFirst | true | 为true时,spark在合并相邻shuffle分区时,将会忽略advisoryPartitionSizeInBytes参数指定的大小。只按照salecePartitions.minPartitionSize参数指定的大小来划分最小分区大小,以最大限度地提高并行性--------------------------------------------该配置是为了防止启用AQE后,性能出现回退。建议不使用该配置而使用advisoryPartitionSizeInBytes参数指定 | 3.2.0 |
spark.sql.adaptive.coalescePartitions.minPartitionSize | 1MB | 合并后shuffle分区的最小大小,该值最多可以是advisoryPartitionSizeInBytes参数的20%。 | 3.2.0 |
spark.sql.adaptive.coalescePartitions.initialPartitionNum | (none) | shuffle分区的初始化数量。默认未设置,等于spark.sql.shuffle.partitions参数的值。只有当AQE和coalescePartitions配置启用时才有效。 | 3.0.0 |
spark.sql.adaptive.advisoryPartitionSizeInBytes | 64 MB | AQE优化时,shuffle分区的建议大小(以字节为单位) | 3.0.0 |
Spliting skewed shuffle partitions
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabled | true | 当AQE启用,并且该参数为true时。spark将优化ReblancePartitions中倾斜的shuffle分区,并根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)将其拆分为更小的分区,以避免数据偏斜 | 3.2.0 |
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor | 0.2 | 如果分区大小小于该因子乘以spark.sql.adaptive.advisoryPartitionSizeInBytes参数的值,将在拆分过程中被合并 | 3.3.0 |
Converting sort-merge join to broadcast join
在运行统计时,AQE会将sort-merge join 转换为broadcast hash join。broadcast hash join可以在本地读取shuffle文件以节省网络流量。
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.autoBroadcastJoinThreshold | (none) | 执行join时,广播到所有worker节点的表的最大大小(以字节为单位)。如果设置为-1,则禁用broadcast。默认值等于spark.sql.autoBroadcastJoinThreshold。该配置仅用于自适应框架。 | 3.2.0 |
spark.sql.adaptive.localShuffleReader.enabled | true | AQE启动,同时该参数为true时。spark会尝试使用本地shuffle reader来读取数据 | 3.0.0 |
Converting sort-merge join to shuffled hash join
当所有的shuffle分区小于阈值(spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold)时,AQE会将sort-merge join转换为shuffled hash join。
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold | 0 | 配置被允许构建本地hash map分区的最大值。如果该值大于等于advisoryPartitionSizeInBytes,并且小于等于所有分区大小。则无论spark.sql.join.preferSortMergeJoin参数值怎样设置,join的更倾向于使用shuffled hash join替代sort merge join | 3.2.0 |
Optimizing Skew Join(for sore-merge join)
数据倾斜会严重降低join查询的性能。当AQE启用并且spark.sql.adaptive.skewJoin.enabled参数的值为true时,spark会在sort-merge join中将倾斜的tasks拆分为大小基本相等的task来处理倾斜问题。
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.skewJoin.enabled | true | 当AQE启用并且spark.sql.adaptive.skewJoin.enabled参数的值为true时,spark会通过在sort-merge join中拆分倾斜的分区来动态调整倾斜 | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionFactor | 5.0 | 如果分区的大小大于此因子乘以分区大小的中值,并且也大于spark.sql.adaptive.skewJoin.strakedPartitionThresholdInBytes,则将其视为倾斜分区 | 3.0.0 |
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 256M | 如果分区的大小大于该值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor参数的值乘以中间分区大小,则将其视为倾斜分区。理想情况下,该配置的值应该大于spark.sql.adaptive.advisoryPartitionSizeInBytes参数的值 | 3.0.0 |
spark.sql.adaptive.forceOptimizeSkewedJoin | false | 为true时,即使引入了额外的shuffle,也将强制启用OptimizeSkewedJoin,用于优化倾斜join以避免较大任务 | 3.3.0 |
其他配置
Property Name | Default | Meaning | Since Version |
---|
spark.sql.adaptive.optimizer.excludedRules | none | 配置自适应优化器禁用的规则列表,规则名字以逗号分隔。优化器将记录被排除的规则 | 3.1.0 |
spark.sql.adaptive.customCostEvaluatorClass | none | 自定义用于自适应执行的评估代价类。如果未设置,spark将默认使用自己的SimpleCostEvaluator | 3.2.0 |