|
|
|
|
|
import os, pathlib, asyncio, re |
|
from pathlib import Path |
|
|
|
import streamlit as st |
|
import pandas as pd |
|
import plotly.express as px |
|
from fpdf import FPDF |
|
from streamlit_agraph import agraph |
|
|
|
from mcp.orchestrator import orchestrate_search, answer_ai_question |
|
from mcp.workspace import get_workspace, save_query |
|
from mcp.knowledge_graph import build_agraph |
|
from mcp.graph_metrics import build_nx, get_top_hubs, get_density |
|
from mcp.alerts import check_alerts |
|
|
|
|
|
os.environ["STREAMLIT_DATA_DIR"] = "/tmp/.streamlit" |
|
os.environ["XDG_STATE_HOME"] = "/tmp" |
|
os.environ["STREAMLIT_BROWSER_GATHERUSAGESTATS"] = "false" |
|
pathlib.Path("/tmp/.streamlit").mkdir(parents=True, exist_ok=True) |
|
|
|
ROOT = Path(__file__).parent |
|
LOGO = ROOT / "assets" / "logo.png" |
|
|
|
def _latin1_safe(txt: str) -> str: |
|
return txt.encode("latin-1", "replace").decode("latin-1") |
|
|
|
def _pdf(papers): |
|
pdf = FPDF() |
|
pdf.set_auto_page_break(auto=True, margin=15) |
|
pdf.add_page() |
|
pdf.set_font("Helvetica", size=11) |
|
pdf.cell(200, 8, _latin1_safe("MedGenesis AI – Results"), ln=True, align="C") |
|
pdf.ln(3) |
|
for i, p in enumerate(papers, 1): |
|
pdf.set_font("Helvetica", "B", 11) |
|
pdf.multi_cell(0, 7, _latin1_safe(f"{i}. {p.get('title', '')}")) |
|
pdf.set_font("Helvetica", "", 9) |
|
body = f"{p.get('authors','')}\n{p.get('summary','')}\n{p.get('link','')}\n" |
|
pdf.multi_cell(0, 6, _latin1_safe(body)) |
|
pdf.ln(1) |
|
return pdf.output(dest="S").encode("latin-1", "replace") |
|
|
|
def _workspace_sidebar(): |
|
with st.sidebar: |
|
st.header("🗂️ Workspace") |
|
ws = get_workspace() |
|
if not ws: |
|
st.info("Run a search then press **Save** to populate this list.") |
|
return |
|
for i, item in enumerate(ws, 1): |
|
with st.expander(f"{i}. {item['query']}"): |
|
st.write(item["result"].get("ai_summary", "")) |
|
|
|
def render_ui(): |
|
st.set_page_config("MedGenesis AI", layout="wide") |
|
|
|
|
|
for k, v in [ |
|
("query_result", None), ("followup_input", ""), |
|
("followup_response", None), ("last_query", ""), ("last_llm", "") |
|
]: |
|
if k not in st.session_state: |
|
st.session_state[k] = v |
|
|
|
_workspace_sidebar() |
|
c1, c2 = st.columns([0.15, 0.85]) |
|
with c1: |
|
if LOGO.exists(): |
|
st.image(str(LOGO), width=105) |
|
with c2: |
|
st.markdown("## 🧬 **MedGenesis AI**") |
|
st.caption("Multi-source biomedical assistant · OpenAI / Gemini") |
|
|
|
llm = st.radio("LLM engine", ["openai", "gemini"], horizontal=True) |
|
query = st.text_input("Enter biomedical question", placeholder="e.g. CRISPR glioblastoma therapy") |
|
|
|
|
|
wsq = get_workspace() |
|
if wsq: |
|
try: |
|
news = asyncio.run(check_alerts([w["query"] for w in wsq])) |
|
if news: |
|
with st.sidebar: |
|
st.subheader("🔔 New papers") |
|
for q, lnks in news.items(): |
|
st.write(f"**{q}** – {len(lnks)} new") |
|
except Exception: |
|
pass |
|
|
|
if st.button("Run Search 🚀") and query: |
|
with st.spinner("Collecting literature & biomedical data …"): |
|
res = asyncio.run(orchestrate_search(query, llm=llm)) |
|
st.success(f"Completed with **{res.get('llm_used','LLM').title()}**") |
|
st.session_state.query_result = res |
|
st.session_state.last_query = query |
|
st.session_state.last_llm = llm |
|
st.session_state.followup_input = "" |
|
st.session_state.followup_response = None |
|
|
|
res = st.session_state.query_result |
|
if not res: |
|
st.info("Enter a question and press **Run Search 🚀**") |
|
return |
|
|
|
tabs = st.tabs(["Results", "Genes", "Trials", "Variants", "Graph", "Metrics", "Visuals"]) |
|
|
|
with tabs[0]: |
|
for i, p in enumerate(res.get("papers", []), 1): |
|
st.markdown(f"**{i}. [{p.get('title','')}]({p.get('link','')})** *{p.get('authors','')}*") |
|
st.write(p.get("summary", "")) |
|
col1, col2 = st.columns(2) |
|
with col1: |
|
st.download_button("CSV", pd.DataFrame(res.get("papers", [])).to_csv(index=False), |
|
"papers.csv", "text/csv") |
|
with col2: |
|
st.download_button("PDF", _pdf(res.get("papers", [])), "papers.pdf", "application/pdf") |
|
if st.button("💾 Save"): |
|
save_query(st.session_state.last_query, res) |
|
st.success("Saved to workspace") |
|
st.subheader("UMLS concepts") |
|
for c in res.get("umls", []): |
|
if isinstance(c, dict) and c.get("cui"): |
|
st.write(f"- **{c.get('name','')}** ({c.get('cui')})") |
|
st.subheader("OpenFDA safety signals") |
|
st.json(res.get("drug_safety", [])) |
|
st.subheader("AI summary") |
|
st.info(res.get("ai_summary", "")) |
|
|
|
|
|
with tabs[1]: |
|
st.header("Gene / Variant signals") |
|
genes = res.get("genes", []) |
|
if not genes: |
|
st.info("No gene hits (rate-limited or none found).") |
|
else: |
|
for g in genes: |
|
if isinstance(g, dict): |
|
lab = g.get("name") or g.get("symbol") or g.get("geneid") |
|
st.write(f"- **{lab}** {g.get('description','')}") |
|
if res.get("gene_disease"): |
|
st.markdown("### DisGeNET associations") |
|
st.json(res.get("gene_disease")[:15]) |
|
if res.get("mesh_defs"): |
|
st.markdown("### MeSH definitions") |
|
for d in res["mesh_defs"]: |
|
if d: |
|
st.write("-", d) |
|
|
|
|
|
with tabs[2]: |
|
st.header("Clinical trials") |
|
trials = res.get("clinical_trials", []) |
|
if not trials: |
|
st.info("No trials (rate-limited or none found).") |
|
else: |
|
for t in trials: |
|
nct = t.get("nctId") or (t.get("NCTId", [""])[0] if isinstance(t.get("NCTId"), list) else "") |
|
title = t.get("briefTitle") or (t.get("BriefTitle", [""])[0] if isinstance(t.get("BriefTitle"), list) else "") |
|
phase = t.get("phase") or (t.get("Phase", [""])[0] if isinstance(t.get("Phase"), list) else "") |
|
status = t.get("status") or (t.get("OverallStatus", [""])[0] if isinstance(t.get("OverallStatus"), list) else "") |
|
st.markdown(f"**{nct}** – {title}") |
|
st.write(f"Phase {phase} | Status {status}") |
|
|
|
|
|
with tabs[3]: |
|
st.header("Cancer variants (cBioPortal)") |
|
variants = res.get("variants", []) |
|
if not variants: |
|
st.info("No variant data.") |
|
else: |
|
for v in variants: |
|
st.json(v) |
|
|
|
|
|
with tabs[4]: |
|
nodes, edges, cfg = build_agraph(res.get("papers", []), res.get("umls", []), res.get("drug_safety", [])) |
|
hl = st.text_input("Highlight node:", key="hl") |
|
if hl: |
|
pat = re.compile(re.escape(hl), re.I) |
|
for n in nodes: |
|
n.color = "#f1c40f" if pat.search(n.label) else "#d3d3d3" |
|
agraph(nodes, edges, cfg) |
|
|
|
|
|
with tabs[5]: |
|
nodes, edges, _ = build_agraph(res.get("papers", []), res.get("umls", []), res.get("drug_safety", [])) |
|
G = build_nx([n.__dict__ for n in nodes], [e.__dict__ for e in edges]) |
|
st.metric("Density", f"{get_density(G):.3f}") |
|
st.markdown("**Top hubs**") |
|
for nid, sc in get_top_hubs(G): |
|
lab = next((n.label for n in nodes if n.id == nid), nid) |
|
st.write(f"- {lab} {sc:.3f}") |
|
|
|
|
|
with tabs[6]: |
|
years = [p.get("published", "") for p in res.get("papers", []) if p.get("published")] |
|
if years: |
|
st.plotly_chart(px.histogram(years, nbins=12, title="Publication Year")) |
|
|
|
|
|
st.markdown("---") |
|
st.text_input("Ask follow‑up question:", key="followup_input") |
|
def handle_followup(): |
|
follow = st.session_state.followup_input |
|
if follow.strip(): |
|
ans = asyncio.run(answer_ai_question( |
|
follow, |
|
context=st.session_state.last_query, |
|
llm=st.session_state.last_llm)) |
|
st.session_state.followup_response = ans.get("answer", "No answer.") |
|
else: |
|
st.session_state.followup_response = None |
|
st.button("Ask AI", on_click=handle_followup) |
|
if st.session_state.followup_response: |
|
st.write(st.session_state.followup_response) |
|
|
|
if __name__ == "__main__": |
|
render_ui() |
|
|