命令式的 API
RDD 版本的 WordCount
val conf = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(conf)
sc.textFile("hdfs://master:9000/dataset/wordcount.txt")
.flatMap(_.split(" "))
.map((_, 1))
.reduceByKey(_ + _)
.collect
命令式 API 的入门案例
pom.xml文件加
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>
code (dataset)
@Test
def dsIntro(): Unit = {
val spark = new SparkSession.Builder()
.appName("ds intro")
.master("local[6]")
.getOrCreate()
import spark.implicits._ // 导入隐式转换
val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
val persons = sourceRDD.toDS()
val resultDS = persons.where('age > 10)
.where('age < 20)
.select('name)
.as[String]
resultDS.show()
}
case class Person(name: String, age: Int)
SparkSession
SparkContext 作为 RDD 的创建者和入口, 其主要作用有如下两点
为什么无法使用 SparkContext 作为 SparkSQL 的入口?
所以 SparkContext 确实已经不适合作为 SparkSQL 的入口, 所以刚开始的时候 Spark 团队为 SparkSQL 设计了两个入口点, 一个是 SQLContext 对应 Spark 标准的 SQL 执行, 另外一个是 HiveContext 对应 HiveSQL 的执行和 Hive 的支持.
在 Spark 2.0 的时候, 为了解决入口点不统一的问题, 创建了一个新的入口点 SparkSession, 作为整个 Spark 生态工具的统一入口点, 包括了 SQLContext, HiveContext, SparkContext 等组件的功能
新的入口应该有什么特性?
DataFrame & Dataset
SparkSQL 最大的特点就是它针对于结构化数据设计, 所以 SparkSQL 应该是能支持针对某一个字段的访问的, 而这种访问方式有一个前提, 就是 SparkSQL 的数据集中, 要 包含结构化信息, 也就是俗称的 Schema
而 SparkSQL 对外提供的 API 有两类, 一类是直接执行 SQL, 另外一类就是命令式. SparkSQL 提供的命令式 API 就是 DataFrame 和 Dataset, 暂时也可以认为 DataFrame 就是 Dataset, 只是在不同的 API 中返回的是 Dataset 的不同表现形式
// RDD
rdd.map { case Person(id, name, age) => (age, 1) }
.reduceByKey {case ((age, count), (totalAge, totalCount)) => (age, count + totalCount)}
// DataFrame
df.groupBy("age").count("age")
通过上面的代码, 可以清晰的看到, SparkSQL 的命令式操作相比于 RDD 来说, 可以直接通过 Schema 信息来访问其中某个字段, 非常的方便
声明式的API
SQL 版本 WordCount
@Test
def dfIntro(): Unit = {
val spark = new SparkSession.Builder()
.appName("df intro")
.master("local[6]")
.getOrCreate()
import spark.implicits._
val sourceRDD = spark.sparkContext.parallelize(Seq(Person("zhangsan", 10), Person("lisi", 15)))
val df = sourceRDD.toDF()
// 注册临时表
df.createOrReplaceTempView("person")
val resultDF = spark.sql("select name from person where age > 10 and age < 20")
resultDF.show()
}
// case class Person(name: String, age: Int)
以往使用 SQL 肯定是要有一个表的, 在 Spark 中, 并不存在表的概念, 但是有一个近似的概念, 叫做 DataFrame, 所以一般情况下要先通过 DataFrame 或者 Dataset 注册一张临时表, 然后使用 SQL 操作这张临时表
总结