Prefect and CometML For Bulldozer Sales Price Prediction

Dhrubaraj Roy 31 Mar, 2024 • 10 min read

Introduction 

If you are a beginner and are just starting to learn MLOps, you might have a question: What are MLOps?

In simple words, MLOps (Machine Learning Operations) is a set of practices for collaboration and communication between data scientists and operations professionals. Applying these practices increases the quality, simplifies the management process, and automates the deployment of Machine Learning and Deep Learning models in large-scale production environments. It’s easier to align models with business needs and regulatory requirements. In this article, we will implement our project using Prefect and CometML.

In this MLOps project, we will build the best possible Machine Learning model using optimal hyperparameters to predict the sales price of a Bulldozer. As you may know, a Bulldozer is a powerful vehicle for shallow digging and ditching.

MLOPS

Learning Objectives

  • Learn MLOps concepts and end-to-end ML workflow.
  • Implement MLOps pipeline with Prefect and CometML.
  • Make reproducible, automated ML workflows.
  • Evaluate and monitor ML models.
  • End-to-end MLOps experience.

This article was published as a part of the Data Science Blogathon.

What is Prefect and CometML?

Prefect

Prefect is an open-source Python library that helps you define, schedule, and manage data workflows well. It simplifies orchestrating and automating complex data workflows, making tasks easier. Examples are data extraction, transformation, and model training. You can do them in a systematic and repeatable way.

pip install prefect

Another thing I should mention is Prefect Cloud. Prefect Cloud is a cloud-based platform provided by Prefect for managing, orchestrating, and monitoring data workflows in MLOps.

CometML

CometML is a platform for managing and tracking machine learning experiments in MLOps. It provides tools for versioning, collaboration, and visualizing results. It helps streamline the development and monitoring of machine-learning models.

pip install comet_ml

The MLOps project: Let’s get started.

Data Exploration

As we build an end-to-end machine learning model, we will focus more on the ML life cycle than model building.

MLOps project

If you observe the dataset, you will see there are 53 columns. We will use all 52 columns for input features or X, and since our target variable is SalePrice, this will be the y. In the data exploration part, we conducted all kinds of explorations, from df.info() to plotting missing values using a scatter plot. You will find all the steps in my notebook on the GitHub repository. You can also download the dataset from there. Now, let’s start working on the project.  

Set Up a Virtual Environment

What is Virtual Environment, and why do we need it?

A virtual environment is a self-contained Python workspace for isolating project dependencies.
You install many libraries on your computer for several projects. You might have installed Python3.11, but sometimes, you need Python3.9 for another project. To avoid conflict, you need to set up a virtual Environment.

Creating a Virtual Environment

  • For Windows:
python -m venv myenv
#then for activation
myenv\Scripts\activate
  • For Linux/macOS:
python3 -m venv myenv
#then for activation
source myenv/bin/activate

File Structure

MLOps project

Configure CometML and Prefect

To configure CometML, you need to create a file named .comet.config in your project directory and define its configuration parameters. Here is an example of how you can structure a basic .comet.config file:

[comet]
api_key = your_api_key
workspace = your_workspace
project_name = your_project_name

You should sign up for Comet for an api_key, workspace, and project_name. Let’s take a look at how to set up a Comet account.

Set up a Comet account

  • Please create a new account. It’s easy and free.
MLOps project

API key

  • When your account is created in the top right corner, click your avatar, then select Account Settings.
MLOps project
  •   To get the API key, click the API Keys tab. Your current API key is displayed there. Click Copy to copy the API key.  
MLOps project
  • You can see your workspace name and project name in the Workspaces Tab.

So now let’s configure Prefect.

Set Up the Prefect

Prefect provides a cloud platform and API for managing and monitoring workflows. By signing up, we can use Prefect Cloud. It has a dashboard for tracking workflows. It can set notifications, analyze logs, and more. The interesting part is that we can deploy our machine-learning model.

  • Step 1: Install Prefect
pip install -U prefect

See the install guide for more details.

  • Step 2: Connect to Prefect’s API

