Dataset 是什么?
@Test
def dataset1():Unit ={
// 1. 创建 SparkSession
val spark = new SparkSession.Builder()
.master("local[6]")
.appName("dataset1")
.getOrCreate()
// 2. 导入隐式转换
import spark.implicits._
// 3. 演示
val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
val dataset:Dataset[Person] = sourceRDD.toDS()
// DataSet 支持强类型的 API
dataset.filter(item=> item.age>10).show()
// Dataset 支持弱类型的API
dataset.filter('age>10).show()
dataset.filter($"age">10).show()
// DataSet 支持 直接写 SQL 表达式
dataset.filter("age>10").show()
}
case class Person(name: String, age: Int)
问题1: Person 是什么?
Person 是一个强类型的类
问题2: 这个 Dataset 中是结构化的数据吗?
非常明显是的, 因为 Person 对象中有结构信息, 例如字段名和字段类型
问题3: 这个 Dataset 能够使用类似 SQL 这样声明式结构化查询语句的形式来查询吗?
当然可以, 已经演示过了
问题4: Dataset 是什么?
Dataset 是一个强类型, 并且类型安全的数据容器, 并且提供了结构化查询 API 和类似 RDD 一样的命令式 API
Dataset 底层是什么
即使使用 Dataset 的命令式 API, 执行计划也依然会被优化
Dataset 具有 RDD 的方便, 同时也具有 DataFrame 的性能优势, 并且 Dataset 还是强类型的, 能做到类型安全.
@Test
def dataset2():Unit = {
// 1. 创建 SparkSession
val spark = new SparkSession.Builder()
.master("local[6]")
.appName("dataset2")
.getOrCreate()
// 2. 导入隐式转换
import spark.implicits._
// 3. 演示
val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
val dataset: Dataset[Person] = sourceRDD.toDS()
//优化
dataset.explain(true) // 查看其逻辑计划和物理计划,运行会直接打印
}
case class Person(name: String, age: Int)
Dataset 的底层是什么?
Dataset 最底层处理的是对象的序列化形式, 通过查看 Dataset 生成的物理执行计划, 也就是最终所处理的 RDD, 就可以判定 Dataset 底层处理的是什么形式的数据
val dataset: Dataset[People] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15)))
val internalRDD: RDD[InternalRow] = dataset.queryExecution.toRdd
case class Person(name: String, age: Int)
dataset.queryExecution.toRdd
这个 API 可以看到 Dataset
底层执行的 RDD, 这个 RDD 中的范型是 InternalRow
, InternalRow
又称之为 Catalyst Row
, 是 Dataset
底层的数据结构, 也就是说, 无论 Dataset
的范型是什么, 无论是 Dataset[Person]
还是其它的, 其最底层进行处理的数据结构都是 InternalRow
所以, Dataset
的范型对象在执行之前, 需要通过 Encoder
转换为 InternalRow
, 在输入之前, 需要把 InternalRow
通过 Decoder
转换为范型对象
可以获取 Dataset 对应的 RDD 表示
在 Dataset 中, 可以使用一个属性 rdd 来得到它的 RDD 表示, 例如 Dataset[T] → RDD[T]
@Test
def dataset2():Unit = {
// 1. 创建 SparkSession
val spark = new SparkSession.Builder()
.master("local[6]")
.appName("dataset2")
.getOrCreate()
// 2. 导入隐式转换
import spark.implicits._
// 3. 演示
// val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
// val dataset: Dataset[Person] = sourceRDD.toDS()
//优化
val dataset: Dataset[Person] = spark.createDataset(Seq(Person("zhangsan", 10), Person("lisi", 15)))
// dataset.explain(true) // 查看其逻辑计划和物理计划,运行会直接打印
// 无论DataSet 放置的是什么类型对象,最终执行计划中的RDD 上都是InternalRow
// 直接获取到已经分析和解析过的DataSet的执行计划,从中拿到RDD
val executionRdd:RDD[InternalRow] = dataset.queryExecution.toRdd // 把生成的计划转为rdd
//通过将DataSet 底层的RDD[InternalRow] 通过Decoder 转成了DataSet一样的类型RDD
val typeRDD: RDD[Person] = dataset.rdd
// (1)
println(executionRdd.toDebugString)
/*
* (2) MapPartitionsRDD[1] at toRdd at Intro.scala:97 []
* | ParallelCollectionRDD[0] at toRdd at Intro.scala:97 []
* */
println("-------------------------")
// (2)// 这段代码的执行计划为什么多了两个步骤?
println(typeRDD.toDebugString)
/*
* (2) MapPartitionsRDD[5] at rdd at Intro.scala:102 []
* | MapPartitionsRDD[4] at rdd at Intro.scala:102 []
* | MapPartitionsRDD[3] at rdd at Intro.scala:102 []
* | ParallelCollectionRDD[2] at rdd at Intro.scala:102 []
* */
}
case class Person(name: String, age: Int) // 放在class 外
(1)Dataset 的执行计划底层的 RDD
(2)使用 Dataset.rdd 将 Dataset 转为 RDD 的形式
可以看到 (1) 对比 (2) 对了两个步骤, 这两个步骤的本质就是将 Dataset 底层的 InternalRow 转为 RDD 中的对象形式, 这个操作还是会有点重的, 所以慎重使用 rdd 属性来转换 Dataset 为 RDD
总结