Create RDD in Apache Spark using PySpark
This article was published as a part of the Data Science Blogathon.
Introduction
In this tutorial, we will learn about the building block of PySpark called the Resilient Distributed Dataset, popularly known as the PySpark RDD. Before we work with RDDs, let’s understand the basic concept.
What are RDDs?
An RDD (Resilient Distributed Dataset) is the fundamental data structure of Spark: an immutable, fault-tolerant collection of elements partitioned across the nodes of a cluster so that it can be processed in parallel. In PySpark, the RDD class is defined as follows:
class pyspark.RDD (
    jrdd,
    ctx,
    jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)
words = sc.parallelize ( ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"] )
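As a quick sanity check, here is a minimal sketch (not part of the original examples; the file name check_rdd.py and the app name "rdd demo" are chosen only for illustration) that confirms parallelize() has turned the local list into a distributed RDD:
---------------------------------------check_rdd.py (illustrative sketch)---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "rdd demo")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# parallelize() returns a pyspark.rdd.RDD backed by one or more partitions
print(type(words))
print("Number of partitions -> %i" % words.getNumPartitions())
print("First element -> %s" % words.first())
---------------------------------------check_rdd.py (illustrative sketch)---------------------------------------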
count()
Returns the number of elements in the RDD.
---------------------------------------count.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "count app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
---------------------------------------count.py---------------------------------------
$SPARK_HOME/bin/spark-submit count.py
Number of elements in RDD -> 8
collect()
Returns all the elements in the RDD as a list to the driver.
---------------------------------------collect.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Collect app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)
coll = words.collect()
print("Elements in RDD -> %s" % coll)
---------------------------------------collect.py---------------------------------------
$SPARK_HOME/bin/spark-submit collect.py
Elements in RDD -> ['scala', 'java', 'hadoop', 'spark', 'akka', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
foreach(f)
Applies the function f to each element of the RDD.
----------------------------------------foreach.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "ForEach app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)

def f(x):
    print(x)

# foreach() applies f to every element of the RDD
words.foreach(f)
----------------------------------------foreach.py----------------------------------------
$SPARK_HOME/bin/spark-submit foreach.py
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark
filter(f)
Returns a new RDD containing only the elements that satisfy the given function.
----------------------------------------filter.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Filter app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)
word_filter = words.filter(lambda x: 'spark' in x)
filtered = word_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py----------------------------------------
$SPARK_HOME/bin/spark-submit filter.py
Filtered RDD -> ['spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']
map(f, preservesPartitioning = False)
Returns a new RDD by applying the function f to each element of the RDD.
----------------------------------------map.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Map app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)
word_map = words.map(lambda x: (x, 1))
mapping = word_map.collect()
print("Key-value pair -> %s" % mapping)
----------------------------------------map.py----------------------------------------
$SPARK_HOME/bin/spark-submit map.py
Key-value pair -> [('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]
reduce(f)
Aggregates the elements of the RDD using the specified commutative and associative binary function.
-----------------------------------------reduce.py-----------------------------------------
from operator import add
from pyspark import SparkContext

sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all elements -> %i" % adding)
-----------------------------------------reduce.py-----------------------------------------
$SPARK_HOME/bin/spark-submit reduce.py
Adding all elements -> 15
join(other, numPartitions = None)
Joins two RDDs of key-value pairs on their keys and returns, for each matching key, the pair of values.
----------------------------------------join.py----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py----------------------------------------
$SPARK_HOME/bin/spark-submit join.py
Join RDD -> [('spark', (1, 2)), ('hadoop', (4, 5))]
cache()
Persists the RDD with the default storage level (MEMORY_ONLY).
---------------------------------------cache.py---------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Cache app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)
words.cache()
caching = words.is_cached
print("Words have been cached -> %s" % caching)
---------------------------------------cache.py---------------------------------------
$SPARK_HOME/bin/spark-submit cache.py
Words have been cached -> True
Conclusion
RDD stands for Resilient Distributed Dataset: a collection of elements partitioned across multiple nodes of a cluster so they can be processed in parallel. RDDs are immutable, meaning that once you create an RDD you cannot change it; instead, you apply operations that produce new RDDs. These operations fall into two categories:
- Transformations − These operations are applied to an RDD to create a new RDD. filter(), groupBy(), and map() are examples of transformations.
- Actions − These operations are applied to an RDD and instruct Spark to perform a computation and send the result back to the driver. count(), collect(), and reduce() are examples of actions (see the sketch after this list).
- filter() returns only those elements that satisfy the condition of the function passed to it, while foreach() applies a function to every element; in the foreach example above, passing print to foreach printed all the elements in the RDD.
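The following minimal sketch (not taken from the examples above; the file name summary.py and the app name "Summary app" are chosen only for illustration) shows how a transformation and an action work together: the transformation is lazy and only describes a new RDD, while the action triggers the actual computation.
----------------------------------------summary.py (illustrative sketch)----------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Summary app")
words = sc.parallelize(
    ["scala", "java", "hadoop", "spark", "akka", "spark vs hadoop", "pyspark", "pyspark and spark"]
)

# Transformation: lazily describes a new RDD; nothing is computed yet
spark_words = words.filter(lambda x: 'spark' in x)

# Action: triggers the computation and returns the result to the driver
print("Number of elements containing 'spark' -> %i" % spark_words.count())
----------------------------------------summary.py (illustrative sketch)----------------------------------------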
The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.