# design_space / app.py
# snajmark's picture
# Update app.py
# 1a4e328
import os
import csv
import gradio as gr
import tensorflow as tf
import numpy as np
import pandas as pd
from datetime import datetime
import utils
from huggingface_hub import Repository
import itertools
import time
import cv2
from domain_space import load_domain_space, create_plot, create_slicer_update, update_dropdown
import yaml
# Access token read from the WRITE_PER environment variable (None when the
# variable is unset). write_logs() passes it to Repository as the auth token.
WRITE_TOKEN = os.environ.get("WRITE_PER") # write
# Dataset repository that write_logs() clones, and the CSV file it appends
# log rows to.
dataset_url = "https://huggingface.co/datasets/sandl/upload_alloy_hardness"
dataset_path = "logs_alloy_hardness.csv"
# (minimum, maximum) pair per numeric column. predict_from_tuple() uses the
# density / Young modulus pairs to min-max scale the model inputs, and
# predict() uses the HV / YS pairs to map the model outputs back to
# physical units.
scaling_factors = {'PROPERTY: Calculated Density (g/cm$^3$)': (5.5, 13.7),
'PROPERTY: Calculated Young modulus (GPa)': (77.0, 336.0),
'PROPERTY: HV': (107.0, 1183.0),
'PROPERTY: YS (MPa)': (62.0, 3416.0)}
# Category label -> integer code for each categorical input column.
# predict_from_tuple() looks up the selected labels here and then hands the
# whole mapping to utils.turn_into_one_hot().
# The trailing "#, 'nan': ..." fragments are disabled entries kept from an
# earlier version of the mapping.
input_mapping = {'PROPERTY: BCC/FCC/other': {'BCC': 0, 'FCC': 1, 'OTHER': 2},#, 'nan': 2},
'PROPERTY: Processing method': {'ANNEAL': 0, 'CAST': 1, 'OTHER': 2, 'POWDER': 3, 'WROUGHT': 4},#, 'nan': 2},
'PROPERTY: Microstructure': {'B2': 0, 'B2+BCC': 1, 'B2+L12': 2, 'B2+Laves+Sec.': 3, 'B2+Sec.': 4, 'BCC': 5,
'BCC+B2': 6, 'BCC+B2+FCC': 7, 'BCC+B2+FCC+Sec.': 8, 'BCC+B2+L12': 9, 'BCC+B2+Laves': 10,
'BCC+B2+Sec.': 11, 'BCC+BCC': 12, 'BCC+BCC+HCP': 13, 'BCC+BCC+Laves': 14,
'BCC+BCC+Laves(C14)': 15, 'BCC+BCC+Laves(C15)': 16, 'BCC+FCC': 17, 'BCC+HCP': 18,
'BCC+Laves': 19, 'BCC+Laves(C14)': 20, 'BCC+Laves(C15)': 21, 'BCC+Laves+Sec.': 22,
'BCC+Sec.': 23, 'FCC': 24, 'FCC+B2': 25, 'FCC+B2+Sec.': 26, 'FCC+BCC': 27,
'FCC+BCC+B2': 28, 'FCC+BCC+B2+Sec.': 29, 'FCC+BCC+BCC': 30, 'FCC+BCC+Sec.': 31,
'FCC+FCC': 32, 'FCC+HCP': 33, 'FCC+HCP+Sec.': 34, 'FCC+L12': 35, 'FCC+L12+B2': 36,
'FCC+L12+Sec.': 37, 'FCC+Laves': 38, 'FCC+Laves(C14)': 39, 'FCC+Laves+Sec.': 40,
'FCC+Sec.': 41, 'L12+B2': 42, 'Laves(C14)+Sec.': 43, 'OTHER': 44},#, 'nan': 44},
'PROPERTY: Single/Multiphase': {'': 0, 'M': 1, 'S': 2, 'OTHER': 3}}#, 'nan': 3}}
# Individual phase names offered as checkboxes in the microstructure input;
# process_microstructure() joins the ticked ones with '+' and matches the
# result against the 'PROPERTY: Microstructure' keys above.
unique_phase_elements = ['B2', 'BCC', 'FCC', 'HCP', 'L12', 'Laves', 'Laves(C14)', 'Laves(C15)', 'Sec.', 'OTHER']
# Label text shown on each input widget, keyed by input column name.
# Each value is two adjacent string literals that Python concatenates into
# one string: "(column name) " followed by the user instructions.
input_cols = {
"PROPERTY: Alloy formula": "(PROPERTY: Alloy formula) "
"Enter alloy formula using proportions representation (i.e. Al0.25 Co1 Fe1 Ni1)",
"PROPERTY: Single/Multiphase": "(PROPERTY: Single/Multiphase) "
"Choose between Single (S), Multiphase (M) and other (OTHER)",
"PROPERTY: BCC/FCC/other": "(PROPERTY: BCC/FCC/other) "
"Choose between BCC, FCC and other ",
"PROPERTY: Processing method": "(PROPERTY: Processing method) "
"Choose your processing method (ANNEAL, CAST, POWDER, WROUGHT or OTHER)",
"PROPERTY: Microstructure": "(PROPERTY: Microstructure) "
"Choose the microstructure (SEC means the secondary/tertiary microstructure is not one of FCC, BCC, HCP, L12, B2, Laves, Laves (C14), Laves (C15))",
}
# Read the design-space configuration, then build everything the
# "design space" part of the UI needs: the per-element slider settings, the
# synthetic design-space table, and one (plot, update-plot) callback pair per
# plotted target.
with open("conf_test_uncertainty.yaml", "rb") as file:
    conf = yaml.safe_load(file)

