SparkSQL扩展——数据读写

发布时间:2024年01月18日

数据读写

load data inpath ‘/dataset/studenttab10k’ overwrite into table student;

初识 DataFrameReader (读)

  • 初识 DataFrameReader (读)

    SparkSQL的一个非常重要的目标就是完善数据读取, 所以 SparkSQL 中增加了一个新的框架, 专门用于读取外部数据源, 叫做 DataFrameReader

    @Test
    def reader1(): Unit = {
      // 1. 创建SparkSession 对象
      val spark = SparkSession.builder()
        .master("local[6]")
        .appName("reader1")
        .getOrCreate()
      // 2. 框架在哪
      val reader:DataFrameReader = spark.read
    }
    

    DataFrameReader 由如下几个组件组成

    组件解释
    schema结构信息, 因为 Dataset是有结构的, 所以在读取数据的时候, 就需要有 Schema信息, 有可能是从外部数据源获取的, 也有可能是指定的
    option连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV文件是否引入 Header等
    format外部数据源的格式, 例如 csv, jdbc, json

    DataFrameReader有两种访问方式, 一种是使用 load方法加载, 使用 format指定加载格式, 还有一种是使用封装方法, 类似 csv, json, jdbc

    @Test
    def reader2(): Unit = {
      // 1. 创建SparkSession 对象
      val spark = SparkSession.builder()
        .master("local[6]")
        .appName("reader2")
        .getOrCreate()
      // 2. 第一种形式
      spark.read
        .format("csv") // 文件格式
        .option("header", value = true) // 显示头部
        .option("inferSchema", value = true) // 除头部的其他信息,结构信息
        .load("./dataset/BeijingPM20100101_20151231.csv") // 文件地址
        .show(10) // 展示10行数据
      // 3. 第二种形式
      spark.read
        .option("header", value = true)
        .option("inferSchema", value = true)
        .csv("./dataset/BeijingPM20100101_20151231.csv")
        .show()
    }
    

    但是其实这两种方式本质上一样, 因为类似 csv 这样的方式只是 load 的封装

    在这里插入图片描述

    注意:

    • 如果使用 load 方法加载数据, 但是没有指定 format 的话, 默认是按照 Parquet 文件格式读取
    • 也就是说, SparkSQL 默认的读取格式是 Parquet

    总结

    1. 使用 spark.read 可以获取 SparkSQL 中的外部数据源访问框架 DataFrameReader
    2. DataFrameReader 有三个组件 format, schema, option
    3. DataFrameReader 有两种使用方式, 一种是使用 load 加 format 指定格式, 还有一种是使用封装方法 csv, json 等

初识 DataFrameWriter (写)

  • 初识 DataFrameWriter (写)

    对于 ETL 来说, 数据保存和数据读取一样重要, 所以 SparkSQL 中增加了一个新的数据写入框架, 叫做 DataFrameWriter

    @Test
    def writer1():Unit = {
      // 1. 创建SparkSession 对象
      val spark = SparkSession.builder()
        .master("local[6]")
        .appName("writer1")
        .getOrCreate()
      val df = spark.read
        .option("header", true)
        .csv("./dataset/BeijingPM20100101_20151231.csv")
      // 2. 框架在哪
      val writer: DataFrameWriter[Row] = df.write
    
    }
    

    DataFrameWriter 中由如下几个部分组成

    组件解释
    source写入目标, 文件格式等, 通过 format方法设定
    mode写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode方法设定
    extraOptions外部参数, 例如 JDBC的 URL, 通过 options, option设定
    partitioningColumns类似 Hive的分区, 保存表的时候使用, 这个地方的分区不是 RDD的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy设定
    bucketColumnNames类似 Hive的分桶, 保存表的时候使用, 通过 bucketBy设定
    sortColumnNames用于排序的列, 通过 sortBy设定

    mode 指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等

    Scala 对象表示字符串表示解释
    SaveMode.ErrorIfExists"error”将 DataFrame保存到 source时, 如果目标已经存在, 则报错
    SaveMode.Append"append”将 DataFrame保存到 source时, 如果目标已经存在, 则添加到文件或者 Tabl中
    SaveMode.Overwrite"overwrite”将 DataFrame保存到 source时, 如果目标已经存在, 则使用 DataFrame中的数据完全覆盖目标
    SaveMode.Ignore"ignore”将 DataFrame保存到 source时, 如果目标已经存在, 则不会保存 DataFrame数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS

    DataFrameWriter 也有两种使用方式, 一种是使用 format 配合 save, 还有一种是使用封装方法, 例如 csv, json, saveAsTable 等

    @Test
      def writer2():Unit = {
        System.setProperty("hadoop.home.dir","C:\\winutils") // Windows 写文件必须的
        // 1. 创建SparkSession 对象
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("writer2")
          .getOrCreate()
        // 2. 读取数据集
        val df = spark.read
          .option("header", true)
          .csv("./dataset/BeijingPM20100101_20151231.csv")
        // 3. 写入数据集
    		// 使用 json 保存, 因为方法是 json, 所以隐含的 format 是 json
        df.write.json(("./dataset/beijing_pm.json"))
    		// 使用 save 保存, 使用 format 设置文件格式
        df.write.format("json").save("./dataset/beijing_pm2.json")
    
      }
    

    在这里插入图片描述

    总结

    1. 类似 DataFrameReader, Writer 中也有 format, options, 另外 schema 是包含在 DataFrame 中的
    2. DataFrameWriter 中还有一个很重要的概念叫做 mode, 指定写入模式, 如果目标集合已经存在时的行为
    3. DataFrameWriter 可以将数据保存到 Hive 表中, 所以也可以指定分区和分桶信息

