【Spark精讲】SparkSQL Join选择逻辑

发布时间:2024年01月03日

SparkSQL Join选择逻辑?

先看JoinSelection的注释

??? ??? If it is an equi-join, we first look at the join hints w.r.t. the following order:
? ? ? ? ?1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
? ? ? ? ? ? have the broadcast hints, choose the smaller side (based on stats) to broadcast.
? ? ? ? ?2. sort merge hint: pick sort merge join if join keys are sortable.
? ? ? ? ?3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
? ? ? ? ? ? sides have the shuffle hash hints, choose the smaller side (based on stats) as the
? ? ? ? ? ? build side.
? ? ? ? ?4. shuffle replicate NL hint: pick cartesian product if join type is inner like.
? ? ??
? ? ? ?If there is no hint or the hints are not applicable, we follow these rules one by one:
? ? ? ? ?1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
? ? ? ? ? ? is supported. If both sides are small, choose the smaller side (based on stats)
? ? ? ? ? ? to broadcast.
? ? ? ? ?2. Pick shuffle hash join if one side is small enough to build local hash map, and is
? ? ? ? ? ? much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
? ? ? ? ?3. Pick sort merge join if the join keys are sortable.
? ? ? ? ?4. Pick cartesian product if join type is inner like.
? ? ? ? ?5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
? ? ? ? ? ? other choice.

翻译下就是:

如果是等值join,就先看join hints,顺序如下?

  1. broadcast hint:join类型支持的话选择broadcast hash join,如果join的两边都有broadcast hint,选择小的(基于统计)一方去广播
  2. sort merge hint:如果join的key是可排序的,选择sort merge join
  3. shuffle hash hint:join类型支持的话选择shuffle hash join
  4. shuffle replicate NL hint:如果是inner like类型(inner或cross),则选择cartesian product join

如果没有hint或者hint的类型是不合适的,按如下顺序选择

  1. broadcast hash join:如果join的一方足够小,小到可以广播,同时join类型支持,如果两边都很小,选最小的(基于统计)
  2. shuffle hash join:如果join的一方足够小可以构建hash map,并且比另一端小很多,同时需要spark.sql.join.preferSortMergeJoin置为false
  3. sort merge join:如果join的key是可排序的
  4. cartesian product:如果join类型是inner like类型(inner或cross)
  5. broadcast nested loop join:打底策略,即便可能导致OOM但别无选择

注意:

  • hash join(broadcast hash join或者shuffle hash join)只支持等值Join,不支持full outer join
  • 小到可以广播指的是,小于spark.sql.autoBroadcastJoinThreshold的阈值(默认10MB)
  val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
    .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
      "nodes when performing a join.  By setting this value to -1 broadcasting can be disabled. " +
      "Note that currently statistics are only supported for Hive Metastore tables where the " +
      "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
      "run, and file-based data source tables where the statistics are computed directly on " +
      "the files of data.")
    .version("1.1.0")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("10MB")
  • shuffle hash join时要求一边比另一边小很多,小很多指的是3倍小于
    /**
     * Matches a plan whose output should be small enough to be used in broadcast join.
     */
    private def canBroadcast(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
    }

    /**
     * Matches a plan whose single partition should be small enough to build a hash table.
     *
     * Note: this assume that the number of partition is fixed, requires additional work if it's
     * dynamic.
     */
    private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
      plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
    }

    /**
     * Returns whether plan a is much smaller (3X) than plan b.
     *
     * The cost to build hash map is higher than sorting, we should only build hash map on a table
     * that is much smaller than other one. Since we does not have the statistic for number of rows,
     * use the size of bytes here as estimation.
     */
    private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
      a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
    }

hash join为什么只支持等值join,同时不支持full outer join?

  • 这是由于hashmap的特性决定的,随机访问效率最高O(1),为了性能是不会通过hashmap进行遍历查找的。
  • 不支持full outer join 是因为小表做的是构建表,由于不是流式表,无法决定是否输出该行,完全是被动的

参考一下SparkSQL Join流程:

在Spark SQL中Join的实现都基于一个基本的流程,根据角色的不同,参与Join的两张表分别被称为"流式表"和"构建表",不同表的角色在Spark SQL中会通过一定的策略进行设定,通常来讲,系统会默认大表为流式表,将小表设定为构建表。

流式表的迭代器为StreamIterator,构建表的迭代器为BuildIterator。通过遍历StreamIterator中的每条记录,然后在BuildIterator中查找相匹配的记录,这个查找过程被称为Build过程,每次Build操作的结果为一条JoinedRow(A,B),其中A来自StreamIterator,B来自BuildIterator,这个过程为BuildRight操作,而如果B来自StreamIterator,A来自BuildIterator,则为BuildLeft操作。

对于LeftOuter、RightOuter、LeftSemi、RightSemi,他们的build类型是确定的,即LeftOuter、LeftSemi为BuildRight类型,RightOuter、RightSemi为BuildLeft类型。

cartesian join为什么会限制是inner like?

        def createCartesianProduct() = {
          if (joinType.isInstanceOf[InnerLike]) {
            Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
          } else {
            None
          }
        }

可以看下JoinType的类,继承了InnerLike的一个是inner join,一个是cross join

object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")

      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType {
  def sql: String
}

/**
 * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
 * indicating a cartesian product has been explicitly requested.
 */
sealed abstract class InnerLike extends JoinType {
  def explicitCartesian: Boolean
}

case object Inner extends InnerLike {
  override def explicitCartesian: Boolean = false
  override def sql: String = "INNER"
}

case object Cross extends InnerLike {
  override def explicitCartesian: Boolean = true
  override def sql: String = "CROSS"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
...
}

为什么Broadcast Nested Loop Join会OOM?

Broadcast Nested Loop Join需要广播数据集和嵌套循环,计算效率极低,对内存的需求也极大,因为不论数据集大小,都会有一个数据集被广播到所有executor上。

Cross Join优化案例

select /*+ mapjoin(b)*/
  a.*, sum(b.work_date) as '工作日'
from a
cross join 
work_date_dim b 
on b.begin_tm >= a.任务开始时间 
and b.end_tm < a.任务结束时间
group by ...

不加mapjoin的hint,执行结果就是特别慢!a表不到 10w, b表只有几千条,执行了30分钟还是不行!

加mapjoin的hint,不到 1分钟就执行完了。但是,注意b表不能太大。

文章来源:https://blog.csdn.net/weixin_40035038/article/details/135358917
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。