_domain_space_conf = conf["domain_space"]
space_dict = _domain_space_conf["uncertainty_space_dict"]
explored_dict = _domain_space_conf["explored_space_dict"]
df_synth = load_domain_space(_domain_space_conf["design_space_path"])

# Same data and explored-space settings, two different target columns.
plot_fn_uncertainty, update_plot_fn_uncertainty = create_plot(
    df_synth, explored_dict, target="uncertainty"
)
plot_fn_hardness, update_plot_fn_hardness = create_plot(
    df_synth, explored_dict, target="y_pred"
)

# Callback that reconfigures a slider when its dropdown selection changes.
update_slider_fn = create_slicer_update(space_dict)
def process_microstructure(list_phases, known_microstructures=None):
    """
    Turn a selection of phase names into a known microstructure label.

    The phases are joined with '+' in every possible order; the first ordering
    (in ``itertools.permutations`` order, i.e. driven by the input order) that
    is a known label is returned.

    Parameters
    ----------
    list_phases : iterable of str or None
        Phase names selected by the user, e.g. ``['B2', 'Sec.']``.
        ``None`` is treated like "nothing recognisable selected".
    known_microstructures : container of str, optional
        Labels to match against. Defaults to the keys of
        ``input_mapping['PROPERTY: Microstructure']``.

    Returns
    -------
    str
        The matching label, or ``'OTHER'`` when no ordering matches.
    """
    if known_microstructures is None:
        known_microstructures = input_mapping['PROPERTY: Microstructure']
    if list_phases is None:
        return 'OTHER'

    phases = list(list_phases)

    # A label with k '+' separators names k + 1 phases, so a selection with
    # more entries than the longest label can never match. Bailing out here
    # avoids walking up to n! orderings (10 ticked boxes = 3.6M candidates).
    max_phases = max(
        (label.count('+') + 1 for label in known_microstructures), default=0
    )
    if len(phases) > max_phases:
        return 'OTHER'

    # Iterate lazily: stop at the first hit instead of materialising every
    # permutation up front.
    for ordering in itertools.permutations(phases):
        candidate = '+'.join(ordering)
        if candidate in known_microstructures:
            return candidate
    return 'OTHER'
