import argparse
import os
import pickle
import random

import dill
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import tensorflow as tf
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import initializers

from alloy_data_preprocessing import add_physics_features
from utils import NoPhysicsModels, encode_categorical, scale_numerical, unpickle_file

SEED = 42


def set_all_seeds(seed=SEED):
    """Seed Python, NumPy and TensorFlow for reproducible runs."""
    os.environ["PYTHONHASHSEED"] = str(seed)
    tf.keras.utils.set_random_seed(seed)
    np.random.seed(seed)
    random.seed(seed)


def setup_model(num_outputs):
    """Build the small fully connected regression network."""
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(
                8,
                kernel_initializer=initializers.RandomNormal(stddev=0.00001),  # initially 0.01
                bias_initializer=initializers.Zeros(),
                activation="relu",
            ),
            tf.keras.layers.Dense(
                4,
                activation="relu",
                kernel_initializer=initializers.RandomNormal(stddev=0.00001),  # initially 0.01
                bias_initializer=initializers.Zeros(),
            ),
            tf.keras.layers.Dense(
                num_outputs,
                activation="relu",
                kernel_initializer=initializers.RandomNormal(stddev=0.00001),  # initially 0.01
                bias_initializer=initializers.Zeros(),
            ),
        ]
    )
    return model


def prepare_data(data, columns_num, columns_target, main_folder, data_type="path", seed=SEED):
    # Create the output folder if it doesn't exist
    if not os.path.exists(main_folder):
        os.makedirs(main_folder)

    columns_numerical = columns_num.copy()

    ### Read data: either from a CSV path or from a DataFrame passed in directly
    print("data_type:", data_type)
    if data_type == "path":
        df = pd.read_csv(data, sep=";")
    else:
        df = data.copy()

    ### Split features and targets
    X = df.drop(columns=columns_target)
    y = df[columns_target]

    # Remove the index column (present if coming from the sampling pipeline)
    if "Index" in X.columns:
        X.drop(columns=["Index"], inplace=True)

    ### Everything not declared numerical is treated as categorical
    columns_categorical = [column for column in X.columns if column not in columns_numerical]

    print("Feature columns used for training:")
    print(X.columns)

    ### One-hot encode categorical variables and min-max scale numerical ones
    X, one_hot_scaler = encode_categorical(X, columns_categorical)
    X, minmax_scaler_inputs = scale_numerical(
        X, [column for column in columns_numerical if column not in columns_target]
    )
    y, minmax_scaler_targets = scale_numerical(y, columns_target)

    ### Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed)

    ### Pickle the test split and the fitted scalers for later inference
    with open(os.path.join(main_folder, "X_test_data.pickle"), "wb+") as file:
        pickle.dump(X_test, file)
    with open(os.path.join(main_folder, "y_test_data.pickle"), "wb+") as file:
        pickle.dump(y_test, file)
    with open(os.path.join(main_folder, "one_hot_scaler.pickle"), "wb+") as file:
        pickle.dump(one_hot_scaler, file)
    with open(os.path.join(main_folder, "minmax_scaler_inputs.pickle"), "wb+") as file:
        pickle.dump(minmax_scaler_inputs, file)
    with open(os.path.join(main_folder, "minmax_scaler_targets.pickle"), "wb+") as file:
        pickle.dump(minmax_scaler_targets, file)

    return X_train, X_test, y_train, y_test
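# Illustrative sketch (not called anywhere in this pipeline): how the target
# scaler pickled by prepare_data can be reloaded to map scaled predictions back
# to physical units. The file name matches the one written above; the helper
# name is an assumption for illustration, and it assumes scale_numerical
# returns an sklearn MinMaxScaler.
def inverse_scale_predictions(y_hat_scaled, main_folder):
    """Map model outputs in scaled space back to the original target units."""
    with open(os.path.join(main_folder, "minmax_scaler_targets.pickle"), "rb") as file:
        target_scaler = pickle.load(file)
    # MinMaxScaler expects 2-D input of shape (n_samples, n_targets)
    return target_scaler.inverse_transform(np.atleast_2d(y_hat_scaled))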
def train_model_ml(X_train, X_test, y_train, y_test, main_folder, model_path, seed=SEED):
    set_all_seeds(seed)

    model = RandomForestRegressor(random_state=seed)
    model.fit(X_train, y_train)

    y_hat = model.predict(X_test)
    print("----------------")
    print("Model performance")
    print("MAE", mean_absolute_error(y_test, y_hat))
    print("MAPE", mean_absolute_percentage_error(y_test, y_hat))
    print("R2", r2_score(y_test, y_hat))

    with open(os.path.join(main_folder, model_path), "wb+") as file:
        pickle.dump(model, file)

    return model


def train_model(
    X_train, X_test, y_train, y_test, columns_target, main_folder, model_path, lr=0.01, seed=SEED, get_history=False
):
    # Set all seeds for reproducibility
    set_all_seeds(seed)

    # Create the output folder if it doesn't exist
    if not os.path.exists(main_folder):
        os.makedirs(main_folder)

    ## Set up and train the model
    model = setup_model(len(columns_target))
    opt = tf.keras.optimizers.Adam(learning_rate=lr)  # 0.01 for the hardness model
    print("learning rate", lr)
    model.compile(optimizer=opt, loss="mean_squared_error")
    history = model.fit(
        X_train, y_train, batch_size=1, epochs=200, verbose=1, validation_data=(X_test, y_test), shuffle=True
    )

    model.save(os.path.join(main_folder, model_path))
    model_core_name = model_path.split(".")[0]
    # Pickle the metrics dict rather than the History object itself, which
    # holds a reference to the model and does not pickle reliably.
    with open(os.path.join(main_folder, f"{model_core_name}_fit_history.pickle"), "wb+") as file:
        pickle.dump(history.history, file)

    ### Plot loss
    plt.clf()
    plt.plot(history.history["loss"])
    plt.plot(history.history["val_loss"])
    plt.title("model loss")
    plt.ylabel("loss")
    plt.xlabel("epoch")
    plt.legend(["train", "test"], loc="upper left")
    fig = plt.gcf()
    plt.show()
    fig.savefig(os.path.join(main_folder, "plot_loss_function.png"))

    if get_history:
        return model, history
    return model


def save_shap_explainer(predict_fn, X_train, X_test, main_folder, explainer_name="explainer"):
    # Create the output folder if it doesn't exist
    if not os.path.exists(main_folder):
        os.makedirs(main_folder)

    ## Build a kernel explainer on a background sample and explain the last test rows
    ex = shap.KernelExplainer(predict_fn, X_train[:80])
    shap_values = ex.shap_values(X_test[-20:])

    # KernelExplainer returns a list with one array per output; unwrap it for
    # single-output models. TODO: check that this works in all cases
    # (especially if the size of X_test is 1).
    if len(shap_values) == 1:
        shap_values = shap_values[0]

    plt.clf()
    shap.summary_plot(shap_values, X_test[-20:], show=False)
    fig = plt.gcf()
    fig.savefig(os.path.join(main_folder, f"plot_shap_summary_{explainer_name}.png"))
    plt.show()

    # Despite the .bz2 extension, this is a plain dill dump (dill handles the
    # explainer's closures, which pickle cannot).
    with open(os.path.join(main_folder, f"{explainer_name}.bz2"), "wb") as file:
        dill.dump(ex, file)


def compute_shap_explainer_no_physics(model_path, X_train, X_test, main_folder, scaler_inputs_path):
    """
    Create and save a SHAP explainer that does not expose the physics-informed features.
    Meant to be shared with customers and used in the Gradio app.
    X_train and X_test must NOT be scaled.
    """
    scaler_inputs = unpickle_file(scaler_inputs_path)
    if model_path.split(".")[-1] == "h5":
        model = tf.keras.models.load_model(model_path)
    else:
        model = unpickle_file(model_path)
    model_no_physics = NoPhysicsModels(model, scaler_inputs, add_physics_features)
    save_shap_explainer(model_no_physics.predict, X_train, X_test, main_folder, explainer_name="exp_no_physics")
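# Illustrative sketch: reloading an explainer written by save_shap_explainer.
# Because the .bz2 file above is a plain dill dump, it is read back with
# dill.load rather than the bz2 module. The helper name is an assumption for
# illustration only.
def load_shap_explainer(main_folder, explainer_name="explainer"):
    """Reload a dill-dumped KernelExplainer saved by save_shap_explainer."""
    with open(os.path.join(main_folder, f"{explainer_name}.bz2"), "rb") as file:
        return dill.load(file)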
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Process parameters")
    parser.add_argument(
        "--data_path",
        type=str,
        help="The path to your input data file",
        default="preprocessed_data.csv",
        required=False,
    )
    parser.add_argument(
        "--main_folder", type=str, help="Folder to save model files", default="../models/hardness", required=False
    )
    parser.add_argument(
        "--model_path", type=str, help="Path to save model", default="model_hardness.h5", required=False
    )
    parser.add_argument(
        "--columns_target", type=str, help="Comma-separated list of target columns", default="H", required=False
    )
    parser.add_argument(
        "--columns_numerical",
        type=str,
        help="Comma-separated list of data columns with numeric values",
        default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature_C",
        required=False,
    )
    args = parser.parse_args()

    columns_numerical = args.columns_numerical.split(",") if args.columns_numerical else []
    columns_target = args.columns_target.split(",") if args.columns_target else []

    X_train, X_test, y_train, y_test = prepare_data(
        args.data_path, columns_numerical, columns_target, args.main_folder
    )
    model = train_model(X_train, X_test, y_train, y_test, columns_target, args.main_folder, args.model_path)
    save_shap_explainer(model.predict, X_train, X_test, args.main_folder)
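# Example invocation (illustrative; the script name is hypothetical, and paths
# depend on your layout -- the values shown are the argparse defaults above):
#   python train_hardness_model.py \
#       --data_path preprocessed_data.csv \
#       --main_folder ../models/hardness \
#       --model_path model_hardness.h5 \
#       --columns_target H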