目录
?????????2. RDD的checkpoint检查点:
????????3. 缓存和 checkpoint的区别:?
? ? ? ? 1.创建SparkContext对象
? ? ? ? 2.数据输入
? ? ? ? 3.数据处理
? ? ? ? 4.数据输出
? ? ? ? 5.释放资源
说明:
发现在数据中,并没有直接的关键词,关键词数据是包含在搜索词中,而且一个搜索词中包含了多个关键词,所有如何想基于关键词进行统计, 首先需求先拆分搜索词,获取关键词,思考:如何做呢?
借助第三方的分词工具实现中文分词
Java语言:IK分词器
Python语言:jieba(结巴)分词器
如何使用jieba分词器呢?
1- 需要在系统中安装jieba分词库: local模式只需要安装在node1即可 如果集群模式运行 需要各个节点都要安装
安装命令: pip install -i https://pypi.tuna.tsinghua.edu.cn/simple jieba
2- 分词器使用
from pyspark import SparkConf, SparkContext
import os
import jieba
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
content = "鸡你太美蔡徐坤"
# 精简分词模式
print(list(jieba.cut(content)))
# 全模式(切分会更加精细)
print(list(jieba.cut(content, cut_all=True)))
# 搜索引擎分词模式
print(list(jieba.cut_for_search(content)))
提升Spark程序的计算效率
rdd被重复使用,rdd的计算逻辑复杂,容错
RDD缓存主要是将数据存储在内存中,临时存储,不太稳定
使用了缓存,有向无环图会有个绿色的cache,?
使用缓存的代码?etl_rdd.persist(storageLevel:MEMORY_AND_DISK)? ,优先放到内存, 内存不够了再放到磁盘中
设置缓存的API:?
?? ?rdd.cache(): 将RDD的数据缓存储内存中
?? ?rdd.persist(缓存的级别/位置): 将RDD的数据存储在指定位置
提升Spark 程序的容错性
RDD缓存主要是将数据存储在内存中,临时存储,不太稳定;?RDD 的检查点主要是将数据存储在HDFS上,是持久化存储 , 因为HDFS的三大机制让数据变的安全可靠
? ? ? ?对指定RDD启动checkpoint
? ? ? ? etl_rdd.checkpoint()? ?checkpoint设置后会将依赖效果丢弃掉
????????相关API:
?? ?sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
?? ?rdd.checkpoint(): 对指定RDD启用checkpoint
?? ?rdd.count(): 触发checkpoint
1- 数据存储位置不同
?? ?缓存: 存储在内存或者磁盘 或者 堆外内存中
?? ?checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上
2- 数据生命周期:
?? ?缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除
?? ?checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除
3- 血缘关系:
?? ?缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作
?? ?checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行
?? ?
4- 主要作用不同:
?? ?缓存: 提高Spark程序的运行效率
?? ?checkpoint检查点: 提高Spark程序的容错性
在一个项目中,推荐缓存和检查点配合使用, 在代码中先设置缓存, 再设置检查点, 然后再一同使用Action算子触发,推荐使用count算子
?窄依赖 ,让spark程序并行计算 ,一个分区数据计算有问题,其他分区不受影响 , 父RDD 的分区和子RDD的分区关系是一对一的关系
宽依赖 , 也叫shuffle依赖 , 父RDD的一个分区被多个 子RDD的分区依赖 ,shuffle下游的其他操作,必须等待shuffle执行完成以后才能够继续执行
Spark遇到一个Action算子,就会触发一个job任务, 一个job就会有一个有向无环图
一个DAG有向无环图 有多个Stage ,Stage的数量取决于宽依赖
一个Stage 有多个Task线程
一个RDD 有多个分区
一个分区 被一个Task线程所处理
DAG底层的形成
Stage内部的形成流程
Shuffle阶段的过程
????????MapReduce:?
????????sort shuffle : 普通机制和bypass机制??
?普通机制:每个上游Task线程处理数据,数据处理完以后,先放在内存中,接着对内存中的数据进行分区排序.将内存中的数据溢写到磁盘,形成一个个的小文件.写完后将多个小文件合并成为一个大的磁盘文件;并针对每个大的磁盘文件 ,会提供一个索引文件, 下游Task根据索引文件来读取相应的数据 ;?
bypass机制 : 在普通机制的基础上,省略了排序的过程?
bypass触发条件: 上游RDD的分区数量不能超过200个, 上游不能对数据进行提前聚合(提前聚合的话就会要分组,分组就会排序)
1- Driver进程启动后,底层PY4J创建SparkContext顶级对象。在创建该对象的过程中,还会创建另外两个对象,分别是: DAGScheduler和TaskScheduler
?? ?DAGScheduler: DAG调度器。将Job任务形成DAG有向无环图和划分Stage的阶段
?? ?TaskScheduler: Task调度器。将Task线程分配给到具体的Executor执行
2- 一个Spark程序遇到一个Action算子就会触发产生一个Job任务。SparkContext将Job任务给到DAG调度器,拿到Job任务后,会将Job任务形成DAG有向无环图和划分Stage的阶段。并且会确定每个Stage阶段有多少个Task线程,会将众多的Task线程放到TaskSet的集合中。DAG调度器将TaskSet集合给到Task调度器
3- Task调度器拿到TaskSet集合以后,将Task分配给到给到具体的Executor执行。底层是基于SchedulerBackend调度队列来实现的。
4- Executor开始执行任务。并且Driver会监控各个Executor的执行状态,直到所有的Executor执行完成,就认为任务运行结束
5- 后续过程和之前一样
说明: spark.default.parallelism该参数是SparkCore中的参数。该参数只会影响shuffle以后的分区数量。另外该参数对parallelize并行化本地集合创建的RDD不起作用。