libokj's picture
Upload 174 files
52318e2
raw
history blame
42.8 kB
import hashlib
import json
import textwrap
import threading
from math import pi
from uuid import uuid4
import io
import os
import pathlib
from pathlib import Path
import sys
from Bio import AlignIO, SeqIO
# from email_validator import validate_email
import gradio as gr
import hydra
import pandas as pd
import plotly.express as px
import requests
from requests.adapters import HTTPAdapter, Retry
from rdkit import Chem
from rdkit.Chem import RDConfig, Descriptors, Draw, Lipinski, Crippen, PandasTools
from rdkit.Chem.Scaffolds import MurckoScaffold
import seaborn as sns
import swifter
from tqdm.auto import tqdm
from deepscreen.data.dti import rdkit_canonicalize, validate_seq_str, FASTA_PAT, SMILES_PAT
from deepscreen.predict import predict
sys.path.append(os.path.join(RDConfig.RDContribDir, 'SA_Score'))
import sascorer
# Working directory of the process at import time.
ROOT = Path.cwd()
# Base directory that check_job_status() searches for "<job_id>.lock" / "<job_id>.csv".
DATA_PATH = Path("./") # Path("/data")
# Dataset currently loaded in the report tab: rebound by update_df() and copied by submit_report().
# NOTE(review): this is a process-wide global, so it looks shared by every session — confirm that is intended.
DF_FOR_REPORT = pd.DataFrame()
# Print floats with three decimals wherever pandas formats them.
pd.set_option('display.float_format', '{:.3f}'.format)
# RDKit/pandas rendering options for molecule columns: SVG output, 128x128 depictions,
# transparent background and the CDK atom palette.
PandasTools.molRepresentation = 'svg'
PandasTools.drawOptions = Draw.rdMolDraw2D.MolDrawOptions()
PandasTools.drawOptions.clearBackground = False
PandasTools.drawOptions.bondLineWidth = 1.5
PandasTools.drawOptions.explicitMethyl = True
PandasTools.drawOptions.singleColourWedgeBonds = True
PandasTools.drawOptions.useCDKAtomPalette()
PandasTools.molSize = (128, 128)
# Shared HTTP session: up to 5 retries with back-off on 500/502/503/504 for both http and https.
SESSION = requests.Session()
ADAPTER = HTTPAdapter(max_retries=Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504]))
SESSION.mount('http://', ADAPTER)
SESSION.mount('https://', ADAPTER)
# SCHEDULER = BackgroundScheduler()
# URL template used by uniprot_query(); "{query}" receives an accession + ".fasta" or a search expression.
UNIPROT_ENDPOINT = 'https://rest.uniprot.org/uniprotkb/{query}'
# Stylesheet handed to gr.Blocks(css=CSS). It styles the hover help bubble (".help-tip", used by HelpTip),
# the upload/query buttons (".upload_button"), the link-like example buttons ("#example"), and hides the footer.
CSS = """
.help-tip {
position: absolute;
display: block;
top: 0px;
right: 0px;
text-align: center;
background-color: #29b6f6;
border-radius: 50%;
width: 24px;
height: 24px;
font-size: 12px;
line-height: 26px;
cursor: default;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
}
.help-tip:hover {
cursor: pointer;
background-color: #ccc;
}
.help-tip:before {
content: '?';
font-weight: 700;
color: #fff;
z-index: 100;
}
.help-tip p {
visibility: hidden;
opacity: 0;
text-align: left;
background-color: #039be5;
padding: 20px;
width: 300px;
position: absolute;
border-radius: 4px;
right: -4px;
color: #fff;
font-size: 13px;
line-height: normal;
transform: scale(0.7);
transform-origin: 100% 0%;
transition: all 0.5s cubic-bezier(0.55, 0, 0.1, 1);
z-index: 100;
}
.help-tip:hover p {
cursor: default;
visibility: visible;
opacity: 1;
transform: scale(1.0);
}
.help-tip p:before {
position: absolute;
content: '';
width: 0;
height: 0;
border: 6px solid transparent;
border-bottom-color: #039be5;
right: 10px;
top: -12px;
}
.help-tip p:after {
width: 100%;
height: 40px;
content: '';
position: absolute;
top: -5px;
left: 0;
}
.help-tip a {
color: #fff;
font-weight: 700;
}
.help-tip a:hover, .help-tip a:focus {
color: #fff;
text-decoration: underline;
}
.upload_button {
background-color: #008000;
}
.absolute {
position: absolute;
}
#example {
padding: 0;
background: none;
border: none;
text-decoration: underline;
box-shadow: none;
text-align: left !important;
display: inline-block !important;
}
footer {
visibility: hidden
}
"""
class HelpTip:
    """Factory for the hover help bubble styled by the ``.help-tip`` CSS rules.

    ``HelpTip(text)`` never yields a ``HelpTip`` instance: ``__new__`` returns a
    ``gr.HTML`` component whose value is ``text`` wrapped in a ``<p>`` element.
    """

    def __new__(cls, text):
        paragraph = f'<p>{text}</p>'
        return gr.HTML(value=paragraph, elem_classes="help-tip")
def sa_score(row):
    """Return ``sascorer.calculateScore`` for the molecule stored in the row's 'Compound' field."""
    compound = row['Compound']
    return sascorer.calculateScore(compound)
def mw(row):
    """Return RDKit's ``Descriptors.MolWt`` for the molecule stored in the row's 'Compound' field."""
    compound = row['Compound']
    return Descriptors.MolWt(compound)
def hbd(row):
    """Return ``Lipinski.NumHDonors`` for the molecule stored in the row's 'Compound' field."""
    compound = row['Compound']
    return Lipinski.NumHDonors(compound)
def hba(row):
    """Return ``Lipinski.NumHAcceptors`` for the molecule stored in the row's 'Compound' field."""
    compound = row['Compound']
    return Lipinski.NumHAcceptors(compound)
def logp(row):
    """Return ``Crippen.MolLogP`` for the molecule stored in the row's 'Compound' field."""
    compound = row['Compound']
    return Crippen.MolLogP(compound)
