*This article was published as a part of the Data Science Blogathon*

Machine learning creates static models from historical data, but once deployed in production, those models can become unreliable, obsolete, and degrade over time. The data distribution in production may change, leading to biased predictions; user behavior may shift compared to the baseline data the model was trained on; or additional real-world factors may influence the predictions. Data drift is a major reason model accuracy decreases over time.

Thus, monitoring the changes continuously in our model’s behavior is of utmost importance. Flagging such drifts and automating certain jobs for retraining the model with new data or manual intervention of any kind ensures that the model remains relevant in production and gives fair and unbiased predictions over time.

**1) Concept Drift**

Concept drift means that the statistical properties of the target variable, which the model is trying to predict, change over time. This causes problems because the predictions become less accurate and less reliable.

**Example:**

The abrupt changes in consumer behavior brought on by COVID-19 had a major impact on the accuracy of forecasting models that rely on historical data to inform their predictions. This can be treated as an example of concept drift.

One of the main reasons concept drift occurs is the non-stationarity of data, i.e., a change in the statistical properties of the data over time.
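As a minimal synthetic illustration (all data below is simulated, not taken from any real dataset), a classifier trained on one concept loses accuracy when the relationship between the input and the target flips:

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)

# Training period: the concept is "y = 1 when x > 0"
X_train = rng.normal(size=(1000, 1))
y_train = (X_train[:, 0] > 0).astype(int)

model = LogisticRegression().fit(X_train, y_train)

# Production period: the concept has drifted to "y = 1 when x < 0"
X_prod = rng.normal(size=(1000, 1))
y_prod = (X_prod[:, 0] < 0).astype(int)

acc_before = model.score(X_train, y_train)
acc_after = model.score(X_prod, y_prod)
print(f"Accuracy on original concept: {acc_before:.2f}")
print(f"Accuracy after concept drift: {acc_after:.2f}")
```

The input distribution is unchanged here; only P(y|x) moved, which is exactly what distinguishes concept drift from covariate drift.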

Image source: https://arxiv.org/pdf/2004.05785.pdf

**2) Covariate Drift**

Covariate shift is a change in the distribution of one or more of the independent (input) variables of the dataset. Even though the relationship between each feature and the target variable remains unchanged, the distribution of the feature itself has changed. When the statistical properties of the input data change, a model built on the earlier distribution no longer provides unbiased results, leading to inaccurate predictions.

**Example**: Suppose a model is trained on a salary variable ranging from $200 to $300 and is deployed to production. Over time, salaries increase, and the model encounters real-time data with higher figures such as $1000 or $1200. The model then sees an increase in the mean and variance of the feature, which constitutes data drift.
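This hypothetical salary example can be sketched in a few lines (the numbers are made up to mirror the ranges above):

```python
import numpy as np

rng = np.random.default_rng(42)

# Hypothetical salary data: training-time range vs. production range
salary_train = rng.uniform(200, 300, size=1000)   # what the model saw
salary_prod = rng.uniform(1000, 1200, size=1000)  # what arrives later

# Covariate shift shows up directly in the summary statistics
print(f"train mean={salary_train.mean():.0f}, std={salary_train.std():.1f}")
print(f"prod  mean={salary_prod.mean():.0f}, std={salary_prod.std():.1f}")
```

Tracking such summary statistics per feature is the simplest form of drift monitoring; the statistical tests below make the comparison rigorous.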

Image source: https://arxiv.org/pdf/2004.05785.pdf

**Stage 1 (Data Retrieval)** is used to retrieve data from data streams in chunks since a single data point cannot carry enough information to infer the overall distribution.

**Stage 2 (Data Modeling)** is used to extract the key features, that is, the features of the data that most impact a system if they drift.

**Stage 3 (Test Statistics Calculation)** is to measure the drift and calculate test statistics for the hypothesis test.

**Stage 4 (Hypothesis Test)** is used to evaluate the statistical significance of the change measured in Stage 3, usually expressed as a p-value.
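The four stages can be sketched as a minimal pipeline on simulated data (the chunk sizes and the choice of the two-sample K-S test here are illustrative assumptions):

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(0)

def detect_drift(reference, window, alpha=0.05):
    """Stages 3-4: compute a test statistic and run a hypothesis test
    (two-sample Kolmogorov-Smirnov)."""
    statistic, p_value = stats.ks_2samp(reference, window)
    return p_value < alpha, p_value

# Stage 1: retrieve data in chunks from a (simulated) stream
reference = rng.normal(0, 1, size=500)      # baseline chunk
drifted_chunk = rng.normal(2, 1, size=500)  # later chunk with a shifted mean

# Stage 2: here the "key feature" is simply the raw value itself
drift, p = detect_drift(reference, drifted_chunk)
print(f"drift={drift}, p-value={p:.4g}")
```

In a real system, Stage 2 would select the features whose drift matters most, and the test in Stages 3-4 would be chosen per feature type.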

All methods for detecting data drift are lagging indicators: the drift is detected only after the detector has processed enough post-drift data.

**1) Kolmogorov-Smirnov (K-S) test:**

The K-S test is a nonparametric test that compares the cumulative distributions of two data sets, in this case the training data and the post-training data. The null hypothesis for this test states that the distributions of both datasets are the same. If the null hypothesis is rejected, we can conclude that there is a drift in the data.
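As a quick synthetic illustration (not the article's dataset), the test typically fails to reject when the two samples come from the same distribution and rejects decisively when the mean has shifted:

```python
import numpy as np
from scipy import stats

rng = np.random.default_rng(1)

baseline = rng.normal(0, 1, size=1000)
same = rng.normal(0, 1, size=1000)       # drawn from the same distribution
shifted = rng.normal(0.5, 1, size=1000)  # the mean has drifted

_, p_same = stats.ks_2samp(baseline, same)
_, p_shifted = stats.ks_2samp(baseline, shifted)
print(f"p (same distribution): {p_same:.3f}")
print(f"p (shifted mean):      {p_shifted:.3g}")  # tiny -> reject null, drift
```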

In our analysis, we have only considered numerical columns for the test.

For generating our final Data Drift analysis, the chi-squared test can be applied for the categorical features to identify data drift.
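For a categorical feature, a chi-squared test on the category counts of the two periods could look like this (the feature name and the counts below are made up for illustration):

```python
import numpy as np
from scipy.stats import chi2_contingency

# Hypothetical counts of a categorical feature (e.g. Geography) in two periods
train_counts = np.array([500, 300, 200])  # France, Germany, Spain
prod_counts = np.array([300, 250, 450])   # the distribution has shifted

# Contingency table: rows = period, columns = category
table = np.vstack([train_counts, prod_counts])
chi2, p_value, dof, _ = chi2_contingency(table)
print(f"chi2={chi2:.1f}, dof={dof}, p-value={p_value:.3g}")

drift = p_value < 0.05  # reject null of identical category distributions
```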

Python implementation (K-S test):

```python
# df: the dataset loaded earlier as a pandas DataFrame
df.drop(['RowNumber', 'CustomerId', 'Surname'], axis=1, inplace=True)
df_numerical = df.iloc[:, [3, 4, 5, 9]]
df_numerical.head()

# Splitting the data to analyze the difference between the two subsets
df_salary_low = df_numerical[df_numerical['EstimatedSalary'] <= 10000]
df_salary_high = df_numerical[df_numerical['EstimatedSalary'] > 10000]

from scipy import stats

p_value = 0.05
rejected = 0
for col in df_numerical.columns:
    test = stats.ks_2samp(df_salary_low[col], df_salary_high[col])
    if test[1] < p_value:
        rejected += 1
        print("Column rejected", col)
print("We rejected", rejected, "columns in total")
```

Thus, the K-S test has rejected the Tenure and Estimated Salary columns which means that the statistical properties of these two columns for both the datasets are not identical.

**2) Population Stability Index:**

It compares the distribution of a variable in the current (test) dataset to its distribution in the training dataset that was used to develop the model.

Steps for calculation:

1) Divide the expected (training) dataset and the actual (test) dataset into buckets, and define the boundary values of the buckets based on the minimum and maximum values of that column in the training data.

2) Calculate the % of observations in each bucket for both expected and actual datasets.

3) Calculate the PSI using the formula

PSI = Σ (%actual − %expected) × ln(%actual / %expected)

where the sum runs over all buckets. Common rules of thumb for interpreting the result are:

**a) PSI <= 0.1**

This means there is no significant change or shift in the distributions of the two datasets.

**b) 0.1 < PSI <= 0.2**

This indicates a slight change or shift has occurred.

**c) PSI>0.2**

This indicates a large shift in the distribution has occurred between both datasets.
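As a quick worked example of the formula, with made-up bucket percentages for a single feature:

```python
import numpy as np

# Hypothetical bucket percentages (5 buckets, each column sums to 1)
expected_pct = np.array([0.10, 0.20, 0.40, 0.20, 0.10])  # training data
actual_pct = np.array([0.05, 0.10, 0.30, 0.30, 0.25])    # production data

# PSI = sum over buckets of (%actual - %expected) * ln(%actual / %expected)
psi = np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct))
print(f"PSI = {psi:.3f}")  # > 0.2, so a large shift has occurred
```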

```python
import numpy as np

def calculate_psi(expected, actual, buckettype='bins', buckets=10, axis=0):
    def psi(expected_array, actual_array, buckets):
        def scale_range(input, min, max):
            input += -(np.min(input))
            input /= np.max(input) / (max - min)
            input += min
            return input

        # Bucket boundaries span the range of the expected (baseline) data
        breakpoints = np.arange(0, buckets + 1) / buckets * 100
        breakpoints = scale_range(breakpoints, np.min(expected_array), np.max(expected_array))

        # % of observations per bucket in each dataset
        expected_percents = np.histogram(expected_array, breakpoints)[0] / len(expected_array)
        actual_percents = np.histogram(actual_array, breakpoints)[0] / len(actual_array)

        def sub_psi(e_perc, a_perc):
            # Avoid division by zero / log of zero for empty buckets
            if a_perc == 0:
                a_perc = 0.0001
            if e_perc == 0:
                e_perc = 0.0001
            return (e_perc - a_perc) * np.log(e_perc / a_perc)

        psi_value = sum(sub_psi(expected_percents[i], actual_percents[i])
                        for i in range(0, len(expected_percents)))
        return psi_value

    if len(expected.shape) == 1:
        psi_values = np.empty(len(expected.shape))
    else:
        psi_values = np.empty(expected.shape[axis])

    for i in range(0, len(psi_values)):
        psi_values = psi(expected, actual, buckets)

    return psi_values
```

```python
# Calculate PSI for each feature
psi_list = []
top_feature_list = df_salary_high.columns
for feature in top_feature_list:
    # Assuming you have a validation and training set
    psi_t = calculate_psi(df_salary_high[feature], df_salary_low[feature])
    psi_list.append(psi_t)
    print('Stability index for column', feature, 'is', psi_t)
```

**3) Model-Based Approach**

A Machine Learning-based model approach can also be used to detect data drift between two populations.

We label the data that was used to build the current production model as 0 and the incoming real-time data as 1. We then build a classifier to distinguish between the two and evaluate the results.

If the model gives high accuracy, it means that it can easily discriminate between the two sets of data. Thus, we could conclude that a covariate shift has occurred and the model will need to be recalibrated. On the other hand, if the model accuracy is around 0.5, it means that it is as good as a random guess. This means that a significant data shift has not occurred and we can continue to use the model.

The disadvantage of this approach is that the training and testing process needs to be repeated every time new input data becomes available, which can become computationally expensive.
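A minimal sketch of this "domain classifier" idea on synthetic reference and production data (the model choice and the size of the shift are assumptions):

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

rng = np.random.default_rng(0)

# Hypothetical reference (training-time) data vs. new production data
reference = rng.normal(0, 1, size=(500, 3))   # label 0
production = rng.normal(1, 1, size=(500, 3))  # label 1, shifted features

X = np.vstack([reference, production])
y = np.concatenate([np.zeros(500), np.ones(500)])

# If a classifier can tell the two datasets apart, the covariates drifted
clf = RandomForestClassifier(n_estimators=50, random_state=0)
acc = cross_val_score(clf, X, y, cv=5, scoring="accuracy").mean()
print(f"domain-classifier accuracy: {acc:.2f}")  # ~0.5 would mean no drift
```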

**4) Using specialized drift detection techniques such as** **Adaptive Windowing (ADWIN)**:

The Adaptive Windowing (ADWIN) algorithm uses a sliding-window approach to detect concept drift. Rather than using a fixed window size, ADWIN maintains a window that grows as new data arrives and compares sub-windows of it: whenever two sub-windows exhibit sufficiently distinct means, the older sub-window is dropped, shrinking the window to the recent data.

A user-defined threshold determines when drift is flagged: if the absolute difference between the means of the two sub-windows exceeds the pre-defined threshold, an alarm is generated. This method is applicable to univariate data.

```python
import numpy as np
from skmultiflow.drift_detection import ADWIN

adwin = ADWIN()
for col in df_numerical.columns:
    a = np.array(df_salary_low[col])
    b = np.array(df_salary_high[col])
    data_stream = np.concatenate((a, b))
    # Adding stream elements to ADWIN and verifying if drift occurred
    for i in range(len(data_stream)):
        adwin.add_element(data_stream[i])
        if adwin.detected_change():
            print('Change detected in data: ' + str(data_stream[i]) +
                  ' - at index: ' + str(i) + ' for column: ' + col)
```

**5) Page-Hinkley method**:

This drift detection method computes the running mean of the observed values, updating it as new data arrives. A drift is signalled when the cumulative deviation of the observations from that mean exceeds a user-defined threshold value lambda.

- min_instances (default: 30): the minimum number of instances before change can be detected.

- delta (default: 0.005): the delta factor for the Page-Hinkley test.

- threshold (default: 50): the change detection threshold (lambda).

- alpha (default: 0.9999): the forgetting factor, used to weigh the observed value against the mean.

```python
import numpy as np
from river.drift import PageHinkley

np.random.seed(12345)
ph = PageHinkley(threshold=10, min_instances=10)

# Update the drift detector and verify whether change is detected
# (in river < 0.15, update() returns the tuple (in_drift, in_warning))
for col in df_numerical.columns:
    a = np.array(df_salary_low[col])
    b = np.array(df_salary_high[col])
    data_stream = np.concatenate((a, b))
    for i, val in enumerate(data_stream):
        in_drift, in_warning = ph.update(val)
        if in_drift:
            print(f"Change detected at index {i} for column: {col} with input value: {val}")
```

In production, there are multiple ways to respond to data drift.

Some of the methods which are generally followed in the industry are:

**1) Blindly update model:**

This is a naïve approach. There is no proactive drift detection. Models are periodically retrained and updated with recent data. Without drift detection in place, it is difficult to estimate the time interval for re-training and model re-deployment.

**2) Training with weighted data:**

When a new model is trained, instead of discarding the old training data, weight each observation inversely proportional to its age so that recent data influences the model more than stale data.
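A minimal sketch of age-based sample weighting, assuming a hypothetical `age_days` value recording how old each observation is (all data here is simulated):

```python
import numpy as np
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(0)

# Hypothetical pooled training data with an age (in days) per sample
X = rng.normal(size=(1000, 4))
y = (X[:, 0] + rng.normal(scale=0.5, size=1000) > 0).astype(int)
age_days = rng.integers(1, 365, size=1000)

# Weight each sample inversely proportional to its age:
# recent observations influence the fit more than stale ones
weights = 1.0 / age_days

model = LogisticRegression()
model.fit(X, y, sample_weight=weights)
```

The inverse-age weighting is one simple choice; exponential decay in age is another common scheme.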

**3) Incremental learning:**

As new data arrives, the models are continuously retrained and updated. As a result, the model is always adapting to the changes in the data distribution. This approach will work with machine learning models which allow incremental learning one instance of data at a time.
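A minimal sketch of incremental learning using scikit-learn's `partial_fit` interface on a simulated stream (the batch sizes and the simple sign-based labels are assumptions):

```python
import numpy as np
from sklearn.linear_model import SGDClassifier

rng = np.random.default_rng(0)
model = SGDClassifier(random_state=0)
classes = np.array([0, 1])

# Simulated stream: batches arrive over time and the model updates in place
for batch in range(20):
    X_batch = rng.normal(size=(50, 3))
    y_batch = (X_batch[:, 0] > 0).astype(int)
    model.partial_fit(X_batch, y_batch, classes=classes)

# The model keeps adapting without a full retrain from scratch
X_new = rng.normal(size=(200, 3))
acc = model.score(X_new, (X_new[:, 0] > 0).astype(int))
print(f"accuracy after incremental updates: {acc:.2f}")
```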

There are various ways to detect and handle drift, and users can set up custom alerts and thresholds to trigger drift notifications. On cloud platforms such as Azure Machine Learning, dataset monitors can be used to detect and alert on data drift.

Thus, model monitoring and drift detection are an important part of the ML model lifecycle that needs to be optimized for successful and efficient deployment of models into production. Identifying drift in the data in real time, together with a proper strategy for handling it, is crucial for our models to keep producing good results over time.
