Spark中Rdd算子和Action算子--学习笔记

发布时间:2024年01月12日

RDD算子

filter

"""
rdd.filter(f):根据f函数中的判断条件对rdd追踪的数据进行过滤
保留条件为True对应的rdd数据
"""
from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize(['a','b','c'])

rdd1_filter1 = rdd1.filter(lambda x:x%2==0)
print(rdd1_filter1.collect())

rdd2_filter = rdd2.filter(lambda x:x=='a')
print(rdd2_filter.collect())

distinct

"""
rdd.distinct():对rdd中的数据进行去重操作
"""
from pyspark import SparkContext

sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
#distinct中不需要参数,它可以直接对rdd进行去重
rdd1_distinct = rdd1.distinct()
print(rdd1_distinct.collect())

groupBy

from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])

#对rdd1中的数据进行hash计算后和2进行整除,证据余数值进行分组操作
#hash(x)% 2:获取余数,返回值,0,1   0是能被2整除  1是不能被2整除
rdd2_groupBy1 = rdd2.groupBy(lambda x:hash(x)%2)
# print(rdd2_groupBy1.collect())

for k,v in rdd2_groupBy1.collect():
    print(k)
    print(list(v))
    print(tuple(v))

#拿到groupBy后的值需要通过mapValues获取
rdd2_mapValues = rdd2_groupBy1.mapValues(lambda x:list(x))

print(rdd2_mapValues.collect())

groupByKey,sortBy,SortByKey

"""
kv数据格式的rdd:rdd中的每个元素是一个容器,容器中国有两个值
①[(key1,value1),(key2,value2)……]
②[([k1,k2],value1),([k1,k2],value2)……]多个key一个value
③[([k1],[v1,v2]),([k2],[v1,v2])……]一个key多个value
④[([k1,k2],[v1,v2]),([k3,k4],[v1,v2])……]多个key,多个value

算子:
rdd.groupByKey():对kv数据格式的rdd分局可以值进行分组,相同key值对应的value值合并到一起

rdd.sortByKey():根据rdd的value值进行排序,可以通过ascending=进行调整,默认为升序
rdd.sorBy(keyfunc):对kv数据格式的rdd经过可以func函数确定函数排序规则

"""
from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
rdd3 = sc.parallelize([('a',1),('b',2),('a',2),('b',2)])

rdd3_groupBykey1 = rdd3.groupByKey()
rdd3_map1 = rdd3_groupBykey1.mapValues(lambda x:list(x))
print(rdd3_map1.collect())
#结果:[('b', [2, 2]), ('a', [1, 2])]
rdd_reduceByKey1 = rdd3.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey1.collect())
#结果[('b', 4), ('a', 3)]


####sortByKey()
#ascending= ,默认是升序排列
rdd3_SortByKey1 = rdd3.sortByKey()
print(rdd3_SortByKey1.collect())

#根据rdd的value值进行降序排序
rdd_sortBy1 = rdd3.sortBy(keyfunc = lambda x:x[1],ascending=False)
print(rdd_sortBy1.collect())

#根据key值对rdd中的元素进行排序操作
rdd_sortBy2 = rdd3.sortBy(keyfunc= lambda x:x,ascending=False)
print(rdd_sortBy2.collect())

rdd之间的连接

"""
rdd1.join(rdd2) 内连接
rdd1.leftOuterJoin(rdd2)左外连接
rdd1.rightOuterJoin(rdd2)左外连接
rdd1.fullOuterjoin(rdd2)

"""


from pyspark import SparkContext


sc = SparkContext()

rdd1 = sc.parallelize([('a',1),('b',2),('d',5)])
rdd2 = sc.parallelize([('a',3),('b',6),('c',3)])

#直接连接只会显示双方都有的元素
print(rdd1.join(rdd2).collect())
#左连接显示左rdd拥有的元素,右边没有的位置用None替代
print(rdd1.leftOuterJoin(rdd2).collect())
#右连接显示右rdd拥有的元素,左边没有的位置用None替代
print(rdd1.rightOuterJoin(rdd2).collect())
#全连接,两个表的数据都保存
print(rdd1.fullOuterJoin(rdd2).collect())

Action算子

collect,take,count()类的聚合算子,saveAsTextFile,

统计算子,countByKey()

countByKey().items()

countByValue() , countByValue().items()

    from pyspark import SparkContext
    
    sc = SparkContext()
    
    rdd1 = sc.parallelize([1, 2, 3, 4, 2])
    rdd2 = sc.parallelize(['a', 'b', 'c', 'a'])
    
    # collect():获取rdd中所有的数据,注意点:使用此算子时要考虑rdd对象的数据量问题
    # 可能存在内存溢出的问题
    rdd1_collect1 = rdd1.collect()
    print(rdd1_collect1)
    
    # take():获取指定数量的数据
    rdd1_take = rdd1.take(num=2)
    print(type(rdd1_take))
    print(rdd1_take
文章来源:https://blog.csdn.net/yfq_29/article/details/135561577
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。