In this post, I have put together the AWS Glue and PySpark functionalities that can be helpful when you are building an AWS data pipeline and writing AWS Glue PySpark scripts.
AWS Glue is a fully managed extract, transform, and load (ETL) service that processes large datasets from various sources for analytics and data processing.
While creating an AWS Glue job, you can select between Spark, Spark Streaming, and Python shell. These jobs can run a script proposed by AWS Glue, an existing script that you provide, or a new script authored by you. You can also select different monitoring options, job execution capacity, timeouts, the delayed notification threshold, and non-overridable and overridable parameters.
AWS recently launched Glue version 2.0, which features 10x faster Spark ETL job start times and reduces the billing minimum from 10 minutes to 1 minute.
With AWS Glue you can create a development endpoint and configure a SageMaker or Zeppelin notebook to develop and test your Glue ETL scripts.
I create a SageMaker notebook connected to the development endpoint to author and test the ETL scripts. Depending on the language you are comfortable with, you can spin up the notebook accordingly.
Now, let’s talk about some specific features and functionalities in AWS Glue and PySpark which can be helpful.
1. Spark DataFrames
A Spark DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database. You can create a DataFrame from an RDD or from file formats such as CSV, JSON, and Parquet.
With the SageMaker Sparkmagic (PySpark) kernel notebook, the Spark session is automatically created.
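As a minimal sketch, assuming a CSV file at a hypothetical S3 path, you could create a DataFrame from the automatically created Spark session like this:

# Read a CSV file from S3 into a Spark DataFrame (the path is a placeholder)
df = spark.read.csv("s3://bucket-name/input/data.csv", header=True, inferSchema=True)
df.show(5)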
2. GlueContext
GlueContext is the entry point for reading and writing DynamicFrames in AWS Glue. It wraps the Apache SparkSQL SQLContext object, providing mechanisms for interacting with the Apache Spark platform.
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.context import GlueContext
from pyspark.context import SparkContext
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())
3. DynamicFrame
An AWS Glue DynamicFrame is similar to a SparkSQL DataFrame: it represents a distributed collection of data, but without requiring you to specify a schema up front. It can also be used to read and transform data that contains inconsistent values and types.
DynamicFrame can be created using the following options –
create_dynamic_frame_from_rdd — created from an Apache Spark Resilient Distributed Dataset (RDD)
create_dynamic_frame_from_catalog — created using a Glue catalog database and table name
create_dynamic_frame_from_options — created with a specified connection and format, for example a connection type such as Amazon S3, Amazon Redshift, or JDBC (a minimal sketch follows this list)
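As an illustration, here is a minimal sketch of create_dynamic_frame_from_options reading JSON from a hypothetical S3 prefix, using the glueContext created earlier:

# Read JSON files from a hypothetical S3 prefix into a DynamicFrame
dyf = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://bucket-name/input/"]},
    format="json",
)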
DynamicFrames can be converted to and from DataFrames using .toDF() and fromDF(). Use the following syntax-
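A minimal sketch of the conversion (the variable names are illustrative):

# DynamicFrame to Spark DataFrame
df = dyf.toDF()

# Spark DataFrame back to DynamicFrame
dyf2 = DynamicFrame.fromDF(df, glueContext, "dyf2")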
4. AWS Glue Job Bookmark
AWS Glue job bookmarks help process incremental data when rerunning a job on a scheduled interval, preventing reprocessing of old data.
You can read more about job bookmarks in the AWS Glue documentation.
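For bookmarks to work, the job needs to be initialized and committed, and each source should carry a transformation_ctx so its state can be tracked. A minimal sketch, assuming hypothetical catalog database and table names:

import sys
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# transformation_ctx lets the bookmark track state for this source
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    transformation_ctx="datasource0",
)

# ... transforms and writes ...

job.commit()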
5. Write out data
The DynamicFrame of the transformed dataset can be written out to S3 as non-partitioned (default) or partitioned data. The "partitionKeys" parameter can be specified in connection_options to write the data to S3 as partitioned. AWS Glue organizes these datasets in Hive-style partitions.
In the following code example, the AWS Glue DynamicFrame is partitioned by year, month, day, and hour, and written to S3 in Parquet format with Hive-style partitioning.
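A minimal sketch, assuming the frame already has year, month, day, and hour columns and a hypothetical output path:

# Write the DynamicFrame to S3, partitioned by year/month/day/hour, in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    connection_options={
        "path": "s3://bucket-name/table-name/",
        "partitionKeys": ["year", "month", "day", "hour"],
    },
    format="parquet",
)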
7. S3 Lister and other options for optimizing memory management
AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame. It can be enabled by setting the additional_options parameter "useS3ListImplementation" to True.
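A minimal sketch, assuming hypothetical catalog database and table names:

# Enable the optimized S3 lister while reading from the Glue catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table",
    additional_options={"useS3ListImplementation": True},
    transformation_ctx="datasource0",
)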
purge_s3_path is a nice option available to delete files from a specified S3 path recursively, based on a retention period or other available filters. As an example, suppose you are running an AWS Glue job that fully refreshes a table each day, writing the data to S3 with the naming convention s3://bucket-name/table-name/dt=<date-time>. Based on the defined retention period, you can delete the older dt=<date-time> S3 folders from the Glue job itself. Another option is to set an S3 bucket lifecycle policy with the prefix.
# Purge S3 locations older than 3 days (retentionPeriod is specified in hours)
print("Attempting to purge S3 path with retention set to 3 days.")
glueContext.purge_s3_path(
    s3_path=output_loc,
    options={"retentionPeriod": 72},
)
Other options such as purge_table, transition_table, and transition_s3_path are also available. The transition_table option transitions the storage class of the files stored on Amazon S3 for the specified catalog database and table.
To print the schema of a Spark DataFrame or Glue DynamicFrame in tree format, use printSchema().

datasource0.printSchema()

root
|-- ID: int
|-- Name: string
|-- Identity: string
|-- Alignment: string
|-- EyeColor: string
|-- HairColor: string
|-- Gender: string
|-- Status: string
|-- Appearances: int
|-- FirstAppearance: choice
| |-- int
| |-- long
| |-- string
|-- Year: int
|-- Universe: string
13. Fields Selection
select_fields can be used to select fields from a Glue DynamicFrame.

# From DynamicFrame
datasource0.select_fields(["Status","HairColor"]).toDF().distinct().show()

To select fields from a Spark DataFrame, use select:

# From DataFrame
datasource0_df.select(["Status","HairColor"]).distinct().show()
14. Timestamp
For instance, suppose an application writes data into DynamoDB and has a last_updated attribute/column. DynamoDB does not natively support a date/timestamp data type, so you could store it either as a String or as a Number. If stored as a number, it is usually epoch time, the number of seconds since 00:00:00 UTC on 1 January 1970. You could see something like "1598331963", which is 2020-08-25T05:06:03+00:00 in ISO 8601.
To convert the last_updated long data type into timestamp data type, you can use the following code-
import pyspark.sql.functions as f
import pyspark.sql.types as t

new_df = (
    df
    # last_updated is stored as epoch seconds; divide by 1000 first if it is stored in milliseconds
    .withColumn("last_updated", f.from_unixtime(f.col("last_updated")).cast(t.TimestampType()))
)
15. Temporary View from Spark DataFrame
If you want to store the Spark DataFrame as a table and query it using Spark SQL, you can convert the DataFrame into a temporary view that is available only for that Spark session using createOrReplaceTempView.
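A minimal sketch (the view name and query are illustrative):

# Register the DataFrame as a temporary view and query it with Spark SQL
df.createOrReplaceTempView("tmp_table")
spark.sql("SELECT Status, COUNT(*) AS cnt FROM tmp_table GROUP BY Status").show()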
Suppose from the above example, you want to create a new attribute/column to store only the last event. How would you do it?
You can use the element_at function. It returns the element of the array at the given index if the column is an array, and it can also be used to extract the value for a given key if the column is a map.
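A minimal sketch, assuming the DataFrame has an array column named event:

import pyspark.sql.functions as f

# A negative index counts from the end of the array, so -1 returns the last element
df = df.withColumn("last_event", f.element_at(f.col("event"), -1))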
The explode function in PySpark is used to explode array or map columns into rows. For example, let's try to explode the "event" column from the above example:
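A minimal sketch, again assuming an array column named event and an ID column:

import pyspark.sql.functions as f

# Each element of the event array becomes its own row
df.select(f.col("ID"), f.explode(f.col("event")).alias("event")).show()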
If you want to find records based on a string match, you can use startswith.
In the following example, I am searching for all records where the value of the description column starts with "[{".
import pyspark.sql.functions as f

df.filter(f.col("description").startswith("[{")).show()
20. Extract year, month, day, hour
One of the common use cases is to write the AWS Glue DynamicFrame or Spark DataFrame to S3 with Hive-style partitioning. To do so, you can extract the year, month, day, and hour, and use them as partitionKeys when writing the DynamicFrame/DataFrame to S3.
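A minimal sketch, assuming the DataFrame has a last_updated timestamp column as in the earlier example:

import pyspark.sql.functions as f
from awsglue.dynamicframe import DynamicFrame

# Derive partition columns from the timestamp
df = (
    df
    .withColumn("year", f.year(f.col("last_updated")))
    .withColumn("month", f.month(f.col("last_updated")))
    .withColumn("day", f.dayofmonth(f.col("last_updated")))
    .withColumn("hour", f.hour(f.col("last_updated")))
)

# Convert back to a DynamicFrame and write it out partitioned, as shown in section 5
dyf = DynamicFrame.fromDF(df, glueContext, "dyf")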