读写 Parquet 格式文件

  • 读写 Parquet 格式文件

    什么时候会用到 Parquet ?

    在这里插入图片描述

    在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.

    为了能够保存比较复杂的数据, 并且保证性能和压缩率, 通常使用 Parquet 是一个比较不错的选择.

    所以外部系统收集过来的数据, 有可能会使用 Parquet, 而 Spark 进行读取和转换的时候, 就需要支持对 Parquet 格式的文件的支持.

    一般处理数据都差不多是ETL的过程

    E -> 抽取 T -> 处理,转换L -> 装载,落底

    使用代码读写 Parquet 文件

    默认不指定 format, 默认的 就是读写 Parquet 格式的文件

    object Parquet {
    def main(args: Array[String]): Unit = {
      System.setProperty("hadoop.home.dir", "c:\\winutils")
      val spark = SparkSession.builder()
        .master("local[6]")
        .appName("parquet")
        .getOrCreate()
      // 读取 CSV 文件数据
      val df = spark.read.option("header", true).csv("./dataset/BeijingPM20100101_20151231.csv")
    
      // 把数据写为 Parquet 格式
      // 写入的格式默认parquet
      // 写入模式,报错,覆盖,追加,忽略
      df.write
        //    .format("parquet") // 设置类型 ,默认parquet
        .mode(SaveMode.Overwrite) //看mode源码,重写内容,将DataFrame保存到source时, 如果目标已经存在, 则使用DataFrame中的数据完全覆盖目标
        .save("dataset/beijing_pm3")
    
      // 读取 Parquet 格式文件
      // 默认格式是否是parquet? -> 是
      // 是否可能读取文件夹呢? -> 是
      spark.read
        .load("./dataset/beijing_pm3")
        .show()
    }
    }
    

    在这里插入图片描述

