Spaces:
Sleeping
Sleeping
| import hashlib | |
| import itertools | |
| import json | |
| import textwrap | |
| import threading | |
| from math import pi | |
| from uuid import uuid4 | |
| import io | |
| import os | |
| import pathlib | |
| from pathlib import Path | |
| import sys | |
| import numpy as np | |
| from Bio import SeqIO | |
| from Bio.Align import PairwiseAligner | |
| # from email_validator import validate_email | |
| import gradio as gr | |
| import hydra | |
| import pandas as pd | |
| import plotly.express as px | |
| import requests | |
| from rdkit.Chem.rdMolDescriptors import CalcNumRotatableBonds, CalcNumHeavyAtoms, CalcNumAtoms, CalcTPSA | |
| from requests.adapters import HTTPAdapter, Retry | |
| from rdkit import Chem | |
| from rdkit.Chem import RDConfig, Descriptors, Draw, Lipinski, Crippen, PandasTools, AllChem | |
| from rdkit.Chem.Scaffolds import MurckoScaffold | |
| import seaborn as sns | |
| import swifter | |
| from tqdm.auto import tqdm | |
| from deepscreen.data.dti import validate_seq_str, FASTA_PAT, SMILES_PAT | |
| from deepscreen.predict import predict | |
| sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score')) | |
| import sascorer | |
# Working directory at import time.
ROOT = Path.cwd()
# Module-level copy of the most recently loaded dataset; written by update_df
# and read by submit_report.
DF_FOR_REPORT = pd.DataFrame()
# Render floats with three decimals wherever pandas formats them for display.
pd.set_option('display.float_format', '{:.3f}'.format)
# Rendering options RDKit's PandasTools uses for molecule columns.
PandasTools.molRepresentation = 'svg'
PandasTools.drawOptions = Draw.rdMolDraw2D.MolDrawOptions()
PandasTools.drawOptions.clearBackground = False
PandasTools.drawOptions.bondLineWidth = 1.5
PandasTools.drawOptions.explicitMethyl = True
PandasTools.drawOptions.singleColourWedgeBonds = True
PandasTools.drawOptions.useCDKAtomPalette()
PandasTools.molSize = (128, 128)
# Shared HTTP session: requests that fail with a 500/502/503/504 status are
# retried up to 5 times with a backoff factor of 0.1, for both URL schemes.
SESSION = requests.Session()
ADAPTER = HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]))
SESSION.mount('http://', ADAPTER)
SESSION.mount('https://', ADAPTER)
# SCHEDULER = BackgroundScheduler()
# URL template for UniProtKB lookups; `{query}` is filled in by the caller.
UNIPROT_ENDPOINT = 'https://rest.uniprot.org/uniprotkb/{query}'
# NOTE(review): presumably the maximum number of rows accepted in a custom
# dataset - its use is not visible in this part of the file; confirm.
CUSTOM_DATASET_MAX_LEN = 10_000
# Custom stylesheet handed to gr.Blocks(css=CSS). It styles the hover help tips
# (.help-tip), the upload button, the link-like example buttons (#example) and
# hides the page footer.
CSS = """
.help-tip {
position: absolute;
display: inline-block;
top: 16px;
right: 0px;
text-align: center;
border-radius: 40%;
/* border: 2px solid darkred; background-color: #8B0000;*/
width: 24px;
height: 24px;
font-size: 16px;
line-height: 26px;
cursor: default;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
z-index: 100 !important;
}
.help-tip:hover {
cursor: pointer;
/*background-color: #ccc;*/
}
.help-tip:before {
content: '?';
font-weight: 700;
color: #8B0000;
z-index: 100 !important;
}
.help-tip p {
visibility: hidden;
opacity: 0;
text-align: left;
background-color: #EFDDE3;
padding: 20px;
width: 300px;
position: absolute;
border-radius: 4px;
right: -4px;
color: #494F5A;
font-size: 13px;
line-height: normal;
transform: scale(0.7);
transform-origin: 100% 0%;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
z-index: 100;
}
.help-tip:hover p {
cursor: default;
visibility: visible;
opacity: 1;
transform: scale(1.0);
}
.help-tip p:before {
position: absolute;
content: '';
width: 0;
height: 0;
border: 6px solid transparent;
border-bottom-color: #EFDDE3;
right: 10px;
top: -12px;
}
.help-tip p:after {
width: 100%;
height: 40px;
content: '';
position: absolute;
top: -5px;
left: 0;
}
.upload_button {
background-color: #008000;
}
.absolute {
position: absolute;
}
#example {
padding: 0;
background: none;
border: none;
text-decoration: underline;
box-shadow: none;
text-align: left !important;
display: inline-block !important;
}
footer {
visibility: hidden
}
"""
class HelpTip:
    """Factory for a hover help tooltip rendered as a Gradio HTML component.

    ``HelpTip(text)`` does not produce a ``HelpTip`` instance: ``__new__``
    returns a ``gr.HTML`` component whose markup carries the ``help-tip``
    CSS class, with ``text`` placed inside the tooltip paragraph.
    """

    def __new__(cls, text):
        return gr.HTML(
            # elem_classes="absolute",
            # The wrapper <div> is closed explicitly so the markup is well-formed
            # instead of relying on the browser to repair it.
            value=f'<div class="help-tip"><p>{text}</p></div>',
        )
def sa_score(row):
    """Return the SA_Score (``sascorer``) value for the molecule in ``row['Compound']``."""
    molecule = row['Compound']
    return sascorer.calculateScore(molecule)
def mw(row):
    """Return the molecular weight (``Descriptors.MolWt``) of ``row['Compound']``."""
    molecule = row['Compound']
    return Descriptors.MolWt(molecule)
def mr(row):
    """Return the Crippen molar refractivity (``Crippen.MolMR``) of ``row['Compound']``."""
    molecule = row['Compound']
    return Crippen.MolMR(molecule)
def hbd(row):
    """Return the H-bond donor count (``Lipinski.NumHDonors``) of ``row['Compound']``."""
    molecule = row['Compound']
    return Lipinski.NumHDonors(molecule)
def hba(row):
    """Return the H-bond acceptor count (``Lipinski.NumHAcceptors``) of ``row['Compound']``."""
    molecule = row['Compound']
    return Lipinski.NumHAcceptors(molecule)
def logp(row):
    """Return the Crippen LogP estimate (``Crippen.MolLogP``) of ``row['Compound']``."""
    molecule = row['Compound']
    return Crippen.MolLogP(molecule)
def atom(row):
    """Return the atom count reported by ``CalcNumAtoms`` for ``row['Compound']``."""
    molecule = row['Compound']
    return CalcNumAtoms(molecule)
def heavy_atom(row):
    """Return the heavy-atom count reported by ``CalcNumHeavyAtoms`` for ``row['Compound']``."""
    molecule = row['Compound']
    return CalcNumHeavyAtoms(molecule)
def rotatable_bond(row):
    """Return the rotatable-bond count (``CalcNumRotatableBonds``) of ``row['Compound']``."""
    molecule = row['Compound']
    return CalcNumRotatableBonds(molecule)
def tpsa(row):
    """Return the topological polar surface area (``CalcTPSA``) of ``row['Compound']``."""
    molecule = row['Compound']
    return CalcTPSA(molecule)
def lipinski(row):
    """
    Lipinski's Rule of Five check for the molecule in ``row['Compound']``.

    Returns True only when none of the limits is exceeded:
        Hydrogen bond donors <= 5
        Hydrogen bond acceptors <= 10
        Molecular weight <= 500 daltons
        logP <= 5
    """
    # Checks run in the same order as the rules above and stop at the first violation.
    violated = (
        hbd(row) > 5
        or hba(row) > 10
        or mw(row) > 500
        or logp(row) > 5
    )
    return not violated
def reos(row):
    """
    Rapid Elimination Of Swill (REOS) filter for the molecule in ``row['Compound']``.

    Returns True only when every property lies inside its range (bounds inclusive):
        Molecular weight between 200 and 500
        LogP between -5.0 and +5.0
        H-bond donor count between 0 and 5
        H-bond acceptor count between 0 and 10
        Formal charge between -2 and +2
        Rotatable bond count between 0 and 8
        Heavy atom count between 15 and 50
    """
    # Bounds are inclusive: with strict comparisons a molecule with zero H-bond
    # donors, zero acceptors or zero rotatable bonds would be rejected even though
    # 0 is the documented lower limit for those counts.
    return (
        200 <= mw(row) <= 500
        and -5.0 <= logp(row) <= 5.0
        and 0 <= hbd(row) <= 5
        and 0 <= hba(row) <= 10
        # Net formal charge of the whole molecule (documented rule).
        and -2 <= Chem.GetFormalCharge(row['Compound']) <= 2
        and 0 <= rotatable_bond(row) <= 8
        and 15 <= heavy_atom(row) <= 50
    )
def ghose(row):
    """
    Ghose drug-likeness filter for the molecule in ``row['Compound']``.

    Returns True only when every property lies strictly inside its range:
        Molecular weight between 160 and 480
        LogP between -0.4 and +5.6
        Atom count between 20 and 70
        Molar refractivity between 40 and 130
    """
    # Evaluated in order; the first property outside its range ends the check.
    within_ranges = (
        160 < mw(row) < 480
        and -0.4 < logp(row) < 5.6
        and 20 < atom(row) < 70
        and 40 < mr(row) < 130
    )
    return within_ranges
def veber(row):
    """
    Veber rule-of-thumb filter for orally active drugs
    (Veber et al., J Med Chem. 2002; 45(12): 2615-23).

    Returns True only when both limits hold:
        Rotatable bonds <= 10
        Topological polar surface area <= 140
    """
    # The surface area is only computed when the rotatable-bond limit passes.
    return rotatable_bond(row) <= 10 and tpsa(row) <= 140
