load data inpath ‘/dataset/studenttab10k’ overwrite into table student;
初识 DataFrameReader (读)
SparkSQL
的一个非常重要的目标就是完善数据读取, 所以 SparkSQL
中增加了一个新的框架, 专门用于读取外部数据源, 叫做 DataFrameReader
@Test
def reader1(): Unit = {
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("reader1")
.getOrCreate()
// 2. 框架在哪
val reader:DataFrameReader = spark.read
}
DataFrameReader
由如下几个组件组成
组件 | 解释 |
---|---|
schema | 结构信息, 因为 Dataset是有结构的, 所以在读取数据的时候, 就需要有 Schema信息, 有可能是从外部数据源获取的, 也有可能是指定的 |
option | 连接外部数据源的参数, 例如 JDBC 的 URL, 或者读取 CSV文件是否引入 Header等 |
format | 外部数据源的格式, 例如 csv, jdbc, json |
等 |
DataFrameReader
有两种访问方式, 一种是使用 load
方法加载, 使用 format
指定加载格式, 还有一种是使用封装方法, 类似 csv
, json
, jdbc
等
@Test
def reader2(): Unit = {
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("reader2")
.getOrCreate()
// 2. 第一种形式
spark.read
.format("csv") // 文件格式
.option("header", value = true) // 显示头部
.option("inferSchema", value = true) // 除头部的其他信息,结构信息
.load("./dataset/BeijingPM20100101_20151231.csv") // 文件地址
.show(10) // 展示10行数据
// 3. 第二种形式
spark.read
.option("header", value = true)
.option("inferSchema", value = true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
.show()
}
但是其实这两种方式本质上一样, 因为类似 csv 这样的方式只是 load 的封装
注意:
总结
初识 DataFrameWriter (写)
对于 ETL 来说, 数据保存和数据读取一样重要, 所以 SparkSQL 中增加了一个新的数据写入框架, 叫做 DataFrameWriter
@Test
def writer1():Unit = {
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("writer1")
.getOrCreate()
val df = spark.read
.option("header", true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
// 2. 框架在哪
val writer: DataFrameWriter[Row] = df.write
}
DataFrameWriter
中由如下几个部分组成
组件 | 解释 |
---|---|
source | 写入目标, 文件格式等, 通过 format方法设定 |
mode | 写入模式, 例如一张表已经存在, 如果通过 DataFrameWriter向这张表中写入数据, 是覆盖表呢, 还是向表中追加呢? 通过 mode方法设定 |
extraOptions | 外部参数, 例如 JDBC的 URL, 通过 options, option设定 |
partitioningColumns | 类似 Hive的分区, 保存表的时候使用, 这个地方的分区不是 RDD的分区, 而是文件的分区, 或者表的分区, 通过 partitionBy设定 |
bucketColumnNames | 类似 Hive的分桶, 保存表的时候使用, 通过 bucketBy设定 |
sortColumnNames | 用于排序的列, 通过 sortBy设定 |
mode
指定了写入模式, 例如覆盖原数据集, 或者向原数据集合中尾部添加等
Scala 对象表示 | 字符串表示 | 解释 |
---|---|---|
SaveMode.ErrorIfExists | "error” | 将 DataFrame保存到 source时, 如果目标已经存在, 则报错 |
SaveMode.Append | "append” | 将 DataFrame保存到 source时, 如果目标已经存在, 则添加到文件或者 Tabl中 |
SaveMode.Overwrite | "overwrite” | 将 DataFrame保存到 source时, 如果目标已经存在, 则使用 DataFrame中的数据完全覆盖目标 |
SaveMode.Ignore | "ignore” | 将 DataFrame保存到 source时, 如果目标已经存在, 则不会保存 DataFrame数据, 并且也不修改目标数据集, 类似于 CREATE TABLE IF NOT EXISTS |
DataFrameWriter 也有两种使用方式, 一种是使用 format 配合 save, 还有一种是使用封装方法, 例如 csv, json, saveAsTable 等
@Test
def writer2():Unit = {
System.setProperty("hadoop.home.dir","C:\\winutils") // Windows 写文件必须的
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("writer2")
.getOrCreate()
// 2. 读取数据集
val df = spark.read
.option("header", true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
// 3. 写入数据集
// 使用 json 保存, 因为方法是 json, 所以隐含的 format 是 json
df.write.json(("./dataset/beijing_pm.json"))
// 使用 save 保存, 使用 format 设置文件格式
df.write.format("json").save("./dataset/beijing_pm2.json")
}
总结
读写 Parquet 格式文件
什么时候会用到 Parquet ?
在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.
为了能够保存比较复杂的数据, 并且保证性能和压缩率, 通常使用 Parquet 是一个比较不错的选择.
所以外部系统收集过来的数据, 有可能会使用 Parquet, 而 Spark 进行读取和转换的时候, 就需要支持对 Parquet 格式的文件的支持.
一般处理数据都差不多是ETL的过程
E -> 抽取 T -> 处理,转换L -> 装载,落底
使用代码读写 Parquet 文件
默认不指定 format, 默认的 就是读写 Parquet 格式的文件
object Parquet {
def main(args: Array[String]): Unit = {
System.setProperty("hadoop.home.dir", "c:\\winutils")
val spark = SparkSession.builder()
.master("local[6]")
.appName("parquet")
.getOrCreate()
// 读取 CSV 文件数据
val df = spark.read.option("header", true).csv("./dataset/BeijingPM20100101_20151231.csv")
// 把数据写为 Parquet 格式
// 写入的格式默认parquet
// 写入模式,报错,覆盖,追加,忽略
df.write
// .format("parquet") // 设置类型 ,默认parquet
.mode(SaveMode.Overwrite) //看mode源码,重写内容,将DataFrame保存到source时, 如果目标已经存在, 则使用DataFrame中的数据完全覆盖目标
.save("dataset/beijing_pm3")
// 读取 Parquet 格式文件
// 默认格式是否是parquet? -> 是
// 是否可能读取文件夹呢? -> 是
spark.read
.load("./dataset/beijing_pm3")
.show()
}
}
分区
写入 Parquet 的时候可以指定分区
Spark 在写入文件的时候是支持分区的, 可以像 Hive 一样设置某个列为分区列
@Test
def parquetPartitions():Unit={
System.setProperty("hadoop.home.dir","C:\\winutils")
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("writer2")
.getOrCreate()
// 2. 读取数据
val df = spark.read
.option("header",true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
// 3. 写文件,表分区
df.write
.partitionBy("year","month")
.save("dataset/beijing_pm4")
// 4. 读文件,自动发现分区
// 写分区表的时候,分区列不会包含在生成的文件中
// 直接通过文件来进行读取的话,分区信息会丢失
// SparkSQL 会进行自动的分区发现,
spark.read
// .parquet("./dataset/beijing_pm4/year=2010/month=1")
.parquet("./dataset/beijing_pm4/") // 所以直接读取最外层文件夹
.printSchema() // 打印这个时把2 和 3步骤注释
}
注意:这里指的分区是类似 Hive 中表分区的概念, 而不是 RDD 分布式分区的含义
分区发现
在读取常见文件格式的时候, Spark 会自动的进行分区发现, 分区自动发现的时候, 会将文件名中的分区信息当作一列. 例如 如果按照性别分区, 那么一般会生成两个文件夹 gender=male 和 gender=female, 那么在使用 Spark 读取的时候, 会自动发现这个分区信息, 并且当作列放入创建的 DataFrame 中
SparkSession 中有关 Parquet 的配置
配置 | 默认值 | 含义 |
---|---|---|
spark.sql.parquet.binaryAsString | false | 一些其他 Parquet 生产系统, 不区分字符串类型和二进制类型, 该配置告诉 SparkSQL 将二进制数据解释为字符串以提供与这些系统的兼容性 |
spark.sql.parquet.int96AsTimestamp | true | 一些其他 Parquet 生产系统, 将 Timestamp 存为 INT96, 该配置告诉 SparkSQL 将 INT96 解析为 Timestamp |
spark.sql.parquet.cacheMetadata | true | 打开 Parquet 元数据的缓存, 可以加快查询静态数据 |
spark.sql.parquet.compression.codec | snappy | 压缩方式, 可选 uncompressed, snappy, gzip, lzo |
spark.sql.parquet.mergeSchema | false | 当为 true 时, Parquet 数据源会合并从所有数据文件收集的 Schemas 和数据, 因为这个操作开销比较大, 所以默认关闭 |
spark.sql.optimizer.metadataOnly | true | 如果为 true, 会通过原信息来生成分区列, 如果为 false 则就是通过扫描整个数据集来确定 |
总结:
读写 JSON 格式文件
什么时候会用到 JSON ?
在 ETL 中, Spark 经常扮演 T 的职务, 也就是进行数据清洗和数据转换.
在业务系统中, JSON 是一个非常常见的数据格式, 在前后端交互的时候也往往会使用 JSON, 所以从业务系统获取的数据很大可能性是使用 JSON 格式, 所以就需要 Spark 能够支持 JSON 格式文件的读取
读写 JSON 文件
将要 Dataset 保存为 JSON 格式的文件比较简单, 是 DataFrameWriter 的一个常规使用
@Test
def json(): Unit = {
System.setProperty("hadoop.home.dir","C:\\winutils")
val spark = SparkSession.builder()
.master("local[6]")
.appName("json")
.getOrCreate()
// 读
val df = spark.read
.option("header",true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
// 写
// df.write
// .json("./dataset/beijing_pm5")
// 读json (先写json再读)
spark.read
.json("./dataset/beijing_pm5")
.show()
}
如果不重新分区, 则会为 DataFrame
底层的 RDD 的每个分区生成一个文件, 为了保持只有一个输出文件, 所以重新分区
保存为 JSON 格式的文件有一个细节需要注意, 这个 JSON
格式的文件中, 每一行是一个独立的 JSON, 但是整个文件并不只是一个 JSON 字符串, 所以这种文件格式很多时候被成为 JSON Line
文件, 有时候后缀名也会变为 jsonl
beijing_pm.jsonl
{"day":"1","hour":"0","season":"1","year":2013,"month":3}
{"day":"1","hour":"1","season":"1","year":2013,"month":3}
{"day":"1","hour":"2","season":"1","year":2013,"month":3}
JSON 格式的文件是有结构信息的, 也就是 JSON 中的字段是有类型的, 例如 “name”: “zhangsan”
这样由双引号包裹的 Value, 就是字符串类型, 而 “age”: 10 这种没有双引号包裹的就是数字类型, 当然, 也可以是布尔型 “has_wife”: true
Spark 读取 JSON Line 文件的时候, 也会自动的推断类型信息
总结
json的两个应用场景
/**
* toJSON 的场景:
* 处理完了以后, DataFrame中如果是一个对象, 如果其他的系统只支持 JSON 格式的数据
* SParkSQL 如果和这种系统进行整合的时候, 就需要进行转换
*/
@Test
def json1():Unit = {
val spark = SparkSession.builder()
.master("local[6]")
.appName("json1")
.getOrCreate()
val df = spark.read
.option("header",true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
df.toJSON.show()
}
/**
* 从消息队列中取出JSON格式的数据, 需要使用 SparkSQL 进行处理
*/
@Test
def json2():Unit = {
val spark = SparkSession.builder()
.master("local[6]")
.appName("json2")
.getOrCreate()
val df = spark.read
.option("header", true)
.csv("./dataset/BeijingPM20100101_20151231.csv")
val jsonRDD = df.toJSON.rdd
spark.read.json(jsonRDD).show() // 直接从rdd中读出DataFrame
}
SparkSQL 整合 Hive
和一个文件格式不同, Hive
是一个外部的数据存储和查询引擎, 所以如果 Spark
要访问 Hiv
的话, 就需要先整合 Hive
Hive
整合什么 ?
如果要讨论 SparkSQL
如何和 Hive
进行整合, 首要考虑的事应该是 Hive
有什么, 有什么就整合什么就可以
**MetaStore
, 元数据存储**
SparkSQL
内置的有一个 MetaStore
, 通过嵌入式数据库 Derby
保存元信息, 但是对于生产环境来说, 还是应该使用 Hive
的 MetaStore
, 一是更成熟, 功能更强, 二是可以使用 Hive
的元信息
查询引擎
SparkSQL
内置了 HiveSQL
的支持, 所以无需整合
为什么要开启Hive
的MetaStore
Hive
的 MetaStore
是一个 Hive
的组件, 一个 Hive
提供的程序, 用以保存和访问表的元数据, 整个 Hive
的结构大致如下
由上图可知道, 其实 Hive
中主要的组件就三个, HiveServer2
负责接受外部系统的查询请求, 例如 JDBC
, HiveServer2
接收到查询请求后, 交给 Driver
处理, Driver
会首先去询问 MetaStore
表在哪存, 后 Driver
程序通过 MR
程序来访问 HDFS
从而获取结果返回给查询请求者
而 Hive
的 MetaStore
对 SparkSQL
的意义非常重大, 如果 SparkSQL
可以直接访问 Hive
的 MetaStore
, 则理论上可以做到和 Hive
一样的事情, 例如通过 Hive
表查询数据
而 Hive 的 MetaStore 的运行模式有三种
内嵌 Derby 数据库模式
这种模式不必说了, 自然是在测试的时候使用, 生产环境不太可能使用嵌入式数据库, 一是不稳定, 二是这个 Derby 是单连接的, 不支持并发
Local 模式
Local 和 Remote 都是访问 MySQL 数据库作为存储元数据的地方, 但是 Local 模式的 MetaStore 没有独立进程, 依附于 HiveServer2 的进程
Remote 模式
和 Loca 模式一样, 访问 MySQL 数据库存放元数据, 但是 Remote 的 MetaStore 运行在独立的进程中
我们显然要选择 Remote 模式, 因为要让其独立运行, 这样才能让 SparkSQL 一直可以访问
Hive 开启 MetaStore
hive 下载:
https://archive.apache.org/dist/hive/hive-2.3.4/
Step 1: 修改 hive-site.xml
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:mysql://master:3306/metastore?createDatabaseIfNotExist=true&useSSL=false</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>com.mysql.jdbc.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>root</value>
<description>username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>Password123.</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/user/hive/warehouse</value> # hdfs 上的目录,默认这个位置
</property>
<!-不校验元数据->
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value> # 当前服务器
</property>
# 下面四个也要
<property>
<name>hive.downloaded.resources.dir</name>
<value>/root/hive/tmp</value>
</property>
<property>
<name>hive.exec.local.scratchdir</name>
<value>/root/hive/tmp/${hive.session.id}_resources</value>
</property>
<property>
<name>hive.querylog.location</name>
<value>/root/hive/tmp</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/root/hive/tmp/operation_logs</value>
</property>
# 解决Hive执行insert命令调用MR任务报错问题
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
<description>Let Hive determine whether to run in local mode automatically</description>
</property>
# 下面两个不动
<property>
<name>hive.metastore.local</name> # hive 2.x版本没这个
<value>false</value>
</property>
<↓-若元数据不存在,则自动生成>
<property>
<name>datanucleus.schema.autoCreateAll</name>
<value>true</value>
</property>
Step 2: 启动 Hive MetaStore
nohup /root/hive/bin/hive --service metastore 2>&1 >> /var/log.log &
SparkSQL 整合 Hive 的 MetaStore
即使不去整合 MetaStore, Spark 也有一个内置的 MateStore, 使用 Derby 嵌入式数据库保存数据, 但是这种方式不适合生产环境, 因为这种模式同一时间只能有一个 SparkSession 使用, 所以生产环境更推荐使用 Hive 的 MetaStore
SparkSQL 整合 Hive 的 MetaStore 主要思路就是要通过配置能够访问它, 并且能够使用 HDFS 保存 WareHouse, 这些配置信息一般存在于 Hadoop 和 HDFS 的配置文件中, 所以可以直接拷贝 Hadoop 和 Hive 的配置文件到 Spark 的配置目录
# 将 hive/conf 下的 hive-site.xml,拷贝到spark的conf下
cp /root/hive/conf/hive-site.xml /root/spark/conf/
# 将 hadoop/etc/hadoop 下的 hdfs-site.xml 和 core-site.xml 拷贝到spark的conf下
cp /root/hadoop/etc/hadoop/core-site.xml /root/spark/conf/
cp /root/hadoop/etc/hadoop/hdfs-site.xml /root/spark/conf/
小总结:
如果不希望通过拷贝文件的方式整合 Hive, 也可以在 SparkSession 启动的时候, 通过指定 Hive 的 MetaStore 的位置来访问, 但是更推荐整合的方式
在 Hive 中创建表
第一步, 需要先将文件上传到集群中, 使用如下命令上传到 HDFS 中
hdfs dfs -mkdir -p /dataset
hdfs dfs -put studenttab10k /dataset
第二步, 使用 Hive 或者 Beeline 执行如下 SQL
**# 进入 hive
hive
# SQL如下
create database if not exists spark01; # 建库,默认在hive-site.xml下配置的hdfs目录下/user/hive/warehouse
use spark01; # 进入当前库
create external table student # 建表
(
name STRING, # 名字
age INT, # 年龄
gpa string # 平均分
)
row format delimited
fields terminated by '\t' # fields(字段) 之间的分隔符 by 制表符
lines terminated by '\n' # lines(行) 之间的分隔符 by 换行
stored as TEXTFILE # 指定文件是以什么格式存储,这里是文本文件,所以是TEXTFILE
location '/user/hive/warehouse'; # 指定目录/user/hive/warehouse
LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student; # 将目标文件数据上传到表中
select * from student limit 10; # 查询前10条数据**
SparkSQL操作Hive表 (查询 Hive 的表 和 创建 Hive 表)
(只有使用spark-shell或者spark-submit的时候才能享受这个整合的便利)
通过 SparkSQL 查询 Hive 的表
查询 Hive 中的表可以直接通过 spark.sql(…) 来进行, 可以直接在其中访问 Hive 的 MetaStore, 前提是一定要将 Hive 的配置文件拷贝到 Spark 的 conf 目录
**scala> spark.sql("use spark01")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from student limit 100")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> res1.show() // 拿数据的时候show
// ....数据**
通过 SparkSQL 创建 Hive 表
通过 SparkSQL
可以直接创建 Hive 表, 并且使用 load data
加载数据
scala> :paste
// Entering paste mode (ctrl-D to finish)
val createTableStr =
"""
|create EXTERNAL TABLE student
|(
| name STRING,
| age INT,
| gpa string
|)
|ROW FORMAT DELIMITED
| FIELDS TERMINATED BY '\t'
| LINES TERMINATED BY '\n'
|STORED AS TEXTFILE
|LOCATION '/user/hive/warehouse'
""".stripMargin
spark.sql("CREATE DATABASE IF NOT EXISTS spark02")
spark.sql("USE spark02")
spark.sql(createTableStr)
spark.sql("LOAD DATA INPATH '/dataset/studenttab10k' OVERWRITE INTO TABLE student")
// Exiting paste mode, now interpreting.
22/08/06 03:24:42 ERROR hdfs.KeyProviderCache: Could not find uri with key [dfs.encryption.key.provider.uri] to create a keyProvider !!
createTableStr: String =
"
create EXTERNAL TABLE student
(
name STRING,
age INT,
gpa string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE
LOCATION '/user/hive/warehouse'
"
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from student limit 100")
res1: org.apache.spark.sql.DataFrame = [name: string, age: int ... 1 more field]
scala> res1.where('age > 50).show()
# 。。。数据
目前 SparkSQL 支持的文件格式有 sequencefile, rcfile, orc, parquet, textfile, avro, 并且也可以指定 serde 的名称
SparkSQL 将数据插入到 Hive表中 (idea 打包上传运行)
使用 SparkSQL 处理数据并保存进 Hive 表
前面都在使用 SparkShell 的方式来访问 Hive, 编写 SQL, 通过 Spark 独立应用的形式也可以做到同样的事, 但是需要一些前置的步骤, 如下
Step 1: 导入 Maven 依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
Step2:配置SparkSession
如果希望使用SparkSQL访问Hive的话,需要做两件事
开启SparkSession的Hive支持
经过这一步配置, SparkSQL
才会把 SQL
语句当作 HiveSQL
来进行解析
设置WareHouse的位置
虽然 hive-stie.xml
中已经配置了 WareHouse
的位置, 但是在 Spark 2.0.0
后已经废弃了 hive-site.xml
中设置的 hive.metastore.warehouse.dir
, 需要在 SparkSession
中设置 WareHouse
的位置
设置MetaStore的位置
val spark = SparkSession
.builder()
.appName("hive example")
.config("spark.sql.warehouse.dir", "hdfs://master:9000/user/hive/warehouse") //1.设置 WareHouse 的位置
.config("hive.metastore.uris", "thrift://Bigdata01:9083") //2.设置 MetaStore 的位置
.enableHiveSupport() //3.开启 Hive 支持
.getOrCreate()
配置好了以后, 就可以通过 DataFrame 处理数据, 后将数据结果推入 Hive 表中了, 在将结果保存到 Hive 表的时候, 可以指定保存模式
全套代码如下:
object HiveAccess {
def main(args: Array[String]): Unit = {
//1.创建SparkSession
// 1.开启hive支持
// 2.指定Metastore 的位置
// 3.指定Warehouse 的位置
val spark = SparkSession.builder()
.appName("hive_example")
.config("Spark.sql.warehouse.dir", "hdfs://master:9000/usr/hive/warehouse")
.config("hive.metastore.uris", "thrift://master:9083")
.enableHiveSupport()
.getOrCreate()
// 2. 读取数据
/**
* 1.上传HDFS, 因为要在集群中执行,所以没办法保证程序在哪个机器上执行
* 所以,要把文件上传到所有机器中,才能读取本地文件,
* 上传到HDFS中就可以解决这个问题,所有的机器都可以读取HDFS中的文件
* 它是一个外部系统
* // 这里将数据文件放到hdfs://master:9000/dataset/下
* 2.使用DF读取文件
*/
val schema = StructType( // 新建schema信息
List(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("gpa", FloatType)
)
)
import spark.implicits._
val df = spark.read
.option("delimiter", "\t") // 指定分隔符
.schema(schema) // 传入结构信息
.csv("hdfs:///dataset/studenttab10k")
val resultDF = df.where('age>50) // 将年龄大于50岁的插入进去
// 3. 写入数据, 使用写入表的API, saveAsTable
// 通过 mode 指定保存模式, 通过 saveAsTable 保存数据到 Hive
resultDF.write.mode(SaveMode.Overwrite).saveAsTable("spark02.student")
}
}
打包上传jar
集群运行
cd /root/
spark/bin/spark-submit --master spark://master:7077 --class cn.itcast.spark.sql.HiveAccess original-spark-1.0-SNAPSHOT.jar
hive 查看数据
hive
select * from spark02.student;
准备 MySQL 环境
在使用 SparkSQL 访问 MySQL 之前, 要对 MySQL 进行一些操作, 例如说创建用户, 表和库等
Step 1: 连接 MySQL 数据库
在 MySQL 所在的主机上执行如下命令
mysql -uroot -p(密码)
Step 2:创建库和表
create database spark02; # 建库
use spark02; # 使用库
# 建表
create table student(
id int auto_increment,
name varchar(100) not null,
age int not null,
gpa float not null,
primary key (id))
engine=InnoDB default CHARSET=utf8;
Step 3: 创建 Spark 使用的用户
**# 创建spar03用户,在所有地方都可以访问@'%',再指定密码
create user 'spark03'@'%' identified by 'Password123.';
# 赋予 spark03用户有spark02下所有表的权限,在任何位置
grant all on spark02.* to 'spark03'@'%';**
使用 SparkSQL 向 MySQL 中写入数据
其实在使用 SparkSQL 访问 MySQL 是通过 JDBC, 那么其实所有支持 JDBC 的数据库理论上都可以通过这种方式进行访问
使用 JDBC 访问关系型数据的时候, 其实也是使用 DataFrameReader, 对 DataFrameReader 提供一些配置, 就可以使用 Spark访问 JDBC, 有如下几个配置可用
属性 | 含义 |
---|---|
url | 要连接的 JDBC URL |
dbtable | 要访问的表, 可以使用任何 SQL 语句中 from 子句支持的语法 |
fetchsize | 数据抓取的大小(单位行), 适用于读的情况 |
batchsize | 数据传输的大小(单位行), 适用于写的情况 |
isolationLevel | 事务隔离级别, 是一个枚举, 取值 NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, SERIALIZABLE, 默认为 READ_UNCOMMITTED |
读取数据集,处理过后存往MySQL中的代码如下
/*
* MySQL 的访问方式有两种:1.使用本地运行,2.提交到集群运行
* 写入 MYSQL 数据时,使用本地运行, 读取数据时,使用集群运行
* */
object MySQLWrite {
def main(args: Array[String]): Unit = {
// 1. 创建SparkSession 对象
val spark = SparkSession.builder()
.appName("mysql_write")
.master("local[6]")
.getOrCreate()
// 2. 读取数据创建 DataFrame
// 1. 拷贝文件
// 2. 读取
import spark.implicits._
// 编写 schema 结构信息
val schema = StructType(
List(
StructField("name", StringType),
StructField("age", IntegerType),
StructField("float", FloatType)
)
)
// 读数据
val df = spark.read
.option("delimiter", "\t")
.schema(schema) // 加入schema 结果信息
.csv("./dataset/studenttab10k")
// 3. 处理数据
val resultDF = df.where("age < 30")
// 4. 落地数据
resultDF.write
.format("jdbc")
.option("url","jdbc:mysql://192.168.169.134:3306/spark02") // 到库
.option("dbtable","student") // 表
.option("user","spark03") // 用户
.option("password","Password123.") //
.mode(SaveMode.Overwrite) // 指定覆盖模式
.save()
}
}
运行程序
如果是在本地运行, 需要导入 Maven 依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
Linux查询
mysql> use spark02;
mysql> select * from student;
从 MySQL 中读取数据
读取 MySQL 的方式也非常的简单, 只是使用 SparkSQL 的 DataFrameReader 加上参数配置即可访问
object MySQLRead {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("mysql_read")
.master("local[6]")
.getOrCreate()
spark.read
.format("jdbc")
.option("url","jdbc:mysql://192.168.169.134:3306/spark02")
.option("dbtable","student")
.option("user","spark03")
.option("password","Password123.")
.load() // 加载数据
.show()
}
}
默认情况下读取 MySQL 表时, 从 MySQL 表中读取的数据放入了一个分区, 拉取后可以使用 DataFrame 重分区来保证并行计算和内存占用不会太高, 但是如果感觉 MySQL 中数据过多的时候, 读取时可能就会产生 OOM, 所以在数据量比较大的场景, 就需要在读取的时候就将其分发到不同的 RDD 分区
属性 | 含义 |
---|---|
partitionColumn | 指定按照哪一列进行分区, 只能设置类型为数字的列, 一般指定为 ID |
lowerBound, upperBound | 确定步长的参数, lowerBound - upperBound 之间的数据均分给每一个分区, 小于 lowerBound 的数据分给第一个分区, 大于 upperBound 的数据分给最后一个分区 |
numPartition | 分区数量 |
// 上面的 read 修改一下
spark.read
.format("jdbc")
.option("url","jdbc:mysql://192.168.169.134:3306/spark02")
.option("dbtable","student")
.option("user","spark03")
.option("password","Password123.")
.option("partitionColumn","age")
.option("lowerBound", 1)
.option("upperBound", 60)
.option("numPartitions", 10)
.load() // 加载数据
.show()
有时候可能要使用非数字列来作为分区依据, Spark 也提供了针对任意类型的列作为分区依据的方法
val predicates = Array(
"age < 20",
"age >= 20, age < 30",
"age >= 30"
)
val connectionProperties = new Properties()
connectionProperties.setProperty("user", "spark03")
connectionProperties.setProperty("password", "Password123.")
spark.read
.jdbc(
url = "jdbc:mysql://192.168.169.134:3306/spark02",
table = "student",
predicates = predicates, // predicates where 子句中的条件列表;每一个定义一个分区
connectionProperties = connectionProperties
)
.show()
SparkSQL 中并没有直接提供按照 SQL 进行筛选读取数据的 API 和参数, 但是可以通过 dbtable 来曲线救国, dbtable 指定目标表的名称, 但是因为 dbtable 中可以编写 SQL, 所以使用子查询即可做到
spark.read
.format("jdbc")
.option("url","jdbc:mysql://192.168.169.134:3306/spark02")
// .option("dbtable","student")
// 执行语句
.option("dbtable", "(select name, age from student where age > 10 and age < 20) as stu")
.option("user","spark03")
.option("password","Password123.")
.option("partitionColumn","age")
.option("lowerBound", 1)
.option("upperBound", 60)
.option("numPartitions", 10)
.load() // 加载数据
.show(500)
文件
DataFrame 所用文件
DataFrame 所用文件[BeijingPM20100101_20151231.rar]
Windows写文件的软件
Windows写文件的软件[winutils.rar]
Hive JDBC 所用文件
Hive JDBC 所用文件[studenttab10k.rar]
【资源中一一上传】