Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,由不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。
本节学习数据倾斜问题的避免与缓解。
学习资料:https://mp.weixin.qq.com/s/caCk3mM5iXy0FaXCLkDwYQ
例如reduced端一共要处理100w条数据,第一个和第二个task分别分配到了1w条数据,5min内计算完成,第三个task分配到了98w条数据,可能需要10h完成,这使得整个Spark作业需要10个小时才能运行完成。
数据倾斜的表现:
数据倾斜的问题定位:
3. 查阅代码中的shuffle算子,例如reduceByKey,countByKey,groupByKey,join等算子,根据代码逻辑判断是否会出现数据倾斜;
4. 查看Spark作业log文件,log对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个;
大多数情况下,Spark作业的来源数据都是Hive表,为了避免数据倾斜,可以考虑避免shuffle过程。避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能。
如果Spark作业的数据来源于Hive表,那么可以现在Hive表中对数据进行聚合,例如按照key进行分组:
将同一key对应的所有value用一种特殊的格式拼接到一个字符串里去,这样,一个key就只有一条数据了;
对一个key的所有value进行处理时,只需要进行map操作即可,无需再进行任何的shuffle操作。
如果没有办法对每个key聚合出来一条数据,在特定场景下,可以考虑扩大key的聚合粒度。
例如,目前有10万条用户数据,当前key的粒度是(省,城市,区,日期),现在我们考虑扩大粒度,将key的粒度扩大为(省,城市,日期),这样的话,key的数量会减少,key之间的数据量差异也有可能会减少,由此可以减轻数据倾斜的现象和问题。(此方法只针对特定类型的数据有效,当应用场景不适宜时,会加重数据倾斜)
如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key进行过滤,滤除可能导致数据倾斜的key对应的数据,这样,在Spark作业中就不会发生数据倾斜了。
当使用了类似于groupByKey、reduceByKey这样的算子时,可以考虑使用随机key实现双重聚合,如下图所示:
过程如下:
适用场景:对于由groupByKey、reduceByKey这类算子造成的数据倾斜有比较好的效果,仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。
在Spark中,如果某个RDD只有一个key,那么在shuffle过程中会默认将此key对应的数据打散,由不同的reduce端task进行处理。
所以当由单个key导致数据倾斜时,可将发生数据倾斜的key单独提取出来,组成一个RDD;
然后用这个原本会导致倾斜的key组成的RDD和其他RDD单独join;
根据Spark的运行机制,此RDD中的数据会在shuffle阶段被分散到多个task中去进行join操作。
适用场景:
对于RDD中的数据,可以将其转换为一个中间表,或者是直接使用countByKey()的方式,看一下这个RDD中各个key对应的数据量,此时如果你发现整个RDD就一个key的数据量特别多,那么就可以考虑使用这种方法。
当数据量非常大时,可以考虑使用sample采样获取10%的数据,然后分析这10%的数据中哪个key可能会导致数据倾斜,然后将这个key对应的数据单独提取出来。
不适用场景:
如果一个RDD中导致数据倾斜的key很多,那么此方案不适用。
当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高shuffle过程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题。
在大部分的shuffle算子中,都可以传入一个并行度的设置参数,比如reduceByKey(500),这个参数会决定shuffle过程中reduce端的并行度,在进行shuffle操作的时候,就会对应着创建指定数量的reduce task。
对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions
,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。
提高reduce端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题,适用于有较多key对应的数据量都比较大的情况。
正常情况下,join操作都会执行shuffle过程,并且执行的是reduce join,也就是先将所有相同的key和对应的value汇聚到一个reduce task中,然后再进行join。普通join的过程如下图所示:
普通join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉去到一个shuffle read task中在进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。
注意:RDD是并不能直接进行广播的,只能将RDD内部的数据通过collect拉取到Driver内存然后再进行广播。
不使用join算子进行连接操作,而使用broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。
适用场景:当join操作有数据倾斜问题并且其中一个RDD的数据量较小时,可以优先考虑这种方式,效果非常好。
map join过程:
不适用场景:由于Spark的广播变量是在每个Executor中保存一个副本,如果两个RDD数据量都比较大,那么如果将一个数据量比较大的RDD做成广播变量,那么很有可能会造成内存溢出。