def rule_of_three(row):
    """
    Rule of Three filter (Congreve et al., Drug Discov. Today. 8 (19): 876–7, (2003).).

    Returns True only when every limit holds:
        Molecular weight <= 300
        LogP <= 3
        H-bond donor <= 3
        H-bond acceptor count <= 3
        Rotatable bond count <= 3
    """
    # Evaluated in order; the first limit that is exceeded ends the check.
    passes = (
        mw(row) <= 300
        and logp(row) <= 3
        and hbd(row) <= 3
        and hba(row) <= 3
        and rotatable_bond(row) <= 3
    )
    return passes
| # def smarts_filter(): | |
| # alerts = Chem.MolFromSmarts("enter one smart here") | |
| # detected_alerts = [] | |
| # for smiles in data['X1']: | |
| # mol = Chem.MolFromSmiles(smiles) | |
| # detected_alerts.append(mol.HasSubstructMatch(alerts)) | |
# Display name -> per-row function computing a numeric property of row['Compound'].
# submit_report looks the selected names up here and adds one column per score.
SCORE_MAP = {
    'SAscore': sa_score,
    'LogP': logp,
    'Molecular weight': mw,
    'Number of heavy atoms': heavy_atom,
    'Molar refractivity': mr,
    'H-bond donor count': hbd,
    'H-Bond acceptor count': hba,
    'Rotatable bond count': rotatable_bond,
    'Topological polar surface area': tpsa,
}
# Display name -> per-row function returning True when the compound passes the filter.
# submit_report looks the selected names up here and adds one column per filter.
FILTER_MAP = {
    # TODO support number_of_violations
    'REOS': reos,
    "Lipinski's Rule of Five": lipinski,
    'Ghose': ghose,
    'Rule of Three': rule_of_three,
    'Veber': veber,
    # 'PAINS': pains,
}
# Display name of a prediction task -> short task code used in the Hydra
# overrides and checkpoint file names built by submit_predict.
TASK_MAP = {
    'Compound-protein interaction': 'CPI',
    'Compound-protein binding affinity': 'CPA',
}
# Display name of a model -> preset name used in the Hydra overrides and
# checkpoint file names built by submit_predict.
PRESET_MAP = {
    'DeepDTA': 'deep_dta',
    'DeepConvDTI-ECFP4': 'deep_conv_dti',
    'GraphDTA': 'graph_dta',
    'MGraphDTA': 'm_graph_dta',
    'HyperAttentionDTI': 'hyper_attention_dti',
    'MolTrans': 'mol_trans',
    # NOTE(review): 'transfomer_cpi' is spelled without the second "r", unlike
    # 'transformer_cpi_2' below - confirm it matches the preset/checkpoint names.
    'TransformerCPI': 'transfomer_cpi',
    'TransformerCPI2': 'transformer_cpi_2',
    'DrugBAN': 'drug_ban',
    'DrugVQA-Seq': 'drug_vqa'
}
# Display name of a target protein family -> suffix used in checkpoint file names.
TARGET_FAMILY_MAP = {
    'General': 'general',
    'Kinase': 'kinase',
    'Non-kinase enzyme': 'enzyme',
    'Membrane receptor': 'membrane',
    'Nuclear receptor': 'nuclear',
    'Ion channel': 'ion',
    'Other protein targets': 'others',
}
# Display name of a built-in target library -> CSV file name.
TARGET_LIBRARY_MAP = {
    'ChEMBL33 (Human)': 'ChEMBL33_human_proteins.csv',
    # 'STITCH': 'stitch.csv',
    # 'Drug Repurposing Hub': 'drug_repurposing_hub.csv',
}
# Display name of a built-in compound library -> CSV file name.
DRUG_LIBRARY_MAP = {
    'DrugBank (Human)': 'drugbank.csv',
}
# Internal column name -> human-readable header used when rendering reports.
# submit_predict extends this mapping with labels for the 'Y' and 'Y^' columns.
COLUMN_ALIASES = {
    'X1': 'Compound SMILES',
    'X2': 'Target FASTA',
    'ID1': 'Compound ID',
    'ID2': 'Target ID',
}
def validate_columns(df, mandatory_cols):
    """Ensure that every mandatory column is present in ``df``.

    Args:
        df: Object exposing a ``columns`` collection (e.g. a pandas DataFrame).
        mandatory_cols: Iterable of column names that must be present.

    Returns:
        None when all mandatory columns are present.

    Raises:
        ValueError: If at least one mandatory column is missing. The message
            lists exactly the columns that are missing.
    """
    missing_cols = [col for col in mandatory_cols if col not in df.columns]
    if missing_cols:
        # Name the columns that are actually missing rather than a fixed list,
        # so the message stays correct for any set of mandatory columns.
        missing = ', '.join(repr(col) for col in missing_cols)
        raise ValueError(
            f"The following mandatory columns are missing "
            f"in the uploaded dataset: {missing}."
        )
def process_target_fasta(sequence):
    """Return the sequence of the first record in a FASTA-formatted string.

    Args:
        sequence: Text in FASTA format; only the first record is used.

    Returns:
        The residues of the first record as a plain string.

    Raises:
        ValueError: If no FASTA record can be parsed from ``sequence``.
    """
    # lines = sequence.strip().split("\n")
    # if lines[0].startswith(">"):
    #     lines = lines[1:]
    # return ''.join(lines).split(">")[0]
    # SeqIO.parse returns an iterator, which cannot be indexed; take the first
    # record with next() and detect the "no record" case explicitly.
    record = next(SeqIO.parse(io.StringIO(sequence), "fasta"), None)
    if record is None:
        raise ValueError('No FASTA record found in the input sequence.')
    return str(record.seq)
def send_email(receiver, msg):
    """Placeholder for email notifications; currently does nothing and returns None."""
    return None
def submit_predict(predict_filepath, task, preset, target_family, flag, progress=gr.Progress(track_tqdm=True)):
    """Run a prediction job and write its predictions to a CSV file.

    Args:
        predict_filepath: Path of the dataset file, passed to the model
            configuration as ``data.data_file``.
        task: Display name of the prediction task (a key of ``TASK_MAP``).
        preset: Display name of the model (a key of ``PRESET_MAP``).
        target_family: Display name of the target family
            (a key of ``TARGET_FAMILY_MAP``).
        flag: Falsy to skip the run; otherwise used as the job id in the name
            of the predictions file.
        progress: Gradio progress tracker that follows tqdm progress bars.

    Returns:
        A two-item list ``[predictions_file, False]``. ``predictions_file`` is
        ``None`` when the run was skipped or failed; a failure is reported to
        the user through ``gr.Warning`` instead of being raised.

    Side effects:
        Rebinds the module-level ``COLUMN_ALIASES`` with labels for the ``Y``
        and ``Y^`` columns that match the selected task.
    """
    global COLUMN_ALIASES
    if not flag:
        return [None,
                False]
    try:
        job_id = flag
        task = TASK_MAP[task]
        preset = PRESET_MAP[preset]
        target_family = TARGET_FAMILY_MAP[target_family]
        # email_hash = hashlib.sha256(email.encode()).hexdigest()
        # `task` now holds a TASK_MAP code ('CPI' or 'CPA'); comparing it with
        # any other literal would label interaction predictions as affinities.
        is_interaction = task == 'CPI'
        COLUMN_ALIASES = COLUMN_ALIASES | {
            'Y': 'Actual interaction probability' if is_interaction else 'Actual binding affinity',
            'Y^': 'Predicted interaction probability' if is_interaction else 'Predicted binding affinity'
        }
        with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"):
            cfg = hydra.compose(
                config_name="webserver_inference",
                overrides=[f"task={task}",
                           f"preset={preset}",
                           f"ckpt_path=resources/checkpoints/{preset}-{task}-{target_family}.ckpt",
                           f"data.data_file='{str(predict_filepath)}'"])
            predictions, _ = predict(cfg)
        # One frame per prediction batch, stacked into a single table.
        prediction_df = pd.concat(
            [pd.DataFrame(prediction) for prediction in predictions], ignore_index=True)
        predictions_file = f'temp/{job_id}_predictions.csv'
        # Make sure the output directory exists before writing into it.
        Path(predictions_file).parent.mkdir(parents=True, exist_ok=True)
        prediction_df.to_csv(predictions_file, index=False)
        return [predictions_file,
                False]
    except Exception as e:
        # Deliberate best-effort: surface the failure in the UI and reset the flag.
        gr.Warning(f"Prediction job failed due to error: {str(e)}")
        return [None,
                False]