Prefect’s functionality relies on a backend cloud API. The API manages the execution of workflows and data pipelines. We need to connect Prefect installation to this API. This unlocks useful features. For example, a central dashboard can be used to watch workflow runs. It also lets you set notifications. You can get them when tasks fail, analyze logs, and track task history. Lastly, it lets you scale workloads across a cluster. We can build workflows locally without the API. But we can’t make them operational or ready for production. The Prefect Cloud handles scheduling and retries. It follows limits set through the API. So, using Prefect with its API service offers a serverless platform. It is for managing complex workflows without needing to host your own coordinators.

  1. Create a new account or sign in at 
  2. Use the prefect cloud login CLI command to 

Prefect cloud login

Choose Log in with a web browser and click the Authorize button in the open browser window.

Self-hosted Prefect server instance

You can also run this on your local machine. See the tutorial for help. Note that you must host your own server and run your flows on your own infrastructure.

  • Step 3: Turn your function into a Prefect flow

See the flow.py file where I added the @flow decorator. This is the fastest way to get started with Prefect. A “Flow” is a Directed Acyclic Graph (DAG) representing a workflow. In Prefect, a task is a fundamental unit of work in the workflow. We will discuss tasks more later in this tutorial.

5-steps to implement this MLOps project using Prefect and CometML

Here are the 5 steps to implement the MLops project using Prefect and CometML

Step 1 – Ingest data

In this step, we ingest our data from our data folder. Let’s have a look at our ingest_data.py file inside the steps folder

class IngestData:
    """Ingests data from a CSV file."""

    def __init__(self, data_path: str):
        self.data_path = data_path

    def get_data(self):
        logging.info(f"Ingest data from {self.data_path}")
        return pd.read_csv(self.data_path)

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def ingest_df(data_path: str) -> pd.DataFrame:
    """
    Ingest data from the specified path and return a DataFrame.

    Args:
        data_path (str): The path to the data file.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the ingested data.
    """
    try:
        ingest_obj = IngestData(data_path)
        df = ingest_obj.get_data()
        print(f"Ingesting data from {data_path}")
        experiment.log_metric("data_ingestion_status", 1)
        return df
    except Exception as e:
        logging.error(f"Error while ingesting data: {e}")
        raise e
    finally:
        # Ensure that the experiment is ended to log all data
        experiment.end()

In Prefect, a task is a fundamental unit of work in a workflow. It represents an individual computation unit or an operation that needs to be performed. So, in this case, our first task is to ingest the data.

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))

This Prefect task decorator specifies caching parameters, using task_input_hash as the cache key function and setting a cache expiration of one hour. You can learn more about this in prefect doc.

Step 2 – Clean data

In this step, we will clean our data, and the bellow code will return X_train, X_test, y_train, y_test, for training and testing our ML model. Let’s have a look

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def clean_df(data: pd.DataFrame) -> Tuple[
    Annotated[pd.DataFrame, 'X_train'],
    Annotated[pd.DataFrame, 'X_test'],
    Annotated[pd.Series, 'y_train'],
    Annotated[pd.Series, 'y_test'],
]:
    """
    Data cleaning class which preprocesses the data and divides it into train and test data.

    Args:
        data: pd.DataFrame
    """
    try:
        preprocess_strategy = DataPreprocessStrategy()
        data_cleaning = DataCleaning(data, preprocess_strategy)
        preprocessed_data = data_cleaning.handle_data()

        divide_strategy = DataDivideStrategy()
        data_cleaning = DataCleaning(preprocessed_data, divide_strategy)
        X_train, X_test, y_train, y_test = data_cleaning.handle_data()
        logging.info(f"Data Cleaning Complete")
        experiment.log_metric("data_cleaning_status", 1)
        return X_train, X_test, y_train, y_test 
    except Exception as e: 
        logging.error(e)
        raise e
    finally:
        # Ensure that the experiment is ended to log all data
        experiment.end()

Till this point, if you observe the above code carefully, you might be thinking, where are the DataPreprocessStrategy(), and DataDivideStrategy() defined inside the model folder, we define these methods; let’s have a look

