目录

Spark-中的窗口函数

Spark 中的窗口函数

在 Spark 中, 窗口函数(Window Functions) 是一种强大的工具,用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。窗口函数允许你在数据的某个“窗口”内进行计算,例如计算排名、累积和、移动平均等。

窗口函数的核心思想是定义一个“窗口”(通过 Window 类),然后在这个窗口上应用聚合函数(如 row_numberranksumavg 等)。


1. 窗口函数的基本概念

(1)窗口的定义

窗口函数通过 Window 类定义,主要包括以下两个部分:

  • 分区(Partitioning) :将数据分为多个组(类似于 GROUP BY )。
  • 排序(Ordering) :在每个分区内对数据进行排序。
  • 窗口范围(Frame) :定义窗口的大小(如当前行及其前后若干行)。
(2)常见的窗口函数
  • 排名函数row_numberrankdense_rankpercent_rank
  • 聚合函数sumavgminmaxcount
  • 分析函数leadlagfirst_valuelast_value

2. 窗口函数的语法

(1)定义窗口
import org.apache.spark.sql.expressions.Window

val windowSpec = Window
  .partitionBy("column1", "column2") // 按列分区
  .orderBy("column3")                // 按列排序
  .rowsBetween(start, end)           // 定义窗口范围可选
  • partitionBy :指定分区的列。
  • orderBy :指定排序的列。
  • rowsBetween :定义窗口的范围(如 Window.unboundedPreceding 表示从分区的第一行开始)。
(2)应用窗口函数
import org.apache.spark.sql.functions._

val resultDF = df.withColumn("new_column", F.row_number().over(windowSpec))
  • withColumn :添加新列。
  • row_number().over(windowSpec) :在定义的窗口上应用 row_number 函数。

3. 窗口函数的示例

示例 1:计算每个部门的工资排名

假设有一个 DataFrame,包含用户的姓名、部门和工资:

import org.apache.spark.sql.{SparkSession, functions => F}
import org.apache.spark.sql.expressions.Window

val spark = SparkSession.builder()
  .appName("Window Function Example")
  .master("local[*]")
  .getOrCreate()

// 示例数据
val data = Seq(
  ("Alice", "HR", 3000),
  ("Bob", "IT", 4000),
  ("Charlie", "HR", 3500),
  ("David", "IT", 4500),
  ("Eva", "Finance", 5000)
)

// 创建 DataFrame
val df = spark.createDataFrame(data).toDF("name", "department", "salary")

// 定义窗口:按部门分区,按工资降序排序
val windowSpec = Window
  .partitionBy("department")
  .orderBy(F.desc("salary"))

// 计算每个部门的工资排名
val rankedDF = df.withColumn("rank", F.row_number().over(windowSpec))

// 显示结果
rankedDF.show()

输出:

+-------+----------+------+----+
|   name|department|salary|rank|
+-------+----------+------+----+
|    Eva|   Finance|  5000|   1|
|  Alice|        HR|  3000|   2|
|Charlie|        HR|  3500|   1|
|   David|        IT|  4500|   1|
|    Bob|        IT|  4000|   2|
+-------+----------+------+----+
  • partitionBy("department") :按部门分区。
  • orderBy(F.desc("salary")) :按工资降序排序。
  • row_number() :计算每行的排名。

示例 2:计算每个部门的累积工资

使用 sum 函数计算每个部门的累积工资:

// 定义窗口:按部门分区,按工资升序排序
val windowSpec = Window
  .partitionBy("department")
  .orderBy("salary")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

// 计算累积工资
val cumulativeDF = df.withColumn("cumulative_salary", F.sum("salary").over(windowSpec))

// 显示结果
cumulativeDF.show()

输出:

+-------+----------+------+----------------+
|   name|department|salary|cumulative_salary|
+-------+----------+------+----------------+
|    Eva|   Finance|  5000|            5000|
|  Alice|        HR|  3000|            3000|
|Charlie|        HR|  3500|            6500|
|    Bob|        IT|  4000|            4000|
|   David|        IT|  4500|            8500|
+-------+----------+------+----------------+
  • rowsBetween(Window.unboundedPreceding, Window.currentRow) :定义窗口范围为从分区的第一行到当前行。
  • sum("salary").over(windowSpec) :计算累积工资。

示例 3:计算每个部门的工资移动平均
使用 avg 函数计算每个部门的工资移动平均当前行及其前一行):

// 定义窗口:按部门分区,按工资升序排序,窗口范围为当前行及其前一行
val windowSpec = Window
  .partitionBy("department")
  .orderBy("salary")
  .rowsBetween(-1, Window.currentRow)

// 计算移动平均
val movingAvgDF = df.withColumn("moving_avg", F.avg("salary").over(windowSpec))

// 显示结果
movingAvgDF.show()

输出:

+-------+----------+------+----------+
|   name|department|salary| moving_avg|
+-------+----------+------+----------+
|    Eva|   Finance|  5000|    5000.0|
|  Alice|        HR|  3000|    3000.0|
|Charlie|        HR|  3500|    3250.0|
|    Bob|        IT|  4000|    4000.0|
|   David|        IT|  4500|    4250.0|
+-------+----------+------+----------+
  • rowsBetween(-1, Window.currentRow) :定义窗口范围为当前行及其前一行。
  • avg("salary").over(windowSpec) :计算移动平均。

4. 常见的窗口函数

(1)排名函数
  • row_number() :为每行分配一个唯一的序号(从 1 开始)。
  • rank() :计算排名,相同值会有相同的排名,后续排名会跳过。
  • dense_rank() :计算排名,相同值会有相同的排名,后续排名不会跳过。
  • percent_rank() :计算百分比排名。
(2)聚合函数
  • sum() :计算窗口内的总和。
  • avg() :计算窗口内的平均值。
  • min() :计算窗口内的最小值。
  • max() :计算窗口内的最大值。
  • count() :计算窗口内的行数。
(3)分析函数
  • lead() :获取当前行之后的某一行。
  • lag() :获取当前行之前的某一行。
  • first_value() :获取窗口内的第一个值。
  • last_value() :获取窗口内的最后一个值。

5. 窗口范围的定义

窗口范围通过 rowsBetweenrangeBetween 定义:

  • rowsBetween(start, end) :基于行的偏移量定义窗口范围。

    • Window.unboundedPreceding :从分区的第一行开始。
    • Window.unboundedFollowing :到分区的最后一行结束。
    • Window.currentRow :当前行。
  • rangeBetween(start, end) :基于值的范围定义窗口范围(适用于数值或日期类型)。


6. 总结

  • 窗口函数 用于在分组数据上执行复杂的聚合操作,同时保留原始数据的行。
  • 通过 Window 类定义窗口,包括分区、排序和窗口范围。
  • 常见的窗口函数包括排名函数、聚合函数和分析函数。
  • 窗口范围可以通过 rowsBetweenrangeBetween 定义。