| # | |
| # except Exception as e: | |
| # raise gr.Error(str(e)) | |
| # email_lock = Path(f"outputs/{email_hash}.lock") | |
| # with open(email_lock, "w") as file: | |
| # record = { | |
| # "email": email, | |
| # "job_id": job_id | |
| # } | |
| # json.dump(record, file) | |
| # def run_predict(): | |
| # TODO per-user submit usage | |
| # # email_lock = Path(f"outputs/{email_hash}.lock") | |
| # # with open(email_lock, "w") as file: | |
| # # record = { | |
| # # "email": email, | |
| # # "job_id": job_id | |
| # # } | |
| # # json.dump(record, file) | |
| # | |
| # job_lock = DATA_PATH / f"outputs/{job_id}.lock" | |
| # with open(job_lock, "w") as file: | |
| # pass | |
| # | |
| # try: | |
| # prediction_df = pd.DataFrame() | |
| # for family in target_family_list: | |
| # with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"): | |
| # cfg = hydra.compose( | |
| # config_name="webserver_inference", | |
| # overrides=[f"task={task}", | |
| # f"preset={preset}", | |
| # f"ckpt_path=resources/checkpoints/{preset}-{task}-{family}.ckpt", | |
| # f"data.data_file='{str(predict_dataset)}'"]) | |
| # | |
| # predictions, _ = predict(cfg) | |
| # predictions = [pd.DataFrame(prediction) for prediction in predictions] | |
| # prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)]) | |
| # prediction_df.to_csv(f'outputs/{job_id}.csv') | |
| # # email_lock.unlink() | |
| # job_lock.unlink() | |
| # | |
| # msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) completed successfully. You may retrieve the ' | |
| # f'results and generate an analytical report at {URL} using the job id within 48 hours.') | |
| # gr.Info(msg) | |
| # except Exception as e: | |
| # msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) failed due to an error: "{str(e)}." You may ' | |
| # f'reach out to the author about the error through email ([email protected]).') | |
| # raise gr.Error(str(e)) | |
| # finally: | |
| # send_email(email, msg) | |
| # | |
| # # Run "predict" asynchronously | |
| # threading.Thread(target=run_predict).start() | |
| # | |
| # msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) started running. You may retrieve the results ' | |
| # f'and generate an analytical report at {URL} using the job id once the job is done. Only one job ' | |
| # f'per user is allowed at the same time.') | |
| # send_email(email, msg) | |
| # # Return the job id first | |
| # return [ | |
| # gr.Blocks(visible=False), | |
| # gr.Markdown(f"Your prediction job is running... " | |
| # f"You may stay on this page or come back later to retrieve the results " | |
| # f"Once you receive our email notification."), | |
| # ] | |
def update_df(file, progress=gr.Progress(track_tqdm=True)):
    """Load a predictions CSV, add molecule columns, and render a report preview.

    Args:
        file: Path (or file-like object) of the CSV to load, or ``None``.
        progress: Gradio progress tracker that follows tqdm progress bars.

    Returns:
        ``(html_preview, dataframe)`` for a loaded file, or empty
        ``gr.HTML`` / ``gr.Dataframe`` components when ``file`` is ``None``.

    Side effects:
        Stores a copy of the loaded table in the module-level ``DF_FOR_REPORT``.
    """
    global DF_FOR_REPORT
    if file is None:
        return gr.HTML(), gr.Dataframe()

    df = pd.read_csv(file)
    # Scaffolds are only computed when the dataset holds more than one compound.
    if df['X1'].nunique() > 1:
        df['Scaffold SMILES'] = df['X1'].swifter.progress_bar(
            desc="Calculating scaffold...").apply(MurckoScaffold.MurckoScaffoldSmilesFromSmiles)
    # Add a new column with RDKit molecule objects
    if 'Compound' not in df.columns or df['Compound'].dtype != 'object':
        PandasTools.AddMoleculeColumnToFrame(df, smilesCol='X1', molCol='Compound',
                                             includeFingerprints=True)
    # Build scaffold molecules only when scaffold SMILES exist; a single-compound
    # dataset has no 'Scaffold SMILES' column and looking it up would raise KeyError.
    if 'Scaffold SMILES' in df.columns:
        PandasTools.AddMoleculeColumnToFrame(df, smilesCol='Scaffold SMILES', molCol='Scaffold',
                                             includeFingerprints=True)
    DF_FOR_REPORT = df.copy()
    # pie_chart = None
    # value = None
    # if 'Y^' in DF_FOR_REPORT.columns:
    #     value = 'Y^'
    # elif 'Y' in DF_FOR_REPORT.columns:
    #     value = 'Y'
    # if value:
    #     if DF_FOR_REPORT['X1'].nunique() > 1 >= DF_FOR_REPORT['X2'].nunique():
    #         pie_chart = create_pie_chart(DF_FOR_REPORT, category='Scaffold SMILES', value=value, top_k=100)
    #     elif DF_FOR_REPORT['X2'].nunique() > 1 >= DF_FOR_REPORT['X1'].nunique():
    #         pie_chart = create_pie_chart(DF_FOR_REPORT, category='Target family', value=value, top_k=100)
    return create_html_report(DF_FOR_REPORT), df  # pie_chart
def create_html_report(df, file=None, progress=gr.Progress(track_tqdm=True)):
    """Render a results table as an HTML preview or save it as a standalone HTML file.

    Args:
        df: Table to render; it is copied, not modified.
        file: When falsy, an HTML preview string is returned. Otherwise the
            full table is saved to this path as an interactive Tabulator page.
        progress: Gradio progress tracker that follows tqdm progress bars.

    Returns:
        The HTML preview string (first 51 rows, numeric columns shaded) when
        ``file`` is falsy; ``None`` after saving to ``file``.
    """
    df_html = df.copy()
    # Column order: identifiers/labels/molecules first, raw SMILES/FASTA last,
    # everything else in between. Only columns that exist are kept.
    cols_left = ['ID1', 'ID2', 'Y', 'Y^', 'Compound', 'Scaffold', 'Scaffold SMILES', ]
    cols_right = ['X1', 'X2']
    cols_left = [col for col in cols_left if col in df_html.columns]
    cols_right = [col for col in cols_right if col in df_html.columns]
    df_html = df_html[cols_left + (df_html.columns.drop(cols_left + cols_right).tolist()) + cols_right]
    # 'X2' is treated as optional above, so only wrap it when it is present.
    if 'X2' in df_html.columns:
        df_html['X2'] = df_html['X2'].swifter.apply(wrap_text)
    df_html = df_html.sort_values(
        [col for col in ['Y', 'Y^', 'ID1', 'ID2', 'X1', 'X2'] if col in df.columns], ascending=False
    ).rename(columns=COLUMN_ALIASES)
    # PandasTools.RenderImagesInAllDataFrames(images=True)
    PandasTools.ChangeMoleculeRendering(df_html, renderer='image')
    # Return the DataFrame as HTML
    PandasTools.RenderImagesInAllDataFrames(images=True)
    if not file:
        styled_df = df_html.iloc[:51].style
        # styled_df = df.style.format("{:.2f}")
        # One hue per column; only numeric columns receive a gradient.
        colors = sns.color_palette('husl', len(df_html.columns))
        for i, col in enumerate(df_html.columns):
            if pd.api.types.is_numeric_dtype(df_html[col]):
                styled_df = styled_df.background_gradient(subset=col, cmap=sns.light_palette(colors[i], as_cmap=True))
        html = styled_df.to_html()
        return f'Report preview<div style="overflow:auto; height: 300px; font-family: Courier !important;">{html}</div>'
    else:
        # Imported lazily: only needed when a report file is written.
        import panel as pn
        from bokeh.resources import INLINE
        from bokeh.models import BooleanFormatter
        bokeh_formatters = {
            'float': {'type': 'progress', 'legend': True},
            'bool': BooleanFormatter(),
        }
        # html = df.to_html(file)
        # return html
        pn.widgets.Tabulator(df_html, formatters=bokeh_formatters).save(file, resources=INLINE)
| # def create_pie_chart(df, category, value, top_k): | |
| # df.rename(COLUMN_ALIASES, inplace=True) | |
| # # Select the top_k records based on the value_col | |
| # top_k_df = df.nlargest(top_k, value) | |
| # | |
| # # Count the frequency of each unique value in the category_col column | |
| # category_counts = top_k_df[category].value_counts() | |
| # | |
| # # Convert the counts to a DataFrame | |
| # data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values}) | |
| # | |
| # # Calculate the angle for each category | |
| # data['angle'] = data['value']/data['value'].sum() * 2*pi | |
| # | |
| # # Assign colors | |
| # data['color'] = Spectral11[0:len(category_counts)] | |
| # | |
| # # Create the plot | |
| # p = figure(height=350, title="Pie Chart", toolbar_location=None, | |
| # tools="hover", tooltips="@{}: @value".format(category), x_range=(-0.5, 1.0)) | |
| # | |
| # p.wedge(x=0, y=1, radius=0.4, | |
| # start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), | |
| # line_color="white", fill_color='color', legend_field=category, source=data) | |
| # | |
| # p.axis.axis_label = None | |
| # p.axis.visible = False | |
| # p.grid.grid_line_color = None | |
| # | |
| # return p | |
def create_pie_chart(df, category, value, top_k):
    """Build a pie chart of category frequencies among the ``top_k`` highest-valued rows.

    Args:
        df: Source table; it is not modified.
        category: Column whose distinct values become the pie slices.
        value: Column used to rank rows; the ``top_k`` largest are kept.
        top_k: Number of top-ranked rows to include.

    Returns:
        The Plotly figure created by ``px.pie``.
    """
    # rename() needs columns=...; a bare mapper relabels the index and would
    # leave the columns under their raw names while `value` is already aliased.
    df = df.rename(columns=COLUMN_ALIASES)
    value = COLUMN_ALIASES.get(value, value)
    # Resolve the category the same way so it still matches a renamed column.
    category = COLUMN_ALIASES.get(category, category)
    # Select the top_k records based on the value column
    top_k_df = df.nlargest(top_k, value)
    # Count the frequency of each unique value in the category column
    category_counts = top_k_df[category].value_counts()
    # Convert the counts to a DataFrame
    data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values})
    # Create the plot
    fig = px.pie(data, values='value', names=category, title=f'Top-{top_k} {category} in {value}')
    fig.update_traces(textposition='inside', textinfo='percent+label')
    return fig
def submit_report(score_list, filter_list, progress=gr.Progress(track_tqdm=True)):
    """Add the selected filter and score columns to the cached dataset and render it.

    Args:
        score_list: Names of scores to compute (keys of ``SCORE_MAP``).
        filter_list: Names of filters to compute (keys of ``FILTER_MAP``).
        progress: Gradio progress tracker that follows tqdm progress bars.

    Returns:
        ``(html_preview, dataframe)`` for a copy of ``DF_FOR_REPORT`` extended
        with one column per selected filter and score.

    Raises:
        gr.Error: Wrapping the message of any exception raised while computing.
    """
    report_df = DF_FOR_REPORT.copy()

    def add_columns(names, registry):
        # One new column per selected name, computed row by row.
        for name in names:
            report_df[name] = report_df.swifter.progress_bar(desc=f"Calculating {name}").apply(
                registry[name], axis=1)

    try:
        # Filters are computed before scores.
        add_columns(filter_list, FILTER_MAP)
        add_columns(score_list, SCORE_MAP)
        # pie_chart = None
        # value = None
        # if 'Y^' in df.columns:
        #     value = 'Y^'
        # elif 'Y' in df.columns:
        #     value = 'Y'
        #
        # if value:
        #     if df['X1'].nunique() > 1 >= df['X2'].nunique():
        #         pie_chart = create_pie_chart(df, category='Scaffold SMILES', value=value, top_k=100)
        #     elif df['X2'].nunique() > 1 >= df['X1'].nunique():
        #         pie_chart = create_pie_chart(df, category='Target family', value=value, top_k=100)
        return create_html_report(report_df), report_df  # pie_chart
    except Exception as e:
        raise gr.Error(str(e))
