This article was published as a part of the Data Science Blogathon.
With the demand for big data and machine learning, this article provides an introduction to Spark MLlib, its components, and how it works. This covers the main topics of using machine learning algorithms in Apache Spark.
Apache Spark is a data processing framework that can quickly perform processing tasks on very large data sets and can also distribute data processing tasks across multiple computers, either on its own or in tandem with other distributed computing tools. It is a lightning-fast unified analytics engine for big data and machine learning.
To support Python with Spark, the Apache Spark community released a tool called PySpark. Using PySpark, one can work with RDDs in the Python programming language.
The components of Spark are:
- Spark Core
- Spark SQL
- Spark Streaming
- Spark MLlib
- GraphX
- SparkR
All the functionality provided by Apache Spark is built on top of Spark Core. It manages essential I/O and is responsible for task dispatching and fault recovery. Spark Core is built around a special collection called the RDD (Resilient Distributed Dataset), which is Spark's fundamental data abstraction. An RDD partitions data across the nodes of a cluster and holds it in the cluster's memory pool as a single logical unit. Two kinds of operations are performed on RDDs:
Transformation: a function that produces a new RDD from existing RDDs. Transformations are lazy: they only record how the new RDD is derived from its parents.
Action: an operation that triggers the computation and returns a result from the actual dataset, for example by counting or collecting its elements.
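The difference between transformations and actions is easier to see in a small model. The sketch below is a toy in plain Python, not Spark's RDD implementation: `LazyDataset` and its methods are hypothetical names chosen for illustration. Transformations only record a step, and nothing is computed until an action such as `collect()` runs.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = list(source)
        self._steps = tuple(steps)

    # Transformations: return a new LazyDataset and compute nothing yet.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    # Actions: replay the recorded steps and return a concrete result.
    def collect(self):
        items = iter(self._source)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return list(items)

    def count(self):
        return len(self.collect())


numbers = LazyDataset(range(1, 6))
# Two transformations are chained; no element has been processed so far.
even_squares = numbers.map(lambda x: x * x).filter(lambda x: x % 2 == 0)
# The actions below trigger the actual work.
result = even_squares.collect()
total = even_squares.count()
```

Note that `numbers` itself is unchanged by the chain, which mirrors the way each transformation yields a new dataset instead of modifying the old one.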
The Spark SQL component is a distributed framework for structured data processing. It provides access to structured and semi-structured data and enables powerful, interactive, analytical applications across both streaming and historical data. DataFrames and SQL provide a common way to access a variety of data sources. Its main features include a cost-based optimizer and mid-query fault tolerance.
Spark Streaming is an add-on to the core Spark API that allows scalable, high-throughput, fault-tolerant stream processing of live data streams. It groups the live data into small batches and then delivers them to the batch system for processing.
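To make the idea of grouping live data into small batches concrete, here is a minimal plain-Python sketch. It does not use the Spark Streaming API; `micro_batches` and the sample events are hypothetical, and the one-second batch interval is only an assumption for the example.

```python
from collections import defaultdict


def micro_batches(events, batch_seconds):
    """Group (timestamp, value) events into consecutive fixed-width time windows."""
    batches = defaultdict(list)
    for timestamp, value in events:
        # Every event falls into the window that starts at a multiple of batch_seconds.
        window_start = int(timestamp // batch_seconds) * batch_seconds
        batches[window_start].append(value)
    return [(start, batches[start]) for start in sorted(batches)]


# Hypothetical stream of (arrival time in seconds, payload) pairs.
events = [(0.2, "a"), (0.9, "b"), (1.1, "c"), (2.5, "d"), (2.7, "e")]
batches = micro_batches(events, batch_seconds=1)

# Each batch can now be handed to ordinary batch logic, e.g. counting its records.
batch_sizes = [(start, len(values)) for start, values in batches]
```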
GraphX in Spark is an API for graphs and graph-parallel computation. It is a network graph analytics engine and data store. Clustering, classification, traversal, searching, and pathfinding are also possible on graphs.
SparkR provides a distributed data frame implementation. It supports operations such as selection, filtering, and aggregation on large datasets.
Spark MLlib is used to perform machine learning in Apache Spark. It is a scalable machine learning library that offers both high-quality algorithms and high speed. MLlib consists of popular algorithms and utilities, including regression, classification, clustering, pattern mining, and collaborative filtering. Lower-level machine learning primitives, such as a generic gradient descent optimization algorithm, are also present in MLlib.
spark.ml is the primary machine learning API for Spark. The spark.ml package offers a higher-level API built on top of DataFrames for constructing ML pipelines.
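As a reminder of what pathfinding on a graph means, the sketch below runs a breadth-first search on a single machine. It is plain Python and not the GraphX API; the function name and the sample graph are made up for illustration, and a distributed engine would partition this work across a cluster.

```python
from collections import deque


def shortest_path(graph, start, goal):
    """Breadth-first search: returns a path with the fewest edges, or None."""
    queue = deque([[start]])
    visited = {start}
    while queue:
        path = queue.popleft()
        node = path[-1]
        if node == goal:
            return path
        for neighbor in graph.get(node, []):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.append(path + [neighbor])
    return None


# Hypothetical directed graph stored as an adjacency list.
graph = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": ["E"], "E": []}
path = shortest_path(graph, "A", "E")
no_path = shortest_path(graph, "E", "A")
```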
Spark MLlib tools are given below:
- ML Algorithms
ML Algorithms form the core of MLlib. These include common learning algorithms such as classification, regression, clustering, and collaborative filtering.
MLlib standardizes APIs to make it easier to combine multiple algorithms into a single pipeline, or workflow. The key concepts come from the Pipelines API, whose design is inspired by the scikit-learn project.
A Transformer is an algorithm that can transform one DataFrame into another DataFrame. Technically, a Transformer implements a method transform(), which converts one DataFrame into another, generally by appending one or more columns. For example:
A feature transformer might take a DataFrame, read a column (e.g., text), map it into a new column (e.g., feature vectors), and output a new DataFrame with the mapped column appended.
A learning model might take a DataFrame, read the column containing feature vectors, predict the label for each feature vector, and output a new DataFrame with predicted labels appended as a column.
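The Transformer contract can be sketched in a few lines of plain Python. This is a toy model and not the PySpark class: a list of dictionaries stands in for a DataFrame, and `ToyTokenizer` is a hypothetical name. The point is that `transform()` reads one column and returns new rows with an extra column appended, leaving the input untouched.

```python
class ToyTokenizer:
    """Toy Transformer: reads one column and appends a derived column."""

    def __init__(self, input_col, output_col):
        self.input_col = input_col
        self.output_col = output_col

    def transform(self, rows):
        # Build new rows instead of mutating the input.
        return [
            {**row, self.output_col: row[self.input_col].lower().split()}
            for row in rows
        ]


rows = [
    {"id": 0, "text": "Spark MLlib pipelines"},
    {"id": 1, "text": "Hello Spark"},
]
tokenized = ToyTokenizer("text", "words").transform(rows)
```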
An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. Technically, an Estimator implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer. For example, a learning algorithm such as LogisticRegression is an Estimator, and calling fit() trains a LogisticRegressionModel, which is a Model and hence a Transformer.
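The relationship between an Estimator and the Model it produces can be shown with a deliberately tiny learner. The sketch below is plain Python, not PySpark: `ToyThresholdClassifier` and `ToyThresholdModel` are hypothetical names, rows are dictionaries, and the "learning" is just placing a threshold halfway between the two class means.

```python
from statistics import mean


class ToyThresholdModel:
    """Fitted model: a Transformer that appends a prediction column."""

    def __init__(self, threshold, feature_col, prediction_col):
        self.threshold = threshold
        self.feature_col = feature_col
        self.prediction_col = prediction_col

    def transform(self, rows):
        return [
            {**row, self.prediction_col: 1 if row[self.feature_col] > self.threshold else 0}
            for row in rows
        ]


class ToyThresholdClassifier:
    """Estimator: fit() learns from labelled rows and returns a model."""

    def __init__(self, feature_col="feature", label_col="label", prediction_col="prediction"):
        self.feature_col = feature_col
        self.label_col = label_col
        self.prediction_col = prediction_col

    def fit(self, rows):
        positives = [r[self.feature_col] for r in rows if r[self.label_col] == 1]
        negatives = [r[self.feature_col] for r in rows if r[self.label_col] == 0]
        threshold = (mean(positives) + mean(negatives)) / 2
        return ToyThresholdModel(threshold, self.feature_col, self.prediction_col)


train = [
    {"feature": 1.0, "label": 0},
    {"feature": 2.0, "label": 0},
    {"feature": 6.0, "label": 1},
    {"feature": 7.0, "label": 1},
]
model = ToyThresholdClassifier().fit(train)
predictions = model.transform([{"feature": 3.0}, {"feature": 5.0}])
```

The estimator holds only configuration, while everything learned from the data lives in the returned model.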
Transformer.transform() and Estimator.fit() are both stateless. In the future, stateful algorithms may be supported via alternative concepts.
Each instance of a Transformer or Estimator has a unique ID, which is useful in specifying parameters.
Featurization includes feature extraction, transformation, dimensionality reduction, and selection.
- Feature Extraction is extracting features from raw data.
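- Feature Transformation includes scaling, converting, or modifying features.
- Dimensionality Reduction reduces the number of features while keeping as much of the information as possible.
- Feature Selection involves selecting a subset of relevant features from a larger set of features.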
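Two of these steps, transformation and selection, can be sketched in plain Python. This is not MLlib code: the helper names are hypothetical, min-max scaling is just one possible transformation, and dropping zero-variance columns is just one simple selection rule.

```python
from statistics import pvariance


def min_max_scale(values):
    """Feature transformation: rescale values linearly into the range [0, 1]."""
    low, high = min(values), max(values)
    if high == low:
        return [0.0 for _ in values]
    return [(v - low) / (high - low) for v in values]


def select_by_variance(columns, threshold):
    """Feature selection: keep columns whose population variance exceeds the threshold."""
    return [name for name, values in columns.items() if pvariance(values) > threshold]


# Hypothetical numeric columns; 'balance' is constant and carries no signal.
columns = {
    "age": [20, 30, 40],
    "balance": [100, 100, 100],
    "duration": [5, 10, 15],
}
scaled_age = min_max_scale(columns["age"])
kept = select_by_variance(columns, threshold=0.0)
```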
A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow. It also provides tools for constructing, evaluating and tuning ML Pipelines.
In machine learning, it is common to run a sequence of algorithms to process and learn from data. MLlib represents such a workflow as a Pipeline, which consists of a sequence of Pipeline Stages (Transformers and Estimators) to be run in a specific order. We will use this simple workflow as a running example in this section.
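What fitting a pipeline does with its stages can be sketched as follows. This is a plain-Python toy, not the `pyspark.ml.Pipeline` implementation, and all class names are hypothetical. Stages run in order; a stage that has `fit()` is treated as an Estimator and is replaced by the model it produces, and the data is transformed before it reaches the next stage.

```python
class Lowercase:
    """Transformer stage: normalizes the 'text' column."""

    def transform(self, rows):
        return [{**r, "text": r["text"].lower()} for r in rows]


class VocabularyModel:
    """Model produced by VocabularyIndexer: maps known words to ids."""

    def __init__(self, index):
        self.index = index

    def transform(self, rows):
        return [
            {**r, "ids": [self.index[w] for w in r["text"].split() if w in self.index]}
            for r in rows
        ]


class VocabularyIndexer:
    """Estimator stage: learns a word-to-id mapping from the data."""

    def fit(self, rows):
        vocabulary = sorted({w for r in rows for w in r["text"].split()})
        return VocabularyModel({w: i for i, w in enumerate(vocabulary)})


class ToyPipelineModel:
    def __init__(self, stages):
        self.stages = stages

    def transform(self, rows):
        for stage in self.stages:
            rows = stage.transform(rows)
        return rows


class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            if hasattr(stage, "fit"):      # Estimator: fit it on the current data
                stage = stage.fit(rows)
            rows = stage.transform(rows)   # pass transformed data to the next stage
            fitted.append(stage)
        return ToyPipelineModel(fitted)


train = [{"text": "Spark ML"}, {"text": "ML Pipelines"}]
model = ToyPipeline([Lowercase(), VocabularyIndexer()]).fit(train)
out = model.transform([{"text": "Spark Pipelines rock"}])
```

The fitted pipeline contains only Transformers, so it can be applied to new data in a single `transform()` call.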
Example: the sample pipeline below preprocesses the data in the following order:
1. Apply StringIndexer to convert each categorical column into a column of category indices
2. Apply one-hot encoding to the indexed categorical columns
3. Apply StringIndexer to the output variable to produce the “label” column
4. Apply VectorAssembler to the encoded categorical columns and the numeric columns. VectorAssembler is a transformer that combines a given list of columns into a single vector column.
The pipeline workflow will execute the data modelling in the above specific order.
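To see what the steps above do to the data, here is a small plain-Python sketch on three made-up rows. It is not the PySpark implementation: the helper names and the rows are hypothetical. Like StringIndexer's default ordering, the most frequent category gets index 0 (ties are broken alphabetically here as an assumption). Unlike Spark's one-hot encoder, which drops the last category by default, this toy keeps one slot per category.

```python
from collections import Counter


def string_index(values):
    """Map each category to an index, most frequent first."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: i for i, value in enumerate(ordered)}


def one_hot(index, size):
    """Return a vector with a single 1.0 at the given position."""
    vector = [0.0] * size
    vector[index] = 1.0
    return vector


# Hypothetical rows with one categorical column, one numeric column and a target.
rows = [
    {"marital": "married", "age": 30, "deposit": "yes"},
    {"marital": "single", "age": 25, "deposit": "no"},
    {"marital": "married", "age": 41, "deposit": "no"},
]

marital_index = string_index(r["marital"] for r in rows)   # steps 1 and 2 use this
label_index = string_index(r["deposit"] for r in rows)     # step 3

# Step 4: assemble the encoded categorical values and the numeric value into one vector.
features = [
    one_hot(marital_index[r["marital"]], len(marital_index)) + [float(r["age"])]
    for r in rows
]
labels = [label_index[r["deposit"]] for r in rows]
```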
    from pyspark.ml import Pipeline
    # Spark 2.x API; on Spark 3.0+ this class is named OneHotEncoder.
    from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

    categoricalColumns = ['job', 'marital', 'education', 'default', 'housing', 'loan']
    stages = []
    for categoricalCol in categoricalColumns:
        # Step 1: map each category string to a numeric index.
        stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Indexer')
        # Step 2: one-hot encode the indexed column.
        encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()],
                                         outputCols=[categoricalCol + 'Vec'])
        stages += [stringIndexer, encoder]

    # Step 3: index the target column 'deposit' into a numeric 'label' column.
    label_stringIdx = StringIndexer(inputCol='deposit', outputCol='label')
    stages += [label_stringIdx]

    # Step 4: combine the encoded categorical columns and the numeric columns.
    numericColumns = ['age', 'balance', 'duration']
    assemblerInputs = [c + 'Vec' for c in categoricalColumns] + numericColumns
    Vassembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
    stages += [Vassembler]

    # Remember the input column names so they can be kept next to the new columns.
    cols = df.columns
    pipeline = Pipeline(stages=stages)
    pipelineModel = pipeline.fit(df)
    df = pipelineModel.transform(df)
    selectedCols = ['label', 'features'] + cols
    df = df.select(selectedCols)
DataFrames provide a more user-friendly API than RDDs. The DataFrame-based API for MLlib provides a uniform API across ML algorithms and across multiple languages. DataFrames facilitate practical ML Pipelines, particularly feature transformations.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName('mlearnsample').getOrCreate()
    df = spark.read.csv('loan_bank.csv', header=True, inferSchema=True)
    df.printSchema()
Persistence helps in saving and loading algorithms, models, and Pipelines. Because a persisted model can be loaded and reused whenever it is needed, this saves time and effort.
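The save-and-reload round trip can be illustrated with a toy model in plain Python. This is only a sketch of the idea: `ThresholdModel`, its JSON file format, and the temporary path are hypothetical, and this is not how MLlib stores models on disk. In PySpark, fitted models and pipelines expose their own `save(path)` method and a matching `load(path)` class method.

```python
import json
import os
import tempfile


class ThresholdModel:
    """Toy fitted model whose only learned parameter is a threshold."""

    def __init__(self, threshold):
        self.threshold = threshold

    def predict(self, x):
        return 1 if x > self.threshold else 0

    def save(self, path):
        # Persist the learned parameters, not the object itself.
        with open(path, "w") as f:
            json.dump({"threshold": self.threshold}, f)

    @classmethod
    def load(cls, path):
        with open(path) as f:
            return cls(**json.load(f))


model = ThresholdModel(4.0)
path = os.path.join(tempfile.mkdtemp(), "model.json")
model.save(path)

# Later, possibly in another process: reload the model without retraining.
restored = ThresholdModel.load(path)
```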
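    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    # Assumption: hold out 30% of the rows for testing; the ratio and seed are illustrative.
    train, test = df.randomSplit([0.7, 0.3], seed=42)

    # Train a logistic regression model on the assembled features.
    lr = LogisticRegression(featuresCol='features', labelCol='label')
    lrModel = lr.fit(train)

    # Score the held-out rows and inspect a few predictions.
    predictions = lrModel.transform(test)
    predictions.select('age', 'label', 'rawPrediction', 'prediction').show()

    # Evaluate the predictions; the default metric is the area under the ROC curve.
    evaluator = BinaryClassificationEvaluator()
    print('Test Area Under ROC', evaluator.evaluate(predictions))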
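The area under the ROC curve reported above has a simple interpretation: it is the probability that a randomly chosen positive example gets a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on made-up labels and scores. It illustrates the metric only and is not how Spark's evaluator computes it.

```python
def area_under_roc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count as half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative example")
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Hypothetical true labels and model scores.
labels = [1, 0, 1, 0]
scores = [0.9, 0.8, 0.4, 0.1]
auc = area_under_roc(labels, scores)
```

A value of 1.0 means every positive outranks every negative, while 0.5 is what random scoring would achieve.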
MLlib also includes utilities for linear algebra, statistics, and data handling. For example, mllib.linalg provides MLlib's linear algebra utilities.
Spark MLlib is a natural choice if you are dealing with big data and machine learning. In this article, you learned about Spark MLlib, DataFrames, and Pipelines. In a future article, we will work hands-on with code to implement Pipelines and build data models using MLlib.