def write_logs(message, message_type="Prediction"):
    """
    Append one log row to the CSV file kept in the logs dataset repository.

    The row is written inside the ``commit`` context manager of a
    ``Repository`` cloned from ``dataset_url`` into the local ``data``
    directory, using ``WRITE_TOKEN`` for authentication.

    Parameters
    ----------
    message : str
        Free-text payload stored in the "message" column.
    message_type : str, optional
        Value stored in the "name" column (default ``"Prediction"``).

    Returns
    -------
    None
    """
    repo = Repository(local_dir="data", clone_from=dataset_url, use_auth_token=WRITE_TOKEN)
    # NOTE(review): dataset_path is relative; presumably it resolves inside the
    # cloned repository while the commit context is active - confirm against
    # the huggingface_hub documentation.
    with repo.commit(commit_message="from private", blocking=False):
        # newline="" is required by the csv module so that it controls line
        # endings itself; an explicit encoding keeps the file independent of
        # the host locale.
        with open(dataset_path, "a", newline="", encoding="utf-8") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["name", "message", "time"])
            writer.writerow(
                {"name": message_type, "message": message, "time": str(datetime.now())}
            )
def predict(x, request: gr.Request):
    """
    Predict hardness and yield strength with the saved Keras model.

    Parameters
    ----------
    x : pandas.DataFrame
        Model input features. Empty-string cells are replaced by 0 before
        the frame is converted to a float32 array.
    request : gr.Request or None
        Request object supplied by Gradio. When it is not None, the user name
        and client host are logged through ``write_logs``.

    Returns
    -------
    tuple
        ``(hardness, 12, yield_strength, 4.8, interpret_fig)``: the two
        predictions rescaled with ``scaling_factors`` and rounded to two
        decimals, the two fixed uncertainty values shown next to them, and
        the figure returned by ``utils.interpret``.
    """
    # NOTE(review): the model is re-read from disk on every call.
    loaded_model = tf.keras.models.load_model("hardness.h5")
    # Model.summary() prints the table itself and returns None, so it must not
    # be passed to print() (that used to print "summary is None").
    print("summary is")
    loaded_model.summary()

    x = x.replace("", 0)
    x = np.asarray(x).astype("float32")
    y = loaded_model.predict(x)
    # Only the first row of the prediction is used: column 0 feeds the
    # hardness output, column 1 the yield-strength output.
    y_hardness = y[0][0]
    y_ys = y[0][1]
    minimum_hardness, maximum_hardness = scaling_factors['PROPERTY: HV']
    minimum_ys, maximum_ys = scaling_factors['PROPERTY: YS (MPa)']
    print("Prediction is ", y)

    if request is not None:  # When building the app the first request is None
        message = f"{request.username}_{request.client.host}"
        print("MESSAGE")
        print(message)
        write_logs(message)

    interpret_fig = utils.interpret(x)
    # Undo the min-max scaling: value * (max - min) + min.
    hardness = round(y_hardness * (maximum_hardness - minimum_hardness) + minimum_hardness, 2)
    yield_strength = round(y_ys * (maximum_ys - minimum_ys) + minimum_ys, 2)
    return hardness, 12, yield_strength, 4.8, interpret_fig
