SPARK--cache(缓存)和checkpoint检查点机制

发布时间:2024年01月13日

SPARK–cache(缓存)和checkpoint检查点机制

  • rdd的特性 缓存和checkpoint 作用都是进行容错
  • rdd在计算是会有多个依赖,为了避免计算错误是从头开始计算,可以将中间* 依赖rdd进行缓存或checkpoint
  • 缓存或checkpoint也叫作rdd的持久化
  • 一般对某个计算特别复杂的rdd进行持久化

缓存使用

缓存是将数据存储在内存或者磁盘上,缓存的特点时,计算结束,缓存自动清空

  • 缓存级别
    • 指定缓存的数据位置
    • 默认是缓存到内存上
StorageLevel.DISK_ONLY # 将数据缓存到磁盘上
StorageLevel.DISK_ONLY_2 # 将数据缓存到磁盘上 保存两份
StorageLevel.DISK_ONLY_3 # 将数据缓存到磁盘上 保存三份
StorageLevel.MEMORY_ONLY # 将数据缓存到内存  默认
StorageLevel.MEMORY_ONLY_2 # 将数据缓存到内存 保存两份
StorageLevel.MEMORY_AND_DISK # 将数据缓存到内存和磁盘  优先将数据缓存到内存上,内存不足可以缓存到磁盘
StorageLevel.MEMORY_AND_DISK_2 = # 将数据缓存到内存和磁盘
StorageLevel.OFF_HEAP # 不使用  缓存在系统管理的内存上   heap
StorageLevel.MEMORY_AND_DISK_ESER # 将数据缓存到内存和磁盘  序列化操作,按照二进制存储,节省空间

chechkpoint

也是将中间rdd数据存储起来,但是存储的位置实时分布式存储系统,可以进行永久保存,程序结束不会释放
如果需要删除就在hdfs上删除对应的目录文件

# RDD的checkpoint

# 导入sparkcontext
from pyspark import SparkContext
# 导入缓存级别
from pyspark.storagelevel import StorageLevel

# 创建SparkContext对象
sc = SparkContext()
# 指定checkpoint存储的目录位置
sc.setCheckpointDir('hdfs://node1:8020/checkpoint_data')

# 将hdfs的文件数据读取后转为rdd
# 读取某单独文件
rdd = sc.textFile('hdfs://node1:8020/data/words.txt')

# rdd计算
# 对读取到的rdd中的每行数据,先进行切割获取每个单词的数据
# rdd_map = rdd.map(lambda x: x.split(','))
rdd_flatMap= rdd.flatMap(lambda x: x.split(','))

# 将单词数据转化为k-v结构数据   [(k,v),(k1,v1)]   给每个单词的value一个初始值1
rdd_map_kv = rdd_flatMap.map(lambda x:(x,1))

# 对rdd_map_kv进行checkpoint
rdd_map_kv.checkpoint()
# 触发checkpoint
rdd_map_kv.collect()


# 对kv数据进行聚合计算  hive:[1,1]  求和  求平均数  求最大值  求最小值
rdd_reduceByKey =  rdd_map_kv.reduceByKey(lambda x,y:x+y)  # 现将相同key值的数据放在一起,然后对相同key值内的进行累加


# 展示数据
# rdd_reduceByKey 依赖 rdd_map_kv  依赖 rdd_flatMap
res5 = rdd_reduceByKey.collect()
print(res5)
文章来源:https://blog.csdn.net/weixin_58026490/article/details/135565285
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。