目录
??????? 操作DataFrame一般有两种操作方案:一种为DSL方式,一种为SQL方式
SQL方式:通过编写SQL语句完成统计分析操作
DSL操作:特定领域语言,使用DataFrame特有的API完成计算,也就是代码形式
从使用角度来说:SQL更加方便一些,当适应了DSL写法后,会发现DSL比SQL更好用
从Soark角度来说:推荐使用DSL方案,更有利于Spark底层优化处理
df.createTempview('视图名称'):创建一个临时的视图(表名)
df.createorReplaceTempview('视图名称'):创建一个临时的视图(表名),如果视图存在,直接替换
临时视图:仅能在当前这个spark session的会话中使用
df.createGlobalTempview('视图名称'):创建一个全局视图,运行在一个Spark应用中,多个spark会话中读可以使用,使用的时候必须通过global_temp.视图名称方式才可以加载到,较少使用
执行SQL语句:
??????? spark.sql('书写SQL')
show():用于展示DF中数据,默认仅展示前20行
???????????? 参数1:设置默认展示多少行,默认为20
?????????????? 参数2:是否为阶段列,默认仅展示前20个字符数据,如果过长,不展示(一般不设置)
printSchema():用于打印当前这个DF的表结构信息
select():类似于SQL中的select,SQL中的select后面可以些什么,这里也一样
filter()和where():用于对数据进行过滤操作,一般在SparkSQL中只要使用where
groupBy():用于执行分组操作
orderBy():用于执行排序操作
DSL主要支持一下几种传递的方式: str | column对象 | 列表
??????? str格式: '字段'
??????? column对象:
????????????????????????????? DataFrame含有的字段? df['字段']
?????????????????????????????? 执行过程产生:F.col('字段')
??????? 列表:
??????????????? ['字段1','字段2'...]
??????????????? [df['字段1'],df['字段2']]
为了能够支持在编写Spark SQL的DSL时候,在DSL中使用SQL函数,专门提供一个SQL的函数库。直接加载使用即可
导入这个函数库:import pyspark.sql.functions as F
通过F调用对应的函数即可
SparkSQL中所支持的函数,都可以通过以下地址查询到:
https://spark.apache.org/docs/3.1.2/api/sql/index.html
world count 案例
已知HDFS又一个words.txt的文件,words.txt文件的内容如下:
hadoop hive hadoop sqoop hive
sqoop hadoop zookeeper hive hue
hue sqoop hue zookeeper hive
spark oozie spark hadoop oozie
hive oozie spark hadoop
SQL方式一:子查询
SQL方式二:侧视图
炸裂函数配合侧视图使用如下:
????????格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段) 侧视图名 as 字段名
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
2.2- 只有一列。单词
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('直接基于DataFrame来处理')
spark = SparkSession \
.builder \
.appName('dataFrame_world_count_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
# text方式读取hdfs上的文件
init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
# # 查看数据
# init_df.show()
# # 打印dataframe表结构信息
# init_df.printSchema()
# 创建临时视图
init_df.createTempView('words')
# 数据处理
"""
sparksql方式处理数据-子查询
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.使用子查询方式聚合统计每个单词出现的次数
"""
spark.sql("""select word,count(*) as cnt
from (select explode(split(value,' ')) as word from words)
group by word order by cnt desc
""").show()
"""
sparksql方式处理数据-侧视图
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.使用侧视图方式聚合统计每个单词出现的次数
炸裂函数配合侧视图使用如下:
格式:select 原表别名.字段名,侧视图名.字段名 from 原表 原表别名 lateral view explode(要炸开的字段)
侧视图名 as 字段名
"""
spark.sql("""select word,count(*) as cnt
from words w
lateral view explode(split(value,' ')) t as word
group by word order by cnt desc
""").show()
DSL方式总结:
??????????? withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
??????????? agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
??????????? withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
?
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
1.2 直接基于DataFrame来处理
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('直接基于DataFrame来处理')
spark = SparkSession \
.builder \
.appName('dataFrame_world_count_demo') \
.master('local[*]') \
.getOrCreate()
# 数据输入
# text方式读取hdfs上的文件
init_df = spark.read.text(paths='hdfs://node1:8020/source/word.txt')
# # 查看数据
# init_df.show()
# # 打印dataframe表结构信息
# init_df.printSchema()
# 创建临时视图
init_df.createTempView('words')
# 数据处理
"""
DSL方式处理数据-方式一
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.调用API聚合统计单词个数再排序
"""
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').count().orderBy('count', ascending=False).show()
"""
DSL方式处理数据-方式二
1.先切分每一行的数据
2.使用炸裂函数获得一个word单词列
3.调用API聚合统计单词个数再排序
4.agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
"""
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word'),
F.min('word').alias('min_word'),
).orderBy('cnt', ascending=False).show()
"""
DSL方式处理数据-方式三
withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
"""
init_df.withColumn(
'word',
F.explode(F.split('value', ' '))
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word'),
F.min('word').alias('min_word')
).orderBy('cnt', ascending=False).show()
# 数据输出
# 是否资源
spark.stop()
# 直接基于DataFrame来处理
# 导包
import os
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType, StructField
import pyspark.sql.functions as F
# 绑定指定的python解释器
"""
基于RDD转换DataFrame的方式
需求分析:
1- 将每行内容切分得到单个的单词
2- 组织DataFrame的数据结构
2.1- 有两列。一列是单词,一列是次数
"""
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
# 创建main函数
if __name__ == '__main__':
print('直接基于DataFrame来处理')
# 创建SparkSession对象
spark = SparkSession \
.builder \
.appName('dataFrame_world_count_demo') \
.master('local[*]') \
.getOrCreate()
# 创建sparkContext顶级对象
sc = spark.sparkContext
# 数据输入
# text方式读取hdfs上的文件
init_rdd = sc.textFile('hdfs://node1:8020/source/word.txt')
# RDD数据结构转化为二维数据
map_rdd = init_rdd.flatMap(lambda line: line.split()).map(lambda word: (word,))
# 查看数据
# print(map_rdd.collect())
# 通过Rdd构建DataFrame
schema = StructType([StructField("value", StringType(), True)])
init_df = spark.createDataFrame(data=map_rdd, schema=schema)
# 打印dataframe表结构信息
# init_df.show()
# init_df.printSchema()
# 创建临时视图
init_df.createTempView('words')
# 数据处理
"""
sparksql方式处理数据
"""
spark.sql("""select value as word,count(*) as cnt
from words group by value order by cnt desc""").show()
print('=' * 50)
"""
DSL方式处理数据
"""
init_df.withColumn(
'word',
init_df.value
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word'),
F.min('word').alias('min_word'),
).orderBy('cnt', ascending=False).show()
# 释放资源
spark.stop()