over
函数是一个非常重要的概念,尤其是在使用窗口函数(例如 row_number
, rank
, dense_rank
, lead
, lag
等)时。over
函数允许你对一个数据集进行分组,然后在每个分组内应用窗口函数。窗口函数:
在 PySpark 中,窗口函数是用于执行聚合和其他复杂操作的函数,这些操作涉及到某种形式的分区和排序。
窗口函数不会导致行被折叠成单个输出行,不像标准的聚合函数那样。相反,它们会生成与输入行数相同的输出行数。
窗口规范(Window Specification):
在使用 over
函数时,你需要定义一个窗口规范。这个规范描述了窗口函数的作用范围,包括如何对数据进行分区(partitioning)、如何排序(ordering)以及是否有行或范围限制(frame specification)
在 PySpark 中,使用 over
函数通常涉及以下步骤:
定义窗口规范:
使用 Window
类来定义分区和排序规则。
例如,Window.partitionBy("column1").orderBy("column2")
表示按 column1
进行分区,并在每个分区内按 column2
排序。
应用窗口函数:
F.row_number().over(windowSpec)
会在每个按 windowSpec
定义的窗口内对行进行编号。
F.row_number()
是窗口函数,而 .over(windowSpec)
则指定了这个函数应该如何在数据上操作。假设有一个如下的 DataFrame:
from pyspark.sql import Row
data = [
Row(id=1, Group='A',Value=10),
Row(id=2, Group='A',Value=20),
Row(id=3, Group='B',Value=30),
Row(id=4, Group='B',Value=40)
]
df = spark.createDataFrame(data)
df.show()
?
现在,如果你想在每个 Group
内部对 Value
进行排名,你可以使用 over
函数与 rank()
窗口函数结合来实现这一点:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
windowSpec = Window.partitionBy("Group").orderBy("Value")
'''
partitionBy("Group") 表示数据将根据 Group 列的值进行分区。在每个分区内,数据行将独立于其他分区处理。
orderBy("Value") 指定了在每个分区内,数据将根据 Value 列的值进行排序。
注:此时windowSpec 本身并不知道它将被应用于哪个 DataFrame。它只是定义了一个窗口规范
'''
windowSpec
本身并不知道它将被应用于哪个 DataFrame。它只是定义了一个窗口规范。当在 df.withColumn
中使用 .over(windowSpec)
时,就指定了在 df
上应用这个窗口规范。
df.withColumn("rank", F.rank().over(windowSpec)).show()
'''
df.withColumn———— 创建了 df 的一个新版本,其中包含了一个新列 "rank"
F.rank().over(windowSpec) ————计算了一个窗口函数 rank,该函数在 windowSpec 定义的每个分区内为每行分配一个排名
'''