# CraAssitant / app.py
# mgbam's picture
# Update app.py
# 011cb58 verified
import os
import io
import json
import csv
import asyncio
import xml.etree.ElementTree as ET
from typing import Any, Dict, Optional, Tuple, Union, List
import httpx
import gradio as gr
import torch
from dotenv import load_dotenv
from loguru import logger
from huggingface_hub import login
from openai import OpenAI
from reportlab.pdfgen import canvas
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
MarianMTModel,
MarianTokenizer,
)
import pandas as pd
import altair as alt
import spacy
import spacy.cli
import PyPDF2
###############################################################################
# 1) ENVIRONMENT & LOGGING #
###############################################################################
# Load the small English spaCy pipeline; if loading raises OSError, download
# the model package once and load it again.
try:
    nlp = spacy.load("en_core_web_sm")
except OSError:
    logger.info("Downloading SpaCy 'en_core_web_sm' model...")
    spacy.cli.download("en_core_web_sm")
    nlp = spacy.load("en_core_web_sm")

# Additionally write records at level ERROR to a size-rotated log file.
logger.add("error_logs.log", level="ERROR", rotation="1 MB")

# Configuration is read from the process environment, optionally seeded by a
# local .env file.
load_dotenv()
HUGGINGFACE_TOKEN = os.environ.get("HF_TOKEN")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
BIOPORTAL_API_KEY = os.environ.get("BIOPORTAL_API_KEY")  # For BioPortal integration
ENTREZ_EMAIL = os.environ.get("ENTREZ_EMAIL")

# The Hugging Face token and the OpenAI key are both mandatory: abort start-up
# when either one is missing.
if not (HUGGINGFACE_TOKEN and OPENAI_API_KEY):
    logger.error("Missing Hugging Face or OpenAI credentials.")
    raise ValueError("Missing credentials for Hugging Face or OpenAI.")

# The BioPortal key is optional; only the BioPortal fetch depends on it.
if not BIOPORTAL_API_KEY:
    logger.warning("BIOPORTAL_API_KEY is not set. BioPortal fetch calls will fail.")

# Authenticate against the Hugging Face Hub and create the OpenAI client.
login(HUGGINGFACE_TOKEN)
client = OpenAI(api_key=OPENAI_API_KEY)

# Run the models on the GPU when CUDA is available, otherwise on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
logger.info(f"Using device: {device}")
###############################################################################
# 2) HUGGING FACE & TRANSLATION MODEL SETUP #
###############################################################################
# Fine-tuned sequence-classification checkpoint used by predict_outcome().
MODEL_NAME = "mgbam/bert-base-finetuned-mgbam"
try:
    model = AutoModelForSequenceClassification.from_pretrained(
        MODEL_NAME,
        use_auth_token=HUGGINGFACE_TOKEN,
    )
    model = model.to(device)
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        use_auth_token=HUGGINGFACE_TOKEN,
    )
except Exception as load_error:
    # Log for the error file, then let start-up fail with the original error.
    logger.error(f"Model load error: {load_error}")
    raise

# MarianMT English -> French checkpoint used by translate_text().
translation_model_name = "Helsinki-NLP/opus-mt-en-fr"
try:
    translation_model = MarianMTModel.from_pretrained(
        translation_model_name,
        use_auth_token=HUGGINGFACE_TOKEN,
    )
    translation_model = translation_model.to(device)
    translation_tokenizer = MarianTokenizer.from_pretrained(
        translation_model_name,
        use_auth_token=HUGGINGFACE_TOKEN,
    )
except Exception as load_error:
    logger.error(f"Translation model load error: {load_error}")
    raise

