"""
rdd.filter(f):根据f函数中的判断条件对rdd追踪的数据进行过滤
保留条件为True对应的rdd数据
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4])
rdd2 = sc.parallelize(['a','b','c'])
rdd1_filter1 = rdd1.filter(lambda x:x%2==0)
print(rdd1_filter1.collect())
rdd2_filter = rdd2.filter(lambda x:x=='a')
print(rdd2_filter.collect())
distinct
"""
rdd.distinct():对rdd中的数据进行去重操作
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
#distinct中不需要参数,它可以直接对rdd进行去重
rdd1_distinct = rdd1.distinct()
print(rdd1_distinct.collect())
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
#对rdd1中的数据进行hash计算后和2进行整除,证据余数值进行分组操作
#hash(x)% 2:获取余数,返回值,0,1 0是能被2整除 1是不能被2整除
rdd2_groupBy1 = rdd2.groupBy(lambda x:hash(x)%2)
# print(rdd2_groupBy1.collect())
for k,v in rdd2_groupBy1.collect():
print(k)
print(list(v))
print(tuple(v))
#拿到groupBy后的值需要通过mapValues获取
rdd2_mapValues = rdd2_groupBy1.mapValues(lambda x:list(x))
print(rdd2_mapValues.collect())
"""
kv数据格式的rdd:rdd中的每个元素是一个容器,容器中国有两个值
①[(key1,value1),(key2,value2)……]
②[([k1,k2],value1),([k1,k2],value2)……]多个key一个value
③[([k1],[v1,v2]),([k2],[v1,v2])……]一个key多个value
④[([k1,k2],[v1,v2]),([k3,k4],[v1,v2])……]多个key,多个value
算子:
rdd.groupByKey():对kv数据格式的rdd分局可以值进行分组,相同key值对应的value值合并到一起
rdd.sortByKey():根据rdd的value值进行排序,可以通过ascending=进行调整,默认为升序
rdd.sorBy(keyfunc):对kv数据格式的rdd经过可以func函数确定函数排序规则
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1,2,3,4,2])
rdd2 = sc.parallelize(['a','b','c','a'])
rdd3 = sc.parallelize([('a',1),('b',2),('a',2),('b',2)])
rdd3_groupBykey1 = rdd3.groupByKey()
rdd3_map1 = rdd3_groupBykey1.mapValues(lambda x:list(x))
print(rdd3_map1.collect())
#结果:[('b', [2, 2]), ('a', [1, 2])]
rdd_reduceByKey1 = rdd3.reduceByKey(lambda x,y:x+y)
print(rdd_reduceByKey1.collect())
#结果[('b', 4), ('a', 3)]
####sortByKey()
#ascending= ,默认是升序排列
rdd3_SortByKey1 = rdd3.sortByKey()
print(rdd3_SortByKey1.collect())
#根据rdd的value值进行降序排序
rdd_sortBy1 = rdd3.sortBy(keyfunc = lambda x:x[1],ascending=False)
print(rdd_sortBy1.collect())
#根据key值对rdd中的元素进行排序操作
rdd_sortBy2 = rdd3.sortBy(keyfunc= lambda x:x,ascending=False)
print(rdd_sortBy2.collect())
"""
rdd1.join(rdd2) 内连接
rdd1.leftOuterJoin(rdd2)左外连接
rdd1.rightOuterJoin(rdd2)左外连接
rdd1.fullOuterjoin(rdd2)
"""
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([('a',1),('b',2),('d',5)])
rdd2 = sc.parallelize([('a',3),('b',6),('c',3)])
#直接连接只会显示双方都有的元素
print(rdd1.join(rdd2).collect())
#左连接显示左rdd拥有的元素,右边没有的位置用None替代
print(rdd1.leftOuterJoin(rdd2).collect())
#右连接显示右rdd拥有的元素,左边没有的位置用None替代
print(rdd1.rightOuterJoin(rdd2).collect())
#全连接,两个表的数据都保存
print(rdd1.fullOuterJoin(rdd2).collect())
collect,take,count()类的聚合算子,saveAsTextFile,
统计算子,countByKey()
countByKey().items()
countByValue() , countByValue().items()
from pyspark import SparkContext
sc = SparkContext()
rdd1 = sc.parallelize([1, 2, 3, 4, 2])
rdd2 = sc.parallelize(['a', 'b', 'c', 'a'])
# collect():获取rdd中所有的数据,注意点:使用此算子时要考虑rdd对象的数据量问题
# 可能存在内存溢出的问题
rdd1_collect1 = rdd1.collect()
print(rdd1_collect1)
# take():获取指定数量的数据
rdd1_take = rdd1.take(num=2)
print(type(rdd1_take))
print(rdd1_take