| # def check_job_status(job_id): | |
| # job_lock = DATA_PATH / f"{job_id}.lock" | |
| # job_file = DATA_PATH / f"{job_id}.csv" | |
| # if job_lock.is_file(): | |
| # return {gr.Markdown(f"Your job ({job_id}) is still running... " | |
| # f"You may stay on this page or come back later to retrieve the results " | |
| # f"Once you receive our email notification."), | |
| # None, | |
| # None | |
| # } | |
| # elif job_file.is_file(): | |
| # return {gr.Markdown(f"Your job ({job_id}) is done! Redirecting you to generate reports..."), | |
| # gr.Tabs(selected=3), | |
| # gr.File(str(job_lock))} | |
def wrap_text(text, line_length=60):
    """Wrap a sequence string to lines of at most ``line_length`` characters.

    Text starting with ``>`` is treated as FASTA: it is split on ``>``, each
    record keeps its header line, and the remaining lines of the record are
    joined and re-wrapped. Any other text is wrapped as a whole.
    """
    wrapper = textwrap.TextWrapper(width=line_length)
    if not text.startswith('>'):
        return wrapper.fill(text)

    wrapped_records = []
    # Splitting on '>' yields an empty leading piece, which filter() drops.
    for record in filter(None, text.split('>')):
        header, *sequence_lines = record.split('\n')
        body = wrapper.fill(''.join(sequence_lines))
        wrapped_records.append(f">{header}\n{body}")
    return '\n'.join(wrapped_records)
def unwrap_text(text):
    """Return ``text`` with surrounding whitespace stripped and all newlines removed."""
    # strip must be called and the method is spelled `replace`; the previous
    # attribute chain raised AttributeError on every call.
    return text.strip().replace('\n', '')
def smiles_from_sdf(sdf_path):
    """Return the SMILES string of the first molecule in the SDF file at ``sdf_path``."""
    with Chem.SDMolSupplier(sdf_path) as supplier:
        first_molecule = supplier[0]
        return Chem.MolToSmiles(first_molecule)
def drug_library_from_sdf(sdf_path):
    """Load an SDF file with ``PandasTools.LoadSDF``.

    SMILES are requested in column ``X1``, molecule objects in column
    ``Compound``, and fingerprints are included.
    """
    load_options = {
        'smilesName': 'X1',
        'molColName': 'Compound',
        'includeFingerprints': True,
    }
    return PandasTools.LoadSDF(sdf_path, **load_options)
def process_target_library_upload(library_upload):
    """Validate an uploaded target library and return its path unchanged.

    The file is loaded according to its extension (``.csv`` or ``.fasta``) and
    must provide an ``X2`` column.

    Raises:
        gr.Error: If the extension is neither ``.csv`` nor ``.fasta``.
        ValueError: From ``validate_columns`` when ``X2`` is missing.
    """
    if library_upload.endswith('.csv'):
        load_library = pd.read_csv
    elif library_upload.endswith('.fasta'):
        load_library = target_library_from_fasta
    else:
        raise gr.Error('Currently only CSV and FASTA files are supported as target libraries.')
    # The parsed table is only needed for validation; callers get the path back.
    validate_columns(load_library(library_upload), ['X2'])
    return library_upload
def process_drug_library_upload(library_upload):
    """Validate an uploaded compound library and return its path unchanged.

    The file is loaded according to its extension (``.csv`` or ``.sdf``) and
    must provide an ``X1`` column.

    Raises:
        gr.Error: If the extension is neither ``.csv`` nor ``.sdf``.
        ValueError: From ``validate_columns`` when ``X1`` is missing.
    """
    if library_upload.endswith('.csv'):
        load_library = pd.read_csv
    elif library_upload.endswith('.sdf'):
        load_library = drug_library_from_sdf
    else:
        raise gr.Error('Currently only CSV and SDF files are supported as compound libraries.')
    # The parsed table is only needed for validation; callers get the path back.
    validate_columns(load_library(library_upload), ['X1'])
    return library_upload
def target_library_from_fasta(fasta_path):
    """Read a FASTA file into a DataFrame with record ids in ``ID2`` and sequences in ``X2``."""
    fasta_records = list(SeqIO.parse(fasta_path, "fasta"))
    return pd.DataFrame({
        'ID2': [rec.id for rec in fasta_records],
        'X2': [str(rec.seq) for rec in fasta_records],
    })
