HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。
原理:
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。
特点:
RangePartioner尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。RangePartitioner作用:将一定范围内的数映射到某一个分区内,在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds。
1、定义一个Partitioner保证某一范围内的所有数据都在同一个分区
该分区根据数据范围划分为num个子范围,然后将每个数字分配到对应的子范围中,这种情况下当数据在各个子范围分布均匀时候可以表现良好。但是当数据严重聚集时候,会发生数据倾斜。当存在数据倾斜时候可以使用Spark提供的 RangePartitioner分区器进行分区。
import org.apache.spark.Partitioner
class SortPartitoner(num: Int) extends Partitioner {
override def numPartitions: Int = num
val partitionerSize = Integer.MAX_VALUE / num + 1
override def getPartition(key: Any): Int = {
val intKey = key.asInstanceOf[Int]
intKey / partitionerSize
}
}
2、分区内部排序,保证分区内有序?
object Sort {
def main(args: Array[String]) {
val conf = new SparkConf()
val sc = new SparkContext(conf)
val numbers = sc.textFile("/random.txt").flatMap(_.split(" ")).map(x => (x.toInt, 1)).cache()
val result = numbers.repartitionAndSortWithinPartitions(new SortPartitoner(numbers.partitions.length)).map(x=>x._1)
result.saveAsTextFile("/bigdatasort")
sc.stop()
}
}
参考
Spark Partitioner 分区器适用场景示例源码分析HashPartitioner RangePartitioner - 知乎
Spark分区器HashPartitioner和RangePartitioner/全局排序_spark全局排序-CSDN博客