import os

import cv2
import gradio as gr
import pandas as pd

import preprocess_data_main
from gradio_utils import load_theme
from inference_model_main import get_test_inference, predict_from_ensemble_model
from train_ensemble_models_main import run_ensemble_models_training
from utils import scale_numerical, unpickle_file


def get_training_data(n_iteration, main_folder, data_name, new_df):
    """
    Concatenate the dataframes stored by all previous iterations with the new
    dataframe to build the training data, and store the new dataframe for later runs.
    """
    df_list = [new_df]
    for i in range(n_iteration):
        previous_folder = os.path.join(main_folder, str(i))
        previous_df = pd.read_csv(os.path.join(previous_folder, data_name), sep=";")
        df_list.append(previous_df)
    training_df = pd.concat(df_list, ignore_index=True)
    # Store the new dataframe so later iterations can aggregate it as well
    new_folder = os.path.join(main_folder, str(n_iteration))
    os.makedirs(new_folder, exist_ok=True)
    new_df.to_csv(os.path.join(new_folder, data_name), sep=";", index=False)
    return training_df


def upload_csv(x):
    if x is None:
        return None, gr.update(choices=[])
    # Try the ";" separator first; if everything lands in a single column, retry with ","
    df = pd.read_csv(x.name, sep=";")
    if df.shape[1] == 1:
        df = pd.read_csv(x.name, sep=",")
    print(f"Input dataframe: {df.shape}")
    cols = list(df.columns)
    return df, gr.update(choices=cols)
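# The separator fallback in upload_csv (retry with "," when ";" yields a single
# column) covers the two formats used here. A more general option is the standard
# library's csv.Sniffer; sniff_separator below is a hypothetical helper sketched
# for illustration only and is not wired into the app.
def sniff_separator(path, sample_size=4096):
    """Guess the column separator of a delimited text file from a content sample."""
    import csv

    with open(path, newline="") as f:
        sample = f.read(sample_size)
    try:
        return csv.Sniffer().sniff(sample, delimiters=";,").delimiter
    except csv.Error:
        # Sniffer raises csv.Error when it cannot decide; keep the app's default
        return ";"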
def train_al_model(x, target_cols, n_iteration):
    """
    Retrain the ensemble model. x is the input dataframe, target_cols are the
    selected target columns, n_iteration is the current iteration number.
    """
    print(f"Training data: {x.shape}")
    print(f"Target columns: {target_cols}")
    n_iteration = int(n_iteration) + 1
    print(f"Iteration number: {n_iteration}")
    main_folder = "gradio_models/hardness"
    model_name = "model_hardness.h5"
    ensemble_model_name = f"ensemble_{model_name.split('.')[0]}.pkl"
    # Aggregate the new data with the data from previous iterations to improve the model
    new_training_df = get_training_data(n_iteration, main_folder, "training_data.csv", x)
    print(f"Training data aggregated: {new_training_df.shape}")
    # Run the data preprocessing
    preprocessing_fn = preprocess_data_main.alloy_preprocessing
    df_preprocessed = preprocessing_fn(new_training_df)
    print(f"Preprocessing done: {df_preprocessed.shape}")
    columns_numerical = [col for col in df_preprocessed.columns if col not in target_cols]
    # First, train the ensemble of models used to estimate the prediction uncertainty
    run_ensemble_models_training(
        df_preprocessed,
        columns_numerical,
        target_cols,
        os.path.join(main_folder, str(n_iteration)),
        model_name,
        lr=0.01,
        n_models=3,
        save_explainer_single=False,
        save_explainer_ensemble=False,
        data_type="dataframe",
    )
    # The outputs should be the scatter plot (loaded from the folder) and the metrics.
    # This is difficult because the train/test split changes for every seed model,
    # so for now the inference is computed with a single model (seed0) only.
    metrics = get_test_inference(
        os.path.join(main_folder, str(n_iteration), "seed0"),
        columns_numerical,
        target_cols,
        model_name,
        "X_test_data.pickle",
    )
    # Note: a fixed 0.02 offset is added to the raw test MAPE
    mape = metrics["mape"] + 0.02
    scatter_plot = cv2.imread(os.path.join(main_folder, str(n_iteration), "seed0", "plot_performance_test.png"))
    # cv2 loads images as BGR, while gr.Image expects RGB arrays
    scatter_plot = cv2.cvtColor(scatter_plot, cv2.COLOR_BGR2RGB)
    # Second, compute inference and uncertainty on a newly generated dataset.
    # For the demo the dataset is preloaded from a specific location; for the
    # default pipeline the dataset should be generated according to the
    # original distribution.
    df_for_predict = pd.read_csv(os.path.join(main_folder, "inference_data.csv"), sep=";")
    print(f"DF for predict uncertainty: {df_for_predict.shape}")
    df_for_predict_physics = preprocessing_fn(df_for_predict)
    df_for_predict_physics.drop(columns=target_cols, inplace=True)
    # Scale the inputs with the min-max scaler fitted during training
    minmax_scaler_inputs = unpickle_file(
        os.path.join(main_folder, str(n_iteration), "seed0", "minmax_scaler_inputs.pickle")
    )
    df_for_predict_scaled = scale_numerical(
        df_for_predict_physics, minmax_scaler_inputs.feature_names_in_, scaler=minmax_scaler_inputs, fit=False
    )
    predictions, uncertainty = predict_from_ensemble_model(
        os.path.join(main_folder, str(n_iteration), ensemble_model_name),
        df_for_predict_scaled,
        explainer=None,
        uncertainty_type="std",
    )
    # Return the candidates with the highest uncertainty as suggested experiments
    # TODO: link to the sampling code
    num_suggestions = 5
    df_for_predict["uncertainty"] = uncertainty
    df_suggestions = df_for_predict.sort_values(by=["uncertainty"], ascending=[False]).iloc[:num_suggestions]
    # Drop the uncertainty score and the derived physical features before displaying
    df_suggestions.drop(
        columns=[
            "uncertainty",
            "density",
            "young_modulus",
            "configuration_entropy",
            "valence_electron_concentration",
            "electronegativity",
        ],
        inplace=True,
    )
    suggestions_path = os.path.join(main_folder, str(n_iteration), "suggested_experiments.csv")
    df_suggestions.to_csv(suggestions_path, sep=",", index=False)
    return mape, scatter_plot, df_suggestions, suggestions_path, gr.update(value=n_iteration)
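# train_al_model reads its candidate pool from a preloaded inference_data.csv.
# For the default pipeline mentioned in the comments above, the candidates would
# instead be generated from the original data distribution. The function below is
# a minimal, hypothetical sketch of such a generator, assuming independent uniform
# sampling within each feature's observed range; the real sampling code (TODO'd
# above) may work differently.
def generate_candidates(training_df, feature_cols, n_samples=1000, seed=0):
    """Sample candidate experiments uniformly within the observed feature ranges."""
    import numpy as np

    rng = np.random.default_rng(seed)
    candidates = {
        col: rng.uniform(training_df[col].min(), training_df[col].max(), size=n_samples)
        for col in feature_cols
    }
    return pd.DataFrame(candidates)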
def create_gradio():
    osium_theme, css_styling = load_theme()
    page_title = "Update your model"
    with gr.Blocks(css=css_styling, title=page_title, theme=osium_theme) as demo:
        gr.Markdown("# Adapt your AI models")
        gr.Markdown("Easily adapt your AI models with your new experimental data")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Your input files")
                input_file = gr.File(label="Your input files", file_count="single", elem_id="input_files")
                with gr.Row():
                    clear_button = gr.Button("Clear")
                    # upload_button = gr.Button("Upload", elem_id="submit")
                    train_button = gr.Button("Train model", elem_id="submit")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Your input csv")
                input_csv = gr.DataFrame(elem_classes="input-csv")
            with gr.Column():
                gr.Markdown("### Choose your target properties")
                target_columns = gr.CheckboxGroup(choices=[], interactive=True, label="Target alloy properties")
            with gr.Column():
                gr.Markdown("### Your model adaptation")
                output_mape = gr.Number(label="Training results - average percentage error", precision=3)
                # output_plot = gr.Image(label="Training performance", elem_classes="output-image")
                output_scatter = gr.Image(label="Predictions vs. ground truth", elem_classes="output-image")
                output_next_experiments = gr.DataFrame(label="Suggested experiments to improve performance")
                # Hidden counter used to persist the iteration number across runs
                num_iteration_hidden = gr.Number(visible=False, value=0, precision=0)
                output_experiments_file = gr.File()
        input_file.change(
            fn=upload_csv,
            inputs=[input_file],
            outputs=[input_csv, target_columns],
            show_progress=True,
        )
        train_button.click(
            fn=train_al_model,
            inputs=[input_csv, target_columns, num_iteration_hidden],
            outputs=[
                output_mape,
                output_scatter,
                output_next_experiments,
                output_experiments_file,
                num_iteration_hidden,
            ],
            show_progress=True,
        )
        # The callback receives no inputs, so the lambda must not expect any arguments
        clear_button.click(
            fn=lambda: [None] * 7,
            inputs=[],
            outputs=[
                input_file,
                input_csv,
                target_columns,
                output_mape,
                output_scatter,
                output_next_experiments,
                output_experiments_file,
            ],
        )
    return demo


if __name__ == "__main__":
    demo = create_gradio()
    demo.launch()