Abhishek Jaiswal — April 12, 2022
Beginner Data Engineering Python Spark SQL

This article was published as a part of the Data Science Blogathon.

Introduction

In this article, we cover Spark SQL in Python. The previous article introduced Spark, how it works, and its role in big data. If you haven’t read it yet, please do so first.

Spark is an in-memory, distributed data processing engine that has become a standard tool in the field of big data.

Spark can be up to 100 times faster than traditional Hadoop MapReduce for some workloads because it processes data in memory. Spark is written in Scala, but it also provides APIs for application development in Python and Java.

Spark SQL is a built-in Spark module for structured data processing. It uses SQL or a SQL-like dataframe API to query structured data inside Spark programs.

  • It supports both session-scoped temporary views and global temporary views. It uses table views and SQL queries to aggregate and generate data.
  • It supports a wide range of data sources, e.g. Parquet files, JSON, and Hive tables.
  • Spark also optimizes the SQL queries we provide, using Catalyst and Tungsten.
  • Using it, we can run our SQL queries.
  • A view of data in Spark SQL is treated as a table.

In this article, we will first look at why we should use Spark SQL and the flexibility it gives us when working in Spark, and then walk through an implementation.

Objective

  • Importance and Features of Spark SQL
  • Setting up Spark with Python
  • Loading a data file as dataframe
  • Running SQL queries and creating Views
  • Creating Pandas UDF for columnar operations.

Features

Spark SQL is feature-rich. It supports a wide range of structured data sources such as Hive tables, pandas dataframes, and Parquet files.

We can also write data in a wide variety of structured data formats using Spark SQL, e.g. Hive tables, Parquet, and JSON.

Spark SQL leverages the scalability and other advantages of the Spark RDD model.

Connectors such as JDBC and ODBC allow us to connect in a standard manner.

Optimization

The main goal of Spark SQL optimization is to improve the run-time performance of SQL queries by reducing their time and memory consumption, which saves organizations time and money.

It supports both rule-based and cost-based query optimization.

  • Catalyst: Also known as the Catalyst optimizer, it is Spark's built-in query optimizer. It represents a query as a tree, applies rule-based and cost-based optimizations to it, and can be extended with custom optimization rules and features.
  • Tungsten: It is the part of Spark's execution engine that improves CPU and memory efficiency rather than IO, for example through code generation and a compact in-memory data layout.
[Figure: Optimization in Spark SQL. Source: IBM.com]

Setup

We will use pandas to load the data into a dataframe, and then load that pandas dataframe into Spark.

Installing the required packages needs an internet connection, so it is advisable to use a cloud notebook.

# Installing required packages
!pip install pyspark
!pip install findspark
!pip install pyarrow==1.0.0
!pip install pandas
!pip install numpy==1.19.5

After installing all the required packages we need to set up environment variables for Spark.

import findspark
findspark.init()

Initializing Spark Session

import pandas as pd
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

Next, we build the Spark context and session.

# Creating a Spark context
sc = SparkContext()
# Creating a Spark session (the backslashes continue the statement across lines)
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

Evaluating spark in a notebook cell returns information about the running Spark instance.

Loading the Data and Creating Table View

In this section, we will first read a CSV file into a pandas dataframe and then load it into a Spark dataframe.

To create the Spark dataframe, we simply convert the pandas dataframe into a Spark dataframe.

# Reading the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')
mtcars.head()

Renaming the column “Unnamed: 0” to “name”

mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

The dataframe consists of 32 observations (rows) on 11 variables, plus the name column.


Loading the Dataframe into Spark

We use the createDataFrame function to load the data into the spark dataframe.

sdf = spark.createDataFrame(mtcars)

Printing Schema of the loaded spark dataframe


Creating Table View

The table view is required for Spark SQL to run queries. The table view is treated as a SQL table.

There are two ways we can create a Table View.

  • Temporary view
  • Global temporary view

A temporary view is scoped to the current Spark session and disappears when that session ends. Using the function createTempView() we can create a temporary table view.

sdf.createTempView("cars")

After creating the view, we can treat cars as a SQL table in Spark.

Running SQL Queries and Data Aggregation

After creating a table view we can run queries similar to querying a SQL table.

  • Showing the whole table
spark.sql("SELECT * FROM cars").show()
  • Selecting a specific column
# Showing a specific column
spark.sql("SELECT mpg FROM cars").show(5)
  • Basic SQL filtering
# Basic filtering query to find cars that have a high mileage and a low cylinder count
spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)
  • Aggregation
# Aggregating data and grouping by cylinders
spark.sql("SELECT count(*), cyl FROM cars GROUP BY cyl").show()

Querying all the cars whose name starts with “Merc”

spark.sql("SELECT * FROM cars where name like 'Merc%'").show()

Pandas UDF in Spark

A regular Python User-Defined Function (UDF) acts on one row at a time, and because of this it suffers from high serialization and invocation overhead. A pandas UDF instead operates on whole batches of rows as pandas Series, which is why many data pipelines use pandas UDFs for data processing and calculation tasks.

Decorating a regular Python function with @pandas_udf() turns it into a pandas UDF.

It can be applied to a Spark dataframe as well as in Spark SQL.

Importing libraries

# import the Pandas UDF function 
from pyspark.sql.functions import pandas_udf, PandasUDFType

Creating a UDF

@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # conversion from imperial to metric tons
    return s * 0.45
spark.udf.register("convert_weight", convert_wt)

Creating a UDF for converting mileage from miles per gallon (mpg) to kilometers per liter (kmpl)

@pandas_udf("float")
def convert_mileage(s:pd.Series) -> pd.Series:
    return s*0.425
# Registering UDF in Spark
spark.udf.register("convert_mileage", convert_mileage)
  • spark.udf.register() makes a pandas UDF available in Spark SQL under the given name.
  • Each convert function takes a pandas Series s as input and returns a Series as output.
  • convert_weight (the registered name of convert_wt) converts imperial weight to metric tons by multiplying it by 0.45.
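
The factors 0.45 and 0.425 used in the UDFs above can be checked with a little arithmetic. The sketch below assumes that wt is recorded in units of 1000 lb and that mpg refers to US gallons, as in the standard mtcars dataset; the variable names are chosen for illustration.

```python
# Exact definitions of the units involved.
KG_PER_LB = 0.45359237              # kilograms in one pound
KM_PER_MILE = 1.609344              # kilometers in one mile
LITERS_PER_US_GALLON = 3.785411784  # liters in one US gallon

# Assumption: one unit of `wt` is 1000 lb; convert to kg, then to metric tons.
tonnes_per_wt_unit = 1000 * KG_PER_LB / 1000

# Assumption: `mpg` uses US gallons; miles/gallon -> kilometers/liter.
kmpl_per_mpg = KM_PER_MILE / LITERS_PER_US_GALLON

print(round(tonnes_per_wt_unit, 4), round(kmpl_per_mpg, 4))  # prints 0.4536 0.4251
```

So 0.45 and 0.425 are rounded versions of these factors, which is accurate enough for a demonstration.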

Applying the UDF to the table view

Now we can use the convert_weight UDF in Spark SQL or on a Spark dataframe. We can create more complex UDFs according to our calculations.

spark.sql("SELECT *, wt AS imperial_weight , convert_weight(wt) as weight_metric FROM cars").show()

Conclusion

In this article, we have discussed how to load data into Spark and how to run queries with Spark SQL. Spark has become a standard data processing tool because its in-memory architecture makes it faster than Hadoop MapReduce.

Spark SQL is feature-rich. It provides:

  • Unified Data Access
  • Scalability
  • Integration
  • High Compatibility

In this article, we have learned to work with Spark SQL and to process data with it. Spark also provides the Dataset API, which is available only in the Java and Scala APIs. In the next article, we will learn machine learning with Spark in Python.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.
