""" Utils functions for preprocessing"""

import pickle

import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import MinMaxScaler, OneHotEncoder


def aggregate_transform_df(original_df, transformed_df, transformed_cols):
    """
    Merge the transformed columns back into the original dataframe,
    dropping the raw versions of those columns.
    """
    print(original_df.shape)
    print(transformed_df.shape)
    # Drop the raw columns, then join the transformed ones on the index
    df_final = original_df.drop(columns=transformed_cols)
    df_final = df_final.merge(transformed_df, left_index=True, right_index=True)
    print(df_final.shape)
    return df_final


def encode_categorical(df, categorical_cols, method="OneHot", encoder=None, fit=True):
    """
    Return the dataframe with the categorical columns replaced by their
    encoded version. Only one-hot encoding is supported for now.
    When ``fit`` is True, a new encoder is fitted and returned alongside
    the dataframe; otherwise the provided ``encoder`` is reused.
    """
    print(f"Running {method} encoding")
    if fit:
        encoder = OneHotEncoder()
        encoder.fit(df[categorical_cols])
    array_transformed = encoder.transform(df[categorical_cols]).toarray()
    df_encoded = pd.DataFrame(
        array_transformed, columns=encoder.get_feature_names_out(), index=df.index
    )
    df_final = aggregate_transform_df(df, df_encoded, categorical_cols)
    if fit:
        return df_final, encoder
    return df_final
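
# A minimal usage sketch of the fit/transform split (dataframe and column
# names here are illustrative, not from this project's data):
#     df_train_enc, encoder = encode_categorical(df_train, ["material"], fit=True)
#     df_test_enc = encode_categorical(df_test, ["material"], encoder=encoder, fit=False)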


def scale_numerical(df, numerical_cols, method="MinMax", scaler=None, fit=True):
    """
    Return the dataframe with the numerical columns scaled according to the
    selected method. Only MinMax scaling is supported for now.
    When ``fit`` is True, a new scaler is fitted and returned alongside the
    dataframe; otherwise the provided ``scaler`` is reused.
    """
    print(f"Running {method} scaling")
    if fit:
        scaler = MinMaxScaler()
        scaler.fit(df[numerical_cols])
    array_transformed = scaler.transform(df[numerical_cols])
    df_transformed = pd.DataFrame(array_transformed, columns=numerical_cols, index=df.index)
    df_final = aggregate_transform_df(df, df_transformed, numerical_cols)
    if fit:
        return df_final, scaler
    return df_final
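
# scale_numerical follows the same fit/transform pattern, e.g. (illustrative):
#     df_train_scaled, scaler = scale_numerical(df_train, numerical_cols, fit=True)
#     df_test_scaled = scale_numerical(df_test, numerical_cols, scaler=scaler, fit=False)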


def scale_numerical_w_missing(df, numerical_cols, scaler):
    """
    Scale a dataframe that is missing some of the columns the scaler was
    fitted on: the missing columns are zero-filled so the scaler can run,
    then dropped from the result.
    """
    additional_cols = [c for c in numerical_cols if c not in df.columns]
    df_w_cols = df.copy()
    df_w_cols[additional_cols] = 0
    df_w_cols_scaled = scale_numerical(df_w_cols, numerical_cols, scaler=scaler, fit=False)
    df_scaled = df_w_cols_scaled.drop(columns=additional_cols)
    return df_scaled
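
# Sketch (column names borrowed from the training averages used further
# below): if the scaler was fitted on ["%C", "%Co", "%W"] but a frame only
# carries ["%C", "%W"], the missing "%Co" is zero-filled for the transform
# and dropped again afterwards:
#     df_scaled = scale_numerical_w_missing(df, ["%C", "%Co", "%W"], scaler)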


def fill_nans(df, cols, method="mean"):
    """
    Fill the NaN values of the given columns with the column mean or mode.
    """
    df_filled = df.copy()
    print(f"Fill nans in {cols} with the {method} method")
    for col in cols:
        if method == "mean":
            df_filled[col] = df_filled[col].fillna(df[col].mean())
        elif method == "mode":
            # mode() returns a Series; take the first mode as a scalar so
            # that fillna does not align on the index
            df_filled[col] = df_filled[col].fillna(df[col].mode().iloc[0])
    return df_filled
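
# e.g. fill_nans(df, ["Temperature_C"], method="mode") imputes the most
# frequent value, while "mean" imputes the column average (illustrative column).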


def encode_and_predict(
    model_path,
    data,
    one_hot_scaler,
    minmax_scaler_inputs,
    minmax_scaler_targets,
    categorical_columns,
    numerical_columns,
    target_columns,
    explainer=None,
):
    """
    Load the Keras model from ``model_path``, apply the fitted one-hot
    encoder and input scaler to ``data``, and return the predictions.
    If an ``explainer`` is given, the SHAP values of the last 10 rows are
    returned as well. ``minmax_scaler_targets`` and ``target_columns`` are
    kept in the signature for API symmetry but are not used here.
    """
    model = tf.keras.models.load_model(model_path)
    data = encode_categorical(data, categorical_columns, encoder=one_hot_scaler, fit=False)
    data = scale_numerical(data, numerical_columns, scaler=minmax_scaler_inputs, fit=False)
    if explainer:
        return model.predict(data), data.columns, explainer.shap_values(data[-10:])
    return model.predict(data)
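
# Hypothetical end-to-end call, assuming the fitted encoder and scalers were
# pickled at training time (paths and names are placeholders):
#     encoder = unpickle_file("artifacts/encoder.pkl")
#     scaler_in = unpickle_file("artifacts/scaler_inputs.pkl")
#     preds = encode_and_predict(
#         "model.h5", df, encoder, scaler_in, None,
#         categorical_cols, numerical_cols, target_cols,
#     )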


class EnsembleModel:
    """
    Stores a list of models and runs predictions as the mean of their
    individual predictions.
    """

    def __init__(self, models_list, history_list, loss_threshold=0, scaler_targets=None) -> None:
        """
        Initialize the ensemble and discard models whose loss got stuck
        during training, or that did not reach a sufficient performance
        (if the loss_threshold parameter is set).
        By assumption, every entry of models_list has a predict method.
        """
        self.models = []
        self.models_history = []
        self.loss_threshold = loss_threshold
        for i, model in enumerate(models_list):
            model_history = history_list[i]
            # A near-constant loss curve means the optimizer never moved
            if np.abs(min(model_history.history["loss"]) - max(model_history.history["loss"])) < 0.001:
                print(f"Model {i} skipped due to loss getting stuck")
                continue
            if (self.loss_threshold > 0) and (model_history.history["loss"][-1] > self.loss_threshold):
                print(f"Model {i} skipped due to performance")
                continue
            self.models.append(model)
            self.models_history.append(model_history)
        self.scaler_targets = scaler_targets
        print(f"Ensemble model initialized with {len(self.models)} models")

    def predict_list(self, data):
        """
        Return the list of per-model predictions, inverse-transformed back
        to the target scale when a target scaler is available.
        """
        pred_list = [model.predict(data) for model in self.models]
        if self.scaler_targets is not None:
            pred_list = [self.scaler_targets.inverse_transform(pred) for pred in pred_list]
        return pred_list

    def predict_w_uncertainty(self, data, uncertainty_type="confidence_interval", model_bias=0.03):
        """
        Return the prediction and an uncertainty estimate on the data.
        """
        # The prediction is the average of all model predictions and the
        # uncertainty is derived from their standard deviation
        # LB: not sure this works if multiple targets are predicted with the same model
        n_models = len(self.models)
        pred_mean, pred_list = self.predict(data, return_list=True)
        pred_std = np.std(pred_list, axis=0)
        # Average (scaled) feature values seen during training, used to weight
        # the uncertainty by the distance from the training distribution
        training_average_dict = {
            "%C": 0.587936,
            "%Co": 0.306122,
            "%Cr": 0,
            "%V": 0,
            "%Mo": 0,
            "%W": 0.363942,
            "Temperature_C": 0.387755,
        }
        eps = 0.1
        if uncertainty_type == "confidence_interval":
            print("Confidence interval")
            # Confidence interval = mean +- z * std / sqrt(n)
            z = 1.96  # 95%: 1.96, 90%: 1.645
            model_bias_vector = np.ones(pred_mean.shape) * model_bias * pred_mean
            pred_uncertainty = z * (pred_std + model_bias_vector) / np.sqrt(n_models)
        elif uncertainty_type == "std":
            print("Standard deviation")
            pred_uncertainty = pred_std.copy()
        else:
            print("Weighted uncertainty")
            pred_uncertainty = pred_std.copy()
            uncertainty_weights = np.ones((pred_std.shape[0],))
            dist_df = pd.DataFrame()
            for col in training_average_dict.keys():
                print(training_average_dict[col])
                # Squared distance of each sample from the training average
                dist_vector = (data[col] - training_average_dict[col]) ** 2
                # dist_vector = np.abs(data[col] - training_average_dict[col])
                # Quick fix for the constant elements that are not properly scaled
                if col in ["%Cr", "%V", "%Mo"]:
                    dist_vector = dist_vector / 10
                dist_df[col] = dist_vector
                print(dist_vector)
            # Euclidean distance across features, floored by eps; note that
            # only the first target column is weighted here
            uncertainty_weights = np.sqrt(dist_df.sum(axis=1)) + eps
            pred_uncertainty = np.multiply(uncertainty_weights, pred_uncertainty[:, 0])
        return pred_mean, pred_uncertainty

    def predict(self, data, return_list=False):
        """
        Return the ensemble prediction (mean across models) on the data,
        optionally together with the per-model predictions.
        """
        pred_list = self.predict_list(data)
        preds = np.mean(pred_list, axis=0)
        if return_list:
            return preds, pred_list
        return preds
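
# Usage sketch (hypothetical names): models and histories come from repeated
# keras fit() runs:
#     ensemble = EnsembleModel(models, histories, loss_threshold=0.05,
#                              scaler_targets=minmax_scaler_targets)
#     pred_mean, pred_ci = ensemble.predict_w_uncertainty(x_scaled)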


def unpickle_file(path):
    """Load and return a pickled object from ``path``."""
    with open(path, "rb") as file:
        return pickle.load(file)


def read_data(data_path, sep=","):
    """
    Load a data file, dispatching on its extension: csv and xls/xlsx are
    read with pandas, anything else is assumed to be a pickle.
    """
    file_extension = data_path.split(".")[-1]
    if file_extension == "csv":
        return pd.read_csv(data_path, sep=sep)
    elif file_extension in ["xls", "xlsx"]:
        return pd.read_excel(data_path)
    else:
        return unpickle_file(data_path)


class NoPhysicsModels:
    """
    Wrapper that recomputes the physics-informed features internally, so
    that a SHAP explainer only sees the raw input features.
    """

    def __init__(self, model, scaler_inputs=None, preprocessing_physics_fn=None):
        self.model = model
        self.scaler_inputs = scaler_inputs
        self.physics_fn = preprocessing_physics_fn

    def predict(self, x):
        # Recompute the physics-informed features from the raw inputs
        x_w_p = self.physics_fn(x)
        # Reorder the columns to match the scaler's training-time layout
        x_w_p_for_scaling = x_w_p[self.scaler_inputs.feature_names_in_]
        x_w_p_scaled = scale_numerical(
            x_w_p_for_scaling, self.scaler_inputs.feature_names_in_, scaler=self.scaler_inputs, fit=False
        )
        return self.model.predict(x_w_p_scaled)
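
# Sketch of the intended SHAP flow (assuming the `shap` package and a
# project-specific `compute_physics_features` function, both hypothetical here):
#     wrapper = NoPhysicsModels(model, scaler_inputs=scaler_in,
#                               preprocessing_physics_fn=compute_physics_features)
#     explainer = shap.KernelExplainer(wrapper.predict, x_background)
#     shap_values = explainer.shap_values(x_sample)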