数据倾斜问题,通常是指参与计算的数据分布不均,即某个key或者某些key的数据量远超其他key,导致在shuffle阶段,大量相同key的数据被发往同一个Reduce,进而导致该Reduce所需的时间远超其他Reduce,成为整个任务的瓶颈。
比如对于一张表的province_id字段,其中99%的值都为1,则进行hash分区的时候,都会放到一个reduce上,运行效率降低,发生数据倾斜。
Hive中的数据倾斜常出现在分组聚合和join操作的场景中,下面分别介绍在上述两种场景下的优化思路。
Hive中未经优化的分组聚合,是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle,将数据发往Reduce端,各组数据在Reduce端完成最终的聚合运算。
如果group by分组字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
由分组聚合导致的数据倾斜问题,有以下两种解决思路:
1)Map-Side聚合
开启Map-Side聚合后,数据会现在Map端完成部分聚合工作。这样一来即便原始数据是倾斜的,经过Map端的初步聚合后,发往Reduce的数据也就不再倾斜了。最佳状态下,Map-端聚合能完全屏蔽数据倾斜问题。
相关参数如下:
--启用map-side聚合
set hive.map.aggr=true;
--用于检测源表数据是否适合进行map-side聚合。检测的方法是:先对若干条数据进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值,则认为该表适合进行map-side聚合;否则,认为该表数据不适合进行map-side聚合,后续数据便不再进行map-side聚合。
set hive.map.aggr.hash.min.reduction=0.5;
--用于检测源表是否适合map-side聚合的条数。
set hive.groupby.mapaggr.checkinterval=100000;
--map-side聚合所用的hash table,占用map task堆内存的最大比例,若超出该值,则会对hash table进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold=0.9;
2)Skew-GroupBy优化(Skew指的是数据倾斜)
Skew-GroupBy的原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce,完成部分聚合,第二个MR按照分组字段分区,完成最终聚合。
相关参数如下:
--启用分组聚合数据倾斜优化
set hive.groupby.skewindata=true;
1)示例SQL语句
hive (default)>
select
province_id,
count(*)
from order_detail
group by province_id;
2)优化前
该表数据中的province_id字段是存在倾斜的,若不经过优化,通过观察任务的执行过程,是能够看出数据倾斜现象的。
需要注意的是,hive中的map-side聚合是默认开启的,若想看到数据倾斜的现象,需要先将hive.map.aggr参数设置为false。
--优化前
set hive.map.aggr=false;
set hive.groupby.skewindata=false
3)优化思路
通过上述两种思路均可解决数据倾斜的问题。下面分别进行说明:
(1)方案一:Map-Side聚合
设置如下参数
1、启用map-side聚合
set hive.map.aggr=true;
2、关闭skew-groupby
set hive.groupby.skewindata=false;
开启map-side聚合后的执行计划如下图所示:
开启map-side聚合后,reduce数据不再倾斜。
(2)方案二:Skew-GroupBy优化
设置如下参数:
·1、启用skew-groupby
set hive.groupby.skewindata=true;
2、关闭map-side聚合
set hive.map.aggr=false;
开启Skew-GroupBy优化后,可以很明显看到该sql执行在yarn上启动了两个mr任务,第一个mr打散数据,第二个mr按照打散后的数据进行分组聚合。
小结:
前文提到过,未经优化的join操作,默认是使用common join算法,也就是通过一个MapReduce Job完成计算。Map端负责读取join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
如果关联字段的值分布不均,就可能导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
由join导致的数据倾斜问题,有如下三种解决方案:
方案一:map join
使用map join算法,join操作仅在map端就能完成,没有shuffle操作,没有reduce阶段,自然不会产生reduce端的数据倾斜。该方案适用于大表join小表时发生数据倾斜的场景。
相关参数如下:
--启动Map Join自动转换
set hive.auto.convert.join=true;
--一个Common Join operator转为Map Join operator的判断条件,若该Common Join相关的表中,存在n-1张表的大小总和<=该值,则生成一个Map Join计划,此时可能存在多种n-1张表的组合均满足该条件,则hive会为每种满足条件的组合均生成一个Map Join计划,同时还会保留原有的Common Join计划作为后备(back up)计划,实际运行时,优先执行Map Join计划,若不能执行成功,则启动Common Join后备计划。
set hive.mapjoin.smalltable.filesize=250000;
--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;
--无条件转Map Join时的小表之和阈值,若一个Common Join operator相关的表中,存在n-1张表的大小总和<=该值,此时hive便不会再为每种n-1张表的组合均生成Map Join计划,同时也不会保留Common Join作为后备计划。而是只生成一个最优的Map Join计划。
set hive.auto.convert.join.noconditionaltask.size=10000000;
方案二:skew join
skew join的原理是:==为倾斜的大key单独启动一个map join任务进行计算,其余key进行正常的common join。==原理图如下:
相关参数如下:
1、启用skew join优化
set hive.optimize.skewjoin=true;
2、触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key=100000;
这种方案对参与join的源表大小没有要求,但是对两表中倾斜的key的数据量有要求,要求一张表中的倾斜key的数据量比较小(方便走mapjoin)。
方案三:调整SQL语句
若参与join的两表均为大表,其中一张表的数据是倾斜的,此时也可通过以下方式对SQL语句进行相应的调整。
假设原始SQL语句如下:A,B两表均为大表,且其中一张表的数据是倾斜的。
hive (default)>
select
*
from A
join B
on A.id=B.id;
其join过程如下:
图中1001为倾斜的大key,可以看到,其被发往了同一个Reduce进行处理。
调整SQL语句如下:
hive (default)>
select
*
from(
select --打散操作
concat(id,'_',cast(rand()*2 as int)) id,
value
from A
)ta
join(
select --扩容操作
concat(id,'_',0) id,
value
from B
union all
select
concat(id,'_',1) id,
value
from B
)tb
on ta.id=tb.id;
调整之后的SQL语句执行计划如下图所示:
1)示例SQL语句
hive (default)>
select
*
from order_detail od
join province_info pi
on od.province_id=pi.id;
2)优化前
order_detail表中的province_id字段是存在倾斜的,若不经过优化,通过观察任务的执行过程,是能够看出数据倾斜现象的。
需要注意的是,hive中的map join自动转换是默认开启的,若想看到数据倾斜的现象,需要先将hive.auto.convert.join参数设置为false。
3)优化思路
上述两种优化思路均可解决该数据倾斜问题,下面分别进行说明:
(1)map join
设置如下参数:
1、启用map join(该参数默认开启)
set hive.auto.convert.join=true;
2、关闭skew join(该参数默认关闭)
set hive.optimize.skewjoin=false;
可以很明显看到开启map join以后,mr任务只有map阶段,没有reduce阶段,自然也就不会有数据倾斜发生。
(2)skew join
设置如下参数
1、启动skew join
set hive.optimize.skewjoin=true;
2、关闭map join
set hive.auto.convert.join=false;
开启skew join后,使用explain可以很明显看到执行计划如下图所示,说明skew join生效,任务既有common join,又有部分key走了map join。
并且该sql在yarn上最终启动了两个mr任务,而且第二个任务只有map没有reduce阶段,说明第二个任务是对倾斜的key进行了map join。
对于一个分布式的计算任务而言,设置一个合适的并行度十分重要。Hive的计算任务由MapReduce完成,故并行度的调整需要分为Map端和Reduce端。
Map端的并行度,也就是Map的个数。是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。
以下特殊情况可考虑调整map端并行度:
1)查询的表中存在大量小文件
按照Hadoop默认的切片策略,一个小文件会单独启动一个map task负责计算。若查询的表中存在大量小文件,则会启动大量map task,造成计算资源的浪费。这种情况下,可以使用Hive提供的CombineHiveInputFormat
,多个小文件合并为一个切片,从而控制map task个数。
相关参数如下:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
2)map端有复杂的查询逻辑
若SQL语句中有正则替换、json解析等复杂耗时的查询逻辑时,map端的计算会相对慢一些。若想加快计算速度,在计算资源充足的情况下,可考虑增大map端的并行度,令map task多一些,每个map task计算的数据少一些。相关参数如下:
一个切片的最大值
set mapreduce.input.fileinputformat.split.maxsize=256000000;
Reduce端的并行度,也就是Reduce个数。相对来说,更需要关注。Reduce端的并行度,可由用户自己指定,也可由Hive自行根据该MR Job输入的文件大小进行估算。
Reduce端的并行度的相关参数如下:
1、指定Reduce端并行度,默认值为-1,表示用户未指定
set mapreduce.job.reduces;
2、Reduce端并行度最大值
set hive.exec.reducers.max;
3、单个Reduce Task计算的数据量,用于估算Reduce并行度
set hive.exec.reducers.bytes.per.reducer;
Reduce端并行度的确定逻辑如下:
若指定参数mapreduce.job.reduces的值为一个非负整数,则Reduce并行度为指定值。否则,Hive自行估算Reduce并行度,估算逻辑如下:
假设Job输入的文件大小为totalInputBytes
参数hive.exec.reducers.bytes.per.reducer的值为bytesPerReducer。
参数hive.exec.reducers.max的值为maxReducers。
则Reduce端的并行度为:
根据上述描述,可以看出,Hive自行估算Reduce并行度时,是以整个MR Job输入的文件大小作为依据的。因此,在某些情况下其估计的并行度很可能并不准确,此时就需要用户根据实际情况来指定Reduce并行度了。
小文件合并优化,分为两个方面,分别是Map端输入的小文件合并,和Reduce端输出的小文件合并。
合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。目的是防止为单个小文件启动一个Map Task,浪费计算资源。
相关参数为:
可将多个小文件切片,合并为一个切片,进而由一个map任务处理
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
合并Reduce端输出的小文件,是指将多个小文件合并成大文件。目的是减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
相关参数为:
1、开启合并map only任务输出的小文件
set hive.merge.mapfiles=true;
2、开启合并map reduce任务输出的小文件
set hive.merge.mapredfiles=true;
3、合并后的文件大小
set hive.merge.size.per.task=256000000;
4、触发小文件合并任务的阈值,若某计算任务输出的文件平均大小低于该值,则触发合并
set hive.merge.smallfiles.avgsize=16000000;
CBO是指Cost based Optimizer,即基于计算成本的优化。
在Hive中,计算成本模型考虑到了:数据的行数、CPU、本地IO、HDFS IO、网络IO等方面。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在hive的MR引擎下主要用于join的优化,例如多表join的join顺序。
相关参数为:
是否启用cbo优化
set hive.cbo.enable=true;
案例:
1)示例SQL语句
hive (default)>
select
*
from order_detail od
join product_info product on od.product_id=product.id
join province_info province on od.province_id=province.id;
2)关闭CBO优化
关闭cbo优化
set hive.cbo.enable=false;
为了测试效果更加直观,关闭map join自动转换
set hive.auto.convert.join=false;
根据执行计划,可以看出,三张表的join顺序如下:
3)开启CBO优化
1、开启cbo优化
set hive.cbo.enable=true;
2、为了测试效果更加直观,关闭map join自动转换
set hive.auto.convert.join=false;
根据执行计划,可以看出,三张表的join顺序如下:
4)总结
根据上述案例可以看出,CBO优化对于执行计划中join顺序是有影响的,其之所以会将province_info的join顺序提前,是因为province info的数据量较小,将其提前,会有更大的概率使得中间结果的数据量变小,从而使整个计算任务的数据量减小,也就是使计算成本变小。
谓词下推(predicate pushdown)是指,在不影响结果的前提下,尽量将where过滤操作前移,以减少后续计算步骤的数据量。这个过滤指的是where过滤。
相关参数为:
1、是否启动谓词下推(predicate pushdown)优化
set hive.optimize.ppd = true;
需要注意的是:
CBO优化也会完成一部分的谓词下推优化工作,因为在执行计划中,谓词越靠前,整个计划的计算成本就会越低。
Hive的矢量化查询优化,依赖于CPU的矢量化计算,CPU的矢量化计算的基本原理如下图:
Hive的矢量化查询,可以极大的提高一些典型查询场景(例如scans, filters, aggregates, and joins)下的CPU使用效率。
相关参数如下:
set hive.vectorized.execution.enabled=true;
若执行计划中,出现“Execution mode: vectorized”字样,即表明使用了矢量化计算。
Fetch抓取是指,Hive中对某些情况的查询可以不必使用MapReduce计算。例如:select * from emp;
在这种情况下,Hive可以简单地读取emp对应的存储目录下的文件,然后输出查询结果到控制台。
相关参数如下:
1、设置为none表示不转换
2、设置为minimal表示支持select *,分区字段过滤,Limit等
3、设置为more表示支持select 任意字段,包括函数,过滤,和limit等
set hive.fetch.task.conversion=more;
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务消耗的时间可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
相关参数如下:
1、开启自动转换为本地模式
set hive.exec.mode.local.auto=true;
2、设置local MapReduce的最大输入数据量,当输入数据量小于这个值时采用local MapReduce的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
3、设置local MapReduce的最大输入文件个数,当输入文件个数小于这个值时采用local MapReduce的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;
Hive会将一个SQL语句转化成一个或者多个Stage,每个Stage对应一个MR Job。默认情况下,Hive同时只会执行一个Stage。但是某SQL语句可能会包含多个Stage,但这多个Stage可能并非完全互相依赖,也就是说有些Stage是可以并行执行的。此处提到的并行执行就是指这些Stage的并行执行。相关参数如下:
1、启用并行执行优化
set hive.exec.parallel=true;
2、同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=8;
Hive可以通过设置某些参数防止危险操作:
1)分区表不使用分区过滤
将hive.strict.checks.no.partition.filter设置为true时,对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)使用order by没有limit过滤
将hive.strict.checks.orderby.no.limit设置为true时,==对于使用了order by语句的查询,要求必须使用limit语句。==因为order by为了执行排序过程会将所有的结果数据分发到同一个Reduce中进行处理,强制要求用户增加这个limit语句可以防止Reduce额外执行很长一段时间(开启了limit可以在数据进入到Reduce之前就减少一部分数据)。
3)笛卡尔积
将hive.strict.checks.cartesian.product设置为true时,会限制笛卡尔积的查询。 对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。