PySpark Window函数:rangeBetween/rowsBetween中的多个条件的orderBy的使用
在本文中,我们将介绍在PySpark的Window函数中,如何在rangeBetween/rowsBetween中使用多个条件的orderBy来进行数据范围或行范围的计算和排序。
PySpark是Apache Spark的Python API,它提供了在大规模数据处理和分析中使用分布式计算的功能。Window函数是一种在数据集的特定窗口内进行计算和排序的功能。它可以对数据进行聚合、排序、计数和其他分析操作。
在使用Window函数时,有时我们需要根据多个条件来排序数据,并在指定的范围或行范围内进行计算。PySpark提供了rangeBetween和rowsBetween两种方法来定义范围和行范围。并且可以在orderBy中指定多个条件来按多个列排序数据。
我们将通过以下示例来说明如何在PySpark中使用多个条件的orderBy来使用rangeBetween和rowsBetween。
阅读更多:PySpark 教程
1. 创建数据集
在开始使用Window函数之前,我们首先需要创建一个PySpark的DataFrame数据集。假设我们有一个学生分数的数据集,包含学生的ID、科目、分数等信息。我们可以使用以下代码创建数据集:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
# 创建SparkSession
spark = SparkSession.builder.getOrCreate()
# 创建学生分数数据集
data = [
(1, 'Math', 85),
(2, 'Math', 90),
(3, 'Math', 75),
(1, 'Science', 92),
(2, 'Science', 88),
(3, 'Science', 95),
(1, 'English', 78),
(2, 'English', 82),
(3, 'English', 85)
]
df = spark.createDataFrame(data, ['student_id', 'subject', 'score'])
df.show()
输出结果:
+----------+-------+-----+
|student_id|subject|score|
+----------+-------+-----+
| 1| Math| 85|
| 2| Math| 90|
| 3| Math| 75|
| 1|Science| 92|
| 2|Science| 88|
| 3|Science| 95|
| 1|English| 78|
| 2|English| 82|
| 3|English| 85|
+----------+-------+-----+
2. 使用rangeBetween和rowsBetween
首先,我们将使用rangeBetween和rowsBetween方法来为数据定义一个窗口,并使用orderBy来排序学生的分数。假设我们希望在每个学生的特定科目内,按照分数降序排序。
from pyspark.sql.window import Window
# 定义窗口
window = Window.partitionBy('student_id', 'subject').orderBy(col('score').desc())
# 使用rangeBetween定义范围
df_range = df.withColumn('range_rank', rank().over(window.rangeBetween(-1, 1)))
# 使用rowsBetween定义行范围
df_rows = df.withColumn('rows_rank', rank().over(window.rowsBetween(-1, 1)))
df_range.show()
df_rows.show()
输出结果:
+----------+-------+-----+----------+
|student_id|subject|score|range_rank|
+----------+-------+-----+----------+
| 1| Math| 85| 2|
| 1|English| 78| 1|
| 1|Science| 92| 1|
| 2| Math| 90| 2|
| 2|English| 82| 1|
| 2|Science| 88| 1|
| 3| Math| 75| 2|
| 3|English| 85| 1|
| 3|Science| 95| 1|
+----------+-------+-----+----------+
+----------+-------+-----+---------+
|student_id|subject|score|rows_rank|
+----------+-------+-----+---------+
| 1| Math| 85| 2|
| 1|English| 78| 1|
| 1|Science| 92| 2|
| 2| Math| 90| 2|
| 2|English| 82| 1|
| 2|Science| 88| 2|
| 3| Math| 75| 2|
| 3|English| 85| 2|
| 3|Science| 95| 1|
+----------+-------+-----+---------+
在这个示例中,我们定义了一个窗口,使用student_id和subject作为分区,按照分数降序排序。然后我们使用rangeBetween(-1, 1)来定义了范围,表示在每个学生的特定科目内,以当前行为中心,在前一行和后一行范围内进行计算。使用rowsBetween(-1, 1)定义了行范围,表示在每个学生的特定科目内,以当前行为中心,在前一行和后一行范围内进行计算。
在这两个示例中,我们使用了rank()函数来进行排名。range_rank列表示在范围内的排名,rows_rank列表示在行范围内的排名。
3. 使用多个条件的orderBy
接下来,我们将在orderBy中使用多个条件来进行排序。假设我们希望在每个学生的特定科目内,按照分数降序排序,如果分数相同,再按照学生ID升序排序。
from pyspark.sql.functions import desc
# 定义窗口
window = Window.partitionBy('subject').orderBy(desc('score'), 'student_id')
# 使用rangeBetween定义范围
df_range = df.withColumn('range_rank', rank().over(window.rangeBetween(-1, 1)))
# 使用rowsBetween定义行范围
df_rows = df.withColumn('rows_rank', rank().over(window.rowsBetween(-1, 1)))
df_range.show()
df_rows.show()
输出结果: