Data Engineering 101 – Data Sources in Apache Spark Every Data Engineer Must Know!
- Get to know different types of Apache Spark data sources
- Understand the options available on various Spark data sources
The ability to read and write from different kinds of data sources and for the community to create its own contributions is arguably one of Spark’s greatest strengths.
As a general computing engine, Spark can process data from various data management/storage systems, including HDFS, Hive, Cassandra, and Kafka. For flexibility and high throughput, Spark defines the Data Source API, which is an abstraction of the storage layer.
This Data Source API has two requirements:
- Generality: Support reading/writing most data management/storage systems.
- Flexibility: Customize and optimize the read and write paths for different systems based on their capabilities.
This article will give you a better understanding of all the core data sources available. You will be introduced to a variety of data sources that you can use with Spark out of the box as well as the countless other sources built by the greater community. Spark has six “core” data sources and hundreds of external data sources written by the community.
Table of Contents
- Structure of Spark’s Data Source API
- Read API Structure
- Write API Structure
- Apache Spark Data Sources you Should Know About
- JDBC/ODBC connections
Core Data Sources in Apache Spark
Here are the core data sources in Apache Spark you should know about:
1. CSV
2. JSON
3. Parquet
4. ORC
5. JDBC/ODBC connections
6. Plain-text files
There are several community-created data sources as well:
4. AWS Redshift
And many, many others
Structure of Apache Spark’s DataSources API
The core structure for reading data is:
DataFrameReader.format(...).option("key", "value").schema(...).load()
This structure is used to read from all data sources.
.format is optional because Spark uses the Parquet format by default. .option sets a key-value configuration that parameterizes how the data is read.
Lastly, .schema is optional if the data source provides a schema or if you intend to use schema inference.
Read API structure
The foundation for reading data in Spark is the DataFrameReader. It is accessed through the SparkSession via the read attribute, that is, spark.read.
Once we have a DataFrameReader, we can specify several values. Different data sources support different sets of options, which determine how the data is read. All of them can be omitted except one: at a minimum, the DataFrameReader must be given the path to read from.
spark.read.format("csv") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .option("path", "path/to/file(s)") \
    .schema(someSchema) \
    .load()
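The snippet above refers to someSchema without defining it. As a minimal sketch, the schema can be written as a DDL-formatted string, which PySpark's DataFrameReader.schema accepts as an alternative to a StructType. The column names, the helper name read_flights_csv, and the path are hypothetical and only serve as illustration.

```python
# Hypothetical column names and types, written as a DDL-formatted string.
FLIGHT_SCHEMA = "origin STRING, destination STRING, flight_count INT"


def read_flights_csv(spark, path):
    """Read a CSV file with an explicit schema instead of schema inference.

    `spark` is an existing SparkSession; `path` is a placeholder location.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")    # first line holds the column names
        .option("mode", "FAILFAST")  # abort on the first malformed record
        .schema(FLIGHT_SCHEMA)       # explicit schema, so inferSchema is not needed
        .load(path)
    )
```

With a running session, this would be called as read_flights_csv(spark, "path/to/file(s)"). Supplying the schema up front also spares Spark the extra pass over the data that inference requires.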
When working with semi-structured data sources, we often come across malformed data. The read mode specifies what Spark does when it encounters such records:
- permissive: Sets all fields to null when it encounters a corrupted record and places all corrupted records in a string column (named _corrupt_record by default)
- dropMalformed: Drops the row that contains malformed records
- failFast: Fails immediately upon encountering malformed records
The default is permissive.
Write API Structure
The core structure for writing data is:
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()
.format specifies how the data is written to the data source. It is optional because Spark uses Parquet by default.
.option sets key-value configuration that controls how the data is written.
.partitionBy, .bucketBy, and .sortBy are used only with file-based data sources and control the file structure or layout at the destination.
Writing data closely mirrors reading data, with DataFrameWriter taking the place of DataFrameReader.
With the DataFrameWriter, we specify the format, a series of options, and the save path. Many options can be set, but at a minimum we need to give the destination path.
dataframe.write.format("csv") \
    .mode("overwrite") \
    .option("dateFormat", "yyyy-MM-dd") \
    .option("path", "path/to/file(s)") \
    .save()
The save mode specifies what happens if Spark finds data already present at the destination:
- append: Appends the output files to the list of files that already exist at that location
- overwrite: Completely overwrites any data that already exists there
- errorIfExists: Throws an error and fails the write if data or files already exist at the specified location
- ignore: If data or files exist at the location, does nothing with the current DataFrame
The default is errorIfExists: the write fails if Spark finds data already present at the destination path.
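As a rough sketch of how the save mode is passed in PySpark, the helper below validates the mode name and then sets it with .mode(...), keeping .option(...) for format-specific settings. The helper name save_as_csv and the VALID_SAVE_MODES constant are hypothetical, and df stands for any existing DataFrame.

```python
# The four save modes described above, in the lowercase spelling PySpark accepts.
VALID_SAVE_MODES = {"append", "overwrite", "errorifexists", "ignore"}


def save_as_csv(df, path, save_mode="errorifexists"):
    """Write `df` as CSV to `path` using one of the four save modes."""
    normalized = save_mode.lower()
    if normalized not in VALID_SAVE_MODES:
        raise ValueError(f"unknown save mode: {save_mode!r}")
    # The save mode is set with .mode(...); .option(...) carries format options.
    df.write.format("csv").mode(normalized).option("header", "true").save(path)
```

Calling save_as_csv(df, "/tmp/out", "append") would add new files next to any existing ones, while leaving save_mode at its default makes the write fail if the path already holds data.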
The Different Apache Spark Data Sources You Should Know About
CSV Files
CSV stands for comma-separated values. It is a common text file format in which each line represents a single record and each field within a record is separated by a comma. The CSV format looks well structured, but it may be one of the trickiest file formats to work with in production scenarios, because not many assumptions can be made about what the files contain or how they are structured.
For this reason, the CSV reader has a large number of options. These options let you work around issues such as characters that need to be escaped (for example, commas inside columns when the file is also comma-delimited) or null values labeled in an unconventional way.
Reading CSV Files
spark.read.format("csv") \
    .option("header", "true") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .load("some/path/to/file.csv")
If the file has a header row with column names, you need to explicitly set the header option to true. Otherwise, the API treats the header as a data record.
You can also specify data sources by their fully qualified name (i.e., org.apache.spark.sql.csv), but for built-in sources you can also use their short names (for example, csv, json, parquet, orc, text, and jdbc).
When reading CSV files with a specified schema, it is possible that the data in the files does not match the schema. For example, a field containing the name of the city will not parse as an integer. The consequences depend on the mode that the parser runs in:
- PERMISSIVE (default): nulls are inserted for fields that could not be parsed correctly
- DROPMALFORMED: drops lines that contain fields that could not be parsed
- FAILFAST: aborts the read if any malformed data is found
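To make the difference between the three modes concrete, here is a plain-Python emulation using only the standard library. It is not Spark's parser, only a sketch of the behavior described above, and the parse_rows function, the SAMPLE data, and the two-column city/population layout are all hypothetical.

```python
import csv
import io

MODES = ("PERMISSIVE", "DROPMALFORMED", "FAILFAST")

# Two records; the second has a population that cannot be parsed as an integer.
SAMPLE = "Pune,3124458\nOslo,unknown\n"


def parse_rows(text, mode="PERMISSIVE"):
    """Parse 'city,population' lines, treating population as an integer."""
    if mode not in MODES:
        raise ValueError(f"unknown mode: {mode!r}")
    rows = []
    for city, population in csv.reader(io.StringIO(text)):
        try:
            rows.append((city, int(population)))
        except ValueError:
            if mode == "FAILFAST":
                raise                   # abort on the first malformed record
            if mode == "DROPMALFORMED":
                continue                # silently skip the malformed line
            rows.append((city, None))   # PERMISSIVE: keep the row, null the field
    return rows
```

In this sketch, the permissive call keeps both rows and puts None in the unparseable field, DROPMALFORMED returns only the first row, and FAILFAST raises a ValueError.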
The table below presents the options available on CSV reader:
| Read/write | Key | Potential values | Default | Description |
|---|---|---|---|---|
| Both | sep | Any single string character | , | The single character that is used as a separator for each field and value. |
| Both | header | true, false | false | A Boolean flag that declares whether the first line in the file(s) contains the names of the columns. |
| Read | escape | Any string character | \ (backslash) | The character Spark should use to escape other characters in the file. |
| Read | inferSchema | true, false | false | Specifies whether Spark should infer column types when reading the file. |
| Read | ignoreLeadingWhiteSpace | true, false | false | Declares whether leading spaces from values being read should be skipped. |
| Read | ignoreTrailingWhiteSpace | true, false | false | Declares whether trailing spaces from values being read should be skipped. |
| Both | nullValue | Any string character | "" | Declares what character represents a null value in the file. |
| Both | nanValue | Any string character | NaN | Declares what character represents a NaN or missing character in the CSV file. |
| Both | positiveInf | Any string or character | Inf | Declares what character(s) represent a positive infinite value. |
| Both | negativeInf | Any string or character | -Inf | Declares what character(s) represent a negative infinite value. |
| Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | maxColumns | Any integer | 20480 | Declares the maximum number of columns in the file. |
| Read | maxCharsPerColumn | Any integer | 1000000 | Declares the maximum number of characters in a column. |
| Read | escapeQuotes | true, false | true | Declares whether Spark should escape quotes that are found in lines. |
| Read | maxMalformedLogPerPartition | Any integer | 10 | Sets the maximum number of malformed rows Spark will log for each partition. Malformed records beyond this number will be ignored. |
| Write | quoteAll | true, false | false | Specifies whether all values should be enclosed in quotes, as opposed to just escaping values that have a quote character. |
| Read | multiLine | true, false | false | Allows you to read multiline CSV files where each logical row in the CSV file might span multiple rows in the file itself. |
Writing CSV Files
csvFile.write.format("csv") \
    .mode("overwrite") \
    .option("sep", "\t") \
    .save("/tmp/my-tsv-file.tsv")
JSON Files
In Spark, when we refer to JSON files, we mean line-delimited JSON files. The line-delimited versus multiline trade-off is controlled by a single option: multiLine. When you set this option to true, you can read an entire file as one JSON object, and Spark will go through the work of parsing that into a DataFrame.
Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame. This conversion can be done using SparkSession.read.json on a JSON file. Note that the file offered as a JSON file is not a typical JSON file: each line must contain a separate, self-contained, valid JSON object.
For more information, see the JSON Lines text format, also called newline-delimited JSON.
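The difference between the two layouts is easy to see with Python's standard json module. This is only an illustration of the file formats, not of Spark's reader, and the sample records and function names are made up for the example.

```python
import json

# Line-delimited JSON: every line is a complete, self-contained JSON object.
JSON_LINES = '{"origin": "US", "count": 15}\n{"origin": "IN", "count": 7}\n'

# Multi-line JSON: a single object spread over several lines.
MULTILINE_JSON = '{\n  "origin": "US",\n  "count": 15\n}\n'


def parse_json_lines(text):
    """Parse each non-empty line as its own JSON document."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_multiline(text):
    """Parse the whole text as one JSON document."""
    return json.loads(text)
```

Feeding MULTILINE_JSON to parse_json_lines fails with a json.JSONDecodeError on the very first line, which is the same mismatch the multiLine option is there to resolve in Spark.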
JSON data source options:
| Read/write | Key | Potential values | Default | Description |
|---|---|---|---|---|
| Both | compression or codec | None, uncompressed, bzip2, deflate, gzip, lz4, or snappy | none | Declares what compression codec Spark should use to read or write the file. |
| Both | dateFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd | Declares the date format for any columns that are date type. |
| Both | primitiveAsString | true, false | false | Infers all primitive values as string type. |
| Both | timestampFormat | Any string or character that conforms to Java's SimpleDateFormat | yyyy-MM-dd'T'HH:mm:ss.SSSZZ | Declares the timestamp format for any columns that are timestamp type. |
| Read | allowComments | true, false | false | Ignores Java/C++ style comments in JSON records. |
| Read | allowUnquotedFieldNames | true, false | false | Allows unquoted JSON field names. |
| Read | allowSingleQuotes | true, false | true | Allows single quotes in addition to double quotes. |
| Read | multiLine | true, false | false | Allows for reading in non-line-delimited JSON files. |
| Read | allowNumericLeadingZeros | true, false | false | Allows leading zeroes in numbers (e.g., 00012). |
| Read | allowBackslashEscapingAnyCharacter | true, false | false | Allows accepting quoting of all characters using the backslash quoting mechanism. |
| Read | columnNameOfCorruptRecord | Any string | Value of spark.sql.columnNameOfCorruptRecord | Allows renaming the new field, created by permissive mode, that holds the malformed string. This will override the configuration value. |
Reading JSON Files
spark.read.format("json") \
    .option("mode", "FAILFAST") \
    .option("inferSchema", "true") \
    .load("/data/movie-data/json/2010-summary.json")
Writing JSON Files
csvFile.write.format("json") \
    .mode("overwrite") \
    .save("/tmp/my-json-file.json")
Parquet Files
Parquet is an open-source file format available to any project in the Hadoop ecosystem. Apache Parquet is designed as an efficient, performant, flat columnar storage format, in contrast to row-based files like CSV or TSV.
Parquet uses the record shredding and assembly algorithm, which is superior to the simple flattening of nested namespaces. It is optimized to work with complex data in bulk and offers several efficient compression and encoding schemes. This approach works best for queries that need to read only certain columns from a large table: Parquet can read just the needed columns, which greatly reduces I/O.
Reading Parquet Files
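A minimal PySpark sketch of reading Parquet might look like the following. The helper name read_parquet_columns and its parameters are hypothetical; spark is an existing SparkSession and path a placeholder location.

```python
def read_parquet_columns(spark, path, columns):
    """Load a Parquet dataset and keep only the requested columns."""
    # Parquet files carry their own schema, so no schema or inference option is needed.
    df = spark.read.format("parquet").load(path)
    # Selecting a subset of columns lets Spark skip the remaining columns on disk.
    return df.select(*columns)
```

Selecting columns right after the load is where the columnar layout pays off, since only the requested columns have to be read.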
Writing Parquet Files
csvFile.write.format("parquet") \
    .mode("overwrite") \
    .save("/tmp/my-parquet-file.parquet")
ORC Files
The Optimized Row Columnar (ORC) file format provides a highly efficient way to store Hive data. It was designed to overcome the limitations of the other Hive file formats. Using ORC files improves performance when Hive is reading, writing, and processing data.
Compared with RCFile format, for example, ORC file format has many advantages such as:
- a single file as the output of each task, which reduces the NameNode’s load
- Hive type support including datetime, decimal, and the complex types (struct, list, map, and union)
- light-weight indexes stored within the file
- skip row groups that don’t pass predicate filtering
- seek to a given row
- block-mode compression based on the data type
- run-length encoding for integer columns
- dictionary encoding for string columns
- concurrent reads of the same file using separate RecordReaders
- ability to split files without scanning for markers
- bound the amount of memory needed for reading or writing
- metadata stored using Protocol Buffers, which allows addition and removal of fields
Reading ORC Files
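Reading ORC follows the same pattern as the other file formats. The sketch below assumes an existing SparkSession named spark; the helper name read_orc and the path are hypothetical.

```python
def read_orc(spark, path):
    """Load an ORC dataset from `path` into a DataFrame."""
    # Like Parquet, ORC stores its schema in the file, so no schema option is required.
    return spark.read.format("orc").load(path)
```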
Writing ORC Files
csvFile.write.format("orc") \
    .mode("overwrite") \
    .save("/tmp/my-json-file.orc")
Text Files
Spark also allows you to read in plain-text files. Each line in the file becomes a record in the DataFrame. It is then up to you to transform it accordingly. As an example of how you would do this, suppose that you need to parse some Apache log files to some more structured format, or perhaps you want to parse some plain text for natural-language processing.
Text files make a great argument for the Dataset API due to its ability to take advantage of the flexibility of native types.
Reading Text Files
spark.read.textFile("/data/movie-data/csv/2020-summary.csv")
  .selectExpr("split(value, ',') as rows").show()
Writing Text Files
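When writing a text file in PySpark, the DataFrame must contain exactly one string column, so a typical first step is to select that column. The following is a sketch under that assumption; the helper name write_column_as_text and its parameters are hypothetical.

```python
def write_column_as_text(df, column, path):
    """Write a single string column of `df` to `path`, one line per row."""
    # The text writer expects exactly one string column, so select it first.
    df.select(column).write.text(path)
```

Any other columns would first have to be dropped or concatenated into that one string column.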
JDBC/ODBC connections
SQL data sources are one of the more powerful connectors because there are a variety of systems to which you can connect (as long as that system speaks SQL). For instance, you can connect to a MySQL database, a PostgreSQL database, an Oracle database, or SQLite. The examples below use PostgreSQL.
Of course, databases aren’t just a set of raw files, so there are more options to consider regarding how you connect to the database. Namely, you’re going to need to begin considering things like authentication and connectivity (you’ll need to determine whether the network of your Spark cluster is connected to the network of your database system).
To get started, you will need to include the JDBC driver for your particular database on the Spark classpath. For example, to connect to Postgres from the Spark shell, you would run the following command:
./bin/spark-shell \
    --driver-class-path postgresql-9.4.1207.jar \
    --jars postgresql-9.4.1207.jar
Tables from external or remote databases can be loaded as a DataFrame or temporary view using the Data Source API.
Reading data from a JDBC source
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .load()
Saving data to a JDBC source
jdbcDF.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("dbtable", "schema.tablename") \
    .option("user", "username") \
    .option("password", "password") \
    .save()
We have discussed a variety of sources available to you for reading and writing data in Spark. This covers nearly everything you’ll need to know as an everyday user of Spark with respect to data sources. For the curious, there are ways of implementing your own data source.