# Score name shown in the report tab -> row-wise scoring function applied by submit_report().
# Entries mapped to None have no implementation in this file.
SCORE_MAP = {
'SAscore': sa_score,
'RAscore': None, # https://github.com/reymond-group/RAscore
'SCScore': None, # https://pubs.acs.org/doi/10.1021/acs.jcim.7b00622
'LogP': logp, # https://www.rdkit.org/docs/source/rdkit.Chem.Crippen.html
'MW': mw, # https://www.rdkit.org/docs/source/rdkit.Chem.Descriptors.html
'HBD': hbd, # https://www.rdkit.org/docs/source/rdkit.Chem.Lipinski.html
'HBA': hba, # https://www.rdkit.org/docs/source/rdkit.Chem.Lipinski.html
'TopoPSA': None, # http://mordred-descriptor.github.io/documentation/master/api/mordred.TopoPSA.html
}
# Filter name shown in the report tab; every entry is None and submit_report() only iterates over the selection.
FILTER_MAP = {
'PAINS filter': None,
"Lipinski's rule of five": None, # https://gist.github.com/strets123/fdc4db6d450b66345f46
'ADMET filter': None,
'TCL filter': None
}
# Task display name -> task identifier used in the Hydra overrides and the checkpoint file name.
TASK_MAP = {
'Drug-target interaction': 'binary',
'Drug-target binding affinity': 'regression',
}
# Model display name -> preset identifier used in the Hydra overrides and the checkpoint file name.
# NOTE(review): 'transfomer_cpi' looks like a misspelling of 'transformer_cpi' — confirm against the
# preset config and checkpoint file names before changing it.
PRESET_MAP = {
'DeepDTA': 'deep_dta',
'DeepConvDTI': 'deep_conv_dti',
'GraphDTA': 'graph_dta',
'MGraphDTA': 'm_graph_dta',
'HyperAttentionDTI': 'hyper_attention_dti',
'MolTrans': 'mol_trans',
'TransformerCPI': 'transfomer_cpi',
'TransformerCPI2': 'transformer_cpi_2',
'DrugBAN': 'drug_ban',
'DrugVQA(Seq)': 'drug_vqa'
}
# Target-family display name -> suffix of the checkpoint file name built in submit_predict().
TARGET_FAMILY_MAP = {
'General': 'general',
'Kinase': 'kinases',
'Non-kinase enzyme': 'non-kinase_enzymes',
'Membrane receptor': 'membrane_receptors',
'Nuclear receptor': 'nuclear_receptors',
'Ion channel': 'ion_channels',
'Other protein targets': 'other_protein_targets',
}
# Built-in target library display name -> CSV file name.
TARGET_LIBRARY_MAP = {
# 'STITCH': 'stitch.csv',
'ChEMBL33 (all species)': 'ChEMBL33_all_spe_single_prot_info.csv',
'DrugBank (Human)': 'drugbank_human_py_annot.csv',
}
# Built-in drug library display name -> CSV file name (read from data/drug_libraries by drug_screen_validate()).
DRUG_LIBRARY_MAP = {
# 'ChEMBL': 'chembl.csv',
'DrugBank (Human)': 'drugbank_human_py_annot.csv',
}
# NOTE(review): MODE_LIST is not referenced anywhere in the visible code.
MODE_LIST = [
'Drug screening',
'Drug repurposing',
'Drug-target pair'
]
# Raw dataset column name -> human-readable alias for reports; submit_predict() adds 'Y' and 'Y^' entries.
COLUMN_ALIASES = {
'X1': 'Drug SMILES',
'X2': 'Target FASTA',
'ID1': 'Drug ID',
'ID2': 'Target ID',
}
# Public address of the web server, referenced by the (currently commented-out) notification messages.
URL = "https://ciddr-lab.ac.cn/deepseqreen"
def validate_columns(df, mandatory_cols):
    """Check that ``df`` contains every column listed in ``mandatory_cols``.

    Args:
        df: Object exposing a ``columns`` collection (a pandas DataFrame in this app).
        mandatory_cols: Iterable of column names that must be present.

    Raises:
        gr.Error: If one or more mandatory columns are absent; the message lists
            exactly the columns that are missing.
    """
    missing_cols = [col for col in mandatory_cols if col not in df.columns]
    if missing_cols:
        # Report what is really missing; the message used to always name X1 and X2.
        error_message = (f"The following mandatory columns are missing "
                         f"in the uploaded dataset: {str(missing_cols).strip('[]')}.")
        raise gr.Error(error_message)
def send_email(receiver, msg):
    """Placeholder for e-mail notifications; currently does nothing and returns ``None``."""
    return None
def submit_predict(predict_filepath, task, preset, target_family, flag, progress=gr.Progress(track_tqdm=True)):
    """Run model inference on a staged dataset and pass the predictions to the report tab.

    Args:
        predict_filepath: Path of the CSV to predict on; injected into the Hydra
            config as ``data.data_file``.
        task: Task display name, a key of ``TASK_MAP``.
        preset: Model display name, a key of ``PRESET_MAP``.
        target_family: Target-family display name, a key of ``TARGET_FAMILY_MAP``.
        flag: Job id produced by the preceding validation step, or a falsy value
            when that step declined to start a job.
        progress: Gradio progress tracker (created with ``track_tqdm=True``); not
            referenced directly in the body.

    Returns:
        A two-item list matching the ``[file_for_report, run_state]`` outputs this
        function is bound to: the predictions file and ``False`` to mark the
        session as idle. When ``flag`` is falsy both outputs are left unchanged.

    Raises:
        gr.Error: If task, preset or target family is not a known option.
    """
    global COLUMN_ALIASES

    if not flag:
        # No job was started (e.g. another job is running in this session), so there is
        # no job id to name the output after: skip inference and touch neither output.
        return [gr.update(), gr.update()]
    job_id = flag

    try:
        task = TASK_MAP[task]
        preset = PRESET_MAP[preset]
        target_family = TARGET_FAMILY_MAP[target_family]
    except (KeyError, TypeError) as e:
        # Dropdowns without a default deliver None; surface a readable message instead of a KeyError.
        raise gr.Error('Please select a task, a model and a target family before submitting.') from e

    # NOTE(review): this rebinds a module-level dict, so the Y / Y^ aliases follow the most
    # recently submitted job of any session — confirm that is acceptable.
    COLUMN_ALIASES = COLUMN_ALIASES | {
        'Y': 'Actual interaction' if task == 'binary' else 'Actual affinity',
        'Y^': 'Predicted interaction' if task == 'binary' else 'Predicted affinity'
    }

    with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"):
        cfg = hydra.compose(
            config_name="webserver_inference",
            overrides=[f"task={task}",
                       f"preset={preset}",
                       f"ckpt_path=resources/checkpoints/{preset}-{task}-{target_family}.ckpt",
                       f"data.data_file='{str(predict_filepath)}'"])
        predictions, _ = predict(cfg)

    # One frame per prediction batch, stacked into a single table.
    prediction_df = pd.concat([pd.DataFrame(prediction) for prediction in predictions], ignore_index=True)
    predictions_file = f'{job_id}_predictions.csv'
    prediction_df.to_csv(predictions_file)

    # Exactly two values (the handler is wired to two outputs); the state gets a plain
    # value rather than a gr.State wrapper so that later truthiness checks see False.
    return [gr.File(predictions_file), False]
