# Spaces: Running  (hosting-page status banner captured with the file; not part of the program)
import base64
import io
import os
from functools import lru_cache
from math import expm1

import gradio as gr
import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from lime.lime_tabular import LimeTabularExplainer
from propy import AAComposition, Autocorrelation, CTD, PseudoAAC
from sklearn.preprocessing import MinMaxScaler
from transformers import BertTokenizer, BertModel
# --- Configuration and Model Loading ---
# Every serialized artifact is resolved relative to the directory holding this file.
MODEL_DIR = os.path.dirname(os.path.abspath(__file__))

# Load AMP Classifier: the estimator plus the scaler applied to its input features.
try:
    model = joblib.load(os.path.join(MODEL_DIR, "RF.joblib"))
    scaler = joblib.load(os.path.join(MODEL_DIR, "norm (4).joblib"))
except FileNotFoundError as e:
    # Chain the original exception so the startup traceback keeps the failing path.
    raise gr.Error(f"Classifier model or scaler not found: {e}. Make sure RF.joblib and norm (4).joblib are in the {MODEL_DIR} directory.") from e
except Exception as e:
    raise gr.Error(f"Error loading classifier components: {e}") from e

# Load ProtBert
try:
    tokenizer = BertTokenizer.from_pretrained("Rostlab/prot_bert", do_lower_case=False)
    protbert_model = BertModel.from_pretrained("Rostlab/prot_bert")
    # Use CUDA when torch reports it as available, otherwise stay on the CPU.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # eval() switches the module to evaluation mode; it is only used for inference here.
    protbert_model = protbert_model.to(device).eval()
except Exception as e:
    raise gr.Error(f"Error loading ProtBert model/tokenizer: {e}. Check internet connection or model availability.") from e