class DataPreprocessStrategy(DataStrategy):
    """
    Data preprocessing strategy which preprocesses the data.
    """

    def handle_data(self, data: pd.DataFrame) -> pd.DataFrame:
        try:
            """
            Performs transformations on df and returns transformaed df.
            """
            # Convert 'saledate' column to datetime
            data['saledate'] = pd.to_datetime(data['saledate'])
            data["saleYear"] = data.saledate.dt.year
            data["saleMonth"] = data.saledate.dt.month
            data["saleDay"] =data.saledate.dt.day
            data["saleDayOfWeek"] = data.saledate.dt.dayofweek
            data["saleDayOfYear"] = data.saledate.dt.dayofyear

            data.drop("saledate", axis=1, inplace=True)


            # Fill the numeric row with median
            for label, content in data.items():
                    if pd.api.types.is_numeric_dtype(content):
                        if pd.isnull(content).sum():
                            # Add a binary column which tells us if the data was missing 
                            # or not
                            data[label+"is_missing"] = pd.isnull(content)
                            # Fill missing numeric values with median
                            data[label] = content.fillna(content.median())

                    # Filled categorical missing data and turn categories into numbers
                    if not pd.api.types.is_numeric_dtype(content):
                        data[label+"is_missing"] = pd.isnull(content)
                        # We add +1 to the category code because pandas encodes
                        # missing categories as -1
                        data[label] = pd.Categorical(content).codes+1
                
        
        
            return data
        except Exception as e:
            logging.error("Error in Data handling: {}".format(e))
            raise e

In my GitHub repository, you can find all methods.

Step 3 – Train model

We will train a simple linear regression model using the Scikit learn library.

# Create a CometML experiment
experiment = Experiment()
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def train_model(
    X_train: pd.DataFrame,
    X_test: pd.DataFrame,
    y_train: pd.Series,
    y_test: pd.Series,
    config: ModelNameConfig = ModelNameConfig(),
) -> RegressorMixin:
    """
    Train a regression model based on the specified configuration.

    Args:
        X_train (pd.DataFrame): Training data features.
        X_test (pd.DataFrame): Testing data features.
        y_train (pd.Series): Training data target.
        y_test (pd.Series): Testing data target.
        config (ModelNameConfig): Model configuration.

    Returns:
        RegressorMixin: Trained regression model.
    """
    try:
        model = None
        if config.model_name == "random_forest_regressor":
            model = RandomForestRegressor(n_estimators=40,
                                                min_samples_leaf=1,
                                                min_samples_split=14,
                                                max_features=0.5,
                                                n_jobs=-1,
                                                max_samples=None,
                                                random_state=42)
            trained_model = model.fit(X_train, y_train)
             # Save the trained model to a file
            model_filename = "trained_model.pkl"
            with open(model_filename, 'wb') as model_file:
                pickle.dump(trained_model, model_file)
            print("train model finished")
            experiment.log_metric("model_training_status", 1)
            return trained_model
        else:
            raise ValueError("Model name not supported")
    except Exception as e:
        logging.error(f"Error in train model: {e}")
        raise e
    finally:
    # Ensure that the experiment is ended to log all data
        experiment.end()

Step 4 – Evaluate model

# Create a CometML experiment
experiment = Experiment()
@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def evaluate_model(
    model: RegressorMixin, X_test: pd.DataFrame, y_test: pd.Series
) -> Tuple[Annotated[float, "r2"], 
           Annotated[float, "rmse"],
]:
    """
    Args:
        model: RegressorMixin
        x_test: pd.DataFrame
        y_test: pd.Series
    Returns:
        r2_score: float
        rmse: float
    """
    try:
        prediction = model.predict(X_test)

        # Using the MSE class for mean squared error calculation
        mse_class = MSE()
        mse = mse_class.calculate_score(y_test, prediction)
        experiment.log_metric("MSE", mse)
        # Using the R2Score class for R2 score calculation
        r2_class = R2Score()
        r2 = r2_class.calculate_score(y_test, prediction)
        experiment.log_metric("R2Score", r2)
        # Using the RMSE class for root mean squared error calculation
        rmse_class = RMSE()
        rmse = rmse_class.calculate_score(y_test, prediction)
        experiment.log_metric("RMSE", rmse)
       # Log metrics to CometML
        
        experiment.log_metric("model_evaluation_status", 1)
        print("Evaluate model finished")

        return r2, rmse
    except Exception as e:
        logging.error(f"Error in evaluation: {e}")
        raise e
    finally:
        # Ensure that the experiment is ended to log all data
        experiment.end()

We have logged all those metrics, like r2 score, mse, and rmse. You can see the above code. We can visualize those matrices on the CometML dashboard. However, when you run the flow, you can see the dashboard. In the next step, we discuss that.

MLOps project