#
# except Exception as e:
# raise gr.Error(str(e))
# email_lock = Path(f"outputs/{email_hash}.lock")
# with open(email_lock, "w") as file:
# record = {
# "email": email,
# "job_id": job_id
# }
# json.dump(record, file)
# def run_predict():
# TODO per-user submit usage
# # email_lock = Path(f"outputs/{email_hash}.lock")
# # with open(email_lock, "w") as file:
# # record = {
# # "email": email,
# # "job_id": job_id
# # }
# # json.dump(record, file)
#
# job_lock = DATA_PATH / f"outputs/{job_id}.lock"
# with open(job_lock, "w") as file:
# pass
#
# try:
# prediction_df = pd.DataFrame()
# for family in target_family_list:
# with hydra.initialize(version_base="1.3", config_path="configs", job_name="webserver_inference"):
# cfg = hydra.compose(
# config_name="webserver_inference",
# overrides=[f"task={task}",
# f"preset={preset}",
# f"ckpt_path=resources/checkpoints/{preset}-{task}-{family}.ckpt",
# f"data.data_file='{str(predict_dataset)}'"])
#
# predictions, _ = predict(cfg)
# predictions = [pd.DataFrame(prediction) for prediction in predictions]
# prediction_df = pd.concat([prediction_df, pd.concat(predictions, ignore_index=True)])
# prediction_df.to_csv(f'outputs/{job_id}.csv')
# # email_lock.unlink()
# job_lock.unlink()
#
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) completed successfully. You may retrieve the '
# f'results and generate an analytical report at {URL} using the job id within 48 hours.')
# gr.Info(msg)
# except Exception as e:
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) failed due to an error: "{str(e)}." You may '
# f'reach out to the author about the error through email ([email protected]).')
# raise gr.Error(str(e))
# finally:
# send_email(email, msg)
#
# # Run "predict" asynchronously
# threading.Thread(target=run_predict).start()
#
# msg = (f'Your DeepSEQcreen prediction job (id: {job_id}) started running. You may retrieve the results '
# f'and generate an analytical report at {URL} using the job id once the job is done. Only one job '
# f'per user is allowed at the same time.')
# send_email(email, msg)
# # Return the job id first
# return [
# gr.Blocks(visible=False),
# gr.Markdown(f"Your prediction job is running... "
# f"You may stay on this page or come back later to retrieve the results "
# f"Once you receive our email notification."),
# ]
def update_df(file, progress=gr.Progress(track_tqdm=True)):
    """Load a report dataset, derive scaffold/molecule columns and render the initial report.

    The loaded frame is stored in the module-level ``DF_FOR_REPORT`` so that
    ``submit_report`` can add scores to it later.

    Args:
        file: Path of the CSV selected in the report tab, or ``None`` when cleared.
        progress: Gradio progress tracker (created with ``track_tqdm=True``).

    Returns:
        ``(html, pie_chart)``: the HTML table from ``create_html_report`` and a
        plotly figure, or ``None`` when no ranking chart applies. For ``file is
        None`` an empty ``gr.HTML`` and ``gr.Plot`` are returned.

    Raises:
        gr.Error: If the dataset has no ``X1`` (drug SMILES) column.
    """
    global DF_FOR_REPORT
    if file is None:
        return gr.HTML(''), gr.Plot()

    df = pd.read_csv(file)
    validate_columns(df, ['X1'])

    if df['X1'].nunique() > 1:
        df['Scaffold SMILES'] = df['X1'].swifter.progress_bar(
            desc=f"Calculating scaffold...").apply(MurckoScaffold.MurckoScaffoldSmilesFromSmiles)

    # Add columns holding RDKit molecule objects; the scaffold one only when the
    # scaffold SMILES exist (they are not computed for single-drug datasets).
    PandasTools.AddMoleculeColumnToFrame(df, smilesCol='X1', molCol='Compound',
                                         includeFingerprints=False)
    if 'Scaffold SMILES' in df.columns:
        PandasTools.AddMoleculeColumnToFrame(df, smilesCol='Scaffold SMILES', molCol='Scaffold',
                                             includeFingerprints=False)

    DF_FOR_REPORT = df.copy()

    # Rank by the predicted value when available, otherwise by the actual one.
    value = next((col for col in ('Y^', 'Y') if col in DF_FOR_REPORT.columns), None)
    pie_chart = None
    if value:
        n_drugs = DF_FOR_REPORT['X1'].nunique()
        n_targets = DF_FOR_REPORT['X2'].nunique() if 'X2' in DF_FOR_REPORT.columns else 0
        category = None
        if n_drugs > 1 >= n_targets:
            category = 'Scaffold SMILES'
        elif n_targets > 1 >= n_drugs:
            category = 'Target family'
        # Only chart categories that are really present, instead of failing with a KeyError.
        if category is not None and category in DF_FOR_REPORT.columns:
            pie_chart = create_pie_chart(DF_FOR_REPORT, category=category, value=value, top_k=100)

    return create_html_report(DF_FOR_REPORT), pie_chart
def create_html_report(df, progress=gr.Progress(track_tqdm=True)):
    """Render ``df`` as a scrollable HTML table with human-readable column names.

    Columns are ordered as: identifier/score/molecule columns first, any other
    columns next, and the raw ``X1``/``X2`` strings last. The caller's frame is
    not modified.

    Args:
        df: Report DataFrame.
        progress: Gradio progress tracker (created with ``track_tqdm=True``).

    Returns:
        An HTML string: the table wrapped in a 500px-high scrollable ``<div>``.
    """
    cols_left = ['ID2', 'Y', 'Y^', 'ID1', 'Compound', 'Scaffold', 'Scaffold SMILES', ]
    cols_right = ['X1', 'X2']
    cols_left = [col for col in cols_left if col in df.columns]
    cols_right = [col for col in cols_right if col in df.columns]
    cols_middle = df.columns.drop(cols_left + cols_right).tolist()

    # Work on an explicit copy so the assignments below never write into a view of the input.
    df = df[cols_left + cols_middle + cols_right].copy()
    if 'X2' in df.columns:
        df['X2'] = df['X2'].apply(wrap_text)
    # `columns=` is required: a bare mapper renames index labels, not column headers.
    df.rename(columns=COLUMN_ALIASES, inplace=True)

    # NOTE: a Styler with per-column colour gradients used to be built here, but it was
    # never rendered (the table comes from df.to_html()), so that dead work was removed.
    PandasTools.RenderImagesInAllDataFrames(images=True)
    html = df.to_html()
    return f'<div style="overflow:auto; height: 500px;">{html}</div>'
