目录

PySpark实现导出两个包含多个Parquet数据文件的S3目录里的对应值的差异值分析

PySpark实现导出两个包含多个Parquet数据文件的S3目录里的对应值的差异值分析

编写PySpark代码实现从一个包含多个Parquet数据文件的Amazon S3目录的dataframe数据里取两个维度字段,一个度量字段的数据,根据这两个维度字段的数据分组统计,计算度量字段的数据的分组总计值,得到一个包含两个维度字段和度量字段的分组总计值字段的dataframe,再从另一个包含多个Parquet数据文件的S3目录的dataframe数据里取两个维度字段,一个度量字段的数据组成一个dataframe,将这两个字段的对应字段full outer join起来,显示所有字段和两个度量字段值相减的值,以及相减的值除以第一个dataframe的度量值的百分比数值。

PySpark代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, coalesce, lit

# 初始化SparkSession
spark = SparkSession.builder \
    .appName("MetricComparison") \
    .getOrCreate()

# 读取第一个S3目录数据并进行聚合
df1 = spark.read.parquet("s3a://your-bucket/first-data-path/")
df1_agg = df1.select("dim_col1", "dim_col2", "metric_col") \
    .groupBy("dim_col1", "dim_col2") \
    .agg(sum("metric_col").alias("total_metric1"))

# 读取第二个S3目录数据并进行聚合
df2 = spark.read.parquet("s3a://your-bucket/second-data-path/")
df2_agg = df2.select("dim_col1", "dim_col2", "metric_col") \
    .groupBy("dim_col1", "dim_col2") \
    .agg(sum("metric_col").alias("total_metric2"))

# 执行Full Outer Join
joined_df = df1_agg.join(
    df2_agg,
    ["dim_col1", "dim_col2"],
    "full_outer"
)

# 计算度量差值及百分比
result_df = joined_df.withColumn(
    "metric_diff",
    coalesce(col("total_metric1"), lit(0)) - coalesce(col("total_metric2"), lit(0))
).withColumn(
    "percentage_diff",
    (col("metric_diff") / coalesce(col("total_metric1"), lit(1))) * 100
)

# 显示结果(可根据需要调整显示行数)
result_df.show(truncate=False)

关键步骤说明:

  1. 数据读取与聚合

    • 从两个S3路径分别读取Parquet数据
    • 使用 groupBy 对两个维度字段分组
    • 使用 sum 聚合函数计算度量字段总计
  2. 表连接

    • 使用 full_outer 连接方式合并两个数据集
    • 保留两个表中所有的维度组合
  3. 差值计算

    • 使用 coalesce 处理空值(将null转换为0进行运算)
    • 计算绝对值差值和百分比差值
    • 添加防止除零机制(将分母null转换为1)

注意事项:

  1. 需要将代码中的以下占位符替换为实际值:

    • s3a://your-bucket/first-data-path/
    • s3a://your-bucket/second-data-path/
    • dim_col1 , dim_col2 (维度字段名)
    • metric_col (度量字段名)
  2. 如果数据量较大:

    • 建议添加缓存策略 .cache()
    • 调整shuffle分区数: spark.conf.set("spark.sql.shuffle.partitions", 200)
  3. 空值处理策略可根据业务需求调整:

    • 当前代码将null视为0进行计算
    • 如需保留null可使用 .withColumn("metric_diff", col("total_metric1") - col("total_metric2"))
  4. 结果输出方式:

    • 当前使用 .show() 直接打印
    • 可改用 .write 方法写入存储系统

最终结果示例:

+---------+---------+-------------+-------------+-----------+------------------+
|dim_col1 |dim_col2 |total_metric1|total_metric2|metric_diff|percentage_diff   |
+---------+---------+-------------+-------------+-----------+------------------+
|A        |X        |1500         |1200         |300        |20.0              |
|B        |Y        |800          |null         |800        |100.0             |
|C        |Z        |null         |500          |-500       |null              |
+---------+---------+-------------+-------------+-----------+------------------+