def predict_from_tuple(in1, in2, in3, in4, in5, request: gr.Request):
    """
    Build the model input from the raw widget values and run ``predict``.

    Arguments, in order: alloy formula (in1), single/multiphase label (in2),
    BCC/FCC/other label (in3), processing method label (in4), list of selected
    phases (in5), and the Gradio request, which is forwarded to ``predict``.
    Returns whatever ``predict`` returns.
    """
    formula = utils.normalize_and_alphabetize_formula(in1)
    raw_density = utils.calculate_density(formula)
    raw_young_modulus = utils.calculate_youngs_modulus(formula)

    def encode(column, label):
        # Integer code of a categorical label, wrapped as a one-row column.
        return [int(input_mapping[column][label])]

    def min_max_scale(column, value):
        # (value - min) / (max - min), wrapped as a one-row column.
        lower, upper = scaling_factors[column]
        return [float((value - lower) / (upper - lower))]

    # Insertion order fixes the column order of the dataframe, so it is kept
    # exactly: the four categorical columns first, then the two numeric ones.
    features = {
        'PROPERTY: Single/Multiphase': encode('PROPERTY: Single/Multiphase', str(in2)),
        'PROPERTY: BCC/FCC/other': encode('PROPERTY: BCC/FCC/other', str(in3)),
        'PROPERTY: Processing method': encode('PROPERTY: Processing method', str(in4)),
        'PROPERTY: Microstructure': encode('PROPERTY: Microstructure', process_microstructure(in5)),
        'PROPERTY: Calculated Density (g/cm$^3$)': min_max_scale(
            'PROPERTY: Calculated Density (g/cm$^3$)', raw_density
        ),
        'PROPERTY: Calculated Young modulus (GPa)': min_max_scale(
            'PROPERTY: Calculated Young modulus (GPa)', raw_young_modulus
        ),
    }

    one_hot = utils.turn_into_one_hot(pd.DataFrame.from_dict(features), input_mapping)
    print("One hot columns are ", one_hot.columns)
    return predict(one_hot, request)
def upload_csv(x):
    """
    Load the uploaded CSV and offer its columns as target choices.

    Parameters
    ----------
    x : file-like object with a ``name`` attribute, or None
        Value of the file widget. It is None when the widget has been
        cleared, which also triggers the change event.

    Returns
    -------
    tuple
        ``(dataframe, update)`` where ``update`` sets the choices of the
        target-column checkbox group to the dataframe's column names. When
        ``x`` is None both outputs are reset instead.
    """
    if x is None:
        # Nothing to read: empty the table and the target choices rather than
        # failing on x.name.
        return gr.update(value=None), gr.update(choices=[], value=[])

    print(x)
    print(x.name)
    df = pd.read_csv(x.name, sep=",")
    print("Input dataframe")
    print(df.shape)
    # "Unnamed: 0" only exists when the CSV was saved together with its index;
    # errors="ignore" keeps files without that column loadable.
    df.drop(columns=["Unnamed: 0"], inplace=True, errors="ignore")
    cols = list(df.columns)
    return df, gr.update(choices=cols)
def train_model(x, target_cols):
    """
    Placeholder for model training: waits, then returns fixed results.

    No model is fitted here. The function sleeps for six seconds, loads a
    pre-rendered performance image from disk and proposes a fixed random
    sample of the input rows as next experiments.

    Parameters
    ----------
    x : pandas.DataFrame
        The uploaded data.
    target_cols : list of str
        Columns selected as targets; they are removed from the suggested
        experiments.

    Returns
    -------
    tuple
        ``("0.017", performance_plot, next_df)``: the fixed error string, the
        image array (None if the image file cannot be read) and up to five
        sampled rows without the target columns.
    """
    print("Selected target columns")
    print(target_cols)
    time.sleep(6)

    performance_plot = cv2.imread("predictions_ground_truth.png")
    if performance_plot is not None:
        # OpenCV decodes images as BGR while the image component displays
        # RGB; without the conversion red and blue are swapped.
        performance_plot = cv2.cvtColor(performance_plot, cv2.COLOR_BGR2RGB)

    # Cap the sample size so frames with fewer than five rows do not raise.
    next_df = x.sample(n=min(5, len(x)), random_state=12)
    next_df = next_df.drop(columns=target_cols)
    return "0.017", performance_plot, next_df