# Full list of selected features (as provided in the original code).
# Order is significant: this list is used both as the column order of the
# feature row and as the feature names handed to the LIME explainer.
selected_features = [
    # Underscore-prefixed descriptor names (presumably from CTD.CalculateCTD).
    "_SolventAccessibilityC3",
    "_SecondaryStrC1", "_SecondaryStrC3",
    "_ChargeC1",
    "_PolarityC1",
    "_NormalizedVDWVC1",
    "_HydrophobicityC3",
    "_SecondaryStrT23",
    "_PolarizabilityD1001", "_PolarizabilityD2001",
    # NOTE(review): spelled "Polarability", unlike the two "_Polarizability..."
    # entries above. Kept byte-for-byte; confirm it matches the column name the
    # scaler/model were fitted with, otherwise this column is always filled with 0.
    "_PolarabilityD3001",
    "_SolventAccessibilityD1001", "_SolventAccessibilityD2001", "_SolventAccessibilityD3001",
    "_SecondaryStrD1001", "_SecondaryStrD1075", "_SecondaryStrD2001", "_SecondaryStrD3001",
    "_ChargeD1001", "_ChargeD1025", "_ChargeD2001", "_ChargeD3075", "_ChargeD3100",
    "_PolarityD1001", "_PolarityD1050", "_PolarityD2001", "_PolarityD3001",
    "_NormalizedVDWVD1001", "_NormalizedVDWVD2001", "_NormalizedVDWVD2025",
    "_NormalizedVDWVD2050", "_NormalizedVDWVD3001",
    "_HydrophobicityD1001", "_HydrophobicityD2001", "_HydrophobicityD3001", "_HydrophobicityD3025",
    # Single-letter keys.
    "A", "R", "D", "C", "E", "Q", "H", "I", "M", "P", "Y", "V",
    # Two-letter keys.
    "AR", "AV", "RC", "RL", "RV", "CR", "CC", "CL", "CK",
    "EE", "EI", "EL", "HC", "IA", "IL", "IV",
    "LA", "LC", "LE", "LI", "LT", "LV",
    "KC", "MA", "MS", "SC", "TC", "TV", "YC",
    "VC", "VE", "VL", "VK", "VV",
    # "...Auto_<property><lag>" keys.
    "MoreauBrotoAuto_FreeEnergy30",
    "MoranAuto_Hydrophobicity2", "MoranAuto_Hydrophobicity4",
    "GearyAuto_Hydrophobicity20", "GearyAuto_Hydrophobicity24", "GearyAuto_Hydrophobicity26",
    "GearyAuto_Hydrophobicity27", "GearyAuto_Hydrophobicity28", "GearyAuto_Hydrophobicity29",
    "GearyAuto_Hydrophobicity30",
    "GearyAuto_AvFlexibility22", "GearyAuto_AvFlexibility26", "GearyAuto_AvFlexibility27",
    "GearyAuto_AvFlexibility28", "GearyAuto_AvFlexibility29", "GearyAuto_AvFlexibility30",
    "GearyAuto_Polarizability22", "GearyAuto_Polarizability24", "GearyAuto_Polarizability25",
    "GearyAuto_Polarizability27", "GearyAuto_Polarizability28", "GearyAuto_Polarizability29",
    "GearyAuto_Polarizability30",
    "GearyAuto_FreeEnergy24", "GearyAuto_FreeEnergy25", "GearyAuto_FreeEnergy30",
    "GearyAuto_ResidueASA21", "GearyAuto_ResidueASA22", "GearyAuto_ResidueASA23",
    "GearyAuto_ResidueASA24", "GearyAuto_ResidueASA30",
    "GearyAuto_ResidueVol21", "GearyAuto_ResidueVol24", "GearyAuto_ResidueVol25",
    "GearyAuto_ResidueVol26", "GearyAuto_ResidueVol28", "GearyAuto_ResidueVol29",
    "GearyAuto_ResidueVol30",
    "GearyAuto_Steric18", "GearyAuto_Steric21", "GearyAuto_Steric26", "GearyAuto_Steric27",
    "GearyAuto_Steric28", "GearyAuto_Steric29", "GearyAuto_Steric30",
    "GearyAuto_Mutability23", "GearyAuto_Mutability25", "GearyAuto_Mutability26",
    "GearyAuto_Mutability27", "GearyAuto_Mutability28", "GearyAuto_Mutability29",
    "GearyAuto_Mutability30",
    # "APAAC<n>" keys.
    "APAAC1", "APAAC4", "APAAC5", "APAAC6", "APAAC8", "APAAC9",
    "APAAC12", "APAAC13", "APAAC15", "APAAC18", "APAAC19", "APAAC24",
]
# LIME Explainer Setup
# No real training sample is loaded here: the explainer's background data is
# synthetic uniform noise in [0, 1) with one column per selected feature.
# The previous try/except around this line was dead code (its fallback repeated
# the exact same call), so it has been removed.
# A fixed seed makes the background data, and therefore the bin edges LIME
# derives from it, identical across restarts; the same seed also drives
# LIME's own perturbation sampling.
LIME_RANDOM_SEED = 42
sample_data = np.random.default_rng(LIME_RANDOM_SEED).random((500, len(selected_features)))
explainer = LimeTabularExplainer(
    training_data=sample_data,
    feature_names=selected_features,
    class_names=["AMP", "Non-AMP"],
    mode="classification",
    random_state=LIME_RANDOM_SEED,
)
# --- Feature Extraction Function ---
def extract_features(sequence: str) -> np.ndarray:
    """Convert a peptide sequence into the scaled feature row used by the classifier.

    The input is upper-cased and every character outside the 20 standard
    amino-acid letters is dropped. The cleaned sequence is then passed through
    the propy descriptor calculators, reduced to the columns named in
    ``selected_features`` (in that order) and transformed with the loaded
    ``scaler``.

    Args:
        sequence: Raw amino-acid string. ``None`` is treated as an empty string.

    Returns:
        The result of ``scaler.transform`` for a single row of features.

    Raises:
        gr.Error: If the cleaned sequence is not 10-100 residues long, or if
            any step of the descriptor calculation or scaling fails.
    """
    # Guard against a missing value so the caller gets the length error below
    # instead of an AttributeError from None.upper().
    raw_sequence = sequence or ""
    cleaned_sequence = ''.join(aa for aa in raw_sequence.upper() if aa in "ACDEFGHIKLMNPQRSTVWY")
    if not (10 <= len(cleaned_sequence) <= 100):
        raise gr.Error(f"Invalid sequence length ({len(cleaned_sequence)}). Must be between 10 and 100 characters and contain only standard amino acids.")
    try:
        ctd_features = CTD.CalculateCTD(cleaned_sequence)
        dipeptide_features = AAComposition.CalculateAADipeptideComposition(cleaned_sequence)
        auto_features = Autocorrelation.CalculateAutoTotal(cleaned_sequence)
        pseudo_features = PseudoAAC.GetAPseudoAAC(cleaned_sequence, lamda=9)

        # Merge in this order so that, on a key collision, later calculators win.
        all_features_dict = {}
        for descriptor_values in (ctd_features, dipeptide_features, auto_features, pseudo_features):
            all_features_dict.update(descriptor_values)

        # Surface columns that are about to be zero-filled instead of hiding them.
        missing = [name for name in selected_features if name not in all_features_dict]
        if missing:
            print(f"Warning: {len(missing)} selected feature(s) were not computed and are set to 0: {missing}")

        feature_df_all = pd.DataFrame([all_features_dict])
        # reindex() zero-fills absent columns; fillna() zero-fills NaN values
        # in columns that were computed.
        computed_features_ordered = feature_df_all.reindex(columns=selected_features, fill_value=0).fillna(0)
        return scaler.transform(computed_features_ordered.values)
    except Exception as e:
        raise gr.Error(f"Feature extraction failed: {e}. Ensure sequence is valid and Propy dependencies are met.") from e