分区

  • 分区

    写入 Parquet 的时候可以指定分区

    Spark 在写入文件的时候是支持分区的, 可以像 Hive 一样设置某个列为分区列

    @Test
    def parquetPartitions():Unit={
      System.setProperty("hadoop.home.dir","C:\\winutils")
      // 1. 创建SparkSession 对象
      val spark = SparkSession.builder()
        .master("local[6]")
        .appName("writer2")
        .getOrCreate()
      // 2. 读取数据
      val df = spark.read
        .option("header",true)
        .csv("./dataset/BeijingPM20100101_20151231.csv")
      // 3. 写文件,表分区
      df.write
        .partitionBy("year","month")
        .save("dataset/beijing_pm4")
      // 4. 读文件,自动发现分区
      // 写分区表的时候,分区列不会包含在生成的文件中
      // 直接通过文件来进行读取的话,分区信息会丢失
      // SparkSQL 会进行自动的分区发现,
      spark.read
    //      .parquet("./dataset/beijing_pm4/year=2010/month=1")
        .parquet("./dataset/beijing_pm4/") // 所以直接读取最外层文件夹
        .printSchema() // 打印这个时把2 和 3步骤注释
    }
    

    在这里插入图片描述

    在这里插入图片描述

    注意:这里指的分区是类似 Hive 中表分区的概念, 而不是 RDD 分布式分区的含义

    分区发现

    在读取常见文件格式的时候, Spark 会自动的进行分区发现, 分区自动发现的时候, 会将文件名中的分区信息当作一列. 例如 如果按照性别分区, 那么一般会生成两个文件夹 gender=male 和 gender=female, 那么在使用 Spark 读取的时候, 会自动发现这个分区信息, 并且当作列放入创建的 DataFrame 中

    SparkSession 中有关 Parquet 的配置

    配置默认值含义
    spark.sql.parquet.binaryAsStringfalse一些其他 Parquet 生产系统, 不区分字符串类型和二进制类型, 该配置告诉 SparkSQL 将二进制数据解释为字符串以提供与这些系统的兼容性
    spark.sql.parquet.int96AsTimestamptrue一些其他 Parquet 生产系统, 将 Timestamp 存为 INT96, 该配置告诉 SparkSQL 将 INT96 解析为 Timestamp
    spark.sql.parquet.cacheMetadatatrue打开 Parquet 元数据的缓存, 可以加快查询静态数据
    spark.sql.parquet.compression.codecsnappy压缩方式, 可选 uncompressed, snappy, gzip, lzo
    spark.sql.parquet.mergeSchemafalse当为 true 时, Parquet 数据源会合并从所有数据文件收集的 Schemas 和数据, 因为这个操作开销比较大, 所以默认关闭
    spark.sql.optimizer.metadataOnlytrue如果为 true, 会通过原信息来生成分区列, 如果为 false 则就是通过扫描整个数据集来确定

    总结:

    1. Spark 不指定 format 的时候默认就是按照 Parquet 的格式解析文件
    2. Spark 在读取 Parquet 文件的时候会自动的发现 Parquet 的分区和分区字段
    3. Spark 在写入 Parquet 文件的时候如果设置了分区字段, 会自动的按照分区存储

读写 JSON 格式文件

  • 读写 JSON 格式文件

    什么时候会用到 JSON ?

    在这里插入图片描述

    在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.

    在业务系统中, JSON 是一个非常常见的数据格式, 在前后端交互的时候也往往会使用 JSON, 所以从业务系统获取的数据很大可能性是使用 JSON 格式, 所以就需要 Spark 能够支持 JSON 格式文件的读取

    • 读写 JSON 文件

      将要 Dataset 保存为 JSON 格式的文件比较简单, 是 DataFrameWriter 的一个常规使用

      @Test
        def json(): Unit = {
          System.setProperty("hadoop.home.dir","C:\\winutils")
          val spark = SparkSession.builder()
            .master("local[6]")
            .appName("json")
            .getOrCreate()
          // 读
          val df = spark.read
            .option("header",true)
            .csv("./dataset/BeijingPM20100101_20151231.csv")
          // 写
      //    df.write
      //      .json("./dataset/beijing_pm5")
      
          // 读json (先写json再读)
          spark.read
            .json("./dataset/beijing_pm5")
            .show()
        }
      

      在这里插入图片描述

      如果不重新分区, 则会为 DataFrame 底层的 RDD 的每个分区生成一个文件, 为了保持只有一个输出文件, 所以重新分区

      保存为 JSON 格式的文件有一个细节需要注意, 这个 JSON 格式的文件中, 每一行是一个独立的 JSON, 但是整个文件并不只是一个 JSON 字符串, 所以这种文件格式很多时候被成为 JSON Line 文件, 有时候后缀名也会变为 jsonl

      beijing_pm.jsonl

      {"day":"1","hour":"0","season":"1","year":2013,"month":3}
      {"day":"1","hour":"1","season":"1","year":2013,"month":3}
      {"day":"1","hour":"2","season":"1","year":2013,"month":3}
      

      JSON 格式的文件是有结构信息的, 也就是 JSON 中的字段是有类型的, 例如 “name”: “zhangsan” 这样由双引号包裹的 Value, 就是字符串类型, 而 “age”: 10 这种没有双引号包裹的就是数字类型, 当然, 也可以是布尔型 “has_wife”: true

      Spark 读取 JSON Line 文件的时候, 也会自动的推断类型信息

    总结

    1. JSON 通常用于系统间的交互, Spark 经常要读取 JSON 格式文件, 处理, 放在另外一处
    2. 使用 DataFrameReader 和 DataFrameWriter 可以轻易的读取和写入 JSON, 并且会自动处理数据类型信息
    • json的两个应用场景

      /**
       * toJSON 的场景:
       * 处理完了以后, DataFrame中如果是一个对象, 如果其他的系统只支持 JSON 格式的数据
       * SParkSQL 如果和这种系统进行整合的时候, 就需要进行转换
       */
      @Test
      def json1():Unit = {
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("json1")
          .getOrCreate()
      
        val df = spark.read
          .option("header",true)
          .csv("./dataset/BeijingPM20100101_20151231.csv")
      
        df.toJSON.show()
      }
      

      在这里插入图片描述

      /**
       * 从消息队列中取出JSON格式的数据, 需要使用 SparkSQL 进行处理
       */
      @Test
      def json2():Unit = {
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("json2")
          .getOrCreate()
      
        val df = spark.read
          .option("header", true)
          .csv("./dataset/BeijingPM20100101_20151231.csv")
      
        val jsonRDD = df.toJSON.rdd
        spark.read.json(jsonRDD).show() // 直接从rdd中读出DataFrame
      }
      

      在这里插入图片描述

