libokj's picture
Update app.py
192213f
raw
history blame
54.6 kB
import hashlib
import json
import textwrap
import threading
from math import pi
from uuid import uuid4
import io
import os
import pathlib
from pathlib import Path
import sys
import numpy as np
from Bio.Align import PairwiseAligner
# from email_validator import validate_email
import gradio as gr
import hydra
import pandas as pd
import plotly.express as px
import requests
from rdkit.Chem.rdMolDescriptors import CalcNumRotatableBonds, CalcNumHeavyAtoms, CalcNumAtoms
from requests.adapters import HTTPAdapter, Retry
from rdkit import Chem
from rdkit.Chem import RDConfig, Descriptors, Draw, Lipinski, Crippen, PandasTools, AllChem
from rdkit.Chem.Scaffolds import MurckoScaffold
import seaborn as sns
import swifter
from tqdm.auto import tqdm
from deepscreen.data.dti import validate_seq_str, FASTA_PAT, SMILES_PAT
from deepscreen.predict import predict
sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score'))
import sascorer
ROOT = Path.cwd()
DF_FOR_REPORT = pd.DataFrame()
pd.set_option('display.float_format', '{:.3f}'.format)
PandasTools.molRepresentation = 'svg'
PandasTools.drawOptions = Draw.rdMolDraw2D.MolDrawOptions()
PandasTools.drawOptions.clearBackground = False
PandasTools.drawOptions.bondLineWidth = 1.5
PandasTools.drawOptions.explicitMethyl = True
PandasTools.drawOptions.singleColourWedgeBonds = True
PandasTools.drawOptions.useCDKAtomPalette()
PandasTools.molSize = (128, 128)
SESSION = requests.Session()
ADAPTER = HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]))
SESSION.mount('http://', ADAPTER)
SESSION.mount('https://', ADAPTER)
# SCHEDULER = BackgroundScheduler()
UNIPROT_ENDPOINT = 'https://rest.uniprot.org/uniprotkb/{query}'
CSS = """
.help-tip > div {
position: absolute;
display: block;
top: 0px;
right: 0px;
text-align: center;
border-radius: 40%;
/* border: 2px solid darkred; background-color: #8B0000;*/
width: 24px;
height: 24px;
font-size: 16px;
line-height: 26px;
cursor: default;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
}
.help-tip:hover {
cursor: pointer;
/*background-color: #ccc;*/
}
.help-tip:before {
content: '?';
font-weight: 700;
color: #8B0000;
z-index: 100;
}
.help-tip p {
visibility: hidden;
opacity: 0;
text-align: left;
background-color: #EFDDE3;
padding: 20px;
width: 300px;
position: absolute;
border-radius: 4px;
right: -4px;
color: #494F5A;
font-size: 13px;
line-height: normal;
transform: scale(0.7);
transform-origin: 100% 0%;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
z-index: 100;
}
.help-tip:hover p {
cursor: default;
visibility: visible;
opacity: 1;
transform: scale(1.0);
}
.help-tip p:before {
position: absolute;
content: '';
width: 0;
height: 0;
border: 6px solid transparent;
border-bottom-color: #EFDDE3;
right: 10px;
top: -12px;
}
.help-tip p:after {
width: 100%;
height: 40px;
content: '';
position: absolute;
top: -5px;
left: 0;
}
.upload_button {
background-color: #008000;
}
.absolute {
position: absolute;
}
#example {
padding: 0;
background: none;
border: none;
text-decoration: underline;
box-shadow: none;
text-align: left !important;
display: inline-block !important;
}
footer {
visibility: hidden
}
"""
class HelpTip:
def __new__(cls, text):
return gr.HTML(elem_classes="help-tip",
value=f'<p>{text}</p>'
)
def sa_score(row):
return sascorer.calculateScore(row['Compound'])
def mw(row):
return Chem.Descriptors.MolWt(row['Compound'])
def mr(row):
return Crippen.MolMR(row['Compound'])
def hbd(row):
return Lipinski.NumHDonors(row['Compound'])
def hba(row):
return Lipinski.NumHAcceptors(row['Compound'])
def logp(row):
return Crippen.MolLogP(row['Compound'])
def atom(row):
return CalcNumAtoms(row['Compound'])
def heavy_atom(row):
return CalcNumHeavyAtoms(row['Compound'])
def rotatable_bond(row):
return CalcNumRotatableBonds((row['Compound']))
def lipinski(row):
"""
Lipinski's rules:
Hydrogen bond donors <= 5
Hydrogen bond acceptors <= 10
Molecular weight <= 500 daltons
logP <= 5
"""
if hbd(row) > 5:
return False
elif hba(row) > 10:
return False
elif mw(row) > 500:
return False
elif logp(row) > 5:
return False
else:
return True
def reos(row):
"""
Rapid Elimination Of Swill filter:
Molecular weight between 200 and 500
LogP between -5.0 and +5.0
H-bond donor count between 0 and 5
H-bond acceptor count between 0 and 10
Formal charge between -2 and +2
Rotatable bond count between 0 and 8
Heavy atom count between 15 and 50
"""
if not 200 < mw(row) < 500:
return False
elif not -5.0 < logp(row) < 5.0:
return False
elif not 0 < hbd(row) < 5:
return False
elif not 0 < hba(row) < 10:
return False
elif not 0 < rotatable_bond(row) < 8:
return False
elif not 15 < heavy_atom(row) < 50:
return False
else:
return True
def ghose(row):
"""
Ghose drug like filter:
Molecular weight between 160 and 480
LogP between -0.4 and +5.6
Atom count between 20 and 70
Molar refractivity between 40 and 130
"""
if not 160 < mw(row) < 480:
return False
elif not -0.4 < logp(row) < 5.6:
return False
elif not 20 < atom(row) < 70:
return False
elif not 40 < mr(row) < 130:
return False
else:
return True
SCORE_MAP = {
'SAscore': sa_score,
'LogP': logp,
'Molecular weight': mw,
'Molar refractivity': mr,
'H-bond donor count': hbd,
'H-Bond acceptor count': hba,
'Rotatable bond count': rotatable_bond,
# 'TopoPSA': None,
}
FILTER_MAP = {
'REOS': reos,
"Lipinski's rule of 5": lipinski,
'Ghose': ghose,
# 'Rule of 3': rule_of_3,
# 'Veber': veber,
# 'PAINS': pains,
}
TASK_MAP = {
'Drug-target interaction': 'DTI',
'Drug-target binding affinity': 'DTA',
}
PRESET_MAP = {
'DeepDTA': 'deep_dta',
'DeepConvDTI-ECFP4': 'deep_conv_dti',
'GraphDTA': 'graph_dta',
'MGraphDTA': 'm_graph_dta',
'HyperAttentionDTI': 'hyper_attention_dti',
'MolTrans': 'mol_trans',
'TransformerCPI': 'transfomer_cpi',
'TransformerCPI2': 'transformer_cpi_2',
'DrugBAN': 'drug_ban',
'DrugVQA-Seq': 'drug_vqa'
}
TARGET_FAMILY_MAP = {
'General': 'general',
'Kinase': 'kinase',
'Non-kinase enzyme': 'enzyme',
'Membrane receptor': 'membrane',
'Nuclear receptor': 'nuclear',
'Ion channel': 'ion',
'Other protein targets': 'others',
}
TARGET_LIBRARY_MAP = {
'ChEMBL33 (all species)': 'ChEMBL33_all_spe_single_prot_info.csv.csv',
# 'STITCH': 'stitch.csv',
# 'Drug Repurposing Hub': 'drug_repurposing_hub.csv',
}
DRUG_LIBRARY_MAP = {
'DrugBank (Human)': 'drugbank_human_py_annot.csv',
}
COLUMN_ALIASES = {
'X1': 'Compound SMILES',
'X2': 'Target FASTA',
'ID1': 'Compound ID',
'ID2': 'Target ID',
}
def validate_columns(df, mandatory_cols):
missing_cols = [col for col in mandatory_cols if col not in df.columns]
if missing_cols:
error_message = (f"The following mandatory columns are missing "
f"in the uploaded dataset: {str(['X1', 'X2']).strip('[]')}.")
raise ValueError(error_message)
else:
return
def process_target_fasta(sequence):
lines = sequence.strip().split("\n")
if lines[0].startswith(">"):
lines = lines[1:]
return ''.join(lines).split(">")[0]
def send_email(receiver, msg):
pass
def submit_predict(predict_filepath, task, preset, target_family, flag, progress=gr.Progress(track_tqdm=True)):
if flag:
try:
job_id = flag
global COLUMN_ALIASES
task = TASK_MAP[task]
preset = PRESET_MAP[preset]
target_family = TARGET_FAMILY_MAP[target_family]
# email_hash = hashlib.sha256(email.encode()).hexdigest()
COLUMN_ALIASES = COLUMN_ALIASES | {
'Y': 'Actual interaction' if task == 'binary' else 'Actual affinity',
'Y^': 'Predicted interaction' if task == 'binary' else 'Predicted affinity'
}
# target_family_list = [target_family]
# for family in target_family_list:
# try:
prediction_df = pd.DataFrame()
with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"):
cfg = hydra.compose(
config_name="webserver_inference",
overrides=[f"task={task}",
f"preset={preset}",
f"ckpt_path=resources/checkpoints/{preset}-{task}-{target_family}.ckpt",
f"data.data_file='{str(predict_filepath)}'"])
predictions, _ = predict(cfg)
predictions = [pd.DataFrame(prediction) for prediction in predictions]
prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)])
predictions_file = f'{job_id}_predictions.csv'
prediction_df.to_csv(predictions_file, index=False)
return [predictions_file,
False]
except Exception as e:
gr.Warning(f"Prediction job failed due to error: {str(e)}")
return [None,
False]
else:
return [None,
False]
#
# except Exception as e:
# raise gr.Error(str(e))
# email_lock = Path(f"outputs/{email_hash}.lock")
# with open(email_lock, "w") as file:
# record = {
# "email": email,
# "job_id": job_id
# }
# json.dump(record, file)
# def run_predict():
# TODO per-user submit usage
# # email_lock = Path(f"outputs/{email_hash}.lock")
# # with open(email_lock, "w") as file:
# # record = {
# # "email": email,
# # "job_id": job_id
# # }
# # json.dump(record, file)
#
# job_lock = DATA_PATH / f"outputs/{job_id}.lock"
# with open(job_lock, "w") as file:
# pass
#
# try:
# prediction_df = pd.DataFrame()
# for family in target_family_list:
# with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"):
# cfg = hydra.compose(
# config_name="webserver_inference",
# overrides=[f"task={task}",
# f"preset={preset}",
# f"ckpt_path=resources/checkpoints/{preset}-{task}-{family}.ckpt",
# f"data.data_file='{str(predict_dataset)}'"])
#
# predictions, _ = predict(cfg)
# predictions = [pd.DataFrame(prediction) for prediction in predictions]
# prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)])
# prediction_df.to_csv(f'outputs/{job_id}.csv')
# # email_lock.unlink()
# job_lock.unlink()
#
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) completed successfully. You may retrieve the '
# f'results and generate an analytical report at {URL} using the job id within 48 hours.')
# gr.Info(msg)
# except Exception as e:
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) failed due to an error: "{str(e)}." You may '
# f'reach out to the author about the error through email ([email protected]).')
# raise gr.Error(str(e))
# finally:
# send_email(email, msg)
#
# # Run "predict" asynchronously
# threading.Thread(target=run_predict).start()
#
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) started running. You may retrieve the results '
# f'and generate an analytical report at {URL} using the job id once the job is done. Only one job '
# f'per user is allowed at the same time.')
# send_email(email, msg)
# # Return the job id first
# return [
# gr.Blocks(visible=False),
# gr.Markdown(f"Your prediction job is running... "
# f"You may stay on this page or come back later to retrieve the results "
# f"Once you receive our email notification."),
# ]
def update_df(file, progress=gr.Progress(track_tqdm=True)):
global DF_FOR_REPORT
if file is not None:
df = pd.read_csv(file)
if df['X1'].nunique() > 1:
df['Scaffold SMILES'] = df['X1'].swifter.progress_bar(
desc=f"Calculating scaffold...").apply(MurckoScaffold.MurckoScaffoldSmilesFromSmiles)
# Add a new column with RDKit molecule objects
PandasTools.AddMoleculeColumnToFrame(df, smilesCol='X1', molCol='Compound',
includeFingerprints=False)
PandasTools.AddMoleculeColumnToFrame(df, smilesCol='Scaffold SMILES', molCol='Scaffold',
includeFingerprints=False)
DF_FOR_REPORT = df.copy()
pie_chart = None
value = None
if 'Y^' in DF_FOR_REPORT.columns:
value = 'Y^'
elif 'Y' in DF_FOR_REPORT.columns:
value = 'Y'
# if value:
# if DF_FOR_REPORT['X1'].nunique() > 1 >= DF_FOR_REPORT['X2'].nunique():
# pie_chart = create_pie_chart(DF_FOR_REPORT, category='Scaffold SMILES', value=value, top_k=100)
# elif DF_FOR_REPORT['X2'].nunique() > 1 >= DF_FOR_REPORT['X1'].nunique():
# pie_chart = create_pie_chart(DF_FOR_REPORT, category='Target family', value=value, top_k=100)
return create_html_report(DF_FOR_REPORT), pie_chart
else:
return gr.HTML(''), gr.Plot()
def create_html_report(df, file=None, progress=gr.Progress(track_tqdm=True)):
cols_left = ['ID2', 'Y', 'Y^', 'ID1', 'Compound', 'Scaffold', 'Scaffold SMILES', ]
cols_right = ['X1', 'X2']
cols_left = [col for col in cols_left if col in df.columns]
cols_right = [col for col in cols_right if col in df.columns]
df = df[cols_left + (df.columns.drop(cols_left + cols_right).tolist()) + cols_right]
df['X2'] = df['X2'].apply(wrap_text)
df.rename(COLUMN_ALIASES, inplace=True)
styled_df = df.style
# styled_df = df.style.format("{:.2f}")
colors = sns.color_palette('husl', len(df.columns))
for i, col in enumerate(df.columns):
if pd.api.types.is_numeric_dtype(df[col]):
styled_df = styled_df.background_gradient(subset=col, cmap=sns.light_palette(colors[i], as_cmap=True))
# Return the DataFrame as HTML
PandasTools.RenderImagesInAllDataFrames(images=True)
if not file:
html = df.to_html()
return f'<div style="overflow:auto; height: 500px;">{html}</div>'
else:
html = df.to_html(file)
return html
# return gr.HTML(pn.widgets.Tabulator(df).embed())
# def create_pie_chart(df, category, value, top_k):
# df.rename(COLUMN_ALIASES, inplace=True)
# # Select the top_k records based on the value_col
# top_k_df = df.nlargest(top_k, value)
#
# # Count the frequency of each unique value in the category_col column
# category_counts = top_k_df[category].value_counts()
#
# # Convert the counts to a DataFrame
# data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values})
#
# # Calculate the angle for each category
# data['angle'] = data['value']/data['value'].sum() * 2*pi
#
# # Assign colors
# data['color'] = Spectral11[0:len(category_counts)]
#
# # Create the plot
# p = figure(height=350, title="Pie Chart", toolbar_location=None,
# tools="hover", tooltips="@{}: @value".format(category), x_range=(-0.5, 1.0))
#
# p.wedge(x=0, y=1, radius=0.4,
# start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),
# line_color="white", fill_color='color', legend_field=category, source=data)
#
# p.axis.axis_label = None
# p.axis.visible = False
# p.grid.grid_line_color = None
#
# return p
def create_pie_chart(df, category, value, top_k):
df = df.copy()
df.rename(COLUMN_ALIASES, inplace=True)
value = COLUMN_ALIASES.get(value, value)
# Select the top_k records based on the value_col
top_k_df = df.nlargest(top_k, value)
# Count the frequency of each unique value in the category_col column
category_counts = top_k_df[category].value_counts()
# Convert the counts to a DataFrame
data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values})
# Create the plot
fig = px.pie(data, values='value', names=category, title=f'Top-{top_k} {category} in {value}')
fig.update_traces(textposition='inside', textinfo='percent+label')
return fig
def submit_report(score_list, filter_list, progress=gr.Progress(track_tqdm=True)):
df = DF_FOR_REPORT.copy()
try:
for filter_name in filter_list:
df[filter_name] = df.swifter.progress_bar(desc=f"Calculating {filter_name}").apply(
FILTER_MAP[filter_name], axis=1)
for score_name in score_list:
df[score_name] = df.swifter.progress_bar(desc=f"Calculating {score_name}").apply(
SCORE_MAP[score_name], axis=1)
# pie_chart = None
# value = None
# if 'Y^' in df.columns:
# value = 'Y^'
# elif 'Y' in df.columns:
# value = 'Y'
#
# if value:
# if df['X1'].nunique() > 1 >= df['X2'].nunique():
# pie_chart = create_pie_chart(df, category='Scaffold SMILES', value=value, top_k=100)
# elif df['X2'].nunique() > 1 >= df['X1'].nunique():
# pie_chart = create_pie_chart(df, category='Target family', value=value, top_k=100)
return create_html_report(df), df # pie_chart
except Exception as e:
raise gr.Error(str(e))
# def check_job_status(job_id):
# job_lock = DATA_PATH / f"{job_id}.lock"
# job_file = DATA_PATH / f"{job_id}.csv"
# if job_lock.is_file():
# return {gr.Markdown(f"Your job ({job_id}) is still running... "
# f"You may stay on this page or come back later to retrieve the results "
# f"Once you receive our email notification."),
# None,
# None
# }
# elif job_file.is_file():
# return {gr.Markdown(f"Your job ({job_id}) is done! Redirecting you to generate reports..."),
# gr.Tabs(selected=3),
# gr.File(str(job_lock))}
def wrap_text(text, line_length=60):
wrapper = textwrap.TextWrapper(width=line_length)
if text.startswith('>'):
sections = text.split('>')
wrapped_sections = []
for section in sections:
if not section:
continue
lines = section.split('\n')
seq_header = lines[0]
wrapped_seq = wrapper.fill(''.join(lines[1:]))
wrapped_sections.append(f">{seq_header}\n{wrapped_seq}")
return '\n'.join(wrapped_sections)
else:
return wrapper.fill(text)
def unwrap_text(text):
return text.strip.replece('\n', '')
def smiles_from_sdf(sdf_path):
with Chem.SDMolSupplier(sdf_path) as suppl:
return Chem.MolToSmiles(suppl[0])
theme = gr.themes.Base(spacing_size="sm", text_size='md').set(
background_fill_primary='#dfe6f0',
background_fill_secondary='#dfe6f0',
checkbox_label_background_fill='#dfe6f0',
checkbox_label_background_fill_hover='#dfe6f0',
checkbox_background_color='white',
checkbox_border_color='#4372c4',
border_color_primary='#4372c4',
border_color_accent='#4372c4',
button_primary_background_fill='#4372c4',
button_primary_text_color='white',
button_secondary_border_color='#4372c4',
body_text_color='#4372c4',
block_title_text_color='#4372c4',
block_label_text_color='#4372c4',
block_info_text_color='#505358',
block_border_color=None,
input_border_color='#4372c4',
panel_border_color='#4372c4',
input_background_fill='white',
code_background_fill='white',
)
with (gr.Blocks(theme=theme, title='DeepScreen', css=CSS) as demo):
run_state = gr.State(value=False)
screen_flag = gr.State(value=False)
identify_flag = gr.State(value=False)
infer_flag = gr.State(value=False)
with gr.Tabs() as tabs:
with gr.TabItem(label='Drug hit screening', id=0):
gr.Markdown('''
# <center>DeepSEQreen Drug Hit Screening</center>
<center>
To predict interactions/binding affinities of a single target against a library of drugs.
</center>
''')
with gr.Blocks() as screen_block:
with gr.Column() as screen_page:
with gr.Row():
with gr.Column():
with gr.Row():
target_input_type = gr.Dropdown(
label='Target Input Type',
choices=['Sequence', 'UniProt ID', 'Gene symbol'],
info='Enter (paste) a FASTA string below manually or upload a FASTA file.',
value='Sequence',
scale=3, interactive=True
)
target_id = gr.Textbox(show_label=False, visible=False,
interactive=True, scale=4,
info='Query a sequence on UniProt with a UniProt ID.')
target_gene = gr.Textbox(
show_label=False, visible=False,
interactive=True, scale=4,
info='Query a sequence on UniProt with a gene symbol.')
target_organism = gr.Textbox(
info='Organism common name or scientific name (default: Human).',
placeholder='Human', show_label=False,
visible=False, interactive=True, scale=4, )
HelpTip(
"Target amino acid sequence in the FASTA format. Alternatively, you may use a "
"UniProt ID/accession to query UniProt database for the sequence of your "
"target of interest. If the input FASTA contains multiple entities, "
"only the first one will be used."
)
with gr.Column():
drug_screen_target_family = gr.Dropdown(
choices=list(TARGET_FAMILY_MAP.keys()),
value='General',
label='Select Input Protein Family (Optional)', interactive=True)
# with gr.Column(scale=1, min_width=24):
HelpTip(
"Identify the protein family by conducting sequence alignment. "
"You may select General if you find the alignment score unsatisfactory."
)
with gr.Row():
with gr.Column():
target_upload_btn = gr.UploadButton(label='Upload a FASTA file', type='binary',
visible=True, variant='primary',
size='lg')
target_query_btn = gr.Button(value='Query the sequence', variant='primary',
visible=False)
target_family_detect_btn = gr.Button(value='Auto-detect', variant='primary')
target_fasta = gr.Code(label='Input or Display FASTA', interactive=True, lines=5)
example_fasta = gr.Button(value='Example: Human MAPK14', elem_id='example')
with gr.Row():
with gr.Column():
drug_library = gr.Dropdown(label='Select a Compound Library',
choices=list(DRUG_LIBRARY_MAP.keys()))
drug_library_upload_btn = gr.UploadButton(
label='Upload a custom library', variant='primary')
drug_library_upload = gr.File(label='Custom drug library file', visible=False)
drug_screen_task = gr.Dropdown(list(TASK_MAP.keys()), label='Select a Prediction Task',
value='Drug-target interaction')
with gr.Column():
drug_screen_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Select a Preset Model')
screen_preset_recommend_btn = gr.Button(value='Recommend a model', variant='primary')
HelpTip("We recommend the appropriate model for your use case based on model performance "
"in drug-target interaction or binding affinity prediction. "
"The models were benchmarked on different target families "
"and real-world data scenarios.")
# drug_screen_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
# drug_screen_clr_btn = gr.ClearButton(size='lg')
drug_screen_btn = gr.Button(value='SCREEN', variant='primary', size='lg')
# TODO Modify the pd df directly with df['X2'] = target
screen_data_for_predict = gr.File(visible=False, file_count="single", type='filepath')
screen_waiting = gr.Markdown("""
<center>Your job is running... It might take a few minutes.
When it's done, you will be redirected to the report page.
Meanwhile, please leave the page on.</center>
""", visible=False)
with gr.TabItem(label='Target protein identification', id=1):
gr.Markdown('''
# <center>DeepSEQreen Target Protein Identification</center>
<center>
To predict interactions/binding affinities of a single drug against a library of targets.
</center>
''')
with gr.Blocks() as identify_block:
with gr.Column() as identify_page:
with gr.Row():
with gr.Column():
compound_type = gr.Dropdown(
label='Compound Input Type',
choices=['SMILES', 'SDF'],
info='Enter (paste) an SMILES string or upload an SMI file.',
value='SMILES',
interactive=True)
compound_upload_btn = gr.UploadButton(label='Upload', variant='primary', type='binary')
HelpTip(
"""Compound molecule in the SMILES format. You may input the SMILES string directly,
upload an SMI file, or upload an SDF file to convert to SMILES. Alternatively,
you may search on databases like NCBI PubChem, ChEMBL, and DrugBank for the SMILES
representing your drug of interest.
"""
)
with gr.Column():
target_identify_target_family = gr.Dropdown(choices=['General'], value='General',
label='Target Protein Family')
compound_smiles = gr.Code(label='Input or Display Compound SMILES', interactive=True, lines=5)
example_drug = gr.Button(value='Example: Aspirin', elem_id='example')
with gr.Row():
with gr.Column():
target_library = gr.Dropdown(label='Select a Target Library',
choices=list(TARGET_LIBRARY_MAP.keys()))
target_library_upload_btn = gr.UploadButton(
label='Upload a custom library', variant='primary')
target_library_upload = gr.File(label='Custom target library file', visible=False)
target_identify_task = gr.Dropdown(list(TASK_MAP.keys()), label='Select a Prediction Task',
value='Drug-target interaction')
with gr.Column():
target_identify_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
identify_preset_recommend_btn = gr.Button(value='Recommend a model', variant='primary')
HelpTip("We recommend the appropriate model for your use case based on model performance "
"in drug-target interaction or binding affinity prediction. "
"The models were benchmarked on different target families "
"and real-world data scenarios.")
# with gr.Row():
# target_identify_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
# target_identify_clr_btn = gr.ClearButton(size='lg')
target_identify_btn = gr.Button(value='IDENTIFY', variant='primary', size='lg')
identify_data_for_predict = gr.File(visible=False, file_count="single", type='filepath')
identify_waiting = gr.Markdown(f"Your job is running... It might take a few minutes."
f"When it's done, you will be redirected to the report page. "
f"Meanwhile, please leave the page on.",
visible=False)
with gr.TabItem(label='Interaction pair inference', id=2):
gr.Markdown('''
# <center>DeepSEQreen Interaction Pair Inference</center>
<center>
To predict interactions/binding affinities between any drug-target pairs.
</center>
''')
with gr.Blocks() as infer_block:
with gr.Column() as infer_page:
HelpTip("Upload a custom drug-target pair dataset. See the documentation for details.")
infer_data_for_predict = gr.File(
label='Prediction dataset file', file_count="single", type='filepath')
# TODO example dataset
# TODO download example dataset
with gr.Row(visible=True):
pair_infer_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task')
HelpTip("Choose a preset model for making the predictions.")
pair_infer_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
HelpTip("Choose the protein family of your target.")
pair_infer_target_family = gr.Dropdown(choices=['General'],
label='Target family',
value='General')
# with gr.Row():
# pair_infer_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
# pair_infer_clr_btn = gr.ClearButton(size='lg')
pair_infer_btn = gr.Button(value='INFER', variant='primary', size='lg')
infer_waiting = gr.Markdown(f"Your job is running... It might take a few minutes."
f"When it's done, you will be redirected to the report page. "
f"Meanwhile, please leave the page on.",
visible=False)
with gr.TabItem(label='Chemical property report', id=3):
with gr.Blocks() as report:
gr.Markdown('''
# <center>DeepSEQreen Chemical Property Report</center>
<center>
To compute chemical properties for the predictions of drug hit screening,
target protein identification, and interaction pair inference. You may also upload
your own dataset.
</center>
''')
with gr.Row():
file_for_report = gr.File(interactive=True, type='filepath')
df_raw = gr.Dataframe(type="pandas", interactive=False, visible=False)
scores = gr.CheckboxGroup(list(SCORE_MAP.keys()), label='Scores')
filters = gr.CheckboxGroup(list(FILTER_MAP.keys()), label='Filters')
with gr.Row():
# clear_btn = gr.ClearButton(size='lg')
analyze_btn = gr.Button('REPORT', variant='primary', size='lg')
with gr.Row():
with gr.Column(scale=3):
html_report = gr.HTML() # label='Results', visible=True)
ranking_pie_chart = gr.Plot(visible=False)
with gr.Row():
with gr.Column():
csv_generate = gr.Button(value='Generate raw data (CSV)')
csv_download_file = gr.File(label='Download raw data (CSV)', visible=False)
with gr.Column():
html_generate = gr.Button(value='Generate report (HTML)')
html_download_file = gr.File(label='Download report (HTML)', visible=False)
def target_input_type_select(input_type):
match input_type:
case 'UniProt ID':
return [gr.Dropdown(info=''),
gr.UploadButton(visible=False),
gr.Textbox(visible=True, value=''),
gr.Textbox(visible=False, value=''),
gr.Textbox(visible=False, value=''),
gr.Button(visible=True),
gr.Code(value='')]
case 'Gene symbol':
return [gr.Dropdown(info=''),
gr.UploadButton(visible=False),
gr.Textbox(visible=False, value=''),
gr.Textbox(visible=True, value=''),
gr.Textbox(visible=True, value=''),
gr.Button(visible=True),
gr.Code(value='')]
case 'Sequence':
return [gr.Dropdown(info='Enter (paste) a FASTA string below manually or upload a FASTA file.'),
gr.UploadButton(visible=True),
gr.Textbox(visible=False, value=''),
gr.Textbox(visible=False, value=''),
gr.Textbox(visible=False, value=''),
gr.Button(visible=False),
gr.Code(value='')]
target_input_type.select(
fn=target_input_type_select,
inputs=target_input_type,
outputs=[
target_input_type, target_upload_btn,
target_id, target_gene, target_organism, target_query_btn,
target_fasta
],
show_progress=False
)
def uniprot_query(input_type, uid, gene, organism='Human'):
fasta_seq = ''
match input_type:
case 'UniProt ID':
query = f"{uid.strip()}.fasta"
case 'Gene symbol':
query = f'search?query=organism_name:{organism.strip()}+AND+gene:{gene.strip()}&format=fasta'
try:
fasta = SESSION.get(UNIPROT_ENDPOINT.format(query=query))
fasta.raise_for_status()
fasta_seq = fasta.text
except Exception as e:
raise gr.Warning(f"Failed to query FASTA from UniProt database due to {str(e)}")
finally:
return fasta_seq
target_upload_btn.upload(fn=lambda x: x.decode(), inputs=target_upload_btn, outputs=target_fasta)
target_query_btn.click(uniprot_query,
inputs=[target_input_type, target_id, target_gene, target_organism],
outputs=target_fasta)
def target_family_detect(fasta, progress=gr.Progress(track_tqdm=True)):
aligner = PairwiseAligner(scoring='blastp', mode='local')
alignment_df = pd.read_csv('data/target_libraries/ChEMBL33_all_spe_single_prot_info.csv')
def align_score(query):
return aligner.align(process_target_fasta(fasta), query).score
alignment_df['score'] = alignment_df['X2'].swifter.progress_bar(
desc="Detecting protein family of the target...").apply(align_score)
row = alignment_df.loc[alignment_df['score'].idxmax()]
return gr.Dropdown(value=row['protein_family'].capitalize(),
info=f"Reason: Best BLASTP score ({row['score']}) with {row['ID2']} from family {row['protein_family']}")
target_family_detect_btn.click(fn=target_family_detect, inputs=target_fasta, outputs=drug_screen_target_family)
target_fasta.focus(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
target_fasta.blur(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
drug_library_upload_btn.upload(fn=lambda x: [
x.name, gr.Dropdown(value=Path(x.name).name, choices=list(DRUG_LIBRARY_MAP.keys()) + [Path(x.name).name])
], inputs=drug_library_upload_btn, outputs=[drug_library_upload, drug_library])
def example_fill(input_type):
return {target_id: 'Q16539',
target_gene: 'MAPK14',
target_organism: 'Human',
target_fasta: """
>sp|Q16539|MK14_HUMAN Mitogen-activated protein kinase 14 OS=Homo sapiens OX=9606 GN=MAPK14 PE=1 SV=3
MSQERPTFYRQELNKTIWEVPERYQNLSPVGSGAYGSVCAAFDTKTGLRVAVKKLSRPFQ
SIIHAKRTYRELRLLKHMKHENVIGLLDVFTPARSLEEFNDVYLVTHLMGADLNNIVKCQ
KLTDDHVQFLIYQILRGLKYIHSADIIHRDLKPSNLAVNEDCELKILDFGLARHTDDEMT
GYVATRWYRAPEIMLNWMHYNQTVDIWSVGCIMAELLTGRTLFPGTDHIDQLKLILRLVG
TPGAELLKKISSESARNYIQSLTQMPKMNFANVFIGANPLAVDLLEKMLVLDSDKRITAA
QALAHAYFAQYHDPDDEPVADPYDQSFESRDLLIDEWKSLTYDEVISFVPPPLDQEEMES
"""}
example_fasta.click(fn=example_fill, inputs=target_input_type,
outputs=[target_id, target_gene, target_organism, target_fasta], show_progress=False)
def screen_recommend_model(fasta, family, task):
task = TASK_MAP[task]
if task == 'DTI':
train = pd.read_csv('data/benchmarks/all_families_reduced_dti_train.csv')
score = 'AUROC'
elif task == 'DTA':
train = pd.read_csv('data/benchmarks/all_families_reduced_dta_train.csv')
score = 'CI'
if fasta not in train['X2']:
scenario = "Unseen target"
else:
scenario = "Seen target"
benchmark_df = pd.read_csv('data/benchmarks/compound_screen.csv')
if family == 'General':
filtered_df = benchmark_df[(benchmark_df[f'Task'] == task)
& (benchmark_df['Target.family'] == 'All families reduced')
& (benchmark_df['Scenario'] == 'Random split')
& (benchmark_df['all'] == True)]
else:
filtered_df = benchmark_df[(benchmark_df['Task'] == task)
& (benchmark_df['Target.family'] == family)
& (benchmark_df['Scenario'] == scenario)
& (benchmark_df['all'] == False)]
row = filtered_df.loc[filtered_df[score].idxmax()]
return gr.Dropdown(value=row['preset'],
info=f"Reason: {scenario} in the training dataset; we recommend the model "
f"with the best {score} ({float(row[score]):.3f}) "
f"in the {scenario.lower()} scenario on {family.lower()} family.")
screen_preset_recommend_btn.click(fn=screen_recommend_model,
inputs=[target_fasta, drug_screen_target_family, drug_screen_task],
outputs=drug_screen_preset)
def compound_input_type_select(input_type):
match input_type:
case 'SMILES':
return gr.Dropdown(info='Input an SMILES string or upload an SMI file')
case 'SDF':
return gr.Dropdown(info='Convert the first molecule in an SDF file to SMILES')
compound_type.select(fn=compound_input_type_select,
inputs=compound_type, outputs=compound_type, show_progress=False)
def compound_upload_process(input_type, input_upload):
match input_type:
case 'SMILES':
return input_upload.decode()
case 'SDF':
suppl = Chem.ForwardSDMolSupplier(io.BytesIO(input_upload))
return Chem.MolToSmiles(next(suppl))
compound_upload_btn.upload(fn=compound_upload_process,
inputs=[compound_type, compound_upload_btn],
outputs=compound_smiles)
example_drug.click(fn=lambda: 'CC(=O)Oc1ccccc1C(=O)O', outputs=compound_smiles, show_progress=False)
target_library_upload_btn.upload(fn=lambda x: [
x.name, gr.Dropdown(value=Path(x.name).name, choices=list(TARGET_LIBRARY_MAP.keys()) + [Path(x.name).name])
], inputs=target_library_upload_btn, outputs=[target_library_upload, target_library])
def identify_recommend_model(smiles, task):
if task == 'Drug-target interaction':
train = pd.read_csv('data/benchmarks/all_families_reduced_dti_train.csv')
score = 'AUROC'
elif task == 'Drug-target binding affinity':
train = pd.read_csv('data/benchmarks/all_families_reduced_dta_train.csv')
score = 'CI'
task = TASK_MAP[task]
if smiles not in train['X1']:
scenario = "Unseen drug"
else:
scenario = "Seen drug"
benchmark_df = pd.read_csv('data/benchmarks/target_identification.csv')
filtered_df = benchmark_df[(benchmark_df['Task'] == task)
& (benchmark_df['Scenario'] == scenario)]
row = filtered_df.loc[filtered_df[score].idxmax()]
return gr.Dropdown(value=row['preset'],
info=f"Reason: {scenario} in the training dataset; choosing the model "
f"with the best {score} ({float(row[score]):3f}) "
f"in the {scenario.lower()} scenario.")
identify_preset_recommend_btn.click(fn=identify_recommend_model,
inputs=[compound_smiles, target_identify_task],
outputs=target_identify_preset)
def drug_screen_validate(fasta, library, library_upload, state, progress=gr.Progress(track_tqdm=True)):
if not state:
try:
fasta = process_target_fasta(fasta)
err = validate_seq_str(fasta, FASTA_PAT)
if err:
raise ValueError(f'Found error(s) in your target fasta input: {err}')
if library in DRUG_LIBRARY_MAP.keys():
screen_df = pd.read_csv(Path('data/drug_libraries', DRUG_LIBRARY_MAP[library]))
else:
screen_df = pd.read_csv(library_upload)
validate_columns(screen_df, ['X1'])
screen_df['X2'] = fasta
job_id = uuid4()
temp_file = Path(f'{job_id}_input.csv').resolve()
screen_df.to_csv(temp_file, index=False)
if temp_file.is_file():
return {screen_data_for_predict: str(temp_file),
screen_flag: job_id,
run_state: job_id}
else:
raise SystemError('Failed to create temporary files. Please try again later.')
except Exception as e:
gr.Warning(f'Failed to submit the job due to error: {str(e)}')
return {screen_flag: False,
run_state: False}
else:
gr.Warning('You have another prediction job '
'(drug hit screening, target protein identification, or interation pair inference) '
'running in the session right now. '
'Please submit another job when your current job has finished.')
return {screen_flag: False,
run_state: state}
def target_identify_validate(smiles, library, library_upload, state, progress=gr.Progress(track_tqdm=True)):
if not state:
try:
smiles = smiles.strip()
err = validate_seq_str(smiles, SMILES_PAT)
if err:
raise ValueError(f'Found error(s) in your target fasta input: {err}')
if library in TARGET_LIBRARY_MAP.keys():
identify_df = pd.read_csv(TARGET_LIBRARY_MAP['target_library'])
else:
identify_df = pd.read_csv(library_upload)
validate_columns(identify_df, ['X2'])
identify_df['X1'] = smiles
job_id = uuid4()
temp_file = Path(f'{job_id}_input.csv').resolve()
identify_df.to_csv(temp_file, index=False)
if temp_file.is_file():
return {identify_data_for_predict: str(temp_file),
identify_flag: job_id,
run_state: job_id}
else:
raise SystemError('Failed to create temporary files. Please try again later.')
except Exception as e:
gr.Warning(f'Failed to submit the job due to error: {str(e)}')
return {identify_flag: False,
run_state: False}
else:
gr.Warning('You have another prediction job '
'(drug hit screening, target protein identification, or interation pair inference) '
'running in the session right now. '
'Please submit another job when your current job has finished.')
return {identify_flag: False,
run_state: state}
# return {identify_flag: False}
def pair_infer_validate(drug_target_pair_upload, state, progress=gr.Progress(track_tqdm=True)):
if not state:
try:
df = pd.read_csv(drug_target_pair_upload)
validate_columns(df, ['X1', 'X2'])
df['X1_ERR'] = df['X1'].swifter.progress_bar(desc="Validating SMILES...").apply(
validate_seq_str, regex=SMILES_PAT)
if not df['X1_ERR'].isna().all():
raise ValueError(f"Encountered invalid SMILES:\n{df[~df['X1_ERR'].isna()][['X1', 'X1_ERR']]}")
df['X2_ERR'] = df['X2'].swifter.progress_bar(desc="Validating FASTA...").apply(
validate_seq_str, regex=FASTA_PAT)
if not df['X2_ERR'].isna().all():
raise ValueError(f"Encountered invalid FASTA:\n{df[~df['X2_ERR'].isna()][['X2', 'X2_ERR']]}")
job_id = uuid4()
return {infer_flag: job_id,
run_state: job_id}
except Exception as e:
gr.Warning(f'Failed to submit the job due to error: {str(e)}')
return {infer_flag: False,
run_state: False}
else:
gr.Warning('You have another prediction job '
'(drug hit screening, target protein identification, or interation pair inference) '
'running in the session right now. '
'Please submit another job when your current job has finished.')
return {infer_flag: False,
run_state: state}
drug_screen_btn.click(
fn=drug_screen_validate,
inputs=[target_fasta, drug_library, drug_library_upload, run_state], # , drug_screen_email],
outputs=[screen_data_for_predict, screen_flag, run_state]
).then(
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
outputs=[screen_page, screen_waiting]
).then(
fn=submit_predict,
inputs=[screen_data_for_predict, drug_screen_task, drug_screen_preset,
drug_screen_target_family, screen_flag], # , drug_screen_email],
outputs=[file_for_report, run_state]
).then(
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False), gr.Tabs(selected=3)],
outputs=[screen_page, screen_waiting, tabs]
)
target_identify_btn.click(
fn=target_identify_validate,
inputs=[compound_smiles, target_library, target_library_upload, run_state], # , drug_screen_email],
outputs=[identify_data_for_predict, identify_flag, run_state]
).then(
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True), gr.Tabs(selected=3)],
outputs=[identify_page, identify_waiting, tabs]
).then(
fn=submit_predict,
inputs=[identify_data_for_predict, target_identify_task, target_identify_preset,
target_identify_target_family, identify_flag], # , target_identify_email],
outputs=[file_for_report, run_state]
).then(
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False), gr.Tabs(selected=3)],
outputs=[identify_page, identify_waiting, tabs]
)
pair_infer_btn.click(
fn=pair_infer_validate,
inputs=[infer_data_for_predict, run_state], # , drug_screen_email],
outputs=[infer_flag, run_state]
).then(
fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
outputs=[infer_page, infer_waiting]
).then(
fn=submit_predict,
inputs=[infer_data_for_predict, pair_infer_task, pair_infer_preset,
pair_infer_target_family, infer_flag], # , pair_infer_email],
outputs=[file_for_report, run_state]
).then(
fn=lambda: [gr.Column(visible=True), gr.Markdown(visible=False)],
outputs=[infer_page, infer_waiting]
)
# TODO background job from these 3 pipelines to update file_for_report
file_for_report.change(fn=update_df, inputs=file_for_report, outputs=[
html_report,
df_raw,
# ranking_pie_chart
])
analyze_btn.click(fn=submit_report, inputs=[scores, filters], outputs=[
html_report,
df_raw,
# ranking_pie_chart
])
def create_csv_raw_file(df, file_report):
from datetime import datetime
now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"reports/{Path(file_report.name).stem}_DeepSEQreen_report_{now}.csv"
df.to_csv(filename, index=False)
return gr.File(filename, visible=True)
def create_html_report_file(df, file_report):
from datetime import datetime
now = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")
filename = f"reports/{Path(file_report.name).stem}_DeepSEQreen_report_{now}.csv"
create_html_report(df, filename)
return gr.File(filename, visible=True)
csv_generate.click(fn=create_csv_raw_file, inputs=[df_raw, file_for_report], outputs=csv_download_file)
html_generate.click(fn=create_html_report_file, inputs=[df_raw, file_for_report], outputs=html_download_file)
# screen_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report],
# every=5)
# identify_waiting.change(fn=check_job_status, inputs=run_state, outputs=[identify_waiting, tabs, file_for_report],
# every=5)
# pair_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report],
# every=5)
# demo.load(None, None, None, js="() => {document.body.classList.remove('dark')}")
if __name__ == "__main__":
screen_block.queue(max_size=2)
identify_block.queue(max_size=2)
infer_block.queue(max_size=2)
report.queue(max_size=20)
# SCHEDULER.add_job(func=file_cleanup(), trigger="interval", seconds=60)
# SCHEDULER.start()
demo.launch(
show_api=False,
)