map 算子
# spark-shell
sc.parallelize(Seq(1, 2, 3))
.map( num => num * 10)
.collect()
# IDEA
@Test
def mapTest(): Unit = {
// 1. 创建RDD
val rdd1 = sc.parallelize(Seq(1, 2, 3))
// 2. 执行 map 操作
val rdd2 = rdd1.map(item => item * 10)
// 3. 得到结果
val result:Array[Int] = rdd2.collect()
result.foreach(item => println(item))
// 关闭sc
sc.stop()
}
def map[U: ClassTag] (f: T ? U) : RDD[U]
map 是一对一, 如果函数是?String → Array[String]
则新的 RDD 中每条数据就是一个数组
flatMap算子
# spark-shell
sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
.flatMap( line => line.split(" "))
.collect()
# IDEA
@Test
def flatMapTest(): Unit = {
// 1. 创建RDD
val rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
// 2. 执行 flatMap 操作
val rdd2 = rdd1.flatMap( line => line.split(" "))
// 3. 得到结果
val result:Array[String] = rdd2.collect()
result.foreach(line => (println(line)))
// 关闭sc
sc.stop()
}
def flatMap[U: ClassTag](f: T ? List[U]): RDD[U]
f
?→ 参数是原 RDD 数据, 返回值是经过函数转换的新 RDD 的数据, 需要注意的是返回值是一个集合, 集合中的数据会被展平后再放入新的 RDDmap + flatten
, 也就是先转换, 后把转换而来的 List 展开ReduceByKey算子
# spark-shell
sc.parallelize(Seq(("a",1), ("a", 1), ("b", 1)))
.reduceByKey( ( cur, agg) => cur + agg)
.collect()
# IDEA
@Test
def reduceByKeyTest(): Unit = {
// 1. 创建RDD
val rdd1 = sc.parallelize(Seq("Hello lily", "Hello lucy", "Hello tim"))
// 2. 处理数据
val rdd2 = rdd1.flatMap( item => item.split(" "))
.map(item => (item, 1))
.reduceByKey( (cur, agg) => cur + agg)
// 3. 得到结果
val result:Array[(String, Int)] = rdd2.collect()
result.foreach(item => (println(item)))
// 4. 关闭sc
sc.stop()
}
def reduceByKey(func: (V, V) ? V): RDD[(K, V)]