PySpark – Handling Immutable DataFrames with Flexibility

Akil Ram 16 Jun, 2021 • 5 min read

This article was published as a part of the Data Science Blogathon

Background:

When working with pandas DataFrames in Python, it is easy to edit a single cell, such as the value in the 10th row and 5th column. Editing a column based on the values of one or more other columns is just as easy. In other words, the DataFrame is mutable and provides great flexibility to work with.

While PySpark derives its basic data types from Python, its own data structures are limited to RDDs, DataFrames, and GraphFrames. Spark DataFrames are immutable and offer less flexibility for row- and column-level handling than their Python counterparts.

When writing PySpark code, it is crucial to stick to Spark operations and not stray into plain Python. Otherwise we lose the parallel computation that Spark provides. When handling massive amounts of data, as soon as execution reaches code written solely in Python, the parallel advantage is lost: the entire dataset is crammed onto a single node, and the job struggles to process it or, worse, fails.

Problem tackled:

When writing end-to-end Spark code, there will be requirements to customize values at the level of an individual column or row. It is imperative to do this while retaining Spark's parallel processing power.

This blog shows some ways for ‘Row/Column level flexible handling in immutable data frames while sticking to Spark.’

1. Single column – complicated conditions/expressions:

The first go-to solution for column-level manipulation is a UDF. Write the column-level processing code as a Python function and register it as a Spark UDF.

Illustration:

Scenario:

One column holds structured data as a string, which has to be converted into a list of dictionaries.

Input:

| id | struct |
|----|--------|
| 1 | "{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300,'2021-06-04':500}" |

Output (generated by the UDF):

| id | dict_val |
|----|----------|
| 1 | [{"date": "2021-06-01", "count": 300}, {"date": "2021-06-02", "count": 400}, {"date": "2021-06-03", "count": 300}, {"date": "2021-06-04", "count": 500}] |

Coding:

from ast import literal_eval
import json
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, StructField
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

def structure_change_fn(struct_col):
    '''
    Desc - Function to change the format of the structure column to dictionary type
    Input - Struct column (a string holding a Python dict literal) to be formatted
    Output - JSON string listing one {"date", "count"} dictionary per entry
    '''
    # literal_eval safely parses the dict literal stored in the string column
    dict_val = literal_eval(struct_col)
    result = [{"date": str(val1), "count": val2} for val1, val2 in dict_val.items()]
    # The UDF is declared with StringType, so the list is serialized to a string
    return json.dumps(result)

udf_restructure = F.udf(structure_change_fn, StringType())

input_data = [("1", "{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300,'2021-06-04':500}")]
schema = StructType([
    StructField("id", StringType(), True),
    StructField("struct", StringType(), True)])
input_df = spark.createDataFrame(data=input_data, schema=schema)
result_df = input_df.withColumn('dict_val', udf_restructure('struct')).select('id', 'dict_val')
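
Because a UDF wraps an ordinary Python function, the restructuring logic can be checked on a sample string before it is registered with Spark. The following is a minimal sketch rather than the article's own code: `restructure` is a hypothetical stand-in name, and returning a JSON string is an assumption chosen so that the value fits a `StringType()` column.

```python
import json
from ast import literal_eval


def restructure(struct_col):
    """Turn a dict-literal string into a JSON list of date/count records."""
    # literal_eval parses the Python dict literal without executing arbitrary code
    dict_val = literal_eval(struct_col)
    records = [{"date": str(day), "count": cnt} for day, cnt in dict_val.items()]
    return json.dumps(records)


# Sample value copied from the input row of the illustration
sample = "{'2021-06-01':300, '2021-06-02':400, '2021-06-03':300,'2021-06-04':500}"
parsed = json.loads(restructure(sample))
print(parsed[0])
```

Checking the function this way keeps Spark out of the loop while the parsing logic is still being worked out.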

 

2. Multiple column values – simple conditions:

Let us now consider the case of deriving a column’s value based on multiple columns.

Illustration:

Scenario: For each subject, the name of the top student (topper) in each of three classes is given, along with the class that holds the overall topper. These are combined to pick the overall topper's name dynamically for each record. The expected output is a description column, derived by substituting the top class and the top student's name into a description text, so each record gets customized text.

In this particular scenario, the conditions are assumed to be mutually exclusive.

Input & Output:

‘Description_temp’ and ‘Desc_col_edit’ are intermediary outputs

‘Description’ column is the final output

| Subject | Class A topper | Class B topper | Class C topper | Overall topper class | Description_temp | Desc_col_edit (inside udf) | Description |
|---------|----------------|----------------|----------------|----------------------|------------------|----------------------------|-------------|
| English | Aaron | Brutus | Chloe | C | of Class {3} got the first mark | {2} of Class {3} got the first mark | Chloe of Class C got the first mark |
| Maths | Arjun | Billy | Clare | A | of Class {3} got the first mark | {0} of Class {3} got the first mark | Arjun of Class A got the first mark |
| Science | Arjun | Balu | Connor | B | of Class {3} got the first mark | {1} of Class {3} got the first mark | Balu of Class B got the first mark |

Coding:

def desc_fn(desc_col,
            format_col0,
            format_col1,
            format_col2,
            format_class_col3):
    ''' Returns python formatted description column value'''
    # Pick the placeholder of the topper that belongs to the overall topper class
    get_class_value = '{0}' if format_class_col3 == 'A' else '{1}' if format_class_col3 == 'B' else '{2}'
    desc_col_edit = get_class_value + desc_col
    # Python string format function returned
    return desc_col_edit.format(format_col0, format_col1, format_col2, format_class_col3)

udf_desc_fn = F.udf(desc_fn, StringType())
# input_df is assumed to hold the columns class_A_topper, class_B_topper, class_C_topper and overall_topper_class
# Hard coding description text in temp column; F.lit wraps the literal as a Column
inter_df = input_df.withColumn('description_temp', F.lit(' of Class {3} got the first mark'))
# Call to UDF function
final_df = inter_df.withColumn('Description', udf_desc_fn('description_temp',
                                                          'class_A_topper',
                                                          'class_B_topper',
                                                          'class_C_topper',
                                                          'overall_topper_class'))

Working:

A UDF is used here again. Instead of one column value, this UDF takes multiple column values and generates a dynamic description value for every row. This is achieved by calling Python's string format function within the UDF.
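
The formatting step can be traced in plain Python on the three sample records. This is a sketch under assumptions: `build_description` is a hypothetical name, and it uses a dictionary lookup instead of a chained conditional, so an unknown class raises a `KeyError` rather than defaulting to the Class C placeholder.

```python
def build_description(template, topper_a, topper_b, topper_c, top_class):
    """Prefix the template with the winning class's placeholder, then fill it in."""
    # Map the overall topper class to the positional placeholder of its student
    placeholder = {"A": "{0}", "B": "{1}", "C": "{2}"}[top_class]
    # {0}-{2} are the three topper names, {3} is the overall topper class
    return (placeholder + template).format(topper_a, topper_b, topper_c, top_class)


template = " of Class {3} got the first mark"
rows = [
    ("English", "Aaron", "Brutus", "Chloe", "C"),
    ("Maths", "Arjun", "Billy", "Clare", "A"),
    ("Science", "Arjun", "Balu", "Connor", "B"),
]
descriptions = {
    subject: build_description(template, a, b, c, top)
    for subject, a, b, c, top in rows
}
for subject, text in descriptions.items():
    print(subject, "->", text)
```

The placeholder chosen for each record is the only part that varies, which is why a single template string can serve every row.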

3. Multiple column values – complex condition:

Let us now consider the case, where there are multiple conditions to be validated and a lot of possibilities for the resultant description column.

Illustration:

For every user, the status with respect to three conditions (A, B, and C) is given. '1' represents a condition match, while '-1' represents a condition mismatch. A suitable description has to be generated. The description has a long list of possible values, depending on the conditions.

Input & output:

‘Description’ column is the output to be generated based on the user’s values in Condition columns.

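| User ID | Condition A | Condition B | Condition C | Description |
|---------|-------------|-------------|-------------|-------------|
| 1 | -1 | 1 | -1 | Only Condition B is met |
| 2 | -1 | -1 | 1 | Only Condition C is met |
| 3 | -1 | -1 | -1 | None of the conditions are met |
| 4 | -1 | 1 | 1 | Both conditions B and C are met |
| 5 | 1 | -1 | -1 | Only Condition A is met |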

 

If we followed the previous UDF approach, all the conditions would have to be hardcoded inside the UDF as multiple if-else branches.

These conditions are also bound to change. For example, suppose a new combination has to be supported in which Conditions A, B, and C are all 1. Such changes are tiresome and make the code lengthy.

Instead, the conditions can be split into a separate configuration file. This way,

i) the file can be configured easily and kept separate from the code, which is good practice

ii) changes to the conditions do not affect the code

iii) the code is no longer lengthy, free of nested if-else conditions that are hard to decipher and maintain

Config file:

The unique combinations of conditions and their descriptions are written into a config file:

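| Condition A | Condition B | Condition C | Description |
|-------------|-------------|-------------|-------------|
| -1 | -1 | -1 | None of the conditions are met |
| -1 | -1 | 1 | Only Condition C is met |
| -1 | 1 | -1 | Only Condition B is met |
| -1 | 1 | 1 | Both conditions B and C are met |
| 1 | -1 | -1 | Only Condition A is met |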
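
The article does not say which file format the config uses. As one possible rendering, the sketch below assumes a CSV file with hypothetical column names (`condition_A`, `condition_B`, `condition_C`, `description`) and parses it with the standard library, to show that supporting a new combination means adding a line of data rather than a branch of code.

```python
import csv
import io

# Hypothetical CSV rendering of the config table; column names are assumptions
CONFIG_CSV = """condition_A,condition_B,condition_C,description
-1,-1,-1,None of the conditions are met
-1,-1,1,Only Condition C is met
-1,1,-1,Only Condition B is met
-1,1,1,Both conditions B and C are met
1,-1,-1,Only Condition A is met
"""


def load_config(text):
    """Parse the CSV text into rows with integer condition flags."""
    rows = []
    for rec in csv.DictReader(io.StringIO(text)):
        rows.append({
            "condition_A": int(rec["condition_A"]),
            "condition_B": int(rec["condition_B"]),
            "condition_C": int(rec["condition_C"]),
            "description": rec["description"],
        })
    return rows


config_rows = load_config(CONFIG_CSV)
print(len(config_rows), config_rows[2]["description"])
```

In Spark, a file like this could be loaded with `spark.read.csv(path, header=True, inferSchema=True)`, where `path` is whatever location the config is stored at.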

 

Coding:

# The input and the config files are read as dataframes (input_df and config_df)
input_df.createOrReplaceTempView('input_tbl')
config_df.createOrReplaceTempView('config_tbl')
# A triple-quoted string lets the query span several lines
join_qry = """
    SELECT input_tbl.user_id,
           input_tbl.condition_A,
           input_tbl.condition_B,
           input_tbl.condition_C,
           config_tbl.description
    FROM input_tbl, config_tbl
    WHERE input_tbl.condition_A * config_tbl.condition_A > 0
      AND input_tbl.condition_B * config_tbl.condition_B > 0
      AND input_tbl.condition_C * config_tbl.condition_C > 0
"""
join_df = spark.sql(join_qry)

Working:

The conditions in the input data are matched against the conditions in the config file.

The main idea is that if condition_A matches in both files, the product of the two values is 1, i.e. greater than 0.

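| Condition A (from input) | Condition A (from config) | Product |
|--------------------------|---------------------------|---------|
| 1 | 1 | 1 (condition match) |
| -1 | -1 | 1 (condition match) |
| 1 | -1 | -1 (condition mismatch) |
| -1 | 1 | -1 (condition mismatch) |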

The same check is applied to each of the 3 conditions: a config record matches an input record only when all three products are greater than 0.

For every record in the input dataframe, the matching record in the config file is identified this way and the description column is extracted correspondingly from the config file.
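
The matching rule can be traced in plain Python before running it as a Spark SQL join. This is a minimal sketch: `match_description` is a hypothetical helper, and the config rows are hard-coded as tuples here only to keep the example self-contained.

```python
# Config rows as (condition_A, condition_B, condition_C, description)
CONFIG = [
    (-1, -1, -1, "None of the conditions are met"),
    (-1, -1, 1, "Only Condition C is met"),
    (-1, 1, -1, "Only Condition B is met"),
    (-1, 1, 1, "Both conditions B and C are met"),
    (1, -1, -1, "Only Condition A is met"),
]


def match_description(cond_a, cond_b, cond_c, config=CONFIG):
    """Return the description of the config row whose signs all agree with the input."""
    for cfg_a, cfg_b, cfg_c, description in config:
        # Equal signs give a product of 1, opposite signs give -1
        if cond_a * cfg_a > 0 and cond_b * cfg_b > 0 and cond_c * cfg_c > 0:
            return description
    return None  # no config row covers this combination


users = {1: (-1, 1, -1), 2: (-1, -1, 1), 3: (-1, -1, -1), 4: (-1, 1, 1), 5: (1, -1, -1)}
results = {user_id: match_description(*flags) for user_id, flags in users.items()}
for user_id, text in results.items():
    print(user_id, text)
```

A combination that is absent from the config, such as all three conditions equal to 1, returns `None` in this sketch; in the SQL join such a record would simply not appear in the result, so the config should cover every combination that can occur in the input.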

EndNotes

These are sample cases and scenarios that illustrate some ways of handling Spark DataFrames to edit column-level information dynamically. The solutions provided are specific to the scenarios depicted.

The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.