Step 5 – Run the flow (The final step)

We have to run the flow.

We import all the tasks and flows into the flow.py file and run our flow from there.

python3 flow.py
from prefect import flow

from steps. ingest_data import ingest_df
from steps.clean_data import clean_df
from steps.train_model import train_model
from steps.evaluation import evaluate_model
## import comet_ml at the top of your file
from comet_ml import Experiment

## Create an experiment with your api key
@flow(retries=3, retry_delay_seconds=5, log_prints=True)
def my_flow():
    data_path="/home/dhrubaubuntu/gigs_projects/Bulldozer-price-prediction/data/TrainAndValid.csv"
    df = ingest_df(data_path)
    X_train, X_test, y_train, y_test = clean_df(df)
    model = train_model(X_train, X_test, y_train, y_test)
    r2_score, rmse = evaluate_model(model, X_test, y_test)

# Run the Prefect Flow
if __name__ == "__main__":
    my_flow()
MLOps project

Here, you can see all the run-in flow dashboards in Prefect 

MLOps project

Conclusion

Implementing end-to-end MLOps enables organizations to reliably scale-out machine learning solutions in production. This tutorial demonstrated an automated workflow for predicting electric vehicle ranges using open-source libraries like Prefect and CometML.

Key highlights from the project include:

  • Orchestrating an ML pipeline with Prefect involves handling steps ranging from data ingestion, preprocessing, model development, evaluation, and monitoring.
  • Tracking experiments in CometML to visualize model metrics like RMSE and R2 scores over time for comparison.
  • Monitoring workflow executions in Prefect Cloud showing task durations.

Overall, this showcase implements data science best practices of automation, reproducibility, and monitoring in a structured workflow critical for real-world ML systems. Extending and operationalizing to production can further leverage Prefect’s scalability in managing large-scale flows across distributed infrastructure.

Key Takeaways

Some key takeaways from this end-to-end MLOps tutorial include:

  • Implementing MLOps improves data scientists and IT collaboration with automation and DevOps practices.
  • Prefect enables the creation of robust data pipelines and workflows to ingest, process, train, and evaluate models.
  • CometML provides an easy way to track ML experiments with logging and visualization.
  • Orchestrating the ML lifecycle end-to-end ensures models remain relevant as new data comes in.
  • Monitoring workflow executions helps identify and troubleshoot failures quickly.
  • MLOps unlocks faster experimentation by simplifying retraining and deployment of updated models.

Frequently Asked Questions

Q1. What is MLOps?

Ans. MLOps for machine learning is a set of practices that aims to streamline and automate the end-to-end machine learning lifecycle, including model development, deployment, and maintenance, to enhance collaboration and efficiency in data science and operations teams.

Q2. What is Prefect?

Ans. Prefect is an open-source Python library for workflow management. It enables the creation, scheduling, and orchestration of data workflows and tasks commonly used in data science and automation pipelines. It simplifies complex workflows, focusing on flexibility, reliability, and monitoring.

Q3. What is CometML?

Ans. CometML is a platform for machine learning experimentation and collaboration. It provides tools for tracking, comparing, and optimizing machine learning experiments, enabling teams to log and share experiment details, metrics, and visualizations to improve model development and collaboration.

Q4. What is Prefect used for?

Ans. Prefect is used for workflow management in data science and automation. It helps streamline and orchestrate complex data workflows, making designing, scheduling, and cohesively monitoring tasks easier. Prefect is commonly employed for data processing, machine learning model training, and other data-centric operations, providing a framework for building, running, and managing workflows efficiently.

Q5. What is the difference between MLflow and Comet?

Ans. MLflow is an open-source platform for managing the end-to-end machine learning lifecycle, including experiment tracking, packaging code into reproducible runs, and sharing and deploying models. Comet is a platform for machine learning experimentation and collaboration, focusing on experiment tracking, visualizations, and collaboration features. It provides a centralized hub for teams to analyze and share results. While both support experiment tracking, MLflow offers additional model packaging and deployment features, while Comet emphasizes collaboration and visualization capabilities.

The media shown in this article is not owned by Analytics Vidhya and is used at the Author’s discretion.

Dhrubaraj Roy 31 Mar 2024

Frequently Asked Questions

Lorem ipsum dolor sit amet, consectetur adipiscing elit,

Responses From Readers

Clear

Related Courses