# UI translation option -> (source language code, target language code).
LANGUAGE_MAP: Dict[str, Tuple[str, str]] = {
    "English to French": ("en", "fr"),
    "French to English": ("fr", "en"),
}
###############################################################################
# 3) API ENDPOINTS & CONSTANTS #
###############################################################################
# NCBI E-utilities endpoints: esearch is queried first for matching IDs,
# efetch then retrieves the records for those IDs (see fetch_pubmed_by_query).
PUBMED_SEARCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/esearch.fcgi"
PUBMED_FETCH_URL = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi"
# Europe PMC REST search endpoint, used for NCT-ID and free-form queries.
EUROPE_PMC_BASE_URL = "https://www.ebi.ac.uk/europepmc/webservices/rest/search"
# BioPortal API root; fetch_bioportal_by_query appends "/search" to it.
BIOPORTAL_API_BASE = "https://data.bioontology.org"
# Crossref works endpoint; the caller's JSON is passed through as query params.
CROSSREF_API_URL = "https://api.crossref.org/works"
###############################################################################
# 4) HELPER FUNCTIONS #
###############################################################################
def safe_json_parse(text: str) -> Union[Dict[str, Any], None]:
    """Parse *text* as JSON without raising.

    Args:
        text: The JSON document to decode.

    Returns:
        The decoded value, or ``None`` when *text* is not valid JSON or is not
        a string/bytes object at all (for example ``None``). Note that any
        JSON value is returned as decoded, so a JSON array yields a list;
        callers that need a mapping must check the type themselves.
    """
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError) as e:
        # TypeError covers non-string input such as None, which json.loads
        # rejects before it ever starts decoding.
        logger.error(f"JSON parsing error: {e}")
        return None
def _element_text(element: Optional[ET.Element]) -> Optional[str]:
    """Return all text inside *element*, including text nested in child tags."""
    if element is None:
        return None
    # itertext() also yields text that follows inline markup such as <i> or
    # <sub>; Element.text / findtext() stop at the first child element.
    return "".join(element.itertext())


def parse_pubmed_xml(xml_data: str) -> List[Dict[str, Any]]:
    """Parse PubMed XML data into a structured list of articles.

    Args:
        xml_data: XML document containing ``PubmedArticle`` elements.

    Returns:
        One dict per ``PubmedArticle`` with the keys ``PMID``, ``Title``,
        ``Abstract``, ``Journal`` and ``PublicationDate``. A value is ``None``
        when the corresponding element is absent. Multiple ``AbstractText``
        sections are joined with a single space. ``PublicationDate`` is
        ``"Year-Month-Day"`` when all three parts are present, otherwise the
        year alone, falling back to ``MedlineDate`` when there is no year.

    Raises:
        xml.etree.ElementTree.ParseError: If *xml_data* is not well-formed.
    """
    root = ET.fromstring(xml_data)
    articles = []
    for article in root.findall(".//PubmedArticle"):
        # Structured abstracts are split over several AbstractText elements.
        abstract_sections = article.findall(".//AbstractText")
        if abstract_sections:
            section_texts = (
                (_element_text(section) or "").strip()
                for section in abstract_sections
            )
            abstract = " ".join(text for text in section_texts if text)
        else:
            abstract = None

        pub_date = None
        pub_date_elem = article.find(".//JournalIssue/PubDate")
        if pub_date_elem is not None:
            year = pub_date_elem.findtext("Year")
            month = pub_date_elem.findtext("Month")
            day = pub_date_elem.findtext("Day")
            if year and month and day:
                pub_date = f"{year}-{month}-{day}"
            else:
                pub_date = year
                if not pub_date:
                    # Some records carry only a free-form MedlineDate.
                    medline_date = pub_date_elem.findtext("MedlineDate")
                    if medline_date:
                        pub_date = medline_date

        articles.append({
            "PMID": article.findtext(".//PMID"),
            "Title": _element_text(article.find(".//ArticleTitle")),
            "Abstract": abstract,
            "Journal": article.findtext(".//Journal/Title"),
            "PublicationDate": pub_date,
        })
    return articles
###############################################################################
# 5) ASYNC FETCH FUNCTIONS #
###############################################################################
async def fetch_articles_by_nct_id(nct_id: str) -> Dict[str, Any]:
    """Search Europe PMC using *nct_id* as the query string.

    Returns the decoded JSON response, or ``{"error": <message>}`` when the
    request or the decoding fails (the failure is logged).
    """
    async with httpx.AsyncClient() as http:
        try:
            response = await http.get(
                EUROPE_PMC_BASE_URL,
                params={"query": nct_id, "format": "json"},
            )
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:
            logger.error(f"Error fetching articles for {nct_id}: {exc}")
            return {"error": str(exc)}
        return payload
