有关RDD编程(Python版)的基础操作可参考:spark:RDD编程(Python版)
先来看一道比较简单的用 pyspark交互式环境的编程题目:
该数据集包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
(2)该系共开设了多少门课程;
(3)Tom同学的总成绩平均分是多少;
(4)求每名同学的选修的课程门数;
(5)该系DataBase课程共有多少人选修;
(6)各门课程的平均分是多少;
想要完成这道题目的难度并不大,第(1)(2)(3)(5)不需要构建RDD键值对,没有太多的争议,直接操作RDD即可:
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x: x[0]) #获取每行数据的第 1 列
>>> distinct_res = res.distinct() #去重操作
>>> distinct_res.count() #取元素总个数
265
答案为:265 人
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:x[1]) #获取每行数据的第 2 列
>>> distinct_res = res.distinct() #去重操作
>>> distinct_res.count() #取元素总个数
8
答案为 :8 门
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[0]=="Tom") #筛选 Tom 同学的成绩信息
>>> res.foreach(print)
26
12
16
40
60
>>> score = res.map(lambda x:int(x[2])) #提取 Tom 同学的每门成绩,并转换为 int 类型
>>> num = res.count() #Tom 同学选课门数
>>> sum_score = score.reduce(lambda x,y:x+y) #Tom 同学的总成绩
>>> avg = sum_score/num #总成绩/门数=平均分
>>> print(avg)
30.8
答案为:30.8 分
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).filter(lambda x:x[1]=="DataBase")
>>> res.count()
126
答案为:126人
接下来重点讨论第(4)(6)题用于不用键值对的区别
要想统计出每名学生选修的课程门数,即统计出每个学生的名字在数据集中出现了几次即可,
在不用键值对的情况下可采用 map + lambda表达式 +列表解析 的格式记录每个名字出现的次数:
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> data_local = lines.collect() # 将lines中的数据放入列表
>>> stuname = lines.map(lambda line: line.split(",")[0]).distinct() # 构建去重后的学生姓名的rdd
>>> each_res = stuname.map(lambda x :(x, len([line for line in data_local if x in line]))) # 以列表解析是形式查找每个学生名字出现的次数
>>> each_res.foreach(print)
('Lewis', 4)
('Mike', 3)
('Walter', 4)
('Lewis', 4)
('Mike', 3)
('Walter', 4)
......
而如果采取构建键值对的方式就不需要多次调用 collect() ,也不需要使用列表解析,直接使用RDD键值对的常见操作 reduceByKey() 即可
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[0],1)) # 学生每门课程都对应(学生姓名,1),学生有 n 门课程则有 n 个(学生姓名,1)
>>> each_res = res.reduceByKey(lambda x,y: x+y) #按学生姓名获取每个学生的选课总数
>>> each_res.foreach(print)
('Lewis', 4)
('Mike', 3)
('Walter', 4)
('Lewis', 4)
('Mike', 3)
('Walter', 4)
......
要想统计出每门课程的平均分,即统计出每门课程的分数和去除以该课程在数据集中出现的次数,相较于(4),仅多了个求和操作,需要遍历每一个成绩的值,便先对lines进行 split() 操作,再用 collect() 将数据存入列表,使用两个列表解析(分别求 sum 和 len)后作商即可:
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> coursename_score = lines.map(lambda line: (line.split(",")[1], line.split(",")[2])).collect()
>>> coursename = lines.map(lambda line: line.split(",")[1]).distinct() # 构建去重后的科目的rdd
>>> avg = coursename.map(lambda x:(x, sum([int(line[1]) for line in
...coursename_score if x in line]) / len([line for line in
...coursename_score if x in line])))
>>> avg.foreach(print)
('Software', 50.90909090909091)
('OperatingSystem', 54.940298507462686)
('Python', 57.8235294117647)
('ComputerNetwork', 51.901408450704224)
('DataBase', 50.53968253968254)
('Algorithm', 48.833333333333336)
('DataStructure', 47.57251908396947)
('CLanguage', 50.609375)
采用构建键值对的方式同样可以很简洁直观地达到效果:
>>> lines = sc.textFile("file:///opt/spark/sparksqldata/Data01.txt")
>>> res = lines.map(lambda x:x.split(",")).map(lambda x:(x[1],(int(x[2]),1))) # 为每门课程的分数后面新增一列 1,表示 1 个学生选择了该课程。格式如('ComputerNetwork', (44, 1))
>>> temp = res.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])) #按课程名聚合课程总分和选课人数。格式如('ComputerNetwork', (7370, 142))
>>> avg = temp.map(lambda x:(x[0], round(x[1][0]/x[1][1],2))) #课程总分/选课人数 = 平均分,并利用 round(x,2)保留两位小数
>>> avg.foreach(print)
('Software', 50.90909090909091)
('OperatingSystem', 54.940298507462686)
('Python', 57.8235294117647)
('ComputerNetwork', 51.901408450704224)
('DataBase', 50.53968253968254)
('Algorithm', 48.833333333333336)
('DataStructure', 47.57251908396947)
('CLanguage', 50.609375)
虽然这只是一个很简单的例子,数据量较少,但也能从中看出RDD键值对操作可提供更高效且更灵活的数据处理方式,当数据量较为庞大时,RDD键值对操作能使得分布式计算框架处理更加大规模且复杂的数据集。