Hive 的三种join
reduce阶段完成join。整个过程包括Map、Shuffle和Reduce三个阶段。
读取源表的数据,Map输出时候以Join on条件中的列为key,如果Join有多个关联键,则以这些关联键的组合作为key;
Map输出的value为join之后所关心的(select或者where中需要用到的)列;同时在value中还会包含表的Tag信息,用于标明此value对应哪个表;
最后对key进行排序。
根据key的值进行hash,并将key/value按照hash值推送至不同的reduce中,这样确保两个表中相同的key位于同一个reduce中。
根据key的值完成join操作,期间通过Tag来识别不同表中的数据。
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数 hive.mapjoin.smalltable.filesize 来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示* /+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7版本之后,默认自动会转换Map Join,由参数 hive.auto.convert.join 来控制,默认为true。
执行流程如下:
1.通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会对HashTableFiles进行压缩。
2.MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
当两张表都很大时,超过hive.mapjoin.smalltable.filesize时,就不能使用mapjoin了。针对这种情况,可以采用skew join进行优化。
使用方式
-- 方法1:hint表名(注意hint的是表的alias)
-- 下面的case认为join的a这一路发生了数据倾斜
select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
-- 方法2:hint表名和认为可能产生倾斜的列
-- 下面的case认为join的a这一路发生了数据倾斜,且是a的c0和c1列存在数据倾斜
select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
-- 方法3:hint表名和列,并提供发生倾斜的key值(注意如果是String类型,需要加上引号)
-- 下面的case认为join的a这一路发生了数据倾斜,且认为热值有两个,1)a.c0为1,且a.c1为"2"时; 2)a.c0为3,且a.c1为"4"时
select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
原理解释
定义热值key:出现次数很多的Join key值,例如下图中红色部分,a.c0=1 and a.c1=2 有10000行,a.c0=3 and a.c1=4有9000行。
在没加skew join hint的情况下,将表T0和表T1进行Join,由于T0和T1的数量都很大,只能进行Merge Join,因此相同的热值需要shuffle到一个节点进行处理,导致数据倾斜,如下图所示。
加hint后,ODPS会运行一个聚合函数获取热值(默认情况下获取TOP20,即数量最多的20个Join Key值),然后将表T0中属于热值的值(下图所示,数据A)、T0中不属于热值的值(数据B)拆分。将表T1中能与T0中属于热值的值Join上的值(数据C)、表T1中与T0属于热值的值Join不上的值(数据D)进行拆分。然后将数据A与数据C进行Map Join(由于数据C量很少,可以进行Map Join),将数据B和数据D进行Merge Join(已经没有热值,不会发生数据倾斜)。最后将Map Join和Merge Join的结果Union,得到最终的结果,如下图所示。
由于表T0的热值部分使用了Map Join,所以加速了计算过程。