async def fetch_articles_by_query(query_params: str) -> Dict[str, Any]:
    """Search Europe PMC with a query built from a JSON object.

    *query_params* must decode to a non-empty JSON object; each ``key: value``
    pair becomes a ``key:value`` clause and the clauses are joined with
    ``" AND "``. Returns the decoded JSON response, or ``{"error": ...}`` for
    invalid input or a failed request.
    """
    fields = safe_json_parse(query_params)
    if not (fields and isinstance(fields, dict)):
        return {"error": "Invalid JSON."}

    clauses = [f"{key}:{value}" for key, value in fields.items()]
    request_args = {"query": " AND ".join(clauses), "format": "json"}

    async with httpx.AsyncClient() as http:
        try:
            response = await http.get(EUROPE_PMC_BASE_URL, params=request_args)
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:
            logger.error(f"Error fetching articles: {exc}")
            return {"error": str(exc)}
        return payload
async def fetch_pubmed_by_query(query_params: str) -> Dict[str, Any]:
    """Run a two-step PubMed lookup (search for IDs, then fetch their XML).

    *query_params* must decode to a non-empty JSON object; its ``term`` and
    ``retmax`` entries (defaults ``""`` and ``"10"``) drive the search.

    Returns ``{"result": <xml text>}`` on success, ``{"result": ""}`` when the
    search yields no IDs, or ``{"error": ...}`` for invalid input or a failed
    request.
    """
    user_params = safe_json_parse(query_params)
    if not (user_params and isinstance(user_params, dict)):
        return {"error": "Invalid JSON for PubMed."}

    esearch_args = {
        "db": "pubmed",
        "retmode": "json",
        "email": ENTREZ_EMAIL,
        "retmax": user_params.get("retmax", "10"),
        "term": user_params.get("term", ""),
    }

    async with httpx.AsyncClient() as http:
        try:
            # Step 1: resolve the search term to a list of PubMed IDs.
            esearch = await http.get(PUBMED_SEARCH_URL, params=esearch_args)
            esearch.raise_for_status()
            pmids = esearch.json().get("esearchresult", {}).get("idlist", [])
            if not pmids:
                return {"result": ""}

            # Step 2: download the matching records as XML.
            efetch_args = {
                "db": "pubmed",
                "id": ",".join(pmids),
                "retmode": "xml",
                "email": ENTREZ_EMAIL,
            }
            efetch = await http.get(PUBMED_FETCH_URL, params=efetch_args)
            efetch.raise_for_status()
            return {"result": efetch.text}
        except Exception as exc:
            logger.error(f"Error fetching PubMed articles: {exc}")
            return {"error": str(exc)}
async def fetch_crossref_by_query(query_params: str) -> Dict[str, Any]:
    """Query the Crossref works endpoint.

    *query_params* must decode to a non-empty JSON object, which is sent
    unchanged as the request's query parameters. Returns the decoded JSON
    response, or ``{"error": ...}`` for invalid input or a failed request.
    """
    request_args = safe_json_parse(query_params)
    if not (request_args and isinstance(request_args, dict)):
        return {"error": "Invalid JSON for Crossref."}

    async with httpx.AsyncClient() as http:
        try:
            response = await http.get(CROSSREF_API_URL, params=request_args)
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:
            logger.error(f"Error fetching Crossref data: {exc}")
            return {"error": str(exc)}
        return payload
async def fetch_bioportal_by_query(query_params: str) -> Dict[str, Any]:
    """
    Search BioPortal for medical ontology/terminology entries.
    Expects JSON like: {"q": "cancer"}; only the "q" entry is forwarded.
    See: https://data.bioontology.org/documentation

    Returns the decoded JSON response, or {"error": ...} when the API key is
    not configured, the input is invalid, "q" is missing/empty, or the request
    fails.
    """
    if not BIOPORTAL_API_KEY:
        return {"error": "No BioPortal API Key set."}

    user_params = safe_json_parse(query_params)
    if not (user_params and isinstance(user_params, dict)):
        return {"error": "Invalid JSON for BioPortal."}

    term = user_params.get("q", "")
    if not term:
        return {"error": "No 'q' found in JSON. Provide a search term."}

    endpoint = f"{BIOPORTAL_API_BASE}/search"
    auth_headers = {"Authorization": f"apikey token={BIOPORTAL_API_KEY}"}

    async with httpx.AsyncClient() as http:
        try:
            response = await http.get(
                endpoint,
                params={"q": term},
                headers=auth_headers,
            )
            response.raise_for_status()
            payload = response.json()
        except Exception as exc:
            logger.error(f"Error fetching BioPortal data: {exc}")
            return {"error": str(exc)}
        return payload
