PySpark 通过字典映射创建新列

PySpark 通过字典映射创建新列

在本文中,我们将介绍如何在 PySpark 中使用字典映射来创建新列。PySpark 是一个用于大数据处理的强大工具,它提供了丰富的功能和灵活性,可以轻松处理和分析海量数据。通过利用字典映射,我们可以根据某个列的值来创建新的列,并将其映射到相应的值上。

首先,让我们从创建一个示例数据集开始。假设我们有一个包含学生姓名和他们所属年级的数据集。我们将使用 PySpark 的 SparkSession 创建一个 DataFrame 并为其添加两列:

from pyspark.sql import SparkSession

# 创建 SparkSession
spark = SparkSession.builder.getOrCreate()

# 创建示例数据
data = [("Alice", "一年级"),
        ("Bob", "二年级"),
        ("Charlie", "三年级"),
        ("Dave", "四年级"),
        ("Eve", "五年级")]

# 创建 DataFrame
df = spark.createDataFrame(data, ["姓名", "年级"])
df.show()

上述代码将创建一个名为 df 的 DataFrame,并显示其内容:

+-------+----+
|   姓名| 年级|
+-------+----+
|  Alice|一年级|
|    Bob|二年级|
|Charlie|三年级|
|   Dave|四年级|
|    Eve|五年级|
+-------+----+

接下来,我们将为 DataFrame 添加一个新的列 年级对应积分,用于存储每个学生年级对应的积分。我们可以使用字典来创建一个映射,将每个年级映射到相应的积分值。然后,我们将使用 Spark 的 withColumn 方法和 PySpark 的内置函数 whenotherwise 来创建新列。

from pyspark.sql.functions import when

# 创建年级和积分的映射字典
grade_mapping = {"一年级": 10,
                 "二年级": 20,
                 "三年级": 30,
                 "四年级": 40,
                 "五年级": 50}

# 使用字典映射创建新列
df = df.withColumn("年级对应积分", 
                   when(df["年级"].isin(list(grade_mapping.keys())), 
                        grade_mapping[df["年级"]]).otherwise(None))

# 显示 DataFrame
df.show()

上述代码中的 when 函数用于检查每个学生的年级是否存在于映射字典的键中。如果是,则返回对应的积分值;否则返回 None。运行上述代码后,我们将得到如下输出:

+-------+----+---------+
|   姓名| 年级|年级对应积分|
+-------+----+---------+
|  Alice|一年级|       10|
|    Bob|二年级|       20|
|Charlie|三年级|       30|
|   Dave|四年级|       40|
|    Eve|五年级|       50|
+-------+----+---------+

从上述结果可以看出,新的列 年级对应积分 被成功地添加到了 DataFrame 中,并根据字典映射为每个学生的年级赋予了对应的积分值。

除了使用 whenotherwise 函数,我们还可以使用 PySpark 的 udf 函数来实现自定义的映射逻辑。下面是另一种实现方式的示例代码:

from pyspark.sql.functions import udf

# 创建自定义函数
@udf
def map_grade(grade):
    if grade in grade_mapping:
        return grade_mapping[grade]
    else:
        return None

# 使用自定义函数创建新列
df = df.withColumn("年级对应积分", map_grade(df["年级"]))

# 显示 DataFrame
df.show()

上述代码中的 udf 函数用于将自定义函数 map_grade 转换为 Spark 可识别的 UDF(User Defined Function)。然后,我们使用 withColumn 方法将新的列 年级对应积分 添加到 DataFrame 中。

阅读更多:PySpark 教程

总结

在本文中,我们介绍了在 PySpark 中使用字典映射来创建新的列的方法。我们通过示例代码演示了如何使用内置函数 whenotherwise,以及通过自定义函数和 udf 实现相同的功能。通过利用字典映射,我们可以轻松地根据列的值来创建新的列,并将其映射到相应的值上。这种方法可以帮助我们在数据处理和分析中更加灵活和高效地进行列的转换和映射。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程