mapPartitions
mapPartitions
操作
# (1)
@Test
def mapPartitions(): Unit = {
// 1. 数据生成
// 2. 算子使用
// 3. 获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),2)
.mapPartitions( iter => {
iter.foreach(item => println(item))
iter // 传进来要求集合,返回要求是另一个集合,所以传出去也要是集合
})//传递的函数要是迭代器(iterator)
.collect()
}
# (2)
@Test
def mapPartitions2(): Unit = {
// 1. 数据生成
// 2. 算子使用
// 3. 获取结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),2)
.mapPartitions( iter => {
// 遍历 iter 其中每一条数据进行转换,转换完以后,返回这个 iter
// iter 是 scala 中的集合类型
iter.map(item => item * 10) // scala 中的 map 返回值是另一个Iterator,刚好有作为最后一行,即满足传进来的是集合,传出去的也是
})//传递的函数要是迭代器(iterator)
.collect()
.foreach(item => println(item))
}
mapPartitionsWithIndex
操作
@Test
def mapPartitionsWithIndex(): Unit = {
sc.parallelize(Seq(1, 2, 3, 4, 5, 6),2)
.mapPartitionsWithIndex((index, iter) =>{ // 传分区和iterator
println("index: "+ index )
iter.foreach(item => println(item))
iter
})
.collect()
}
总结
filter
filter
操作
@Test
def filter(): Unit = {
// 1.定义集合
// 2. 过滤数据
// 3. 收集结果
sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.filter( item => item % 2 ==0) // 取出偶数
.collect()
.foreach( item => println(item))
}
sample(不需要接收任何函数)(是随机的。主要用于随机采样)
sample
操作
作用
Sample算子可以从一个数据集中抽样出来一部分,常用作于减小数据集以保证运行速度,并且尽可能少规律的损失
参数
withReplacement
, 意为是否取样以后是否还放回原数据集供下次使用,简单的说,如果这个参数的值为true,则抽样出来的 数据集中可能有重复fraction
,意为抽样的比例seed
随机种子,用于Sample 内部随机生成code
@Test
def sample(): Unit = {
// 1.定义集合
// 2. 随机抽样数据
// 3. 收集结果
val rdd1 = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
val rdd2 = rdd1.sample(false, 0.6) // false,不放回
val result = rdd2.collect()
result.foreach( item => println(item))
}
mapValues
mapValues
操作
作用
code
@Test
def mapValues():Unit = {
sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
.mapValues( item => item * 10) // 只有 Value * 10 了
.collect().foreach( item => println(item))
}
集合操作
intersection
(交集)
code
@Test
def intersection(): Unit = {
val rdd1 =sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 =sc.parallelize(Seq(3, 4, 5, 6, 8))
rdd1.intersection(rdd2)
.collect()
.foreach(println(_))
}
union
(并集)
code
@Test
def union(): Unit = {
val rdd1 =sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 =sc.parallelize(Seq(3, 4, 5, 6, 8))
rdd1.union(rdd2)
.collect()
.foreach(println(_))
}
subtract
(差集)
code
@Test
def subtract(): Unit = {
val rdd1 =sc.parallelize(Seq(1, 2, 3, 4, 5))
val rdd2 =sc.parallelize(Seq(3, 4, 5, 6, 8))
rdd1.subtract(rdd2)
.collect()
.foreach(println(_))
}
groupByKey
groupByKey
操作
作用
注意点
code
@Test
def groupByKey(): Unit = {
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.groupByKey()
.collect()
.foreach( item => println(item))
}
有无异议看其是否能减少I/O
combinerByKey ( groupByKey 和 reduceByKey 都是以他为底层)
combinerByKey
操作
作用
调用
combineByKey(createCombiner, mergeValue, mergeCombiners, [partitioner], [mapSideCombiner], [serializer])
参数
createCombiner
将 Value 进行初步转换mergeValue
在每个分区把上一步转换的结果聚合mergeCombiners
在所有分区上把每个分区的聚合结果聚合partitioner
可选, 分区函数mapSideCombiner
可选, 是否在 Map 端 Combineserializer
序列化器注意点
combineByKey
的要点就是三个函数的意义要理解groupByKey
, reduceByKey
的底层都是 combineByKey
code
@Test
def combinerByKey(): Unit = {
// 1. 准备集合
val rdd: RDD[(String, Double)] = sc.parallelize(Seq(
("zhangsan", 99.0),
("zhangsan", 96.0),
("lisi", 97.0),
("lisi", 98.0),
("zhangsan", 97.0))
) // 求以上同学成绩的平均数
// 2. 算子操作
// 2.1 createCombiner 转换数据
// 2.2 mergeValue 分区上聚合
// 2.3 mergeCombiners 把所有分区上的结果再次聚合
val combinerResult = rdd.combineByKey(
createCombiner = (curr: Double) => (curr, 1), // 先操作第一项 ("zhangsan", 99.0),
mergeValue = (curr: (Double, Int), nextValue: Double) => (curr._1 + nextValue, curr._2 + 1), // 上面处理后直接到这来处理第二项 ("zhangsan", 96.0),
mergeCombiners = (curr: (Double, Int), agg: (Double, Int)) => (curr._1 + agg._1, curr._2 + agg._2) // Double相加,Int相加
)
// combinerResult: (lisi,(195.0,2)),(zhangsan,(292.0,3)) (“name”,(分数,次数))
val resultRDD = combinerResult.map(item => (item._1, item._2._1 / item._2._2))
// 3. 获取结果,打印数据
resultRDD.collect().foreach(println(_))
}
结论
foldByKey(与reduceByKey的区别是。他有一个初始值,reduceByKey的初始值是0)
foldByKey
操作
作用
调用
foldByKey(zeroValue) (func)
参数
zeroValue
初始值func seqOp
和 combOp
相同,都是这个参数code
@Test
def flodByKey():Unit={
sc.parallelize(Seq(("a", 1), ("a", 1), ("b", 1)))
.foldByKey(zeroValue = 10)( (curr, agg) => curr + agg )//第一个函数传入初始值。第二个传入聚合的规则
.collect()
.foreach(println(_))
}
}
aggregateByKey (这个是foldByKey的底层)
aggregateByKey
操作
作用
调用
rdd.aggregateByKey(zeroValue)(seqOp, combOp)
参数
zeroValue
初始值seqOp
转换每一个值的函数comboOp
将转换过的值聚合的函数注意点
aggregateByKey
运行将一个RDD[(K, V)] 聚合为 RDD[(K, U)]
, 如果要做到这件事的话, 就需要先对数据做一次转换, 将每条数据从V
转为U
,seqOp
就是干这件事的 , 当 seqOp
的事情结束以后, combOp
把其结果聚合code
@Test
def aggregateByKey(): Unit = { // 东西要打八折
val rdd = sc.parallelize(Seq(("手机", 10), ("手机", 15), ("电脑", 20)))
rdd.aggregateByKey(0.8)(
seqOp = (zeroValue, price) => price * zeroValue,
combOp = (curr, agg) => curr + agg)
.collect()
.foreach(println(_))
}
}
join
join
操作
作用
调用
join(other,[partitioner or numPartitions])
参数
注意点
code
@Test
def join(): Unit = {
val rdd1 = sc.parallelize(Seq(("a",1),("a",2),("b",1)))
val rdd2 = sc.parallelize(Seq(("a",10),("a",11),("a",12)))
rdd1.join(rdd2)
.collect()
.foreach(println(_))
}
}
sortBy 和 sortByKey(两个与排序有关的算子)
sortBy
和 sortByKey
操作
作用
sortBy
,另外一个是sortByKey
调用
sortBy(func,ascending,numPartitions)
参数
func
通过这个函数返回要排序的字段ascending
是否升序numPartitions
分区数注意点
sortByKey
,只有 Key-Value 的RDD 才有sortBy
可以指定按照哪个字段来排序, sortByKey
直接按照Key来排序code
@Test
def sort(): Unit = {
val rdd1 =sc.parallelize(Seq(2, 4, 1, 5, 8))
val rdd2 =sc.parallelize(Seq(("a", 1), ("b", 3), ("c" ,2)))
rdd1.sortBy( item => item).collect().foreach(println(_)) // 1 2 4 5 8
rdd2.sortBy(item => item._2).collect().foreach(println(_)) // (a,1) (c,2) (b,3)
rdd2.sortByKey().collect().foreach(println(_)) //(a,1) (b,3) (c,2)
}
总结
repartition 和 coalesce重分区(都是改变分区数)
repartition
和 coalesce
重分区操作
作用
repartitioin
和 coalesce
, 两个算子都可以调大或者调小分区数量调用
repartitioin(numPartitions)
coalesce(numPartitions, shuffle)
注意点
repartition
和 coalesce
的不同就在于 coalesce
可以控制是否 Shufflerepartition
是一个 Shuffled 操作code
@Test
def partitioning(): Unit = { // 改变分区数
val rdd = sc.parallelize(Seq(2, 4, 1, 5, 8),2)
// repartition
println(rdd.repartition(5).partitions.size) //repartition可大
println(rdd.repartition(1).partitions.size) //repartition可小
// coalesce
println(rdd.coalesce(1).partitions.size) // coalesce可小但是不可大(变大之后还是原来的2)
println(rdd.coalesce(5,shuffle = true).partitions.size) //设置shuffle = true之后coalesce可大
}
转换操作总结