###############################################################################
# 6) CORE FUNCTIONS #
###############################################################################
def summarize_text(text: str) -> str:
    """Summarize *text* with the OpenAI chat completions API (gpt-3.5-turbo).

    Returns the stripped summary, a fixed notice when *text* is blank, or
    "Summarization failed." when the API call raises (the error is logged).
    """
    if not text.strip():
        return "No text provided for summarization."

    prompt = f"Summarize this clinical data:\n{text}"
    try:
        completion = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=200,
            temperature=0.7,
        )
        summary = completion.choices[0].message.content
        return summary.strip()
    except Exception as exc:
        logger.error(f"Summarization error: {exc}")
        return "Summarization failed."
def predict_outcome(text: str) -> Union[Dict[str, float], str]:
    """Classify *text* with the fine-tuned sequence-classification model.

    Returns a mapping ``"Label <n>" -> probability`` (labels numbered from 1,
    probabilities from a softmax over the logits), a fixed notice when *text*
    is blank, or "Prediction failed." when inference raises.
    """
    if not text.strip():
        return "No text provided for prediction."

    try:
        encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        # Move every input tensor to the device the model lives on.
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}
        with torch.no_grad():
            logits = model(**encoded).logits
            # Only the first (and only) sequence in the batch is scored.
            scores = torch.nn.functional.softmax(logits, dim=-1)[0]
        result = {}
        for position, score in enumerate(scores, start=1):
            result[f"Label {position}"] = float(score.item())
        return result
    except Exception as exc:
        logger.error(f"Prediction error: {exc}")
        return "Prediction failed."
def generate_report(text: str, filename: str = "clinical_report.pdf") -> Optional[str]:
    """Write *text* to a PDF under a "Clinical Research Report" heading.

    Each input line is wrapped to a fixed character budget before drawing,
    because ``drawString`` places text on a single line and does not wrap. A
    new page is started whenever the cursor drops below the bottom margin.

    Args:
        text: Report body; split on newlines. Blank text is allowed (a warning
            is logged and a PDF with only the heading is produced).
        filename: Output path. An empty value falls back to
            ``"clinical_report.pdf"``.

    Returns:
        The path that was written, or ``None`` if generation failed (the
        error is logged).
    """
    import textwrap

    # Conservative per-line character budget for text starting at x=100.
    # NOTE(review): this is an estimate for the canvas' default font and page
    # size, not a measured width - confirm if either is changed.
    max_chars_per_line = 70

    try:
        if not text.strip():
            logger.warning("No text provided for the report.")
        target = filename or "clinical_report.pdf"
        c = canvas.Canvas(target)
        c.drawString(100, 750, "Clinical Research Report")
        y = 730
        for raw_line in text.split("\n"):
            # textwrap.wrap returns [] for a blank line; keep it as one empty
            # row so paragraph spacing from the input is preserved.
            for piece in textwrap.wrap(raw_line, width=max_chars_per_line) or [""]:
                if y < 50:
                    c.showPage()
                    y = 750
                c.drawString(100, y, piece)
                y -= 15
        c.save()
        logger.info(f"Report generated: {target}")
        return target
    except Exception as e:
        logger.error(f"Report generation error: {e}")
        return None
def visualize_predictions(predictions: Dict[str, float]) -> alt.Chart:
    """Build an Altair bar chart with one bar per label in *predictions*.

    Labels are plotted on the x axis in the order given (no sorting) and the
    probabilities on the y axis; both are also shown in the tooltip.
    """
    frame = pd.DataFrame(list(predictions.items()), columns=["Label", "Probability"])
    bars = alt.Chart(frame).mark_bar()
    encoded = bars.encode(
        x=alt.X("Label:N", sort=None),
        y="Probability:Q",
        tooltip=["Label", "Probability"],
    )
    return encoded.properties(title="Prediction Probabilities", width=500, height=300)
# MarianMT (model, tokenizer) pairs loaded on demand, keyed by (source, target).
_MARIAN_CACHE: Dict[Tuple[str, str], Tuple[Any, Any]] = {}


