PySpark之RDD的持久化

发布时间:2024年01月09日

RDD的持久化

RDD的缓存

当RDD被重复使用,或者计算该RDD比较容易出错,而且需要消耗比较多的资源和时间的时候,我们就可以将该RDD缓存起来。

主要作用: 提升Spark程序的计算效率
注意事项: RDD的缓存可以存储在内存或者是磁盘上,甚至可以存储在Executor进程的堆外内存中。主要是放在内存中,因此缓存的数据是不太稳定可靠。

由于是临时存储,可能会存在丢失,所以缓存操作,并不会将RDD之间的依赖关系给截断掉(丢失掉),因为当缓存
失效后,可以全部重新计算
缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高


设置缓存的相关API:
?? ?????????rdd.cache():将RDD的数据缓存在内存中
?? ?????????rdd.persist(缓存的级别/位置):将RDD的数据存储在指定位置
手动清理缓存:rdd.unpersits()
默认情况下,当整个Spark应用程序执行完成后,缓存数据会自动失效,会自动删除

缓存的级别/位置:

????????DISK_ONLY: 只存储在磁盘
??? DISK_ONLY_2: 只存储在磁盘,并且有2个副本
??? DISK_ONLY_3: 只存储在磁盘,并且有3个副本
??? MEMORY_ONLY: 只存储在内存中
??? MEMORY_ONLY_2: 只存储在内存中,并且有2个副本
??? MEMORY_AND_DISK: 存储在内存和磁盘中,先放在内存,再放在磁盘
??? MEMORY_AND_DISK_2: 存储在内存和磁盘中,先放在内存,再放在磁盘,并且有2个副本
??? OFF_HEAP: Executor进程的堆外内存
?? ?
工作中最常用的是: MEMORY_AND_DISK和MEMORY_AND_DISK_2。优先推荐使用MEMORY_AND_DISK

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 设置缓存。并且调用count算子触发操作
    # etl_rdd.cache().count()
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()

    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()


    time.sleep(20)

    # 手动清理缓存。你对哪个RDD设置了缓存,那么你就对那个RDD清理缓存。也需要调用count算子触发。
    etl_rdd.unpersist().count()

    time.sleep(100)

    # 5- 释放资源
    sc.stop()

?无缓存的DAG流程图显示:

?有缓存的DAG流程图显示:

RDD的checkpoint检查点

RDD缓存主要是将数据存储在内存中,是临时存储,不太稳定,它主要是用来提升程序运行效率的。RDD的checkpoint(检查点)主要是将数据存储在HDFS上,是持久化存储。而HDFS存储数据有3副本的机制,让数据更加安全可靠。

checkpoint认为使用磁盘或者HDFS存储数据之后,数据非常的安全可靠,因此checkpoint会将RDD间的依赖关系给删除/丢弃掉。因此如果checkpoint的数据真的出现了问题,是无法在从头开始计算。

checkpoint主要作用: 提高程序的容错性
注意事项: checkpoint可以将数据存储在磁盘或者HDFS上,主要是将数据存储在HDFS上。

相关API:
?? ?sc.setCheckpointDir(存储路径): 设置checkpoint数据存放路径
?? ?rdd.checkpoint(): 对指定RDD启用checkpoint
?? ?rdd.count(): 触发checkpoint

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 设置checkpoint路径
    sc.setCheckpointDir("hdfs://node1:8020/day04/chk")

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 对指定RDD启用checkpoint
    etl_rdd.checkpoint()
    # 调用count算子,触发checkpoint操作
    etl_rdd.count()


    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()

    time.sleep(1000)

    # 5- 释放资源
    sc.stop()

?

?

持久化方案对比

Spark的两种持久化方案缓存操作,checkpoint检查点的不同点

1.数据存储位置不同
缓存:rdd存储在内存,磁盘,或者是堆外内存中
checkpoint检查点:rdd存储在磁盘或者HDFS中,集群模式下仅能存储在HDFS中
2.生命周期不同
缓存:可以使用unpersist手动删除,或者程序运行结束后会自动销毁,自动删除
checkpoint检查点:程序运行结束后被保留,需要手动删除
3.血缘关系不同
缓存:RDD之间会保留血缘关系,缓存数据可能会失效,失效后可以重新回溯计算
checkpoint检查点:会丢掉依赖关系,因为checkpoint可以将数据保存到更加安全可靠的位置,当执行失败时也不需要重新回溯执行
4.目的不同
缓存:为了提高Spark程序的运行效率
checkpoint检查点:提高Spark程序的容错性

相同点:缓存的API都是Lazy惰性的,如果需要触发缓存操作,推荐调用count算子,因为运行效率高

实际应用

在同一个项目中,推荐缓存和checkpoint(检查点)同时配合使用。

