Code Re-usability through feature pipeline framework

Rohit Bhagwat 28 Mar, 2021 • 5 min read
This article was published as a part of the Data Science Blogathon.

Introduction

In any data science project, feature creation is a common stage where raw data is transformed into features. Often the raw data is in the form of structured tables with a variety of columns and values. The feature creation stage involves picking out useful columns and using their information to create meaningful features. For example, in fraud detection the raw data will typically have columns such as Loss Date and Report Date; during the feature creation stage we can pick these columns and create a more meaningful feature, Report Lag Days, as:

    Report Lag Days = Report Date - Loss Date

At an abstract level, creating the features means applying computational logic on a subset of columns from raw data.

In practice, the column names are usually hard-wired during the model building stage, i.e. the traditional way to compute Report Lag Days is simply to code it as follows:

    Report_Lag_Series = raw_data['Report Date'] - raw_data['Loss Date']

Depending on the complexity of the model, there could be many such features where we directly refer to column names specific to the given input data. Although this approach is the easiest and fastest for model building, it is not scalable, for the following reasons:

  1. Hard-coding column names hampers the reusability of the code: in the next project the column names may change (which is often the case) while the computation logic remains the same.
  2. With more features, the code becomes cluttered and harder to maintain and understand later on.
  3. The underlying computation logic is not encapsulated.
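
The first point can be made concrete with a small sketch. The helper name `days_between` and both toy tables below are assumptions made up for illustration; the point is only that the logic stays fixed while the column names passed in change from project to project.

```python
import pandas as pd

def days_between(from_dates, to_dates):
    """Days elapsed from from_dates to to_dates (hypothetical helper)."""
    return (pd.to_datetime(to_dates) - pd.to_datetime(from_dates)).dt.days

# Two projects store the same information under different column names
project_a = pd.DataFrame({'Loss Date': ['2021-01-01', '2021-01-10'],
                          'Report Date': ['2021-01-05', '2021-01-12']})
project_b = pd.DataFrame({'loss_dt': ['2021-02-01'],
                          'reported_dt': ['2021-02-15']})

# The computation logic is written once; only the column names differ per call
lag_a = days_between(project_a['Loss Date'], project_a['Report Date'])
lag_b = days_between(project_b['loss_dt'], project_b['reported_dt'])
```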

We can improve on the traditional feature coding approach with a more re-usable & generalizable framework – Feature Pipeline!

The FeaturePipeline class defines the behavior for managing the various transformations. It mainly does two jobs:

  1. Registering the sequence of transformations to be applied to the input raw data
  2. Applying those transformations sequentially

import pandas as pd
import numpy as np


class FeaturePipeline():
    """
    Create a feature pipeline for a raw data source.
    Attributes:
        transformations: The feature transformations for the pipeline
    """
    def __init__(self, logger=None):
        self.transformations = []
        self.transformed_df = None
        self.logger = logger
                
    def add_transformation(self, on_cols, transformation_f, transformation_args=None, name=None, transform_type='series'):
        """
        Add a feature.
        Args:
            on_cols: String or Tuple 
            transformation_f: The function to use to transform the input
            transformation_args: Additional arguments for transformation
            name: Transformation name
            transform_type: Transformation type
            
        Returns:
            self
        """
        # The same function may back several features, so duplicates are detected by name
        if name is not None and any(t.name == name for t in self.transformations):
            print('Transformation: {} already added, skipping...'.format(name))
            return self
        transformation_f.logger = self.logger
        if isinstance(on_cols, str):
            on_cols = [on_cols]
        self.transformations.append(
            Transformation(on_col=on_cols,
                           f=transformation_f,
                           args=transformation_args, 
                           name=name,
                           transform_type=transform_type
                          ))
        
        return self
    
    def get_args(self, t, df, transformed_df):
        """
        Creates argument for transformation
        """
        list_of_series = []
        on_col_list = []
        if isinstance(t.on_col, str):
            on_col_list = [t.on_col]
        else:
            on_col_list = t.on_col
        
        for c in on_col_list:
            if c in df.columns:
                s = df[c]
                list_of_series.append(s)
            else:
                try:
                    s = transformed_df[c]
                    list_of_series.append(s)
                except (KeyError, TypeError):
                    # TypeError covers the case where transformed_df is still None
                    raise Exception('Column {} not found in input dataframes'.format(c))
        if t.transform_type == 'series':
            args = {'ser{}'.format(i+1): l for i, l in enumerate(list_of_series)}     
        elif t.transform_type == 'dataframe':
            arg_df = pd.concat(list_of_series, axis=1)
            assert arg_df.shape[0] == list_of_series[0].shape[0], 'Argument dataframe has different shape than concatenated series'
            args = {'df' : arg_df}
        else:
            raise Exception('Unknown transform type')
        if t.transformation_args:
            args.update(t.transformation_args)
            
        return args
            
        
    def apply_transformations(self, df, index_cols=None, reapply_all=False):
        """
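        Apply feature pipeline on raw data
        Args:
            df: Input DataFrame
            index_cols: Columns whose values are concatenated to form the row index
            reapply_all: If True, re-run transformations that were already applied
        Returns:
            Features DataFrame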
        """
        # Deduplicate the input dataframe on columns
        df = df.loc[:,~df.columns.duplicated()].copy()
        
        # Create index column 
        if index_cols:
            df['INDEX'] = df[index_cols].apply(lambda x: "".join([str(v) for v in x[index_cols]]), axis=1)
            
            # Deduplicate the input dataframe at INDEX level
            df = df.drop_duplicates(['INDEX'])
        
            # Set The new index
            index_list = df['INDEX'].tolist()
            df.index = index_list
            del df['INDEX']
        else:
            index_list = df.index
                
        transformed_df = self.transformed_df
        for t in self.transformations:
            if t.transformation_applied and (not reapply_all):
                print('Transformation: {} applied, skipping...'.format(t))
                continue
            args = self.get_args(t=t, df=df, transformed_df=transformed_df)
            
            if transformed_df is None:
                transformed_df = t.apply(args)
                transformed_df.index = index_list
            else:
                tdf = t.apply(args)
                tdf.index = index_list
                transformed_df = pd.concat([transformed_df, tdf], axis=1)
                
            self.transformed_df = transformed_df
        return transformed_df
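
The keyword-naming convention that `get_args` uses for `transform_type='series'` can be seen in isolation. The sketch below is a standalone illustration rather than part of the class; the helper name `build_series_args` and the toy data are assumptions.

```python
import pandas as pd

def build_series_args(df, on_cols, extra_args=None):
    """Mimic get_args for transform_type='series' (illustrative helper)."""
    # Columns are mapped in order: first -> ser1, second -> ser2, ...
    args = {'ser{}'.format(i + 1): df[c] for i, c in enumerate(on_cols)}
    # Additional keyword arguments (e.g. a threshold) are merged in last
    if extra_args:
        args.update(extra_args)
    return args

toy = pd.DataFrame({'Claimed_Amount': [120, 900]})
args = build_series_args(toy, ['Claimed_Amount'], {'threshold': 500})
print(list(args))
```

A transform function written with parameters named `ser1`, `ser2`, and so on therefore receives the columns in the order given by `on_cols`, followed by any additional arguments.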

The Transformation class below controls the behavior of an individual transformation. It encapsulates the underlying computation logic used to create features.

class Transformation():
    """
    Encapsulation for individual computation logic used for feature creation
    Applies a transformation on one or more Pandas Series
    in order to produce one or more features
    Args:
        on_col: String or Tuple, Series names to use when
            creating features
        f: The function to use to transform the input
        name: The name of the transformed feature (or prefix if >1 features)
        args: dictionary of additional arguments of f
        transform_type: Whether transform to be applied on series or dataframe object
    """
    
    def __init__(self, on_col, f, name, args=None, transform_type='series'):
        self.on_col = on_col
        self.transformation_f = f
        self.name = name
        self.transform_type = transform_type
        
        # Additional arguments for transformation
        self.transformation_args = args
        
        self.transformation_applied = False
        
    def apply(self, args):
        try:
            tdf = self.transformation_f(**args)
        except TypeError:
            tdf = self.transformation_f(*list(args.values()))
            
        if self.name is not None:
            if isinstance(tdf, pd.DataFrame):
                if isinstance(self.name, list):
                    tdf.columns = self.name
                else:
                    tdf.columns = [self.name + "_" + str(col) for col in tdf.columns]
            elif isinstance(tdf, pd.Series):
                tdf = pd.DataFrame(tdf)
                tdf.columns = [self.name]         
        tdf = tdf.reset_index(drop=True)
        self.transformation_applied = True
        
        return tdf
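
The naming rules in `apply` matter most when a transform returns more than one column. The sketch below reproduces only that renaming step outside the class; `name_output` and `date_parts` are hypothetical names introduced for illustration.

```python
import pandas as pd

def name_output(result, name):
    """Rename a transform's output following the rules in Transformation.apply (sketch)."""
    if isinstance(result, pd.Series):
        # A single Series becomes a one-column DataFrame called `name`
        result = result.to_frame()
        result.columns = [name]
    elif isinstance(name, list):
        # A list of names replaces the column names one by one
        result.columns = name
    else:
        # A string name is used as a prefix for every returned column
        result.columns = ['{}_{}'.format(name, col) for col in result.columns]
    return result

def date_parts(ser1):
    """Hypothetical transform returning two features from one date column."""
    dates = pd.to_datetime(ser1)
    return pd.DataFrame({'month': dates.dt.month, 'weekday': dates.dt.weekday})

loss_dates = pd.Series(['2021-03-28', '2021-12-25'])
prefixed = name_output(date_parts(loss_dates), 'Loss')
explicit = name_output(date_parts(loss_dates), ['LossMonth', 'LossWeekday'])
```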

Let’s see these in action!

I am considering a sample dataset for fraudulent claims detection.

Read the sample data

raw_data = pd.read_excel("../data/sample_raw_data.xlsx")

 

Define the computation logic as transforms functions

def transforms_above_threshold(ser1, threshold):
    """
    Creates indicators based on claimed amount
    
    ser1: Series of values
    threshold: threshold value
    """
    s = pd.Series(np.where(ser1 > threshold, 1, 0))
    s.index = ser1.index
    
    return s


def transforms_days_between(ser1, ser2):
    """
    Difference between Dates in Days
    
    ser1: 'From' Date Series
    ser2: 'To' Date Series   
    """
    s1 = pd.to_datetime(ser1)
    s2 = pd.to_datetime(ser2)
    ser1 = (s2 - s1).dt.days
    
    # Clip negative gaps to 0 so that To Date >= From Date always holds
    ser1 = ser1.clip(0)
    return ser1
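
Both transforms above rely on the default `transform_type='series'`. With `transform_type='dataframe'`, `get_args` instead passes the selected columns as a single DataFrame under the keyword `df`. A minimal sketch of such a transform follows; the function name and the column names are made up for illustration and are not part of the sample dataset.

```python
import pandas as pd

def transforms_row_total(df):
    """
    Row-wise total across all selected columns (hypothetical transform)

    df: DataFrame holding the columns named in on_cols
    """
    return df.sum(axis=1)

# Made-up amounts, only to show the call shape used for 'dataframe' transforms
toy = pd.DataFrame({'Medical_Amount': [100.0, 250.0],
                    'Baggage_Amount': [40.0, 0.0]})
total = transforms_row_total(df=toy)
```

Such a function would be registered by passing `transform_type='dataframe'` to `add_transformation`.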

Initialize the Feature Pipeline

ff = FeaturePipeline()

Create features using transforms

Let’s create some features which could be relevant to the fraudulent claims detection use case!

We will now use the transforms functions defined above to create individual features. To add a transformation to the feature pipeline we need to specify the columns the transformation will act on, the transformation function, any additional arguments the function needs, and the name of the resulting feature.

A. High claimed amount indicator

ff.add_transformation(on_cols='Claimed_Amount',
                      transformation_f=transforms_above_threshold,
                      transformation_args={'threshold': 500},
                      name='HighClaimedAmount')

B. Travel Length

ff.add_transformation(on_cols=('DepartureDate', 'ReturnDate'), 
                      transformation_f=transforms_days_between,
                      transformation_args=None,
                      name='TravelLength')

C. Report lag

ff.add_transformation(on_cols=('LossDate', 'ReportDate'), 
                      transformation_f=transforms_days_between,
                      transformation_args=None,
                      name='ReportLag')

D. Hospital stay length

ff.add_transformation(on_cols=('Hospital_Start_Date', 'Hospital_End_Date'), 
                      transformation_f=transforms_days_between,
                      transformation_args=None,
                      name='HospitalStayLength')

E. Loss duration since policy inception

ff.add_transformation(on_cols=('POL_Eff_Date', 'LossDate'), 
                      transformation_f=transforms_days_between,
                      transformation_args=None,
                      name='LossDurationSincePolicyEffective')

Apply the transformations

features = ff.apply_transformations(df=raw_data, index_cols=['ClaimNumber'])

features.describe()

       HighClaimedAmount  TravelLength    ReportLag  HospitalStayLength  LossDurationSincePolicyEffective
count         5000.00000    982.000000  5000.000000          131.000000                       5000.000000
mean             0.07860     10.178208    43.579600            4.022901                       1177.416000
std              0.26914     28.931812    85.770203            7.125756                       1570.842842
min              0.00000      0.000000     0.000000            0.000000                          0.000000
25%              0.00000      1.000000    10.000000            1.000000                         91.000000
50%              0.00000      4.000000    22.000000            2.000000                        454.500000
75%              0.00000      9.000000    51.000000            4.000000                       1770.250000
max              1.00000    364.000000  2224.000000           66.000000                      10833.000000

features.head()

         HighClaimedAmount  TravelLength  ReportLag  HospitalStayLength  LossDurationSincePolicyEffective
Claim_0                  0           NaN          4                 NaN                                 4
Claim_1                  0           2.0         11                 NaN                                 9
Claim_2                  0           NaN         11                 NaN                              4904
Claim_3                  0           NaN         24                 NaN                               199
Claim_4                  0           NaN         16                 NaN                              1340

With that, we have the features table!

Summary

  1. The Feature Pipeline provides a structured way of handling the feature creation stage, which is common to most data science projects.
  2. It encapsulates the underlying computation logic. Since no column names are hard-coded inside the transforms, the code becomes more reusable.
  3. Computation logic defined once can be reused multiple times for similar features. Observe that for duration-based features such as Report Lag, Travel Length and Hospital Stay Length we used the same transforms function.
  4. A separate file of transforms functions can be maintained and shared across projects. This speeds up feature creation in the next project, since pre-defined transforms can be reused.
  5. The proposed framework helps in monitoring individual transforms, in experimenting by adding or removing transformations from the pipeline, and in improving the overall readability of the code.
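
The last two points suggest keeping transforms in a shared module and varying only the per-project wiring. One way to sketch that idea, as an assumption rather than part of the framework above, is a per-project specification list that is looped over. `FEATURE_SPEC`, `build_features` and the toy data are hypothetical; the two transforms are compact restatements of the ones defined earlier.

```python
import numpy as np
import pandas as pd

# --- would live in a shared transforms file reused across projects ---
def transforms_above_threshold(ser1, threshold):
    return pd.Series(np.where(ser1 > threshold, 1, 0), index=ser1.index)

def transforms_days_between(ser1, ser2):
    return (pd.to_datetime(ser2) - pd.to_datetime(ser1)).dt.days.clip(0)

# --- per-project wiring: only column names and arguments change ---
FEATURE_SPEC = [
    {'name': 'HighClaimedAmount', 'on_cols': ['Claimed_Amount'],
     'f': transforms_above_threshold, 'args': {'threshold': 500}},
    {'name': 'ReportLag', 'on_cols': ['LossDate', 'ReportDate'],
     'f': transforms_days_between, 'args': {}},
]

def build_features(df, spec):
    """Apply each spec entry and collect the results as named columns."""
    features = {}
    for entry in spec:
        series = [df[c] for c in entry['on_cols']]
        features[entry['name']] = entry['f'](*series, **entry['args'])
    return pd.DataFrame(features, index=df.index)

toy = pd.DataFrame({'Claimed_Amount': [120, 900],
                    'LossDate': ['2021-01-01', '2021-02-10'],
                    'ReportDate': ['2021-01-05', '2021-02-01']})
features = build_features(toy, FEATURE_SPEC)
```

Adding or removing a feature then means editing one entry in the list. With the FeaturePipeline class, the loop body would instead call `add_transformation` with the same four fields.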

Hope you find the article useful.


The media shown in this article are not owned by Analytics Vidhya and are used at the Author’s discretion.
