如何将 Pandas 转换为 PySpark DataFrame?
Pandas 和 PySpark 是 Python 中两种受欢迎的数据处理工具。虽然 Pandas 很适合在单台机器上处理中小型数据集,但 PySpark 则是为跨多台机器处理大型数据集而设计的。
在处理大型数据集时,将 Pandas DataFrame 转换为 PySpark DataFrame 可能是必要的。在本指南中,我们将使用 Python 的 PySpark 库探索将 Pandas DataFrame 转换为 PySpark DataFrame 的过程。
我们将涵盖安装和设置 PySpark、将 Pandas DataFrame 转换为 PySpark DataFrame 的步骤以及您可以在 PySpark DataFrame 上执行的一些常见操作。
使用 createDataFrame() 方法创建 PySpark DataFrame 的语法如下:
spark.createDataFrame(data, schema)
这里,data 是 DataFrame 建立的值列表,而 schema 则是数据集或列名列表的结构。spark 参数是 PySpark 中的 SparkSession 对象。
使用 spark.createDataFrame() 方法
下面是一个示例代码,用它演示如何创建一个 Pandas DataFrame,然后使用 spark.createDataFrame() 方法将其转换为 PySpark DataFrame。
考虑下面的代码。在这段代码中,我们首先创建一个名为 df_pandas 的示例 pandas DataFrame。使用 SparkSession.builder 方法创建 SparkSession 对象,这允许我们使用 PySpark 开始工作。
接下来,我们使用 spark 对象提供的 createDataFrame() 方法,将 Pandas DataFrame 转换为 PySpark DataFrame。createDataFrame() 方法将 Pandas DataFrame 作为其输入,并返回一个新的 PySpark DataFrame 对象。
最后,我们使用 show() 方法将 PySpark DataFrame 的内容显示到控制台。
import pandas as pd
from pyspark.sql import SparkSession
# Create a sample pandas DataFrame
data = {'Name': ['John', 'Jane', 'Bob'],
'Age': [30, 25, 40],
'Salary': [50000.0, 60000.0, 70000.0]}
df_pandas = pd.DataFrame(data)
# Create a SparkSession object
spark = SparkSession.builder.appName('PandasToSparkDF').getOrCreate()
# Convert pandas DataFrame to PySpark DataFrame
df_spark = spark.createDataFrame(df_pandas)
# Show the PySpark DataFrame
df_spark.show()
在运行上述代码之前,请确保您的系统上安装了 Pandas 和 PySpark 库。
输出
执行时,它将产生以下输出:
+----+---+-------+
|Name|Age| Salary|
+----+---+-------+
|John| 30|50000.0|
|Jane| 25|60000.0|
| Bob| 40|70000.0|
+----+---+-------+
使用 ArrowSpark
下面是一个更新的代码,演示如何使用 Apache Arrow 来改善将 Pandas DataFrame 转换为 PySpark DataFrame 的性能。
请考虑下面的代码。在这段代码中,我们首先创建一个名为 df_pandas 的示例 pandas DataFrame。然后,我们使用 PyArrow 库和 Table.from_pandas() 方法将 pandas DataFrame 转换为 PyArrow Table。
接下来,我们使用 pq.write_table() 方法将 PyArrow Table 以 Parquet 格式写入磁盘。这将在当前目录中创建一个名为 data.parquet 的文件。
最后,我们使用 spark.read.parquet() 方法将 Parquet 文件读入名为 df_spark 的 PySpark DataFrame 中。然后,我们可以使用 show() 方法将 PySpark DataFrame 的内容显示到控制台。 使用 Apache Arrow 和 Parquet 格式在 Pandas 和 PySpark 之间转换数据可以通过减少数据序列化开销和启用有效的列式存储来提高性能。
import pandas as pd
from pyspark.sql import SparkSession
import pyarrow as pa
import pyarrow.parquet as pq
# 创建一个示例 Pandas DataFrame
data = {'姓名': ['约翰', '简', '鲍勃'],
'年龄': [30, 25, 40],
'工资': [50000.0, 60000.0, 70000.0]}
df_pandas = pd.DataFrame(data)
# 将 Pandas DataFrame 转换为 PyArrow Table
table = pa.Table.from_pandas(df_pandas)
# 将 PyArrow Table 写入 Parquet 格式
pq.write_table(table, 'data.parquet')
# 创建一个 SparkSession 对象
spark = SparkSession.builder.appName('PandasToSparkDF').getOrCreate()
# 将 Parquet 文件读入 PySpark DataFrame
df_spark = spark.read.parquet('data.parquet')
# 展示 PySpark DataFrame 的结果
df_spark.show()
要运行上面的代码,首先需要在计算机中安装 pyarrow 库,我们可以使用下面显示的命令。
pip3 install pyarrow
输出
在执行后,它将产生以下输出:
+-----+---+
| 姓名| 年龄|
+-----+---+
| 约翰| 30|
| 简| 25|
|鲍勃 | 40|
+-----+---+
结论
总之,将 Pandas DataFrame 转换为 PySpark DataFrame 可以使用 PyArrow 将 Pandas DataFrame 转换为 PyArrow Table,并将其写入 Parquet 格式。然后可以将结果 Parquet 文件读入 PySpark DataFrame。
PySpark 提供了一个强大的分布式计算框架,可以处理大规模数据处理,是大数据分析的理想选择。通过使用上述方法将 Pandas DataFrame 转换为 PySpark DataFrame,用户可以利用 PySpark 的强大功能和使用 Pandas DataFrame 的便利性。