Best Practices and Performance Tuning Activities for PySpark
Initialize PySpark:
import findspark
findspark.init()
These should be the first lines of your code when you run from a Jupyter notebook. findspark adds Spark to sys.path and initializes PySpark from the Spark home parameter. You can also pass the Spark path explicitly, like below:
findspark.init('/usr/****/apache-spark/3.1.1/libexec')
This way the engine recognizes the job as a Spark job and sends it to the correct queue. It also ensures that when you import other packages into your code, versions compatible with PySpark are picked up; otherwise you might hit an incompatible JVM error later in the code, which is hard to debug.
Create a Spark session with the required configuration:
from pyspark.sql import SparkSession, SQLContext

sql_jar = "/path/to/sql_jar_file/sqljdbc42.jar"
spark_snow_jar = "/usr/.../snowflake/spark-snowflake_2.11-2.5.5-spark_2.3.jar"
snow_jdbc_jar = "/usr/.../snowflake/snowflake-jdbc-3.10.3.jar"
oracle_jar = "/usr/path/to/oracle_jar_file/v12/jdbc/lib/oracle6.jar"

spark = (SparkSession
         .builder
         .master('yarn')
         .appName('Spark job new_job')
         .config('spark.driver.memory', '10g')
         .config('spark.submit.deployMode', 'client')
         .config('spark.executor.memory', '15g')
         .config('spark.executor.cores', 4)
         .config('spark.yarn.queue', 'short')
         .config('spark.jars', '{},{},{},{}'.format(sql_jar, spark_snow_jar, snow_jdbc_jar, oracle_jar))
         .enableHiveSupport()
         .getOrCreate())
- You can set the master to 'local' for development purposes, but it should be 'yarn' in deployment.
- When you use 'local' as the master, Spark uses 2 cores and a single JVM for both the driver and the worker. With 'yarn', the driver and workers run in separate JVMs and you can use more cores.
- You can add more driver memory and executor memory for some jobs if required, to speed up execution.
- As a best practice, you should pass jar files for all the available database connections. These can be set either in the Spark session or in a config file. Otherwise, when you connect to an Oracle/SQL Server/Snowflake database using the code below, you might get an "oracle.jdbc.driver.OracleDriver" class-not-found error if the engine picks up an incorrect jar file.
data = (spark.read.format("jdbc")
        .option("url", tns_path)
        .option("dbtable", query)
        .option("user", userid)
        .option("password", password)
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .load())
The driver name "oracle.jdbc.driver.OracleDriver" can differ between jar files, as it sometimes changes with a Python/Java update. Since almost every project has many versions installed on its server, each update leaves multiple jar files from different versions available. So it is advisable to explicitly pass the required jar file path along with the code. This applies to MySQL, Snowflake, or any other DB connection as well.
Use the fetchsize option to make reading from the DB faster:
With the data load code above, Spark reads 10 rows (or whatever is set at the DB level) per round trip, which makes reading very slow when dealing with large data. When the query output ran into crores of rows, setting fetchsize to 100000 per round trip reduced reading time by 20-30 minutes. The code is below:
data = (spark.read.format("jdbc")
        .option("url", tns_path)
        .option("dbtable", query)
        .option("user", userid)
        .option("password", password)
        .option("fetchsize", "100000")
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .load())
Use the batchsize option to make writing to the DB faster:
Similarly, when the data ran into crores of rows, setting batchsize to 100000 per round trip reduced writing time by 20-30 minutes. The code is below:
(data.write.format("jdbc")
     .option("url", tns_path)
     .option("dbtable", "schemaname.tablename")
     .option("user", userid)
     .option("password", password)
     .option("driver", "oracle.jdbc.driver.OracleDriver")
     .option("batchsize", "100000")
     .mode('append')
     .save())
Handling Skew effectively:
Skew is the uneven distribution of data across partitions. Spark splits data into partitions and processes those partitions in parallel. With Spark's default partitioning, the data can end up skewed in operations such as join and group by when the key is not evenly distributed. In such cases, one partition may hold 1,000 records while another holds millions, and the former has to wait for the latter to complete; as a result the job cannot fully exploit parallel processing and takes too long to finish, or in some cases simply hangs. To resolve this, we can use repartition to increase the number of partitions before ingestion.
data = data.repartition(10, "term")
# or
data = data.repartition(10)
You can use coalesce to reduce the number of partitions:
data = data.coalesce(3)
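As a quick sanity check for skew, you can count the rows in each partition and see whether a few partitions dominate. A minimal sketch using spark_partition_id():

from pyspark.sql.functions import spark_partition_id

# Rows per partition; a handful of very large counts indicates skew
(data.groupBy(spark_partition_id().alias("partition_id"))
     .count()
     .orderBy("count", ascending=False)
     .show())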
Cache/Persist Efficiently:
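When a DataFrame is reused across multiple actions, caching or persisting it avoids recomputing it each time. A minimal sketch, where the storage level and the "term" column are illustrative:

from pyspark.storagelevel import StorageLevel

data = data.persist(StorageLevel.MEMORY_AND_DISK)   # or simply data.cache()
data.count()                           # first action materializes the cache
data.groupBy("term").count().show()    # later actions reuse the cached data
data.unpersist()                       # release memory once it is no longer needed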
You can also clear all the cache at the end of the job by using the below code:
spark.catalog.clearCache()
Avoid using UDFs unless they are the only option:
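Python UDFs move data between the JVM and the Python worker for every row, so they are much slower than Spark's built-in functions. Prefer a built-in wherever one exists; a minimal sketch, with an illustrative column name:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Preferred: built-in function executes inside the JVM
data = data.withColumn("term_upper", F.upper(F.col("term")))

# Only if no built-in exists: a Python UDF, which pays per-row serialization cost
to_upper = F.udf(lambda s: s.upper() if s is not None else None, StringType())
data = data.withColumn("term_upper_udf", to_upper(F.col("term")))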
Use threads wherever necessary:
If there are multiple independent actions in one job, you can use threads to run those actions concurrently. For example, in one job we were reading many huge tables from one schema and writing them to another schema. Because the actions ran sequentially, the job took more than 2 hours; after we used threads for concurrent writing, the load time dropped to 30 minutes. Note that you might need to increase the Spark session configuration. To make optimum use of the current Spark session configuration, you might pair a small, slower task with a bigger, faster task, as sketched below.
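A minimal sketch of the pattern, assuming a shared spark session and hypothetical schema/table names; each thread submits its own jobs through the same SparkSession:

from concurrent.futures import ThreadPoolExecutor

def copy_table(table_name):
    # Read from the source schema and write to the target schema (names are illustrative)
    df = spark.table("source_schema.{}".format(table_name))
    df.write.mode("overwrite").saveAsTable("target_schema.{}".format(table_name))

tables = ["table_a", "table_b", "table_c"]   # hypothetical table list
with ThreadPoolExecutor(max_workers=3) as executor:
    list(executor.map(copy_table, tables))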
Use mapPartitions() instead of map():
Both are RDD-based operations, but mapPartitions() is preferred over map() because mapPartitions() lets you perform heavyweight initialization (for example, opening a database connection) once per partition, whereas map() would repeat it for every row.
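A minimal sketch of the idea; the per-partition setup here is just a compiled regex standing in for whatever expensive initialization you need, and "term" is an illustrative column name:

import re

def process_partition(rows):
    # One-time setup per partition instead of once per row
    pattern = re.compile(r"[^A-Za-z0-9]")
    for row in rows:
        yield (row["term"], pattern.sub("_", str(row["term"])))

cleaned = data.rdd.mapPartitions(process_partition).toDF(["term", "term_clean"])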
Miscellaneous:
- Avoid calling count() on the data frame if it is not necessary, and remove any actions you used only for debugging before deploying your code.
- Write intermediate or final files to Parquet to reduce read and write time (see the short sketch after this list).
- If you want to read a file from your local filesystem during development, set the master to 'local', because in 'yarn' mode you can't read from local; yarn mode resolves paths against HDFS. So for deployment you have to move those files to an HDFS location.
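A minimal Parquet round trip, with an illustrative path:

# Write an intermediate result as Parquet (path is illustrative)
data.write.mode("overwrite").parquet("/project/tmp/data_intermediate")

# Re-reading Parquet is much cheaper than re-running the source query
data = spark.read.parquet("/project/tmp/data_intermediate")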
Please let me know if you have any queries. You can also suggest additional best practices to improve performance. You can connect with me using this link.
Used picture source: https://unsplash.com/photos/MrVEedTZLwM