import argparse
import os
import pickle

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score

from utils import encode_categorical, scale_numerical, fill_nans, unpickle_file, EnsembleModel, read_data


def predict(model_path, data, explainer=None, scaler_targets=None):
    """
    Load a model (Keras .h5 or pickled estimator) and run inference on `data`.

    If `scaler_targets` is given, predictions are inverse-transformed back to the
    original target scale. If `explainer` is given, also returns the input columns
    and the SHAP values for the last 10 rows.
    """
    model_extension = model_path.split(".")[-1]
    if model_extension == "h5":
        model = tf.keras.models.load_model(model_path)
    else:
        model = unpickle_file(model_path)
    pred = model.predict(data)
    if model_extension != "h5":
        # Fix for the RF model in the case where there is one feature only
        # (other cases not supported so far)
        pred = pred.reshape(-1, 1)
    if scaler_targets is not None:
        pred = scaler_targets.inverse_transform(pred)
    if explainer:
        return pred, data.columns, explainer.shap_values(data[-10:])
    return pred


def predict_from_multiple_models(
    models_order, model_path_dict, data, explainer_path_dict=None, scaler_targets_path_dict=None
):
    """
    Used in the Gradio app to predict different targets, with one model per target.

    Returns a DataFrame with one column per target; when explainers are provided,
    also returns the list of SHAP values.
    """
    explainer_path_dict = explainer_path_dict or {}
    scaler_targets_path_dict = scaler_targets_path_dict or {}
    y_pred_list = []
    shap_values_list = []
    for predict_name in models_order:
        if predict_name in scaler_targets_path_dict:
            scaler_targets = unpickle_file(scaler_targets_path_dict[predict_name])
        else:
            scaler_targets = None
        if predict_name in explainer_path_dict:
            explainer = unpickle_file(explainer_path_dict[predict_name])
            y_pred, _, shap_values = predict(
                model_path_dict[predict_name], data, explainer=explainer, scaler_targets=scaler_targets
            )
            shap_values_list.append(shap_values)
        else:
            y_pred = predict(model_path_dict[predict_name], data, scaler_targets=scaler_targets)
        y_pred_list.append(pd.DataFrame(y_pred, columns=[predict_name]))
    df_pred = pd.concat(y_pred_list, axis=1)
    if len(shap_values_list) > 0:
        return df_pred, shap_values_list
    return df_pred


def predict_from_ensemble_model(ensemble_model_path, data, explainer=None, uncertainty_type="confidence_interval"):
    """
    Returns the prediction of a model defined using the EnsembleModel class,
    together with its uncertainty estimate.
    """
    ensemble_model = unpickle_file(ensemble_model_path)
    pred_mean, pred_uncertainty = ensemble_model.predict_w_uncertainty(data, uncertainty_type=uncertainty_type)
    if explainer is not None:
        shap_values = explainer.shap_values(data[-10:])
        return pred_mean, pred_uncertainty, shap_values
    return pred_mean, pred_uncertainty
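# A minimal usage sketch (hypothetical paths and target names, not part of the
# pipeline itself): predicting two scaled targets with one pickled model each.
# Without explainers, only the prediction DataFrame is returned.
#
#     df_pred = predict_from_multiple_models(
#         models_order=["H", "Phase_A"],
#         model_path_dict={"H": "models/model_H.pkl", "Phase_A": "models/model_phase_A.pkl"},
#         data=X_scaled,  # inputs already scaled, as the models expect
#         scaler_targets_path_dict={"H": "models/scaler_H.pkl"},
#     )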
def predict_all_results(
    df,
    main_model_path,
    main_input_cols_order,
    scaler_targets_main=None,
    intermediate_model_path=None,
    intermediate_results_columns=None,
    return_uncertainty=False,
    uncertainty_type="confidence_interval",
):
    """
    Initial df must be scaled.

    Args:
    -----
    df: pd.DataFrame
        Initial inputs
    main_model_path: str
        Path to the model to compute the main results
    scaler_targets_main: scaler for the main results
    intermediate_model_path: None, str or dict
        Can be a path to a single model or a dict of models
    intermediate_results_columns: List[str]
        Names of the intermediate outputs, in prediction order
    """
    intermediate_results_columns = intermediate_results_columns or []
    if isinstance(intermediate_model_path, str):
        # This section has not been checked (LB)
        predictions_constraint = predict(intermediate_model_path, df)
        input_data_main = np.concatenate(
            [df.values[:, :-1], predictions_constraint, df.values[:, -1:]], axis=1
        )
    elif isinstance(intermediate_model_path, dict):
        ### Predict the intermediary results from a dictionary of models
        ### (outputs are not rescaled)
        outputs_df = predict_from_multiple_models(
            intermediate_results_columns,
            intermediate_model_path,
            df,
        )
        # Concatenate the scaled version of the data
        input_data_main = pd.concat([df, outputs_df], axis=1)
    else:
        input_data_main = df.copy()
    # Put data in the right order for the main model
    input_data_main = input_data_main[main_input_cols_order]
    # Run the main prediction
    model_extension = main_model_path.split(".")[-1]
    if model_extension == "h5":
        predictions = predict(main_model_path, input_data_main, scaler_targets=scaler_targets_main)
        uncertainty = None
    else:
        predictions, uncertainty = predict_from_ensemble_model(
            main_model_path, input_data_main, uncertainty_type=uncertainty_type
        )
    if return_uncertainty:
        return predictions, uncertainty
    return predictions
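# A minimal usage sketch (hypothetical column and file names): first predict the
# intermediate phase columns with per-target models, then feed them, together
# with the scaled inputs, to a main ensemble model.
#
#     predictions, uncertainty = predict_all_results(
#         df_scaled,
#         main_model_path="models/ensemble_hardness.pkl",
#         main_input_cols_order=input_columns + phase_columns,
#         intermediate_model_path={col: f"models/model_{col}.pkl" for col in phase_columns},
#         intermediate_results_columns=phase_columns,
#         return_uncertainty=True,
#     )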
def get_test_inference(
    main_folder,
    columns_numerical,
    columns_target,
    model_name,
    test_data_path,
    x_data_scaled=True,
    y_data_rescaled=False,
):
    """
    Run the model on the test set, pickle the resulting metrics to
    `metrics.pkl` and save a predictions-vs-ground-truth plot in `main_folder`.
    """
    X_test_data = read_data(os.path.join(main_folder, test_data_path))
    columns_categorical = [column for column in X_test_data.columns if column not in columns_numerical]
    y_test_data = unpickle_file(os.path.join(main_folder, "y_test_data.pickle"))
    one_hot_scaler = unpickle_file(os.path.join(main_folder, "one_hot_scaler.pickle"))
    minmax_scaler_targets = unpickle_file(os.path.join(main_folder, "minmax_scaler_targets.pickle"))
    minmax_scaler_inputs = unpickle_file(os.path.join(main_folder, "minmax_scaler_inputs.pickle"))
    for col in columns_target:
        if col in columns_numerical:
            columns_numerical.remove(col)
    if not x_data_scaled:
        # The data has not been scaled yet: keep the raw copy for reporting, then scale the inputs
        df_with_results = X_test_data.copy()
        X_test_data = scale_numerical(
            X_test_data, minmax_scaler_inputs.feature_names_in_, scaler=minmax_scaler_inputs, fit=False
        )
    else:
        # The data is already scaled: recover the raw values for reporting
        df_with_results = pd.DataFrame(
            minmax_scaler_inputs.inverse_transform(X_test_data), columns=X_test_data.columns
        )

    ### Run model in inference mode
    predictions = predict(os.path.join(main_folder, model_name), X_test_data)
    y_test_data = minmax_scaler_targets.inverse_transform(y_test_data)
    # Depending on the model used, the targets may already be rescaled
    # (case of ensemble models used to compute the uncertainty)
    if not y_data_rescaled:
        predictions = minmax_scaler_targets.inverse_transform(predictions)
    print("***************************************************")
    print(predictions)
    print(predictions.shape, y_test_data.shape)

    errors = np.squeeze(predictions[:, 0]) - np.squeeze(y_test_data)
    results = pd.DataFrame(
        {
            "predictions": np.squeeze(predictions[:, 0]),
            "ground truth": np.squeeze(y_test_data),
            "mae": np.abs(errors),
            # Per-sample squared error; its mean below is the MSE
            "mse": np.square(errors),
            # Relative to the ground truth, consistent with sklearn's MAPE below
            "percentage error": np.abs(errors / np.squeeze(y_test_data)) * 100,
        }
    )
    mean_results = pd.DataFrame(
        {
            "mean mae": [np.mean(results["mae"])],
            "mean mse": [np.mean(results["mse"])],
            "mean percentage error": [np.mean(results["percentage error"])],
        }
    )
    print(mean_results)
    metrics = {
        "mae": mean_absolute_error(y_test_data, predictions),
        "mape": mean_absolute_percentage_error(y_test_data, predictions),
        "r2": r2_score(y_test_data, predictions),
    }
    with open(os.path.join(main_folder, "metrics.pkl"), "wb+") as file:
        pickle.dump(metrics, file)

    ### Plot predictions vs ground truth
    plt.clf()
    plt.scatter(results["ground truth"], results["predictions"], c="r")
    plt.plot(results["ground truth"], results["ground truth"])
    plt.xlabel("Ground truth")
    plt.ylabel("Predictions")
    fig = plt.gcf()
    fig.savefig(os.path.join(main_folder, "plot_performance_test.png"))
    plt.show()

    df_with_results["ground_truth"] = np.squeeze(y_test_data)
    df_with_results["predictions"] = np.squeeze(predictions[:, 0])
    return metrics


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process parameters")
    parser.add_argument(
        "--model_path", type=str, help="The path to your model file", default="model_hardness.h5", required=False
    )
    parser.add_argument(
        "--model_folder", type=str, help="The path to your model folder", default="./models/phases", required=False
    )
    parser.add_argument(
        "--df_columns",
        type=str,
        help="Comma-separated list of data columns of the dataset",
        default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature",
        required=False,
    )
    parser.add_argument(
        "--columns_target", type=str, help="Comma-separated list of target columns", default="H", required=False
    )
    parser.add_argument(
        "--columns_numerical",
        type=str,
        help="Comma-separated list of data columns with numeric values",
        default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature_C",
        required=False,
    )
    parser.add_argument(
        "--data_path",
        type=str,
        help="The path to your input data for inference",
        default="X_test_data.pickle",
        required=False,
    )
    args = parser.parse_args()

    ### Get categorical and numerical columns
    columns_numerical = args.columns_numerical.split(",") if args.columns_numerical else []
    df_columns = args.df_columns.split(",")
    columns_target = args.columns_target.split(",")

    get_test_inference(
        args.model_folder,
        columns_numerical,
        columns_target,
        args.model_path,
        args.data_path,
        x_data_scaled=True,
        y_data_rescaled=False,
    )
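# Example invocation using the defaults above (the script filename is
# hypothetical; the model file and the pickled test data are resolved
# relative to --model_folder):
#
#     python inference.py --model_folder ./models/phases \
#         --model_path model_hardness.h5 --data_path X_test_data.pickle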