# --- MIC Prediction Function ---
# Artifact file names for each supported bacterium, resolved against MODEL_DIR.
# "pca" is None when the scaled embedding is passed to the model unchanged.
_BACTERIA_CONFIG = {
    "e_coli": {"display_name": "E.coli", "model": "coli_xgboost_model.pkl", "scaler": "coli_scaler.pkl", "pca": None},
    "p_aeruginosa": {"display_name": "P. aeruginosa", "model": "arg_xgboost_model.pkl", "scaler": "arg_scaler.pkl", "pca": None},
    "s_aureus": {"display_name": "S. aureus", "model": "aur_xgboost_model.pkl", "scaler": "aur_scaler.pkl", "pca": None},
    "k_pneumoniae": {"display_name": "K. pneumoniae", "model": "pne_mlp_model.pkl", "scaler": "pne_scaler.pkl", "pca": "pne_pca.pkl"},
}


@lru_cache(maxsize=None)
def _load_mic_artifact(filename: str):
    """Load a joblib artifact from MODEL_DIR and keep it in memory for later calls.

    A load that raises is not cached, so a file that appears later is picked
    up by the next request.
    """
    return joblib.load(os.path.join(MODEL_DIR, filename))


def predictmic(sequence: str, selected_bacteria_keys: list) -> dict:
    """Predict MIC values for a peptide against the selected bacteria.

    The sequence is cleaned the same way as for classification (upper-cased,
    non-standard letters dropped), embedded with ProtBert, and the embedding
    is passed through each bacterium's scaler, optional PCA and regressor.
    The model output is mapped back with ``expm1`` and rounded to 3 decimals.

    Args:
        sequence: Raw amino-acid string. ``None`` is treated as an empty string.
        selected_bacteria_keys: Keys such as ``"e_coli"``. ``None`` or an
            empty list yields an empty result.

    Returns:
        A dict keyed by the requested bacterium key. Each value is either the
        predicted MIC (float) or an error message string for that bacterium.

    Raises:
        gr.Error: If the cleaned sequence is not 10-100 residues long or the
            ProtBert embedding cannot be computed.
    """
    cleaned_sequence = ''.join(aa for aa in (sequence or "").upper() if aa in "ACDEFGHIKLMNPQRSTVWY")
    if not (10 <= len(cleaned_sequence) <= 100):
        raise gr.Error(f"Invalid sequence length for MIC prediction ({len(cleaned_sequence)}). Must be between 10 and 100 characters.")

    # Nothing to predict: skip the ProtBert forward pass entirely.
    if not selected_bacteria_keys:
        return {}

    # The tokenizer is fed one residue per whitespace-separated token.
    seq_spaced = ' '.join(cleaned_sequence)
    try:
        tokens = tokenizer(seq_spaced, return_tensors="pt", padding='max_length', truncation=True, max_length=512)
        tokens = {k: v.to(device) for k, v in tokens.items()}
        with torch.no_grad():
            outputs = protbert_model(**tokens)
        # NOTE(review): this averages over all 512 positions, padding included.
        # Presumably the MIC models were trained on embeddings pooled the same
        # way; confirm before switching to attention-masked pooling.
        embedding = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy().reshape(1, -1)
    except Exception as e:
        raise gr.Error(f"Error generating ProtBert embedding: {e}. Check sequence format or model availability.") from e

    mic_results = {}
    for bacterium_key in selected_bacteria_keys:
        cfg = _BACTERIA_CONFIG.get(bacterium_key)
        if not cfg:
            mic_results[bacterium_key] = "Error: Invalid bacterium key provided."
            continue
        try:
            transformed_embedding = _load_mic_artifact(cfg["scaler"]).transform(embedding)
            if cfg["pca"]:
                transformed_embedding = _load_mic_artifact(cfg["pca"]).transform(transformed_embedding)
            mic_log = _load_mic_artifact(cfg["model"]).predict(transformed_embedding)[0]
            # expm1 inverts a log1p-style target.
            mic_results[bacterium_key] = round(expm1(mic_log), 3)
        except FileNotFoundError as e:
            mic_results[bacterium_key] = f"Model file not found for {cfg['display_name']}: {e}"
        except Exception as e:
            # Failures are reported per bacterium so the others can still succeed.
            mic_results[bacterium_key] = f"Prediction error for {cfg['display_name']}: {e}"
    return mic_results
