本文共 4187 字,大约阅读时间需要 13 分钟。
数据倾斜
数据倾斜是指,map /reduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致整个程序的处理时间很长,这是因为某一个key的条数比其他key多很多(有时是百倍或者千倍之多),这条key所在的reduce节点所处理的数据量比其他节点就大很多,从而导致某几个节点迟迟运行不完。
表现:任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。单一reduce的记录数与平均记录数差异过大,通常可能达到3倍甚至更多。
问题 类型 | 关键词 | 情形 | 后果 | 解决方法 |
数据倾斜 | Join | 其中一个表是小表(1000条以下记录或1G容量以下)
| 分发到某几个reduce上的数据远高于平均值 | SMB Join, Map Join, SMB Map Join |
数据倾斜 |
| 大表与大表join,但是关联值0或null过多 | 空值会由一个reduce处理,非常慢 | SMB Join, 空值特殊处理, 调参(skewjoin) |
数据倾斜 |
| 小表不小不大 | - | 嵌套Map Join |
数据倾斜 |
| 不同数据类型关联 | 默认的Hash操作会按int型的id来进行分配,所有string类型id的记录都分配到一个Reducer中 | 类型转换 |
数据倾斜 | Group by | 某些维度值数据过多 | 处理某值的reduce耗时 | 调参(skewindata) |
数据倾斜 | Count distinct | 某些值过多 | 处理某值的reduce耗时 | Count distinct优化 |
数据量大 | On … where … | 过滤在where条件 | 大数据量join | where优化 |
Job数多 | Union all | Union属于嵌套查询 | 生成过多的job | union all优化 |
解决方案
1、调参
hive.groupby.skewindata=true
有数据倾斜的时候进行负载均衡,当选项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操作。
set Hive.optimize.skewjoin = true;
skew join,其原理把这种user_id = 0的特殊值先不在Reduce端计算掉,而是先写入hdfs,然后启动一轮Mapjoin专门做这个特殊值的计算,期望能提高计算这部分值的处理速度。还有要告诉Hive如何判断特殊值,根据Hive.skewjoin.key设置的数量Hive可以知道,比如默认值是100000,那么超过100000条记录的值就是特殊值。
2、Sort Merge Bucket Join (SMB Join)
Hive桶:
对于每一个表(table)或者分区, Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分。Hive采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中。
SMB Join:
(1)解决大表与小表间的 Join 问题。小表的number_buckets必须是大表的倍数。
(2)解决大表与大表间的 Join 问题。这一优化方法并不一定要求两个表必须桶的个数相同,两个表的桶个数是倍数关系也可以。
例子3、Map Join
所有的工作都在map端进行计算。首先小表的Map 阶段它会将自己转化成MapReduce Local Task ,然后从HDFS 取小表的所有数据,将自己转化成Hashtable file 并压缩打包放入DistributedCache 里面。
hive 0.7自动判断哪个是小表,哪个是大表,这个参数由(hive.auto.convert.join=true)来控制. 然后控制小表的大小由(hive.smalltable.filesize=25000000L)参数控制(默认是25M),当小表超过这个大小,hive会默认转化成common join。另一种是手动判断/*+mapjoin(map_table)*/。
例子见SMB Map Join。4、SMB Map Join
两个表关联键为imei,需要按imei分桶并且排序,小表(lxw_test)分桶数是大表(lxw_test1)的倍数。例子
create table lxw_test(imei string,sndaid string,data_time string)CLUSTERED BY(imei) SORTED BY(imei) INTO 10 BUCKETS;create table lxw_test1(imei string,sndaid string,data_time string)CLUSTERED BY(imei) SORTED BY(imei) INTO 5 BUCKETS;set hive.enforce.bucketing = true;--插入数据前需要打开该选项set hive.optimize.bucketmapjoin = true;set hive.optimize.bucketmapjoin.sortedmerge = true;set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;--join时需要打开的参数select /*+ mapjoin(b) */ count(1) from lxw_test1 a join lxw_test b on a.imei = b.imei--包括insert数据,差不多10分钟左右;--如果这两个表做普通的join, 耗时1个多小时,没跑完,kill掉了。
5、嵌套Map Join
小表不小不大,怎么用 map join 解决倾斜问题。例子select /*+mapjoin(x)*/* from log a left outer join ( select /*+mapjoin(c)*/d.* from ( select distinct user_id from log ) c join users d on c.user_id = d.user_id ) x on a.user_id = b.user_id;
6、where优化
Where条件放置到on条件中,以达到两表做join的时候,数据量相对变小的效果。
7、union all优化 union all优化还不太完善,嵌套查询时生成过多的job。
--BeforeSelect *From (select * from t1 Union all select * from t4 Union all Select * from t2 Join t3 On t2.id = t3.id ) xGroup by c1,c2;--After--先join生成临时表,后union all。原来4个jobs,改进后变成2个jobs。Insert overwrite table t5Select * from t2 Join t3 On t2.id = t3.id;Select * from (t1 union all t4 union all t5);
8、Count distinct优化
(1)sum() group by代替
(2)count … distinct … 代替
/*改写前*/select a, count(distinct b) as c from tbl group by a;/*改写后*/select a, count(*) as c from (select distinct a, b from tbl) group by a;
9、空值特殊处理
把空值的key变成一个字符串加上随机数,把倾斜的数据分到不同的reduce上
--Beforeselect * from log a join users b on a.user_id is not null and a.user_id = b.user_idunion allselect * from log a where a.user_id is null;--Afterselect * from log a left outer join users b on case when a.user_id is null then concat(‘hive’,rand() ) else a.user_id end = b.user_id;
10、类型转换
场景:用户表中user_id字段为int,log表中user_id字段既有string类型也有int类型。当按照user_id进行两个表的Join操作时,默认的Hash操作会按int型的id来进行分配,这样会导致所有string类型id的记录都分配到一个Reducer中。例子
select * from users a left outer join logs b on a.usr_id = cast(b.user_id as string)
参考
hadoopjob解决大数据量关联时数据倾斜的一种办法
hive join详解
Hive 基础之:分区、桶、Sort Merge Bucket Join
hive-SortMerge Bucket Map Join
hive大数据倾斜总结
深入浅出数据仓库中SQL性能优化之Hive篇