# One example row for gr.Examples, in the order of the prediction inputs:
# formula, single/multiphase, BCC/FCC/other, processing method, phase list.
example_inputs = ['Al0.25 Co1 Fe1 Ni1', 'S', 'BCC', 'CAST', ['B2', 'Sec.']]
# Custom CSS passed to gr.Blocks. "#submit" targets the buttons created with
# elem_id="submit"; the remaining rules fix the height of image/plot areas.
css_styling = """#submit {background: #1eccd8}
#submit:hover {background: #a2f1f6}
.output-image, .input-image, .image-preview {height: 250px !important}
.output-plot {height: 250px !important}"""
# Custom palette used as the neutral hue of the theme below. The per-shade
# comments record where each shade was observed to show up in the UI.
light_theme_colors = gr.themes.Color(c50="#e4f3fa", # Dataframe background cell content - light mode only
c100="#e4f3fa", # Top corner of clear button in light mode + markdown text in dark mode
c200="#a1c6db", # Component borders
c300="#FFFFFF", #
c400="#e4f3fa", # Footer text
c500="#0c1538", # Text of component headers in light mode only
c600="#a1c6db", # Top corner of button in dark mode
c700="#475383", # Button text in light mode + component borders in dark mode
c800="#0c1538", # Markdown text in light mode
c900="#a1c6db", # Background of dataframe - dark mode
c950="#0c1538") # Background in dark mode only
# secondary color used for highlight box content when typing in light mode, and download option in dark mode
# primary color used for login button in dark mode
osium_theme = gr.themes.Default(primary_hue="cyan", secondary_hue="cyan", neutral_hue=light_theme_colors)
# Browser tab title passed to gr.Blocks.
page_title = "Alloys' hardness and yield strength prediction"
# Asset paths interpolated into the HTML header snippet below.
favicon_path = "osiumai_favicon.ico"
logo_path = "osiumai_logo.jpg"
# Header snippet with favicon and logo. In the layout below it is only
# referenced from a commented-out gr.HTML(html) call.
html = f"""<html> <link rel="icon" type="image/x-icon" href="file={favicon_path}">
<img src='file={logo_path}' alt='Osium AI logo' width='200' height='100'> </html>"""
# Gradio UI: a "Model adaptation" tab (upload a CSV, pick targets, "train")
# and a "Run your model" tab (predict hardness / yield strength and explore
# the design space), followed by the event wiring.
#
# NOTE(review): the indentation of this block was lost in the copy this was
# reviewed from, so the container nesting (which Row/Column holds what, and
# which tab the design-space section belongs to) is a best-effort
# reconstruction - confirm against the rendered layout.
with gr.Blocks(css=css_styling, title=page_title, theme=osium_theme) as demo:
    # gr.HTML(html)
    gr.Markdown("# <p style='text-align: center;'>Predict your alloy's hardness and yield strength</p>")
    gr.Markdown("This AI model provides the estimation of hardness and yield strength based on the input alloy description")

    with gr.Tab(label="Model adaptation"):
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Your input files")
                input_file = gr.File(label="Your input files", file_count="single", elem_id="input_files")
        with gr.Row():
            clear_train_button = gr.Button("Clear")
            train_button = gr.Button("Train model", elem_id="submit")
        with gr.Row():
            with gr.Column():
                gr.Markdown("### Your input csv")
                input_csv = gr.DataFrame(elem_classes="input-csv")
            with gr.Column():
                gr.Markdown("### Choose your target properties")
                target_columns = gr.CheckboxGroup(choices=[], interactive=True, label="Target alloy properties")
            with gr.Column():
                gr.Markdown("### Your model adaptation")
                output_text = gr.Textbox(label="Training results - Mean Average Percentage Error")
                output_plot = gr.Image(label="Training performance", elem_classes="output-image")
                output_next_experiments = gr.DataFrame(label="Suggested experiments to improve performance")

    with gr.Tab(label="Run your model"):
        with gr.Row():
            clear_button = gr.Button("Clear")
            prediction_button = gr.Button("Predict", elem_id="submit")
        with gr.Row():
            with gr.Column(scale=0.25, min_width=80):
                gr.Markdown("### Your alloy's characteristics")
                input_formula = gr.Textbox(
                    lines=2, placeholder=input_cols["PROPERTY: Alloy formula"], label=input_cols["PROPERTY: Alloy formula"]
                )
                input_phase = gr.Dropdown(
                    choices=list(input_mapping["PROPERTY: Single/Multiphase"].keys()),
                    label=input_cols["PROPERTY: Single/Multiphase"],
                )
                input_bccfcc = gr.Dropdown(
                    choices=list(input_mapping["PROPERTY: BCC/FCC/other"].keys()),
                    label=input_cols["PROPERTY: BCC/FCC/other"],
                )
                input_processing = gr.Dropdown(
                    choices=list(input_mapping["PROPERTY: Processing method"].keys()),
                    label=input_cols["PROPERTY: Processing method"],
                )
                # Individual phases are ticked here; predict_from_tuple()
                # combines them into one microstructure label.
                input_microstructure = gr.CheckboxGroup(
                    choices=unique_phase_elements,
                    label=input_cols["PROPERTY: Microstructure"],
                )
            with gr.Column():
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Your alloy's hardness (HV)")
                        output_hardness = gr.Text(label="Hardness (in HV)")
                        output_hardness_uncertainty = gr.Text(label="Hardness uncertainty (%)")
                    with gr.Column():
                        gr.Markdown("### Your alloy's yield strength (MPa)")
                        output_ys = gr.Text(label="Yield Strength (MPa)")
                        output_ys_uncertainty = gr.Text(label="Yield strength uncertainty (%)")
                with gr.Row():
                    with gr.Column():
                        with gr.Row():
                            gr.Markdown("### Interpretation of hardness prediction")
                            gr.Markdown("### Interpretation of yield strength prediction")
                        with gr.Row():
                            output_interpretation = gr.Plot(label="Interpretation")

        gr.Markdown("### Explore your alloy design space")
        with gr.Row():
            # Elements fixed by default; each "Fix element k" dropdown offers
            # every column except the defaults of the other two dropdowns.
            elem1 = "%Cr"
            elem2 = "%V"
            elem3 = "%Mo"
            with gr.Row():
                input_cols_gradio = ["%C", "%Co", "%Cr", "%V", "%Mo", "%W", "Temperature_C"]
                input_list1 = input_cols_gradio.copy()
                input_list1.remove(elem2)
                input_list1.remove(elem3)
                dropdown_1 = gr.Dropdown(label="Fix element 1", choices=input_list1, value=elem1)
                input_slicer_1 = gr.Slider(
                    label=elem1,
                    minimum=space_dict[elem1]["min"],
                    maximum=space_dict[elem1]["max"],
                    value=space_dict[elem1]["value"],
                    step=space_dict[elem1]["step_display"],
                )
            with gr.Row():
                input_list2 = input_cols_gradio.copy()
                input_list2.remove(elem1)
                input_list2.remove(elem3)
                dropdown_2 = gr.Dropdown(label="Fix element 2", choices=input_list2, value=elem2)
                input_slicer_2 = gr.Slider(
                    label=elem2,
                    minimum=space_dict[elem2]["min"],
                    maximum=space_dict[elem2]["max"],
                    value=space_dict[elem2]["value"],
                    step=space_dict[elem2]["step_display"],
                )
            with gr.Row():
                input_list3 = input_cols_gradio.copy()
                input_list3.remove(elem1)
                input_list3.remove(elem2)
                dropdown_3 = gr.Dropdown(label="Fix element 3", choices=input_list3, value=elem3)
                input_slicer_3 = gr.Slider(
                    label=elem3,
                    minimum=space_dict[elem3]["min"],
                    maximum=space_dict[elem3]["max"],
                    value=space_dict[elem3]["value"],
                    step=space_dict[elem3]["step_display"],
                )
            with gr.Column():
                gr.Markdown("### Your design space")
                output_plot_space_hardness = gr.Plot(type="plotly")
                output_plot_space_uncertainty = gr.Plot(type="plotly")
        with gr.Row():
            gr.Examples([example_inputs], [input_formula, input_phase, input_bccfcc, input_processing, input_microstructure])

    # ---- Design-space plots ------------------------------------------------
    # Both plots are redrawn from the same six controls whenever a slider
    # moves or a new hardness prediction is displayed. Registration order is
    # unchanged: all uncertainty-plot events first, then all hardness-plot
    # events.
    _plot_inputs = [dropdown_1, input_slicer_1, dropdown_2, input_slicer_2, dropdown_3, input_slicer_3]
    for _plot_update_fn, _plot_component in (
        (update_plot_fn_uncertainty, output_plot_space_uncertainty),
        (update_plot_fn_hardness, output_plot_space_hardness),
    ):
        # NOTE(review): only the first slider passes every=0.5; the other
        # triggers had it commented out. Kept as-is - confirm it is intended.
        input_slicer_1.change(
            fn=_plot_update_fn,
            inputs=_plot_inputs,
            outputs=[_plot_component],
            show_progress=True,
            queue=True,
            every=0.5,
        )
        for _trigger in (input_slicer_2, input_slicer_3, output_hardness):
            _trigger.change(
                fn=_plot_update_fn,
                inputs=_plot_inputs,
                outputs=[_plot_component],
                show_progress=True,
                queue=True,
            )

    # ---- Dropdown / slider coupling -----------------------------------------
    _dropdowns = [dropdown_1, dropdown_2, dropdown_3]
    _sliders = [input_slicer_1, input_slicer_2, input_slicer_3]
    # Update the choices in the dropdowns based on the elements selected
    for _dropdown in _dropdowns:
        _dropdown.change(
            fn=update_dropdown,
            inputs=_dropdowns,
            outputs=_dropdowns,
            show_progress=True,
        )
    # Update the slider name based on the choice of the dropdown
    for _dropdown, _slider in zip(_dropdowns, _sliders):
        _dropdown.change(fn=update_slider_fn, inputs=[_dropdown], outputs=[_slider])

    # ---- "Model adaptation" tab ---------------------------------------------
    train_button.click(
        fn=train_model,
        inputs=[input_csv, target_columns],
        outputs=[output_text, output_plot, output_next_experiments],
        show_progress=True,
    )
    # The handler is registered with no inputs, so it must take no arguments,
    # and it must return exactly one update per output component.
    _train_clear_targets = [input_file, input_csv, target_columns, output_text, output_plot]
    clear_train_button.click(
        lambda: [gr.update(value=None)] * len(_train_clear_targets),
        [],
        _train_clear_targets,
    )
    input_file.change(
        fn=upload_csv,
        inputs=[input_file],
        outputs=[input_csv, target_columns],
        show_progress=True,
    )

    # ---- "Run your model" tab -----------------------------------------------
    prediction_button.click(
        fn=predict_from_tuple,
        inputs=[input_formula, input_phase, input_bccfcc, input_processing, input_microstructure],
        outputs=[
            output_hardness,
            output_hardness_uncertainty,
            output_ys,
            output_ys_uncertainty,
            output_interpretation,
        ],
        show_progress=True,
    )
    _predict_clear_targets = [
        input_formula,
        input_phase,
        input_bccfcc,
        input_processing,
        input_microstructure,
        output_hardness,
        output_hardness_uncertainty,
        output_ys,
        output_ys_uncertainty,
        output_interpretation,
    ]
    clear_button.click(
        lambda: [gr.update(value=None)] * len(_predict_clear_targets),
        [],
        _predict_clear_targets,
    )
# Script entry point: enable the request queue (two concurrent workers), then
# start the server.
if __name__ == "__main__":
    demo.queue(concurrency_count=2).launch()