How to identify data skew?
- If the table is partitioned/bucketed, aggregate by the partition field and sample the result to compare group sizes (see the sketch below).
- If you have HDFS permissions, check whether the partition directories differ noticeably in size.
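A minimal sketch of both checks; TABLE_PARTITION and partition_field are the placeholder names used later in these notes, the warehouse path is hypothetical, and block sampling via tablesample is just one way to sample:
-- Row counts per partition on a sample; one partition far larger than the rest indicates skew
select partition_field, count(1) as cnt
from TABLE_PARTITION tablesample(1 percent)
group by partition_field
order by cnt desc;
-- Compare partition directory sizes directly on HDFS
# hdfs dfs -du -h /user/hive/warehouse/TABLE_PARTITION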
Too many:
Improper usage:
count(distinct) => rewrite with a group by: select FIELD_NAME FROM ... GROUP BY FIELD_NAME;
join ... on ... where => predicate pushdown (see below).
Whole-table aggregation such as select sum(field) from TABLE; is not supported well (all rows funnel into a single Reducer); split it into sub-aggregations instead:
select sum(sub_total)
from (
    select id, sum(order_amount) as sub_total
    from (
        select order_amount, user_id % 3 as id
        from TABLE_NAME
    ) A
    group by id
) B;
Solve the problem holistically at the data-warehouse design level:
Aim for the global optimum; consider the whole picture, not a single query.
Reasonably reduce the number of tables:
Understand the business thoroughly and design pre-aggregations in advance (see the sketch below).
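A minimal sketch of a pre-aggregated summary table, assuming a hypothetical order detail table dwd_order_detail with dt, user_id and order_amount columns; downstream queries then hit the small summary table instead of re-scanning the detail data:
create table dws_user_order_1d stored as orc as
select dt, user_id,
       sum(order_amount) as order_amount_1d,
       count(1)          as order_cnt_1d
from dwd_order_detail
group by dt, user_id;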
Map task memory:
set mapreduce.map.memory.mb=256;
Reduce task memory:
set mapreduce.reduce.memory.mb=512;
Map-side JVM options:
set mapreduce.map.java.opts=?
Reduce-side JVM options:
set mapreduce.reduce.java.opts=?
NodeManager memory:
set yarn.nodemanager.resource.memory-mb=-1;
YARN scheduler minimum allocation:
set yarn.scheduler.minimum-allocation-mb=1024;
YARN scheduler maximum allocation:
set yarn.scheduler.maximum-allocation-mb=8192;
Sampling (to find the skewed keys), e.g.:
select user_id % 3 as id, count(1) from TABLE_NAME group by user_id % 3;
Map join (automatically convert small-table joins to map-side joins):
set hive.auto.convert.join=true;
set hive.mapjoin.smalltable.filesize=25M;
set hive.optimize.bucketmapjoin=true;
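Beyond the automatic conversion above, a map join can also be requested explicitly with the MAPJOIN hint; this is just a usage sketch with hypothetical table names small_dim and big_fact:
select /*+ MAPJOIN(d) */ f.order_id, d.category_name
from big_fact f
join small_dim d on f.category_id = d.category_id;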
set hive.map.aggr=true;
set mapreduce.job.reduces=n; (see the Reducer count control section below)
set hive.groupby.skewindata=true; (two-stage aggregation that balances skewed group-by keys)
If the settings below are satisfied, a separate MapReduce job is launched after the task finishes to merge the output files.
set hive.merge.mapfiles=true;
set hive.merge.mapredfiles=true;
set hive.merge.size.per.task=256M;
set hive.merge.smallfiles.avgsize=16M;
Mapper startup and initialization are relatively expensive; with too many mappers the overhead exceeds the actual processing and wastes resources.
Default Mapper count:
The default is 2; the setting only takes effect when it is larger than 2.
set mapreduce.job.maps=2;
Values that limit the Mapper count:
set dfs.block.size=128M;
set mapred.max.split.size=256M;
set mapred.min.split.size=1;
Default lower bound of data handled by a single node: 1 byte.
set mapred.min.split.size.per.node=1;
Default lower bound of data handled by a single rack (1 byte):
set mapred.min.split.size.per.rack=1;
Combine many small input files before splitting them for Mappers:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
Mapper split size (ideally close to the 128M block size):
set mapred.min.split.size = N;
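Rough arithmetic for how these bounds interact (an illustration, not from the notes): the effective split size is approximately max(mapred.min.split.size, min(mapred.max.split.size, dfs.block.size)), and the Mapper count is roughly total input size / split size. With the values above the split size is 128M, so 10 GB of input produces about 10 * 1024 / 128 = 80 Mappers; raising mapred.min.split.size above 128M enlarges the splits and reduces the Mapper count.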
If a single row of table A is very large and the processing logic is complex, split the file (column pruning, row filtering).
Split the data into finer granularity with a partitioned table.
Spread the data randomly and evenly across different Reducers, which helps balance the load:
set mapreduce.job.reduces=3;
create table A_SPLITS as
select * from A distribute by rand(3);
Reducer count: n = min(total input SIZE / hive.exec.reducers.bytes.per.reducer, hive.exec.reducers.max), or num_partitions.
set mapreduce.job.reduces=n;
set hive.exec.reducers.bytes.per.reducer=256M;
set hive.exec.reducers.max=1009;
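A quick illustration of the estimate (the numbers are made up): with 10 GB of input and hive.exec.reducers.bytes.per.reducer=256M, Hive plans 10240 / 256 = 40 Reducers, well under the 1009 cap; an explicit set mapreduce.job.reduces=n overrides the estimate.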
Pre-aggregate within groups first, then sum the partial results:
select sum(sum_a) from (select sum(a) as sum_a from A group by STH) T;
Sort within each Reducer first (distribute by + sort by) before the final global order by:
select * from (select * from A distribute by a sort by a) T order by a;
Cartesian product of a small table and a big table: expand the small table with an artificial join key, replicating it DN_COUNT times as needed; expand the big table with the same join key, generated randomly from 0..DN_COUNT-1 (see the sketch after these settings).
set hive.auto.convert.join=false;
set mapreduce.job.reduces=DN_COUNT;
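A minimal sketch of the key-expansion trick, assuming DN_COUNT=3 and hypothetical tables big_t and small_t; the small table is replicated once per key value, the big table gets a random key, so the product spreads over 3 Reducers instead of 1:
select b.*, s.*
from (
    select big_t.*, cast(rand() * 3 as int) as join_key
    from big_t
) b
join (
    select small_t.*, n as join_key
    from small_t lateral view explode(array(0, 1, 2)) t as n
) s
on b.join_key = s.join_key;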
set hive.default.fileformat=orc|textfile|rcfile|sequencefile;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
set mapreduce.output.fileoutputformat.compress.codec = org.apache.hadoop.io.compress.SnappyCodec;
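Compression can also be fixed per table; a sketch creating an ORC table with Snappy compression, reusing the TABLE_SOURCE placeholder (the target table name is hypothetical):
create table table_orc_snappy
stored as orc
tblproperties ('orc.compress'='SNAPPY')
as
select * from TABLE_SOURCE;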
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions=1000;
set hive.exec.max.dynamic.partitions.pernode=100;
insert into table TABLE_PARTITION partition(partition_field) select ..., partition_field from TABLE_SOURCE where ...; (the dynamic partition column must be the last column in the select)
load data [local] inpath 'DATA_PATH' [overwrite|into] table TABLE_PARTITION partition(partition_field=VALUE);
show partitions TABLE_PARTITION;
alter table TABLE_PARTITION add partition(partition_field=VALUE);
alter table TABLE_PARTITION drop partition(partition_field=VALUE);
count(distinct)
select a, count(distinct b) from TAB group by a;
select a, count(b) from (select a, b from TAB group by a, b) T group by a;
CBO (COST BASED OPTIMIZER)
set hive.cbo.enable=true;
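The CBO can only cost plans if statistics exist; a minimal sketch of collecting them for the TAB placeholder used above:
analyze table TAB compute statistics;
analyze table TAB compute statistics for columns;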
Partition pruning
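Partition pruning just means filtering on the partition column so only the matching partition directories are read; a sketch with the TABLE_PARTITION placeholder (the literal VALUE is hypothetical):
select count(1)
from TABLE_PARTITION
where partition_field = 'VALUE';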
Set the execution engine: default mr; tez | spark (preferred), both DAG-based.
set hive.execution.engine=tez;
Run jobs with no dependencies on each other in parallel:
set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8;
JVM reuse (removed in Hive 3; for awareness only):
set mapreduce.job.jvm.numtasks = 8;
Local-mode execution
set mapreduce.job.reduces=1; (0 or 1: local mode requires at most one Reducer)
set mapreduce.framework.name=local;
set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.input.files.max=4;
set hive.exec.mode.local.auto.inputbytes.max=128M;
# export HADOOP_HEAPSIZE=1024
LLAP
set hive.execution.mode=llap;
Fetch task (run simple select/filter/limit queries as a direct fetch, without starting an MR job):
set hive.fetch.task.conversion=more; (none | minimal | more)
Predicate pushdown ("pushdown" is the optimization itself): decide whether conditions on the main (preserved) table and the secondary table should go after ON or after WHERE (see the sketch below).
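A sketch of the ON-vs-WHERE decision for a left outer join with hypothetical tables a and b: a filter on the preserved (left) table belongs in WHERE, while a filter on the non-preserved (right) table belongs in ON, otherwise it silently turns the outer join into an inner join:
select a.id, b.amount
from a
left join b
  on a.id = b.id
 and b.dt = '2024-01-01'    -- pushed down to b before the join; unmatched rows of a are kept
where a.dt = '2024-01-01';  -- filter on the preserved table stays in WHERE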