Hive

  • Hive
    • SparkSQL 整合 Hive

      和一个文件格式不同, Hive是一个外部的数据存储和查询引擎, 所以如果 Spark要访问 Hiv的话, 就需要先整合 HiveHive

      整合什么 ?

      如果要讨论 SparkSQL 如何和 Hive 进行整合, 首要考虑的事应该是 Hive 有什么, 有什么就整合什么就可以

      • **MetaStore, 元数据存储**

        SparkSQL 内置的有一个 MetaStore, 通过嵌入式数据库 Derby 保存元信息, 但是对于生产环境来说, 还是应该使用 HiveMetaStore, 一是更成熟, 功能更强, 二是可以使用 Hive 的元信息

      • 查询引擎

        SparkSQL 内置了 HiveSQL 的支持, 所以无需整合

      为什么要开启HiveMetaStore

      HiveMetaStore 是一个 Hive 的组件, 一个 Hive 提供的程序, 用以保存和访问表的元数据, 整个 Hive 的结构大致如下

      在这里插入图片描述

      由上图可知道, 其实 Hive 中主要的组件就三个, HiveServer2 负责接受外部系统的查询请求, 例如 JDBC, HiveServer2 接收到查询请求后, 交给 Driver 处理, Driver 会首先去询问 MetaStore 表在哪存, 后 Driver 程序通过 MR 程序来访问 HDFS 从而获取结果返回给查询请求者

      HiveMetaStoreSparkSQL 的意义非常重大, 如果 SparkSQL 可以直接访问 HiveMetaStore, 则理论上可以做到和 Hive 一样的事情, 例如通过 Hive 表查询数据

      而 Hive 的 MetaStore 的运行模式有三种

      • 内嵌 Derby 数据库模式

        这种模式不必说了, 自然是在测试的时候使用, 生产环境不太可能使用嵌入式数据库, 一是不稳定, 二是这个 Derby 是单连接的, 不支持并发

      • Local 模式

        Local 和 Remote 都是访问 MySQL 数据库作为存储元数据的地方, 但是 Local 模式的 MetaStore 没有独立进程, 依附于 HiveServer2 的进程

      • Remote 模式

        和 Loca 模式一样, 访问 MySQL 数据库存放元数据, 但是 Remote 的 MetaStore 运行在独立的进程中

        我们显然要选择 Remote 模式, 因为要让其独立运行, 这样才能让 SparkSQL 一直可以访问

      Hive 开启 MetaStore

      hive 下载:

      https://archive.apache.org/dist/hive/hive-2.3.4/

      Step 1: 修改 hive-site.xml

      <property>
      	<name>javax.jdo.option.ConnectionURL</name>
      	<value>jdbc:mysql://master:3306/metastore?createDatabaseIfNotExist=true&amp;useSSL=false</value>
      	<description>JDBC connect string for a JDBC metastore</description>
      </property>
      
      <property>
      	<name>javax.jdo.option.ConnectionDriverName</name>
      	<value>com.mysql.jdbc.Driver</value>
      	<description>Driver class name for a JDBC metastore</description>
      </property>
      
      <property>
      	<name>javax.jdo.option.ConnectionUserName</name>
      	<value>root</value>
      	<description>username to use against metastore database</description>
      </property>
      
      <property>
      	<name>javax.jdo.option.ConnectionPassword</name>
      	<value>Password123.</value>
       	<description>password to use against metastore database</description>
      </property>
      
      <property>
        	<name>hive.metastore.warehouse.dir</name>
        	<value>/user/hive/warehouse</value> # hdfs 上的目录,默认这个位置
      </property>
      
      <!-不校验元数据->
      <property>
      <name>hive.metastore.schema.verification</name>
      <value>false</value>
      </property>
      
      <property>
      	<name>hive.metastore.uris</name>
        	<value>thrift://master:9083</value>  # 当前服务器
      </property>
      
      # 下面四个也要
      <property> 
      	<name>hive.downloaded.resources.dir</name> 
      	<value>/root/hive/tmp</value> 
      </property> 
      
      <property>  
      	<name>hive.exec.local.scratchdir</name> 
      	<value>/root/hive/tmp/${hive.session.id}_resources</value> 
      </property> 
      
      <property> 
      	<name>hive.querylog.location</name> 
      	<value>/root/hive/tmp</value> 
      </property> 
      
      <property> 
      	<name>hive.server2.logging.operation.log.location</name> 
      	<value>/root/hive/tmp/operation_logs</value> 
      </property>
      
      # 解决Hive执行insert命令调用MR任务报错问题
      <property>
          <name>hive.exec.mode.local.auto</name>
          <value>true</value>
          <description>Let Hive determine whether to run in local mode automatically</description>
       </property>
      
      #  下面两个不动
      <property>
      	<name>hive.metastore.local</name> # hive 2.x版本没这个
        	<value>false</value>
      </property>
      <↓-若元数据不存在,则自动生成>
      <property>
      <name>datanucleus.schema.autoCreateAll</name>
      <value>true</value>
      </property>
      

      Step 2: 启动 Hive MetaStore

      nohup /root/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
      

      SparkSQL 整合 Hive 的 MetaStore

      即使不去整合 MetaStore, Spark 也有一个内置的 MateStore, 使用 Derby 嵌入式数据库保存数据, 但是这种方式不适合生产环境, 因为这种模式同一时间只能有一个 SparkSession 使用, 所以生产环境更推荐使用 Hive 的 MetaStore

      SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse, 这些配置信息一般存在于 Hadoop 和 HDFS 的配置文件中, 所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录

      # 将 hive/conf 下的 hive-site.xml,拷贝到spark的conf下
      cp /root/hive/conf/hive-site.xml /root/spark/conf/
      # 将 hadoop/etc/hadoop 下的 hdfs-site.xml 和 core-site.xml 拷贝到spark的conf下
      cp /root/hadoop/etc/hadoop/core-site.xml /root/spark/conf/
      cp /root/hadoop/etc/hadoop/hdfs-site.xml /root/spark/conf/
      

      小总结:

      1. Spark 需要 hive-site.xml 的原因是, 要读取 Hive 的配置信息, 主要是元数据仓库的位置等信息
      2. Spark 需要 core-site.xml 的原因是, 要读取安全有关的配置
      3. Spark 需要 hdfs-site.xml 的原因是, 有可能需要在 HDFS 中放置表文件, 所以需要 HDFS 的配置

      如果不希望通过拷贝文件的方式整合 Hive, 也可以在 SparkSession 启动的时候, 通过指定 Hive 的 MetaStore 的位置来访问, 但是更推荐整合的方式

    • 在 Hive 中创建表

      第一步, 需要先将文件上传到集群中, 使用如下命令上传到 HDFS 中

      hdfs dfs -mkdir -p /dataset
      hdfs dfs -put studenttab10k /dataset
      

      第二步, 使用 Hive 或者 Beeline 执行如下 SQL

      **# 进入 hive
      hive
      # SQL如下
      create database if not exists spark01; # 建库,默认在hive-site.xml下配置的hdfs目录下/user/hive/warehouse
      
      use spark01; # 进入当前库
      
      create external table student # 建表
      (
        name  STRING, # 名字
        age   INT, # 年龄
        gpa   string # 平均分
      )
      row format delimited 
      fields terminated by '\t' # fields(字段) 之间的分隔符 by 制表符
      lines terminated by '\n' # lines(行) 之间的分隔符 by 换行
      stored as TEXTFILE # 指定文件是以什么格式存储,这里是文本文件,所以是TEXTFILE 
      location '/user/hive/warehouse'; # 指定目录/user/hive/warehouse
      
      LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student; # 将目标文件数据上传到表中
      
      select * from student limit 10; # 查询前10条数据**
      
    • SparkSQL操作Hive表 (查询 Hive 的表 和 创建 Hive 表)

      (只有使用spark-shell或者spark-submit的时候才能享受这个整合的便利)

      通过 SparkSQL 查询 Hive 的表

      查询 Hive 中的表可以直接通过 spark.sql(…) 来进行, 可以直接在其中访问 Hive 的 MetaStore, 前提是一定要将 Hive 的配置文件拷贝到 Spark 的 conf 目录

      **scala> spark.sql("use spark01")
      res0: org.apache.spark.sql.DataFrame = []
      
      scala> spark.sql("select * from student limit 100")
      res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
      
      scala> res1.show() // 拿数据的时候show
      // ....数据**
      

      通过 SparkSQL 创建 Hive 表

      通过 SparkSQL 可以直接创建 Hive 表, 并且使用 load data加载数据

      scala> :paste
      // Entering paste mode (ctrl-D to finish)
      
      val createTableStr =
        """
          |create EXTERNAL TABLE student
          |(
          |  name  STRING,
          |  age   INT,
          |  gpa   string
          |)
          |ROW FORMAT DELIMITED
          |  FIELDS TERMINATED BY '\t'
          |  LINES TERMINATED BY '\n'
          |STORED AS TEXTFILE
          |LOCATION '/user/hive/warehouse'
        """.stripMargin
      
      spark.sql("CREATE DATABASE IF NOT EXISTS spark02")
      spark.sql("USE spark02")
      spark.sql(createTableStr)
      spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student")
      
      // Exiting paste mode, now interpreting.
      
      22/08/06 03:24:42 ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
      createTableStr: String =
      "
      create EXTERNAL TABLE student
      (
        name  STRING,
        age   INT,
        gpa   string
      )
      ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\t'
        LINES TERMINATED BY '\n'
      STORED AS TEXTFILE
      LOCATION '/user/hive/warehouse'
        "
      res0: org.apache.spark.sql.DataFrame = []
      
      scala> spark.sql("select * from student limit 100")
      res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
      
      scala> res1.where('age > 50).show()
      # 。。。数据
      

      目前 SparkSQL 支持的文件格式有 sequencefile, rcfile, orc, parquet, textfile, avro, 并且也可以指定 serde 的名称

    • SparkSQL 将数据插入到 Hive表中 (idea 打包上传运行)

      使用 SparkSQL 处理数据并保存进 Hive 表

      前面都在使用 SparkShell 的方式来访问 Hive, 编写 SQL, 通过 Spark 独立应用的形式也可以做到同样的事, 但是需要一些前置的步骤, 如下

      Step 1: 导入 Maven 依赖

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
          <version>2.1.1</version>
      </dependency>
      

      Step2:配置SparkSession
      如果希望使用SparkSQL访问Hive的话,需要做两件事

      1. 开启SparkSession的Hive支持

        经过这一步配置, SparkSQL 才会把 SQL 语句当作 HiveSQL 来进行解析

      2. 设置WareHouse的位置

        虽然 hive-stie.xml 中已经配置了 WareHouse 的位置, 但是在 Spark 2.0.0 后已经废弃了 hive-site.xml 中设置的 hive.metastore.warehouse.dir, 需要在 SparkSession 中设置 WareHouse 的位置

      3. 设置MetaStore的位置

      val spark = SparkSession
        .builder()
        .appName("hive example")
        .config("spark.sql.warehouse.dir", "hdfs://master:9000/user/hive/warehouse")  //1.设置 WareHouse 的位置
        .config("hive.metastore.uris", "thrift://Bigdata01:9083") //2.设置 MetaStore 的位置
        .enableHiveSupport()                   //3.开启 Hive 支持                                
        .getOrCreate()
      

      配置好了以后, 就可以通过 DataFrame 处理数据, 后将数据结果推入 Hive 表中了, 在将结果保存到 Hive 表的时候, 可以指定保存模式

      全套代码如下:

      object HiveAccess {
        def main(args: Array[String]): Unit = {
          //1.创建SparkSession
          //  1.开启hive支持
          //  2.指定Metastore 的位置
          //  3.指定Warehouse 的位置
          val spark = SparkSession.builder()
            .appName("hive_example")
            .config("Spark.sql.warehouse.dir", "hdfs://master:9000/usr/hive/warehouse")
            .config("hive.metastore.uris", "thrift://master:9083")
            .enableHiveSupport()
            .getOrCreate()
      
          // 2. 读取数据
          /**
           * 1.上传HDFS, 因为要在集群中执行,所以没办法保证程序在哪个机器上执行
           * 所以,要把文件上传到所有机器中,才能读取本地文件,
           * 上传到HDFS中就可以解决这个问题,所有的机器都可以读取HDFS中的文件
           * 它是一个外部系统
           * // 这里将数据文件放到hdfs://master:9000/dataset/下
           * 2.使用DF读取文件
           */
          val schema = StructType( // 新建schema信息
            List(
              StructField("name", StringType),
              StructField("age", IntegerType),
              StructField("gpa", FloatType)
            )
          )
      
          import spark.implicits._
          val df = spark.read
            .option("delimiter", "\t") // 指定分隔符
            .schema(schema) // 传入结构信息
            .csv("hdfs:///dataset/studenttab10k")
      
          val resultDF = df.where('age>50) // 将年龄大于50岁的插入进去
          // 3. 写入数据, 使用写入表的API, saveAsTable
      		// 通过 mode 指定保存模式, 通过 saveAsTable 保存数据到 Hive
          resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark02.student")
        }
      }
      

      打包上传jar

      在这里插入图片描述

      集群运行

      cd /root/
      spark/bin/spark-submit --master spark://master:7077 --class cn.itcast.spark.sql.HiveAccess original-spark-1.0-SNAPSHOT.jar
      

      hive 查看数据

      hive
      select * from spark02.student;
      

      加粗样式

JDBC

  • JDBC
    • 准备 MySQL 环境

      在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等

      Step 1: 连接 MySQL 数据库

      在 MySQL 所在的主机上执行如下命令

      mysql -uroot -p(密码)
      

      Step 2:创建库和表

      create database spark02; # 建库
       
      use spark02; # 使用库
      
      # 建表
      create table student( 
      	id int auto_increment,
      	name varchar(100) not null,
      	age int not null,
      	gpa float not null,
      	primary key (id))
      	engine=InnoDB default CHARSET=utf8;
      

      在这里插入图片描述

      Step 3: 创建 Spark 使用的用户

      **# 创建spar03用户,在所有地方都可以访问@'%',再指定密码
      create user 'spark03'@'%' identified by 'Password123.';
      # 赋予 spark03用户有spark02下所有表的权限,在任何位置
      grant all on spark02.* to 'spark03'@'%';**
      
    • 使用 SparkSQL 向 MySQL 中写入数据

      其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问

      使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark访问 JDBC, 有如下几个配置可用

      属性含义
      url要连接的 JDBC URL
      dbtable要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法
      fetchsize数据抓取的大小(单位行), 适用于读的情况
      batchsize数据传输的大小(单位行), 适用于写的情况
      isolationLevel事务隔离级别, 是一个枚举, 取值 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 默认为 READ_UNCOMMITTED

      读取数据集,处理过后存往MySQL中的代码如下

      /*
      * MySQL 的访问方式有两种:1.使用本地运行,2.提交到集群运行
      * 写入 MYSQL 数据时,使用本地运行, 读取数据时,使用集群运行
      * */
      object MySQLWrite {
        def main(args: Array[String]): Unit = {
          // 1. 创建SparkSession 对象
          val spark = SparkSession.builder()
            .appName("mysql_write")
            .master("local[6]")
            .getOrCreate()
      
          // 2. 读取数据创建 DataFrame
          //    1. 拷贝文件
          //    2. 读取
          import spark.implicits._
      
          // 编写 schema 结构信息
          val schema = StructType(
            List(
              StructField("name", StringType),
              StructField("age", IntegerType),
              StructField("float", FloatType)
            )
          )
          // 读数据
          val df = spark.read
            .option("delimiter", "\t")
            .schema(schema) // 加入schema 结果信息
            .csv("./dataset/studenttab10k")
          // 3. 处理数据
          val resultDF = df.where("age < 30")
      
          // 4. 落地数据
          resultDF.write
            .format("jdbc") 
            .option("url","jdbc:mysql://192.168.169.134:3306/spark02") // 到库
            .option("dbtable","student") // 表
            .option("user","spark03") // 用户
            .option("password","Password123.") // 
            .mode(SaveMode.Overwrite) // 指定覆盖模式
            .save()
        }
      }
      

      运行程序

      如果是在本地运行, 需要导入 Maven 依赖

      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.47</version>
      </dependency>
      

      Linux查询

      mysql> use spark02;
      
      mysql> select * from student;
      

      在这里插入图片描述

    • 从 MySQL 中读取数据

      读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问

      object MySQLRead {
        def main(args: Array[String]): Unit = {
          val spark = SparkSession.builder()
            .appName("mysql_read")
            .master("local[6]")
            .getOrCreate()
      
          spark.read
            .format("jdbc")
            .option("url","jdbc:mysql://192.168.169.134:3306/spark02")
            .option("dbtable","student")
            .option("user","spark03")
            .option("password","Password123.")
            .load() // 加载数据
            .show()
        }
      }
      

      在这里插入图片描述

      默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区

      属性含义
      partitionColumn指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID
      lowerBound, upperBound确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区
      numPartition分区数量
      // 上面的 read 修改一下
      spark.read
            .format("jdbc")
            .option("url","jdbc:mysql://192.168.169.134:3306/spark02")
            .option("dbtable","student")
            .option("user","spark03")
            .option("password","Password123.")
            .option("partitionColumn","age")
            .option("lowerBound", 1)
            .option("upperBound", 60)
            .option("numPartitions", 10)
            .load() // 加载数据
            .show()
      

      在这里插入图片描述

      有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法

      val predicates = Array(
        "age < 20",
        "age >= 20, age < 30",
        "age >= 30"
      )
      
          val connectionProperties = new Properties()
          connectionProperties.setProperty("user", "spark03")
          connectionProperties.setProperty("password", "Password123.")
      
          spark.read
            .jdbc(
              url = "jdbc:mysql://192.168.169.134:3306/spark02",
              table = "student",
              predicates = predicates, // predicates where 子句中的条件列表;每一个定义一个分区
              connectionProperties = connectionProperties
            )
            .show()
      

      在这里插入图片描述

      SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到

      spark.read
            .format("jdbc")
            .option("url","jdbc:mysql://192.168.169.134:3306/spark02")
      //      .option("dbtable","student")
      			// 执行语句
            .option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
            .option("user","spark03")
            .option("password","Password123.")
            .option("partitionColumn","age")
            .option("lowerBound", 1)
            .option("upperBound", 60)
            .option("numPartitions", 10)
            .load() // 加载数据
            .show(500)
      

      在这里插入图片描述

文件

  • 文件

    DataFrame 所用文件

    DataFrame 所用文件[BeijingPM20100101_20151231.rar]

    Windows写文件的软件

    Windows写文件的软件[winutils.rar]

    Hive JDBC 所用文件

    Hive JDBC 所用文件[studenttab10k.rar]

【资源中一一上传】

文章来源:https://blog.csdn.net/m0_56181660/article/details/135637415
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。