val spark = SparkSession
.builder()
.master("local[*]")
.appName("Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate()
// $example on:create_df$
val df: DataFrame = spark.read.json("spark-demo/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:create_df$
// $example on:untyped_ops$
// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)
// Select only the "name" column
df.select("name").show()
// +-------+
// | name|
// +-------+
// |Michael|
// | Andy|
// | Justin|
// +-------+
// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// | name|(age + 1)|
// +-------+---------+
// |Michael| null|
// | Andy| 31|
// | Justin| 20|
// +-------+---------+
// df.select($"name", $"age" + 1).show() 这里的$ 是什么语法
// 在Spark中,$符号是一个特殊的语法,用于创建一个Column对象。Column对象表示一个数据集中的列,并提供了各种操作和转换方法。
// 在您提供的代码中,$符号用于创建Column对象,表示要选择的列。例如,$"name"表示选择名为"name"的列,$"age" + 1表示对名为"age"的列的值加1。
// 使用$符号可以使代码更加简洁和易读,同时也能够提供静态类型检查,以避免在运行时发生错误。
// 需要注意的是,$符号需要导入spark.implicits._,以便在Spark中使用该语法。通常,导入语句如下所示:
// import spark.implicits._
// df.select($"name", $"age" + 1).show 【正确:表达式表是把age 作为一列,此时$"age" + 1 返回的是列对象】
// df.select("name", "age" + 1).show【报错,没有age1列】
// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+
// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// | 19| 1|
// |null| 1|
// | 30| 1|
// +----+-----+
// $example off:untyped_ops$
// $example on:run_sql$
// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")
//
// Spark Dataset 货 DataFrame 注册临时视图的方法:
// 在 Spark 中,可以将 Dataset 或 DataFrame 注册为临时视图(Temporary View),以便在 SQL 查询中使用。注册表可以有以下几种形式:
// 临时视图(Temporary View):使用 createOrReplaceTempView 方法可以将 Dataset 或 DataFrame 注册为临时视图。
// 临时视图的作用范围限制在当前的 SparkSession 实例中。示例代码如下:
// scala dataset.createOrReplaceTempView("tempViewName")
// 全局临时视图(Global Temporary View):使用 createOrReplaceGlobalTempView 方法可以将 Dataset 或 DataFrame 注册为全局临时视图。
// 全局临时视图的作用范围跨越多个 SparkSession 实例,可以在不同的 SparkSession 实例之间共享。示例代码如下:
// scala dataset.createOrReplaceGlobalTempView("globalTempViewName")
// 持久化视图(Persisted View):使用 createOrReplaceTempView 方法将 Dataset 或 DataFrame 注册为临时视图后,
// 可以使用 CACHE 或 PERSIST 命令将其持久化到磁盘中,以便在后续查询中重用。示例代码如下:
// spark.sql("CACHE TABLE tempViewName")
// spark.sql("PERSIST TABLE tempViewName")
//
// spark cache 和 persist 的区别
// 在 Spark 中,cache 和 persist 都用于将数据持久化到内存或磁盘中,以便在后续的查询中重用。它们的主要区别在于持久化级别的选择和默认行为。
// 持久化级别:cache 方法是 persist 方法的简化形式,它使用默认的持久化级别 MEMORY_ONLY,将数据持久化到内存中。
// 而 persist 方法可以接受一个自定义的持久化级别参数(StorageLevel),您可以根据需要选择不同的级别,例如 MEMORY_AND_DISK、DISK_ONLY 等。
// 默认行为:cache 方法将数据持久化到内存中,相当于使用 persist 方法指定了默认的持久化级别为 MEMORY_ONLY。
// 而 persist 方法可以根据您的需求选择不同的持久化级别,默认情况下为 MEMORY_ONLY。
// 无论是使用 cache 还是 persist,当数据被持久化时,Spark 会尝试尽可能地将数据存储在内存中,以提供最佳的性能。
// 如果内存不足以容纳所有数据,Spark 将会将一部分数据溢写到磁盘上。
// 需要注意的是,通过持久化数据,可以避免在后续查询中重新计算数据,从而提高查询性能。
// 但是,持久化数据也会占用内存或磁盘空间,因此在选择持久化级别时需要权衡性能和资源消耗。
//
// spark.sql("PERSIST TABLE tempViewName") 的存储级别的指定:
// 在 Spark 中,PERSIST TABLE 语句并没有直接指定存储级别的选项。
// 持久化的存储级别是由 Spark 的配置属性 spark.sql.sources.default 决定的。
// 默认情况下,spark.sql.sources.default 的值为 inMemory,表示将表持久化到内存中。
// 您可以通过设置该属性来更改默认的持久化级别。
// 以下是设置 spark.sql.sources.default 属性为 disk 的示例代码:
// import org.apache.spark.sql.SparkSession
// val spark = SparkSession.builder()
// .appName("Example")
// .config("spark.sql.sources.default", "disk")
// .getOrCreate()
// // 执行 PERSIST TABLE 语句
// spark.sql("PERSIST TABLE tempViewName")
//
// sparkSession 可以在运行的时候改变“spark.sql.source.default”的内容吗?
// 在 Spark 中,spark.sql.sources.default 是一个静态的配置属性,
// 它在 SparkSession 创建时被读取,一旦 SparkSession 创建完成后,就不能直接更改这个属性的值。
// 如果您想要在运行时更改 spark.sql.sources.default 的值,
// 您可以通过创建一个新的 SparkSession 来实现。在新创建的 SparkSession 中,您可以指定不同的属性值。
// 以下是一个示例代码,演示如何在运行时更改 spark.sql.sources.default 的值:
// import org.apache.spark.sql.SparkSession
// val spark = SparkSession.builder()
// .appName("Example")
// .config("spark.sql.sources.default", "disk")
// .getOrCreate()
// // 创建新的 SparkSession,更改 `spark.sql.sources.default` 的值
// val newSpark = SparkSession.builder()
// .config("spark.sql.sources.default", "inMemory")
// .getOrCreate()
// // 使用新的 SparkSession 执行操作
// newSpark.sql("PERSIST TABLE tempViewName")
// 在上面的示例中,我们首先创建了一个 SparkSession,并将 spark.sql.sources.default 的值设置为 "disk"。
// 然后,我们创建了一个新的 SparkSession,并将 spark.sql.sources.default 的值设置为 "inMemory"。
// 在新的 SparkSession 中,我们可以使用新的属性值执行操作,例如执行 PERSIST TABLE 语句。
// 需要注意的是,创建新的 SparkSession 可能会导致额外的资源开销,因此在实际应用中,需要权衡资源的使用和操作的需要。
val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// $example off:run_sql$
// $example on:global_temp_view$
// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")
// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// 不加global 也可以访问。
spark.sql("SELECT * FROM people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// Global temporary view is cross-session,
// 不同的session 之间可以共享global_temp 的数据
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
// spark.newSession 的作用?
// spark.newSession 是 Spark 中的一个方法,用于创建一个新的 SparkSession 对象。
// SparkSession 是 Spark 2.0 引入的一个编程入口,用于与 Spark 进行交互和执行操作。它是 Spark 中最核心的概念之一,代表了一个与 Spark 集群的连接。
// 使用 spark.newSession 方法可以创建一个新的 SparkSession 对象,该对象具有独立的配置和上下文。这在某些情况下是有用的,例如:
// 并行执行任务:通过创建多个独立的 SparkSession 对象,可以并行执行多个任务,每个任务都有自己的配置和上下文。这对于同时处理多个数据集或任务非常有用。
// 隔离配置和状态:每个 SparkSession 对象都有自己的配置和状态,可以独立设置和修改。这样可以避免不同任务之间的相互干扰和影响。
// 需要注意的是,SparkSession 是 Spark 2.x 版本中的概念,如果您使用的是较早版本的 Spark,可能没有 SparkSession 类。在早期版本中,可以使用 SparkContext 对象来与 Spark 进行交互。
// 如下回报错:
// 需要指定默认的schema: global_temp
// spark.newSession().sql("SELECT * FROM people").show()
// $example off:global_temp_view$