# Gradio theme for the app, passed to gr.Blocks(theme=theme): the Base theme
# with small spacing and medium text, overridden with a light blue-grey
# background (#dfe6f0), blue accents (#4372c4) and white inputs.
theme = gr.themes.Base(spacing_size="sm", text_size='md').set(
    background_fill_primary='#dfe6f0',
    background_fill_secondary='#dfe6f0',
    checkbox_label_background_fill='#dfe6f0',
    checkbox_label_background_fill_hover='#dfe6f0',
    checkbox_background_color='white',
    checkbox_border_color='#4372c4',
    border_color_primary='#4372c4',
    border_color_accent='#4372c4',
    button_primary_background_fill='#4372c4',
    button_primary_text_color='white',
    button_secondary_border_color='#4372c4',
    body_text_color='#4372c4',
    block_title_text_color='#4372c4',
    block_label_text_color='#4372c4',
    block_info_text_color='#505358',
    block_border_color=None,
    input_border_color='#4372c4',
    panel_border_color='#4372c4',
    input_background_fill='white',
    code_background_fill='white',
)
| with (gr.Blocks(theme=theme, title='DeepScreen', css=CSS) as demo): | |
| run_state = gr.State(value=False) | |
| screen_flag = gr.State(value=False) | |
| identify_flag = gr.State(value=False) | |
| infer_flag = gr.State(value=False) | |
| with gr.Tabs() as tabs: | |
| with gr.TabItem(label='Drug hit screening', id=0): | |
| gr.Markdown(''' | |
| # <center>DeepSEQreen Drug Hit Screening</center> | |
| <center> | |
| To predict interactions/binding affinities of a single target against a library of compounds. | |
| </center> | |
| ''') | |
| with gr.Blocks() as screen_block: | |
| with gr.Column() as screen_page: | |
| with gr.Row(): | |
| with gr.Column(): | |
| HelpTip( | |
| "Target amino acid sequence in the FASTA format. Alternatively, you may use a " | |
| "UniProt ID/accession to query UniProt database for the sequence of your " | |
| "target of interest. If the input FASTA contains multiple entities, " | |
| "only the first one will be used." | |
| ) | |
| with gr.Row(): | |
| target_input_type = gr.Dropdown( | |
| label='Target Input Type', | |
| choices=['Sequence', 'UniProt ID', 'Gene symbol'], | |
| info='Enter (paste) a FASTA string below manually or upload a FASTA file.', | |
| value='Sequence', | |
| scale=3, interactive=True | |
| ) | |
| target_id = gr.Textbox(show_label=False, visible=False, | |
| interactive=True, scale=4, | |
| info='Query a sequence on UniProt with a UniProt ID.') | |
| target_gene = gr.Textbox( | |
| show_label=False, visible=False, | |
| interactive=True, scale=4, | |
| info='Query a sequence on UniProt with a gene symbol.') | |
| target_organism = gr.Textbox( | |
| info='Organism common name or scientific name (default: Human).', | |
| placeholder='Human', show_label=False, | |
| visible=False, interactive=True, scale=4, ) | |
| with gr.Column(): | |
| HelpTip( | |
| "Identify the protein family by conducting sequence alignment. " | |
| "You may select General if you find the alignment score unsatisfactory." | |
| ) | |
| drug_screen_target_family = gr.Dropdown( | |
| choices=list(TARGET_FAMILY_MAP.keys()), | |
| value='General', | |
| label='Select Input Protein Family (Optional)', interactive=True) | |
| # with gr.Column(scale=1, min_width=24): | |
| with gr.Row(): | |
| with gr.Column(): | |
| target_upload_btn = gr.UploadButton(label='Upload a FASTA file', type='binary', | |
| visible=True, variant='primary', | |
| size='lg') | |
| target_query_btn = gr.Button(value='Query the sequence', variant='primary', | |
| visible=False) | |
| target_family_detect_btn = gr.Button(value='Auto-detect', variant='primary') | |
| target_fasta = gr.Code(label='Input or Display FASTA', interactive=True, lines=5) | |
| example_fasta = gr.Button(value='Example: Human MAPK14', elem_id='example') | |
| with gr.Row(): | |
| with gr.Column(): | |
| drug_library = gr.Dropdown(label='Select a Compound Library', | |
| choices=list(DRUG_LIBRARY_MAP.keys())) | |
| with gr.Row(): | |
| gr.File(label='Example SDF compound library', | |
| value='data/examples/compound_library.sdf', interactive=False) | |
| gr.File(label='Example CSV compound library', | |
| value='data/examples/compound_library.csv', interactive=False) | |
| drug_library_upload_btn = gr.UploadButton( | |
| label='Upload a custom library', variant='primary') | |
| drug_library_upload = gr.File(label='Custom compound library file', visible=False) | |
| drug_screen_task = gr.Dropdown(list(TASK_MAP.keys()), label='Select a Prediction Task', | |
| value='Compound-protein interaction') | |
| with gr.Column(): | |
| HelpTip("We recommend the appropriate model for your use case based on model performance " | |
| "in drug-target interaction or binding affinity prediction. " | |
| "The models were benchmarked on different target families " | |
| "and real-world data scenarios.") | |
| drug_screen_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Select a Preset Model') | |
| screen_preset_recommend_btn = gr.Button(value='Recommend a model', variant='primary') | |
| # drug_screen_email = gr.Textbox( | |
| # label='Email (optional)', | |
| # info="Your email will be used to send you notifications when your job finishes." | |
| # ) | |
| with gr.Row(visible=True): | |
| # drug_screen_clr_btn = gr.ClearButton(size='lg') | |
| drug_screen_btn = gr.Button(value='SCREEN', variant='primary', size='lg') | |
| # TODO Modify the pd df directly with df['X2'] = target | |
| screen_data_for_predict = gr.File(visible=False, file_count="single", type='filepath') | |
| screen_waiting = gr.Markdown(""" | |
| <center>Your job is running... It might take a few minutes. | |
| When it's done, you will be redirected to the report page. | |
| Meanwhile, please leave the page on.</center> | |
| """, visible=False) | |
| with gr.TabItem(label='Target protein identification', id=1): | |
| gr.Markdown(''' | |
| # <center>DeepSEQreen Target Protein Identification</center> | |
| <center> | |
| To predict interactions/binding affinities of a single compound against a library of protein targets. | |
| </center> | |
| ℹ️ A custom target library can be a FASTA file with a single or multiple amino acid sequences, | |
| or a CSV file has a required FASTA string column and optionally an ID column: | |
| <b>X2</b>: the FASTA sequence of a target\n | |
| <b>ID2</b> (optional): the ID (PubChem or any arbitrary unique identifier) of a compound\n | |
| Example CSV target library: | |
| | X2 | ID2 | | |
| |---------------|--------| | |
| | MVQKSRNGGV... | O88943 | | |
| | MTSPSSSPVF... | Q9Y5S1 | | |
| ''') | |
# Target protein identification page: one compound is scored against a library of targets.
# NOTE(review): the container nesting below is reconstructed -- the original indentation
# was lost in this copy of the file -- so confirm the layout against the rendered page.
with gr.Blocks() as identify_block:
    with gr.Column() as identify_page:
        # Compound input: type selector, file upload, and the SMILES editor.
        with gr.Row():
            with gr.Column():
                HelpTip(
                    """Compound molecule in the SMILES format. You may input the SMILES string directly,
                    upload an SMI file, or upload an SDF file to convert to SMILES. Alternatively,
                    you may search on databases like NCBI PubChem, ChEMBL, and DrugBank for the SMILES
                    representing your drug of interest.
                    """
                )
                compound_type = gr.Dropdown(
                    label='Compound Input Type',
                    choices=['SMILES', 'SDF'],
                    info='Enter (paste) an SMILES string or upload an SMI file.',
                    value='SMILES',
                    interactive=True)
                compound_upload_btn = gr.UploadButton(label='Upload', variant='primary', type='binary')
                target_identify_target_family = gr.Dropdown(choices=['General'], value='General',
                                                            label='Target Protein Family')
            compound_smiles = gr.Code(label='Input or Display Compound SMILES', interactive=True, lines=5)
            example_drug = gr.Button(value='Example: Aspirin', elem_id='example')
        # Target library selection, task selection, and model preset recommendation.
        with gr.Row():
            with gr.Column():
                target_library = gr.Dropdown(label='Select a Target Library',
                                             choices=list(TARGET_LIBRARY_MAP.keys()))
                with gr.Row():
                    gr.File(label='Example FASTA target library',
                            value='data/examples/target_library.fasta', interactive=False)
                    gr.File(label='Example CSV target library',
                            value='data/examples/target_library.csv', interactive=False)
                target_library_upload_btn = gr.UploadButton(
                    label='Upload a custom library', variant='primary')
                # Hidden holder for the uploaded library file; filled by the upload handler.
                target_library_upload = gr.File(label='Custom target library file', visible=False)
            target_identify_task = gr.Dropdown(list(TASK_MAP.keys()), label='Select a Prediction Task',
                                               value='Compound-protein interaction')
            with gr.Column():
                HelpTip("We recommend the appropriate model for your use case based on model performance "
                        "in drug-target interaction or binding affinity prediction. "
                        "The models were benchmarked on different target families "
                        "and real-world data scenarios.")
                target_identify_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
                identify_preset_recommend_btn = gr.Button(value='Recommend a model', variant='primary')
        # with gr.Row():
        #     target_identify_email = gr.Textbox(
        #         label='Email (optional)',
        #         info="Your email will be used to send you notifications when your job finishes."
        #     )
        with gr.Row(visible=True):
            # target_identify_clr_btn = gr.ClearButton(size='lg')
            target_identify_btn = gr.Button(value='IDENTIFY', variant='primary', size='lg')
        # Hidden file component that carries the validated input CSV to the prediction step.
        identify_data_for_predict = gr.File(visible=False, file_count="single", type='filepath')
    # Kept outside identify_page: the job chain hides the page and shows this message instead.
    # The sentences are now separated by a space (previously rendered "minutes.When").
    identify_waiting = gr.Markdown("Your job is running... It might take a few minutes. "
                                   "When it's done, you will be redirected to the report page. "
                                   "Meanwhile, please leave the page on.",
                                   visible=False)
# Interaction pair inference tab: predictions for arbitrary compound-protein pairs.
# NOTE(review): the container nesting below is reconstructed -- the original indentation
# was lost in this copy of the file -- so confirm the layout against the rendered page.
with gr.TabItem(label='Interaction pair inference', id=2):
    gr.Markdown('''
    # <center>DeepSEQreen Interaction Pair Inference</center>
    <center>To predict interactions/binding affinities between any compound-protein pairs.</center>
    ℹ️ A custom interaction pair dataset can be generated from a FASTA file containing multiple sequences
    and a SDF file containing multiple compounds (for predicting CPI/CPA of all possible combinations of
    compound-protein pairs), or a CSV file with 2 required string columns and optionally 2 ID columns:
    <b>X1</b>: the SMILES string of a compound\n
    <b>X2</b>: the FASTA sequence of a target\n
    <b>ID1</b> (optional): the ID (PubChem or any arbitrary unique identifier) of a compound\n
    <b>ID2</b> (optional): the ID (UniProt or any arbitrary unique identifier) of a protein
    Example CSV interaction pair dataset:
    | X1 | X2 | ID1 | ID2 |
    |---------------------------------------- |---------------|--------------|--------|
    | CCOC(=O)Nc1ccc(NCc2ccc(F)cc2)cc1N | MVQKSRNGGV... | CHEMBL41355 | O88943 |
    | CCCCCc1cc(O)c(C/C=C(\C)CCC=C(C)C)c(O)c1 | MTSPSSSPVF... | CHEMBL497318 | Q9Y5S1 |
    ''')
    with gr.Blocks() as infer_block:
        with gr.Column() as infer_page:
            infer_type = gr.Dropdown(choices=['Upload a compound library and a target library',
                                              'Upload a CSV interaction pair dataset'],
                                     value='Upload a compound library and a target library')
            # Section for uploading a ready-made pair dataset.
            # NOTE(review): this column starts visible although the dropdown defaults to the
            # library mode, for which infer_type_change hides it -- confirm the initial state.
            with gr.Column() as pair_upload:
                gr.File(label="Example custom dataset",
                        value="data/examples/interaction_pair_inference.csv",
                        interactive=False)
                with gr.Column():
                    infer_data_for_predict = gr.File(
                        label='Upload a custom dataset', file_count="single", type='filepath', visible=True)
            # Section for generating pairs from a compound library and a target library.
            with gr.Column() as pair_generate:
                with gr.Row():
                    gr.File(label='Example SDF compound library',
                            value='data/examples/compound_library.sdf', interactive=False)
                    gr.File(label='Example FASTA target library',
                            value='data/examples/target_library.fasta', interactive=False)
                with gr.Row():
                    gr.File(label='Example CSV compound library',
                            value='data/examples/compound_library.csv', interactive=False)
                    gr.File(label='Example CSV target library',
                            value='data/examples/target_library.csv', interactive=False)
                with gr.Row():
                    infer_drug = gr.File(label='SDF/CSV file containing multiple compounds',
                                         file_count="single", type='filepath')
                    infer_target = gr.File(label='FASTA/CSV file containing multiple targets',
                                           file_count="single", type='filepath')
            with gr.Row(visible=True):
                pair_infer_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task')
                pair_infer_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
                pair_infer_target_family = gr.Dropdown(choices=['General'],
                                                       label='Target family',
                                                       value='General')
            # with gr.Row():
            #     pair_infer_email = gr.Textbox(
            #         label='Email (optional)',
            #         info="Your email will be used to send you notifications when your job finishes."
            #     )
            with gr.Row(visible=True):
                # pair_infer_clr_btn = gr.ClearButton(size='lg')
                pair_infer_btn = gr.Button(value='INFER', variant='primary', size='lg')
        # Kept outside infer_page: the job chain hides the page and shows this message instead.
        # The sentences are now separated by a space (previously rendered "minutes.When").
        infer_waiting = gr.Markdown("Your job is running... It might take a few minutes. "
                                    "When it's done, you will be redirected to the report page. "
                                    "Meanwhile, please leave the page on.",
                                    visible=False)
