Spark-RDD算子大全

发布时间:2024年01月16日

Spark RDD(弹性分布式数据集)是Spark中的核心抽象,它代表一个不可变、分区的分布式数据集合。下面是一些常用的RDD算子:

转换算子:

  1. map(func):对RDD中的每个元素应用给定的函数,返回一个新的RDD。

  2. filter(func):对RDD中的每个元素应用给定的函数,返回满足条件的元素组成的新的RDD。

  3. flatMap(func):对RDD中的每个元素应用给定的函数并返回一个迭代器,将所有迭代器的元素组合成一个新的RDD。

  4. distinct():去除RDD中的重复元素,返回一个包含唯一元素的新的RDD。

  5. groupByKey():对具有相同键的元素进行分组,返回一个键值对的RDD。

  6. sortByKey():按照键对RDD中的元素进行排序,返回一个键值对的RDD。

  7. join(otherRDD):将两个RDD按照键进行连接操作,返回一个键值对的RDD。

  8. union(otherRDD):将两个RDD进行合并,返回一个包含两个RDD所有元素的新的RDD。

  9. aggregateByKey(zeroValue)(seqOp, combOp):对每个键的元素进行聚合操作,返回一个键值对的RDD。

行动算子:

  1. collect():将RDD中的所有元素以数组的形式返回到驱动程序。

  2. count():返回RDD中的元素数量。

  3. first():返回RDD中的第一个元素。

  4. take(n):返回RDD中的前n个元素。

  5. reduce(func):使用给定的函数对RDD中的元素进行归约操作,返回一个元素。

  6. foreach(func):对RDD中的每个元素应用给定的函数。

  7. saveAsTextFile(path):将RDD中的元素保存为文本文件。

  8. saveAsObjectFile(path):将RDD中的元素保存为序列化的对象文件。


1.map(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 对RDD中的每个元素进行平方操作
squaredRDD = rdd.map(lambda x: x**2)

# 输出结果
print(squaredRDD.collect())  # [1, 4, 9, 16, 25]

2.filter(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 过滤RDD中的偶数元素
filteredRDD = rdd.filter(lambda x: x % 2 == 0)

# 输出结果
print(filteredRDD.collect())  # [2, 4]

3.flatMap(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 对RDD中的每个元素进行重复操作
flatMapRDD = rdd.flatMap(lambda x: [x, x, x])

# 输出结果
print(flatMapRDD.collect())  # [1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5]

4.distinct():

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 2, 3, 4, 4, 5])

# 去除RDD中的重复元素
distinctRDD = rdd.distinct()

# 输出结果
print(distinctRDD.collect())  # [1, 2, 3, 4, 5]

5.reduce(func):

# 创建RDD
rdd = sparkContext.parallelize([1, 2, 3, 4, 5])

# 对RDD中的元素求和
sum = rdd.reduce(lambda x, y: x + y)

# 输出结果
print(sum)  # 15

6.groupByKey():

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'a'), (2, 'b'), (1, 'c'), (2, 'd')])

# 按键进行分组
groupedRDD = rdd.groupByKey()

# 输出结果
for key, values in groupedRDD.collect():
    print(key, list(values))
# 1 ['a', 'c']
# 2 ['b', 'd']

7.sortByKey():

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 'c'), (2, 'b'), (3, 'a')])

# 按键进行排序
sortedRDD = rdd.sortByKey()

# 输出结果
print(sortedRDD.collect())  # [(1, 'c'), (2, 'b'), (3, 'a')]

8.join(otherRDD):

# 创建两个键值对的RDD
rdd1 = sparkContext.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sparkContext.parallelize([(1, 'c'), (2, 'd')])

# 按键进行连接
joinedRDD = rdd1.join(rdd2)

# 输出结果
print(joinedRDD.collect())  # [(1, ('a', 'c')), (2, ('b', 'd'))]

9.union(otherRDD):

# 创建两个RDD
rdd1 = sparkContext.parallelize([1, 2, 3])
rdd2 = sparkContext.parallelize([4, 5, 6])

# 合并两个RDD
unionRDD = rdd1.union(rdd2)

# 输出结果
print(unionRDD.collect())  # [1, 2, 3, 4, 5, 6]

10.aggregateByKey(zeroValue)(seqOp, combOp):

# 创建键值对的RDD
rdd = sparkContext.parallelize([(1, 2), (1, 4), (2, 3), (2, 5)])

# 对每个键的元素进行求和操作
sumRDD = rdd.aggregateByKey(0, lambda x, y: x + y, lambda a, b: a + b)

# 输出结果
print(sumRDD.collect())  # [(1, 6), (2, 8)]
文章来源:https://blog.csdn.net/Young_IT/article/details/135623243
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。