PySpark 基本概念RDD

PySpark 基本概念RDD

现在我们已经在系统上安装并配置了PySpark,我们可以在Apache Spark上使用Python进行编程。但在这之前,让我们先了解Spark中的一个基本概念 – RDD。

RDD代表 弹性分布式数据集 ,这些元素在多个节点上运行和操作,以在集群上进行并行处理。RDD是不可变的元素,这意味着一旦创建了一个RDD,就无法更改它。RDD也具有容错性,在任何故障的情况下,它们会自动恢复。您可以对这些RDD应用多个操作来完成特定的任务。

要对这些RDD应用操作,有两种方式 –

  • Transformation
  • Action

让我们详细了解这两种方式。

Transformation - 这些是应用于RDD以创建新RDD的操作。筛选、分组和映射是转换的例子。

Action - 这些是应用于RDD的操作,指示Spark执行计算并将结果发送回驱动程序。

要在PySpark中应用任何操作,我们需要先创建一个 PySpark RDD 。以下代码块包含了PySpark RDD类的详细信息 –

class pyspark.RDD (
   jrdd, 
   ctx, 
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)

让我们看看如何使用PySpark运行一些基本操作。以下代码在一个Python文件中创建了RDD words,它存储了一组提到的单词。

words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)

现在我们将对单词进行一些操作。

count()

返回RDD中的元素数量。

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------

命令 − count()的命令为−

$SPARK_HOME/bin/spark-submit count.py

输出 − 以上命令的输出为 −

Number of elements in RDD → 8

collect()

返回RDD中的所有元素。

----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------

命令 - collect()的命令是 –

$SPARK_HOME/bin/spark-submit collect.py

输出: 上述命令的输出结果如下:

Elements in RDD -> [
   'scala', 
   'java', 
   'hadoop', 
   'spark', 
   'akka', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

foreach(f)

只返回满足foreach内的函数条件的元素。在下面的示例中,我们在foreach中调用一个打印函数,用于打印RDD中的所有元素。

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f) 
----------------------------------------foreach.py---------------------------------------

命令 - 循环(foreach)的命令是:

$SPARK_HOME/bin/spark-submit foreach.py

输出 − 以上命令的输出为 −

scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

返回一个包含满足筛选函数的元素的新RDD。在以下示例中,我们筛选出包含”spark”的字符串。

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------

命令 − filter(f) 的命令为 −

$SPARK_HOME/bin/spark-submit filter.py

输出 - 上述命令的输出如下:

Fitered RDD -> [
   'spark', 
   'spark vs hadoop', 
   'pyspark', 
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

通过对RDD中的每个元素应用函数,返回一个新的RDD。在下面的例子中,我们构建了一个键值对,并将每个字符串映射为值1。

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------

命令 − map(f, preservesPartitioning=False) 的命令是 −

$SPARK_HOME/bin/spark-submit map.py

输出 − 上述命令的输出结果是 −

Key value pair -> [
   ('scala', 1), 
   ('java', 1), 
   ('hadoop', 1), 
   ('spark', 1), 
   ('akka', 1), 
   ('spark vs hadoop', 1), 
   ('pyspark', 1), 
   ('pyspark and spark', 1)
]

reduce(f)

执行指定的可交换和可结合的二元操作后,RDD中的元素将被返回。在下面的示例中,我们从operator包中导入add包,并将其应用于’num’上进行简单的加法运算。

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------

命令 − reduce(f)的命令是−

$SPARK_HOME/bin/spark-submit reduce.py

输出 − 执行上述命令的输出为 −

Adding all the elements -> 15

join(other, numPartitions = None)

它返回一个带有匹配键和该特定键的所有值的元素对的RDD。在下面的示例中,有两组不同的RDD中的两个元素对。在连接这两个RDD之后,我们得到一个含有匹配键和它们的值的元素的RDD。

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------

命令 − join(other, numPartitions = None) 命令的语法为 −

$SPARK_HOME/bin/spark-submit join.py

输出 - 上述命令的输出为 –

Join RDD -> [
   ('spark', (1, 2)),  
   ('hadoop', (4, 5))
]

cache()

使用默认的存储级别 (MEMORY_ONLY) 将此 RDD 进行持久化。您也可以检查 RDD 是否已被缓存。

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext 
sc = SparkContext("local", "Cache app") 
words = sc.parallelize (
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "akka",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
) 
words.cache() 
caching = words.persist().is_cached 
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------

命令 − cache() 的命令为 −

$SPARK_HOME/bin/spark-submit cache.py

输出 − 上述程序的输出为−

Words got cached -> True

这些是在PySpark RDD上执行的一些最重要的操作。

Python教程

Java教程

Web教程

数据库教程

图形图像教程

大数据教程

开发工具教程

计算机教程