# demo_active_learning / inference_model_main.py
import argparse
# import shap  # optional: only needed when a SHAP explainer is passed
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os
import tensorflow as tf
from utils import encode_categorical, scale_numerical, fill_nans, unpickle_file, EnsembleModel, read_data
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score
import pickle
def predict(model_path, data, explainer=None, scaler_targets=None):
    """
    Load a model (Keras ".h5" file or pickled sklearn model, depending on the
    file extension) and predict on `data`. Predictions are inverse-transformed
    with `scaler_targets` when provided; when an `explainer` is given, SHAP
    values for the last 10 rows are returned as well.
    """
model_extension = model_path.split(".")[-1]
if model_extension == "h5":
model = tf.keras.models.load_model(model_path)
else:
model = unpickle_file(model_path)
pred = model.predict(data)
if model_extension != "h5":
        # Fix for the RF model when there is a single output: reshape the flat
        # prediction into a column vector (other cases not supported so far)
        pred = pred.reshape(-1, 1)
if scaler_targets is not None:
pred = scaler_targets.inverse_transform(pred)
    if explainer:
        # SHAP values are only computed for the last 10 rows
        return pred, data.columns, explainer.shap_values(data[-10:])
else:
return pred
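
# A minimal usage sketch for `predict` ("model_hardness.h5" mirrors the default
# of --model_path below; "model.pickle", `explainer` and `scaler` are
# illustrative assumptions):
#
#     X = pd.DataFrame(np.random.rand(5, 3), columns=["a", "b", "c"])
#     pred = predict("model_hardness.h5", X)
#     # Pickled sklearn model with a target scaler and a SHAP explainer:
#     pred, cols, shap_vals = predict("model.pickle", X, explainer=explainer,
#                                     scaler_targets=scaler)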
def predict_from_multiple_models(
models_order, model_path_dict, data, explainer_path_dict={}, scaler_targets_path_dict={}
):
"""
This function is used in the gradio to predict different targets from different models
"""
y_pred_list = []
shap_values_list = []
for predict_name in models_order:
        if predict_name in scaler_targets_path_dict:
scaler_targets = unpickle_file(scaler_targets_path_dict[predict_name])
else:
scaler_targets = None
        if predict_name in explainer_path_dict:
explainer = unpickle_file(explainer_path_dict[predict_name])
y_pred, _, shap_values = predict(
model_path_dict[predict_name], data, explainer=explainer, scaler_targets=scaler_targets
)
shap_values_list += [shap_values]
else:
explainer = None
y_pred = predict(model_path_dict[predict_name], data, explainer=explainer, scaler_targets=scaler_targets)
df_pred_task = pd.DataFrame(y_pred, columns=[predict_name])
y_pred_list.append(df_pred_task)
df_pred = pd.concat(y_pred_list, axis=1)
if len(shap_values_list) > 0:
return df_pred, shap_values_list
else:
return df_pred
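
# Sketch of how the Gradio demo might call this (the paths and target names
# below are illustrative assumptions):
#
#     model_paths = {"%Phase_A": "models/phases/model_phase_a.pickle",
#                    "%Phase_B": "models/phases/model_phase_b.pickle"}
#     df_pred = predict_from_multiple_models(["%Phase_A", "%Phase_B"],
#                                            model_paths, X_scaled)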
def predict_from_ensemble_model(ensemble_model_path, data, explainer=None, uncertainty_type="confidence_interval"):
"""
Returns the prediction of the model defnied using the EnsembleModel class
"""
ensemble_model = unpickle_file(ensemble_model_path)
pred_mean, pred_uncertainty = ensemble_model.predict_w_uncertainty(data, uncertainty_type=uncertainty_type)
if explainer is not None:
shap_values = explainer.shap_values(data[-10:])
return pred_mean, pred_uncertainty, shap_values
return pred_mean, pred_uncertainty
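
# Sketch, assuming an EnsembleModel instance pickled to "ensemble_model.pickle"
# (a hypothetical path):
#
#     pred_mean, pred_unc = predict_from_ensemble_model(
#         "ensemble_model.pickle", X_scaled, uncertainty_type="confidence_interval"
#     )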
def predict_all_results(
df,
main_model_path,
main_input_cols_order,
scaler_targets_main=None,
intermediate_model_path=None,
intermediate_results_columns=[],
return_uncertainty=False,
uncertainty_type="confidence_interval",
):
"""
Initial df must be scaled
Args:
-----
df: pd.DataFrame
Initial inputs
main_model_path: str
Path to the model to compute the main results
scaler_target_main: scaler for the main results
intermediate_model_path: None, str, dict can be a path to a model or a dict of models
intermediate_results_columns: List(str)
"""
    if isinstance(intermediate_model_path, str):
        # This section has not been checked (LB)
        predictions_constraint = predict(intermediate_model_path, df)
        # Insert the intermediate predictions before the last input column
        input_data_main = np.concatenate(
            [df.values[:, :-1], predictions_constraint, df.values[:, -1:]], axis=1
        )
    elif isinstance(intermediate_model_path, dict):
        ### Predict the intermediary results from a dictionary of models (non-rescaled version of the intermediary outputs)
outputs_df = predict_from_multiple_models(
intermediate_results_columns,
intermediate_model_path,
df,
explainer_path_dict={},
scaler_targets_path_dict={},
)
input_data_main = pd.concat([df, outputs_df], axis=1) # Concatenate the scaled version of the data
else:
input_data_main = df.copy()
# Put data in the right order for the main model
input_data_main = input_data_main[main_input_cols_order]
# Run the main prediction
model_extension = main_model_path.split(".")[-1]
if model_extension == "h5":
predictions = predict(main_model_path, input_data_main, scaler_targets=scaler_targets_main)
uncertainty = None
else:
predictions, uncertainty = predict_from_ensemble_model(
main_model_path, input_data_main, uncertainty_type=uncertainty_type
)
if return_uncertainty:
return predictions, uncertainty
return predictions
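
# Sketch of the full two-stage pipeline (all paths and column names below are
# illustrative assumptions):
#
#     predictions, uncertainty = predict_all_results(
#         df_scaled,
#         "models/main_model.pickle",
#         main_input_cols_order=list(df_scaled.columns) + ["%Phase_A"],
#         intermediate_model_path={"%Phase_A": "models/phases/model_phase_a.pickle"},
#         intermediate_results_columns=["%Phase_A"],
#         return_uncertainty=True,
#     )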
def get_test_inference(
main_folder,
columns_numerical,
columns_target,
model_name,
test_data_path,
x_data_scaled=True,
y_data_rescaled=False,
):
    """
    Run a saved model on the test set, compute error metrics, save them to
    "metrics.pkl" and plot the predictions against the ground truth.
    """
    X_test_data = read_data(os.path.join(main_folder, test_data_path))
columns_categorical = [column for column in X_test_data.columns if column not in columns_numerical]
y_test_data = unpickle_file(os.path.join(main_folder, "y_test_data.pickle"))
one_hot_scaler = unpickle_file(os.path.join(main_folder, "one_hot_scaler.pickle"))
minmax_scaler_targets = unpickle_file(os.path.join(main_folder, "minmax_scaler_targets.pickle"))
minmax_scaler_inputs = unpickle_file(os.path.join(main_folder, "minmax_scaler_inputs.pickle"))
    # Target columns must not be treated as numerical input columns
    for col in columns_target:
        if col in columns_numerical:
            columns_numerical.remove(col)
    # If the data has not already been scaled
if not x_data_scaled:
df_with_results = X_test_data.copy()
X_test_data = scale_numerical(
X_test_data, minmax_scaler_inputs.feature_names_in_, scaler=minmax_scaler_inputs, fit=False
)
else:
df_with_results = pd.DataFrame(minmax_scaler_inputs.inverse_transform(X_test_data), columns=X_test_data.columns)
### Run model in inference mode
predictions = predict(os.path.join(main_folder, model_name), X_test_data)
y_test_data = minmax_scaler_targets.inverse_transform(y_test_data)
    # Depending on the model, the predictions may already be rescaled
    # (e.g. ensemble models rescale them when computing the uncertainty)
    if not y_data_rescaled:
        predictions = minmax_scaler_targets.inverse_transform(predictions)
print("***************************************************")
print(predictions)
print(predictions.shape, y_test_data.shape)
    results = pd.DataFrame(
        {
            "predictions": np.squeeze(predictions[:, 0]),
            "ground truth": np.squeeze(y_test_data),
            # Per-sample absolute and squared errors (averaged below)
            "mae": np.abs(np.squeeze(predictions[:, 0]) - np.squeeze(y_test_data)),
            "mse": np.square(np.squeeze(predictions[:, 0]) - np.squeeze(y_test_data)),
            # Percentage error is taken relative to the ground truth
            "percentage error": np.abs(
                (np.squeeze(predictions[:, 0]) - np.squeeze(y_test_data)) / np.squeeze(y_test_data)
            )
            * 100,
        }
    )
mean_results = pd.DataFrame(
{
"mean mae": [np.mean(results["mae"])],
"mean mse": [np.mean(results["mse"])],
"mean percentage error": [np.mean(results["percentage error"])],
}
)
print(mean_results)
metrics = {
"mae": mean_absolute_error(y_test_data, predictions),
"mape": mean_absolute_percentage_error(y_test_data, predictions),
"r2": r2_score(y_test_data, predictions),
}
with open(os.path.join(main_folder, "metrics.pkl"), "wb+") as file:
pickle.dump(metrics, file)
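    # The saved metrics can be reloaded later with, e.g.:
    #     with open(os.path.join(main_folder, "metrics.pkl"), "rb") as file:
    #         metrics = pickle.load(file)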
    ### Plot predictions vs ground truth
    plt.clf()
    plt.scatter(results["ground truth"], results["predictions"], c="r")
    # Identity line: a perfect model would put every point on it
    plt.plot(results["ground truth"], results["ground truth"])
plt.xlabel("Ground truth")
plt.ylabel("Predictions")
fig = plt.gcf()
fig.savefig(os.path.join(main_folder, "plot_performance_test.png"))
plt.show()
df_with_results["ground_truth"] = y_test_data
df_with_results["predictions"] = predictions
return metrics
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Process parameters")
parser.add_argument(
"--model_path", type=str, help="The path to your model file", default="model_hardness.h5", required=False
)
parser.add_argument(
"--model_folder", type=str, help="The path to your model folder", default="./models/phases", required=False
)
parser.add_argument(
"--df_columns",
type=str,
help="List of data columns of dataset",
default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature",
required=False,
)
parser.add_argument("--columns_target", type=str, help="List of target columns", default="H", required=False)
parser.add_argument(
"--columns_numerical",
type=str,
help="List of data columns with numeric values",
default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature_C",
required=False,
)
parser.add_argument(
"--data_path",
type=str,
help="The path to your input data for inference",
default="X_test_data.pickle",
required=False,
)
args = parser.parse_args()
### Get categorical and numerical columns
columns_numerical = args.columns_numerical.split(",") if args.columns_numerical else []
df_columns = args.df_columns.split(",")
columns_target = args.columns_target.split(",")
get_test_inference(
args.model_folder,
columns_numerical,
columns_target,
args.model_path,
args.data_path,
x_data_scaled=True,
y_data_rescaled=False,
)