Spark之Adaptive Query Execution

发布时间:2024年01月02日

Adaptive Query Execution

自适应查询执行(AQE)是Spark SQL中的一种优化技术,利用运行时的统计信息来选择最高效的查询执行计划。

  • spark3.2起,默认启用。可以通过设置spark.sql.adaptive.enabled为true启用。
  • spark3,AQE主要有以下功能:通过合并减少shuffle分区数、将sore-merge join转换为broadcast join、优化数据倾斜

Coalescing Post Shuffle Partitions

简单来讲该特性用于调整shuffle分区数量,用户不需要精心为数据集设置合适的shuffle分区数量。只需设置一个较大的初始shuffle分区数量,但AQE会基于实际map阶段的输出统计来合并shuffle分区,从而选择一个合适的分区数量。

Property NameDefaultMeaningSince Version
spark.sql.adaptive.coalescePartitions.enabledtrueAQE启用后,将该参数置为true,spark将根据advisoryPartitionSizeInBytes参数指定的大小自动合并相邻的shuffle分区,防止出现太多的小task3.0.0
spark.sql.adaptive.coalescePartitions.parallelismFirsttrue为true时,spark在合并相邻shuffle分区时,将会忽略advisoryPartitionSizeInBytes参数指定的大小。只按照salecePartitions.minPartitionSize参数指定的大小来划分最小分区大小,以最大限度地提高并行性--------------------------------------------该配置是为了防止启用AQE后,性能出现回退。建议不使用该配置而使用advisoryPartitionSizeInBytes参数指定3.2.0
spark.sql.adaptive.coalescePartitions.minPartitionSize1MB合并后shuffle分区的最小大小,该值最多可以是advisoryPartitionSizeInBytes参数的20%。3.2.0
spark.sql.adaptive.coalescePartitions.initialPartitionNum(none)shuffle分区的初始化数量。默认未设置,等于spark.sql.shuffle.partitions参数的值。只有当AQE和coalescePartitions配置启用时才有效。3.0.0
spark.sql.adaptive.advisoryPartitionSizeInBytes64 MBAQE优化时,shuffle分区的建议大小(以字节为单位)3.0.0

Spliting skewed shuffle partitions

Property NameDefaultMeaningSince Version
spark.sql.adaptive.optimizeSkewsInRebalancePartitions.enabledtrue当AQE启用,并且该参数为true时。spark将优化ReblancePartitions中倾斜的shuffle分区,并根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)将其拆分为更小的分区,以避免数据偏斜3.2.0
spark.sql.adaptive.rebalancePartitionsSmallPartitionFactor0.2如果分区大小小于该因子乘以spark.sql.adaptive.advisoryPartitionSizeInBytes参数的值,将在拆分过程中被合并3.3.0

Converting sort-merge join to broadcast join

在运行统计时,AQE会将sort-merge join 转换为broadcast hash join。broadcast hash join可以在本地读取shuffle文件以节省网络流量。

Property NameDefaultMeaningSince Version
spark.sql.adaptive.autoBroadcastJoinThreshold(none)执行join时,广播到所有worker节点的表的最大大小(以字节为单位)。如果设置为-1,则禁用broadcast。默认值等于spark.sql.autoBroadcastJoinThreshold。该配置仅用于自适应框架。3.2.0
spark.sql.adaptive.localShuffleReader.enabledtrueAQE启动,同时该参数为true时。spark会尝试使用本地shuffle reader来读取数据3.0.0

Converting sort-merge join to shuffled hash join

当所有的shuffle分区小于阈值(spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold)时,AQE会将sort-merge join转换为shuffled hash join。

Property NameDefaultMeaningSince Version
spark.sql.adaptive.maxShuffledHashJoinLocalMapThreshold0配置被允许构建本地hash map分区的最大值。如果该值大于等于advisoryPartitionSizeInBytes,并且小于等于所有分区大小。则无论spark.sql.join.preferSortMergeJoin参数值怎样设置,join的更倾向于使用shuffled hash join替代sort merge join3.2.0

Optimizing Skew Join(for sore-merge join)

数据倾斜会严重降低join查询的性能。当AQE启用并且spark.sql.adaptive.skewJoin.enabled参数的值为true时,spark会在sort-merge join中将倾斜的tasks拆分为大小基本相等的task来处理倾斜问题。

Property NameDefaultMeaningSince Version
spark.sql.adaptive.skewJoin.enabledtrue当AQE启用并且spark.sql.adaptive.skewJoin.enabled参数的值为true时,spark会通过在sort-merge join中拆分倾斜的分区来动态调整倾斜3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionFactor5.0如果分区的大小大于此因子乘以分区大小的中值,并且也大于spark.sql.adaptive.skewJoin.strakedPartitionThresholdInBytes,则将其视为倾斜分区3.0.0
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes256M如果分区的大小大于该值,并且大于spark.sql.adaptive.skewJoin.skewedPartitionFactor参数的值乘以中间分区大小,则将其视为倾斜分区。理想情况下,该配置的值应该大于spark.sql.adaptive.advisoryPartitionSizeInBytes参数的值3.0.0
spark.sql.adaptive.forceOptimizeSkewedJoinfalse为true时,即使引入了额外的shuffle,也将强制启用OptimizeSkewedJoin,用于优化倾斜join以避免较大任务3.3.0

其他配置

Property NameDefaultMeaningSince Version
spark.sql.adaptive.optimizer.excludedRulesnone配置自适应优化器禁用的规则列表,规则名字以逗号分隔。优化器将记录被排除的规则3.1.0
spark.sql.adaptive.customCostEvaluatorClassnone自定义用于自适应执行的评估代价类。如果未设置,spark将默认使用自己的SimpleCostEvaluator3.2.0
文章来源:https://blog.csdn.net/zincooo/article/details/135245572
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。