Dataframe 是什么
DataFrame
是 SparkSQL
中一个表示关系型数据库中 表
的函数式抽象, 其作用是让 Spark
处理大规模结构化数据的时候更加容易. 一般 DataFrame
可以处理结构化的数据, 或者是半结构化的数据, 因为这两类数据中都可以获取到 Schema
信息. 也就是说 DataFrame
中有 Schema
信息, 可以像操作表一样操作 DataFrame
.
DataFrame
由两部分构成, 一是 row
的集合, 每个 row
对象表示一个行, 二是描述 DataFrame
结构的 Schema
DataFrame
支持 SQL
中常见的操作, 例如: select
, filter
, join
, group
, sort
, join
等
@Test
def dataframe1(): Unit = {
// 1. 创建 SparkSession 对象
val spark = SparkSession.builder()
.master("local[6]")
.appName("dataframe1")
.getOrCreate()
// 2. 创建 DataFrame
import spark.implicits._
val dataFrame: DataFrame = Seq(Person("zhangsan", 15), Person("lisi", 20)).toDF()
// 3. 看看 DataFrame 可以玩出什么什么花样
// select name from t where t.age >10
dataFrame.where('age > 10)
.select( 'name)
.show()
}
case class Person(name: String, age: Int)
DataFrame 如何创建
DataFrame如何创建数据集【BeijingPM20100101_20151231_noheader.rar】
@Test
def dataframe2():Unit = {
val spark = SparkSession.builder()
.master("local[6]")
.appName("dataframe2")
.getOrCreate()
import spark.implicits._
val personList = Seq(Person("zhangsan", 15), Person("lisi", 20))
// 创建 DataFrame 的方法
// 1.toDF
val df1 = personList.toDF()
val df2 = spark.sparkContext.parallelize(personList).toDF() // RDD.toDf()
// 2. createDatFrame
val df3 = spark.createDataset(personList)
// 3. read
val df4 = spark.read.csv("./dataset/BeijingPM20100101_20151231_noheader.csv")
df4.show()
}
case class Person(name: String, age: Int)
DataFrame 操作 (案例)
DataFrame操作数据集[BeijingPM20100101_20151231.rar]
需求:查看 PM_Dongsi 每个月的统计数量
object DataFrameTest {
def main(args: Array[String]): Unit = {
// 1. 创建SparkSession
val spark = SparkSession.builder()
.master("local[6]")
.appName("pm_analysis")
.getOrCreate()
import spark.implicits._
// 2. 读取数据集
val sourceDF = spark.read
.option("header",true) // 把表头读取出来
.csv("./dataset/BeijingPM20100101_20151231.csv")
//sourceDF.show()
//查看DataFrame 的 schema 信息,要意识到 DataFrame 中是有结构信息的,叫做Schema
sourceDF.printSchema()
// 3. 处理
// 1. 选择列
// 2. 过滤 NA 的 PM记录
// 3. 分组 select year, month, count(PM_Dongsi) from .. where PM_Dongsi != NA group by year, month
// 4. 聚合
// 4. 得出结论
sourceDF.select('year,'month,'PM_Dongsi)
.where('PM_Dongsi =!= "NA") // 过滤 NA 的 PM记录
.groupBy('year,'month)
.count()
.show() // action
// 是否能支持使用 SQL 语句进行查询
println("---------接下来是SQL语句查询的--------------")
// 1. 将 DataFrame 注册为临时表
sourceDF.createOrReplaceTempView("pm")
// 2. 执行查询
val resultDF = spark.sql("select year, month, count(PM_Dongsi) from pm where PM_Dongsi != 'NA' group by year,month")
resultDF.show()
spark.stop()
}
}
总结
小Tips
一般处理数据都差不多是ETL这个步骤
Spark代码编写的套路: