2024.1.7 Spark SQL , DataFrame

发布时间:2024年01月07日

目录

一 . SparkSQL简介

二 . Spark SQL与HIVE的异同?

?三 . DataFrame

1. 创建 DataFrame

2. RDD转换DataFrame

四 . 操作DataFrame

?SQL方式:

DSL方式:


一 . SparkSQL简介

Spark SQL只能处理结构化数据 ,属于Spark框架一个部分? Schema:元数据信息

特点: 融合性 ,统一数据访问,hive兼容 , 标准化连接??

将hive sql翻译成Spark上对应的RDD操作 ,底层运行SparkRDD?

DataFrames是在RDD上面增加与省略了一些东西

DataFrame? =? RDD -泛型 +Schema? +方便到的SQL操作 + 优化? ,是个特殊的RDD

RDD存储任意结构数据? ;? ? ? ? ?DataFrame存储二维表结构数据

二 . Spark SQL与HIVE的异同?

1- Spark SQL是基于内存计算, 而HIVE SQL是基于磁盘进行计算的
2- Spark SQL没有元数据管理服务(自己维护), 而HIVE SQL是有metastore的元数据管理服务的
3- Spark SQL底层执行Spark RDD程序, 而HIVE SQL底层执行是MapReduce
4- Spark SQL可以编写SQL也可以编写代码,但是HIVE SQL仅能编写SQL语句

?三 . DataFrame

DataFrame表示的是一个二维的表。二维表,必然存在行、列等表结构描述信息

表结构描述信息(元数据Schema): StructType对象
字段: StructField对象,可以描述字段名称、字段数据类型、是否可以为空
行: Row对象
列: Column对象,包含字段名称和字段值

在一个StructType对象下,由多个StructField组成,构建成一个完整的元数据信息

1. 创建 DataFrame

import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    spark = SparkSession.builder.appName('创建DataFrame')\
        .master('local[*]').getOrCreate()

    init_df = spark.createDataFrame(
        data=[(1,'张三',18),(2,'李四',40),(3,'王五',60)],
        schema='id:int,name:string,age:int'
    )
    init_df2 = spark.createDataFrame(
        data=[(1, '张三', 18), (2, '李四', 30),(3,'王五',60)],
        schema=["id","name","age"]
    )
    init_df.show()
    '''
    +---+----+---+
    | id|name|age|
    +---+----+---+
    |  1|张三| 18|
    |  2|李四| 30|
    +---+----+---+
    '''
    init_df2.show()

    init_df.printSchema()
    '''
    root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
    '''
    init_df2.printSchema()
    '''
    root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
    '''

    spark.stop()

2. RDD转换DataFrame

from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession

# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructField

os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建SparkSession对象
    spark = SparkSession.builder\
        .appName('rdd_2_dataframe')\
        .master('local[*]')\
        .getOrCreate()

    # 通过SparkSession得到SparkContext
    sc = spark.sparkContext

    # 2- 数据输入
    # 2.1- 创建一个RDD
    init_rdd = sc.parallelize(["1,李白,20","2,安其拉,18"])

    # 2.2- 将RDD的数据结构转换成二维结构
    new_rdd = init_rdd.map(lambda line: (
            int(line.split(",")[0]),
            line.split(",")[1],
            int(line.split(",")[2])
        )
    )

    # 将RDD转成DataFrame:方式一
    # schema方式一
    schema = StructType()\
        .add('id',IntegerType(),False)\
        .add('name',StringType(),False)\
        .add('age',IntegerType(),False)


    # schema方式二
    schema = StructType([
        StructField('id',IntegerType(),False),
        StructField('name',StringType(),False),
        StructField('age',IntegerType(),False)
    ])

    # schema方式三
    schema = "id:int,name:string,age:int"

    # schema方式四
    schema = ["id","name","age"]

    init_df = spark.createDataFrame(
        data=new_rdd,
        schema=schema
    )

    # 将RDD转成DataFrame:方式二
    """
        toDF:中的schema既可以传List,也可以传字符串形式的schema信息
    """
    # init_df = new_rdd.toDF(schema=["id","name","age"])
    init_df = new_rdd.toDF(schema="id:int,name:string,age:int")

    # 3- 数据处理
    # 4- 数据输出
    init_df.show()
    init_df.printSchema()

    # 5- 释放资源
    sc.stop()
    spark.stop()

?

四 . 操作DataFrame

?SQL方式:

df.createTempView('视图名称'): 创建一个临时的视图(表名)
df.createOrReplaceTempView('视图名称'): 创建一个临时的视图(表名),如果视图存在,直接替换
临时视图,仅能在当前这个Spark Session的会话中使用


df.createGlobalTempView('视图名称'): 创建一个全局视图,运行在一个Spark应用中多个spark会话中都可以使用。在使用的时候必须通过 global_temp.视图名称 方式才可以加载到。较少使用

DSL方式:

  • show():用于展示DF中数据, 默认仅展示前20行

    • 参数1:设置默认展示多少行 默认为20

    • 参数2:是否为阶段列, 默认仅展示前20个字符数据, 如果过长, 不展示(一般不设置)

  • printSchema():用于打印当前这个DF的表结构信息

  • select():类似于SQL中select, SQL中select后面可以写什么, 这样同样也一样

  • filter()和 where():用于对数据进行过滤操作, 一般在spark SQL中主要使用where

  • groupBy():用于执行分组操作

  • orderBy():用于执行排序操作

文章来源:https://blog.csdn.net/m0_49956154/article/details/135436040
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。