# --- LIME Plot Generation Helper ---
def generate_lime_plot_base64(explanation_list: list) -> str:
    """Draw LIME weights as a horizontal bar chart and return it as base64 PNG text.

    Bars are ordered by absolute weight, largest at the top. Positive weights
    are green; zero and negative weights are red.

    Args:
        explanation_list: Sequence of ``(feature description, weight)`` pairs.

    Returns:
        The PNG image encoded as a base64 string, or ``""`` when
        ``explanation_list`` is empty.
    """
    if not explanation_list:
        return ""

    features = [item[0] for item in explanation_list]
    weights = [item[1] for item in explanation_list]
    # Indices ordered by decreasing absolute weight.
    sorted_indices = np.argsort(np.abs(weights))[::-1]
    features_sorted = [features[i] for i in sorted_indices]
    weights_sorted = [weights[i] for i in sorted_indices]
    y_pos = np.arange(len(features_sorted))
    colors = ['green' if w > 0 else 'red' for w in weights_sorted]

    fig, ax = plt.subplots(figsize=(10, 6))
    try:
        ax.barh(y_pos, weights_sorted, align='center', color=colors)
        ax.set_yticks(y_pos)
        ax.set_yticklabels(features_sorted, fontsize=10)
        # Put the first (largest) bar at the top of the chart.
        ax.invert_yaxis()
        ax.set_xlabel('Contribution to Prediction (LIME Weight)', fontsize=12)
        ax.set_title('Top Features Influencing Prediction (LIME)', fontsize=14)
        ax.axvline(0, color='grey', linestyle='--', linewidth=0.8)
        # Address this figure/axes directly rather than pyplot's "current"
        # figure, which another request could replace between calls.
        ax.grid(axis='x', linestyle=':', alpha=0.7)
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight', dpi=150)
        return base64.b64encode(buf.getvalue()).decode('utf-8')
    finally:
        # Always release the figure, even if drawing or saving raised.
        plt.close(fig)
# --- Gradio API Endpoints ---
# Mirror image of each comparison, used to rewrite "lo < name" as "> lo".
_MIRRORED_COMPARISON = {"<": ">", "<=": ">=", ">": "<", ">=": "<="}


def parse_lime_condition(description: str, feature_names) -> tuple:
    """Split a LIME rule string into ``(feature name, condition)``.

    LIME describes a discretized feature either one-sided
    (``"name <= 0.25"``, ``"name > 0.75"``) or two-sided
    (``"0.25 < name <= 0.50"``). Splitting at the first space only works for
    the one-sided form, so the feature is located by matching tokens against
    ``feature_names`` instead.

    Args:
        description: Rule text as produced by ``explanation.as_list()``.
        feature_names: Container of known feature names (membership test only).

    Returns:
        ``(name, condition)``. One-sided rules keep their condition text
        (``"<= 0.25"``); two-sided rules become ``"> 0.25 and <= 0.50"``.
        If no token is a known feature, the text is split at the first space.
    """
    tokens = description.split()
    for position, token in enumerate(tokens):
        if token not in feature_names:
            continue
        before = tokens[:position]
        after = " ".join(tokens[position + 1:])
        if len(before) == 2 and before[1] in _MIRRORED_COMPARISON:
            # "lo < name ..." -> express the lower bound from the feature's side.
            lower = f"{_MIRRORED_COMPARISON[before[1]]} {before[0]}"
            condition = f"{lower} and {after}" if after else lower
        else:
            condition = " ".join(before + tokens[position + 1:])
        return token, condition
    # Unknown feature name: fall back to splitting at the first space.
    name, _, remainder = description.partition(" ")
    return name, remainder.strip()