def _get_marian_pair(src: str, tgt: str) -> Tuple[Any, Any]:
    """Return the (model, tokenizer) pair for translating *src* into *tgt*."""
    checkpoint = f"Helsinki-NLP/opus-mt-{src}-{tgt}"
    # Reuse the pair that was loaded at start-up when it is the right one.
    if checkpoint == translation_model_name:
        return translation_model, translation_tokenizer
    key = (src, tgt)
    if key not in _MARIAN_CACHE:
        pair_tokenizer = MarianTokenizer.from_pretrained(checkpoint)
        pair_model = MarianMTModel.from_pretrained(checkpoint).to(device)
        _MARIAN_CACHE[key] = (pair_model, pair_tokenizer)
    return _MARIAN_CACHE[key]


def translate_text(text: str, translation_option: str) -> str:
    """Translate *text* between English and French via MarianMT.

    The direction is taken from ``LANGUAGE_MAP[translation_option]`` and the
    matching MarianMT checkpoint is used. The checkpoint loaded at start-up
    only translates English to French, so other directions are loaded on first
    use and cached.

    Returns the translated text, a fixed notice for blank text or an unknown
    option, or "Translation failed." when loading or inference raises.
    """
    if not text.strip():
        return "No text provided for translation."
    try:
        if translation_option not in LANGUAGE_MAP:
            return "Unsupported translation option."
        source_lang, target_lang = LANGUAGE_MAP[translation_option]
        mt_model, mt_tokenizer = _get_marian_pair(source_lang, target_lang)
        inputs = mt_tokenizer(text, return_tensors="pt", padding=True).to(device)
        translated_tokens = mt_model.generate(**inputs)
        return mt_tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
    except Exception as e:
        logger.error(f"Translation error: {e}")
        return "Translation failed."
def perform_named_entity_recognition(text: str) -> str:
    """Run the spaCy pipeline (en_core_web_sm) over *text* and list its entities.

    Returns one ``"<entity text> -> <label>"`` line per entity, a fixed notice
    when *text* is blank or no entity is found, or "NER failed." when the
    pipeline raises.
    """
    if not text.strip():
        return "No text provided for NER."

    try:
        rows = [f"{ent.text} -> {ent.label_}" for ent in nlp(text).ents]
        if rows:
            return "\n".join(rows)
        return "No named entities found."
    except Exception as exc:
        logger.error(f"NER error: {exc}")
        return "NER failed."
###############################################################################
# 7) FILE PARSING (TXT, PDF, CSV, XLS) #
###############################################################################
def parse_pdf_file_as_str(file_up: gr.File) -> str:
    """Extract the text of an uploaded PDF with PyPDF2.

    The file at ``file_up.name`` is read when it exists on disk; otherwise the
    bytes of ``file_up.file`` are used. Page texts are joined with newlines,
    and a page without extractable text contributes an empty string.

    Raises:
        ValueError: If the path does not exist and there is no ``.file``.
    """

    def _join_pages(pdf_reader) -> str:
        page_texts = []
        for page in pdf_reader.pages:
            page_texts.append(page.extract_text() or "")
        return "\n".join(page_texts)

    location = file_up.name
    if os.path.isfile(location):
        # Extraction happens while the handle is still open.
        with open(location, "rb") as handle:
            return _join_pages(PyPDF2.PdfReader(handle))

    if not hasattr(file_up, "file"):
        raise ValueError("No .file attribute found for PDF.")
    in_memory = io.BytesIO(file_up.file.read())
    return _join_pages(PyPDF2.PdfReader(in_memory))
def parse_text_file_as_str(file_up: gr.File) -> str:
    """Return the contents of an uploaded text file as a string.

    The file at ``file_up.name`` is read when it exists on disk; otherwise the
    bytes of ``file_up.file`` are used. Bytes are decoded as UTF-8 with
    undecodable sequences replaced.

    Raises:
        ValueError: If the path does not exist and there is no ``.file``.
    """
    location = file_up.name
    if os.path.isfile(location):
        with open(location, "rb") as handle:
            raw = handle.read()
        return raw.decode("utf-8", errors="replace")

    if not hasattr(file_up, "file"):
        raise ValueError("No .file attribute for TXT.")
    raw = file_up.file.read()
    return raw.decode("utf-8", errors="replace")