# Chemical property report tab: previews a result/dataset file and offers CSV/HTML exports.
# NOTE(review): the container nesting below is reconstructed -- the original indentation
# was lost in this copy of the file -- so confirm the layout against the rendered page.
with gr.TabItem(label='Chemical property report', id=3):
    with gr.Blocks() as report:
        gr.Markdown('''
        # <center>DeepSEQreen Chemical Property Report</center>
        <center>
        To compute chemical properties for the predictions of drug hit screening,
        target protein identification, and interaction pair inference.
        You may also upload
        your own dataset. The page shows only a preview report displaying at most 30 records
        (with top predicted CPI/CPA if reporting results from a prediction job). For a full report, please
        generate and download a raw data CSV or interactive table HTML file below.
        </center>
        ''')
        with gr.Row():
            # The three job chains write their result into this component; users may also upload.
            file_for_report = gr.File(interactive=True, type='filepath')
            # Hidden dataframe; it is the input of the CSV/HTML export handlers.
            df_raw = gr.Dataframe(type="pandas", interactive=False, visible=False)
            scores = gr.CheckboxGroup(list(SCORE_MAP.keys()), label='Scores')
            filters = gr.CheckboxGroup(list(FILTER_MAP.keys()), label='Filters')
        with gr.Row():
            # clear_btn = gr.ClearButton(size='lg')
            analyze_btn = gr.Button('REPORT', variant='primary', size='lg')
        with gr.Row():
            with gr.Column(scale=3):
                html_report = gr.HTML()  # label='Results', visible=True)
                # Created hidden; no handler in this section writes to it.
                ranking_pie_chart = gr.Plot(visible=False)
        with gr.Row():
            with gr.Column():
                csv_generate = gr.Button(value='Generate raw data (CSV)', interactive=True)
                # Hidden until a handler returns it with visible=True.
                csv_download_file = gr.File(label='Download raw data (CSV)', visible=False)
            with gr.Column():
                html_generate = gr.Button(value='Generate report (HTML)', interactive=True)
                html_download_file = gr.File(label='Download report (HTML)', visible=False)
def target_input_type_select(input_type):
    """Return component updates matching the chosen target input type.

    The result is a list of seven updates, in the order of the ``outputs`` wired
    below: the type dropdown (hint text), the upload button, the UniProt ID box,
    the gene box, the organism box, the query button, and the FASTA editor.
    Every text box and the FASTA editor are reset to an empty value. Returns
    ``None`` for an unrecognised input type.
    """
    # input type -> (hint, show upload, show UniProt ID, show gene, show organism, show query)
    layouts = {
        'UniProt ID': ('', False, True, False, False, True),
        'Gene symbol': ('', False, False, True, True, True),
        'Sequence': ('Enter (paste) a FASTA string below manually or upload a FASTA file.',
                     True, False, False, False, False),
    }
    layout = layouts.get(input_type)
    if layout is None:
        return None
    hint, show_upload, show_id, show_gene, show_organism, show_query = layout
    return [
        gr.Dropdown(info=hint),
        gr.UploadButton(visible=show_upload),
        gr.Textbox(visible=show_id, value=''),
        gr.Textbox(visible=show_gene, value=''),
        gr.Textbox(visible=show_organism, value=''),
        gr.Button(visible=show_query),
        gr.Code(value=''),
    ]


target_input_type.select(
    fn=target_input_type_select,
    inputs=target_input_type,
    outputs=[
        target_input_type,
        target_upload_btn,
        target_id,
        target_gene,
        target_organism,
        target_query_btn,
        target_fasta,
    ],
    show_progress=False,
)
| def uniprot_query(input_type, uid, gene, organism='Human'): | |
| fasta_seq = '' | |
| match input_type: | |
| case 'UniProt ID': | |
| query = f"{uid.strip()}.fasta" | |
| case 'Gene symbol': | |
| organism = organism if organism else 'Human' | |
| query = f'search?query=organism_name:{organism.strip()}+AND+gene:{gene.strip()}&format=fasta' | |
| try: | |
| fasta = SESSION.get(UNIPROT_ENDPOINT.format(query=query)) | |
| fasta.raise_for_status() | |
| fasta_seq = fasta.text | |
| except Exception as e: | |
| raise gr.Warning(f"Failed to query FASTA from UniProt database due to {str(e)}") | |
| finally: | |
| return fasta_seq | |
| target_upload_btn.upload(fn=lambda x: x.decode(), inputs=target_upload_btn, outputs=target_fasta) | |
| target_query_btn.click(uniprot_query, | |
| inputs=[target_input_type, target_id, target_gene, target_organism], | |
| outputs=target_fasta) | |
def target_family_detect(fasta, progress=gr.Progress(track_tqdm=True)):
    """Guess the protein family of a target from its best-scoring library sequence.

    Every sequence in the ``X2`` column of the ChEMBL33 single-protein table is
    locally aligned to the query with BLASTP-style scoring, and the family of
    the highest-scoring row is selected.

    Args:
        fasta: Target FASTA text as entered by the user.
        progress: Gradio progress tracker (mirrors tqdm progress bars).

    Returns:
        A ``gr.Dropdown`` update carrying the capitalised family name and an
        ``info`` text that names the best match and its score.
    """
    aligner = PairwiseAligner(scoring='blastp', mode='local')
    alignment_df = pd.read_csv('data/target_libraries/ChEMBL33_all_spe_single_prot_info.csv')
    # Normalise the query once; it was previously re-processed for every library row.
    target_seq = process_target_fasta(fasta)

    def align_score(query):
        return aligner.align(target_seq, query).score

    alignment_df['score'] = alignment_df['X2'].swifter.progress_bar(
        desc="Detecting protein family of the target...").apply(align_score)
    row = alignment_df.loc[alignment_df['score'].idxmax()]
    return gr.Dropdown(value=row['protein_family'].capitalize(),
                       info=f"Reason: Best BLASTP score ({row['score']}) "
                            f"with {row['ID2']} from family {row['protein_family']}")