def classify_and_interpret_amp(sequence: str) -> dict:
    """Classify a peptide as AMP / Non-AMP and explain the prediction with LIME.

    Args:
        sequence: Raw amino-acid string entered by the user.

    Returns:
        A dict with:
          * ``"label"``: ``"AMP (Positive)"`` when the predicted class is 0,
            otherwise ``"Non-AMP"``.
          * ``"confidence"``: probability taken from ``predict_proba``.
          * ``"shap_plot_base64"``: base64 PNG of the LIME bar chart (the key
            name says SHAP, but the plot is built from the LIME explanation).
          * ``"top_features"``: list of ``{"feature", "condition", "value"}``
            dicts for the 10 features LIME reports.

    Raises:
        gr.Error: Propagated from feature extraction, or wrapping any other
            failure during prediction or explanation.
    """
    try:
        features = extract_features(sequence)
        prediction_class_idx = model.predict(features)[0]
        probabilities = model.predict_proba(features)[0]
        amp_label = "AMP (Positive)" if prediction_class_idx == 0 else "Non-AMP"
        # NOTE(review): indexing by the predicted label assumes the model's
        # classes are 0..k-1 in column order; verify against model.classes_.
        confidence = probabilities[prediction_class_idx]

        explanation = explainer.explain_instance(
            data_row=features[0],
            predict_fn=model.predict_proba,
            num_features=10
        )
        # NOTE(review): as_list() is called without a label, so the weights are
        # for LIME's default label rather than the predicted class; confirm
        # that is what the frontend expects.
        explanation_pairs = explanation.as_list()

        known_features = frozenset(selected_features)
        top_features = []
        for feat_str, weight in explanation_pairs:
            feature_name, condition = parse_lime_condition(feat_str, known_features)
            top_features.append({
                "feature": feature_name,
                "condition": condition,
                "value": round(weight, 4)
            })

        return {
            "label": amp_label,
            "confidence": float(confidence),
            "shap_plot_base64": generate_lime_plot_base64(explanation_pairs),
            "top_features": top_features
        }
    except gr.Error:
        # Already a user-facing error; re-raise untouched.
        raise
    except Exception as e:
        raise gr.Error(f"An unexpected error occurred during AMP classification: {e}") from e
def get_mic_predictions_api(sequence: str, selected_bacteria_keys: list) -> dict:
    """Gradio-facing wrapper around ``predictmic``.

    Passes both arguments straight through and returns the resulting dict.
    A ``gr.Error`` raised by ``predictmic`` is propagated unchanged; any other
    exception is converted into a ``gr.Error`` with a generic message.
    """
    try:
        return predictmic(sequence, selected_bacteria_keys)
    except gr.Error:
        raise
    except Exception as exc:
        raise gr.Error(f"An unexpected error occurred during MIC prediction API call: {exc}")
# --- Gradio Interface Definition ---
# Two tabs, each wiring one button to one backend function and exposing it
# under a named API route ("predict" and "predict_mic").
with gr.Blocks() as demo:
    gr.Markdown("# EPIC-AMP Platform Backend API")
    gr.Markdown("This Gradio application provides the backend services for the EPIC-AMP frontend.")

    with gr.Tab("AMP Classification & Interpretability API"):
        gr.Markdown("### `/predict` Endpoint (AMP Classification, Confidence, LIME Plot, Top Features)")
        gr.Markdown("Input an amino acid sequence (10-100 AAs) to get classification details.")
        sequence_input_amp = gr.Textbox(
            label="Amino Acid Sequence",
            lines=5,
            placeholder="Enter sequence here...",
        )
        amp_api_output = gr.Json(label="AMP Prediction Details JSON Output")
        classify_button = gr.Button("Test Classification")
        classify_button.click(
            fn=classify_and_interpret_amp,
            inputs=[sequence_input_amp],
            outputs=[amp_api_output],
            api_name="predict",
        )

    with gr.Tab("MIC Prediction API"):
        gr.Markdown("### `/predict_mic` Endpoint (MIC Values)")
        gr.Markdown("Input an amino acid sequence (only if classified as AMP) and select bacteria to get predicted MIC values.")
        sequence_input_mic = gr.Textbox(
            label="Amino Acid Sequence",
            lines=5,
            placeholder="Enter AMP sequence for MIC prediction...",
        )
        mic_bacteria_checkboxes = gr.CheckboxGroup(
            choices=["e_coli", "p_aeruginosa", "s_aureus", "k_pneumoniae"],
            label="Select Bacteria for MIC Prediction (keys for backend)",
        )
        mic_api_output = gr.Json(label="MIC Prediction JSON Output")
        mic_button = gr.Button("Test MIC Prediction")
        mic_button.click(
            fn=get_mic_predictions_api,
            inputs=[sequence_input_mic, mic_bacteria_checkboxes],
            outputs=[mic_api_output],
            api_name="predict_mic",
        )

# Launch without the 'enable_queue' argument (removed in an earlier correction).
demo.launch(share=True, show_api=True)