Spark-中的窗口函数
目录
Spark 中的窗口函数
在 Spark 中, 窗口函数(Window Functions) 是一种强大的工具,用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。窗口函数允许你在数据的某个“窗口”内进行计算,例如计算排名、累积和、移动平均等。
窗口函数的核心思想是定义一个“窗口”(通过
Window
类),然后在这个窗口上应用聚合函数(如
row_number
、
rank
、
sum
、
avg
等)。
1. 窗口函数的基本概念
(1)窗口的定义
窗口函数通过
Window
类定义,主要包括以下两个部分:
- 分区(Partitioning)
:将数据分为多个组(类似于
GROUP BY
)。 - 排序(Ordering) :在每个分区内对数据进行排序。
- 窗口范围(Frame) :定义窗口的大小(如当前行及其前后若干行)。
(2)常见的窗口函数
- 排名函数
:
row_number
、rank
、dense_rank
、percent_rank
。 - 聚合函数
:
sum
、avg
、min
、max
、count
。 - 分析函数
:
lead
、lag
、first_value
、last_value
。
2. 窗口函数的语法
(1)定义窗口
import org.apache.spark.sql.expressions.Window
val windowSpec = Window
.partitionBy("column1", "column2") // 按列分区
.orderBy("column3") // 按列排序
.rowsBetween(start, end) // 定义窗口范围(可选)
partitionBy
:指定分区的列。orderBy
:指定排序的列。rowsBetween
:定义窗口的范围(如Window.unboundedPreceding
表示从分区的第一行开始)。
(2)应用窗口函数
import org.apache.spark.sql.functions._
val resultDF = df.withColumn("new_column", F.row_number().over(windowSpec))
withColumn
:添加新列。row_number().over(windowSpec)
:在定义的窗口上应用row_number
函数。
3. 窗口函数的示例
示例 1:计算每个部门的工资排名
假设有一个 DataFrame,包含用户的姓名、部门和工资:
import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window
val spark = SparkSession.builder()
.appName("Window Function Example")
.master("local[*]")
.getOrCreate()
// 示例数据
val data = Seq(
("Alice", "HR", 3000),
("Bob", "IT", 4000),
("Charlie", "HR", 3500),
("David", "IT", 4500),
("Eva", "Finance", 5000)
)
// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")
// 定义窗口:按部门分区,按工资降序排序
val windowSpec = Window
.partitionBy("department")
.orderBy(F.desc("salary"))
// 计算每个部门的工资排名
val rankedDF = df.withColumn("rank", F.row_number().over(windowSpec))
// 显示结果
rankedDF.show()
输出:
+-------+----------+------+----+
| name|department|salary|rank|
+-------+----------+------+----+
| Eva| Finance| 5000| 1|
| Alice| HR| 3000| 2|
|Charlie| HR| 3500| 1|
| David| IT| 4500| 1|
| Bob| IT| 4000| 2|
+-------+----------+------+----+
partitionBy("department")
:按部门分区。orderBy(F.desc("salary"))
:按工资降序排序。row_number()
:计算每行的排名。
示例 2:计算每个部门的累积工资
使用
sum
函数计算每个部门的累积工资:
// 定义窗口:按部门分区,按工资升序排序
val windowSpec = Window
.partitionBy("department")
.orderBy("salary")
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
// 计算累积工资
val cumulativeDF = df.withColumn("cumulative_salary", F.sum("salary").over(windowSpec))
// 显示结果
cumulativeDF.show()
输出:
+-------+----------+------+----------------+
| name|department|salary|cumulative_salary|
+-------+----------+------+----------------+
| Eva| Finance| 5000| 5000|
| Alice| HR| 3000| 3000|
|Charlie| HR| 3500| 6500|
| Bob| IT| 4000| 4000|
| David| IT| 4500| 8500|
+-------+----------+------+----------------+
rowsBetween(Window.unboundedPreceding, Window.currentRow)
:定义窗口范围为从分区的第一行到当前行。sum("salary").over(windowSpec)
:计算累积工资。
示例 3:计算每个部门的工资移动平均
使用 avg 函数计算每个部门的工资移动平均(当前行及其前一行):
// 定义窗口:按部门分区,按工资升序排序,窗口范围为当前行及其前一行
val windowSpec = Window
.partitionBy("department")
.orderBy("salary")
.rowsBetween(-1, Window.currentRow)
// 计算移动平均
val movingAvgDF = df.withColumn("moving_avg", F.avg("salary").over(windowSpec))
// 显示结果
movingAvgDF.show()
输出:
+-------+----------+------+----------+
| name|department|salary| moving_avg|
+-------+----------+------+----------+
| Eva| Finance| 5000| 5000.0|
| Alice| HR| 3000| 3000.0|
|Charlie| HR| 3500| 3250.0|
| Bob| IT| 4000| 4000.0|
| David| IT| 4500| 4250.0|
+-------+----------+------+----------+
rowsBetween(-1, Window.currentRow)
:定义窗口范围为当前行及其前一行。avg("salary").over(windowSpec)
:计算移动平均。
4. 常见的窗口函数
(1)排名函数
row_number()
:为每行分配一个唯一的序号(从 1 开始)。rank()
:计算排名,相同值会有相同的排名,后续排名会跳过。dense_rank()
:计算排名,相同值会有相同的排名,后续排名不会跳过。percent_rank()
:计算百分比排名。
(2)聚合函数
sum()
:计算窗口内的总和。avg()
:计算窗口内的平均值。min()
:计算窗口内的最小值。max()
:计算窗口内的最大值。count()
:计算窗口内的行数。
(3)分析函数
lead()
:获取当前行之后的某一行。lag()
:获取当前行之前的某一行。first_value()
:获取窗口内的第一个值。last_value()
:获取窗口内的最后一个值。
5. 窗口范围的定义
窗口范围通过
rowsBetween
或
rangeBetween
定义:
rowsBetween(start, end)
:基于行的偏移量定义窗口范围。Window.unboundedPreceding
:从分区的第一行开始。Window.unboundedFollowing
:到分区的最后一行结束。Window.currentRow
:当前行。
rangeBetween(start, end)
:基于值的范围定义窗口范围(适用于数值或日期类型)。
6. 总结
- 窗口函数 用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。
- 通过
Window
类定义窗口,包括分区、排序和窗口范围。 - 常见的窗口函数包括排名函数、聚合函数和分析函数。
- 窗口范围可以通过
rowsBetween
或rangeBetween
定义。