DataFrame相关的API

发布时间:2024年01月09日

目录

DataFrame的操作方案

?SQL相关的API

创建一个视图/表

?DSL相关的API

DSL的传递方式

?SQL的函数库

Spark SQL的综合应用

直接基于DataFrame来处理

SQL方式

?DSL方式

?基于RDD转换DataFrame的方式


DataFrame的操作方案

??????? 操作DataFrame一般有两种操作方案:一种为DSL方式,一种为SQL方式

SQL方式:通过编写SQL语句完成统计分析操作

DSL操作:特定领域语言,使用DataFrame特有的API完成计算,也就是代码形式

从使用角度来说:SQL更加方便一些,当适应了DSL写法后,会发现DSL比SQL更好用

从Soark角度来说:推荐使用DSL方案,更有利于Spark底层优化处理

?SQL相关的API

创建一个视图/表

df.createTempview('视图名称'):创建一个临时的视图(表名)

df.createorReplaceTempview('视图名称'):创建一个临时的视图(表名),如果视图存在,直接替换

临时视图:仅能在当前这个spark session的会话中使用

df.createGlobalTempview('视图名称'):创建一个全局视图,运行在一个Spark应用中,多个spark会话中读可以使用,使用的时候必须通过global_temp.视图名称方式才可以加载到,较少使用

执行SQL语句:

??????? spark.sql('书写SQL')

?DSL相关的API

show():用于展示DF中数据,默认仅展示前20行

???????????? 参数1:设置默认展示多少行,默认为20

?????????????? 参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示(一般不设置)

printSchema():用于打印当前这个DF的表结构信息

select():类似于SQL中的select,SQL中的select后面可以些什么,这里也一样

filter()和where():用于对数据进行过滤操作,一般在SparkSQL中只要使用where

groupBy():用于执行分组操作

orderBy():用于执行排序操作

DSL的传递方式

DSL主要支持一下几种传递的方式: str | column对象 | 列表

??????? str格式: '字段'

??????? column对象:

????????????????????????????? DataFrame含有的字段? df['字段']

?????????????????????????????? 执行过程产生:F.col('字段')

??????? 列表:

??????????????? ['字段1','字段2'...]

??????????????? [df['字段1'],df['字段2']]

?SQL的函数库

为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可

导入这个函数库:import pyspark.sql.functions as F

通过F调用对应的函数即可

SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html

Spark SQL的综合应用

world count 案例

已知HDFS又一个words.txt的文件,words.txt文件的内容如下:

hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop

直接基于DataFrame来处理

SQL方式

SQL方式一:子查询

SQL方式二:侧视图

炸裂函数配合侧视图使用如下:

????????格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段) 侧视图名 as 字段名

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
2.2- 只有一列。单词
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
    sparksql方式处理数据-子查询
    1.先切分每一行的数据
    2.使用炸裂函数获得一个word单词列
    3.使用子查询方式聚合统计每个单词出现的次数
    """
    spark.sql("""select word,count(*) as cnt 
    from (select explode(split(value,' ')) as word from words)
    group by word order by cnt desc
    """).show()
    """
       sparksql方式处理数据-侧视图
       1.先切分每一行的数据
       2.使用炸裂函数获得一个word单词列
       3.使用侧视图方式聚合统计每个单词出现的次数
       炸裂函数配合侧视图使用如下:
       格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
       侧视图名 as 字段名
       """
    spark.sql("""select word,count(*) as cnt
    from words w 
    lateral view explode(split(value,' ')) t as word
    group by word order by cnt desc
    """).show()

?DSL方式

DSL方式总结:
??????????? withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
??????????? agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
??????????? withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源

?

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 数据输入
    # text方式读取hdfs上的文件
    init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
    # # 查看数据
    # init_df.show()
    # # 打印dataframe表结构信息
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
           DSL方式处理数据-方式一
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').count().orderBy('count', ascending=False).show()

    """
           DSL方式处理数据-方式二
           1.先切分每一行的数据
           2.使用炸裂函数获得一个word单词列
           3.调用API聚合统计单词个数再排序
           4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
    """
    init_df.select(
        F.explode(F.split('value', ' ')).alias('word')
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    """
    DSL方式处理数据-方式三
        withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
        withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
    """
    init_df.withColumn(
        'word',
        F.explode(F.split('value', ' '))
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word')
    ).orderBy('cnt', ascending=False).show()

    # 数据输出
    # 是否资源
    spark.stop()

?基于RDD转换DataFrame的方式

# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F

# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式

需求分析:

1- 将每行内容切分得到单个的单词

2- 组织DataFrame的数据结构

2.1- 有两列。一列是单词,一列是次数
"""

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
    print('直接基于DataFrame来处理')
    # 创建SparkSession对象
    spark = SparkSession \
        .builder \
        .appName('dataFrame_world_count_demo') \
        .master('local[*]') \
        .getOrCreate()
    # 创建sparkContext顶级对象
    sc = spark.sparkContext
    # 数据输入
    # text方式读取hdfs上的文件
    init_rdd = sc.textFile('hdfs://node1:8020/source/word.txt')
    # RDD数据结构转化为二维数据
    map_rdd = init_rdd.flatMap(lambda line: line.split()).map(lambda word: (word,))
    # 查看数据
    # print(map_rdd.collect())
    # 通过Rdd构建DataFrame
    schema = StructType([StructField("value", StringType(), True)])
    init_df = spark.createDataFrame(data=map_rdd, schema=schema)
    # 打印dataframe表结构信息
    # init_df.show()
    # init_df.printSchema()
    # 创建临时视图
    init_df.createTempView('words')
    # 数据处理
    """
       sparksql方式处理数据
    """
    spark.sql("""select value as word,count(*) as cnt
    from words group by value order by cnt desc""").show()

    print('=' * 50)
    """
           DSL方式处理数据
    """
    init_df.withColumn(
        'word',
        init_df.value
    ).groupBy('word').agg(
        F.count('word').alias('cnt'),
        F.max('word').alias('max_word'),
        F.min('word').alias('min_word'),
    ).orderBy('cnt', ascending=False).show()

    # 释放资源
    spark.stop()
文章来源:https://blog.csdn.net/denglh525693/article/details/135449979
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。