# def create_pie_chart(df, category, value, top_k):
# df.rename(COLUMN_ALIASES, inplace=True)
# # Select the top_k records based on the value_col
# top_k_df = df.nlargest(top_k, value)
#
# # Count the frequency of each unique value in the category_col column
# category_counts = top_k_df[category].value_counts()
#
# # Convert the counts to a DataFrame
# data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values})
#
# # Calculate the angle for each category
# data['angle'] = data['value']/data['value'].sum() * 2*pi
#
# # Assign colors
# data['color'] = Spectral11[0:len(category_counts)]
#
# # Create the plot
# p = figure(height=350, title="Pie Chart", toolbar_location=None,
# tools="hover", tooltips="@{}: @value".format(category), x_range=(-0.5, 1.0))
#
# p.wedge(x=0, y=1, radius=0.4,
# start_angle=cumsum('angle', include_zero=True), end_angle=cumsum('angle'),
# line_color="white", fill_color='color', legend_field=category, source=data)
#
# p.axis.axis_label = None
# p.axis.visible = False
# p.grid.grid_line_color = None
#
# return p
def create_pie_chart(df, category, value, top_k):
    """Plot how the ``top_k`` highest-``value`` rows are distributed over ``category``.

    Args:
        df: Report DataFrame; left unmodified.
        category: Column whose distinct values become the pie slices.
        value: Column used to rank rows (largest first).
        top_k: Number of top-ranked rows to include.

    Returns:
        A plotly pie-chart figure.
    """
    # Rename the *columns* (a bare mapper would rename index labels) and translate the
    # requested names the same way, so the lookups below match the renamed frame.
    df = df.rename(columns=COLUMN_ALIASES)
    category = COLUMN_ALIASES.get(category, category)
    value = COLUMN_ALIASES.get(value, value)

    # Select the top_k records based on the value column
    top_k_df = df.nlargest(top_k, value)

    # Count the frequency of each unique value in the category column
    category_counts = top_k_df[category].value_counts()
    data = pd.DataFrame({category: category_counts.index, 'value': category_counts.values})

    # Create the plot
    fig = px.pie(data, values='value', names=category, title=f'Top-{top_k} {category} in {value}')
    fig.update_traces(textposition='inside', textinfo='percent+label')
    return fig
def submit_report(score_list, filter_list, progress=gr.Progress(track_tqdm=True)):
    """Add the selected chemical-property scores to the loaded dataset and re-render the report.

    Args:
        score_list: Selected score names (keys of ``SCORE_MAP``).
        filter_list: Selected filter names (keys of ``FILTER_MAP``); filters are
            not implemented yet and are ignored.
        progress: Gradio progress tracker (created with ``track_tqdm=True``).

    Returns:
        ``(html, pie_chart)``: the HTML report and a plotly figure, or ``None``
        when no ranking chart applies.

    Raises:
        gr.Error: Wrapping any exception raised while computing the report.
    """
    df = DF_FOR_REPORT.copy()
    try:
        for filter_name in filter_list or []:
            pass  # no filter is implemented yet (every FILTER_MAP value is None)

        for score_name in score_list or []:
            score_fn = SCORE_MAP[score_name]
            if score_fn is None:
                # Listed in the UI but not implemented; skip it instead of crashing in apply().
                gr.Warning(f'{score_name} is not available yet and was skipped.')
                continue
            df[score_name] = df.swifter.progress_bar(desc=f"Calculating {score_name}").apply(
                score_fn, axis=1)

        # Rank by the predicted value when available, otherwise by the actual one.
        value = next((col for col in ('Y^', 'Y') if col in df.columns), None)
        pie_chart = None
        if value:
            category = None
            if df['X1'].nunique() > 1 >= df['X2'].nunique():
                category = 'Scaffold SMILES'
            elif df['X2'].nunique() > 1 >= df['X1'].nunique():
                category = 'Target family'  # was misspelt 'Target famiy'
            # Only chart categories that exist in the dataset.
            if category is not None and category in df.columns:
                pie_chart = create_pie_chart(df, category=category, value=value, top_k=100)

        return create_html_report(df), pie_chart
    except Exception as e:
        raise gr.Error(str(e)) from e
def check_job_status(job_id):
    """Report whether a background job is still running or has produced its result file.

    A job counts as running while ``<job_id>.lock`` exists under ``DATA_PATH`` and
    as finished once ``<job_id>.csv`` exists.

    Args:
        job_id: Identifier used to build the lock and result file names.

    Returns:
        A three-item list in the order ``[status message, tabs, report file]``
        (the order used by the commented-out pollers in this file). Items that
        need no change are ``gr.update()`` no-ops.
    """
    job_lock = DATA_PATH / f"{job_id}.lock"
    job_file = DATA_PATH / f"{job_id}.csv"
    if job_lock.is_file():
        # A list, not a set literal: output order matters and a set would also collapse duplicates.
        return [gr.Markdown(f"Your job ({job_id}) is still running... "
                            f"You may stay on this page or come back later to retrieve the results "
                            f"Once you receive our email notification."),
                gr.update(),
                gr.update()]
    if job_file.is_file():
        return [gr.Markdown(f"Your job ({job_id}) is done! Redirecting you to generate reports..."),
                gr.Tabs(selected=3),
                # Serve the result CSV; the lock file no longer exists in this branch.
                gr.File(str(job_file))]
    # Neither file exists: leave all three outputs untouched instead of returning None.
    return [gr.update(), gr.update(), gr.update()]
def wrap_text(text, line_length=60):
    """Hard-wrap text to ``line_length`` columns, keeping FASTA header lines intact.

    If the text (ignoring leading whitespace) starts with ``>`` it is treated as
    FASTA: each record keeps its header on its own line and its sequence lines
    are joined and re-wrapped. Any other text is wrapped as a single paragraph.

    Args:
        text: Text to wrap; ``None`` or an empty string yields ``''``.
        line_length: Maximum line width.

    Returns:
        The wrapped text.
    """
    if not text:
        # Empty editors deliver None or ''; there is nothing to wrap.
        return ''

    wrapper = textwrap.TextWrapper(width=line_length)
    # Leading blank lines must not hide the FASTA header from the check below.
    stripped = text.lstrip()
    if not stripped.startswith('>'):
        return wrapper.fill(text)

    wrapped_sections = []
    for section in stripped.split('>'):
        if not section:
            continue
        # splitlines() also consumes "\r\n", so Windows line endings do not leak into the sequence.
        lines = section.splitlines()
        seq_header = lines[0]
        wrapped_seq = wrapper.fill(''.join(lines[1:]))
        wrapped_sections.append(f">{seq_header}\n{wrapped_seq}")
    return '\n'.join(wrapped_sections)
