关联函数补充
join为主基础算子
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
print('PySpark join Function Program')
# TODO:1、创建应用程序入口SparkContext实例对象
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# TODO: 2、从本地文件系统创建RDD数据集
x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
y = sc.parallelize([(1001, "sales"), (1002, "tech")])
# TODO:3、使用join完成联合操作
print(x.join(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
print(x.leftOuterJoin(y).collect())
print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
sc.stop()
为什么使用缓存
如何进行缓存?
spark中提供cache方法
spark中提供persist方法
# -*- coding: utf-8 -*-
# Program function:演示join操作
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
import time
if __name__ == '__main__':
print('PySpark join Function Program')
# TODO:1、创建应用程序入口SparkContext实例对象
conf = SparkConf().setAppName("miniProject").setMaster("local[*]")
sc = SparkContext.getOrCreate(conf)
# TODO: 2、从本地文件系统创建RDD数据集
x = sc.parallelize([(1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhangliu")])
y = sc.parallelize([(1001, "sales"), (1002, "tech")])
# TODO:3、使用join完成联合操作
join_result_rdd = x.join(y)
print(join_result_rdd.collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
print(x.leftOuterJoin(y).collect())
print(x.rightOuterJoin(y).collect()) # [(1001, ('zhangsan', 'sales')), (1002, ('lisi', 'tech'))]
# 缓存--基于内存缓存-cache底层调用的是self.persist(StorageLevel.MEMORY_ONLY)
join_result_rdd.cache()
# join_result_rdd.persist(StorageLevel.MEMORY_AND_DISK_2)
# 如果执行了缓存的操作,需要使用action算子触发,在4040页面上看到绿颜色标识
join_result_rdd.collect()
# 如果后续执行任何的操作会直接基于上述缓存的数据执行,比如count
print(join_result_rdd.count())
time.sleep(600)
sc.stop()
缓存级别
- 如何选:
- 1-首选内存
- 2-内存放不下,尝试序列化
- 3-如果算子比较昂贵可以缓存在磁盘中,否则不要直接放入磁盘
- 4-使用副本机制完成容错性质
释放缓存
后续讲到Spark内存模型中,缓存放在Execution内存模块
如果不在需要缓存的数据,可以释放
最近最少使用(LRU)
print(“释放缓存之后,直接从rdd的依赖链重新读取”)
print(join_result_rdd.count())* <img src="https://maynor.oss-cn-shenzhen.aliyuncs.com/img/20231009192818.png" alt="image-20210913104616717" style="zoom:150%;" />
何时缓存数据
- rdd来之不易
- 经过很长依赖链计算
- 经过shuffle
- rdd被使用多次
- 缓存cache或persist
- 问题:缓存将数据保存在内存或磁盘中,内存或磁盘都属于易失介质
- 内存在重启之后没有数据了,磁盘也会数据丢失
- 注意:缓存会将依赖链进行保存的
- 如何解决基于cache或persist的存储在易失介质的问题?
- 引入checkpoint检查点机制
- 将元数据和数据统统存储在HDFS的非易失介质,HDFS有副本机制
- checkpoint切断依赖链,直接基于保存在hdfs的中元数据和数据进行后续计算
- 什么是元数据?
- 管理数据的数据
- 比如,数据大小,位置等都是元数据
为什么有检查点机制?
如何使用检查点机制?
检查点机制那些作用?
面试题:如何实现Spark的容错?
检查点机制案例
持久化和Checkpoint的区别
- 存储位置:缓存放在内存或本地磁盘,检查点机制在hdfs
- 生命周期:缓存通过LRU或unpersist释放,检查点机制会根据文件一直存在
- 依赖关系:缓存保存依赖关系,检查点斩断依赖关系链
案例测试:
先cache在checkpoint测试
- 1-读取数据文件
- 2-设置检查点目录
- 3-rdd.checkpoint() 和rdd.cache()
- 4-执行action操作,根据spark容错选择首先从cache中读取数据,时间更少,速度更快
- 5-如果对rdd实现unpersist
- 6-从checkpoint中读取rdd的数据
- 7-通过action可以查看时间
AI副业实战手册:http://www.yibencezi.com/notes/253200?affiliate_id=1317(目前40+工具及实战案例,持续更新,实战类小册排名第一,做三个月挣不到钱找我退款,交个朋友的产品)
📢博客主页:https://manor.blog.csdn.net
📢欢迎点赞 👍 收藏 ?留言 📝 如有错误敬请指正!
📢本文由 Maynor 原创,首发于 CSDN博客🙉
📢感觉这辈子,最深情绵长的注视,都给了手机?
📢专栏持续更新,欢迎订阅:https://blog.csdn.net/xianyu120/category_12453356.html