SparkSQL——DataFrame

发布时间:2024年01月17日

DataFrame

  • Dataframe 是什么

    DataFrameSparkSQL中一个表示关系型数据库中 的函数式抽象, 其作用是让 Spark处理大规模结构化数据的时候更加容易. 一般 DataFrame可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema信息. 也就是说 DataFrame中有 Schema信息, 可以像操作表一样操作 DataFrame.

    在这里插入图片描述

    DataFrame 由两部分构成, 一是 row的集合, 每个 row对象表示一个行, 二是描述 DataFrame结构的 Schema

    在这里插入图片描述

    DataFrame支持 SQL中常见的操作, 例如: select, filter, join, group, sort, join

    • code
    @Test
      def dataframe1(): Unit = {
        // 1. 创建 SparkSession 对象
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("dataframe1")
          .getOrCreate()
    
        // 2. 创建 DataFrame
        import spark.implicits._
    
        val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()
        // 3. 看看 DataFrame 可以玩出什么什么花样
        // select name from t where t.age >10
        dataFrame.where('age > 10)
          .select( 'name)
          .show()
      }
    
    case class Person(name: String, age: Int)
    

    在这里插入图片描述

  • DataFrame 如何创建

    DataFrame如何创建数据集【BeijingPM20100101_20151231_noheader.rar】

    @Test
      def dataframe2():Unit = {
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("dataframe2")
          .getOrCreate()
    
        import spark.implicits._
    
        val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
    
        // 创建 DataFrame 的方法
        // 1.toDF
        val df1 = personList.toDF()
        val df2 = spark.sparkContext.parallelize(personList).toDF() // RDD.toDf()
        // 2. createDatFrame
        val df3 = spark.createDataset(personList)
        // 3. read 
        val df4 = spark.read.csv("./dataset/BeijingPM20100101_20151231_noheader.csv")
        df4.show()
      }
    
    case class Person(name: String, age: Int)
    

    在这里插入图片描述

    在这里插入图片描述

  • DataFrame 操作 (案例)

    DataFrame操作数据集[BeijingPM20100101_20151231.rar]

    需求:查看 PM_Dongsi 每个月的统计数量

    object DataFrameTest {
      def main(args: Array[String]): Unit = {
        // 1. 创建SparkSession
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("pm_analysis")
          .getOrCreate()
    
        import spark.implicits._
        // 2. 读取数据集
        val sourceDF = spark.read
          .option("header",true) // 把表头读取出来
          .csv("./dataset/BeijingPM20100101_20151231.csv")
        //sourceDF.show()
        //查看DataFrame 的 schema 信息,要意识到 DataFrame 中是有结构信息的,叫做Schema
        sourceDF.printSchema()
        // 3. 处理
        //    1. 选择列
        //    2. 过滤 NA 的 PM记录
        //    3. 分组 select year, month, count(PM_Dongsi) from .. where PM_Dongsi != NA group by year, month
        //    4. 聚合
        // 4. 得出结论
        sourceDF.select('year,'month,'PM_Dongsi)
          .where('PM_Dongsi =!= "NA") // 过滤 NA 的 PM记录
          .groupBy('year,'month)
          .count()
          .show() // action
    
        // 是否能支持使用 SQL 语句进行查询
        println("---------接下来是SQL语句查询的--------------")
        // 1. 将 DataFrame 注册为临时表
        sourceDF.createOrReplaceTempView("pm")
    
        // 2. 执行查询
        val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year,month")
    
        resultDF.show()
    
        spark.stop()
      }
    }
    

总结

  1. DataFrame 是一个类似于关系型数据库表的函数式组件
  2. DataFrame 一般处理结构化数据和半结构化数据
  3. DataFrame 具有数据对象的 Schema 信息
  4. 可以使用命令式的 API 操作 DataFrame, 同时也可以使用 SQL 操作 DataFrame
  5. DataFrame 可以由一个已经存在的集合直接创建, 也可以读取外部的数据源来创建

小Tips

一般处理数据都差不多是ETL这个步骤

  • E -> 抽取
  • T -> 处理转换
  • L -> 装载,落地

Spark代码编写的套路:

  • 创建DataFrame Dataset RDD,制造或者读取数据
  • 通过DataFrame Dataset RDD的API来进行数据处理
  • 通过DataFrame Dataset RDD进行数据落地
文章来源:https://blog.csdn.net/m0_56181660/article/details/135637206
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。