target_family_detect_btn.click(fn=target_family_detect, inputs=target_fasta, outputs=drug_screen_target_family)
# Pass the FASTA editor's content through wrap_text whenever it gains or loses focus.
target_fasta.focus(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
target_fasta.blur(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
# After a custom library upload, hand x.name (presumably the uploaded file's path -- confirm)
# to the library file component and select its base name in the library dropdown,
# listed after the built-in DRUG_LIBRARY_MAP entries.
drug_library_upload_btn.upload(fn=lambda x: [
    x.name, gr.Dropdown(value=Path(x.name).name, choices=list(DRUG_LIBRARY_MAP.keys()) + [Path(x.name).name])
], inputs=drug_library_upload_btn, outputs=[drug_library_upload, drug_library])
def example_fill(input_type):
    """Return example values (human MAPK14, UniProt Q16539) for the target inputs.

    ``input_type`` is supplied by the click wiring but is not used; the same
    example is returned for every input type.
    """
    example_sequence = """
>sp|Q16539|MK14_HUMAN Mitogen-activated protein kinase 14 OS=Homo sapiens OX=9606 GN=MAPK14 PE=1 SV=3
MSQERPTFYRQELNKTIWEVPERYQNLSPVGSGAYGSVCAAFDTKTGLRVAVKKLSRPFQ
SIIHAKRTYRELRLLKHMKHENVIGLLDVFTPARSLEEFNDVYLVTHLMGADLNNIVKCQ
KLTDDHVQFLIYQILRGLKYIHSADIIHRDLKPSNLAVNEDCELKILDFGLARHTDDEMT
GYVATRWYRAPEIMLNWMHYNQTVDIWSVGCIMAELLTGRTLFPGTDHIDQLKLILRLVG
TPGAELLKKISSESARNYIQSLTQMPKMNFANVFIGANPLAVDLLEKMLVLDSDKRITAA
QALAHAYFAQYHDPDDEPVADPYDQSFESRDLLIDEWKSLTYDEVISFVPPPLDQEEMES
"""
    return {
        target_id: 'Q16539',
        target_gene: 'MAPK14',
        target_organism: 'Human',
        target_fasta: example_sequence,
    }


example_fasta.click(
    fn=example_fill,
    inputs=target_input_type,
    outputs=[target_id, target_gene, target_organism, target_fasta],
    show_progress=False,
)
def screen_recommend_model(fasta, family, task):
    """Recommend the best-benchmarked model preset for screening against a target.

    Args:
        fasta: Target FASTA text as entered by the user.
        family: Target protein family; 'General' selects the all-families benchmark.
        task: Key of ``TASK_MAP``; it must map to 'CPI' or 'CPA'.

    Returns:
        A ``gr.Dropdown`` update selecting the preset with the highest benchmark
        score (AUROC for CPI, CI for CPA), with the reason given in ``info``.

    Raises:
        gr.Error: If the task is unsupported or no benchmark row matches.
    """
    task = TASK_MAP[task]
    if task == 'CPI':
        train = pd.read_csv('data/benchmarks/all_families_reduced_dti_train.csv')
        score = 'AUROC'
    elif task == 'CPA':
        train = pd.read_csv('data/benchmarks/all_families_reduced_dta_train.csv')
        score = 'CI'
    else:
        raise gr.Error(f'Unsupported prediction task: {task}')

    # `x in series` tests the Series *index*, so the old check never looked at the
    # sequences themselves. Compare against the column values instead, using the same
    # normalised sequence that the validation step stores in the X2 column.
    sequence = process_target_fasta(fasta)
    if (train['X2'] == sequence).any():
        scenario = "Seen target"
    else:
        scenario = "Unseen target"

    benchmark_df = pd.read_csv('data/benchmarks/compound_screen.csv')
    if family == 'General':
        # The general model is looked up under the random-split benchmark of all families.
        filtered_df = benchmark_df[(benchmark_df['Task'] == task)
                                   & (benchmark_df['Target.family'] == 'All families reduced')
                                   & (benchmark_df['Scenario'] == 'Random split')
                                   & (benchmark_df['all'] == True)]
    else:
        filtered_df = benchmark_df[(benchmark_df['Task'] == task)
                                   & (benchmark_df['Target.family'] == family)
                                   & (benchmark_df['Scenario'] == scenario)
                                   & (benchmark_df['all'] == False)]
    if filtered_df.empty:
        # idxmax() on an empty frame raises an opaque ValueError; report the cause instead.
        raise gr.Error(f'No benchmark record found for task {task}, '
                       f'family {family} and scenario {scenario}.')
    row = filtered_df.loc[filtered_df[score].idxmax()]
    return gr.Dropdown(value=row['preset'],
                       info=f"Reason: {scenario} in the training dataset; we recommend the model "
                            f"with the best {score} ({float(row[score]):.3f}) "
                            f"in the {scenario.lower()} scenario on {family.lower()} family.")


screen_preset_recommend_btn.click(fn=screen_recommend_model,
                                  inputs=[target_fasta, drug_screen_target_family, drug_screen_task],
                                  outputs=drug_screen_preset)
def compound_input_type_select(input_type):
    """Return a dropdown update whose hint text matches the compound input type.

    Returns ``None`` when ``input_type`` is neither 'SMILES' nor 'SDF'.
    """
    hints = {
        'SMILES': 'Input an SMILES string or upload an SMI file',
        'SDF': 'Convert the first molecule in an SDF file to SMILES',
    }
    if input_type not in hints:
        return None
    return gr.Dropdown(info=hints[input_type])


compound_type.select(
    fn=compound_input_type_select,
    inputs=compound_type,
    outputs=compound_type,
    show_progress=False,
)
| def compound_upload_process(input_type, input_upload): | |
| match input_type: | |
| case 'SMILES': | |
| return input_upload.decode() | |
| case 'SDF': | |
| suppl = Chem.ForwardSDMolSupplier(io.BytesIO(input_upload)) | |
| return Chem.MolToSmiles(next(suppl)) | |
| compound_upload_btn.upload(fn=compound_upload_process, | |
| inputs=[compound_type, compound_upload_btn], | |
| outputs=compound_smiles) | |
# Fill the SMILES editor with the example compound (aspirin, per the button label).
example_drug.click(fn=lambda: 'CC(=O)Oc1ccccc1C(=O)O', outputs=compound_smiles, show_progress=False)
# After a custom library upload, hand x.name (presumably the uploaded file's path -- confirm)
# to the library file component and select its base name in the library dropdown,
# listed after the built-in TARGET_LIBRARY_MAP entries.
target_library_upload_btn.upload(fn=lambda x: [
    x.name, gr.Dropdown(value=Path(x.name).name, choices=list(TARGET_LIBRARY_MAP.keys()) + [Path(x.name).name])
], inputs=target_library_upload_btn, outputs=[target_library_upload, target_library])
def identify_recommend_model(smiles, task):
    """Recommend the best-benchmarked model preset for target identification.

    Args:
        smiles: Compound SMILES as entered by the user.
        task: Key of ``TASK_MAP``; it must map to 'CPI' or 'CPA'.

    Returns:
        A ``gr.Dropdown`` update selecting the preset with the highest benchmark
        score (AUROC for CPI, CI for CPA), with the reason given in ``info``.

    Raises:
        gr.Error: If the task is unsupported or no benchmark row matches.
    """
    task = TASK_MAP[task]
    if task == 'CPI':
        train = pd.read_csv('data/benchmarks/all_families_reduced_dti_train.csv')
        score = 'AUROC'
    elif task == 'CPA':
        train = pd.read_csv('data/benchmarks/all_families_reduced_dta_train.csv')
        score = 'CI'
    else:
        raise gr.Error(f'Unsupported prediction task: {task}')

    # `x in series` tests the Series *index*, so the old check never looked at the
    # SMILES strings themselves. Compare against the column values instead.
    # NOTE(review): this is plain string equality; a differently written SMILES of
    # a training compound still counts as unseen.
    compound = (smiles or '').strip()
    if (train['X1'] == compound).any():
        scenario = "Seen drug"
    else:
        scenario = "Unseen drug"

    benchmark_df = pd.read_csv('data/benchmarks/target_identification.csv')
    filtered_df = benchmark_df[(benchmark_df['Task'] == task)
                               & (benchmark_df['Scenario'] == scenario)]
    if filtered_df.empty:
        # idxmax() on an empty frame raises an opaque ValueError; report the cause instead.
        raise gr.Error(f'No benchmark record found for task {task} and scenario {scenario}.')
    row = filtered_df.loc[filtered_df[score].idxmax()]
    # ':.3f' gives three decimals; the former ':3f' was a minimum *width* of 3
    # and printed the default six decimals.
    return gr.Dropdown(value=row['preset'],
                       info=f"Reason: {scenario} in the training dataset; choosing the model "
                            f"with the best {score} ({float(row[score]):.3f}) "
                            f"in the {scenario.lower()} scenario.")


identify_preset_recommend_btn.click(fn=identify_recommend_model,
                                    inputs=[compound_smiles, target_identify_task],
                                    outputs=target_identify_preset)
def infer_type_change(upload_type):
    """Show the upload section matching the chosen mode and clear all three file inputs.

    Returns a component-keyed update dict, or ``None`` for an unrecognised mode.
    """
    library_mode = "Upload a compound library and a target library"
    csv_mode = "Upload a CSV interaction pair dataset"
    if upload_type != library_mode and upload_type != csv_mode:
        return None
    use_csv = upload_type == csv_mode
    updates = {
        pair_upload: gr.Column(visible=use_csv),
        pair_generate: gr.Column(visible=not use_csv),
    }
    # Switching modes always discards whatever had been uploaded.
    for file_input in (infer_data_for_predict, infer_drug, infer_target):
        updates[file_input] = None
    return updates


infer_type.select(
    fn=infer_type_change,
    inputs=infer_type,
    outputs=[pair_upload, pair_generate, infer_data_for_predict, infer_drug, infer_target],
)
def drug_screen_validate(fasta, library, library_upload, state, progress=gr.Progress(track_tqdm=True)):
    """Validate a drug hit screening request and write its input CSV.

    Args:
        fasta: Target FASTA text.
        library: Name of a built-in compound library, or the label of an upload.
        library_upload: Uploaded compound library; used when ``library`` is not built in.
        state: Current session job state; a truthy value means a job is already running.
        progress: Gradio progress tracker (mirrors tqdm progress bars).

    Returns:
        A component-keyed update dict. On success it carries the input CSV path
        and sets both ``screen_flag`` and ``run_state`` to the new job id. On
        failure a warning is shown and ``screen_flag`` is set to ``False``.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {screen_flag: False,
                run_state: state}

    try:
        fasta = process_target_fasta(fasta)
        err = validate_seq_str(fasta, FASTA_PAT)
        if err:
            raise ValueError(f'Found error(s) in your target fasta input: {err}')

        if library in DRUG_LIBRARY_MAP:
            screen_df = pd.read_csv(Path('data/drug_libraries', DRUG_LIBRARY_MAP[library]))
        else:
            screen_df = process_drug_library_upload(library_upload)
            # NOTE(review): the limit is applied to uploads only (nesting reconstructed), and
            # `>=` also rejects a library of exactly the maximum size -- confirm both.
            if len(screen_df) >= CUSTOM_DATASET_MAX_LEN:
                # The limit is now interpolated; the message used to print the constant's name.
                raise gr.Error(f'The uploaded compound library has more records '
                               f'than the allowed maximum ({CUSTOM_DATASET_MAX_LEN}).')

        # Pair every compound in the library with the single target sequence.
        screen_df['X2'] = fasta

        job_id = uuid4()
        temp_file = Path(f'temp/{job_id}_input.csv').resolve()
        temp_file.parent.mkdir(parents=True, exist_ok=True)
        screen_df.to_csv(temp_file, index=False)
        if not temp_file.is_file():
            raise SystemError('Failed to create temporary files. Please try again later.')
        return {screen_data_for_predict: str(temp_file),
                screen_flag: job_id,
                run_state: job_id}
    except Exception as e:
        # Any failure is reported as a warning and clears the job flag and session state.
        gr.Warning(f'Failed to submit the job due to error: {str(e)}')
        return {screen_flag: False,
                run_state: False}
def target_identify_validate(smiles, library, library_upload, state, progress=gr.Progress(track_tqdm=True)):
    """Validate a target protein identification request and write its input CSV.

    Args:
        smiles: Compound SMILES text.
        library: Name of a built-in target library, or the label of an upload.
        library_upload: Uploaded target library; used when ``library`` is not built in.
        state: Current session job state; a truthy value means a job is already running.
        progress: Gradio progress tracker (mirrors tqdm progress bars).

    Returns:
        A component-keyed update dict. On success it carries the input CSV path
        and sets both ``identify_flag`` and ``run_state`` to the new job id. On
        failure a warning is shown and ``identify_flag`` is set to ``False``.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {identify_flag: False,
                run_state: state}

    try:
        smiles = smiles.strip()
        err = validate_seq_str(smiles, SMILES_PAT)
        if err:
            # The message used to blame the "target fasta input" (copied from the screening flow).
            raise ValueError(f'Found error(s) in your compound SMILES input: {err}')

        if library in TARGET_LIBRARY_MAP:
            identify_df = pd.read_csv(Path('data/target_libraries', TARGET_LIBRARY_MAP[library]))
        else:
            identify_df = process_target_library_upload(library_upload)
            # NOTE(review): the limit is applied to uploads only (nesting reconstructed), and
            # `>=` also rejects a library of exactly the maximum size -- confirm both.
            if len(identify_df) >= CUSTOM_DATASET_MAX_LEN:
                # The limit is now interpolated; the message used to print the constant's name.
                raise gr.Error(f'The uploaded target library has more records '
                               f'than the allowed maximum ({CUSTOM_DATASET_MAX_LEN}).')

        # Pair every target in the library with the single compound.
        identify_df['X1'] = smiles

        job_id = uuid4()
        temp_file = Path(f'temp/{job_id}_input.csv').resolve()
        temp_file.parent.mkdir(parents=True, exist_ok=True)
        identify_df.to_csv(temp_file, index=False)
        if not temp_file.is_file():
            raise SystemError('Failed to create temporary files. Please try again later.')
        return {identify_data_for_predict: str(temp_file),
                identify_flag: job_id,
                run_state: job_id}
    except Exception as e:
        # Any failure is reported as a warning and clears the job flag and session state.
        gr.Warning(f'Failed to submit the job due to error: {str(e)}')
        return {identify_flag: False,
                run_state: False}
def pair_infer_validate(drug_target_pair_upload, drug_upload, target_upload, state,
                        progress=gr.Progress(track_tqdm=True)):
    """Validate an interaction pair inference request and locate or build its input CSV.

    Either a ready-made pair CSV (columns ``X1`` and ``X2``) is validated and used
    as is, or a compound library and a target library are de-duplicated and
    combined into every compound-target pair.

    Args:
        drug_target_pair_upload: Path of an uploaded pair CSV, if any.
        drug_upload: Uploaded compound library, if any.
        target_upload: Uploaded target library, if any.
        state: Current session job state; a truthy value means a job is already running.
        progress: Gradio progress tracker (mirrors tqdm progress bars).

    Returns:
        A component-keyed update dict. On success it carries the input CSV path
        and sets both ``infer_flag`` and ``run_state`` to the new job id. On
        failure a warning is shown and ``infer_flag`` is set to ``False``.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {infer_flag: False,
                run_state: state}

    try:
        job_id = uuid4()
        if drug_target_pair_upload:
            infer_df = pd.read_csv(drug_target_pair_upload)
            validate_columns(infer_df, ['X1', 'X2'])

            infer_df['X1_ERR'] = infer_df['X1'].swifter.progress_bar(desc="Validating SMILES...").apply(
                validate_seq_str, regex=SMILES_PAT)
            if not infer_df['X1_ERR'].isna().all():
                raise ValueError(
                    f"Encountered invalid SMILES:\n{infer_df[~infer_df['X1_ERR'].isna()][['X1', 'X1_ERR']]}")

            infer_df['X2_ERR'] = infer_df['X2'].swifter.progress_bar(desc="Validating FASTA...").apply(
                validate_seq_str, regex=FASTA_PAT)
            if not infer_df['X2_ERR'].isna().all():
                raise ValueError(
                    f"Encountered invalid FASTA:\n{infer_df[~infer_df['X2_ERR'].isna()][['X2', 'X2_ERR']]}")

            # The uploaded file itself is passed on to prediction.
            data_path = str(drug_target_pair_upload)
            needs_export = False
        elif drug_upload and target_upload:
            drug_df = process_drug_library_upload(drug_upload)
            target_df = process_target_library_upload(target_upload)
            drug_df.drop_duplicates(subset=['X1'], inplace=True)
            target_df.drop_duplicates(subset=['X2'], inplace=True)

            # Build every compound-target combination, then re-attach each library's other columns.
            infer_df = pd.DataFrame(list(itertools.product(drug_df['X1'], target_df['X2'])),
                                    columns=['X1', 'X2'])
            infer_df = infer_df.merge(drug_df, on='X1').merge(target_df, on='X2')
            data_path = None
            needs_export = True
        else:
            raise gr.Error('Should upload a compound-protein pair dataset, or '
                           'upload both a compound library and a target library.')

        # Enforce the size limit before accepting the job. The check used to sit after
        # both return statements, so it never ran for a successfully prepared dataset.
        # NOTE(review): `>=` also rejects a dataset of exactly the maximum size -- confirm.
        if len(infer_df) >= CUSTOM_DATASET_MAX_LEN:
            raise gr.Error(f'The uploaded/generated compound-protein pair dataset has more records '
                           f'than the allowed maximum ({CUSTOM_DATASET_MAX_LEN}).')

        if needs_export:
            temp_file = Path(f'temp/{job_id}_input.csv').resolve()
            temp_file.parent.mkdir(parents=True, exist_ok=True)
            infer_df.to_csv(temp_file, index=False)
            if not temp_file.is_file():
                # A missing file used to fall through and return None without any message.
                raise SystemError('Failed to create temporary files. Please try again later.')
            data_path = str(temp_file)

        return {infer_data_for_predict: data_path,
                infer_flag: job_id,
                run_state: job_id}
    except Exception as e:
        # Any failure is reported as a warning and clears the job flag and session state.
        gr.Warning(f'Failed to submit the job due to error: {str(e)}')
        return {infer_flag: False,
                run_state: False}
# Each submit button runs the same four-step chain:
#   1. validate the inputs and prepare the input CSV,
#   2. hide the page and show its "job is running" message,
#   3. call submit_predict, which writes to file_for_report and run_state,
#   4. restore the page, hide the message, and switch to the report tab (id=3).
# All three chains now switch tabs only in step 4, as the waiting message promises
# ("When it's done, you will be redirected to the report page"). Previously the identify
# chain also jumped to the report tab in step 2, and the infer chain never redirected.
drug_screen_btn.click(
    fn=drug_screen_validate,
    inputs=[target_fasta, drug_library, drug_library_upload, run_state],  # , drug_screen_email],
    outputs=[screen_data_for_predict, screen_flag, run_state]
).then(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[screen_page, screen_waiting]
).then(
    fn=submit_predict,
    inputs=[screen_data_for_predict, drug_screen_task, drug_screen_preset,
            drug_screen_target_family, screen_flag],  # , drug_screen_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False), gr.Tabs(selected=3)],
    outputs=[screen_page, screen_waiting, tabs]
)

target_identify_btn.click(
    fn=target_identify_validate,
    inputs=[compound_smiles, target_library, target_library_upload, run_state],  # , drug_screen_email],
    outputs=[identify_data_for_predict, identify_flag, run_state]
).then(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[identify_page, identify_waiting]
).then(
    fn=submit_predict,
    inputs=[identify_data_for_predict, target_identify_task, target_identify_preset,
            target_identify_target_family, identify_flag],  # , target_identify_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False), gr.Tabs(selected=3)],
    outputs=[identify_page, identify_waiting, tabs]
)

pair_infer_btn.click(
    fn=pair_infer_validate,
    inputs=[infer_data_for_predict, infer_drug, infer_target, run_state],  # , drug_screen_email],
    outputs=[infer_data_for_predict, infer_flag, run_state]
).then(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[infer_page, infer_waiting]
).then(
    fn=submit_predict,
    inputs=[infer_data_for_predict, pair_infer_task, pair_infer_preset,
            pair_infer_target_family, infer_flag],  # , pair_infer_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False), gr.Tabs(selected=3)],
    outputs=[infer_page, infer_waiting, tabs]
)
# TODO background job from these 3 pipelines to update file_for_report
# Whenever the report file changes (upload, or result written by a job chain),
# update_df refreshes the HTML preview and the hidden raw dataframe.
file_for_report.change(fn=update_df, inputs=file_for_report, outputs=[
    html_report,
    df_raw,
    # ranking_pie_chart
])
# REPORT button: submit_report receives the selected scores and filters and
# writes to the same two outputs.
analyze_btn.click(fn=submit_report, inputs=[scores, filters], outputs=[
    html_report,
    df_raw,
    # ranking_pie_chart
])
def create_csv_raw_file(df, file_report):
    """Write the report dataframe to a timestamped CSV and expose it for download.

    Args:
        df: Report dataframe; its 'Compound' and 'Scaffold' columns are left out of the CSV.
        file_report: The report source file; its stem becomes the output file name prefix.
            Either a path string or an object with a ``name`` attribute is accepted.

    Returns:
        A visible ``gr.File`` pointing at the CSV written under ``reports/``.
    """
    from datetime import datetime

    now = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    source = Path(getattr(file_report, 'name', file_report))
    filename = f"reports/{source.stem}_DeepSEQreen_report_{now}.csv"
    Path(filename).parent.mkdir(parents=True, exist_ok=True)
    # drop() removes *row* labels by default, so the old call raised KeyError instead of
    # removing the two columns. errors='ignore' tolerates frames that lack them.
    df.drop(columns=['Compound', 'Scaffold'], errors='ignore').to_csv(filename, index=False)
    return gr.File(filename, visible=True)
def create_html_report_file(df, file_report):
    """Render the report dataframe to a timestamped HTML file and expose it for download.

    The output is written under ``reports/`` by ``create_html_report`` and named
    after the stem of ``file_report.name``. Returns a visible ``gr.File``.
    """
    from datetime import datetime

    timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    source_stem = Path(file_report.name).stem
    filename = f"reports/{source_stem}_DeepSEQreen_report_{timestamp}.html"
    create_html_report(df, filename)
    download = gr.File(filename, visible=True)
    return download
# Export buttons: build the file from the hidden raw dataframe and the current
# report source file, then reveal it in the matching download component.
csv_generate.click(fn=create_csv_raw_file, inputs=[df_raw, file_for_report], outputs=csv_download_file)
html_generate.click(fn=create_html_report_file, inputs=[df_raw, file_for_report], outputs=html_download_file)
| # screen_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report], | |
| # every=5) | |
| # identify_waiting.change(fn=check_job_status, inputs=run_state, outputs=[identify_waiting, tabs, file_for_report], | |
| # every=5) | |
| # pair_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report], | |
| # every=5) | |
| # demo.load(None, None, None, js="() => {document.body.classList.remove('dark')}") | |
if __name__ == "__main__":
    # Turn on request queuing for each Blocks. max_size is presumably the cap on
    # pending events per queue (confirm against the installed Gradio version);
    # the three prediction pages get 2 and the report page gets 20.
    screen_block.queue(max_size=2)
    identify_block.queue(max_size=2)
    infer_block.queue(max_size=2)
    report.queue(max_size=20)
    # SCHEDULER.add_job(func=file_cleanup(), trigger="interval", seconds=60)
    # SCHEDULER.start()
    # Start the web app with the API documentation page disabled.
    demo.launch(
        show_api=False,
    )