def parse_csv_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """
    Load an uploaded CSV into a DataFrame, trying several encodings in order:
    utf-8, utf-8-sig, latin1, ISO-8859-1.

    The file at ``file_up.name`` is read when it exists on disk; otherwise the
    bytes of ``file_up.file`` are decoded in memory. In-memory bytes are
    decoded strictly, so an encoding that does not fit is rejected and the
    next one is tried instead of silently inserting replacement characters.

    Raises:
        ValueError: If the path does not exist and there is no ``.file``, or
            if no attempt succeeds. In the latter case the last underlying
            error is attached as ``__cause__``.
    """
    encodings = ("utf-8", "utf-8-sig", "latin1", "ISO-8859-1")
    last_error: Optional[Exception] = None

    path = file_up.name
    if os.path.isfile(path):
        for enc in encodings:
            try:
                return pd.read_csv(path, encoding=enc)
            except UnicodeDecodeError as e:
                last_error = e
                logger.warning(f"CSV parse failed (enc={enc}). Trying next...")
            except Exception as e:
                last_error = e
                logger.warning(f"CSV parse error (enc={enc}): {e}")
        raise ValueError("Could not parse local CSV with known encodings.") from last_error

    if not hasattr(file_up, "file"):
        raise ValueError("No .file attribute for CSV.")
    raw_bytes = file_up.file.read()
    for enc in encodings:
        try:
            # Strict decoding: with errors="replace" a UnicodeDecodeError can
            # never be raised, which would make the fallback list pointless.
            text_decoded = raw_bytes.decode(enc)
            return pd.read_csv(io.StringIO(text_decoded))
        except UnicodeDecodeError as e:
            last_error = e
            logger.warning(f"CSV in-memory parse failed (enc={enc}). Next...")
        except Exception as e:
            last_error = e
            logger.warning(f"In-memory CSV error (enc={enc}): {e}")
    raise ValueError("Could not parse in-memory CSV with known encodings.") from last_error
def parse_excel_file_to_df(file_up: gr.File) -> pd.DataFrame:
    """Load an uploaded Excel workbook into a DataFrame.

    The file at ``file_up.name`` is read when it exists on disk; otherwise the
    bytes of ``file_up.file`` are used. The openpyxl engine is requested for
    every extension except legacy ``.xls``: openpyxl cannot read that format,
    so for ``.xls`` the engine is left for pandas to choose.

    Raises:
        ValueError: If the path does not exist and there is no ``.file``.
    """
    path = file_up.name
    extension = os.path.splitext(path)[1].lower()
    # engine=None lets pandas select a reader suited to the legacy format.
    engine = None if extension == ".xls" else "openpyxl"

    if os.path.isfile(path):
        return pd.read_excel(path, engine=engine)

    if not hasattr(file_up, "file"):
        raise ValueError("No .file attribute for Excel.")
    excel_bytes = file_up.file.read()
    return pd.read_excel(io.BytesIO(excel_bytes), engine=engine)
###############################################################################
# 8) BUILDING THE GRADIO APP #
###############################################################################
with gr.Blocks() as demo:
    # Page header and feature overview.
    gr.Markdown("# 🏥 AI-Driven Clinical Research Assistant")
    gr.Markdown("""
**Highlights**:
- **Summarize** clinical text (OpenAI GPT-3.5)
- **Predict** with a specialized BERT-based model
- **Translate** (English ↔ French)
- **Named Entity Recognition** (spaCy)
- **Fetch** from PubMed, Crossref, Europe PMC, and **BioPortal**
- **Generate** professional PDF reports
""")

    # Primary inputs: free text next to an optional file upload.
    with gr.Row():
        text_input = gr.Textbox(
            label="Input Text",
            lines=5,
            placeholder="Enter clinical text or notes...",
        )
        file_input = gr.File(
            label="Upload File (txt/csv/xls/xlsx/pdf)",
            file_types=[".txt", ".csv", ".xls", ".xlsx", ".pdf"],
        )

    # The selected entry is the string the action handler branches on.
    _action_choices = [
        "Summarize",
        "Predict Outcome",
        "Generate Report",
        "Translate",
        "Perform Named Entity Recognition",
        "Fetch Clinical Studies",
        "Fetch PubMed Articles (Legacy)",
        "Fetch PubMed by Query",
        "Fetch Crossref by Query",
        "Fetch BioPortal by Query",
    ]
    action = gr.Radio(_action_choices, label="Select an Action")

    # Per-action options.
    translation_option = gr.Dropdown(
        choices=list(LANGUAGE_MAP.keys()),
        label="Translation Option",
        value="English to French",
    )
    query_params_input = gr.Textbox(
        label="Query Params (JSON)",
        placeholder='{"term": "cancer"} or {"q": "cancer"} for BioPortal',
    )
    nct_id_input = gr.Textbox(label="NCT ID")
    report_filename_input = gr.Textbox(label="Report Filename", value="clinical_report.pdf")
    export_format = gr.Dropdown(choices=["None", "CSV", "JSON"], label="Export Format")

    # Outputs
    output_text = gr.Textbox(label="Output", lines=8)
    with gr.Row():
        output_chart = gr.Plot(label="Chart 1")
        output_chart2 = gr.Plot(label="Chart 2")
    output_file = gr.File(label="Generated File")

    submit_btn = gr.Button("Submit")
