Apache Spark是专为大规模数据处理而设计的快速通用的分布式计算引擎,是开源的类Hadoop MapReduce的通用分布式计算框架。和MapReduce一样,都是完成大规模数据的计算处理。
简而言之,Spark 借鉴了 MapReduce思想发展而来,保留了其分布式并行计算的优点并改进了其明显的缺陷。让中间数据存储在内存中提高了运行速度、并提供丰富的操作数据的API提高了开发速度。
Hadoop Spark
类型 基础平台,包含计算、存储、调度 分布式计算工具
场景 大规模数据的批处理 迭代计算、交互式计算、流计算
价格 对机器要求低,便宜 对内存有要求,相对较贵
编程范式 Map+Reduce,API 较为底层,算法适应性差 API 较为顶层,方便使用
数据存储结构 MapReduce中间计算结果在HDFS磁盘上,延迟大 RDD中间运算结果在内存中,延迟小
运行方式 Task以进程方式维护,任务启动慢 Task以线程方式维护,任务启动快
尽管Spark相对于Hadoop而言具有较大优势,但Spark并不能完全替代Hadoop
进程和线程回顾
将RDD任务(使用Spark Sql时,也是转换成RDD任务)提交给yarn服务管理
Yarn中RM随机找到NM创建container(容器),在container中创建applicationMaster
applicationMaster向RM保持通讯,申请计算资源
applicationMaster找到其他的NM创建container,container中创建map task和reduce task,来执行计算任务
1.5组成架构(五大组件)
Spark Core:最基本核心的组件,处RDD数据结构,其它组件都是基于RDD的
Spark SQL:处理DateFrame/DataSet数据结构(结构化数据),类似于HiveSQL,SparkSQL底层也是转换成RDD任务
Spark/Structured streaming:处理流数据(Spark SQL),实时计算
Spark ML/MLlib:机器学习计算,分类算法,回归算法
Graphx:图计算算法,DAG有向无环图,有响有环图
本地模式部署,使用一台服务器进行部署,一般用于测试代码,在本地能运行成功的代码在集群下也能运行成功
Standalone模式被称为集群单机模式。Spark框架自带了完整的资源调度管理服务,可以独立部署到一个集群中,无需依赖任何其他的资源管理系统。在该模式下,Spark集群架构为主从模式,即一台Master节点与多台Slave节点,Slave节点启动的进程名称为Worker。此时集群会存在单点故障问题,利用Zookeeper搭建Spark HA集群解决单点问题。
Spark交互式开发步骤
注意点spark需要连接HDFS读取文件,如果hdfs没有启动会出现连接失败错误
需要先启动Hadoop服务
命令:start_all.sh
启动python终端
命令:pyspark
退出应用程序
命令exit()或Ctrl + d
scala交互式开发
启动终端命令:
spark-shell
退出交互界面:quit或者ctrl+d
pyspark脚本式开发步骤
将开发的代码写入文件中,通过运行代码文件进而运行计算程序
python开发的脚本文件后缀为.py
常用的脚本开发方式步骤为:
①编写XX.py 文件
②进入base虚拟机环境(默认为base环境不用切换了)
命令:conda activate base
③执行XX.py脚本程序
python3 XX.py
默认情况下不需要开启任何服务,Spark需要连接hdfs读取数据文件,所以使用前需要开启Hadoop 集群
命令为:start-all.sh
Spark中可以查看历史服务,查看Spark的计算历史信息
命令:/export/server/spark/sbin/start-history-server.sh
开启后可以在浏览器端输入网址查看
http://192.168.88.100:18080/
# 进入base虚拟环境
[root@node1 ~]# conda activate base
# 启动hadoop集群
(base) [root@node1 ~]# start-all.sh
# 启动历史服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-history-server.sh
# 启动spark本地模式
# 没有任何指定,采用是local模式,调用的是本机资源无法使用集群资源,相当于是单机计算
(base) [root@node1 ~]# pyspark
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# 没有指定任何参数,使用本地local模式
# master='local[*]'
sc = SparkContext()
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
需要启动yarn集群服务,包括ResourceManager和NodeManager
启动命令start-all.sh
启动完以后可以在浏览器查看网页
命令:http://192.168.88.100:8088/
建议:两个资源调度服务在使用时,只需要选择一个服务即可,实际开发更多采用yarn进行资源调度
# 启动yarn集群服务
(base) [root@node1 ~]# start-all.sh
# 启动pyspark, yarn资源调度
(base) [root@node1 ~]# pyspark --master yarn
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用yarn资源调度
sc = SparkContext(master='yarn')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
standalone是自带的资源调度管理服务
master类似yarn中的ResourceManger负责管理找资源服务
worker 类似于yarn中的NodeManager负责将每台机器上的资源给到计算任务
node1上的启动指令
/export/server/spark/sbin/start-all.sh
查看相关网页指令
http://192.168.88.100:8080/
因为配置了高可用模式, 三台虚拟机要先启动ZooKeeper服务
(base) [root@node1 ~]# zkServer.sh start
(base) [root@node2 ~]# zkServer.sh start
(base) [root@node3 ~]# zkServer.sh start
# 在node1虚拟机上启动standalone服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-all.sh
# 启动pyspark, 使用standalone资源调度
(base) [root@node1 ~]# pyspark --master spark://node1:7077
导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用standalone资源调度
sc = SparkContext(master='spark://node1:7077')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)
Standalone 高可用集群模式
因为配置了高可用模式, 三台虚拟机要先启动ZooKeeper服务
(base) [root@node1 ~]# zkServer.sh start
(base) [root@node2 ~]# zkServer.sh start
(base) [root@node3 ~]# zkServer.sh start
# 在node1虚拟机上启动standalone服务
(base) [root@node1 ~]# /export/server/spark/sbin/start-all.sh
# 在node2虚拟机上启动standalone服务
(base) [root@node2 ~]# /export/server/spark/sbin/start-master.sh
# 启动pyspark, 使用standalone高可用资源调度
(base) [root@node1 ~]# pyspark --master spark://node1:7077,node2:7077
# 导入模块
from pyspark import SparkContext
# 创建SparkContext对象
# master参数可以指定调用的资源服务
# 使用standalone高可用资源调度
sc = SparkContext(master='spark://node1:7077,node2:7077')
# 创建python列表数据
a = [1, 2, 3, 4]
# 转换成RDD
rdd = sc.parallelize(a)
# 对rdd数据进行计算
res = rdd.reduce(lambda a, b: a + b)
print(res)