from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName('sparksql_etl_api')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.read.csv(
path='file:///export/data/clear_data.csv',
sep=',',
encoding='UTF-8',
header="True",
inferSchema=True
)
init_df.printSchema()
# 3- 数据处理
# 3.1- 删除重复数据:dropDuplicates
"""
dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围
"""
init_df.dropDuplicates().show()
init_df.dropDuplicates(subset=["id","name"]).show()
# 指定不存在的字段会报错:AnalysisException: Cannot resolve column name "id2" among (id, name, age, address)
# init_df.dropDuplicates(subset=["id2","name"]).show()
print("-"*30)
# 3.2- 删除缺失值数据:dropna
"""
dropna(thresh,subset):删除缺失值数据.
1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
"""
init_df.dropna().show()
init_df.dropna(subset=["id","name"]).show()
init_df.dropna(thresh=1,subset=["name","age","address"]).show()
init_df.dropna(thresh=2,subset=["name","age","address"]).show()
init_df.dropna(thresh=2).show()
print("-" * 30)
# 3.3- 替换缺失值数据:fillna
"""
fillna(value,subset):替换缺失值数据
value:必须要传递参数.是用来填充缺失值的
subset:限定缺失值替换范围
注意:
1-value如果不是字典,那么只会替换字段类型匹配的空值
2-最常用的是value传递字典的形式
"""
init_df.fillna(value=999).show()
init_df.fillna(value=999,subset=["id","name"]).show()
init_df.fillna(value={"id":111,"name":"未知姓名","age":100,"address":"北京"}).show()
# 4- 数据输出
# 5- 释放资源
spark.stop()
总结:
1- dropDuplicates总结:用来删除重复数据。如果没有指定参数subset,那么要比对行中的所有字段内容,
如果全部相同,就认为是重复数据,会被删除;如果有指定参数subset,那么只比对subset中指定的字段范围
2- dropna(thresh,subset):删除缺失值数据.
2.1- 如果不传递任何参数,只要有任意一个字段值为null,那么就删除整行数据
2.2- 如果只指定了subset,那么空值的检查,就只会限定在subset指定的范围内
2.3- 如果只指定了thresh,那么空值检查的这些字段中,至少需要有thresh(>=thresh)个字段的值不为空,才不会被删除
3- fillna(value,subset):替换缺失值数据
3.1- value:必须要传递参数.是用来填充缺失值的
3.1- subset:限定缺失值替换范围
注意:
3.1- value如果不是字典,那么只会替换字段类型匹配的空值
3.2- 最常用的是value传递字典的形式
Spark SQL底层本质上还是Spark的RDD程序,认为 Spark SQL组件就是一款翻译软件,用于将SQL/DSL翻译为Spark RDD程序, 执行运行
? Spark SQL中同样也是存在shuffle的分区的,在执行shuffle分区后, shuffle分区数量默认为 200个,但是实际中, 一般都是需要调整这个分区的, 因为当数据量比较少的数据, 200个分区相对来说比较大一些, 但是当数据量比较大的时候, 200个分区显得比较小
如何调整shuffle分区数量? spark.sql.shuffle.partitions
方案一(不推荐): 直接修改spark的配置文件spark-defaults.conf。全局设置,默认值为200。设置为:
spark.sql.shuffle.partitions 20
方案二(常用,推荐使用): 在客户端通过submit命令提交的时候, 动态设置shuffle的分区数量。部署、上线的时候、基于spark-submit提交运行的时候
./spark-submit --conf "spark.sql.shuffle.partitions=20"
方案三(比较常用): 在代码中设置。主要在测试环境中使用, 但是一般在部署上线的时候, 会删除。优先级也是最高的。一般的使用场景是,当你的数据量未来不会发生太大的波动。
sparkSession.conf.set('spark.sql.shuffle.partitions',20)
import time
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
start = time.time()
# 1- 创建SparkSession顶级对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions",1)\
.appName('sparksql_wordcount_demo')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.read.text(
paths='hdfs://node1:8020/input/words.txt'
)
# 创建临时视图
init_df.createTempView('words')
# init_df.show()
# init_df.printSchema()
# 3- 数据处理
# 3.1- SQL方式:子查询
spark.sql("""
select
word,count(1) as cnt,max(word) as max_word,min(word) as min_word
from (
select
explode(split(value,' ')) as word
from words
) group by word
""").show()
print("-"*50)
# 3.2- SQL方式:侧视图
spark.sql("""
select
word,count(1) as cnt
from words
-- 侧视图
lateral view explode(split(value,' ')) t as word
group by word
""").show()
print("DSL运行结果")
print("-" * 50)
# 3.3- DSL方式一
"""
DSL方式总结:
withColumnRenamed(参数1,参数2):给字段重命名操作。参数1是旧字段名,参数2是新字段名
agg():推荐使用,更加通用。执行聚合操作。如果有多个聚合,聚合之间使用逗号分隔即可
withColumn(参数1,参数2):用来产生新列。参数1是新列的名称;参数2是新列数据的来源
"""
init_df.select(
F.explode(F.split('value',' ')).alias('word')
).groupBy('word').count().show()
print("-" * 50)
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').count().withColumnRenamed('count','cnt').show()
print("-" * 50)
# 3.3- DSL方式二
init_df.select(
F.explode(F.split('value', ' ')).alias('word')
).groupBy('word').agg(
F.count('word').alias('cnt'),
F.max('word').alias('max_word')
).show()
print("-" * 50)
# 3.4- DSL方式三
init_df.withColumn(
'word',
F.explode(F.split('value', ' '))
).groupBy('word').agg(
F.count('word').alias('cnt')
).show()
run_time = time.time() - start
print("运行耗时:",run_time)
time.sleep(1000)
# 4- 数据输出
# 5- 释放资源
spark.stop()
统一的输出语法:
对应的简写API格式如下,以CSV为例:
init_df.write.csv(
path='存储路径',
mode='模式',
header=True,
sep='\001',
encoding='UTF-8'
)
输出到文件中 json csv orc text …
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("csv方式读取文件")
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions","1")\
.appName('数据输出到文件系统')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.read.csv(
path='file:///export/data/stu.txt',
sep=' ',
encoding='UTF-8',
header="True",
inferSchema=True
)
# 3- 数据处理
result = init_df.where('age>=20')
# 4- 数据输出
result.show()
result.printSchema()
# 数据输出到文件系统:简单API
"""
常用参数说明:
1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
2- mode:当输出目录中文件已经存在的时候处理办法
2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path file:xxx already exists.
3- sep:字段间的分隔符
4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
5- encoding:文件输出的编码方式
"""
result.write.csv(
path="file:///export/data/output/",
mode='ignore',
sep=',',
header=True,
encoding="UTF-8"
)
# 数据输出到文件系统:复杂API
"""
设置mode,需要单独调用mode()方法
"""
result.write\
.format('json')\
.mode("overwrite")\
.option("encoding","UTF-8")\
.save('file:///export/data/output_json/')
# 5- 释放资源
spark.stop()
常用参数说明:
1- path:指定结果数据输出路径。支持本地文件系统和HDFS文件系统
2- mode:当输出目录中文件已经存在的时候处理办法
2.1- append:追加。如果文件已经存在,那么继续在该目录下产生新的文件
2.2- overwrite:覆盖。如果文件已经存在,那么就先将已有的文件清除,再写入进去
2.3- ignore:忽略。如果文件已经存在,那么不执行任何操作
2.4- error:报错。如果文件已经存在,那么直接报错。会报错AnalysisException: path
file:xxx already exists.
3- sep:字段间的分隔符
4- header:数据输出的时候,是否要将字段名称输出到文件的第一行。推荐设置为True
5- encoding:文件输出的编码方式
将结果数据基于JDBC方案, 输出到关系型数据库, 例如说: MySql
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("数据输出到数据库")
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config("spark.sql.shuffle.partitions","1")\
.appName('sparksql_database')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.read.csv(
path='file:///export/data/stu.txt',
sep=' ',
encoding='UTF-8',
header="True",
inferSchema=True
)
# 3- 数据处理
result = init_df.where('age>=20')
# 4- 数据输出
result.show()
result.printSchema()
# 数据输出到数据
"""
创建数据库命令:create database text character set utf8;
"""
result.write.jdbc(
url='jdbc:mysql://node1:3306/text?useUnicode=true&characterEncoding=utf-8',
table='student',
mode='append',
properties={ 'user' : 'root', 'password' : '123456' }
)
# 5- 释放资源
spark.stop()
运行结果截图:
可能出现的错误一:
原因: 缺少连接MySQL数据库的驱动
数据库的驱动包, 一般都是一些Jar包
如何放置【mysql-connector-java-5.1.41.jar】驱动包呢?
1- 放置位置一: 当spark-submit提交的运行环境为Spark集群环境的时候,以及运行模式为local, 默认从 spark的jars目录下加载相关的jar包,
目录位置: /export/server/spark/jars
2- 放置位置二: 当我们使用pycharm运行代码的时候, 基于python的环境来运行的, 需要在python的环境中可以加载到此jar包
目录位置:
/root/anaconda3/lib/python3.8/site-packages/pyspark/jars/
3- 放置位置三: 当我们提交选择的on yarn模式 需要保证此jar包在HDFS上对应目录下
hdfs的spark的jars目录下: hdfs://node1:8020/spark/jars
请注意: 以上三个位置, 主要是用于放置一些 spark可能会经常使用的jar包, 对于一些不经常使用的jar包, 在后续spark-submit 提交运行的时候, 会有专门的处理方案: spark-submit --jars ....
可能出现的错误二:
原因:将中文输出到了数据表中
解决办法:
1- 数据库连接要加上:useUnicode=true&characterEncoding=utf-8
2- 创建数据库的时候需要指定编码character set utf8
分类 | 格式 | 含义 | 示例 |
---|---|---|---|
API/方法 | select | 查询字段 | select(‘id1’, ‘id2’) |
where | 对数据过滤 | where(‘avg_score>3’) | |
groupBy | 对数据分组 | groupBy(‘userid’) | |
orderBy | 对数据排序 | orderBy(‘cnt’, ascending=False) | |
limit | 取前几条数据 | orderBy(‘cnt’, ascending=False).limit(1) | |
agg | 聚合操作,里面可以写多个聚合表达式 | agg(F.round(F.avg(‘score’), 2).alias(‘avg_score’)) | |
show | 打印数据 | init_df.show() | |
printSchema | 打印数据的schema信息,也就是元数据信息 | init_df.printSchema() | |
alias | 对字段取别名 | F.count(‘movieid’).alias(‘cnt’) | |
join | 关联2个DataFrame | etl_df.join(avg_score_dsl_df, ‘movieid’) | |
withColumn | 基于目前的数据产生一个新列 | init_df.withColumn(‘word’,F.explode(F.split(‘value’, ’ '))) | |
dropDuplicates | 删除重复数据 | init_df.dropDuplicates(subset=[“id”,“name”]) | |
dropna | 删除缺失值 | init_df.dropna(thresh=2,subset=[“name”,“age”,“address”]) | |
fillna | 替换缺失值 | init_df.fillna(value={“id”:111,“name”:“未知姓名”,“age”:100,“address”:“北京”}) | |
first | 取DataFrame中的第一行数据 | ||
over | 创建一个窗口列 | ||
函数 | avg | 计算均值 | |
count | 计数 | ||
col | 将字段包装成Column对象,一般用于对新列的包装 | ||
round | 保留小数位 | ||
desc | 降序排序 | ||
row_number | 行号。从1开始编号 | ||
窗口 | partitionBy | 对数据分区 | |
orderBy | 对数据排序 | orderBy(F.desc(‘pv’)) |
1- 什么使用使用select实现聚合,什么时候使用groupBy+agg/select实现聚合?
如果不需要对数据分组,那么可以直接使用select实现聚合;如果有分组操作,需要使用groupBy+agg/select,推荐使用agg
2- first()总结
如果某个DataFrame中只有一行数据,并且不使用join来对比数据,那么一般需要使用first()明确指定和第一行进行比较
3- F.col()总结
对于在计算过程中临时产生的字段,需要使用F.col()封装成Column对象
在Spark SQL中使用窗口函数案例:
需求是找出每个cookie中pv排在前3位的数据,也就是分组取TOPN问题
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.config('spark.sql.shuffle.partitions',1)\
.appName('sparksql_win_function')\
.master('local[*]')\
.getOrCreate()
# 2- 数据输入
init_df = spark.read.csv(
path='file:///export/data/cookie.txt',
schema='cookie string,datestr string,pv int',
sep=',',
encoding='UTF-8'
)
init_df.createTempView('win_data')
init_df.show()
init_df.printSchema()
# 3- 数据处理
# SQL
spark.sql("""
select
cookie,datestr,pv
from (
select
cookie,datestr,pv,
row_number() over (partition by cookie order by pv desc) as rn
from win_data
) tmp where rn<=3
""").show()
# DSL
"""
select:注意点,结果中需要看到哪几个字段,就要明确写出来
"""
init_df.select(
"cookie","datestr","pv",
F.row_number().over(win.partitionBy('cookie').orderBy(F.desc('pv'))).alias('rn')
).where('rn<=3').select("cookie","datestr","pv").show()
# 4- 数据输出
# 5- 释放资源
spark.stop()
运行结果截图:
SQL函数,主要分为以下三大类:
在SQL中提供的所有的内置函数,都是属于以上三类中某一类函数
有这么多的内置函数,为啥还需要自定义函数呢?
为了扩充函数功能。在实际使用中,并不能保证所有的操作函数都已经提前的内置好了。很多基于业务处理的功能,其实并没有提供对应的函数,提供的函数更多是以公共功能函数。此时需要进行自定义,来扩充新的功能函数
1- SparkSQL原生的时候,Python只能开发UDF函数
2- SparkSQL借助其他第三方组件,Python可以开发UDF、UDAF函数
Spark SQL原生存在的问题:大量的序列化和反序列
虽然Python支持自定义UDF函数,但是其效率并不是特别的高效。因为在使用的时候,传递一行处理一行,返回一行的方式。这样会带来非常大的序列化的开销的问题,导致原生UDF函数效率不好
早期解决方案: 基于Java/Scala来编写自定义UDF函数,然后基于python调用即可
目前主要的解决方案: 引入Arrow框架,可以基于内存来完成数据传输工作,可以大大的降低了序列化的开销,提供传输的效率,解决原生的问题。同时还可以基于pandas的自定义函数,利用pandas的函数优势完成各种处理操作
自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可
第二步: 将Python函数注册到Spark SQL中
注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)
参数1: 【UDF函数名称】,此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
参数2: 【自定义的Python函数】,表示将哪个Python的函数注册为Spark SQL的函数
参数3: 【UDF函数的返回值类型】。用于表示当前这个Python的函数返回的类型
udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
说明: 如果通过方式一来注册函数, 【可以用在SQL和DSL】
注册方式二: udf对象 = F.udf(参数1,参数2)
参数1: Python函数的名称,表示将那个Python的函数注册为Spark SQL的函数
参数2: 返回值的类型。用于表示当前这个Python的函数返回的类型
udf对象: 返回值对象,是一个UDF对象,可以在DSL中使用
说明: 如果通过方式二来注册函数,【仅能用在DSL中】
注册方式三: 语法糖写法 @F.udf(returnType=返回值类型) 放置到对应Python的函数上面
说明: 实际是方式二的扩展。如果通过方式三来注册函数,【仅能用在DSL中】
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
Apache Arrow是Apache旗下的一款顶级的项目。是一个跨平台的在内存中以列式存储的数据层,它的设计目标就是作为一个跨平台的数据层,来加快大数据分析项目的运行效率
? Pandas 与 Spark SQL 进行交互的时候,建立在Apache Arrow上,带来低开销 高性能的UDF函数
? Arrow并不会自动使用,在某些情况下,需要配置 以及在代码中需要进行小的更改才可以使用
前提:服务器上已经安装pyspark
然后执行
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark[sql]
如何使用呢? 默认不会自动启动的, 一般建议手动配置
sparkSession.conf.set('spark.sql.execution.arrow.pyspark.enabled',True)
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
# 绑定指定的Python解释器
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
print("基于Arrow完成Pandas DataFrame和Spark DataFrame互转")
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName('dataframe')\
.master('local[*]')\
.getOrCreate()
# 手动开启Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 数据输入
init_df = spark.createDataFrame(
data=[(1, '张三', '广州'), (2, '李四', '深圳')],
schema='id int,name string,address string'
)
# 3- 数据处理
# sparksql dataframe -> pandas dataframe
pd_df = init_df.toPandas()
print(type(pd_df),pd_df)
new_pd_df = pd_df[pd_df['id']==2]
# pandas dataframe -> sparksql dataframe
spark_df = spark.createDataFrame(data=new_pd_df)
spark_df.show()
spark_df.printSchema()
# 4- 数据输出
# 5- 释放资源
spark.stop()
使用场景:
1- Spark的DataFrame -> Pandas的DataFrame:当大数据处理到后期的时候,可能数据量会越来越少,这样可以考虑使用单机版的Pandas来做后续数据的分析
2- Pandas的DataFrame -> Spark的DataFrame:当数据量达到单机无法高效处理的时候,或者需要和其他大数据框架集成的时候,可以转成Spark中的DataFrame
总结:
Pandas的DataFrame -> Spark的DataFrame: spark.createDataFrame(data=pandas_df)
Spark的DataFrame -> Pandas的DataFrame: init_df.toPandas()
基于Pandas的UDF函数来转换为Spark SQL的UDF函数进行使用。底层是基于Arrow框架来完成数据传输,允许向量化(可以充分利用计算机CPU性能)操作。
Pandas的UDF函数其实本质上就是Python的函数,只不过函数的传入数据类型为Pandas的类型
? 基于Pandas的UDF可以使用自定义UDF函数和自定义UDAF函数
自定义函数流程:
第一步: 在PySpark中创建一个Python的函数,在这个函数中书写自定义的功能逻辑代码即可
第二步: 将Python函数包装成Spark SQL的函数
注册方式一: udf对象 = spark.udf.register(参数1, 参数2)
参数1: UDF函数名称。此名称用于后续在SQL中使用,可以任意取值,但是要符合名称的规范
参数2: Python函数的名称。表示将哪个Python的函数注册为Spark SQL的函数
使用: udf对象只能在DSL中使用。参数1指定的名称只能在SQL中使用
注意: 如果编写的是UDAF函数,那么注册方式一需要配合注册方式三,一起使用
注册方式二: udf对象 = F.pandas_udf(参数1, 参数2)
参数1: 自定义的Python函数。表示将哪个Python的函数注册为Spark SQL的函数
参数2: UDF函数的返回值类型。用于表示当前这个Python的函数返回的类型对应到Spark SQL的数据类型
udf对象: 返回值对象,是一个UDF对象。仅能用在DSL中使用
注册方式三: 语法糖写法 @F.pandas_udf(returnType=返回值Spark SQL的数据类型) 放置到对应Python的函数上面
说明: 实际是方式一的扩展。仅能用在DSL中使用
第三步: 在Spark SQL的 DSL/ SQL 中进行使用即可
自定义Python函数的要求:SeriesToSeries
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import IntegerType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName('pandas_udf')\
.master('local[*]')\
.getOrCreate()
# 手动开启Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 数据输入
init_df = spark.createDataFrame(
data=[(1,2),(2,3),(3,4)],
schema='num1 int,num2 int'
)
init_df.createTempView('tmp')
# 3- 数据处理
# 3.1- 自定义Python函数
"""
1- num1:pd.Series用来限定输入的参数类型是Pandas中的Series对象
2- -> pd.Series用来限定返回值类型是Pandas中的Series对象
"""
def my_sum(num1:pd.Series, num2:pd.Series) -> pd.Series:
return num1+num2
# 3.2- 注册进SparkSQL。注册方式一
dsl_my_sum = spark.udf.register('sql_my_sum',my_sum)
# 3.3- 使用
# SQL
spark.sql("""
select
num1,num2,
sql_my_sum(num1,num2) as result
from tmp
""").show()
# DSL
init_df.select(
"num1",
"num2",
dsl_my_sum("num1", "num2").alias("result")
).show()
# 注册方式二
dsl2_my_sum = F.pandas_udf(my_sum,IntegerType())
# DSL
init_df.select(
"num1",
"num2",
dsl2_my_sum("num1", "num2").alias("result")
).show()
# 注册方式三
@F.pandas_udf(IntegerType())
def my_sum_candy(num1:pd.Series, num2:pd.Series) -> pd.Series:
return num1+num2
# DSL
init_df.select(
"num1",
"num2",
my_sum_candy("num1", "num2").alias("result")
).show()
# 4- 数据输出
# 5- 释放资源
spark.stop()
运行结果截图:
自定义Python函数的要求:Series To 标量
from pyspark import SparkConf, SparkContext
import os
from pyspark.sql import SparkSession
import pandas as pd
import pyspark.sql.functions as F
# 绑定指定的Python解释器
from pyspark.sql.types import IntegerType, FloatType
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
if __name__ == '__main__':
# 1- 创建SparkSession对象
spark = SparkSession.builder\
.appName('pandas_udaf')\
.master('local[*]')\
.getOrCreate()
# 手动开启Arrow框架
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
# 2- 数据输入
init_df = spark.createDataFrame(
data=[(1,2),(2,3),(3,3)],
schema='num1 int,num2 int'
)
init_df.createTempView('tmp')
# 3- 数据处理
# 3.1- 自定义Python函数
"""
UDAF对自定义Python函数的要求:输入数据的类型必须是Pandas中的Series对象,返回值类型必须是Python中的标量数据类型
"""
@F.pandas_udf(returnType=FloatType())
def my_avg(num2_col:pd.Series) -> float:
print(type(num2_col))
print(num2_col)
# 计算平均值
return num2_col.mean()
# 3.2- 注册进SparkSQL。注册方式一
dsl_my_avg = spark.udf.register('sql_my_avg',my_avg)
# 3.3- 使用
# SQL
spark.sql("""
select
sql_my_avg(num2) as result
from tmp
""").show()
# DSL
init_df.select(dsl_my_avg("num2").alias("result")).show()
# 4- 数据输出
# 5- 释放资源
spark.stop()