""" Utils functions for preprocessing""" | |
import pandas as pd | |
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler | |
import pickle | |
import tensorflow as tf | |
import numpy as np | |
def aggregate_transform_df(original_df, transformed_df, transformed_cols):
    """
    Helper function to merge the transformed columns back into the original dataframe
    """
    print(original_df.shape)
    print(transformed_df.shape)
    df_final = original_df.drop(columns=transformed_cols)
    df_final = df_final.merge(transformed_df, left_index=True, right_index=True)
    print(df_final.shape)
    return df_final

def encode_categorical(df, categorical_cols, method="OneHot", encoder=None, fit=True):
    """
    Returns the dataframe where the categorical columns have been replaced
    according to the method selected
    Right now only OneHot is supported
    """
    print(f"Running {method} encoding")
    if fit:
        encoder = OneHotEncoder()
        encoder.fit(df[categorical_cols])
    array_transformed = encoder.transform(df[categorical_cols]).toarray()
    df_encoded = pd.DataFrame(array_transformed, columns=encoder.get_feature_names_out(), index=df.index)
    df_final = aggregate_transform_df(df, df_encoded, categorical_cols)
    if fit:
        return df_final, encoder
    else:
        return df_final

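# A minimal usage sketch for encode_categorical. The helper name, toy dataframe and
# column names below are illustrative assumptions, not part of the original application code.
def _example_encode_categorical():
    df = pd.DataFrame({"material": ["steel", "alloy", "steel"], "hardness": [1.0, 2.0, 3.0]})
    # First call fits a OneHotEncoder and returns it together with the encoded dataframe
    df_train, encoder = encode_categorical(df, ["material"], fit=True)
    # Later calls (e.g. at inference time) reuse the fitted encoder without refitting
    df_infer = encode_categorical(df, ["material"], encoder=encoder, fit=False)
    return df_train, df_infer
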
def scale_numerical(df, numerical_cols, method="MinMax", scaler=None, fit=True):
    """
    Returns the dataframe where the numerical columns have been scaled
    according to the method selected
    Right now only MinMax is supported
    """
    print(f"Running {method} scaling")
    if fit:
        scaler = MinMaxScaler()
        scaler.fit(df[numerical_cols])
    array_transformed = scaler.transform(df[numerical_cols])
    df_transformed = pd.DataFrame(array_transformed, columns=numerical_cols, index=df.index)
    df_final = aggregate_transform_df(df, df_transformed, numerical_cols)
    if fit:
        return df_final, scaler
    else:
        return df_final

def scale_numerical_w_missing(df, numerical_cols, scaler):
    """
    Scale the dataframe when some of the columns used to fit the scaler are missing from it:
    the missing columns are temporarily added as zeros, scaled, and then dropped
    """
    additional_cols = [c for c in numerical_cols if c not in df.columns]
    df_w_cols = df.copy()
    df_w_cols[additional_cols] = 0
    df_w_cols_scaled = scale_numerical(df_w_cols, numerical_cols, scaler=scaler, fit=False)
    df_scaled = df_w_cols_scaled.drop(columns=additional_cols)
    return df_scaled

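# A minimal sketch showing scale_numerical and scale_numerical_w_missing together.
# The helper name and the column values are illustrative assumptions, not from the original code.
def _example_scale_with_missing():
    numerical_cols = ["Temperature_C", "%C", "%Co"]
    df_train = pd.DataFrame({"Temperature_C": [20.0, 800.0], "%C": [0.1, 0.9], "%Co": [0.0, 0.5]})
    # Fit the MinMax scaler on the full set of numerical columns
    _, scaler = scale_numerical(df_train, numerical_cols, fit=True)
    # At inference time "%Co" is absent: it is temporarily added as zeros, scaled, then dropped
    df_infer = pd.DataFrame({"Temperature_C": [400.0], "%C": [0.5]})
    return scale_numerical_w_missing(df_infer, numerical_cols, scaler)
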
def fill_nans(df, cols, method="mean"):
    """
    Fill the NaNs in the given columns with the column mean or mode
    """
    df_filled = df.copy()
    print(f"Fill nans in {cols} with the {method} method")
    for col in cols:
        if method == "mean":
            df_filled[col] = df_filled[col].fillna(df[col].mean())
        elif method == "mode":
            # mode() returns a Series; take its first value so we fill with a scalar
            df_filled[col] = df_filled[col].fillna(df[col].mode().iloc[0])
    return df_filled

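# A minimal usage sketch for fill_nans (helper name and column names are illustrative only).
def _example_fill_nans():
    df = pd.DataFrame({"%C": [0.1, np.nan, 0.9], "%Co": [0.3, 0.3, np.nan]})
    df_filled = fill_nans(df, ["%C"], method="mean")
    df_filled = fill_nans(df_filled, ["%Co"], method="mode")
    return df_filled
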
def encode_and_predict(
    model_path,
    data,
    one_hot_scaler,
    minmax_scaler_inputs,
    minmax_scaler_targets,
    categorical_columns,
    numerical_columns,
    target_columns,
    explainer=None,
):
    """
    Loads the model, applies the fitted encoder and input scaler to the data and runs the prediction
    (optionally returning shap values for the last 10 rows when an explainer is given)
    """
    model = tf.keras.models.load_model(model_path)
    data = encode_categorical(data, categorical_columns, encoder=one_hot_scaler, fit=False)
    data = scale_numerical(data, numerical_columns, scaler=minmax_scaler_inputs, fit=False)
    if explainer:
        return model.predict(data), data.columns, explainer.shap_values(data[-10:])
    else:
        return model.predict(data)

class EnsembleModel:
    """
    Class to store a list of models and to run predictions as the mean of those models
    """

    def __init__(self, models_list, history_list, loss_threshold=0, scaler_targets=None) -> None:
        """
        Initializes the Ensemble model and discards the models whose loss got stuck,
        or that did not reach a sufficient performance (if the loss_threshold parameter is set)
        The elements of models_list are assumed to be AI models exposing a predict method
        """
        self.models = []
        self.models_history = []
        self.loss_threshold = loss_threshold
        for i, model in enumerate(models_list):
            model_history = history_list[i]
            if np.abs(min(model_history.history["loss"]) - max(model_history.history["loss"])) < 0.001:
                print(f"Model {i} skipped due to loss getting stuck")
                continue
            if (self.loss_threshold > 0) and (model_history.history["loss"][-1] > self.loss_threshold):
                print(f"Model {i} skipped due to performance")
                continue
            self.models.append(model)
            self.models_history.append(model_history)
        self.scaler_targets = scaler_targets
        print(f"Ensemble model initialized with {len(self.models)} models")
    def predict_list(self, data):
        """
        Returns the list of predictions of each model in the ensemble
        """
        pred_list = [model.predict(data) for model in self.models]
        if self.scaler_targets is not None:
            pred_list = [self.scaler_targets.inverse_transform(pred) for pred in pred_list]
        return pred_list
    def predict_w_uncertainty(self, data, uncertainty_type="confidence_interval", model_bias=0.03):
        """
        Returns the prediction and the confidence interval on the data
        """
        # The prediction is the average of all predictions and the uncertainty is the standard deviation of all predictions
        # LB: not sure this works if multiple targets are predicted with the same model
        n_models = len(self.models)
        pred_mean, pred_list = self.predict(data, return_list=True)
        pred_std = np.std(pred_list, axis=0)
        training_average_dict = {
            "%C": 0.587936,
            "%Co": 0.306122,
            "%Cr": 0,
            "%V": 0,
            "%Mo": 0,
            "%W": 0.363942,
            "Temperature_C": 0.387755,
        }
        eps = 0.1
        if uncertainty_type == "confidence_interval":
            print("Confidence interval")
            # Confidence interval = mean +- z * std / sqrt(n)
            z = 1.96  # 95%: 1.96, 90%: 1.645
            model_bias_vector = np.ones(pred_mean.shape) * model_bias * pred_mean
            pred_uncertainty = z * (pred_std + model_bias_vector) / np.sqrt(n_models)
        elif uncertainty_type == "std":
            print("Standard deviation")
            pred_uncertainty = pred_std.copy()
        else:
            print("Weighted uncertainty")
            pred_uncertainty = pred_std.copy()
            uncertainty_weights = np.ones((pred_std.shape[0],))
            dist_df = pd.DataFrame()
            for col in training_average_dict.keys():
                print(training_average_dict[col])
                dist_vector = (data[col] - training_average_dict[col]) ** 2
                # dist_vector = np.abs(data[col] - training_average_dict[col])
                # Quick fix for the constant elements that are not properly scaled
                if col in ["%Cr", "%V", "%Mo"]:
                    dist_vector = dist_vector / 10
                dist_df[col] = dist_vector
                print(dist_vector)
            uncertainty_weights = np.sqrt(dist_df.sum(axis=1)) + eps
            pred_uncertainty = np.multiply(uncertainty_weights, pred_uncertainty[:, 0])
        return pred_mean, pred_uncertainty
    def predict(self, data, return_list=False):
        """
        Returns only the prediction of the Ensemble models on the data
        """
        pred_list = self.predict_list(data)
        preds = np.mean(pred_list, axis=0)
        if return_list:
            return preds, pred_list
        return preds

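# A minimal sketch of how EnsembleModel could be assembled and queried. The helper name,
# DummyModel and DummyHistory are illustrative assumptions that only mimic the interfaces
# the class relies on: a predict method, and a Keras-style History object with a "loss" curve.
def _example_ensemble_model():
    class DummyModel:
        def __init__(self, offset):
            self.offset = offset

        def predict(self, data):
            # Shape (n_samples, 1), shifted so the ensemble members disagree slightly
            return np.full((len(data), 1), 1.0 + self.offset)

    class DummyHistory:
        def __init__(self, losses):
            self.history = {"loss": losses}

    models = [DummyModel(0.0), DummyModel(0.1), DummyModel(-0.1)]
    histories = [DummyHistory([1.0, 0.5, 0.2]) for _ in models]
    ensemble = EnsembleModel(models, histories)
    data = pd.DataFrame({"Temperature_C": [0.2, 0.4]})
    # Mean prediction and 95% confidence interval (mean +- z * std / sqrt(n))
    return ensemble.predict_w_uncertainty(data, uncertainty_type="confidence_interval")
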
def unpickle_file(path):
    with open(path, "rb") as file:
        unpickler = pickle.Unpickler(file)
        unpickled_file = unpickler.load()
    return unpickled_file

def read_data(data_path, sep=","):
    """
    Opens the file based on the extension
    """
    file_extension = data_path.split(".")[-1]
    if file_extension == "csv":
        return pd.read_csv(data_path, sep=sep)
    elif file_extension in ["xls", "xlsx"]:
        return pd.read_excel(data_path)
    else:
        return unpickle_file(data_path)

class NoPhysicsModels:
    """
    Wrapper class that hides the physics-informed features so that the shap interpreter can be run on the model
    """

    def __init__(self, model, scaler_inputs=None, preprocessing_physics_fn=None):
        self.model = model
        self.scaler_inputs = scaler_inputs
        self.physics_fn = preprocessing_physics_fn

    def predict(self, x):
        # Add the physics-informed features, scale the inputs with the fitted scaler, then predict
        x_w_p = self.physics_fn(x)
        x_w_p_for_scaling = x_w_p[self.scaler_inputs.feature_names_in_]
        x_w_p_scaled = scale_numerical(
            x_w_p_for_scaling, self.scaler_inputs.feature_names_in_, scaler=self.scaler_inputs, fit=False
        )
        return self.model.predict(x_w_p_scaled)

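# A minimal sketch of how NoPhysicsModels could be used. The helper name, the dummy physics
# function and the dummy model below are illustrative assumptions, not from the original code:
# physics_fn takes a dataframe and returns it with the physics-informed columns appended,
# and the wrapped model only needs to expose a predict method on the scaled inputs.
def _example_no_physics_models():
    rng = np.random.default_rng(0)
    df = pd.DataFrame({"%C": rng.random(5), "Temperature_C": rng.random(5)})

    def add_physics_features(x):
        x = x.copy()
        x["carbon_times_temp"] = x["%C"] * x["Temperature_C"]
        return x

    class MeanModel:
        def predict(self, x):
            return x.to_numpy().mean(axis=1, keepdims=True)

    # Fit the input scaler on the data including the physics-informed column
    df_w_physics = add_physics_features(df)
    _, scaler = scale_numerical(df_w_physics, list(df_w_physics.columns), fit=True)
    wrapped = NoPhysicsModels(MeanModel(), scaler_inputs=scaler, preprocessing_physics_fn=add_physics_features)
    # The wrapper can now be called on the raw inputs, without the physics-informed column
    return wrapped.predict(df)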