reduce
reduce
和 reduceByKey
有什么区别:
reduce
是一个 Action 算子,reduceByKey
是一个转换算子
假设一个 RDD 里面有一万条数据,大部分 Key 是相同的,有十个不同的 Key。
rdd.reduceByKey
生成 10 条数据 而rdd.reduce
生成一条数据
reduceByKey
本质上是先按照Key分组,然后把每组聚合
reduce
是针对于一整个数据集来进行聚合
reduceByKey
是针对 KV 型数据来进行计算,reduce 可以针对所有类型的数据进行计算
reduce 算子是一个shuffle操作吗?
Shuffle 操作分为 mapper 和 reducer,mapper 将数据放入 paritioner 的函数计算,求得分为哪个 reducer,后分到对应的 reducer 中
reduce
操作并没有 mapper 和 reducer, 因为 reduce
算子会作用于 RDD 中的每一个分区,然后在分区上求得局部结果,最终汇总到 Driver 中求得最终结果
RDD中有五大属性,Paritioner 在 Shufle 过程中使用 Partitioner 只有 KV 型的 RDD 才有
作用
对整个结果集规约, 最终生成一条数据, 是整个数据集的汇总
调用
reduce( (currValue[T], agg[T]) ? T )
注意点
reduce 和 reduceByKey 是完全不同的, reduce 是一个 action, 并不是 Shuffled 操作
本质上 reduce 就是现在每个 partition 上求值, 最终把每个 partition 的结果再汇总
code
@Test
def reduce(): Unit = {
?//注意,函数中传入的curr不是value而是整条数据:("手机", 10)。reduce整体上的结果, 只有一个
?//生成的结果类型是(“商品”,price)
?val rdd = sc.parallelize(Seq(("手机", 10), ("手机", 15), ("电脑", 20)))
?val rdd1 = rdd.reduce((curr, agg)=>("总价",curr._2+agg._2)) // agg 是局部结果,("总价",curr._2+agg)用curr._2+agg._2求总和
?println(rdd1)
}
foreach
code
@Test
def foreach(): Unit = {
?// //注意点。item的收集是一个异步的过程,并行执行。所以结果可能不是按顺序来的
?val rdd = sc.parallelize(Seq(1, 2, 3))
?rdd.foreach(item => println(item))
}
count
和 countByKey
(两个都是求数量的)
/*
?* count 和 countByKey 的结果相聚很远,每次调用Action都会生成一个job,job会运行获取结果
?* 所以在两个job之间有大量的log打出,其实就是在启动job
?*
?* countByKey 的运行结果是 Map(key,value 对应的是 Key的 count)
?* 如果要解决数据倾斜的问题,可以通过 countByKey 查看 key 对应的数据总数,从而解决倾斜
?* */
@Test
def count(): Unit = {
?val rdd = sc.parallelize(Seq(("a",1),("b",2),("c",3),("a",4)))
?println(rdd.count()) // 4
?println(rdd.countByKey()) // Map(a -> 2, b -> 1, c -> 1)
}
first
(只获取第一个元素) take
(获取前几个元素)takesample
(直接拿到结果和之前的sample类似)
/*
?* first take takesample
?* 注意点:
?* take按顺序获取。takesample是采样获取
?* first:一般情况下,action会从所有的分区获取数据,相对来说比较慢,但是first只会获取第一个元素。处理第一个分区,无需处理所有的数据比较快
?* */
@Test
def take(): Unit = {
?val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
?rdd.take(3).foreach(item => println(item)) // 取前3个
?println(rdd.first()) // 取第1个
?rdd.takeSample(withReplacement = true, 3).foreach(item => println(item))
//withReplacement = true,有放回抽3个
}
collect
(以数组形式返回元素)
总结
算子功能上进行分类:
转换算子transformation
动作算子Action
RDD中存放的数据类型:
基本类型 String 对象
KV 类型
数字类型