Create RDD in Apache Spark Using PySpark

Gitesh Dhore 02 Sep, 2022 • 5 min read

 This article was published as a part of the Data Science Blogathon.

Introduction

In this tutorial, we will learn about the basic building block of PySpark, the Resilient Distributed Dataset, popularly known as the PySpark RDD. Before we run any operations, let’s understand the basic concept.

What are RDDs?

RDD stands for Resilient Distributed Dataset. An RDD is a collection of elements partitioned across the nodes of a cluster so that it can be processed in parallel. RDDs are immutable, meaning you cannot change an RDD once you create it. They are also fault-tolerant, so they automatically recover in case of failure. You can apply multiple operations on these RDDs to achieve a particular task.
There are two kinds of operations −
• Transformations
• Actions
Let’s see what these are:
Transformations − These operations are applied to an RDD to create a new RDD. filter, groupBy, and map are examples of transformations. Transformations are lazy: nothing is computed until an action is called.
Actions − These operations are applied to an RDD and instruct Spark to perform the computation and send the result back to the driver.
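The difference between the two kinds of operations can be illustrated without a cluster. The sketch below is plain Python, not Spark: generator expressions stand in for transformations (they only describe a new dataset), and list() stands in for the operation that triggers the computation. The helper tag and the list calls are hypothetical names used only for this illustration.

```python
# Plain-Python analogy (not Spark): generators are lazy like transformations,
# and consuming them with list() plays the role of the triggering operation.
calls = []

def tag(word):
    calls.append(word)          # record that the function actually ran
    return (word, 1)

words = ["scala", "java", "spark", "pyspark"]

# "Transformations": nothing runs yet, we only describe the pipeline.
with_spark = (w for w in words if "spark" in w)
pairs = (tag(w) for w in with_spark)
calls_before = len(calls)       # tag() has not been called at this point

# Triggering step: materializing the result executes the whole pipeline.
result = list(pairs)

print(calls_before)             # 0
print(result)                   # [('spark', 1), ('pyspark', 1)]
```

Spark behaves in a comparable way: a chain of transformations is only recorded, and the work happens when a result is requested.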
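To use any operation in PySpark, we need to create a PySpark RDD first. The following code block shows the signature of the PySpark RDD class. In practice, you rarely call this constructor yourself; an RDD is usually created through a SparkContext, for example with sc.parallelize().
class pyspark.RDD (
   jrdd,
   ctx,
   jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)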
Let’s see how to run a few basic operations using PySpark. The following code in a Python file creates an RDD named words that stores a list of words.
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
Now let’s run some operations on words.

count()

Returns the number of elements in the RDD.
---------------------------------------count.py---------------------------------------
from pyspark import SparkContext
# Run Spark locally with the application name "count app"
sc = SparkContext("local", "count app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# count() is an action: it returns the number of elements to the driver
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
---------------------------------------count.py---------------------------------------
Command − The command for count() is −
$SPARK_HOME/bin/spark-submit count.py
Output − output for the above code −
Number of elements in RDD -> 8

collect()

Returns all the elements of the RDD to the driver as a list.
---------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# collect() is an action: it brings every element back to the driver
coll = words.collect()
print("Elements in RDD -> %s" % coll)
---------------------------------------collect.py---------------------------------------
Command − The command for collect() is −
$SPARK_HOME/bin/spark-submit collect.py
Output − output for the above code (wrapped here for readability) −
Elements in RDD -> [
   'scala',
   'Java',
   'Hadoop',
   'spark',
   'Akka',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

foreach(f)

Applies the function f to every element of the RDD. foreach is an action that is run for its side effects and returns nothing. In the following example, we pass a function that calls print to foreach, which prints all the elements in the RDD. In local mode the output appears in the console; on a cluster it is written to the executors' output instead.
----------------------------------------foreach.py----------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# Function applied to each element
def f(x): print(x)
# foreach() returns None, so there is no result to assign
words.foreach(f)
----------------------------------------foreach.py----------------------------------------
Command − The command for foreach(f) is −
$SPARK_HOME/bin/spark-submit foreach.py
Output − output for the above code −
scala
Java
Hadoop
spark
Akka
spark vs hadoop
pyspark
pyspark and spark

filter(f)

Returns a new RDD containing only the elements that satisfy the function inside the filter. In the following example, we keep only the strings containing “spark”.
----------------------------------------filter.py----------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# filter() is a transformation: it keeps the elements for which the function is true
word_filter = words.filter(lambda x: 'spark' in x)
filtered = word_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py----------------------------------------
Command − The command for filter(f) is −
$SPARK_HOME/bin/spark-submit filter.py
Output − output for the above code (wrapped here for readability) −
Filtered RDD -> [
   'spark',
   'spark vs hadoop',
   'pyspark',
   'pyspark and spark'
]

map(f, preservesPartitioning = False)

Returns a new RDD by applying a function to each element of the RDD. In the following example, we form key-value pairs by mapping each string to the value 1.
----------------------------------------map.py----------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# map() is a transformation: each word becomes a (word, 1) tuple
word_map = words.map(lambda x: (x, 1))
mapping = word_map.collect()
print("Key-value pair -> %s" % mapping)
----------------------------------------map.py----------------------------------------
Command − The command for map(f, preservesPartitioning = False) is −
$SPARK_HOME/bin/spark-submit map.py
Output − output of the above code (wrapped here for readability) −
Key-value pair -> [
   ('scala', 1),
   ('Java', 1),
   ('Hadoop', 1),
   ('spark', 1),
   ('Akka', 1),
   ('spark vs hadoop', 1),
   ('pyspark', 1),
   ('pyspark and spark', 1)
]

reduce(f)

Reduces the elements of the RDD using the specified commutative and associative binary operator and returns the result. In the following example, we import add from the operator module and apply it to nums to perform a simple addition.
-----------------------------------------reduce.py-----------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
# Use a separate name for the result so that the imported add is not overwritten
adding = nums.reduce(add)
print("Adding all elements -> %i" % adding)
-----------------------------------------reduce.py-----------------------------------------
Command − The command for reduce(f) is −
$SPARK_HOME/bin/spark-submit reduce.py
Output − output of the above code −
Adding all elements -> 15
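Why the operator has to be associative and commutative can be seen with a small plain-Python sketch (not Spark). It assumes a simplified model in which each partition is reduced on its own and the partial results are then combined; partitioned_reduce is a hypothetical helper written for this illustration, and real Spark does not guarantee the order in which partial results are merged.

```python
from functools import reduce
from operator import add, sub

def partitioned_reduce(f, data, num_partitions):
    """Reduce each chunk first, then reduce the partial results."""
    size = -(-len(data) // num_partitions)          # ceiling division
    chunks = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(f, chunk) for chunk in chunks]
    return reduce(f, partials)

nums = [1, 2, 3, 4, 5]

# Addition gives the same answer however the data is split.
add_results = {partitioned_reduce(add, nums, n) for n in (1, 2, 3)}

# Subtraction is neither associative nor commutative, so the answer
# depends on how many partitions there are.
sub_results = {partitioned_reduce(sub, nums, n) for n in (1, 2, 3)}

print(add_results)          # {15}
print(len(sub_results))     # 3
```

With an operator such as subtraction, the result of reduce would change with the partitioning of the data, which is why only commutative and associative operators are suitable.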

join(other, numPartitions = None)

Returns an RDD containing all pairs of elements with matching keys. Each result is a (key, (value1, value2)) tuple, where value1 comes from this RDD and value2 from the other. The following example defines two RDDs of key-value pairs. After joining them, we get an RDD with the keys present in both and their values.
----------------------------------------join.py----------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
# Keys are matched exactly, so both RDDs use the same lowercase keys
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py----------------------------------------
Command − The command for join(other, numPartitions = None) is −
$SPARK_HOME/bin/spark-submit join.py
Output − output for the above code (wrapped here for readability; element order may vary) −
Join RDD -> [
   ('spark', (1, 2)),
   ('hadoop', (4, 5))
]
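To make the matching rule concrete, here is a minimal plain-Python sketch (not Spark) of what an inner join on keys computes. The function inner_join is a hypothetical helper for illustration only. Keys are compared exactly, so "hadoop" and "Hadoop" count as different keys.

```python
from collections import defaultdict

def inner_join(left, right):
    """Pair up values from two lists of (key, value) tuples that share a key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

x = [("spark", 1), ("hadoop", 4)]
y_matching = [("spark", 2), ("hadoop", 5)]
y_mixed_case = [("spark", 2), ("Hadoop", 5)]

same_case = inner_join(x, y_matching)
mixed_case = inner_join(x, y_mixed_case)

print(same_case)    # [('spark', (1, 2)), ('hadoop', (4, 5))]
print(mixed_case)   # [('spark', (1, 2))]
```

A key that appears in only one of the two inputs, including one that differs only in letter case, is dropped from the result.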

cache()

Persists this RDD with the default storage level (MEMORY_ONLY). You can also check whether the RDD is marked as cached.
---------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize(
   ["scala",
   "Java",
   "Hadoop",
   "spark",
   "Akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)
# Mark the RDD to be kept in memory once it has been computed
words.cache()
# is_cached is True after cache() has been called
caching = words.is_cached
print("Words have been cached -> %s" % caching)
---------------------------------------cache.py---------------------------------------
Command − The command for cache() is −
$SPARK_HOME/bin/spark-submit cache.py
Output − output for the above code −
Words have been cached -> True
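Caching matters because an RDD that is not persisted is recomputed each time an action runs on it. The toy class below is a plain-Python sketch of that idea, not Spark code: LazyDataset and its computations counter are hypothetical names invented for this illustration.

```python
class LazyDataset:
    """Toy stand-in for an RDD: it holds a recipe, not the data."""

    def __init__(self, compute):
        self._compute = compute      # function that rebuilds the elements
        self._cached = None
        self.is_cached = False
        self.computations = 0        # how many times the recipe actually ran

    def cache(self):
        self.is_cached = True        # only marks the dataset; nothing runs yet
        return self

    def _materialize(self):
        if self.is_cached and self._cached is not None:
            return self._cached      # reuse the stored elements
        self.computations += 1
        data = list(self._compute())
        if self.is_cached:
            self._cached = data      # keep the elements for later actions
        return data

    def count(self):
        return len(self._materialize())

    def collect(self):
        return list(self._materialize())


words = ["scala", "java", "spark", "pyspark"]

uncached = LazyDataset(lambda: (w.upper() for w in words))
uncached.count()
uncached.collect()

cached = LazyDataset(lambda: (w.upper() for w in words)).cache()
cached.count()
cached.collect()

print(uncached.computations)   # 2
print(cached.computations)     # 1
```

In this model, two actions on the uncached dataset run the recipe twice, while the cached dataset runs it once and reuses the stored elements.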
These are some of the most commonly used RDD operations.

Conclusion

In this article, we looked at the RDD (Resilient Distributed Dataset), a collection of elements partitioned across the nodes of a cluster for parallel processing. RDDs are immutable, meaning that you cannot change an RDD once you create it.

  • Transformations − These operations are applied to an RDD to create a new one. filter, groupBy, and map are examples of transformations.
  • Actions − These operations are applied to an RDD and instruct Spark to perform the computation and send the result back to the driver. count, collect, and reduce are examples of actions.
  • foreach(f) is an action that applies a function to every element of the RDD and returns nothing. In this article, we used it to print all the elements in the RDD.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.
