Spark是可以处理大规模数据的统一分布式计算引擎。
在之前我们学习过MapReduce,同样作为大数据分布式计算引擎,究竟这两者有什么区别呢?
首先我们回顾一下MapReduce的架构:MR基于HDFS实现大数据存储,基于Yarn做资源调度,且MR是基于进程处理数据的
总结一下MR的缺点:
1.MR是基于进程进行数据处理,进程相对于线程来说,在创建和销毁的过程比较消耗资源,并且数据比较慢
2.MR在运行的时候,中间有大量的磁盘IO过程。也就是磁盘数据到内存,内存到磁盘反复的读写过程
3.MR只提供了非常低级或者说非常底层的编程API,如果想要开发比较复杂的程序,需要写大量的代码
这样对比起来,我们可以总结出Spark的优点:
1.Spark是基于线程来执行任务
2.引入了新的数据结构—RDD(弹性分布式数据集),能够让Spark程序主要基于内存进行运行。内存的读写数据相对磁盘来说要快得多
3.Spark提供了更加丰富的编程API,能够非常轻松地实现功能开发
进程和线程的对比:
1.线程的创建和销毁,比进程会更加的快速,以及更加的节省资源
2.进程很难共享内存中的数据;而同个进程中的线程可以共享内存中的数据
进程和线程具体介绍可以看上一篇笔记。
读取文本文件,文件内容是一行一行的文本,每行文本含有多个单词,单词间使用空格分隔。统计文本中每个单词出现的总次数。
文本内容如下:
hello hello spark
hello heima spark
读取数据是一行一行读取的,对每一行数据都要进行内容的切分
from pyspark import SparkConf, SparkContext
import os
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
pass
# 1- 创建对象
conf = SparkConf().setAppName("wordcount").setMaster("local[*]")
sc = SparkContext(conf=conf)
# 2- 数据输入,返回的类型是RDD
"""
textFile:支持读取HDFS文件系统和本地文件系统
HDFS文件系统:hdfs://node1:8020/文件路径
本地文件系统:file:///文件路径
"""
init_rdd= sc.textFile("file:///export/data/tfec/data/words.txt")
# 3- 数据处理
"""
使用到的部分RDD相关算子
flatMap
map
reduceByKey
collect
"""
#文本内容切片
"""
flatMap运行结果:
输入数据:['hello hello spark', 'hello heima spark']
输出数据:['hello', 'hello', 'spark', 'hello', 'heima', 'spark']
map运行结果:
输入数据:['hello hello spark', 'hello heima spark']
输出数据:[['hello', 'hello', 'spark'], ['hello', 'heima', 'spark']]
"""
fm_rdd = init_rdd.flatMap(lambda line:line.split(" "))
#数据转换
"""
输入数据:['hello', 'hello', 'spark', 'hello', 'heima', 'spark']
输出数据:[('hello', 1), ('hello', 1), ('spark', 1), ('hello', 1), ('heima', 1), ('spark', 1)]
"""
map_rdd = fm_rdd.map(lambda word:(word,1))
#分组和聚合
"""
输入数据:[('hello', 1), ('hello', 1), ('spark', 1), ('hello', 1), ('heima', 1), ('spark', 1)]
输出数据:[('hello', 3), ('spark', 2), ('heima', 1)]
reduceByKey底层运行过程分析:
1- 该算子同时具备分组和聚合的功能。而且是先对数据按照key进行分组,对相同key的value会形成得到List列表。再对分组后的value列表进行聚合。
2- 分组和聚合功能不能分割,也就是一个整体
结合案例进行详细分析:
1- 分组
输入数据:[('hello', 1), ('hello', 1), ('spark', 1), ('hello', 1), ('heima', 1), ('spark', 1)]
分组后的结果:
key value列表
hello [1,1,1]
spark [1,1]
heima [1]
2- 聚合(以hello为例)
lambda agg,curr: agg+curr -> agg表示中间临时value聚合结果,默认取列表中的第一个元素;curr表示当前遍历到的value元素,默认取列表中的第二个元素
第一次聚合:
agg =列表中的第一个1,curr=列表中的第二个1。聚合结果agg+curr=1+1=2,再将2赋值给agg
第二次聚合:
agg =上次临时聚合结果2,curr=列表中的第三个1。聚合结果agg+curr=2+1=3,再将3赋值给agg
最后发现已经遍历到value列表的最后一个元素,因此聚合过程结果。最终的hello的次数,就是3
"""
res = map_rdd.reduceByKey(lambda agg,curr:agg+curr)
# 4- 数据输出
"""
collect():用来收集数据,返回值类型是List列表
"""
print(res.collect())
# 5- 释放资源
sc.stop()