Spaces:
Sleeping
Sleeping
import argparse | |
import pandas as pd | |
import tensorflow as tf | |
import numpy as np | |
import matplotlib.pyplot as plt | |
# from numpy.random import seed | |
import random | |
import os | |
import pickle | |
import shap | |
import dill | |
from utils import encode_categorical, scale_numerical, NoPhysicsModels, unpickle_file | |
from alloy_data_preprocessing import add_physics_features | |
import tensorflow as tf | |
from tensorflow.keras import initializers | |
from sklearn.model_selection import train_test_split | |
from sklearn.ensemble import RandomForestRegressor | |
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score | |
SEED = 42 | |
def set_all_seeds(seed=SEED): | |
os.environ["PYTHONHASHSEED"] = str(seed) | |
tf.keras.utils.set_random_seed(seed) | |
np.random.seed(seed) | |
random.seed(seed) | |
def setup_model(num_outputs): | |
model = tf.keras.models.Sequential( | |
[ | |
tf.keras.layers.Flatten(), | |
tf.keras.layers.Dense( | |
8, | |
kernel_initializer=initializers.RandomNormal(stddev=0.00001), # Initially was at 0.01 | |
bias_initializer=initializers.Zeros(), | |
activation="relu", | |
), | |
tf.keras.layers.Dense( | |
4, | |
activation="relu", | |
kernel_initializer=initializers.RandomNormal(stddev=0.00001), # Initially was at 0.01 | |
bias_initializer=initializers.Zeros(), | |
), | |
tf.keras.layers.Dense( | |
num_outputs, | |
activation="relu", | |
kernel_initializer=initializers.RandomNormal(stddev=0.00001), # Initially was at 0.01 | |
bias_initializer=initializers.Zeros(), | |
), | |
] | |
) | |
return model | |
def prepare_data(data, columns_num, columns_target, main_folder, data_type="path", seed=SEED): | |
# Create folder if doesn't exist | |
if not os.path.exists(main_folder): | |
os.makedirs(main_folder) | |
columns_numerical = columns_num.copy() | |
### Read data | |
print(data_type) | |
if data_type == "path": | |
df = pd.read_csv(data, sep=";") | |
else: | |
df = data.copy() | |
### Remove columns not used during training | |
X = df.drop(columns=columns_target) | |
y = df[columns_target] | |
# Remove the index columns (if coming from the sampling pipeline) | |
if "Index" in X.columns: | |
X.drop(columns=["Index"], inplace=True) | |
### Get categorical columns | |
columns_categorical = [column for column in X.columns if column not in columns_numerical] | |
# ### Remove target from column names | |
# for target in columns_target: | |
# columns_numerical.remove(target) | |
print("lllllllllllllllllllllllll") | |
print(X.columns) | |
### Encode variables into one-hot | |
X, one_hot_scaler = encode_categorical(X, columns_categorical) | |
X, minmax_scaler_inputs = scale_numerical( | |
X, [column for column in columns_numerical if column not in columns_target] | |
) | |
y, minmax_scaler_targets = scale_numerical(y, columns_target) | |
### Split data | |
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=seed) | |
### Pickle data | |
with open(os.path.join(main_folder, f"X_test_data.pickle"), "wb+") as file: | |
pickle.dump(X_test, file) | |
with open(os.path.join(main_folder, f"y_test_data.pickle"), "wb+") as file: | |
pickle.dump(y_test, file) | |
with open(os.path.join(main_folder, f"one_hot_scaler.pickle"), "wb+") as file: | |
pickle.dump(one_hot_scaler, file) | |
with open(os.path.join(main_folder, f"minmax_scaler_inputs.pickle"), "wb+") as file: | |
pickle.dump(minmax_scaler_inputs, file) | |
with open(os.path.join(main_folder, f"minmax_scaler_targets.pickle"), "wb+") as file: | |
pickle.dump(minmax_scaler_targets, file) | |
return X_train, X_test, y_train, y_test | |
def train_model_ml(X_train, X_test, y_train, y_test, main_folder, model_path, seed=SEED): | |
set_all_seeds(seed) | |
model = RandomForestRegressor(random_state=seed) | |
model.fit(X_train, y_train) | |
y_hat = model.predict(X_test) | |
print("----------------") | |
print("Model performance") | |
print("MAE", mean_absolute_error(y_test, y_hat)) | |
print("MAPE", mean_absolute_percentage_error(y_test, y_hat)) | |
print("R2", r2_score(y_test, y_hat)) | |
with open(os.path.join(main_folder, model_path), "wb+") as file: | |
pickle.dump(model, file) | |
return model | |
def train_model( | |
X_train, X_test, y_train, y_test, columns_target, main_folder, model_path, lr=0.01, seed=SEED, get_history=False | |
): | |
# Set all seeds from reproducibility | |
set_all_seeds(seed) | |
# Create folder if doesn't exist | |
if not os.path.exists(main_folder): | |
os.makedirs(main_folder) | |
## Setup model for training and training | |
model = setup_model(len(columns_target)) | |
opt = tf.keras.optimizers.Adam(learning_rate=lr) # 0.01 for the hardness | |
print("learning rate", lr) | |
model.compile(optimizer=opt, loss="mean_squared_error") | |
validation_split = 0.1 | |
history = model.fit( | |
X_train, y_train, batch_size=1, epochs=200, verbose=1, validation_data=(X_test, y_test), shuffle=True | |
) # 200 epochs initially | |
# raise Exception("Early stopping to test reproducibility") | |
model.save(os.path.join(main_folder, model_path)) | |
model_core_name = model_path.split(".")[0] | |
with open(os.path.join(main_folder, f"{model_core_name}_fit_history.pickle"), "wb+") as file: | |
pickle.dump(history, file) | |
### Plot loss | |
plt.clf() | |
plt.plot(history.history["loss"]) | |
plt.plot(history.history["val_loss"]) | |
plt.title("model loss") | |
plt.ylabel("loss") | |
plt.xlabel("epoch") | |
plt.legend(["train", "test"], loc="upper left") | |
fig = plt.gcf() | |
plt.show() | |
fig.savefig(os.path.join(main_folder, "plot_loss_function.png")) | |
if get_history: | |
return model, history | |
return model | |
def save_shap_explainer(predict_fn, X_train, X_test, main_folder, explainer_name="explainer"): | |
# Create folder if doesn't exist | |
if not os.path.exists(main_folder): | |
os.makedirs(main_folder) | |
## Get explainer | |
ex = shap.KernelExplainer(predict_fn, X_train[:80]) | |
shap_values = ex.shap_values(X_test[-20:]) | |
fig, axes = plt.subplots(1, 2, figsize=(5, 5)) | |
# need to check that it works in all cases (especially if size the X_test is 1) | |
if len(shap_values) == 1: | |
shap_values = shap_values[0] | |
plt.clf() | |
shap.summary_plot(shap_values, X_test[-20:], show=False) | |
fig = plt.gcf() | |
fig.savefig(os.path.join(main_folder, f"plot_shap_summary_{explainer_name}.png")) | |
plt.show() | |
with open(os.path.join(main_folder, f"{explainer_name}.bz2"), "wb") as file: | |
# pickle.dump(ex, file) | |
dill.dump(ex, file) | |
def compute_shap_explainer_no_physics(model_path, X_train, X_test, main_folder, scaler_inputs_path): | |
""" | |
Creates and save a shap explainer that do not include physics-informed features | |
To be shared with customers and put into the gradio | |
X_train and X_test must NOT be scaled | |
""" | |
scaler_inputs = unpickle_file(scaler_inputs_path) | |
if model_path.split(".")[-1] == "h5": | |
model = tf.keras.models.load_model(model_path) | |
else: | |
model = unpickle_file(model_path) | |
model_no_physics = NoPhysicsModels(model, scaler_inputs, add_physics_features) | |
save_shap_explainer(model_no_physics.predict, X_train, X_test, main_folder, explainer_name="exp_no_physics") | |
if __name__ == "__main__": | |
parser = argparse.ArgumentParser(description="Process parameters") | |
parser.add_argument( | |
"--data_path", | |
type=str, | |
help="The path to your input data file", | |
default="preprocessed_data.csv", | |
required=False, | |
) | |
parser.add_argument( | |
"--main_folder", type=str, help="Folder to save model files", default="../models/hardness", required=False | |
) | |
parser.add_argument( | |
"--model_path", type=str, help="Path to save model", default="model_hardness.h5", required=False | |
) | |
parser.add_argument("--columns_target", type=str, help="List of target columns", default="H", required=False) | |
parser.add_argument( | |
"--columns_numerical", | |
type=str, | |
help="List of data columns with numeric values", | |
default="%A,%B,%C,%D,%E,%F,%Phase_A,%Phase_B,%Phase_C,%Phase_D,%Phase_E,%Phase_F,%A_Matrice,%B_Matrice,%C_Matrice,%D_Matrice,%E_Matrice,%F_Matrice,H,Temperature_C", | |
required=False, | |
) | |
args = parser.parse_args() | |
columns_numerical = args.columns_numerical.split(",") if args.columns_numerical else [] | |
columns_target = args.columns_target.split(",") if args.columns_target else [] | |
X_train, X_test, y_train, y_test = prepare_data(args.data_path, columns_numerical, columns_target, args.main_folder) | |
model = train_model(X_train, X_test, y_train, y_test, columns_target, args.main_folder, args.model_path) | |
save_shap_explainer(model.predict, X_train, X_test, args.main_folder) | |