def unwrap_text(text):
    """Undo line wrapping: trim surrounding whitespace and remove every newline.

    Args:
        text: The wrapped string.

    Returns:
        ``text`` without leading/trailing whitespace and without ``\\n`` characters.
    """
    # `strip` must be called and `replace` spelt correctly; the previous
    # `text.strip.replece(...)` raised AttributeError on every call.
    return text.strip().replace('\n', '')
def smiles_from_sdf(sdf_path):
    """Return the SMILES string of the first molecule in an SDF file.

    Args:
        sdf_path: Path of the SDF file (``str`` or ``pathlib.Path``).

    Returns:
        The SMILES produced by ``Chem.MolToSmiles`` for the first record.

    Raises:
        ValueError: If the file holds no record or the first record cannot be parsed.
    """
    with Chem.SDMolSupplier(str(sdf_path)) as suppl:
        if len(suppl) == 0:
            raise ValueError(f'No molecule found in SDF file: {sdf_path}')
        mol = suppl[0]
        # The supplier yields None for records RDKit cannot parse; fail with a clear message.
        if mol is None:
            raise ValueError(f'The first molecule in the SDF file could not be parsed: {sdf_path}')
        return Chem.MolToSmiles(mol)
# Application theme passed to gr.Blocks(theme=theme): the Base theme with small spacing and
# medium text, overriding background, border, button and text colours.
theme = gr.themes.Base(spacing_size="sm", text_size='md').set(
background_fill_primary='#dfe6f0',
background_fill_secondary='#dfe6f0',
checkbox_label_background_fill='#dfe6f0',
checkbox_label_background_fill_hover='#dfe6f0',
checkbox_background_color='white',
checkbox_border_color='#4372c4',
border_color_primary='#4372c4',
border_color_accent='#4372c4',
button_primary_background_fill='#4372c4',
button_primary_text_color='white',
button_secondary_border_color='#4372c4',
body_text_color='#4372c4',
block_title_text_color='#4372c4',
block_label_text_color='#4372c4',
block_info_text_color='#505358',
block_border_color=None,
input_border_color='#4372c4',
panel_border_color='#4372c4',
input_background_fill='white',
code_background_fill='white',
)
# Top-level UI: four tabs (drug hit screening, target protein identification, interaction
# pair inference, chemical property report) inside a single gr.Blocks app named `demo`.
with (gr.Blocks(theme=theme, title='DeepScreen', css=CSS) as demo):
# Session state. run_state holds the id of the job running in this session (False when idle);
# the *_flag states carry the job id from each validation handler to submit_predict.
run_state = gr.State(value=False)
screen_flag = gr.State(value=False)
identify_flag = gr.State(value=False)
infer_flag = gr.State(value=False)
with gr.Tabs() as tabs:
# --- Tab 0: one target against a drug library ---
with gr.TabItem(label='Drug hit screening', id=0):
gr.Markdown('''
# <center>DeepSEQreen Drug Hit Screening</center>
<center>
To predict interactions/binding affinities of a single target against a library of drugs.
</center>
''')
with gr.Blocks() as screen_block:
with gr.Column() as screen_page:
with gr.Row():
with gr.Column(scale=4, variant='panel'):
target_fasta = gr.Code(label='Target sequence FASTA',
interactive=True, lines=5)
example_target = gr.Button(value='Example: Human MAPK14', elem_id='example')
with gr.Row():
with gr.Column(scale=1):
with gr.Group():
with gr.Row():
target_input_type = gr.Radio(label='Target input type',
choices=['Sequence', 'UniProt ID', 'Gene symbol'],
value='Sequence')
# Query box and query button start hidden; target_input_type_select toggles them.
target_query = gr.Textbox(label='UniProt ID/Accession',
visible=False, interactive=True)
target_upload_btn = gr.UploadButton(label='Upload a FASTA file',
type='binary',
visible=True, variant='primary',
size='lg', elem_classes="upload_button")
target_query_btn = gr.Button(value='Query the sequence', variant='primary',
elem_classes='upload_button', visible=False)
with gr.Column(scale=1):
with gr.Row():
with gr.Group():
drug_screen_target_family = gr.Dropdown(
choices=list(TARGET_FAMILY_MAP.keys()),
value='General',
label='Target family', interactive=True)
# with gr.Column(scale=1, min_width=24):
# NOTE(review): auto_detect_btn has no handler attached in the visible code.
auto_detect_btn = gr.Button(value='Auto-detect', variant='primary')
HelpTip(
"Target amino acid sequence in the FASTA format. Alternatively, you may use a "
"UniProt ID/accession to query UniProt database for the sequence of your target"
"of interest. You can also search on databases like UniProt, RCSB PDB, "
"NCBI Protein for the FASTA string representing your target of interest. If "
"the input FASTA contains multiple entities, only the first one will be used."
)
with gr.Column(variant='panel'):
with gr.Group():
drug_library = gr.Radio(label='Drug library',
choices=list(DRUG_LIBRARY_MAP.keys()) + ['Upload a drug library'])
drug_library_upload = gr.File(label='Custom drug library file', visible=True)
with gr.Row(variant='panel'):
drug_screen_task = gr.Radio(list(TASK_MAP.keys()), label='Task',
value='Drug-target interaction')
with gr.Column(scale=2):
with gr.Group():
drug_screen_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Model')
# NOTE(review): recommend_btn has no handler attached in the visible code.
recommend_btn = gr.Button(value='Recommend a model', variant='primary')
HelpTip("We recommend the appropriate model for your use case based on model performance "
"in drug-target interaction or binding affinity prediction "
"benchmarked on different target families and real-world data scenarios.")
# drug_screen_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
drug_screen_clr_btn = gr.ClearButton()
drug_screen_btn = gr.Button(value='SCREEN', variant='primary')
# TODO Modify the pd df directly with df['X2'] = target
# Hidden file component carrying the staged dataset path from validation to prediction.
screen_data_for_predict = gr.File(visible=False, file_count="single", type='filepath')
screen_waiting = gr.Markdown("""
<center>Your job is running... It might take a few minutes.
When it's done, you will be redirected to the report page.
Meanwhile, please leave the page on.</center>
""", visible=False)
# --- Tab 1: one drug against a target library ---
with gr.TabItem(label='Target protein identification', id=1):
gr.Markdown('''
# <center>DeepSEQreen Target Protein Identification</center>
<center>
To predict interactions/binding affinities of a single drug against a library of targets.
</center>
''')
with gr.Blocks() as identify_block:
with gr.Column() as identify_page:
with gr.Row():
with gr.Group():
drug_type = gr.Dropdown(label='Drug input type',
choices=['SMILES', 'SDF'],
value='SMILES',
scale=1,
interactive=True)
# NOTE(review): drug_type and drug_upload have no handlers attached in the visible code.
drug_upload = gr.UploadButton(label='⤒ Upload a file')
drug_smiles = gr.Code(label='Drug canonical SMILES', interactive=True, scale=5, lines=5)
with gr.Column(scale=1):
HelpTip(
"""Drug molecule in the SMILES format. You may search on databases like
NCBI PubChem, ChEMBL, and DrugBank for the SMILES strings
representing your drugs of interest.
"""
)
example_drug = gr.Button(value='Example: Aspirin', elem_id='example')
with gr.Column(variant='panel'):
with gr.Group():
target_library = gr.Radio(label='Target library',
choices=list(TARGET_LIBRARY_MAP.keys()) + ['Upload a target library'])
target_library_upload = gr.File(label='Custom target library file', visible=True)
with gr.Row(visible=True):
# NOTE(review): the task and preset dropdowns below have no default value, so they
# deliver None until the user picks an option.
target_identify_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task')
HelpTip("Choose a preset model for making the predictions.")
target_identify_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
HelpTip("Choose the protein family of your target.")
target_identify_target_family = gr.Dropdown(choices=['General'],
value='General',
label='Target family')
# with gr.Row():
# target_identify_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
target_identify_clr_btn = gr.ClearButton()
target_identify_btn = gr.Button(value='IDENTIFY', variant='primary')
identify_data_for_predict = gr.File(visible=False, file_count="single", type='filepath')
# NOTE(review): the first two string fragments join without a space ("minutes.When") —
# confirm and fix the user-facing message.
identify_waiting = gr.Markdown(f"Your job is running... It might take a few minutes."
f"When it's done, you will be redirected to the report page. "
f"Meanwhile, please leave the page on.",
visible=False)
# --- Tab 2: arbitrary drug-target pairs from an uploaded dataset ---
with gr.TabItem(label='Interaction pair inference', id=2):
gr.Markdown('''
# <center>DeepSEQreen Interaction Pair Inference</center>
<center>
To predict interactions/binding affinities between any drug-target pairs.
</center>
''')
with gr.Blocks() as infer_block:
with gr.Column() as infer_page:
HelpTip("Upload a custom drug-target pair dataset. See the documentation for details.")
infer_data_for_predict = gr.File(
label='Prediction dataset file', file_count="single", type='filepath')
# TODO example dataset
# TODO download example dataset
with gr.Row(visible=True):
pair_infer_task = gr.Dropdown(list(TASK_MAP.keys()), label='Task')
HelpTip("Choose a preset model for making the predictions.")
pair_infer_preset = gr.Dropdown(list(PRESET_MAP.keys()), label='Preset')
HelpTip("Choose the protein family of your target.")
pair_infer_target_family = gr.Dropdown(choices=['General'],
label='Target family',
value='General')
# with gr.Row():
# pair_infer_email = gr.Textbox(
# label='Email (optional)',
# info="Your email will be used to send you notifications when your job finishes."
# )
with gr.Row(visible=True):
pair_infer_clr_btn = gr.ClearButton()
pair_infer_btn = gr.Button(value='INFER', variant='primary')
# NOTE(review): same missing space as in identify_waiting ("minutes.When").
infer_waiting = gr.Markdown(f"Your job is running... It might take a few minutes."
f"When it's done, you will be redirected to the report page. "
f"Meanwhile, please leave the page on.",
visible=False)
# --- Tab 3: chemical property report on predictions or an uploaded dataset ---
with gr.TabItem(label='Chemical property report', id=3):
with gr.Blocks() as report:
gr.Markdown('''
# <center>DeepSEQreen Chemical Property Report</center>
<center>
To compute chemical properties for the predictions of drug hit screening,
target protein identification, and interaction pair inference. You may also upload
your own dataset.
</center>
''')
with gr.Row():
file_for_report = gr.File(interactive=True, type='filepath')
# df_original = gr.Dataframe(type="pandas", interactive=False, visible=False)
scores = gr.CheckboxGroup(list(SCORE_MAP.keys()), label='Scores')
filters = gr.CheckboxGroup(list(FILTER_MAP.keys()), label='Filters')
with gr.Row():
clear_btn = gr.ClearButton()
analyze_btn = gr.Button('REPORT', variant='primary')
with gr.Row():
with gr.Column(scale=3):
html_report = gr.HTML() # label='Results', visible=True)
# NOTE(review): created hidden, and nothing in the visible code makes it visible — confirm the chart is ever shown.
ranking_pie_chart = gr.Plot(visible=False)
with gr.Row():
# NOTE(review): names and labels look swapped (csv_download_btn is labelled HTML, html_download_btn CSV),
# and neither button has a handler attached in the visible code.
csv_download_btn = gr.Button('Download report (HTML)', variant='primary')
html_download_btn = gr.Button('Download raw data (CSV)', variant='primary')
def target_input_type_select(input_type):
    """Toggle the target-input widgets to match the selected input type.

    Returns updates for ``[target_upload_btn, target_query, target_query_btn]``:
    'Sequence' shows only the upload button, while 'UniProt ID' and 'Gene symbol'
    show a cleared, relabelled query box plus the query button. Any other value
    returns ``None``.
    """
    if input_type == 'Sequence':
        return [gr.UploadButton(visible=True),
                gr.Textbox(visible=False), gr.Button(visible=False)]

    if input_type == 'UniProt ID':
        query_box = gr.Textbox(visible=True, label='UniProt ID/accession', info=None, value='')
    elif input_type == 'Gene symbol':
        query_box = gr.Textbox(visible=True, label='Gene symbol/name', info='Organism: human', value='')
    else:
        return None

    return [gr.UploadButton(visible=False), query_box, gr.Button(visible=True)]
