Spaces:
Sleeping
Sleeping
import hashlib | |
import json | |
import textwrap | |
import threading | |
from math import pi | |
from uuid import uuid4 | |
import io | |
import os | |
import pathlib | |
from pathlib import Path | |
import sys | |
from Bio import AlignIO, SeqIO | |
# from email_validator import validate_email | |
import gradio as gr | |
import hydra | |
import pandas as pd | |
import plotly.express as px | |
import requests | |
from requests.adapters import HTTPAdapter, Retry | |
from rdkit import Chem | |
from rdkit.Chem import RDConfig, Descriptors, Draw, Lipinski, Crippen, PandasTools | |
from rdkit.Chem.Scaffolds import MurckoScaffold | |
import seaborn as sns | |
import swifter | |
from tqdm.auto import tqdm | |
from deepscreen.data.dti import rdkit_canonicalize, validate_seq_str, FASTA_PAT, SMILES_PAT | |
from deepscreen.predict import predict | |
sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score')) | |
import sascorer | |
ROOT = Path.cwd() | |
DATA_PATH = Path("./") # Path("/data") | |
DF_FOR_REPORT = pd.DataFrame() | |
pd.set_option('display.float_format', '{:.3f}'.format) | |
PandasTools.molRepresentation = 'svg' | |
PandasTools.drawOptions = Draw.rdMolDraw2D.MolDrawOptions() | |
PandasTools.drawOptions.clearBackground = False | |
PandasTools.drawOptions.bondLineWidth = 1.5 | |
PandasTools.drawOptions.explicitMethyl = True | |
PandasTools.drawOptions.singleColourWedgeBonds = True | |
PandasTools.drawOptions.useCDKAtomPalette() | |
PandasTools.molSize = (128, 128) | |
SESSION = requests.Session() | |
ADAPTER = HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])) | |
SESSION.mount('http://', ADAPTER) | |
SESSION.mount('https://', ADAPTER) | |
# SCHEDULER = BackgroundScheduler() | |
UNIPROT_ENDPOINT = 'https://rest.uniprot.org/uniprotkb/{query}' | |
CSS = """ | |
.help-tip { | |
position: absolute; | |
display: block; | |
top: 0px; | |
right: 0px; | |
text-align: center; | |
background-color: #29b6f6; | |
border-radius: 50%; | |
width: 24px; | |
height: 24px; | |
font-size: 12px; | |
line-height: 26px; | |
cursor: default; | |
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1); | |
} | |
.help-tip:hover { | |
cursor: pointer; | |
background-color: #ccc; | |
} | |
.help-tip:before { | |
content: '?'; | |
font-weight: 700; | |
color: #fff; | |
z-index: 100; | |
} | |
.help-tip p { | |
visibility: hidden; | |
opacity: 0; | |
text-align: left; | |
background-color: #039be5; | |
padding: 20px; | |
width: 300px; | |
position: absolute; | |
border-radius: 4px; | |
right: -4px; | |
color: #fff; | |
font-size: 13px; | |
line-height: normal; | |
transform: scale(0.7); | |
transform-origin: 100% 0%; | |
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1); | |
z-index: 100; | |
} | |
.help-tip:hover p { | |
cursor: default; | |
visibility: visible; | |
opacity: 1; | |
transform: scale(1.0); | |
} | |
.help-tip p:before { | |
position: absolute; | |
content: ''; | |
width: 0; | |
height: 0; | |
border: 6px solid transparent; | |
border-bottom-color: #039be5; | |
right: 10px; | |
top: -12px; | |
} | |
.help-tip p:after { | |
width: 100%; | |
height: 40px; | |
content: ''; | |
position: absolute; | |
top: -5px; | |
left: 0; | |
} | |
.help-tip a { | |
color: #fff; | |
font-weight: 700; | |
} | |
.help-tip a:hover, .help-tip a:focus { | |
color: #fff; | |
text-decoration: underline; | |
} | |
.upload_button { | |
background-color: #008000; | |
} | |
.absolute { | |
position: absolute; | |
} | |
#example { | |
padding: 0; | |
background: none; | |
border: none; | |
text-decoration: underline; | |
box-shadow: none; | |
text-align: left !important; | |
display: inline-block !important; | |
} | |
footer { | |
visibility: hidden | |
} | |
""" | |
class HelpTip: | |
def __new__(cls, text): | |
return gr.HTML(elem_classes="help-tip", | |
value=f'<p>{text}</p>' | |
) | |
def sa_score(row): | |
return sascorer.calculateScore((row['Compound'])) | |
def mw(row): | |
return Chem.Descriptors.MolWt((row['Compound'])) | |
def hbd(row): | |
return Lipinski.NumHDonors((row['Compound'])) | |
def hba(row): | |
return Lipinski.NumHAcceptors((row['Compound'])) | |
def logp(row): | |
return Crippen.MolLogP((row['Compound'])) | |
SCORE_MAP = { | |
'SAscore': sa_score, | |
'RAscore': None, # https://github.com/reymond-group/RAscore | |
'SCScore': None, # https://pubs.acs.org/doi/10.1021/acs.jcim.7b00622 | |
'LogP': logp, # https://www.rdkit.org/docs/source/rdkit.Chem.Crippen.html | |
'MW': mw, # https://www.rdkit.org/docs/source/rdkit.Chem.Descriptors.html | |
'HBD': hbd, # https://www.rdkit.org/docs/source/rdkit.Chem.Lipinski.html | |
'HBA': hba, # https://www.rdkit.org/docs/source/rdkit.Chem.Lipinski.html | |
'TopoPSA': None, # http://mordred-descriptor.github.io/documentation/master/api/mordred.TopoPSA.html | |
} | |
FILTER_MAP = { | |
'PAINS filter': None, | |
"Lipinski's rule of five": None, # https://gist.github.com/strets123/fdc4db6d450b66345f46 | |
'ADMET filter': None, | |
'TCL filter': None | |
} | |
TASK_MAP = { | |
'Drug-target interaction': 'binary', | |
'Drug-target binding affinity': 'regression', | |
} | |
PRESET_MAP = { | |
'DeepDTA': 'deep_dta', | |
'DeepConvDTI': 'deep_conv_dti', | |
'GraphDTA': 'graph_dta', | |
'MGraphDTA': 'm_graph_dta', | |
'HyperAttentionDTI': 'hyper_attention_dti', | |
'MolTrans': 'mol_trans', | |
'TransformerCPI': 'transfomer_cpi', | |
'TransformerCPI2': 'transformer_cpi_2', | |
'DrugBAN': 'drug_ban', | |
'DrugVQA(Seq)': 'drug_vqa' | |
} | |
TARGET_FAMILY_MAP = { | |
'General': 'general', | |
'Kinase': 'kinases', | |
'Non-kinase enzyme': 'non-kinase_enzymes', | |
'Membrane receptor': 'membrane_receptors', | |
'Nuclear receptor': 'nuclear_receptors', | |
'Ion channel': 'ion_channels', | |
'Other protein targets': 'other_protein_targets', | |
} | |
TARGET_LIBRARY_MAP = { | |
# 'STITCH': 'stitch.csv', | |
'ChEMBL33 (all species)': 'ChEMBL33_all_spe_single_prot_info.csv', | |
'DrugBank (Human)': 'drugbank_human_py_annot.csv', | |
} | |
DRUG_LIBRARY_MAP = { | |
# 'ChEMBL': 'chembl.csv', | |
'DrugBank (Human)': 'drugbank_human_py_annot.csv', | |
} | |
MODE_LIST = [ | |
'Drug screening', | |
'Drug repurposing', | |
'Drug-target pair' | |
] | |
COLUMN_ALIASES = { | |
'X1': 'Drug SMILES', | |
'X2': 'Target FASTA', | |
'ID1': 'Drug ID', | |
'ID2': 'Target ID', | |
} | |
URL = "https://ciddr-lab.ac.cn/deepseqreen" | |
def validate_columns(df, mandatory_cols): | |
missing_cols = [col for col in mandatory_cols if col not in df.columns] | |
if missing_cols: | |
error_message = (f"The following mandatory columns are missing " | |
f"in the uploaded dataset: {str(['X1', 'X2']).strip('[]')}.") | |
raise gr.Error(error_message) | |
def send_email(receiver, msg): | |
pass | |
def submit_predict(predict_filepath, task, preset, target_family, flag, progress=gr.Progress(track_tqdm=True)): | |
if flag: | |
job_id = flag | |
global COLUMN_ALIASES | |
task = TASK_MAP[task] | |
preset = PRESET_MAP[preset] | |
target_family = TARGET_FAMILY_MAP[target_family] | |
# email_hash = hashlib.sha256(email.encode()).hexdigest() | |
COLUMN_ALIASES = COLUMN_ALIASES | { | |
'Y': 'Actual interaction' if task == 'binary' else 'Actual affinity', | |
'Y^': 'Predicted interaction' if task == 'binary' else 'Predicted affinity' | |
} | |
# target_family_list = [target_family] | |
# for family in target_family_list: | |
# try: | |
prediction_df = pd.DataFrame() | |
with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"): | |
cfg = hydra.compose( | |
config_name="webserver_inference", | |
overrides=[f"task={task}", | |
f"preset={preset}", | |
f"ckpt_path=resources/checkpoints/{preset}-{task}-{target_family}.ckpt", | |
f"data.data_file='{str(predict_filepath)}'"]) | |
predictions, _ = predict(cfg) | |
predictions = [pd.DataFrame(prediction) for prediction in predictions] | |
prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)]) | |
predictions_file = f'{job_id}_predictions.csv' | |
prediction_df.to_csv(predictions_file) | |
return [gr.Markdown(visible=True), | |
gr.File(predictions_file), | |
gr.State(False)] | |
# | |
# except Exception as e: | |
# raise gr.Error(str(e)) | |
# email_lock = Path(f"outputs/{email_hash}.lock") | |
# with open(email_lock, "w") as file: | |
# record = { | |
# "email": email, | |
# "job_id": job_id | |
# } | |
# json.dump(record, file) | |
# def run_predict(): | |
# TODO per-user submit usage | |
# # email_lock = Path(f"outputs/{email_hash}.lock") | |
# # with open(email_lock, "w") as file: | |
# # record = { | |
# # "email": email, | |
# # "job_id": job_id | |
# # } | |
# # json.dump(record, file) | |
# | |
# job_lock = DATA_PATH / f"outputs/{job_id}.lock" | |
# with open(job_lock, "w") as file: | |
# pass | |
# | |
# try: | |
# prediction_df = pd.DataFrame() | |
# for family in target_family_list: | |
# with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"): | |
# cfg = hydra.compose( | |
# config_name="webserver_inference", | |
# overrides=[f"task={task}", | |
# f"preset={preset}", | |
# f"ckpt_path=resources/checkpoints/{preset}-{task}-{family}.ckpt", | |
# f"data.data_file='{str(predict_dataset)}'"]) | |
# | |
# predictions, _ = predict(cfg) | |
# predictions = [pd.DataFrame(prediction) for prediction in predictions] | |
# prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)]) | |
# prediction_df.to_csv(f'outputs/{job_id}.csv') | |
# # email_lock.unlink() | |
# job_lock.unlink() | |
# | |
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) completed successfully. You may retrieve the ' | |
# f'results and generate an analytical report at {URL} using the job id within 48 hours.') | |
# gr.Info(msg) | |
# except Exception as e: | |
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) failed due to an error: "{str(e)}." You may ' | |
# f'reach out to the author about the error through email ([email protected]).') | |
# raise gr.Error(str(e)) | |
# finally: | |
# send_email(email, msg) | |
# | |
# # Run "predict" asynchronously | |
# threading.Thread(target=run_predict).start() | |
# | |
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) started running. You may retrieve the results ' | |
# f'and generate an analytical report at {URL} using the job id once the job is done. Only one job ' | |
# f'per user is allowed at the same time.') | |
# send_email(email, msg) | |
# # Return the job id first | |
# return [ | |
# gr.Blocks(visible=False), | |
# gr.Markdown(f"Your prediction job is running... " | |
# f"You may stay on this page or come back later to retrieve the results " | |
# f"Once you receive our email notification."), | |
# ] | |
def update_df(file, progress=gr.Progress(track_tqdm=True)): | |
global DF_FOR_REPORT | |
if file is not None: | |
df = pd.read_csv(file) | |
if df['X1'].nunique() > 1: | |
df['Scaffold SMILES'] = df['X1'].swifter.progress_bar( | |
desc=f"Calculating scaffold...").apply(MurckoScaffold.MurckoScaffoldSmilesFromSmiles) | |
# Add a new column with RDKit molecule objects | |
PandasTools.AddMoleculeColumnToFrame(df, smilesCol='X1', molCol='Compound', | |
includeFingerprints=False) | |
PandasTools.AddMoleculeColumnToFrame(df, smilesCol='Scaffold SMILES', molCol='Scaffold', | |
includeFingerprints=False) | |
DF_FOR_REPORT = df.copy() | |
pie_chart = None | |
value = None | |
if 'Y^' in DF_FOR_REPORT.columns: | |
value = 'Y^' | |
elif 'Y' in DF_FOR_REPORT.columns: | |
value = 'Y' | |
if value: | |
if DF_FOR_REPORT['X1'].nunique() > 1 >= DF_FOR_REPORT['X2'].nunique(): | |
pie_chart = create_pie_chart(DF_FOR_REPORT, category='Scaffold SMILES', value=value, top_k=100) | |
elif DF_FOR_REPORT['X2'].nunique() > 1 >= DF_FOR_REPORT['X1'].nunique(): | |
pie_chart = create_pie_chart(DF_FOR_REPORT, category='Target family', value=value, top_k=100) | |
return create_html_report(DF_FOR_REPORT), pie_chart | |
else: | |
return gr.HTML(''), gr.Plot() | |
def create_html_report(df, progress=gr.Progress(track_tqdm=True)): | |
cols_left = ['ID2', 'Y', 'Y^', 'ID1', 'Compound', 'Scaffold', 'Scaffold SMILES', ] | |
cols_right = ['X1', 'X2'] | |
cols_left = [col for col in cols_left if col in df.columns] | |
cols_right = [col for col in cols_right if col in df.columns] | |
df = df[cols_left + (df.columns.drop(cols_left + cols_right).tolist()) + cols_right] | |
df['X2'] = df['X2'].apply(wrap_text) | |
df.rename(COLUMN_ALIASES, inplace=True) | |
styled_df = df.style | |
# styled_df = df.style.format("{:.2f}") | |
colors = sns.color_palette('husl', len(df.columns)) | |
for i, col in enumerate(df.columns): | |
if pd.api.types.is_numeric_dtype(df[col]): | |
styled_df = styled_df.background_gradient(subset=col, cmap=sns.light_palette(colors[i], as_cmap=True)) | |
# Return the DataFrame as HTML | |
PandasTools.RenderImagesInAllDataFrames(images=True) | |
html = df.to_html() | |
return f'<div style="overflow:auto; height: 500px;">{html}</div>' | |
# return gr.HTML(pn.widgets.Tabulator(df).embed()) | |
# def create_pie_chart(df, category, value, top_k): | |
# df.rename(COLUMN_ALIASES, inplace=True) | |
# # Select the top_k records based on the value_col | |
# top_k_df = df.nlargest(top_k, value) | |
# | |
# # Count the frequency of each unique value in the category_col column | |
# category_counts = top_k_df[category].value_counts() | |
# | |
# # Convert the counts to a DataFrame | |
# data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values}) | |
# | |
# # Calculate the angle for each category | |
# data['angle'] = data['value']/data['value'].sum() * 2*pi | |
# | |
# # Assign colors | |
# data['color'] = Spectral11[0:len(category_counts)] | |
# | |
# # Create the plot | |
# p = figure(height=350, title="Pie Chart", toolbar_location=None, | |
# tools="hover", tooltips="@{}: @value".format(category), x_range=(-0.5, 1.0)) | |
# | |
# p.wedge(x=0, y=1, radius=0.4, | |
# start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'), | |
# line_color="white", fill_color='color', legend_field=category, source=data) | |
# | |
# p.axis.axis_label = None | |
# p.axis.visible = False | |
# p.grid.grid_line_color = None | |
# | |
# return p | |
def create_pie_chart(df, category, value, top_k): | |
df = df.copy() | |
df.rename(COLUMN_ALIASES, inplace=True) | |
value = COLUMN_ALIASES.get(value, value) | |
# Select the top_k records based on the value_col | |
top_k_df = df.nlargest(top_k, value) | |
# Count the frequency of each unique value in the category_col column | |
category_counts = top_k_df[category].value_counts() | |
# Convert the counts to a DataFrame | |
data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values}) | |
# Create the plot | |
fig = px.pie(data, values='value', names=category, title=f'Top-{top_k} {category} in {value}') | |
fig.update_traces(textposition='inside', textinfo='percent+label') | |
return fig | |
def submit_report(score_list, filter_list, progress=gr.Progress(track_tqdm=True)): | |
df = DF_FOR_REPORT.copy() | |
try: | |
for filter_name in filter_list: | |
pass | |
for score_name in score_list: | |
df[score_name] = df.swifter.progress_bar(desc=f"Calculating {score_name}").apply( | |
SCORE_MAP[score_name], axis=1) | |
pie_chart = None | |
value = None | |
if 'Y^' in df.columns: | |
value = 'Y^' | |
elif 'Y' in df.columns: | |
value = 'Y' | |
if value: | |
if df['X1'].nunique() > 1 >= df['X2'].nunique(): | |
pie_chart = create_pie_chart(df, category='Scaffold SMILES', value=value, top_k=100) | |
elif df['X2'].nunique() > 1 >= df['X1'].nunique(): | |
pie_chart = create_pie_chart(df, category='Target famiy', value=value, top_k=100) | |
return create_html_report(df), pie_chart | |
except Exception as e: | |
raise gr.Error(str(e)) | |
def check_job_status(job_id): | |
job_lock = DATA_PATH / f"{job_id}.lock" | |
job_file = DATA_PATH / f"{job_id}.csv" | |
if job_lock.is_file(): | |
return {gr.Markdown(f"Your job ({job_id}) is still running... " | |
f"You may stay on this page or come back later to retrieve the results " | |
f"Once you receive our email notification."), | |
None, | |
None | |
} | |
elif job_file.is_file(): | |
return {gr.Markdown(f"Your job ({job_id}) is done! Redirecting you to generate reports..."), | |
gr.Tabs(selected=3), | |
gr.File(str(job_lock))} | |
def wrap_text(text, line_length=60): | |
wrapper = textwrap.TextWrapper(width=line_length) | |
if text.startswith('>'): | |
sections = text.split('>') | |
wrapped_sections = [] | |
for section in sections: | |
if not section: | |
continue | |
lines = section.split('\n') | |
seq_header = lines[0] | |
wrapped_seq = wrapper.fill(''.join(lines[1:])) | |
wrapped_sections.append(f">{seq_header}\n{wrapped_seq}") | |
return '\n'.join(wrapped_sections) | |
else: | |
return wrapper.fill(text) | |
def unwrap_text(text): | |
return text.strip.replece('\n', '') | |
def smiles_from_sdf(sdf_path): | |
with Chem.SDMolSupplier(sdf_path) as suppl: | |
return Chem.MolToSmiles(suppl[0]) | |
theme = gr.themes.Base(spacing_size="sm", text_size='md').set( | |
background_fill_primary='#dfe6f0', | |
background_fill_secondary='#dfe6f0', | |
checkbox_label_background_fill='#dfe6f0', | |
checkbox_label_background_fill_hover='#dfe6f0', | |
checkbox_background_color='white', | |
checkbox_border_color='#4372c4', | |
border_color_primary='#4372c4', | |
border_color_accent='#4372c4', | |
button_primary_background_fill='#4372c4', | |
button_primary_text_color='white', | |
button_secondary_border_color='#4372c4', | |
body_text_color='#4372c4', | |
block_title_text_color='#4372c4', | |
block_label_text_color='#4372c4', | |
block_info_text_color='#505358', | |
block_border_color=None, | |
input_border_color='#4372c4', | |
panel_border_color='#4372c4', | |
input_background_fill='white', | |
code_background_fill='white', | |
) | |
with (gr.Blocks(theme=theme, title='DeepScreen', css=CSS) as demo): | |
run_state = gr.State(value=False) | |
screen_flag = gr.State(value=False) | |
identify_flag = gr.State(value=False) | |
infer_flag = gr.State(value=False) | |
with gr.Tabs() as tabs: | |
with gr.TabItem(label='Drug hit screening', id=0): | |
gr.Markdown(''' | |
# <center>DeepSEQreen Drug Hit Screening</center> | |
<center> | |
To predict interactions/binding affinities of a single target against a library of drugs. | |
</center> | |
''') | |
with gr.Blocks() as screen_block: | |
with gr.Column() as screen_page: | |
with gr.Row(): | |
with gr.Column(scale=4, variant='panel'): | |
target_fasta = gr.Code(label='Target sequence FASTA', | |
interactive=True, lines=5) | |
example_target = gr.Button(value='Example: Human MAPK14', elem_id='example') | |
with gr.Row(): | |
with gr.Column(scale=1): | |
with gr.Group(): | |
with gr.Row(): | |
target_input_type = gr.Radio(label='Target input type', | |
choices=['Sequence', 'UniProt ID', 'Gene symbol'], | |
value='Sequence') | |
target_query = gr.Textbox(label='UniProt ID/Accession', | |
visible=False, interactive=True) | |
target_upload_btn = gr.UploadButton(label='Upload a FASTA file', | |
type='binary', | |
visible=True, variant='primary', | |
size='lg', elem_classes="upload_button") | |
target_query_btn = gr.Button(value='Query the sequence', variant='primary', | |
elem_classes='upload_button', visible=False) | |
with gr.Column(scale=1): | |
with gr.Row(): | |
with gr.Group(): | |
drug_screen_target_family = gr.Dropdown( | |
choices=list(TARGET_FAMILY_MAP.keys()), | |
value='General', | |
label='Target family', interactive=True) | |
# with gr.Column(scale=1, min_width=24): | |
auto_detect_btn = gr.Button(value='Auto-detect', variant='primary') | |
HelpTip( | |
"Target amino acid sequence in the FASTA format. Alternatively, you may use a " | |
"UniProt ID/accession to query UniProt database for the sequence of your target" | |
"of interest. You can also search on databases like UniProt, RCSB PDB, " | |
"NCBI Protein for the FASTA string representing your target of interest. If " | |
"the input FASTA contains multiple entities, only the first one will be used." | |
) | |
with gr.Column(variant='panel'): | |
with gr.Group(): | |
drug_library = gr.Radio(label='Drug library', | |
choices=list(DRUG_LIBRARY_MAP.keys()) + ['Upload a drug library']) | |
drug_library_upload = gr.File(label='Custom drug library file', visible=True) | |
with gr.Row(variant='panel'): | |
drug_screen_task = gr.Radio(list(TASK_MAP.keys()), label='Task', | |
value='Drug-target interaction') | |
with gr.Column(scale=2): | |
with gr.Group(): | |
drug_screen_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Model') | |
recommend_btn = gr.Button(value='Recommend a model', variant='primary') | |
HelpTip("We recommend the appropriate model for your use case based on model performance " | |
"in drug-target interaction or binding affinity prediction " | |
"benchmarked on different target families and real-world data scenarios.") | |
# drug_screen_email = gr.Textbox( | |
# label='Email (optional)', | |
# info="Your email will be used to send you notifications when your job finishes." | |
# ) | |
with gr.Row(visible=True): | |
drug_screen_clr_btn = gr.ClearButton() | |
drug_screen_btn = gr.Button(value='SCREEN', variant='primary') | |
# TODO Modify the pd df directly with df['X2'] = target | |
screen_data_for_predict = gr.File(visible=False, file_count="single", type='filepath') | |
screen_waiting = gr.Markdown(""" | |
<center>Your job is running... It might take a few minutes. | |
When it's done, you will be redirected to the report page. | |
Meanwhile, please leave the page on.</center> | |
""", visible=False) | |
with gr.TabItem(label='Target protein identification', id=1): | |
gr.Markdown(''' | |
# <center>DeepSEQreen Target Protein Identification</center> | |
<center> | |
To predict interactions/binding affinities of a single drug against a library of targets. | |
</center> | |
''') | |
with gr.Blocks() as identify_block: | |
with gr.Column() as identify_page: | |
with gr.Row(): | |
with gr.Group(): | |
drug_type = gr.Dropdown(label='Drug input type', | |
choices=['SMILES', 'SDF'], | |
value='SMILES', | |
scale=1, | |
interactive=True) | |
drug_upload = gr.UploadButton(label='⤒ Upload a file') | |
drug_smiles = gr.Code(label='Drug canonical SMILES', interactive=True, scale=5, lines=5) | |
with gr.Column(scale=1): | |
HelpTip( | |
"""Drug molecule in the SMILES format. You may search on databases like | |
NCBI PubChem, ChEMBL, and DrugBank for the SMILES strings | |
representing your drugs of interest. | |
""" | |
) | |
example_drug = gr.Button(value='Example: Aspirin', elem_id='example') | |
with gr.Column(variant='panel'): | |
with gr.Group(): | |
target_library = gr.Radio(label='Target library', | |
choices=list(TARGET_LIBRARY_MAP.keys()) + ['Upload a target library']) | |
target_library_upload = gr.File(label='Custom target library file', visible=True) | |
with gr.Row(visible=True): | |
target_identify_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task') | |
HelpTip("Choose a preset model for making the predictions.") | |
target_identify_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset') | |
HelpTip("Choose the protein family of your target.") | |
target_identify_target_family = gr.Dropdown(choices=['General'], | |
value='General', | |
label='Target family') | |
# with gr.Row(): | |
# target_identify_email = gr.Textbox( | |
# label='Email (optional)', | |
# info="Your email will be used to send you notifications when your job finishes." | |
# ) | |
with gr.Row(visible=True): | |
target_identify_clr_btn = gr.ClearButton() | |
target_identify_btn = gr.Button(value='IDENTIFY', variant='primary') | |
identify_data_for_predict = gr.File(visible=False, file_count="single", type='filepath') | |
identify_waiting = gr.Markdown(f"Your job is running... It might take a few minutes." | |
f"When it's done, you will be redirected to the report page. " | |
f"Meanwhile, please leave the page on.", | |
visible=False) | |
with gr.TabItem(label='Interaction pair inference', id=2): | |
gr.Markdown(''' | |
# <center>DeepSEQreen Interaction Pair Inference</center> | |
<center> | |
To predict interactions/binding affinities between any drug-target pairs. | |
</center> | |
''') | |
with gr.Blocks() as infer_block: | |
with gr.Column() as infer_page: | |
HelpTip("Upload a custom drug-target pair dataset. See the documentation for details.") | |
infer_data_for_predict = gr.File( | |
label='Prediction dataset file', file_count="single", type='filepath') | |
# TODO example dataset | |
# TODO download example dataset | |
with gr.Row(visible=True): | |
pair_infer_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task') | |
HelpTip("Choose a preset model for making the predictions.") | |
pair_infer_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset') | |
HelpTip("Choose the protein family of your target.") | |
pair_infer_target_family = gr.Dropdown(choices=['General'], | |
label='Target family', | |
value='General') | |
# with gr.Row(): | |
# pair_infer_email = gr.Textbox( | |
# label='Email (optional)', | |
# info="Your email will be used to send you notifications when your job finishes." | |
# ) | |
with gr.Row(visible=True): | |
pair_infer_clr_btn = gr.ClearButton() | |
pair_infer_btn = gr.Button(value='INFER', variant='primary') | |
infer_waiting = gr.Markdown(f"Your job is running... It might take a few minutes." | |
f"When it's done, you will be redirected to the report page. " | |
f"Meanwhile, please leave the page on.", | |
visible=False) | |
with gr.TabItem(label='Chemical property report', id=3): | |
with gr.Blocks() as report: | |
gr.Markdown(''' | |
# <center>DeepSEQreen Chemical Property Report</center> | |
<center> | |
To compute chemical properties for the predictions of drug hit screening, | |
target protein identification, and interaction pair inference. You may also upload | |
your own dataset. | |
</center> | |
''') | |
with gr.Row(): | |
file_for_report = gr.File(interactive=True, type='filepath') | |
# df_original = gr.Dataframe(type="pandas", interactive=False, visible=False) | |
scores = gr.CheckboxGroup(list(SCORE_MAP.keys()), label='Scores') | |
filters = gr.CheckboxGroup(list(FILTER_MAP.keys()), label='Filters') | |
with gr.Row(): | |
clear_btn = gr.ClearButton() | |
analyze_btn = gr.Button('REPORT', variant='primary') | |
with gr.Row(): | |
with gr.Column(scale=3): | |
html_report = gr.HTML() # label='Results', visible=True) | |
ranking_pie_chart = gr.Plot(visible=False) | |
with gr.Row(): | |
csv_download_btn = gr.Button('Download report (HTML)', variant='primary') | |
html_download_btn = gr.Button('Download raw data (CSV)', variant='primary') | |
def target_input_type_select(input_type): | |
match input_type: | |
case 'UniProt ID': | |
return [gr.UploadButton(visible=False), | |
gr.Textbox(visible=True, label='UniProt ID/accession', info=None, value=''), | |
gr.Button(visible=True)] | |
case 'Gene symbol': | |
return [gr.UploadButton(visible=False), | |
gr.Textbox(visible=True, label='Gene symbol/name', info='Organism: human', value=''), | |
gr.Button(visible=True)] | |
case 'Sequence': | |
return [gr.UploadButton(visible=True), | |
gr.Textbox(visible=False), gr.Button(visible=False)] | |
target_input_type.select(fn=target_input_type_select, | |
inputs=target_input_type, outputs=[target_upload_btn, target_query, target_query_btn], | |
show_progress=False) | |
def uniprot_query(query, input_type): | |
fasta_seq = '' | |
query = query.strip() | |
match input_type: | |
case 'UniProt ID': | |
query = f"{query.strip()}.fasta" | |
case 'Gene symbol': | |
query = f'search?query=organism_id:9606+AND+gene:{query}&format=fasta' | |
try: | |
fasta = SESSION.get(UNIPROT_ENDPOINT.format(query=query)) | |
fasta.raise_for_status() | |
fasta_seq = fasta.text | |
except Exception as e: | |
raise gr.Warning(f"Failed to query FASTA from UniProt due to {str(e)}") | |
finally: | |
return fasta_seq | |
target_upload_btn.upload(fn=lambda x: x.decode(), inputs=target_upload_btn, outputs=target_fasta) | |
target_query_btn.click(uniprot_query, inputs=[target_query, target_input_type], outputs=target_fasta) | |
target_fasta.focus(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False) | |
target_fasta.blur(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False) | |
drug_smiles.focus(fn=wrap_text, inputs=drug_smiles, outputs=drug_smiles, show_progress=False) | |
drug_smiles.blur(fn=wrap_text, inputs=drug_smiles, outputs=drug_smiles, show_progress=False) | |
def example_fill(input_type): | |
match input_type: | |
case 'UniProt ID': | |
query = 'Q16539' | |
case 'Gene symbol': | |
query = 'MAPK14' | |
case _: | |
query = '' | |
return {target_query: query, | |
target_fasta: """ | |
>sp|Q16539|MK14_HUMAN Mitogen-activated protein kinase 14 OS=Homo sapiens OX=9606 GN=MAPK14 PE=1 SV=3 | |
MSQERPTFYRQELNKTIWEVPERYQNLSPVGSGAYGSVCAAFDTKTGLRVAVKKLSRPFQ | |
SIIHAKRTYRELRLLKHMKHENVIGLLDVFTPARSLEEFNDVYLVTHLMGADLNNIVKCQ | |
KLTDDHVQFLIYQILRGLKYIHSADIIHRDLKPSNLAVNEDCELKILDFGLARHTDDEMT | |
GYVATRWYRAPEIMLNWMHYNQTVDIWSVGCIMAELLTGRTLFPGTDHIDQLKLILRLVG | |
TPGAELLKKISSESARNYIQSLTQMPKMNFANVFIGANPLAVDLLEKMLVLDSDKRITAA | |
QALAHAYFAQYHDPDDEPVADPYDQSFESRDLLIDEWKSLTYDEVISFVPPPLDQEEMES | |
"""} | |
example_target.click(fn=example_fill, inputs=target_input_type, | |
outputs=[target_query, target_fasta], show_progress=False) | |
example_drug.click(fn=lambda: 'CC(=O)Oc1ccccc1C(=O)O', outputs=drug_smiles, show_progress=False) | |
def drug_screen_validate(fasta, library, library_upload, state): | |
if not state: | |
def process_target_fasta(sequence): | |
lines = sequence.strip().split("\n") | |
if lines[0].startswith(">"): | |
lines = lines[1:] | |
return ''.join(lines).split(">")[0] | |
fasta = process_target_fasta(fasta) | |
err = validate_seq_str(fasta, FASTA_PAT) | |
if err: | |
raise gr.Error(f'Found error(s) in your target fasta input: {err}') | |
if library in DRUG_LIBRARY_MAP.keys(): | |
screen_df = pd.read_csv(Path('data/drug_libraries', DRUG_LIBRARY_MAP[library])) | |
else: | |
screen_df = pd.read_csv(library_upload) | |
validate_columns(screen_df, ['X1']) | |
screen_df['X2'] = fasta | |
job_id = uuid4() | |
temp_file = Path(f'{job_id}_temp.csv').resolve() | |
screen_df.to_csv(temp_file) | |
if temp_file.is_file(): | |
return {screen_data_for_predict: str(temp_file), | |
screen_flag: job_id, | |
run_state: job_id} | |
else: | |
gr.Warning('You have another prediction job ' | |
'(drug hit screening, target protein identification, or interation pair inference) ' | |
'running in the session right now. ' | |
'Please submit another job when your current job has finished.') | |
return {screen_flag: False} | |
def target_identify_validate(smiles, library, library_upload, state): | |
if not state: | |
err = validate_seq_str(smiles, SMILES_PAT) | |
if err: | |
raise gr.Error(f'Found error(s) in your compound SMILES input: {err}') | |
if library in TARGET_LIBRARY_MAP.keys(): | |
identify_df = pd.read_csv(TARGET_LIBRARY_MAP['target_library']) | |
else: | |
identify_df = pd.read_csv(library_upload) | |
validate_columns(identify_df, ['X2']) | |
identify_df['X1'] = smiles | |
job_id = uuid4() | |
temp_file = Path(f'{job_id}_temp.csv').resolve() | |
identify_df.to_csv(temp_file) | |
if temp_file.is_file(): | |
return {identify_data_for_predict: str(temp_file), | |
identify_flag: gr.State(job_id), | |
run_state: gr.State(job_id)} | |
else: | |
gr.Warning('You have another prediction job ' | |
'(drug hit screening, target protein identification, or interation pair inference) ' | |
'running in the session right now. ' | |
'Please submit another job when your current job has finished.') | |
return {identify_flag: False} | |
def pair_infer_validate(drug_target_pair_upload, run_state): | |
if not run_state: | |
df = pd.read_csv(drug_target_pair_upload) | |
validate_columns(df, ['X1', 'X2']) | |
df['X1_ERR'] = df['X1'].swifter.apply( | |
validate_seq_str, regex=SMILES_PAT) | |
df['X2_ERR'] = df['X2'].swifter.apply( | |
validate_seq_str, regex=FASTA_PAT) | |
if not df['X1_ERR'].isna().all(): | |
raise gr.Error(f"Encountered invalid SMILES:\n{df[~df['X1_ERR'].isna()][['X1', 'X1_ERR']]}") | |
if not df['X2_ERR'].isna().all(): | |
raise gr.Error(f"Encountered invalid FASTA:\n{df[~df['X2_ERR'].isna()][['X2', 'X2_ERR']]}") | |
job_id = uuid4() | |
return {infer_flag: gr.State(job_id), | |
run_state: gr.State(job_id)} | |
else: | |
gr.Warning('You have another prediction job ' | |
'(drug hit screening, target protein identification, or interation pair inference) ' | |
'running in the session right now. ' | |
'Please submit another job when your current job has finished.') | |
return {infer_flag: False} | |
drug_screen_btn.click( | |
fn=drug_screen_validate, | |
inputs=[target_fasta, drug_library, drug_library_upload, run_state], # , drug_screen_email], | |
outputs=[screen_data_for_predict, screen_flag, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)], | |
outputs=[screen_page, screen_waiting] | |
).then( | |
fn=submit_predict, | |
inputs=[screen_data_for_predict, drug_screen_task, drug_screen_preset, | |
drug_screen_target_family, screen_flag], # , drug_screen_email], | |
outputs=[file_for_report, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False)], | |
outputs=[screen_page, screen_waiting] | |
) | |
target_identify_btn.click( | |
fn=target_identify_validate, | |
inputs=[drug_smiles, target_library, target_library_upload, run_state], # , drug_screen_email], | |
outputs=[identify_data_for_predict, identify_flag, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)], | |
outputs=[identify_page, identify_waiting] | |
).then( | |
fn=submit_predict, | |
inputs=[identify_data_for_predict, target_identify_task, target_identify_preset, | |
target_identify_target_family, identify_flag], # , target_identify_email], | |
outputs=[file_for_report, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False)], | |
outputs=[identify_page, identify_waiting] | |
) | |
pair_infer_btn.click( | |
fn=pair_infer_validate, | |
inputs=[infer_data_for_predict, run_state], # , drug_screen_email], | |
outputs=[infer_flag, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)], | |
outputs=[infer_page, infer_waiting] | |
).then( | |
fn=submit_predict, | |
inputs=[infer_data_for_predict, pair_infer_task, pair_infer_preset, | |
pair_infer_target_family, infer_flag], # , pair_infer_email], | |
outputs=[file_for_report, run_state] | |
).then( | |
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False)], | |
outputs=[infer_page, infer_waiting] | |
) | |
# TODO background job from these 3 pipelines to update file_for_report | |
file_for_report.change(fn=update_df, inputs=file_for_report, outputs=[html_report, ranking_pie_chart]) | |
analyze_btn.click(fn=submit_report, inputs=[scores, filters], outputs=[html_report, ranking_pie_chart]) | |
# screen_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report], | |
# every=5) | |
# identify_waiting.change(fn=check_job_status, inputs=run_state, outputs=[identify_waiting, tabs, file_for_report], | |
# every=5) | |
# pair_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report], | |
# every=5) | |
# demo.load(None, None, None, js="() => {document.body.classList.remove('dark')}") | |
if __name__ == "__main__": | |
screen_block.queue(max_size=2) | |
identify_block.queue(max_size=2) | |
infer_block.queue(max_size=2) | |
report.queue(max_size=20) | |
# SCHEDULER.add_job(func=file_cleanup(), trigger="interval", seconds=60) | |
# SCHEDULER.start() | |
demo.launch( | |
# debug=True, | |
show_api=False, | |
# favicon_path=, | |
# inline=False | |
debug=True | |
) | |