################################################################
# 9) MAIN ACTION HANDLER (ASYNC) #
################################################################
import traceback

# Actions whose input text may be extended with an uploaded CSV/Excel file,
# mapped to the short label used in their parse-error messages.
_TABULAR_ACTION_LABELS = {
    "Summarize": "Summarize",
    "Predict Outcome": "Predict",
    "Generate Report": "Report",
    "Translate": "Translate",
    "Perform Named Entity Recognition": "NER",
}


def _text_only(message: Optional[str]) -> Tuple[Optional[str], None, None, None]:
    """Build a handler result that fills only the text output."""
    return message, None, None, None


def _append_tabular_upload(text: str, file_up: Any, label: str) -> Tuple[str, Optional[str]]:
    """Append an uploaded CSV/Excel file to *text* as CSV; return (text, error)."""
    if not file_up:
        return text, None
    lowered = file_up.name.lower()
    if lowered.endswith(".csv"):
        kind, parser = "CSV", parse_csv_file_to_df
    elif lowered.endswith((".xls", ".xlsx")):
        kind, parser = "Excel", parse_excel_file_to_df
    else:
        return text, None
    try:
        frame = parser(file_up)
        return text + "\n" + frame.to_csv(index=False), None
    except Exception as e:
        return text, f"{kind} parse error ({label}): {e}"


