First, look at the comment on JoinSelection:
    If it is an equi-join, we first look at the join hints w.r.t. the following order:
      1. broadcast hint: pick broadcast hash join if the join type is supported. If both sides
         have the broadcast hints, choose the smaller side (based on stats) to broadcast.
      2. sort merge hint: pick sort merge join if join keys are sortable.
      3. shuffle hash hint: We pick shuffle hash join if the join type is supported. If both
         sides have the shuffle hash hints, choose the smaller side (based on stats) as the
         build side.
      4. shuffle replicate NL hint: pick cartesian product if join type is inner like.

    If there is no hint or the hints are not applicable, we follow these rules one by one:
      1. Pick broadcast hash join if one side is small enough to broadcast, and the join type
         is supported. If both sides are small, choose the smaller side (based on stats)
         to broadcast.
      2. Pick shuffle hash join if one side is small enough to build local hash map, and is
         much smaller than the other side, and `spark.sql.join.preferSortMergeJoin` is false.
      3. Pick sort merge join if the join keys are sortable.
      4. Pick cartesian product if join type is inner like.
      5. Pick broadcast nested loop join as the final solution. It may OOM but we don't have
         other choice.
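The hint priority described in the comment can be sketched in plain Scala. This is only a simplified model, not Spark's code: `HintPrioritySketch`, `JoinInfo`, and its boolean flags are hypothetical names that stand in for the checks Spark performs on the real plan, and the choice of build side is left out.

```scala
// Simplified model of the hint-priority order; none of these types are Spark classes.
object HintPrioritySketch {
  sealed trait Hint
  case object Broadcast extends Hint
  case object SortMerge extends Hint
  case object ShuffleHash extends Hint
  case object ShuffleReplicateNL extends Hint

  // Hypothetical flags standing in for the checks made on the actual join.
  final case class JoinInfo(
      hints: Set[Hint],                 // hints found on either side of the join
      typeSupportsBroadcast: Boolean,   // join type allows broadcast hash join
      typeSupportsShuffleHash: Boolean, // join type allows shuffle hash join
      keysSortable: Boolean,
      innerLike: Boolean)

  // Walk the hints in the documented order.
  // None means: no applicable hint, fall through to the no-hint rules.
  def pickByHint(j: JoinInfo): Option[String] =
    if (j.hints(Broadcast) && j.typeSupportsBroadcast) Some("BroadcastHashJoin")
    else if (j.hints(SortMerge) && j.keysSortable) Some("SortMergeJoin")
    else if (j.hints(ShuffleHash) && j.typeSupportsShuffleHash) Some("ShuffledHashJoin")
    else if (j.hints(ShuffleReplicateNL) && j.innerLike) Some("CartesianProduct")
    else None
}
```

For example, when both a broadcast hint and a sort merge hint are present, the broadcast hint wins as long as the join type supports it.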
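In other words:
For an equi-join, the join hints are checked first, in the order listed above: broadcast, sort merge, shuffle hash, shuffle replicate NL.
If there is no hint, or the hint is not applicable, a strategy is chosen in this order: broadcast hash join, shuffle hash join, sort merge join, cartesian product, and finally broadcast nested loop join.
Note the configuration and helper methods that these rules depend on: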
val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
  .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
    "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
    "Note that currently statistics are only supported for Hive Metastore tables where the " +
    "command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been " +
    "run, and file-based data source tables where the statistics are computed directly on " +
    "the files of data.")
  .version("1.1.0")
  .bytesConf(ByteUnit.BYTE)
  .createWithDefaultString("10MB")
/**
 * Matches a plan whose output should be small enough to be used in broadcast join.
 */
private def canBroadcast(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= conf.autoBroadcastJoinThreshold
}

/**
 * Matches a plan whose single partition should be small enough to build a hash table.
 *
 * Note: this assume that the number of partition is fixed, requires additional work if it's
 * dynamic.
 */
private def canBuildLocalHashMap(plan: LogicalPlan): Boolean = {
  plan.stats.sizeInBytes < conf.autoBroadcastJoinThreshold * conf.numShufflePartitions
}

/**
 * Returns whether plan a is much smaller (3X) than plan b.
 *
 * The cost to build hash map is higher than sorting, we should only build hash map on a table
 * that is much smaller than other one. Since we does not have the statistic for number of rows,
 * use the size of bytes here as estimation.
 */
private def muchSmaller(a: LogicalPlan, b: LogicalPlan): Boolean = {
  a.stats.sizeInBytes * 3 <= b.stats.sizeInBytes
}
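To make the three predicates concrete, here is a minimal sketch that applies them to plain byte sizes and then walks the no-hint rules in order. It is an illustration under stated assumptions, not Spark's implementation: `SizeRuleSketch` and `pick` are hypothetical names, sizes are plain `Long` values instead of plan statistics, the threshold is fixed at the 10MB default shown above, 200 shuffle partitions are assumed, and the join type is assumed to support every strategy.

```scala
// Plain-Scala model of the size-based rules; names and constants are assumptions.
object SizeRuleSketch {
  val autoBroadcastJoinThreshold: Long = 10L * 1024 * 1024 // 10MB default shown above
  val numShufflePartitions: Int = 200                      // assumed partition count

  def canBroadcast(size: Long): Boolean =
    size >= 0 && size <= autoBroadcastJoinThreshold

  def canBuildLocalHashMap(size: Long): Boolean =
    size < autoBroadcastJoinThreshold * numShufflePartitions

  // "Much smaller" means at least 3x smaller in bytes.
  def muchSmaller(a: Long, b: Long): Boolean = a * 3 <= b

  // Walk the no-hint rules one by one for an equi-join.
  def pick(
      left: Long,
      right: Long,
      preferSortMergeJoin: Boolean = true,
      keysSortable: Boolean = true,
      innerLike: Boolean = true): String = {
    val small = math.min(left, right)
    val large = math.max(left, right)
    if (canBroadcast(small)) "BroadcastHashJoin"
    else if (!preferSortMergeJoin && canBuildLocalHashMap(small) && muchSmaller(small, large))
      "ShuffledHashJoin"
    else if (keysSortable) "SortMergeJoin"
    else if (innerLike) "CartesianProduct"
    else "BroadcastNestedLoopJoin"
  }
}
```

With these assumptions, a 100MB table joined to a 1GB table is too large to broadcast, so it ends up as a sort merge join unless `preferSortMergeJoin` is false, in which case the 3x size gap makes it a shuffle hash join.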
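For reference, here is the Spark SQL join flow:
Every join implementation in Spark SQL follows the same basic flow. Depending on their roles, the two tables taking part in a join are called the "streamed table" and the "build table". Spark SQL assigns these roles according to its strategy; generally, the larger table becomes the streamed table and the smaller table becomes the build table.
The iterator over the streamed table is StreamIterator and the iterator over the build table is BuildIterator. Each record in StreamIterator is visited in turn, and matching records are looked up in BuildIterator; this lookup is called the Build process. Each Build operation yields a JoinedRow(A, B). If A comes from StreamIterator and B from BuildIterator, the operation is BuildRight; if B comes from StreamIterator and A from BuildIterator, it is BuildLeft.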
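For LeftOuter, RightOuter, and LeftSemi the build side is fixed: LeftOuter and LeftSemi are BuildRight, and RightOuter is BuildLeft. (The original text also lists RightSemi as BuildLeft, but the JoinType definitions shown later in this post contain no RightSemi.)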
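The stream/build flow can be illustrated with a small in-memory inner hash join. This is a sketch only: `StreamBuildSketch`, `innerHashJoin`, and this `JoinedRow` are hypothetical stand-ins written for this example, not Spark's classes. The build side is loaded into a hash map, and the other side is streamed through it.

```scala
// In-memory model of the stream/build flow; not Spark's implementation.
object StreamBuildSketch {
  sealed trait BuildSide
  case object BuildLeft extends BuildSide
  case object BuildRight extends BuildSide

  // Output row: always (left row, right row), whichever side was streamed.
  final case class JoinedRow[L, R](left: L, right: R)

  def innerHashJoin[K, L, R](left: Seq[L], right: Seq[R], side: BuildSide)(
      leftKey: L => K,
      rightKey: R => K): Seq[JoinedRow[L, R]] =
    side match {
      case BuildRight =>
        // Right is the build table, left is the streamed table.
        val built: Map[K, Seq[R]] = right.groupBy(rightKey)
        for {
          l <- left
          r <- built.getOrElse(leftKey(l), Seq.empty)
        } yield JoinedRow(l, r)
      case BuildLeft =>
        // Left is the build table, right is the streamed table.
        val built: Map[K, Seq[L]] = left.groupBy(leftKey)
        for {
          r <- right
          l <- built.getOrElse(rightKey(r), Seq.empty)
        } yield JoinedRow(l, r)
    }
}
```

For an inner join, both build sides produce the same set of rows; only the table that is held in memory and the output order differ, which is why the smaller table is the natural choice for the build side.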
def createCartesianProduct() = {
  if (joinType.isInstanceOf[InnerLike]) {
    Some(Seq(joins.CartesianProductExec(planLater(left), planLater(right), condition)))
  } else {
    None
  }
}
Looking at the JoinType classes, two types extend InnerLike: one is the inner join and the other is the cross join.
object JoinType {
  def apply(typ: String): JoinType = typ.toLowerCase(Locale.ROOT).replace("_", "") match {
    case "inner" => Inner
    case "outer" | "full" | "fullouter" => FullOuter
    case "leftouter" | "left" => LeftOuter
    case "rightouter" | "right" => RightOuter
    case "leftsemi" | "semi" => LeftSemi
    case "leftanti" | "anti" => LeftAnti
    case "cross" => Cross
    case _ =>
      val supported = Seq(
        "inner",
        "outer", "full", "fullouter", "full_outer",
        "leftouter", "left", "left_outer",
        "rightouter", "right", "right_outer",
        "leftsemi", "left_semi", "semi",
        "leftanti", "left_anti", "anti",
        "cross")
      throw new IllegalArgumentException(s"Unsupported join type '$typ'. " +
        "Supported join types include: " + supported.mkString("'", "', '", "'") + ".")
  }
}

sealed abstract class JoinType {
  def sql: String
}

/**
 * The explicitCartesian flag indicates if the inner join was constructed with a CROSS join
 * indicating a cartesian product has been explicitly requested.
 */
sealed abstract class InnerLike extends JoinType {
  def explicitCartesian: Boolean
}

case object Inner extends InnerLike {
  override def explicitCartesian: Boolean = false
  override def sql: String = "INNER"
}

case object Cross extends InnerLike {
  override def explicitCartesian: Boolean = true
  override def sql: String = "CROSS"
}

case object LeftOuter extends JoinType {
  override def sql: String = "LEFT OUTER"
}

case object RightOuter extends JoinType {
  override def sql: String = "RIGHT OUTER"
}

case object FullOuter extends JoinType {
  override def sql: String = "FULL OUTER"
}

case object LeftSemi extends JoinType {
  override def sql: String = "LEFT SEMI"
}

case object LeftAnti extends JoinType {
  override def sql: String = "LEFT ANTI"
}
...
}
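Broadcast Nested Loop Join has to broadcast one dataset and then run a nested loop over it. It is very inefficient computationally and very demanding on memory, because one of the datasets is broadcast to every executor regardless of its size.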
select /*+ mapjoin(b)*/
a.*, sum(b.work_date) as '工作日'
from a
cross join
work_date_dim b
on b.begin_tm >= a.任务开始时间
and b.end_tm < a.任务结束时间
group by ...
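Without the mapjoin hint, the query is extremely slow: table a has fewer than 100,000 rows and table b only a few thousand, yet it still had not finished after 30 minutes.
With the mapjoin hint, it finished in under a minute. Note, however, that table b must not be too large.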
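What the hinted query does can be modelled as a nested loop over a small, fully in-memory copy of table b. The sketch below is an assumption-laden illustration rather than Spark's operator: `Task` and `WorkDate` are hypothetical row shapes for tables a and b, the collection `broadcastDim` plays the role of the broadcast copy, and grouping is done by a made-up task `id` because the original `group by` clause is elided.

```scala
// Plain-Scala model of a broadcast nested loop join with a range condition.
object BroadcastNestedLoopSketch {
  final case class Task(id: Int, startTm: Long, endTm: Long)           // hypothetical row of table a
  final case class WorkDate(beginTm: Long, endTm: Long, workDate: Int) // hypothetical row of work_date_dim

  // broadcastDim stands for the copy of table b held in memory by each executor.
  def sumWorkDates(tasks: Seq[Task], broadcastDim: Seq[WorkDate]): Map[Int, Int] = {
    // Nested loop: every task row is compared with every broadcast row,
    // so the cost is tasks.size * broadcastDim.size comparisons.
    val matches = for {
      t <- tasks
      d <- broadcastDim
      if d.beginTm >= t.startTm && d.endTm < t.endTm // the non-equi join condition
    } yield (t.id, d.workDate)

    // Aggregate per task, mirroring sum(b.work_date) in the query.
    matches.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2).sum }
  }
}
```

Because the condition is a range comparison rather than an equality, there is no key to hash or sort on, and every pair of rows has to be checked. That is acceptable only while the broadcast side stays small, which matches the warning above about the size of table b.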