# Switch between the upload button and the query box/button whenever the input type changes.
target_input_type.select(fn=target_input_type_select,
inputs=target_input_type, outputs=[target_upload_btn, target_query, target_query_btn],
show_progress=False)
def uniprot_query(query, input_type):
fasta_seq = ''
query = query.strip()
match input_type:
case 'UniProt ID':
query = f"{query.strip()}.fasta"
case 'Gene symbol':
query = f'search?query=organism_id:9606+AND+gene:{query}&format=fasta'
try:
fasta = SESSION.get(UNIPROT_ENDPOINT.format(query=query))
fasta.raise_for_status()
fasta_seq = fasta.text
except Exception as e:
raise gr.Warning(f"Failed to query FASTA from UniProt due to {str(e)}")
finally:
return fasta_seq
# The upload button is declared with type='binary', so the handler receives bytes and decodes them.
target_upload_btn.upload(fn=lambda x: x.decode(), inputs=target_upload_btn, outputs=target_fasta)
target_query_btn.click(uniprot_query, inputs=[target_query, target_input_type], outputs=target_fasta)
# Re-wrap the editor contents to wrap_text's default width whenever an editor gains or loses focus.
target_fasta.focus(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
target_fasta.blur(fn=wrap_text, inputs=target_fasta, outputs=target_fasta, show_progress=False)
# NOTE(review): wrap_text inserts line breaks into SMILES longer than its default width (60) —
# confirm the SMILES validation downstream tolerates or removes them.
drug_smiles.focus(fn=wrap_text, inputs=drug_smiles, outputs=drug_smiles, show_progress=False)
drug_smiles.blur(fn=wrap_text, inputs=drug_smiles, outputs=drug_smiles, show_progress=False)
def example_fill(input_type):
    """Fill the target inputs with the human MAPK14 example.

    Returns a dict keyed by component: ``target_query`` gets the example UniProt
    accession or gene symbol matching ``input_type`` (an empty string for any
    other input type) and ``target_fasta`` gets the example FASTA record.
    """
    example_queries = {'UniProt ID': 'Q16539', 'Gene symbol': 'MAPK14'}
    example_fasta = """
>sp|Q16539|MK14_HUMAN Mitogen-activated protein kinase 14 OS=Homo sapiens OX=9606 GN=MAPK14 PE=1 SV=3
MSQERPTFYRQELNKTIWEVPERYQNLSPVGSGAYGSVCAAFDTKTGLRVAVKKLSRPFQ
SIIHAKRTYRELRLLKHMKHENVIGLLDVFTPARSLEEFNDVYLVTHLMGADLNNIVKCQ
KLTDDHVQFLIYQILRGLKYIHSADIIHRDLKPSNLAVNEDCELKILDFGLARHTDDEMT
GYVATRWYRAPEIMLNWMHYNQTVDIWSVGCIMAELLTGRTLFPGTDHIDQLKLILRLVG
TPGAELLKKISSESARNYIQSLTQMPKMNFANVFIGANPLAVDLLEKMLVLDSDKRITAA
QALAHAYFAQYHDPDDEPVADPYDQSFESRDLLIDEWKSLTYDEVISFVPPPLDQEEMES
"""
    return {target_query: example_queries.get(input_type, ''),
            target_fasta: example_fasta}
# "Example" buttons: prefill the target inputs with MAPK14 and the drug editor with a fixed example SMILES.
example_target.click(fn=example_fill, inputs=target_input_type,
outputs=[target_query, target_fasta], show_progress=False)
example_drug.click(fn=lambda: 'CC(=O)Oc1ccccc1C(=O)O', outputs=drug_smiles, show_progress=False)
def drug_screen_validate(fasta, library, library_upload, state):
    """Validate drug-screening inputs and stage the dataset to predict on.

    Args:
        fasta: Target FASTA text from the editor; only the first record is used.
        library: Selected drug library name, or the "upload" choice / ``None``.
        library_upload: Uploaded custom drug library file, if any.
        state: Current ``run_state`` value; truthy while a job runs in this session.

    Returns:
        A dict keyed by component. On success: the staged CSV path for
        ``screen_data_for_predict`` and the new job id for ``screen_flag`` and
        ``run_state``. If a job is already running: ``{screen_flag: False}``.

    Raises:
        gr.Error: For a missing/invalid FASTA, a missing library, a library
            without an ``X1`` column, or a staging failure.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {screen_flag: False}

    def process_target_fasta(sequence):
        # Keep only the first record's residues; splitlines()/strip() also drop "\r".
        lines = [line.strip() for line in sequence.strip().splitlines()]
        if lines and lines[0].startswith(">"):
            lines = lines[1:]
        return ''.join(lines).split(">")[0]

    if not fasta or not fasta.strip():
        raise gr.Error('Please provide a target FASTA sequence.')
    fasta = process_target_fasta(fasta)
    err = validate_seq_str(fasta, FASTA_PAT)
    if err:
        raise gr.Error(f'Found error(s) in your target fasta input: {err}')

    if library in DRUG_LIBRARY_MAP:
        screen_df = pd.read_csv(Path('data/drug_libraries', DRUG_LIBRARY_MAP[library]))
    elif library_upload is not None:
        screen_df = pd.read_csv(library_upload)
    else:
        raise gr.Error('Please select a drug library or upload a custom drug library file.')

    validate_columns(screen_df, ['X1'])
    screen_df['X2'] = fasta

    job_id = str(uuid4())
    temp_file = Path(f'{job_id}_temp.csv').resolve()
    screen_df.to_csv(temp_file)
    if not temp_file.is_file():
        # Fail loudly instead of falling through and returning None to Gradio.
        raise gr.Error('Failed to stage the screening dataset for prediction.')

    return {screen_data_for_predict: str(temp_file),
            screen_flag: job_id,
            run_state: job_id}
def target_identify_validate(smiles, library, library_upload, state):
    """Validate target-identification inputs and stage the dataset to predict on.

    Args:
        smiles: Drug SMILES from the editor (may contain line breaks added by wrapping).
        library: Selected target library name, or the "upload" choice / ``None``.
        library_upload: Uploaded custom target library file, if any.
        state: Current ``run_state`` value; truthy while a job runs in this session.

    Returns:
        A dict keyed by component. On success: the staged CSV path for
        ``identify_data_for_predict`` and the new job id for ``identify_flag``
        and ``run_state``. If a job is already running: ``{identify_flag: False}``.

    Raises:
        gr.Error: For a missing/invalid SMILES, a missing library, a library
            without an ``X2`` column, or a staging failure.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {identify_flag: False}

    # The SMILES editor re-wraps its content on focus/blur, so drop all whitespace before validating.
    smiles = ''.join((smiles or '').split())
    if not smiles:
        raise gr.Error('Please provide a compound SMILES.')
    err = validate_seq_str(smiles, SMILES_PAT)
    if err:
        raise gr.Error(f'Found error(s) in your compound SMILES input: {err}')

    if library in TARGET_LIBRARY_MAP:
        # Look the file up by the *selected* library; the literal key 'target_library'
        # is not in the map and always raised KeyError.
        # NOTE(review): the file name is used as-is (relative to the working directory); the
        # drug libraries are read from 'data/drug_libraries' — confirm where these CSVs live.
        identify_df = pd.read_csv(TARGET_LIBRARY_MAP[library])
    elif library_upload is not None:
        identify_df = pd.read_csv(library_upload)
    else:
        raise gr.Error('Please select a target library or upload a custom target library file.')

    validate_columns(identify_df, ['X2'])
    identify_df['X1'] = smiles

    job_id = str(uuid4())
    temp_file = Path(f'{job_id}_temp.csv').resolve()
    identify_df.to_csv(temp_file)
    if not temp_file.is_file():
        # Fail loudly instead of falling through and returning None to Gradio.
        raise gr.Error('Failed to stage the identification dataset for prediction.')

    # Plain values, as in drug_screen_validate: the states must hold the id itself,
    # not a gr.State wrapper object.
    return {identify_data_for_predict: str(temp_file),
            identify_flag: job_id,
            run_state: job_id}
def pair_infer_validate(drug_target_pair_upload, state):
    """Validate an uploaded drug-target pair dataset before inference.

    Args:
        drug_target_pair_upload: Path of the uploaded CSV; must contain ``X1``
            (SMILES) and ``X2`` (FASTA) columns.
        state: Current ``run_state`` value; truthy while a job runs in this
            session. (Named ``state`` so it cannot shadow the ``run_state``
            component used as a key in the returned dict.)

    Returns:
        A dict keyed by component: the new job id for ``infer_flag`` and
        ``run_state`` on success, or ``{infer_flag: False}`` if a job is
        already running.

    Raises:
        gr.Error: For a missing file, missing columns, or invalid SMILES/FASTA entries.
    """
    if state:
        gr.Warning('You have another prediction job '
                   '(drug hit screening, target protein identification, or interaction pair inference) '
                   'running in the session right now. '
                   'Please submit another job when your current job has finished.')
        return {infer_flag: False}

    if drug_target_pair_upload is None:
        raise gr.Error('Please upload a drug-target pair dataset first.')

    df = pd.read_csv(drug_target_pair_upload)
    validate_columns(df, ['X1', 'X2'])

    # The validator's result is stored per row; rows that are not NA are reported as invalid below.
    df['X1_ERR'] = df['X1'].swifter.apply(
        validate_seq_str, regex=SMILES_PAT)
    df['X2_ERR'] = df['X2'].swifter.apply(
        validate_seq_str, regex=FASTA_PAT)

    if not df['X1_ERR'].isna().all():
        raise gr.Error(f"Encountered invalid SMILES:\n{df[~df['X1_ERR'].isna()][['X1', 'X1_ERR']]}")
    if not df['X2_ERR'].isna().all():
        raise gr.Error(f"Encountered invalid FASTA:\n{df[~df['X2_ERR'].isna()][['X2', 'X2_ERR']]}")

    job_id = str(uuid4())
    # Plain values: the states must hold the id itself, not a gr.State wrapper object.
    return {infer_flag: job_id,
            run_state: job_id}
# Each pipeline runs: validate inputs -> swap the form for the waiting notice -> predict ->
# restore the form.
# * The swap is attached with .success(), so nothing further runs when validation raises;
#   with .then() the prediction would still start on stale inputs.
# * The last step is a .then(), so the form comes back even if prediction fails. It also
#   releases run_state when this chain owned the job (its flag is truthy); otherwise the
#   current run_state is passed through, because the running job belongs to another tab.
drug_screen_btn.click(
    fn=drug_screen_validate,
    inputs=[target_fasta, drug_library, drug_library_upload, run_state],  # , drug_screen_email],
    outputs=[screen_data_for_predict, screen_flag, run_state]
).success(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[screen_page, screen_waiting]
).then(
    fn=submit_predict,
    inputs=[screen_data_for_predict, drug_screen_task, drug_screen_preset,
            drug_screen_target_family, screen_flag],  # , drug_screen_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda flag, state: [gr.Column(visible=True), gr.Markdown(visible=False), False if flag else state],
    inputs=[screen_flag, run_state],
    outputs=[screen_page, screen_waiting, run_state]
)

target_identify_btn.click(
    fn=target_identify_validate,
    inputs=[drug_smiles, target_library, target_library_upload, run_state],  # , drug_screen_email],
    outputs=[identify_data_for_predict, identify_flag, run_state]
).success(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[identify_page, identify_waiting]
).then(
    fn=submit_predict,
    inputs=[identify_data_for_predict, target_identify_task, target_identify_preset,
            target_identify_target_family, identify_flag],  # , target_identify_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda flag, state: [gr.Column(visible=True), gr.Markdown(visible=False), False if flag else state],
    inputs=[identify_flag, run_state],
    outputs=[identify_page, identify_waiting, run_state]
)

pair_infer_btn.click(
    fn=pair_infer_validate,
    inputs=[infer_data_for_predict, run_state],  # , drug_screen_email],
    outputs=[infer_flag, run_state]
).success(
    fn=lambda: [gr.Column(visible=False), gr.Markdown(visible=True)],
    outputs=[infer_page, infer_waiting]
).then(
    fn=submit_predict,
    inputs=[infer_data_for_predict, pair_infer_task, pair_infer_preset,
            pair_infer_target_family, infer_flag],  # , pair_infer_email],
    outputs=[file_for_report, run_state]
).then(
    fn=lambda flag, state: [gr.Column(visible=True), gr.Markdown(visible=False), False if flag else state],
    inputs=[infer_flag, run_state],
    outputs=[infer_page, infer_waiting, run_state]
)
# TODO background job from these 3 pipelines to update file_for_report
file_for_report.change(fn=update_df, inputs=file_for_report, outputs=[html_report, ranking_pie_chart])
analyze_btn.click(fn=submit_report, inputs=[scores, filters], outputs=[html_report, ranking_pie_chart])
# screen_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report],
# every=5)
# identify_waiting.change(fn=check_job_status, inputs=run_state, outputs=[identify_waiting, tabs, file_for_report],
# every=5)
# pair_waiting.change(fn=check_job_status, inputs=run_state, outputs=[pair_waiting, tabs, file_for_report],
# every=5)
# demo.load(None, None, None, js="() => {document.body.classList.remove('dark')}")
# Script entry point: configure request queues, then start the Gradio server.
if __name__ == "__main__":
# NOTE(review): queue() is called on the nested Blocks objects rather than on `demo`, the app
# that is launched — confirm these per-section limits actually take effect.
screen_block.queue(max_size=2)
identify_block.queue(max_size=2)
infer_block.queue(max_size=2)
report.queue(max_size=20)
# SCHEDULER.add_job(func=file_cleanup(), trigger="interval", seconds=60)
# SCHEDULER.start()
demo.launch(
# debug=True,
show_api=False,
# favicon_path=,
# inline=False
debug=True
)