PySpark RDD
现在我们已经在我们的系统上安装和配置了PySpark,我们可以在Apache Spark上用Python编程。然而,在这样做之前,让我们了解Spark中的一个基本概念 – RDD。
RDD是 Resilient Distributed Dataset 的缩写,这些元素在多个节点上运行和操作,在集群上做并行处理。RDD是不可改变的元素,这意味着一旦你创建了一个RDD,你就不能改变它。RDDs也是容错的,因此在发生任何故障时,它们会自动恢复。你可以在这些RDDs上应用多种操作,以实现某个任务。
要在这些RDD上应用操作,有两种方法
- Transformation
- Action
让我们详细了解这两种方式。
Transformation --这些是应用于RDD的操作,以创建一个新的RDD。过滤器、groupBy和地图是转换的例子。
Action - 这些是应用在RDD上的操作,它指示Spark执行计算并将结果送回给驱动。
为了在PySpark中应用任何操作,我们需要首先创建一个 PySpark RDD 。 下面的代码块有PySpark RDD类的细节 –
class pyspark.RDD (
jrdd,
ctx,
jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
让我们看看如何使用PySpark运行一些基本操作。下面的代码在Python文件中创建了RDD词,它存储了一组提到的词。
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
我们现在将对单词进行一些操作。
count()
返回RDD中元素的数量。
----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
counts = words.count()
print "Number of elements in RDD -> %i" % (counts)
----------------------------------------count.py---------------------------------------
命令 - count()的命令是 —
$SPARK_HOME/bin/spark-submit count.py
输出 - 上述命令的输出是 —
Number of elements in RDD → 8
collect()
返回RDD中的所有元素。
----------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
coll = words.collect()
print "Elements in RDD -> %s" % (coll)
----------------------------------------collect.py---------------------------------------
命令 - collect()的命令是 —
$SPARK_HOME/bin/spark-submit collect.py
输出 - 上述命令的输出是 —
Elements in RDD -> [
'scala',
'java',
'hadoop',
'spark',
'akka',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
foreach(f)
只返回那些符合foreach里面的函数条件的元素。在下面的例子中,我们在foreach中调用了一个打印函数,它打印了RDD中的所有元素。
----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
def f(x): print(x)
fore = words.foreach(f)
----------------------------------------foreach.py---------------------------------------
命令 - foreach(f)的命令是 —
$SPARK_HOME/bin/spark-submit foreach.py
输出 - 上述命令的输出是 —
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
filter(f)
一个新的RDD被返回,其中包含满足过滤器内部函数的元素。在下面的例子中,我们过滤掉含有 “spark “的字符串。
----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print "Fitered RDD -> %s" % (filtered)
----------------------------------------filter.py----------------------------------------
命令 - filter(f)的命令是 —
$SPARK_HOME/bin/spark-submit filter.py
输出 - 上述命令的输出是 —
Fitered RDD -> [
'spark',
'spark vs hadoop',
'pyspark',
'pyspark and spark'
]
map(f, preservesPartitioning = False)
通过对RDD中的每个元素应用一个函数,返回一个新的RDD。在下面的例子中,我们形成了一个键值对,并将每个字符串映射为1的值。
----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print "Key value pair -> %s" % (mapping)
----------------------------------------map.py---------------------------------------
命令 - map(f, preservesPartitioning=False)的命令是 —
$SPARK_HOME/bin/spark-submit map.py
输出 - 上述命令的输出是 —
Key value pair -> [
('scala', 1),
('java', 1),
('hadoop', 1),
('spark', 1),
('akka', 1),
('spark vs hadoop', 1),
('pyspark', 1),
('pyspark and spark', 1)
]
reduce(f)
在执行指定的换元和关联二元操作后,RDD中的元素被返回。在下面的例子中,我们从运算器中导入add包,并在’num’上应用它来执行一个简单的加法运算。
----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print "Adding all the elements -> %i" % (adding)
----------------------------------------reduce.py---------------------------------------
命令 - reduce(f)的命令是 —
$SPARK_HOME/bin/spark-submit reduce.py
输出 - 上述命令的输出是 —
Adding all the elements -> 15
join(other, numPartitions = None)
它返回具有一对元素的RDD,这些元素具有匹配的键和该特定键的所有值。在下面的例子中,两个不同的RDD中有两对元素。连接这两个RDD后,我们得到一个RDD,其中的元素具有匹配的键和它们的值。
----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print "Join RDD -> %s" % (final)
----------------------------------------join.py---------------------------------------
命令 - join(other, numPartitions = None)的命令是 —
$SPARK_HOME/bin/spark-submit join.py
输出 - 上述命令的输出是 —
Join RDD -> [
('spark', (1, 2)),
('hadoop', (4, 5))
]
cache()
以默认的存储级别(MEMORY_ONLY)持久化这个RDD。你也可以检查该RDD是否被缓存了。
----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
["scala",
"java",
"hadoop",
"spark",
"akka",
"spark vs hadoop",
"pyspark",
"pyspark and spark"]
)
words.cache()
caching = words.persist().is_cached
print "Words got chached > %s" % (caching)
----------------------------------------cache.py---------------------------------------
命令 - cache()的命令是 —
$SPARK_HOME/bin/spark-submit cache.py
输出 - 上述程序的输出是 —
Words got cached -> True
这些是在PySpark RDD上进行的一些最重要的操作。