import os
import io
import json
import csv
import asyncio
import traceback
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional, Tuple, Union, List

import httpx
import gradio as gr
import torch
from dotenv import load_dotenv
from loguru import logger
from huggingface_hub import login
from openai import OpenAI
from reportlab.pdfgen import canvas
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    MarianMTModel,
    MarianTokenizer,
)
import pandas as pd
import altair as alt
import spacy
import spacy.cli
import PyPDF2
###############################################################################
#                          1) ENVIRONMENT & LOGGING                          #
###############################################################################
# Ensure spaCy model is downloaded (English Core Web)
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    logger.info("Downloading spaCy 'en_core_web_sm' model...")
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Logging
logger.add("error_logs.log", rotation="1 MB", level="ERROR")

# Load environment variables
load_dotenv()
HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
BIOPORTAL_API_KEY = os.getenv("BIOPORTAL_API_KEY")  # For BioPortal integration
ENTREZ_EMAIL = os.getenv("ENTREZ_EMAIL")

if not HUGGINGFACE_TOKEN or not OPENAI_API_KEY:
    logger.error("Missing Hugging Face or OpenAI credentials.")
    raise ValueError("Missing credentials for Hugging Face or OpenAI.")

# Warn if BioPortal key is missing
if not BIOPORTAL_API_KEY:
    logger.warning("BIOPORTAL_API_KEY is not set. BioPortal fetch calls will fail.")

# Hugging Face login
login(HUGGINGFACE_TOKEN)

# OpenAI
client = OpenAI(api_key=OPENAI_API_KEY)

# Device: CPU or GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
logger.info(f"Using device: {device}")
###############################################################################
#                 2) HUGGING FACE & TRANSLATION MODEL SETUP                  #
###############################################################################
MODEL_NAME = "mgbam/bert-base-finetuned-mgbam"
try:
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME, use_auth_token=HUGGINGFACE_TOKEN
    ).to(device)
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME, use_auth_token=HUGGINGFACE_TOKEN
    )
except Exception as e:
    logger.error(f"Model load error: {e}")
    raise

try:
    translation_model_name = "Helsinki-NLP/opus-mt-en-fr"
    translation_model = MarianMTModel.from_pretrained(
        translation_model_name, use_auth_token=HUGGINGFACE_TOKEN
    ).to(device)
    translation_tokenizer = MarianTokenizer.from_pretrained(
        translation_model_name, use_auth_token=HUGGINGFACE_TOKEN
    )
except Exception as e:
    logger.error(f"Translation model load error: {e}")
    raise
# Language map for translation
LANGUAGE_MAP: Dict[str, Tuple[str, str]] = {
    "English to French": ("en", "fr"),
    "French to English": ("fr", "en"),
}
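# Note: only the en->fr MarianMT checkpoint is loaded above, so both entries in
# LANGUAGE_MAP currently route through the same English->French weights. A sketch
# of what supporting the reverse direction could look like (hypothetical, not
# wired into the app; it would add a second model download):
#
#   reverse_model_name = "Helsinki-NLP/opus-mt-fr-en"
#   reverse_model = MarianMTModel.from_pretrained(reverse_model_name).to(device)
#   reverse_tokenizer = MarianTokenizer.from_pretrained(reverse_model_name)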
###############################################################################
#                       3) API ENDPOINTS & CONSTANTS                         #
###############################################################################
PUBMED_SEARCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
PUBMED_FETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
EUROPE_PMC_BASE_URL = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
BIOPORTAL_API_BASE = "https://data.bioontology.org"
CROSSREF_API_URL = "https://api.crossref.org/works"
###############################################################################
#                            4) HELPER FUNCTIONS                             #
###############################################################################
def safe_json_parse(text: str) -> Union[Dict[str, Any], None]:
    """Safely parse JSON."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as e:
        logger.error(f"JSON parsing error: {e}")
        return None
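# Example (illustrative):
#   safe_json_parse('{"term": "cancer", "retmax": "5"}') -> {"term": "cancer", "retmax": "5"}
#   safe_json_parse('not json')                          -> None (the error is logged)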
def parse_pubmed_xml(xml_data: str) -> List[Dict[str, Any]]:
    """Parse PubMed XML data into a structured list of articles."""
    root = ET.fromstring(xml_data)
    articles = []
    for article in root.findall(".//PubmedArticle"):
        pmid = article.findtext(".//PMID")
        title = article.findtext(".//ArticleTitle")
        abstract = article.findtext(".//AbstractText")
        journal = article.findtext(".//Journal/Title")
        pub_date_elem = article.find(".//JournalIssue/PubDate")
        pub_date = None
        if pub_date_elem is not None:
            year = pub_date_elem.findtext("Year")
            month = pub_date_elem.findtext("Month")
            day = pub_date_elem.findtext("Day")
            if year and month and day:
                pub_date = f"{year}-{month}-{day}"
            else:
                pub_date = year
        articles.append({
            "PMID": pmid,
            "Title": title,
            "Abstract": abstract,
            "Journal": journal,
            "PublicationDate": pub_date,
        })
    return articles
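# The XPath lookups above assume the usual EFetch layout, roughly (simplified sketch;
# real records carry many more fields):
#   <PubmedArticleSet>
#     <PubmedArticle>
#       <MedlineCitation>
#         <PMID>12345678</PMID>
#         <Article>
#           <Journal><Title>...</Title><JournalIssue><PubDate>...</PubDate></JournalIssue></Journal>
#           <ArticleTitle>...</ArticleTitle>
#           <Abstract><AbstractText>...</AbstractText></Abstract>
#         </Article>
#       </MedlineCitation>
#     </PubmedArticle>
#   </PubmedArticleSet>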
###############################################################################
#                          5) ASYNC FETCH FUNCTIONS                          #
###############################################################################
async def fetch_articles_by_nct_id(nct_id: str) -> Dict[str, Any]:
    """Query Europe PMC for articles mentioning a ClinicalTrials.gov NCT ID."""
    params = {"query": nct_id, "format": "json"}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(EUROPE_PMC_BASE_URL, params=params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching articles for {nct_id}: {e}")
            return {"error": str(e)}


async def fetch_articles_by_query(query_params: str) -> Dict[str, Any]:
    """Europe PMC query built from a JSON object of field:value pairs."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON."}
    query_string = " AND ".join(f"{k}:{v}" for k, v in parsed_params.items())
    req_params = {"query": query_string, "format": "json"}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(EUROPE_PMC_BASE_URL, params=req_params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching articles: {e}")
            return {"error": str(e)}
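# Example (illustrative): passing '{"AUTH": "Smith", "TITLE": "cancer"}' builds the
# Europe PMC query string 'AUTH:Smith AND TITLE:cancer' before the request is sent.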
async def fetch_pubmed_by_query(query_params: str) -> Dict[str, Any]:
    """Two-step PubMed fetch: ESearch for matching PMIDs, then EFetch for article XML."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for PubMed."}
    search_params = {
        "db": "pubmed",
        "retmode": "json",
        "email": ENTREZ_EMAIL,
        "retmax": parsed_params.get("retmax", "10"),
        "term": parsed_params.get("term", ""),
    }
    async with httpx.AsyncClient() as client_http:
        try:
            # Search PubMed for matching PMIDs
            search_resp = await client_http.get(PUBMED_SEARCH_URL, params=search_params)
            search_resp.raise_for_status()
            data = search_resp.json()
            id_list = data.get("esearchresult", {}).get("idlist", [])
            if not id_list:
                return {"result": ""}
            # Fetch the full PubMed records as XML
            fetch_params = {
                "db": "pubmed",
                "id": ",".join(id_list),
                "retmode": "xml",
                "email": ENTREZ_EMAIL,
            }
            fetch_resp = await client_http.get(PUBMED_FETCH_URL, params=fetch_params)
            fetch_resp.raise_for_status()
            return {"result": fetch_resp.text}
        except Exception as e:
            logger.error(f"Error fetching PubMed articles: {e}")
            return {"error": str(e)}
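# Example (illustrative): '{"term": "breast cancer AND 2023[dp]", "retmax": "5"}'
# runs an ESearch for up to 5 PMIDs, then EFetch returns those records as XML in
# the "result" key.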
async def fetch_crossref_by_query(query_params: str) -> Dict[str, Any]:
    """Crossref /works search; the parsed JSON is passed through as query parameters."""
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for Crossref."}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(CROSSREF_API_URL, params=parsed_params)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching Crossref data: {e}")
            return {"error": str(e)}


async def fetch_bioportal_by_query(query_params: str) -> Dict[str, Any]:
    """
    BioPortal fetch for medical ontologies/terminologies.
    Expects JSON like: {"q": "cancer"}
    See: https://data.bioontology.org/documentation
    """
    if not BIOPORTAL_API_KEY:
        return {"error": "No BioPortal API Key set."}
    parsed_params = safe_json_parse(query_params)
    if not parsed_params or not isinstance(parsed_params, dict):
        return {"error": "Invalid JSON for BioPortal."}
    search_term = parsed_params.get("q", "")
    if not search_term:
        return {"error": "No 'q' found in JSON. Provide a search term."}
    url = f"{BIOPORTAL_API_BASE}/search"
    headers = {"Authorization": f"apikey token={BIOPORTAL_API_KEY}"}
    req_params = {"q": search_term}
    async with httpx.AsyncClient() as client_http:
        try:
            resp = await client_http.get(url, params=req_params, headers=headers)
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            logger.error(f"Error fetching BioPortal data: {e}")
            return {"error": str(e)}
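# Example (illustrative): '{"q": "melanoma"}' searches BioPortal; matches come back
# under the "collection" key, each with fields such as "prefLabel" and "@id", which
# is how the handler below formats the results.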
###############################################################################
#                              6) CORE FUNCTIONS                             #
###############################################################################
def summarize_text(text: str) -> str:
    """OpenAI GPT-3.5 summarization."""
    if not text.strip():
        return "No text provided for summarization."
    try:
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": f"Summarize this clinical data:\n{text}"}],
            max_tokens=200,
            temperature=0.7,
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        logger.error(f"Summarization error: {e}")
        return "Summarization failed."
def predict_outcome(text: str) -> Union[Dict[str, float], str]:
    """Predict outcomes (classification) using a fine-tuned BERT model."""
    if not text.strip():
        return "No text provided for prediction."
    try:
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        probabilities = torch.nn.functional.softmax(outputs.logits, dim=-1)[0]
        return {f"Label {i+1}": float(prob.item()) for i, prob in enumerate(probabilities)}
    except Exception as e:
        logger.error(f"Prediction error: {e}")
        return "Prediction failed."
def generate_report(text: str, filename: str = "clinical_report.pdf") -> Optional[str]:
    """Generate a professional PDF report from the text."""
    try:
        if not text.strip():
            logger.warning("No text provided for the report.")
        c = canvas.Canvas(filename)
        c.drawString(100, 750, "Clinical Research Report")
        lines = text.split("\n")
        y = 730
        for line in lines:
            if y < 50:
                c.showPage()
                y = 750
            c.drawString(100, y, line)
            y -= 15
        c.save()
        logger.info(f"Report generated: {filename}")
        return filename
    except Exception as e:
        logger.error(f"Report generation error: {e}")
        return None


def visualize_predictions(predictions: Dict[str, float]) -> alt.Chart:
    """Simple Altair bar chart to visualize classification probabilities."""
    data = pd.DataFrame(list(predictions.items()), columns=["Label", "Probability"])
    chart = (
        alt.Chart(data)
        .mark_bar()
        .encode(
            x=alt.X("Label:N", sort=None),
            y="Probability:Q",
            tooltip=["Label", "Probability"],
        )
        .properties(title="Prediction Probabilities", width=500, height=300)
    )
    return chart
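# The chart is handed to a gr.Plot output below; recent Gradio releases can render
# Altair charts in gr.Plot directly, so no manual conversion should be needed here.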
def translate_text(text: str, translation_option: str) -> str:
    """Translate text between English and French via MarianMT."""
    if not text.strip():
        return "No text provided for translation."
    try:
        if translation_option not in LANGUAGE_MAP:
            return "Unsupported translation option."
        # Note: only the en->fr checkpoint is loaded above, so "French to English"
        # currently also runs through the English->French model (known limitation).
        inputs = translation_tokenizer(text, return_tensors="pt", padding=True).to(device)
        translated_tokens = translation_model.generate(**inputs)
        return translation_tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return "Translation failed."
def perform_named_entity_recognition(text: str) -> str:
    """NER using spaCy (en_core_web_sm)."""
    if not text.strip():
        return "No text provided for NER."
    try:
        doc = nlp(text)
        entities = [(ent.text, ent.label_) for ent in doc.ents]
        if not entities:
            return "No named entities found."
        return "\n".join(f"{t} -> {lbl}" for t, lbl in entities)
    except Exception as e:
        logger.error(f"NER error: {e}")
        return "NER failed."
###############################################################################
#                     7) FILE PARSING (TXT, PDF, CSV, XLS)                   #
###############################################################################
def parse_pdf_file_as_str(file_up: gr.File) -> str:
    """Read PDF via PyPDF2. Attempt local path, else read from memory."""
    pdf_path = file_up.name
    if os.path.isfile(pdf_path):
        with open(pdf_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            return "\n".join(page.extract_text() or "" for page in reader.pages)
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute found for PDF.")
        pdf_bytes = file_up.file.read()
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes))
        return "\n".join(page.extract_text() or "" for page in reader.pages)


def parse_text_file_as_str(file_up: gr.File) -> str:
    """Read .txt from path or fallback to memory."""
    path = file_up.name
    if os.path.isfile(path):
        with open(path, "rb") as f:
            return f.read().decode("utf-8", errors="replace")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for TXT.")
        return file_up.file.read().decode("utf-8", errors="replace")
def parse_csv_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """
    Attempt multiple encodings for CSV: utf-8, utf-8-sig, latin1, ISO-8859-1.
    """
    path = file_up.name
    if os.path.isfile(path):
        for enc in ["utf-8", "utf-8-sig", "latin1", "ISO-8859-1"]:
            try:
                return pd.read_csv(path, encoding=enc)
            except UnicodeDecodeError:
                logger.warning(f"CSV parse failed (enc={enc}). Trying next...")
            except Exception as e:
                logger.warning(f"CSV parse error (enc={enc}): {e}")
        raise ValueError("Could not parse local CSV with known encodings.")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for CSV.")
        raw_bytes = file_up.file.read()
        for enc in ["utf-8", "utf-8-sig", "latin1", "ISO-8859-1"]:
            try:
                # Strict decode: a wrong encoding raises UnicodeDecodeError so the
                # next candidate encoding gets a chance.
                text_decoded = raw_bytes.decode(enc)
                return pd.read_csv(io.StringIO(text_decoded))
            except UnicodeDecodeError:
                logger.warning(f"CSV in-memory parse failed (enc={enc}). Next...")
            except Exception as e:
                logger.warning(f"In-memory CSV error (enc={enc}): {e}")
        raise ValueError("Could not parse in-memory CSV with known encodings.")
def parse_excel_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """Read Excel from local path or memory (openpyxl)."""
    path = file_up.name
    if os.path.isfile(path):
        return pd.read_excel(path, engine="openpyxl")
    else:
        if not hasattr(file_up, "file"):
            raise ValueError("No .file attribute for Excel.")
        excel_bytes = file_up.file.read()
        return pd.read_excel(io.BytesIO(excel_bytes), engine="openpyxl")


###############################################################################
#                        8) BUILDING THE GRADIO APP                          #
###############################################################################
with gr.Blocks() as demo:
    gr.Markdown("# 🏥 AI-Driven Clinical Research Assistant")
    gr.Markdown(
        """
**Highlights**:

- **Summarize** clinical text (OpenAI GPT-3.5)
- **Predict** with a specialized BERT-based model
- **Translate** (English ↔ French)
- **Named Entity Recognition** (spaCy)
- **Fetch** from PubMed, Crossref, Europe PMC, and **BioPortal**
- **Generate** professional PDF reports
"""
    )

    with gr.Row():
        text_input = gr.Textbox(label="Input Text", lines=5, placeholder="Enter clinical text or notes...")
        file_input = gr.File(
            label="Upload File (txt/csv/xls/xlsx/pdf)",
            file_types=[".txt", ".csv", ".xls", ".xlsx", ".pdf"]
        )

    action = gr.Radio(
        [
            "Summarize",
            "Predict Outcome",
            "Generate Report",
            "Translate",
            "Perform Named Entity Recognition",
            "Fetch Clinical Studies",
            "Fetch PubMed Articles (Legacy)",
            "Fetch PubMed by Query",
            "Fetch Crossref by Query",
            "Fetch BioPortal by Query",
        ],
        label="Select an Action",
    )

    translation_option = gr.Dropdown(
        choices=list(LANGUAGE_MAP.keys()),
        label="Translation Option",
        value="English to French"
    )
    query_params_input = gr.Textbox(
        label="Query Params (JSON)",
        placeholder='{"term": "cancer"} or {"q": "cancer"} for BioPortal'
    )
    nct_id_input = gr.Textbox(label="NCT ID")
    report_filename_input = gr.Textbox(label="Report Filename", value="clinical_report.pdf")
    export_format = gr.Dropdown(choices=["None", "CSV", "JSON"], label="Export Format")

    # Outputs
    output_text = gr.Textbox(label="Output", lines=8)
    with gr.Row():
        output_chart = gr.Plot(label="Chart 1")
        output_chart2 = gr.Plot(label="Chart 2")
    output_file = gr.File(label="Generated File")

    submit_btn = gr.Button("Submit")
    ################################################################
    #               9) MAIN ACTION HANDLER (ASYNC)                 #
    ################################################################
    async def handle_action(
        action: str,
        txt: str,
        file_up: gr.File,
        translation_opt: str,
        query_str: str,
        nct_id: str,
        report_fn: str,
        exp_fmt: str
    ) -> Tuple[Optional[str], Optional[Any], Optional[Any], Optional[str]]:
        """
        Master function to handle user actions.
        Returns a 4-tuple mapped to (output_text, output_chart, output_chart2, output_file).
        """
        try:
            combined_text = txt.strip()

            # 1) If user uploaded a file, parse minimal text from .txt/.pdf here
            if file_up is not None:
                ext = os.path.splitext(file_up.name)[1].lower()
                if ext == ".txt":
                    try:
                        txt_data = parse_text_file_as_str(file_up)
                        combined_text += "\n" + txt_data
                    except Exception as e:
                        return f"TXT parse error: {e}", None, None, None
                elif ext == ".pdf":
                    try:
                        pdf_data = parse_pdf_file_as_str(file_up)
                        combined_text += "\n" + pdf_data
                    except Exception as e:
                        return f"PDF parse error: {e}", None, None, None
                # CSV and Excel are parsed *within* certain actions (e.g. Summarize)
            # 2) Branch by action
            if action == "Summarize":
                if file_up:
                    fx = file_up.name.lower()
                    if fx.endswith(".csv"):
                        try:
                            df_csv = parse_csv_file_to_df(file_up)
                            combined_text += "\n" + df_csv.to_csv(index=False)
                        except Exception as e:
                            return f"CSV parse error (Summarize): {e}", None, None, None
                    elif fx.endswith((".xls", ".xlsx")):
                        try:
                            df_xl = parse_excel_file_to_df(file_up)
                            combined_text += "\n" + df_xl.to_csv(index=False)
                        except Exception as e:
                            return f"Excel parse error (Summarize): {e}", None, None, None
                summary = summarize_text(combined_text)
                return summary, None, None, None

            elif action == "Predict Outcome":
                if file_up:
                    fx = file_up.name.lower()
                    if fx.endswith(".csv"):
                        try:
                            df_csv = parse_csv_file_to_df(file_up)
                            combined_text += "\n" + df_csv.to_csv(index=False)
                        except Exception as e:
                            return f"CSV parse error (Predict): {e}", None, None, None
                    elif fx.endswith((".xls", ".xlsx")):
                        try:
                            df_xl = parse_excel_file_to_df(file_up)
                            combined_text += "\n" + df_xl.to_csv(index=False)
                        except Exception as e:
                            return f"Excel parse error (Predict): {e}", None, None, None
                preds = predict_outcome(combined_text)
                if isinstance(preds, dict):
                    chart = visualize_predictions(preds)
                    return json.dumps(preds, indent=2), chart, None, None
                return preds, None, None, None

            elif action == "Generate Report":
                if file_up:
                    fx = file_up.name.lower()
                    if fx.endswith(".csv"):
                        try:
                            df_csv = parse_csv_file_to_df(file_up)
                            combined_text += "\n" + df_csv.to_csv(index=False)
                        except Exception as e:
                            return f"CSV parse error (Report): {e}", None, None, None
                    elif fx.endswith((".xls", ".xlsx")):
                        try:
                            df_xl = parse_excel_file_to_df(file_up)
                            combined_text += "\n" + df_xl.to_csv(index=False)
                        except Exception as e:
                            return f"Excel parse error (Report): {e}", None, None, None
                path = generate_report(combined_text, report_fn)
                msg = f"Report generated: {path}" if path else "Report generation failed."
                return msg, None, None, path

            elif action == "Translate":
                if file_up:
                    fx = file_up.name.lower()
                    if fx.endswith(".csv"):
                        try:
                            df_csv = parse_csv_file_to_df(file_up)
                            combined_text += "\n" + df_csv.to_csv(index=False)
                        except Exception as e:
                            return f"CSV parse error (Translate): {e}", None, None, None
                    elif fx.endswith((".xls", ".xlsx")):
                        try:
                            df_xl = parse_excel_file_to_df(file_up)
                            combined_text += "\n" + df_xl.to_csv(index=False)
                        except Exception as e:
                            return f"Excel parse error (Translate): {e}", None, None, None
                translated = translate_text(combined_text, translation_opt)
                return translated, None, None, None

            elif action == "Perform Named Entity Recognition":
                if file_up:
                    fx = file_up.name.lower()
                    if fx.endswith(".csv"):
                        try:
                            df_csv = parse_csv_file_to_df(file_up)
                            combined_text += "\n" + df_csv.to_csv(index=False)
                        except Exception as e:
                            return f"CSV parse error (NER): {e}", None, None, None
                    elif fx.endswith((".xls", ".xlsx")):
                        try:
                            df_xl = parse_excel_file_to_df(file_up)
                            combined_text += "\n" + df_xl.to_csv(index=False)
                        except Exception as e:
                            return f"Excel parse error (NER): {e}", None, None, None
                ner_result = perform_named_entity_recognition(combined_text)
                return ner_result, None, None, None
elif action == "Fetch Clinical Studies": | |
if nct_id: | |
result = await fetch_articles_by_nct_id(nct_id) | |
elif query_str: | |
result = await fetch_articles_by_query(query_str) | |
else: | |
return "Provide either an NCT ID or valid query parameters.", None, None, None | |
articles = result.get("resultList", {}).get("result", []) | |
if not articles: | |
return "No articles found.", None, None, None | |
formatted = "\n\n".join( | |
f"Title: {a.get('title')}\nJournal: {a.get('journalTitle')} ({a.get('pubYear')})" | |
for a in articles | |
) | |
return formatted, None, None, None | |
elif action in ["Fetch PubMed Articles (Legacy)", "Fetch PubMed by Query"]: | |
pubmed_result = await fetch_pubmed_by_query(query_str) | |
xml_data = pubmed_result.get("result") | |
if xml_data: | |
articles = parse_pubmed_xml(xml_data) | |
if not articles: | |
return "No articles found.", None, None, None | |
formatted = "\n\n".join( | |
f"{a['Title']} - {a['Journal']} ({a['PublicationDate']})" | |
for a in articles if a['Title'] | |
) | |
return formatted if formatted else "No articles found.", None, None, None | |
return "No articles found or error in fetching PubMed data.", None, None, None | |
elif action == "Fetch Crossref by Query": | |
crossref_result = await fetch_crossref_by_query(query_str) | |
items = crossref_result.get("message", {}).get("items", []) | |
if not items: | |
return "No results found.", None, None, None | |
crossref_formatted = "\n\n".join( | |
f"Title: {it.get('title', ['No title'])[0]}, DOI: {it.get('DOI')}" | |
for it in items | |
) | |
return crossref_formatted, None, None, None | |
elif action == "Fetch BioPortal by Query": | |
bp_result = await fetch_bioportal_by_query(query_str) | |
collection = bp_result.get("collection", []) | |
if not collection: | |
return "No BioPortal results found.", None, None, None | |
# Format listing | |
formatted = "\n\n".join( | |
f"Label: {col.get('prefLabel')}, ID: {col.get('@id')}" | |
for col in collection | |
) | |
return formatted, None, None, None | |
# Fallback | |
return "Invalid action.", None, None, None | |
except Exception as ex: | |
# Catch all exceptions, log, and return traceback to 'output_text' | |
tb_str = traceback.format_exc() | |
logger.error(f"Exception in handle_action:\n{tb_str}") | |
return f"Traceback:\n{tb_str}", None, None, None | |
    submit_btn.click(
        fn=handle_action,
        inputs=[action, text_input, file_input, translation_option, query_params_input, nct_id_input, report_filename_input, export_format],
        outputs=[output_text, output_chart, output_chart2, output_file],
    )

# Launch the Gradio interface
demo.launch(server_name="0.0.0.0", server_port=7860, share=True)
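# Deployment note: server_name="0.0.0.0" exposes the app inside a container (as on
# Hugging Face Spaces), and share=True additionally requests a temporary public
# *.gradio.live URL when the script is run locally.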