- rdd的特性 缓存和checkpoint 作用都是进行容错
- rdd在计算是会有多个依赖,为了避免计算错误是从头开始计算,可以将中间* 依赖rdd进行缓存或checkpoint
- 缓存或checkpoint也叫作rdd的持久化
- 一般对某个计算特别复杂的rdd进行持久化
缓存是将数据存储在内存或者磁盘上,缓存的特点时,计算结束,缓存自动清空
StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
StorageLevel.DISK_ONLY_2 # 将数据缓存到磁盘上 保存两份
StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
StorageLevel.MEMORY_ONLY # 将数据缓存到内存 默认
StorageLevel.MEMORY_ONLY_2 # 将数据缓存到内存 保存两份
StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘 优先将数据缓存到内存上,内存不足可以缓存到磁盘
StorageLevel.MEMORY_AND_DISK_2 = # 将数据缓存到内存和磁盘
StorageLevel.OFF_HEAP # 不使用 缓存在系统管理的内存上 heap
StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘 序列化操作,按照二进制存储,节省空间
也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以进行永久保存,程序结束不会释放
如果需要删除就在hdfs上删除对应的目录文件
# RDD的checkpoint
# 导入sparkcontext
from pyspark import SparkContext
# 导入缓存级别
from pyspark.storagelevel import StorageLevel
# 创建SparkContext对象
sc = SparkContext()
# 指定checkpoint存储的目录位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint_data')
# 将hdfs的文件数据读取后转为rdd
# 读取某单独文件
rdd = sc.textFile('hdfs://node1:8020/data/words.txt')
# rdd计算
# 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据
# rdd_map = rdd.map(lambda x: x.split(','))
rdd_flatMap= rdd.flatMap(lambda x: x.split(','))
# 将单词数据转化为k-v结构数据 [(k,v),(k1,v1)] 给每个单词的value一个初始值1
rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))
# 对rdd_map_kv进行checkpoint
rdd_map_kv.checkpoint()
# 触发checkpoint
rdd_map_kv.collect()
# 对kv数据进行聚合计算 hive:[1,1] 求和 求平均数 求最大值 求最小值
rdd_reduceByKey = rdd_map_kv.reduceByKey(lambda x,y:x+y) # 现将相同key值的数据放在一起,然后对相同key值内的进行累加
# 展示数据
# rdd_reduceByKey 依赖 rdd_map_kv 依赖 rdd_flatMap
res5 = rdd_reduceByKey.collect()
print(res5)