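# Spaces: Sleeping
# NOTE(review): the line above appears to be status text captured from the hosting page, not program code.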
import argparse | |
import argparse | |
# import shap | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import os | |
import tensorflow as tf | |
from utils import encode_categorical, scale_numerical, fill_nans, unpickle_file, EnsembleModel, read_data | |
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score | |
import pickle | |
def predict(model_path, data, explainer=None, scaler_targets=None):
    """Load a saved model and run it on ``data``.

    Args:
        model_path: Path to the model file. A path ending in ``.h5`` is loaded
            with ``tf.keras.models.load_model``; any other path is loaded with
            ``unpickle_file``.
        data: Model inputs, passed unchanged to ``model.predict``. When an
            explainer is given, ``data`` must also expose ``.columns`` and
            support the slice ``data[-10:]``.
        explainer: Optional object exposing ``shap_values``.
        scaler_targets: Optional scaler whose ``inverse_transform`` is applied
            to the predictions.

    Returns:
        The predictions alone when ``explainer`` is None; otherwise the tuple
        ``(pred, data.columns, explainer.shap_values(data[-10:]))``.
    """
    is_keras_model = model_path.split(".")[-1] == "h5"
    if is_keras_model:
        model = tf.keras.models.load_model(model_path)
    else:
        model = unpickle_file(model_path)

    pred = model.predict(data)
    if not is_keras_model and np.ndim(pred) != 2:
        # Pickled models (e.g. the RF model) may return a flat vector for a
        # single target; turn it into one column. Outputs that are already
        # 2-D are left alone so multi-target predictions keep one row per
        # sample instead of being flattened into a single column.
        pred = pred.reshape(-1, 1)

    if scaler_targets is not None:
        pred = scaler_targets.inverse_transform(pred)

    if explainer is not None:
        return pred, data.columns, explainer.shap_values(data[-10:])
    return pred
def predict_from_multiple_models(
    models_order, model_path_dict, data, explainer_path_dict=None, scaler_targets_path_dict=None
):
    """Predict several targets, each with its own model (used by the gradio app).

    Args:
        models_order: Iterable of target names; it fixes both the order in
            which models are run and the column order of the result.
        model_path_dict: Maps each target name to its model path.
        data: Inputs forwarded to ``predict`` for every model.
        explainer_path_dict: Optional mapping from target name to a pickled
            explainer path. Targets present here also produce shap values.
        scaler_targets_path_dict: Optional mapping from target name to a
            pickled target-scaler path, forwarded to ``predict``.

    Returns:
        A DataFrame with one column per target. When at least one target had
        an explainer, the tuple ``(df_pred, shap_values_list)`` instead, where
        the list holds shap values only for the targets that had an explainer.
    """
    # None instead of {} as default: avoids sharing one dict across calls.
    if explainer_path_dict is None:
        explainer_path_dict = {}
    if scaler_targets_path_dict is None:
        scaler_targets_path_dict = {}

    y_pred_list = []
    shap_values_list = []
    for predict_name in models_order:
        scaler_targets = None
        if predict_name in scaler_targets_path_dict:
            scaler_targets = unpickle_file(scaler_targets_path_dict[predict_name])

        if predict_name in explainer_path_dict:
            explainer = unpickle_file(explainer_path_dict[predict_name])
            # With an explainer, predict returns (pred, columns, shap_values).
            y_pred, _, shap_values = predict(
                model_path_dict[predict_name], data, explainer=explainer, scaler_targets=scaler_targets
            )
            shap_values_list.append(shap_values)
        else:
            y_pred = predict(model_path_dict[predict_name], data, explainer=None, scaler_targets=scaler_targets)

        y_pred_list.append(pd.DataFrame(y_pred, columns=[predict_name]))

    df_pred = pd.concat(y_pred_list, axis=1)
    if shap_values_list:
        return df_pred, shap_values_list
    return df_pred
def predict_from_ensemble_model(ensemble_model_path, data, explainer=None, uncertainty_type="confidence_interval"):
    """
    Run a pickled ensemble model and return its prediction with uncertainty.

    The object loaded from ``ensemble_model_path`` must expose
    ``predict_w_uncertainty(data, uncertainty_type=...)`` returning a
    ``(mean, uncertainty)`` pair. When ``explainer`` is given, the result of
    ``explainer.shap_values(data[-10:])`` is appended as a third element.
    """
    model = unpickle_file(ensemble_model_path)
    outputs = model.predict_w_uncertainty(data, uncertainty_type=uncertainty_type)
    mean_prediction, prediction_uncertainty = outputs

    # Without an explainer only the two-element result is returned.
    if explainer is None:
        return mean_prediction, prediction_uncertainty

    return mean_prediction, prediction_uncertainty, explainer.shap_values(data[-10:])
def predict_all_results(
    df,
    main_model_path,
    main_input_cols_order,
    scaler_targets_main=None,
    intermediate_model_path=None,
    intermediate_results_columns=None,
    return_uncertainty=False,
    uncertainty_type="confidence_interval",
):
    """Run the optional intermediate model(s), then the main model.

    The initial ``df`` must already be scaled.

    Args:
        df: pd.DataFrame of initial inputs.
        main_model_path: Path to the model computing the main results. A
            ``.h5`` path goes through ``predict``; any other path goes through
            ``predict_from_ensemble_model``.
        main_input_cols_order: Column selection/order applied to the inputs
            before they reach the main model.
        scaler_targets_main: Scaler for the main results. Only used on the
            ``.h5`` path; the ensemble path does not receive it.
        intermediate_model_path: None, a str path to a single model, or a dict
            mapping result names to model paths.
        intermediate_results_columns: List of names of the intermediate
            results (defaults to an empty list).
        return_uncertainty: When True, return ``(predictions, uncertainty)``.
            ``uncertainty`` is None on the ``.h5`` path.
        uncertainty_type: Forwarded to ``predict_from_ensemble_model``.

    Returns:
        The main predictions, or ``(predictions, uncertainty)`` when
        ``return_uncertainty`` is True.
    """
    if intermediate_results_columns is None:
        intermediate_results_columns = []

    if isinstance(intermediate_model_path, str):
        # NOTE(review): the original author marked this branch as unchecked.
        # The intermediate predictions are inserted between the leading
        # columns of df and its last column. A DataFrame is built (rather
        # than a raw array) so the column selection below can work by name.
        # Assumes intermediate_results_columns names the model outputs —
        # TODO confirm against the caller.
        predictions_constraint = predict(intermediate_model_path, df)
        intermediate_df = pd.DataFrame(
            predictions_constraint,
            columns=list(intermediate_results_columns) or None,
            index=df.index,
        )
        input_data_main = pd.concat([df.iloc[:, :-1], intermediate_df, df.iloc[:, -1:]], axis=1)
    elif isinstance(intermediate_model_path, dict):
        # Predict the intermediate results from a dictionary of models
        # (outputs are not rescaled: no target scalers are passed).
        outputs_df = predict_from_multiple_models(
            intermediate_results_columns,
            intermediate_model_path,
            df,
            explainer_path_dict={},
            scaler_targets_path_dict={},
        )
        # pd.concat(axis=1) aligns on the index; reuse df's index so rows are
        # paired by position even when df does not carry a default index.
        outputs_df.index = df.index
        input_data_main = pd.concat([df, outputs_df], axis=1)
    else:
        input_data_main = df.copy()

    # Put data in the right order for the main model.
    input_data_main = input_data_main[main_input_cols_order]

    # Run the main prediction.
    if main_model_path.split(".")[-1] == "h5":
        predictions = predict(main_model_path, input_data_main, scaler_targets=scaler_targets_main)
        uncertainty = None
    else:
        predictions, uncertainty = predict_from_ensemble_model(
            main_model_path, input_data_main, uncertainty_type=uncertainty_type
        )

    if return_uncertainty:
        return predictions, uncertainty
    return predictions
def get_test_inference(
    main_folder,
    columns_numerical,
    columns_target,
    model_name,
    test_data_path,
    x_data_scaled=True,
    y_data_rescaled=False,
):
    """Evaluate a saved model on the test set stored in ``main_folder``.

    Reads the test inputs, ``y_test_data.pickle``,
    ``minmax_scaler_targets.pickle`` and ``minmax_scaler_inputs.pickle`` from
    ``main_folder``, runs the model, prints per-sample and mean errors, writes
    ``metrics.pkl`` and ``plot_performance_test.png`` into ``main_folder`` and
    shows the plot.

    Args:
        main_folder: Folder holding the data, scalers and model; outputs are
            written here too.
        columns_numerical: Accepted for interface compatibility. It is not
            used by the evaluation and is never modified.
        columns_target: Accepted for interface compatibility; not used.
        model_name: Model file name, relative to ``main_folder``.
        test_data_path: Test input file, relative to ``main_folder``.
        x_data_scaled: When False, the inputs are scaled with the input
            scaler before prediction.
        y_data_rescaled: When False, the predictions are inverse-transformed
            with the target scaler. The ground truth is always
            inverse-transformed.

    Returns:
        dict with the keys ``"mae"``, ``"mape"`` and ``"r2"``.
    """
    X_test_data = read_data(os.path.join(main_folder, test_data_path))
    y_test_data = unpickle_file(os.path.join(main_folder, "y_test_data.pickle"))
    minmax_scaler_targets = unpickle_file(os.path.join(main_folder, "minmax_scaler_targets.pickle"))
    minmax_scaler_inputs = unpickle_file(os.path.join(main_folder, "minmax_scaler_inputs.pickle"))

    # Keep an unscaled copy of the inputs next to the results.
    # NOTE(review): df_with_results is filled in below but is neither
    # returned nor saved — confirm whether it should be persisted.
    if not x_data_scaled:
        df_with_results = X_test_data.copy()
        X_test_data = scale_numerical(
            X_test_data, minmax_scaler_inputs.feature_names_in_, scaler=minmax_scaler_inputs, fit=False
        )
    else:
        df_with_results = pd.DataFrame(minmax_scaler_inputs.inverse_transform(X_test_data), columns=X_test_data.columns)

    # Run model in inference mode.
    predictions = predict(os.path.join(main_folder, model_name), X_test_data)
    y_test_data = minmax_scaler_targets.inverse_transform(y_test_data)
    # Depending on the model used the targets may already be rescaled
    # (case of Ensemble models to run the uncertainty).
    if not y_data_rescaled:
        predictions = minmax_scaler_targets.inverse_transform(predictions)

    print("***************************************************")
    print(predictions)
    print(predictions.shape, y_test_data.shape)

    # The per-sample table only looks at the first prediction column.
    predicted = np.squeeze(predictions[:, 0])
    truth = np.squeeze(y_test_data)
    errors = predicted - truth
    results = pd.DataFrame(
        {
            "predictions": predicted,
            "ground truth": truth,
            "mae": np.abs(errors),
            # Squared error per sample, so that its mean is the MSE.
            "mse": np.square(errors),
            # Relative to the ground truth, matching the "mape" metric below.
            "percentage error": np.abs(errors / truth) * 100,
        }
    )
    mean_results = pd.DataFrame(
        {
            "mean mae": [np.mean(results["mae"])],
            "mean mse": [np.mean(results["mse"])],
            "mean percentage error": [np.mean(results["percentage error"])],
        }
    )
    print(mean_results)

    metrics = {
        "mae": mean_absolute_error(y_test_data, predictions),
        "mape": mean_absolute_percentage_error(y_test_data, predictions),
        "r2": r2_score(y_test_data, predictions),
    }
    with open(os.path.join(main_folder, "metrics.pkl"), "wb+") as file:
        pickle.dump(metrics, file)

    # Plot predictions vs ground truth; the line is the identity y = x.
    plt.clf()
    plt.scatter(results["ground truth"], results["predictions"], c="r")
    plt.plot(results["ground truth"], results["ground truth"])
    plt.xlabel("Ground truth")
    plt.ylabel("Predictions")
    fig = plt.gcf()
    fig.savefig(os.path.join(main_folder, "plot_performance_test.png"))
    plt.show()

    df_with_results["ground_truth"] = y_test_data
    df_with_results["predictions"] = predictions
    return metrics
if __name__ == "__main__":
    # Command-line entry point: parse the options, then evaluate the model on
    # the test data found in the model folder.
    # Each entry is (flag, help text, default); all options are optional strings.
    cli_options = (
        ("--model_path", "The path to your model file", "model_hardness.h5"),
        ("--model_folder", "The path to your model folder", "./models/phases"),
        (
            "--df_columns",
            "List of data columns of dataset",
            "%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature",
        ),
        ("--columns_target", "List of target columns", "H"),
        (
            "--columns_numerical",
            "List of data columns with numeric values",
            "%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature_C",
        ),
        ("--data_path", "The path to your input data for inference", "X_test_data.pickle"),
    )

    parser = argparse.ArgumentParser(description="Process parameters")
    for flag, help_text, default_value in cli_options:
        parser.add_argument(flag, type=str, help=help_text, default=default_value, required=False)
    args = parser.parse_args()

    # Split the comma-separated column lists.
    raw_numerical = args.columns_numerical
    numerical_columns = raw_numerical.split(",") if raw_numerical else []
    all_columns = args.df_columns.split(",")  # parsed but not passed on
    target_columns = args.columns_target.split(",")

    get_test_inference(
        args.model_folder,
        numerical_columns,
        target_columns,
        args.model_path,
        args.data_path,
        x_data_scaled=True,
        y_data_rescaled=False,
    )