Spark SQL基础知识

发布时间:2024年01月09日

一.DataFrame详解

1.清洗相关的API

去重API:dropDuplicates

总结:用来删除重复数据,如果没有指定参数subset,那么要比对行中的所有字段内容,如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围,如果指定不存在的字段会报错.

删除缺失值的API:dropna

总结:

1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据

2-如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内

3-如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除

替换缺失值的API:fillna

?总结:替换缺失值数据

value:必须要传递参数,是用来填充缺失值的

subset:限定缺失值替换范围

注意:

1- value如果不是字典,那么只会替换字段类型匹配空值

2- 最常用的是value传递字典的形式

?2. Spark SQL的shuffle分区设置

如何调整shuffle分区数量呢?

方案一(不推荐):直接修改spark的配置文件spark-defaults.conf,全局设置,默认值为200,设置为:

spark.sql.shuffle.partitions? 20

方案二(常用,推荐使用):在客户端通过submit命令提交的时候,动态设置shuffle的分区数量,部署,上线的时候,基于spark-submit提交运行的时候

./spark-submit --conf "spark.sql.shuffle.partitions=20"

方案三(比较常用):在代码中设置,主要在测试环境中使用,但是一般在部署上线的时候,会删除.优先级也是最高的,一般的使用场景,当你的数据量未来不会发生太大的波动.

sparkSession.conf.set('spark.sql.shuffle.partition',20)

3.数据写出操作

统一的输出语法:

对应的简写API格式如下,以CSV为例:

init_df.write.csv(

? ? ? ? path='存储路径',

? ? ? ? mode='模式',

? ? ? ? header=True,

? ? ? ? sep='\001',

? ? ? ? encoding='UTF-8'?

)

常用参数说明:
? ? 1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
? ? 2- mode:当输出目录中文件已经存在的时候处理办法
? ? ? ? 2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
? ? ? ? 2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
? ? ? ? 2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
? ? ? ? 2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path ?? ?
? ? ? ? ?? ??? ??? ?file:xxx already exists.
? ? ? ??
? ? 3- sep:字段间的分隔符
? ? 4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
? ? 5- encoding:文件输出的编码方式

二.常见的DSL代码

分类格式含义示例
API/方法select查询字段select('id1', 'id2')
where对数据过滤where('avg_score>3')
groupBy对数据分组groupBy('userid')
orderBy对数据排序orderBy('cnt', ascending=False)
limit取前几条数据orderBy('cnt', ascending=False).limit(1)
agg聚合操作,里面可以写多个聚合表达式agg(F.round(F.avg('score'), 2).alias('avg_score'))
show打印数据init_df.show()
printSchema打印数据的schema信息,也就是元数据信息init_df.printSchema()
alias对字段取别名F.count('movieid').alias('cnt')
join关联2个DataFrameetl_df.join(avg_score_dsl_df, 'movieid')
withColumn基于目前的数据产生一个新列init_df.withColumn('word',F.explode(F.split('value', ' ')))
dropDuplicates删除重复数据init_df.dropDuplicates(subset=["id","name"])
dropna删除缺失值init_df.dropna(thresh=2,subset=["name","age","address"])
fillna替换缺失值init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"})
first取DataFrame中的第一行数据
over创建一个窗口列
函数avg计算均值
count计数
col将字段包装成Column对象,一般用于对新列的包装
round保留小数位
desc降序排序
row_number行号。从1开始编号
窗口partitionBy对数据分区
orderBy对数据排序

1-什么时候使用select实现聚合,什么时候使用groupBy+agg/select实现聚合?

如果不需要对数据分组,那么可以直接使用select实现聚合;如果有分组操作,需要使用groupBy+agg/select,推荐使用agg

2- first()总结

如果某个DataFrame只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较

3-F.col()总结

对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象?

API/方法:是由DataFrame来调用

函数:需要先通过import pyspark.sql.functions as F导入,使用F调用.

窗口:需要先通过from pyspark.sql import Window导入?

文章来源:https://blog.csdn.net/MSJ3917/article/details/135488374
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。