使用顺序如下: 在代码中先设置缓存,再设置checkpoint检查点,然后再一同使用Action算子触发,推荐使用count算子。因为这个顺序,只会有一次IO写的过程。

实际过程如下: 程序会优先从缓存中读取数据,如果发现缓存中没有数据。再从checkpoint中读取数据,并且接着将读取到的数据重新在内存中放置一份,后续还是优先从缓存中读取

import time

from pyspark import SparkConf, SparkContext, StorageLevel
import os
import jieba

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

# 需要过滤的关键词黑名单
keyword_black_list = ['+','.','的','com']

# ctrl+alt+M将代码封装成函数/方法
# 3.2- 需求一:统计每个关键词出现了多少次。先提取需要操作的字段并且分词,这一步类似WordCount中的对每行进行切分处理,再仿照WordCount实现。
def top10_keyword():
    keyword_rdd = etl_rdd.flatMap(lambda line_tup: list(jieba.cut(line_tup[2])))
    # print(keyword_rdd.take(10))

    # 数据结构转变。将单词变成元组
    # keyword_map_rdd = keyword_rdd.filter(lambda word:word!='+' or word!='.').map(lambda word:(word,1))
    keyword_map_rdd = keyword_rdd.filter(lambda word: word not in keyword_black_list).map(lambda word: (word, 1))

    # 分组聚合操作
    keyword_result_rdd = keyword_map_rdd.reduceByKey(lambda agg, curr: agg + curr)
    # print(keyword_result_rdd.take(100))

    # 对结果中关键词的次数降序排序,取TOP10
    keyword_result = keyword_result_rdd.top(10, key=lambda tup: tup[1])
    print(keyword_result)


# 3.3- 需求二:统计每个用户每个搜索内容点击的次数
def content():
    """
        hive sql:
            select
                用户,搜索内容,count(1) as cnt
            from table
            group by 用户,搜索内容
    """
    # 从原始的6个字段中,提取出2个字段,得到 (用户,搜索内容)
    new_tup_tmp_rdd = etl_rdd.map(lambda tup: (tup[1], tup[2]))

    # 数据格式转换
    """
            输入:(张三,鸡你太美) -> hello
            输出:((张三,鸡你太美),1) -> (hello,1)
        """
    new_tup_rdd = new_tup_tmp_rdd.map(lambda tup: (tup, 1))
    # new_tup_rdd = new_tup_tmp_rdd.map(lambda tup:(tup[0],tup[1],1))

    # 分组聚合
    content_result = new_tup_rdd.reduceByKey(lambda agg, curr: agg + curr)
    print(content_result.take(10))


if __name__ == '__main__':

    # 1- 创建SparkContext
    conf = SparkConf().setAppName('sogou_demo').setMaster('local[*]')
    sc = SparkContext(conf=conf)

    # 设置checkpoint路径
    sc.setCheckpointDir("hdfs://node1:8020/day04/chk")

    # 2- 数据输入
    init_rdd = sc.textFile('file:///export/data/gz16_pyspark/01_spark_core/data/SogouQ.sample')

    print("ETL处理前数据条数:",init_rdd.count())

    # 3- 数据处理
    # 3.1- ETL:数据的清洗、转换、加载
    """
        split():默认按照空白字符进行切分。例如:空格、制表符、回车换行符等
        
        map和flatMap的主要区别:flatMap对每一个元素处理以后,会将结果打平/压扁到一个更大的容器当中。
    """
    map_rdd = init_rdd.map(lambda line:line.split())
    # print("调用map算子后的内容:",map_rdd.take(10))

    # flatmap_rdd = init_rdd.flatMap(lambda line: line.split())
    # print("调用flatMap算子后的内容:",flatmap_rdd.take(10))

    # 过滤掉每行中没有6个字段的数据
    filter_rdd = map_rdd.filter(lambda line_list: len(line_list)==6)


    # 数据结构转换(为了演示而演示)
    etl_rdd = filter_rdd.map(lambda line_list:(
        line_list[0],
        line_list[1],
        line_list[2][1:-1], # 省略前后的中括号
        line_list[3],
        line_list[4],
        line_list[5]
    ))

    # 先缓存
    etl_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)

    # 再checkpoint
    etl_rdd.checkpoint()

    # 最后调用count算子,一同触发
    etl_rdd.count()


    print("ETL处理后数据条数:", etl_rdd.count())

    # 3.2- 需求一:统计每个关键词出现了多少次
    # top10_keyword()

    # 3.3- 需求二:统计每个用户每个搜索内容点击的次数
    content()

    time.sleep(1000)

    # 5- 释放资源
    sc.stop()

文章来源:https://blog.csdn.net/denglh525693/article/details/135432095
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。