AQE优化和源码

发布时间:2024年01月19日

介绍

AQE全称是Adaptive Query Execution,官网介绍如下

Performance Tuning - Spark 3.5.0 Documentation

AQE做了什么

AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

特性

  1. 自动分区合并:在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。
  2. Join 策略调整:如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。
  3. 自动倾斜处理:结合配置项,AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个 Reduce Task 的工作负载。

AQE开启前后对比:

在非AQE的情况下,Spark会在规划阶段确定了物理执行计划后,根据每个算子的定义生成RDD对应的DAG。然后 Spark DAGScheduler通过shuffle来划分RDD Graph并创建stage,然后提交Stage以供执行

AQE:

首先会将逻辑树拆分为多个QueryStages, 在执行时先将它的子 QueryStages 被提交(先提交mapStage),收集它们的MapOutputStatistics对象。根据收集到的 shuffle 数据统计信息,将当前 QueryStage 的执行计划优化为更好的执行计划。然后转换为DAG图再执行Stage

源码

AQE源码

首先AQE 只作用在 exchange 阶段,即需要发生数据交换的阶段,spark AQE 优化都是发生在 shuffle map 之后。

核心类:AdaptiveSparkPlanExec

Spark经典的doExecute()

获取更新后的plan数

getFinalPhysicalPlan?是核心方法

看着行代码

var result = createQueryStages(currentPhysicalPlan)

将原始的物理计划通过?createQueryStages?方法进行替换


对参数中传递的 sparkplan 的所有节点进行自下而上的递归。根据节点类型的不同进行针对处理

Exchange 节点:Exchange 节点被替换为?QueryStageExec ,这个类有两个实现类

ShuffleQueryStageExec??用于shuffle 数据,BroadcastQueryStageExec?用于广播数据

最后调用CreateStageResult,参数中包含?QueryStageExec,同时?allChildStagesMaterialized?参数表示当前 plan 的所有子节点的输出是否已经具像化

然后也会替换plan

replaceWithQueryStagesInLogicalPlan 做了

以上是里面的方法调用和处理事情

createQueryStages?返回?CreateStageResult?,会在 AQE 的循环中判断是否可以结束。这部分逻辑又回到?getFinalPhysicalPlan?中。

文章来源:https://blog.csdn.net/weixin_43283487/article/details/135690781
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。