async def handle_action(
    action: str,
    txt: str,
    file_up: gr.File,
    translation_opt: str,
    query_str: str,
    nct_id: str,
    report_fn: str,
    exp_fmt: str
) -> Tuple[Optional[str], Optional[Any], Optional[Any], Optional[str]]:
    """
    Master function to handle user actions.
    Returns a 4-tuple mapped to (output_text, output_chart, output_chart2, output_file).

    Args:
        action: The selected action label; unknown values yield "Invalid action.".
        txt: Free text from the input box (``None`` is treated as empty).
        file_up: Optional upload. ``.txt``/``.pdf`` content is always appended
            to the text; ``.csv``/``.xls``/``.xlsx`` content is appended (as
            CSV) only for the text-processing actions.
        translation_opt: Key of ``LANGUAGE_MAP`` used by the Translate action.
        query_str: JSON query string used by the fetch actions.
        nct_id: NCT identifier used by "Fetch Clinical Studies".
        report_fn: Requested report file name; only its final path component
            is used, so the report is written to the working directory.
        exp_fmt: Export format selection. Currently not used by any action.

    Any unexpected exception is logged and its traceback is returned as the
    text output instead of being raised.
    """
    try:
        combined_text = (txt or "").strip()

        # 1) If user uploaded a file, parse minimal text from .txt/.pdf here
        if file_up is not None:
            ext = os.path.splitext(file_up.name)[1].lower()
            if ext == ".txt":
                try:
                    combined_text += "\n" + parse_text_file_as_str(file_up)
                except Exception as e:
                    return _text_only(f"TXT parse error: {e}")
            elif ext == ".pdf":
                try:
                    combined_text += "\n" + parse_pdf_file_as_str(file_up)
                except Exception as e:
                    return _text_only(f"PDF parse error: {e}")

        # CSV and Excel are only appended for the text-processing actions.
        if action in _TABULAR_ACTION_LABELS:
            combined_text, parse_error = _append_tabular_upload(
                combined_text, file_up, _TABULAR_ACTION_LABELS[action]
            )
            if parse_error:
                return _text_only(parse_error)

        # 2) Branch by action
        if action == "Summarize":
            return _text_only(summarize_text(combined_text))

        if action == "Predict Outcome":
            preds = predict_outcome(combined_text)
            if isinstance(preds, dict):
                chart = visualize_predictions(preds)
                return json.dumps(preds, indent=2), chart, None, None
            return _text_only(preds)

        if action == "Generate Report":
            # The file name comes from the UI: keep only its last path
            # component so the report cannot be written outside the working
            # directory, and fall back to the default when nothing is left.
            safe_name = os.path.basename((report_fn or "").strip()) or "clinical_report.pdf"
            path = generate_report(combined_text, safe_name)
            msg = f"Report generated: {path}" if path else "Report generation failed."
            return msg, None, None, path

        if action == "Translate":
            return _text_only(translate_text(combined_text, translation_opt))

        if action == "Perform Named Entity Recognition":
            return _text_only(perform_named_entity_recognition(combined_text))

        if action == "Fetch Clinical Studies":
            if nct_id:
                result = await fetch_articles_by_nct_id(nct_id)
            elif query_str:
                result = await fetch_articles_by_query(query_str)
            else:
                return _text_only("Provide either an NCT ID or valid query parameters.")
            # Report a failed request as such instead of as "no results".
            if result.get("error"):
                return _text_only(f"Error fetching clinical studies: {result['error']}")
            articles = result.get("resultList", {}).get("result", [])
            if not articles:
                return _text_only("No articles found.")
            formatted = "\n\n".join(
                f"Title: {a.get('title')}\nJournal: {a.get('journalTitle')} ({a.get('pubYear')})"
                for a in articles
            )
            return _text_only(formatted)

        if action in ("Fetch PubMed Articles (Legacy)", "Fetch PubMed by Query"):
            pubmed_result = await fetch_pubmed_by_query(query_str)
            if pubmed_result.get("error"):
                return _text_only(f"Error fetching PubMed data: {pubmed_result['error']}")
            xml_data = pubmed_result.get("result")
            if not xml_data:
                return _text_only("No articles found.")
            articles = parse_pubmed_xml(xml_data)
            formatted = "\n\n".join(
                f"{a['Title']} - {a['Journal']} ({a['PublicationDate']})"
                for a in articles if a['Title']
            )
            return _text_only(formatted if formatted else "No articles found.")

        if action == "Fetch Crossref by Query":
            crossref_result = await fetch_crossref_by_query(query_str)
            if crossref_result.get("error"):
                return _text_only(f"Error fetching Crossref data: {crossref_result['error']}")
            items = crossref_result.get("message", {}).get("items", [])
            if not items:
                return _text_only("No results found.")
            # "title" may be missing or an empty list; both get the placeholder.
            crossref_formatted = "\n\n".join(
                f"Title: {(it.get('title') or ['No title'])[0]}, DOI: {it.get('DOI')}"
                for it in items
            )
            return _text_only(crossref_formatted)

        if action == "Fetch BioPortal by Query":
            bp_result = await fetch_bioportal_by_query(query_str)
            if bp_result.get("error"):
                return _text_only(f"Error fetching BioPortal data: {bp_result['error']}")
            collection = bp_result.get("collection", [])
            if not collection:
                return _text_only("No BioPortal results found.")
            formatted = "\n\n".join(
                f"Label: {col.get('prefLabel')}, ID: {col.get('@id')}"
                for col in collection
            )
            return _text_only(formatted)

        # Fallback
        return _text_only("Invalid action.")
    except Exception:
        # Catch all exceptions, log, and return traceback to 'output_text'.
        # NOTE(review): the traceback shown to the user exposes server-side
        # paths and code; consider a generic message if the app is public.
        tb_str = traceback.format_exc()
        logger.error(f"Exception in handle_action:\n{tb_str}")
        return _text_only(f"Traceback:\n{tb_str}")
# Register the submit handler on the app. The listener is declared inside an
# active Blocks context, so `demo` is (re-)entered for the registration.
with demo:
    submit_btn.click(
        fn=handle_action,
        inputs=[
            action,
            text_input,
            file_input,
            translation_option,
            query_params_input,
            nct_id_input,
            report_filename_input,
            export_format,
        ],
        outputs=[output_text, output_chart, output_chart2, output_file],
    )

# Launch the Gradio interface only when this file is run as a script, so that
# importing the module does not start a server as a side effect.
if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)