Kishan Yadav — Published On December 26, 2022 and Last Modified On January 12th, 2023

This article was published as a part of the Data Science Blogathon.

Introduction

Innovations and new technologies are transforming the world in many ways. The Internet, the web, and smartphones have become necessities of modern life. Developed nations have already made the transition from manual to digital work, and developing countries are also using modern technologies to pursue comprehensive national growth.

Today we have data from sectors such as automobiles, finance, technology, aviation, food, and media. This data comes in different formats and in massive quantities, and we need modern tools and technologies to dig out the insights hidden in it.

In this article, we will work through a case study using PySpark and look for hidden information in the data to answer a set of business questions. For this case study, I am using the Databricks Community Edition.

Overview of Business

There is a small restaurant that sells some fantastic Indian food items, and it needs our help to stay afloat. The restaurant has collected data from its first few months of operation but does not yet know how to use that data to run the business.

We want to use the data to answer a few questions about the restaurant’s customers, especially their visiting patterns, how much money they have spent, and which menu items are their favorites.

DataFrame Creation with PySpark and Databricks

The restaurant has three datasets: sales data with the customers’ orders, menu data with information about the food items, and members data with the details of customers who have purchased a membership.

We are going to create three dataframes containing the following information:-

1. Sales dataframe:- This dataframe contains information related to sales. It has three columns: customer_id, order_date, and product_id.

  • customer_id:- This column contains the id of the customer. Ex:- A, B, C etc.
  • order_date:- This column holds the date on which the item was ordered. Ex:- ‘2021-01-11’ etc.
  • product_id:- This holds the id of the product or food items. Ex:- 1, 2, 3 etc.
# create sales data
sales_data = [
          ('A', '2021-01-01', '1'),
          ('A', '2021-01-01', '2'),
          ('A', '2021-01-07', '2'),
          ('A', '2021-01-10', '3'),
          ('A', '2021-01-11', '3'),
          ('A', '2021-01-11', '3'),
          ('B', '2021-01-01', '2'),
          ('B', '2021-01-02', '2'),
          ('B', '2021-01-04', '1'),
          ('B', '2021-01-11', '1'),
          ('B', '2021-01-16', '3'),
          ('B', '2021-02-01', '3'),
          ('C', '2021-01-01', '3'),
          ('C', '2021-01-01', '1'),
          ('C', '2021-01-07', '3')]
# cols
sales_cols= ["customer_id", "order_date", "product_id"]
sales_df = spark.createDataFrame(data = sales_data, schema = sales_cols)
# view the data
sales_df.show()

The Sales dataframe will look like this:-

+-----------+----------+----------+
|customer_id|order_date|product_id|
+-----------+----------+----------+
|          A|2021-01-01|         1|
|          A|2021-01-01|         2|
|          A|2021-01-07|         2|
|          A|2021-01-10|         3|
|          A|2021-01-11|         3|
|          A|2021-01-11|         3|
|          B|2021-01-01|         2|
|          B|2021-01-02|         2|
|          B|2021-01-04|         1|
|          B|2021-01-11|         1|
|          B|2021-01-16|         3|
|          B|2021-02-01|         3|
|          C|2021-01-01|         3|
|          C|2021-01-01|         1|
|          C|2021-01-07|         3|
+-----------+----------+----------+

Now we will create the menu dataframe, which has food items related information.

2. Menu dataframe:- This dataframe contains information related to the food items. It has three columns: ‘product_id’, ‘product_name’, and ‘price’.

  • product_id:- This column has the id of food items. Ex:- 1, 2, 3 etc.
  • product_name:- This column stores the name of the food item.
  • price:- This column stores the price of the food item in rupees.
# menu data
menu_data = [ ('1', 'palak_paneer', 100),
              ('2', 'chicken_tikka', 150),
              ('3', 'jeera_rice', 120),
              ('4', 'kheer', 110),
              ('5', 'vada_pav', 80),
              ('6', 'paneer_tikka', 180)]
# cols
menu_cols = ['product_id', 'product_name', 'price']
# create the menu dataframe
menu_df = spark.createDataFrame(data = menu_data, schema = menu_cols)
# view the data
menu_df.show()

The Menu dataframe will look like this:-

+----------+-------------+-----+
|product_id| product_name|price|
+----------+-------------+-----+
|         1| palak_paneer|  100|
|         2|chicken_tikka|  150|
|         3|   jeera_rice|  120|
|         4|        kheer|  110|
|         5|     vada_pav|   80|
|         6| paneer_tikka|  180|
+----------+-------------+-----+

3. Members dataframe:- This dataframe has details of the customers who hold a membership at this restaurant. Its columns are customer_id and join_date, where join_date is the date on which the customer purchased the membership.

# create member data
members_data = [ ('A', '2021-01-07'),
                 ('B', '2021-01-09')]
# cols
members_cols = ["customer_id", "join_date"]
# create the member's dataframe
members_df = spark.createDataFrame(data = members_data, schema = members_cols)
# view the data
members_df.show()

The Members dataframe will look like this:-

+-----------+----------+
|customer_id| join_date|
+-----------+----------+
|          A|2021-01-07|
|          B|2021-01-09|
+-----------+----------+

Solving the Restaurant’s Problem Statements using PySpark

Question 01:- What is the total amount each customer spent at the restaurant?

# solution:-
total_spent_df = (sales_df.join(menu_df, 'product_id') 
                         .groupBy('customer_id').agg({'price':'sum'}) 
                         .withColumnRenamed('sum(price)', 'total_spent_amounts') 
                         .orderBy('customer_id'))
# output:-
+-----------+-------------------+
|customer_id|total_spent_amounts|
+-----------+-------------------+
|          A|                760|
|          B|                740|
|          C|                340|
+-----------+-------------------+ 

Approach:-

  • First, we join the sales and menu dataframes on the ‘product_id’ column so that every order gets its price.
  • Then we group the data by ‘customer_id’ and sum the ‘price’ column using an aggregate operation.
  • Finally, we rename the ‘sum(price)’ column to ‘total_spent_amounts’ and sort the result by ‘customer_id’.

Output:-

  • Customer ‘A’ has spent 760 rupees.
  • Customer ‘B’ has spent 740 rupees.
  • Customer ‘C’ has paid 340 rupees.
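To double-check these totals without a cluster, the same join-and-sum logic can be sketched in plain Python on the sample rows. This is a minimal sketch, not PySpark code: the names SALES, PRICE, and total_spent are hypothetical, and a dictionary lookup stands in for the inner join on ‘product_id’.

```python
from collections import defaultdict

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
# product_id -> price, taken from menu_df
PRICE = {"1": 100, "2": 150, "3": 120, "4": 110, "5": 80, "6": 180}


def total_spent(sales, price):
    """Join each sale to its price, group by customer_id, and sum the prices."""
    totals = defaultdict(int)
    for customer_id, _order_date, product_id in sales:
        if product_id in price:  # inner join: skip sales with no menu row
            totals[customer_id] += price[product_id]
    return dict(sorted(totals.items()))  # like orderBy('customer_id')


print(total_spent(SALES, PRICE))  # {'A': 760, 'B': 740, 'C': 340}
```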

Question 02:- How many days has each customer visited the restaurant?

# solution:-
from pyspark.sql.functions import countDistinct
res_visit_df = (sales_df.groupBy('customer_id')
                        .agg(countDistinct('order_date').alias('visit_days')))
# output:-
+-----------+----------+
|customer_id|visit_days|
+-----------+----------+
|          A|         4|
|          B|         6|
|          C|         2|
+-----------+----------+

Approach:-

  • First, we group the sales data by ‘customer_id’.
  • Afterward, we apply the ‘countDistinct’ aggregation to the ‘order_date’ column to get the number of distinct days on which each customer placed orders.
  • Finally, we alias the aggregated column as ‘visit_days’. Spark’s default name for this column is ‘count(DISTINCT order_date)’, so renaming a column called ‘count(order_date)’ would have no effect.

Output:-

  • Customer ‘A’ has visited the restaurant for four days.
  • Customer ‘B’ has visited the restaurant for six days.
  • Customer ‘C’ has visited for two days.
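The distinct count matters because a customer can order several items on the same day. The sketch below contrasts the two counts in plain Python; the helper names are hypothetical and the rows are copied from sales_df. A set keeps each order date once, while a plain row count counts every order.

```python
from collections import defaultdict

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]


def visit_days(sales):
    """Distinct order dates per customer, like countDistinct('order_date')."""
    dates = defaultdict(set)
    for customer_id, order_date, _product_id in sales:
        dates[customer_id].add(order_date)  # a set stores each date once
    return {c: len(d) for c, d in sorted(dates.items())}


def order_rows(sales):
    """Rows per customer, like a plain count('order_date')."""
    rows = defaultdict(int)
    for customer_id, _order_date, _product_id in sales:
        rows[customer_id] += 1
    return dict(sorted(rows.items()))


print(visit_days(SALES))  # {'A': 4, 'B': 6, 'C': 2}
print(order_rows(SALES))  # {'A': 6, 'B': 6, 'C': 3}
```

Customer ‘A’ placed six orders but visited on only four days, which is exactly the difference the distinct count captures.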

Question 03:- What was each customer’s first item from the menu?

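from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

# create windowspec: one partition per customer, rows ordered by order_date
windowSpec = Window.partitionBy("customer_id").orderBy("order_date")
items_purchased_df = (sales_df.withColumn("dense_rank", dense_rank().over(windowSpec))
                              .filter("dense_rank == 1")  # keep the first-visit rows
                              .join(menu_df, 'product_id')
                              .select('customer_id', 'product_name')
                              .orderBy('customer_id'))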
# output:-
+-----------+-------------+
|customer_id| product_name|
+-----------+-------------+
|          A|chicken_tikka|
|          A| palak_paneer|
|          B|chicken_tikka|
|          C| palak_paneer|
|          C|   jeera_rice|
+-----------+-------------+

Approach:-

  • We use a window function to solve this problem. The window spec partitions the data by ‘customer_id’ and sorts each partition by ‘order_date’.
  • We use the dense_rank() window function for the ranking because a customer may have ordered more than one item on their first visit; all orders on the first date get rank 1.
  • We filter the rows with rank 1 to get the first-visit purchases, join the menu data on ‘product_id’, select the columns to display, and sort the result by ‘customer_id’.

Output:-

  • Customer ‘A’ purchased two items: chicken_tikka and palak_paneer.
  • Customer ‘B’ purchased only one food item: chicken_tikka.
  • Customer ‘C’ also purchased two items: palak_paneer and jeera_rice.
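Why dense_rank() and not row_number()? The plain-Python sketch below (hypothetical helper names, not Spark code) reproduces the SQL semantics of row_number(), rank(), and dense_rank() over sorted order dates, assuming the keys arrive already sorted, as orderBy('order_date') ensures inside each partition.

```python
from collections import defaultdict

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
NAMES = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}


def rank_fns(sorted_keys):
    """Return (row_number, rank, dense_rank) lists for keys sorted ascending."""
    row_number, rank, dense = [], [], []
    prev, current_rank, current_dense = object(), 0, 0
    for position, key in enumerate(sorted_keys, start=1):
        if key != prev:  # a new value starts a new rank
            current_rank, current_dense, prev = position, current_dense + 1, key
        row_number.append(position)
        rank.append(current_rank)    # ties share a rank, then a gap follows
        dense.append(current_dense)  # ties share a rank, no gap
    return row_number, rank, dense


def first_items(sales, names):
    """Items on each customer's first order date (dense_rank == 1)."""
    orders = defaultdict(list)
    for customer_id, order_date, product_id in sales:
        orders[customer_id].append((order_date, product_id))
    result = {}
    for customer_id, rows in sorted(orders.items()):
        rows.sort()  # order by order_date (then product_id, for determinism)
        _, _, dense = rank_fns([d for d, _ in rows])
        result[customer_id] = sorted(names[p] for (d, p), r in zip(rows, dense) if r == 1)
    return result


a_dates = sorted(d for c, d, _ in SALES if c == "A")
print(rank_fns(a_dates))
# ([1, 2, 3, 4, 5, 6], [1, 1, 3, 4, 5, 5], [1, 1, 2, 3, 4, 4])
print(first_items(SALES, NAMES))
```

For a rank == 1 filter, rank() and dense_rank() keep the same rows; they differ only after a tie (rank() jumps from 1 to 3, dense_rank() continues with 2). row_number() would keep only one of customer ‘A’s two first-day orders and silently drop an item.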

Question 04:- Find out the most purchased item from the menu and how many times the customers purchased it.

# code
from pyspark.sql.functions import count

most_purchased_df = (sales_df.join(menu_df, 'product_id')
                             .groupBy('product_id', 'product_name')
                             .agg(count('product_id').alias('product_count'))
                             .orderBy('product_count', ascending=False)  # most purchased first
                             .drop('product_id')
                             .limit(1))
# output:-
+------------+-------------+
|product_name|product_count|
+------------+-------------+
|  jeera_rice|            7|
+------------+-------------+

Approach:-

  • We will first join the sales and menu data to solve this problem.
  • Then perform grouping based on ‘product_id’ and ‘product_name’.
  • Next, we apply a ‘count’ aggregate operation on ‘product_id’ to count how often each item was ordered, sort the data by ‘product_count’ in descending order, and limit the result to the first row.

Output:-

  • The most purchased item is ‘jeera_rice’, which customers purchased seven times.
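Because the PySpark code uses an inner join, items that were never ordered disappear from the result. The minimal sketch below (hypothetical names, sample data copied from the dataframes above) counts purchases per item starting from the menu side instead, in the spirit of a left join from menu_df to sales_df, so unsold items appear with a count of zero.

```python
from collections import Counter

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
# product_id -> product_name, from menu_df
NAMES = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice",
         "4": "kheer", "5": "vada_pav", "6": "paneer_tikka"}


def purchases_per_item(sales, names):
    """Count orders per menu item, keeping items with zero orders."""
    counts = Counter(product_id for _, _, product_id in sales)
    rows = [(names[p], counts.get(p, 0)) for p in names]  # every menu item
    return sorted(rows, key=lambda row: (-row[1], row[0]))  # most purchased first


for name, n in purchases_per_item(SALES, NAMES):
    print(f"{name:<14}{n}")
```

With this sample data, ‘kheer’, ‘vada_pav’, and ‘paneer_tikka’ have no orders at all, which is useful input when deciding which items need more focus. Also note that limit(1) returns a single row even if two items tie for the top count.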

Question 05:- Which item was the most popular for each customer?

# code:-
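from pyspark.sql.functions import col, count, dense_rank
from pyspark.sql.window import Window

popular_item_df = (sales_df.join(menu_df, 'product_id')
                           .groupBy('customer_id', 'product_name')
                           .agg(count('product_id').alias('count_orders'))
                           # rank each customer's items by order count, highest first
                           .withColumn('dense_rank', dense_rank().over(Window.partitionBy("customer_id")
                                                                       .orderBy(col("count_orders").desc())))
                           .filter('dense_rank = 1')  # ties at the top all keep rank 1
                           .drop('dense_rank'))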
# output:-
+-----------+-------------+------------+
|customer_id| product_name|count_orders|
+-----------+-------------+------------+
|          A|   jeera_rice|           3|
|          B|chicken_tikka|           2|
|          B| palak_paneer|           2|
|          B|   jeera_rice|           2|
|          C|   jeera_rice|           2|
+-----------+-------------+------------+

Approach:-

  • To find the most popular items for each customer, we join the sales and menu data and group the result by ‘customer_id’ and ‘product_name’.
  • We then use the count aggregate operation to count how many times each customer ordered each item.
  • Afterward, we use the dense_rank window function to rank each customer’s items by ‘count_orders’ in descending order, keep the rows where ‘dense_rank’ equals one (ties included), and finally drop the ‘dense_rank’ column.

Output:-

  • Customer ‘A’ has ordered ‘jeera_rice’ three times.
  • Customer ‘B’ has ordered ‘chicken_tikka’, ‘jeera_rice’, and ‘palak_paneer’ twice each.
  • Customer ‘C’ has ordered ‘jeera_rice’ twice.
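The three-way tie for customer ‘B’ is why dense_rank() is used here rather than keeping a single top row. The same logic can be sketched in plain Python (hypothetical names; a nested Counter stands in for the groupBy and count) to keep every item that shares a customer’s top count:

```python
from collections import Counter, defaultdict

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
NAMES = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}


def popular_items(sales, names):
    """Per customer: (items tied for the highest order count, that count)."""
    per_customer = defaultdict(Counter)
    for customer_id, _order_date, product_id in sales:
        per_customer[customer_id][names[product_id]] += 1  # groupBy + count
    result = {}
    for customer_id, counts in sorted(per_customer.items()):
        best = max(counts.values())
        # same rows as keeping dense_rank() == 1 over count_orders descending
        result[customer_id] = (sorted(i for i, n in counts.items() if n == best), best)
    return result


for customer_id, (items, n) in popular_items(SALES, NAMES).items():
    print(customer_id, items, n)
```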

Question 06:- Which item was ordered first by the customer after becoming a restaurant member?

# code:-
from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

windowSpec = Window.partitionBy("customer_id").orderBy("order_date")
items_after_member_df = (sales_df.join(members_df, 'customer_id')
                                 # keep orders placed on or after the membership date
                                 .filter(sales_df.order_date >= members_df.join_date)
                                 .withColumn('dense_rank', dense_rank().over(windowSpec))
                                 .filter('dense_rank = 1')
                                 .join(menu_df, 'product_id')
                                 .select('customer_id', 'order_date', 'product_name'))
# output:-
+-----------+----------+-------------+
|customer_id|order_date| product_name|
+-----------+----------+-------------+
|          B|2021-01-11| palak_paneer|
|          A|2021-01-07|chicken_tikka|
+-----------+----------+-------------+

Approach:-

  • We use all three dataframes (sales, menu, and members) to solve this problem. First, we join the sales and members dataframes.
  • Then we keep only the records where ‘order_date’ is on or after the member’s ‘join_date’. This leaves the orders placed after each customer became a member.
  • We will apply the dense_rank function to rank the data and take out those data points where rank equals one.
  • Finally, we join the menu dataframe with the filtered data and select the required columns.

Output:-

  • Customer ‘A’ has ordered the ‘chicken_tikka’ as the first item after becoming a member.
  • Customer ‘B’ has purchased ‘palak_paneer’ as the first item.
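One detail worth noting: order_date and join_date are stored as strings, and the comparison gives the right answer because ISO ‘YYYY-MM-DD’ strings sort in the same order as the dates they represent. The plain-Python sketch below (hypothetical names, sample rows copied from above) applies the same on-or-after filter and picks the earliest remaining date per member.

```python
# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
NAMES = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}
MEMBERS = {"A": "2021-01-07", "B": "2021-01-09"}  # customer_id -> join_date


def first_after_joining(sales, names, members):
    """Earliest order on or after join_date for each member, with its items."""
    result = {}
    for customer_id, join_date in sorted(members.items()):  # inner join on members
        after = [(d, names[p]) for c, d, p in sales
                 if c == customer_id and d >= join_date]  # string compare of ISO dates
        if not after:
            continue  # member with no orders after joining
        first_date = min(d for d, _ in after)  # dense_rank() == 1 over order_date
        result[customer_id] = (first_date, sorted(n for d, n in after if d == first_date))
    return result


print(first_after_joining(SALES, NAMES, MEMBERS))
```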

Question 07:- Which item was purchased just before the customer became a member?

# code:-
from pyspark.sql.functions import col, dense_rank
from pyspark.sql.window import Window

# latest order first, so rank 1 is the last order before joining
windowSpec = Window.partitionBy("customer_id").orderBy(col("order_date").desc())
items_before_member_df = (sales_df.join(members_df, 'customer_id')
                                  .filter(sales_df.order_date < members_df.join_date)
                                  .withColumn('dense_rank', dense_rank().over(windowSpec))
                                  .filter('dense_rank = 1')
                                  .join(menu_df, 'product_id')
                                  .select('customer_id', 'order_date', 'product_name'))
# output:-
+-----------+----------+-------------+
|customer_id|order_date| product_name|
+-----------+----------+-------------+
|          B|2021-01-04| palak_paneer|
|          A|2021-01-01| palak_paneer|
|          A|2021-01-01|chicken_tikka|
+-----------+----------+-------------+

Approach:-

  • The steps are the same as in the previous problem, with two changes. In the filter condition, we keep the rows where ‘order_date’ is earlier than the member’s ‘join_date’.
  • In the window spec, we sort ‘order_date’ in descending order, so rank 1 marks the last order placed before joining. Finally, we select the required columns.

Output:-

  • Customer ‘A’ ordered ‘palak paneer’ and ‘chicken tikka’ just before becoming a member.
  • Customer ‘B’ has ordered ‘palak paneer’.
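Sorting in descending order and keeping rank 1 amounts to taking the latest order date before joining. Here is a plain-Python version of that idea (hypothetical names, sample data as above); customer ‘C’ is absent because the inner join with members_df drops non-members.

```python
# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
NAMES = {"1": "palak_paneer", "2": "chicken_tikka", "3": "jeera_rice"}
MEMBERS = {"A": "2021-01-07", "B": "2021-01-09"}  # customer_id -> join_date


def last_before_joining(sales, names, members):
    """Latest order strictly before join_date for each member, with its items."""
    result = {}
    for customer_id, join_date in sorted(members.items()):  # inner join on members
        before = [(d, names[p]) for c, d, p in sales if c == customer_id and d < join_date]
        if not before:
            continue  # member who ordered nothing before joining
        last_date = max(d for d, _ in before)  # rank 1 when sorted descending
        result[customer_id] = (last_date, sorted(n for d, n in before if d == last_date))
    return result


print(last_before_joining(SALES, NAMES, MEMBERS))
```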

Question 08:- What are the total items and amount spent by each member before they became a member?

# code:-
from pyspark.sql.functions import countDistinct, sum as spark_sum

total_items_spent_df = (sales_df.join(menu_df, 'product_id')
                                .join(members_df, 'customer_id')
                                .filter(sales_df.order_date < members_df.join_date)  # before joining
                                .groupBy('customer_id')
                                .agg(countDistinct('product_id').alias('item_counts'),
                                     spark_sum('price').alias('total_amount')))
# output:-
+-----------+-----------+------------+
|customer_id|item_counts|total_amount|
+-----------+-----------+------------+
|          A|          2|         250|
|          B|          2|         400|
+-----------+-----------+------------+

Approach:-

  • We use all three dataframes (sales, menu, and members) to solve this problem. First, we join all three dataframes.
  • Then we apply the filter function to keep the rows where ‘order_date’ is earlier than ‘join_date’, i.e., the orders placed before the customer became a member.
  • After that, we group by ‘customer_id’ and use aggregate operations to count the distinct items and sum the amount spent.

Output:-

  • Customer ‘A’ purchased two distinct items and spent 250 rupees on them.
  • Customer ‘B’ also purchased two distinct food items (three orders in total, since ‘chicken_tikka’ was ordered twice) and spent 400 rupees on them.
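Note that ‘item_counts’ counts distinct products, not orders. Computing distinct items, total orders, and the amount side by side for the pre-membership rows makes the difference visible; this is a plain-Python sketch with hypothetical names and sample data copied from the dataframes.

```python
# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
PRICE = {"1": 100, "2": 150, "3": 120}  # product_id -> price for the sold items
MEMBERS = {"A": "2021-01-07", "B": "2021-01-09"}  # customer_id -> join_date


def before_membership_summary(sales, price, members):
    """Per member: (distinct items, orders, amount) for orders before join_date."""
    summary = {}
    for customer_id, join_date in sorted(members.items()):
        products = [p for c, d, p in sales if c == customer_id and d < join_date]
        summary[customer_id] = (
            len(set(products)),               # like countDistinct('product_id')
            len(products),                    # like count('product_id')
            sum(price[p] for p in products),  # like sum('price')
        )
    return summary


print(before_membership_summary(SALES, PRICE, MEMBERS))
# {'A': (2, 2, 250), 'B': (2, 3, 400)}
```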

Question 09:- If each rupee spent equates to 10 points and item ‘jeera_rice’ has a 2x points multiplier, find out how many points each customer would have.

# code:-
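from pyspark.sql.functions import col, when, sum as spark_sum

# product_id is stored as a string, so compare it with '3' (jeera_rice)
earned_points_df = (sales_df.join(menu_df, 'product_id')
                            .withColumn('points', when(col('product_id') == '3', col('price') * 20)
                                                  .otherwise(col('price') * 10))
                            .groupBy('customer_id')
                            .agg(spark_sum('points').alias('rewards_points')))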
# output:-
+-----------+--------------+
|customer_id|rewards_points|
+-----------+--------------+
|          B|          9800|
|          C|          5800|
|          A|         11200|
+-----------+--------------+

Approach:-

  • First, we join the sales and menu dataframes and create a new ‘points’ column with the ‘withColumn’ function.
  • In it, we use a condition: when ‘product_id’ equals ‘3’, i.e., the product id of ‘jeera_rice’, we multiply the price by 20; the prices of all other items are multiplied by 10.
  • Last, we group the data by ‘customer_id’ and sum the ‘points’ column to calculate the total reward points for each customer.

Output:-

  • Customer ‘A’ has the highest reward points, i.e. 11,200.
  • Customer ‘B’ has a total of 9800 reward points.
  • The customer ‘C’ has scored a total of 5800 reward points.
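The points rule is simple arithmetic: points = price × 10 × multiplier, with a multiplier of 2 for ‘jeera_rice’ and 1 otherwise. For customer ‘A’, that is 100·10 + 2·150·10 + 3·120·20 = 1,000 + 3,000 + 7,200 = 11,200. A plain-Python sketch of the rule (hypothetical names; the multiplier table is keyed by product name rather than product_id):

```python
from collections import defaultdict

# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
# product_id -> (product_name, price) for the sold items
MENU = {"1": ("palak_paneer", 100), "2": ("chicken_tikka", 150), "3": ("jeera_rice", 120)}


def reward_points(sales, menu, points_per_rupee=10, multipliers=None):
    """Total points per customer: price * points_per_rupee * item multiplier."""
    multipliers = multipliers or {}
    points = defaultdict(int)
    for customer_id, _order_date, product_id in sales:
        name, price = menu[product_id]
        points[customer_id] += price * points_per_rupee * multipliers.get(name, 1)
    return dict(sorted(points.items()))


print(reward_points(SALES, MENU, multipliers={"jeera_rice": 2}))
# {'A': 11200, 'B': 9800, 'C': 5800}
```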

Question 10:- Create the complete table with all data and columns like customer_id, order_date, product_name, price, and member (Y/N).

# code:-
from pyspark.sql.functions import col, when

all_data_df = (sales_df.join(menu_df, 'product_id', 'left')
                       .join(members_df, 'customer_id', 'left')
                       .withColumn('member', when(col('order_date') < col('join_date'), 'N')
                                             .when(col('order_date') >= col('join_date'), 'Y')
                                             .otherwise('N'))  # null join_date (no membership) -> 'N'
                       .drop('product_id', 'join_date'))  # keep product_name for the final table
# output:-
+-----------+----------+-------------+-----+------+
|customer_id|order_date| product_name|price|member|
+-----------+----------+-------------+-----+------+
|          A|2021-01-01| palak_paneer|  100|     N|
|          A|2021-01-01|chicken_tikka|  150|     N|
|          A|2021-01-07|chicken_tikka|  150|     Y|
|          A|2021-01-10|   jeera_rice|  120|     Y|
|          A|2021-01-11|   jeera_rice|  120|     Y|
|          A|2021-01-11|   jeera_rice|  120|     Y|
|          B|2021-01-04| palak_paneer|  100|     N|
|          B|2021-01-11| palak_paneer|  100|     Y|
|          B|2021-01-01|chicken_tikka|  150|     N|
|          B|2021-01-02|chicken_tikka|  150|     N|
|          B|2021-01-16|   jeera_rice|  120|     Y|
|          B|2021-02-01|   jeera_rice|  120|     Y|
|          C|2021-01-01| palak_paneer|  100|     N|
|          C|2021-01-01|   jeera_rice|  120|     N|
|          C|2021-01-07|   jeera_rice|  120|     N|
+-----------+----------+-------------+-----+------+

Approach:-

  • We join all three dataframes using ‘left’ joins and then apply a condition: when ‘order_date’ is earlier than ‘join_date’, populate ‘N’ in the ‘member’ column; when ‘order_date’ is on or after ‘join_date’, set it to ‘Y’; otherwise, set it to ‘N’. The otherwise branch covers customer ‘C’, who has no membership and therefore a null ‘join_date’ after the left join.
  • Finally, we drop the columns that are not required.

Output:-

  • After applying the transformations, we get all the required details as a final output.
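The when/otherwise chain reduces to one rule: ‘Y’ if the customer has a join_date and ordered on or after it, ‘N’ otherwise. In Spark SQL, comparing a value with a null join_date yields null rather than true or false, so neither when branch fires for customer ‘C’ and the otherwise branch supplies ‘N’. A plain-Python sketch of the rule (hypothetical names; None stands in for the null produced by the left join):

```python
# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
MEMBERS = {"A": "2021-01-07", "B": "2021-01-09"}  # customer 'C' has no membership


def member_flag(order_date, join_date):
    """'Y' on or after join_date; 'N' before it or when join_date is None."""
    if join_date is not None and order_date >= join_date:
        return "Y"
    return "N"


# MEMBERS.get() returns None for 'C', like the null join_date from the left join
flags = [(c, d, member_flag(d, MEMBERS.get(c))) for c, d, _ in SALES]
print(sum(flag == "Y" for _, _, flag in flags), "member orders")  # 7 member orders
```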

Question 11:- The owner also wants a ranking of each customer’s purchases. He does not need the ranking for non-member purchases, so he expects null ranking values for the purchases made while customers were not yet part of the membership program.

# code:-
from pyspark.sql.functions import col, rank, when
from pyspark.sql.window import Window

ranking_final_df = (sales_df.join(menu_df, 'product_id', 'left')
                            .join(members_df, 'customer_id', 'left')
                            .withColumn('is_member', when(col('order_date') < col('join_date'), 'N')
                                                     .when(col('order_date') >= col('join_date'), 'Y')
                                                     .otherwise('N'))
                            # null rank for non-member purchases, rank() over member purchases
                            .withColumn('rank', when(col('is_member') == 'N', None)
                                                .otherwise(rank().over(Window.partitionBy('customer_id', 'is_member')
                                                                             .orderBy('order_date')))))

Approach:-

  • The steps are the same as in the previous problem, with slight changes. We name the flag column ‘is_member’ and add a ‘rank’ column computed with the rank() window function, partitioned by ‘customer_id’ and ‘is_member’ and ordered by ‘order_date’.
  • If the ‘is_member’ value is ‘N’, the rank column gets a null value for that row; otherwise, it gets the rank of that purchase among the customer’s member purchases.

Output:-

  • We get the complete table with a ‘rank’ column that is null for non-member purchases and ranks each member purchase by ‘order_date’, with purchases on the same date sharing a rank.
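To make the expected ranks concrete, here is the ranking rule in plain Python (hypothetical names; sample data copied from above). Non-member purchases get None, and member purchases are ranked by order_date with rank() semantics, so customer ‘A’s two orders on 2021-01-11 both get rank 3.

```python
# The sample rows from sales_df: (customer_id, order_date, product_id)
SALES = [
    ("A", "2021-01-01", "1"), ("A", "2021-01-01", "2"), ("A", "2021-01-07", "2"),
    ("A", "2021-01-10", "3"), ("A", "2021-01-11", "3"), ("A", "2021-01-11", "3"),
    ("B", "2021-01-01", "2"), ("B", "2021-01-02", "2"), ("B", "2021-01-04", "1"),
    ("B", "2021-01-11", "1"), ("B", "2021-01-16", "3"), ("B", "2021-02-01", "3"),
    ("C", "2021-01-01", "3"), ("C", "2021-01-01", "1"), ("C", "2021-01-07", "3"),
]
MEMBERS = {"A": "2021-01-07", "B": "2021-01-09"}  # customer_id -> join_date


def member_ranks(sales, members):
    """Per customer: [(order_date, rank or None)] in order_date order.

    Non-member purchases get None; member purchases get rank() semantics,
    so orders on the same date share a rank and the next rank is skipped.
    """
    result = {}
    for customer_id in sorted({c for c, _, _ in sales}):
        join_date = members.get(customer_id)  # None means no membership (left join)
        ranked, prev, current, position = [], None, 0, 0
        for order_date in sorted(d for c, d, _ in sales if c == customer_id):
            if join_date is None or order_date < join_date:
                ranked.append((order_date, None))  # non-member purchase: null rank
                continue
            position += 1  # position within the member partition
            if order_date != prev:
                current, prev = position, order_date
            ranked.append((order_date, current))
        result[customer_id] = ranked
    return result


for customer_id, rows in member_ranks(SALES, MEMBERS).items():
    print(customer_id, [r for _, r in rows])
# A [None, None, 1, 2, 3, 3]
# B [None, None, None, 1, 2, 3]
# C [None, None, None]
```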

Conclusion

In this article, we worked on a case study using restaurant data. Using PySpark, we found insights in the data that can help the restaurant increase the sales of its items. The owner can now see which items are popular and which need more focus. Based on these insights, the owner can try different ways to increase sales, such as offering targeted discounts to customers.

Some Important Takeaways from this article are as follows:-

1. We worked with sample restaurant data and gathered insights that can benefit a business in various ways.

2. We used different PySpark functions, including join, groupBy, withColumn, withColumnRenamed, and Window.partitionBy, along with window functions such as rank and dense_rank, to answer the questions.

3. We walked through step-by-step solutions to each problem using PySpark functions and analyzed the output at the end of each problem statement.

I hope this article helps you understand PySpark functions in more detail. If you have any opinions or questions, leave a comment below. Connect with me on LinkedIn for further discussion. Keep Learning!!!

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.
