目录

spark实验

spark实验

“data01.txt”数据集包含了某大学计算机系的成绩,数据格式如下所示: Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algorithm,60 Jim,DataStructure,80 ……

请根据给定的实验数据,在pyspark中通过编程来计算以下内容: (1) 该系总共有多少学生; (2) 该系共开设了多少门课程; (3) Tom同学的总成绩平均分是多少; (4) 求每名同学的选修的课程门数; (5) 该系DataBase课程共有多少人选修; (6) 各门课程的平均分是多少; 2.编写独立应用程序实现数据去重 对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。 输入文件A的样例如下: 20170101 x 20170102 y 20170103 x 20170104 y 20170105 z 20170106 z 输入文件B的样例如下: 20170101 y 20170102 y 20170103 x 20170104 z 20170105 y 根据输入的文件A和B合并得到的输出文件C的样例如下: 20170101 x 20170101 y 20170102 y 20170103 x 20170104 y 20170104 z 20170105 y 20170105 z 20170106 z Plain Text

3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
下载spark 可以使用下载pip install pyspark
官网下载spark
实际代码
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, countDistinct

初始化 SparkSession

spark = SparkSession.builder.appName(“GradeAnalysis”).getOrCreate()

读取数据

data_path = “data01.txt” lines = spark.read.text(data_path).rdd

分割数据并转换为 DataFrame

data = lines.map(lambda line: line.value.split(’,’))
.map(lambda x: (x[0], x[1], int(x[2])))
.toDF([“Student”, “Course”, “Score”])

1. 计算该系总共有多少学生

students_count = data.select(“Student”).distinct().count() print(f"学生总数: {students_count}")

2. 计算该系共开设了多少门课程

courses_count = data.select(“Course”).distinct().count() print(f"课程总数: {courses_count}")

3. 计算Tom同学的总成绩平均分

tom_avg_score = data.filter(data.Student == “Tom”).groupBy().agg(avg(“Score”).alias(“AvgScore”)).collect()[0][“AvgScore”] print(f"Tom同学的总成绩平均分: {tom_avg_score}")

4. 计算每名同学的选修的课程门数

students_courses_count = data.groupBy(“Student”).agg(countDistinct(“Course”).alias(“CourseCount”)).show() print(“每名同学的选修的课程门数:”) students_courses_count.show()

5. 计算该系DataBase课程共有多少人选修

db_students_count = data.filter(data.Course == “DataBase”).select(“Student”).distinct().count() print(f"DataBase课程选修人数: {db_students_count}")

6. 计算各门课程的平均分

courses_avg_score = data.groupBy(“Course”).agg(avg(“Score”).alias(“AvgScore”)).show() print(“各门课程的平均分:”) courses_avg_score.show()

停止 SparkSession

spark.stop() https://i-blog.csdnimg.cn/direct/67cdcb610b3e44ae9ca1fd18872a5bb4.png 二. from pyspark.sql import SparkSession from pyspark.sql.functions import col def merge_and_deduplicate(fileA, fileB, outputFile):

创建SparkSession

spark = SparkSession.builder
.appName(“MergeAndDeduplicate”)
.getOrCreate()

读取文件A和B,假设它们是以空格分隔的文本文件,并且没有列头

dfA = spark.read.csv(fileA, header=False, inferSchema=True, sep=" “) dfB = spark.read.csv(fileB, header=False, inferSchema=True, sep=” “)

为DataFrame设置列名(这里我们假设只有两列,分别是日期和值)

dfA = dfA.withColumnRenamed("_c0”, “date”).withColumnRenamed("_c1", “value”) dfB = dfB.withColumnRenamed("_c0", “date”).withColumnRenamed("_c1", “value”)

合并两个DataFrame

dfCombined = dfA.unionByName(dfB)

去重(基于两列的组合)

dfDeduplicated = dfCombined.dropDuplicates()

将结果写入新的文件C

dfDeduplicated.write.csv(outputFile, header=True, sep=" “)

停止SparkSession

spark.stop()

调用函数,传入文件路径和输出路径

merge_and_deduplicate(“A.txt”, “B.txt”, “C.txt”) https://i-blog.csdnimg.cn/direct/16db902f617c4df98c13e4f35019960c.png https://i-blog.csdnimg.cn/direct/8bb061e9659944bca90e5d9bed6205cb.png https://i-blog.csdnimg.cn/direct/51dad4958d0647d796a634c82d77795c.png 三. from pyspark.sql import SparkSession from pyspark.sql.functions import avg from pyspark.sql.group import GroupedData def calculate_average_scores(input_files, output_file):

创建SparkSession

spark = SparkSession.builder
.appName(“AverageScoresCalculator”)
.getOrCreate()

读取所有输入文件,假设它们是以空格分隔的文本文件,并且第一列是学生名字,第二列是成绩

由于我们不知道有多少个文件,所以使用reduce来逐个union它们

from functools import reduce dfs = [spark.read.csv(file, header=False, inferSchema=True, sep=” “) for file in input_files] df_combined = reduce(lambda df1, df2: df1.unionByName(df2), dfs)

为DataFrame设置列名

df_combined = df_combined.withColumnRenamed("_c0”, “student_name”).withColumnRenamed("_c1", “score”)

将成绩列转换为数值类型(如果inferSchema没有正确推断的话)

df_combined = df_combined.withColumn(“score”, df_combined[“score”].cast(“double”))

按学生名字分组,并计算平均成绩

grouped = df_combined.groupBy(“student_name”) avg_scores = grouped.agg(avg(“score”).alias(“average_score”))

将结果写入新的文件

avg_scores.write.csv(output_file, header=True, sep=" “)

停止SparkSession

spark.stop()

输入文件列表(这里假设有三个文件,分别代表不同学科的成绩)

input_files = [“1.txt”, “2.txt”, “3.txt”]

输出文件路径

output_file = “average_scores.txt”

调用函数,传入输入文件列表和输出文件路径

calculate_average_scores(input_files, output_file) https://i-blog.csdnimg.cn/direct/258c9b29250041428ddef5f493a9ac2b.png https://i-blog.csdnimg.cn/direct/cd7363690e0f4564856960d31c18a1fa.png https://i-blog.csdnimg.cn/direct/3876c1505ec64f07a83706e2a5fe74db.png