MCP_Res / app.py
mgbam's picture
Update app.py
f9a0bdb verified
raw
history blame
10.8 kB
#!/usr/bin/env python3
# app.py – MedGenesis AI · Streamlit front-end (v3)
# ---------------------------------------------------
# • Dual-LLM selector (OpenAI | Gemini)
# • Robust PDF export (all Unicode → Latin-1 safe)
# • Lazy session-state handling so a failed background
#   request never kills the whole app.
# • New “Variants” tab (cBioPortal) + null-safe “Graph”
#   and “Metrics” using the patched helpers.
import os, pathlib, asyncio, re
from pathlib import Path
import streamlit as st
import pandas as pd
import plotly.express as px
from streamlit_agraph import agraph
from fpdf import FPDF
from mcp.orchestrator import orchestrate_search, answer_ai_question
from mcp.workspace import get_workspace, save_query
from mcp.knowledge_graph import build_agraph
from mcp.graph_metrics import build_nx, get_top_hubs, get_density
# ── Streamlit telemetry dir fix ─────────────────────────────────────
# Set the Streamlit / XDG environment variables to locations under /tmp,
# turn the usage-stats flag off, and make sure the data directory exists.
os.environ.update(
    {
        "STREAMLIT_DATA_DIR": "/tmp/.streamlit",
        "XDG_STATE_HOME": "/tmp",
        "STREAMLIT_BROWSER_GATHERUSAGESTATS": "false",
    }
)
pathlib.Path("/tmp/.streamlit").mkdir(parents=True, exist_ok=True)

# Paths resolved relative to the directory that contains this file.
ROOT = Path(__file__).parent
LOGO = ROOT.joinpath("assets", "logo.png")
# ── PDF export helper (robust to ALL Unicode) ───────────────────────
def _latin1_safe(txt: str) -> str:
return txt.encode("latin-1", "replace").decode("latin-1")
def _pdf(papers):
    """Render *papers* as a simple PDF report and return the file as bytes.

    Args:
        papers: Iterable of mappings. The ``title``, ``authors``,
            ``summary`` and ``link`` entries are printed; an entry that is
            missing or ``None`` is rendered as an empty string. ``None``
            is treated as an empty list.

    Returns:
        bytes: The encoded PDF document.
    """
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Helvetica", size=11)
    # Plain hyphen on purpose: an en dash is not representable in Latin-1
    # and would be printed as "?" after _latin1_safe().
    # Width 0 stretches the cell to the right margin so align="C" really
    # centres the heading (a fixed 200 mm overshoots the printable width).
    pdf.cell(0, 8, _latin1_safe("MedGenesis AI - Results"), ln=True, align="C")
    pdf.ln(3)

    for i, p in enumerate(papers or [], 1):
        title, authors, summary, link = (
            str(p.get(field) or "")
            for field in ("title", "authors", "summary", "link")
        )

        pdf.set_font("Helvetica", "B", 11)
        # Return to the left margin before every multi_cell: fpdf2 leaves
        # the cursor at the right edge after a multi_cell by default, which
        # leaves no room for a following full-width cell.
        pdf.set_x(pdf.l_margin)
        pdf.multi_cell(0, 7, _latin1_safe(f"{i}. {title}"))

        pdf.set_font("Helvetica", "", 9)
        body = f"{authors}\n{summary}\n{link}\n"
        pdf.set_x(pdf.l_margin)
        pdf.multi_cell(0, 6, _latin1_safe(body))
        pdf.ln(1)

    raw = pdf.output(dest="S")
    # Legacy PyFPDF returns a str here, fpdf2 returns a bytearray; handle
    # both so the caller always receives real bytes.
    if isinstance(raw, str):
        return raw.encode("latin-1", "replace")
    return bytes(raw)
# ── Sidebar workspace ───────────────────────────────────────────────
def _workspace_sidebar():
    """Render the saved-query workspace inside the Streamlit sidebar.

    Shows a hint when ``get_workspace()`` returns nothing; otherwise one
    expander per saved item containing that item's AI summary. Items are
    assumed to be dict-like; a missing ``query``, ``result`` or
    ``ai_summary`` entry falls back to placeholder text instead of raising.
    """
    with st.sidebar:
        st.header("🗂️ Workspace")
        ws = get_workspace()
        if not ws:
            st.info("Run a search then press **Save** to populate this list.")
            return
        for i, item in enumerate(ws, 1):
            query = item.get("query") or "(untitled query)"
            result = item.get("result") or {}
            with st.expander(f"{i}. {query}"):
                st.write(result.get("ai_summary") or "No AI summary saved.")
# ── UI main routine ─────────────────────────────────────────────────
# Result keys the tabs iterate over or slice; each one must hold a list.
_RESULT_LIST_KEYS = (
    "papers", "umls", "drug_safety", "genes", "mesh_defs",
    "gene_disease", "clinical_trials", "variants",
)


def _init_session_state():
    """Seed ``st.session_state`` with defaults for keys not yet present."""
    defaults = {
        "query_result": None,
        "last_query": "",
        "last_llm": "openai",
        "followup_input": "",
        "followup_response": None,
    }
    for key, default in defaults.items():
        if key not in st.session_state:
            st.session_state[key] = default


def _render_header():
    """Draw the logo (when the file exists) next to the title and caption."""
    logo_col, title_col = st.columns([0.15, 0.85])
    with logo_col:
        if LOGO.exists():
            st.image(str(LOGO), width=105)
    with title_col:
        st.markdown("## 🧬 **MedGenesis AI**")
        st.caption("Multi-source biomedical assistant – OpenAI / Gemini")


def _run_search(query, llm):
    """Run the orchestrated search and store its result in session state.

    A failing search is reported with ``st.error`` and leaves the previously
    stored result untouched, so one bad request does not abort the page.
    """
    try:
        with st.spinner("Collecting literature & biomedical data …"):
            res = asyncio.run(orchestrate_search(query, llm=llm))
    except Exception as exc:  # UI boundary: show the failure, keep the app alive
        st.error(f"Search failed: {exc}")
        return
    st.session_state.query_result = res
    st.session_state.last_query = query
    st.session_state.last_llm = llm
    # Start the follow-up conversation afresh for the new search.
    st.session_state.followup_input = ""
    st.session_state.followup_response = None


def _fill_missing_lists(res):
    """Make every list-valued key of *res* present and non-``None``."""
    for key in _RESULT_LIST_KEYS:
        if res.get(key) is None:
            res[key] = []


def _results_tab(res):
    """Render literature hits, export/save buttons, UMLS, safety and summary."""
    st.subheader("Literature")
    papers = res["papers"]
    for i, p in enumerate(papers, 1):
        title = p.get("title") or "Untitled"
        link = p.get("link") or ""
        authors = p.get("authors") or ""
        heading = f"**{i}. [{title}]({link})**"
        if authors:
            heading += f" *{authors}*"
        st.markdown(heading)
        st.write(p.get("summary") or "")

    csv_col, pdf_col = st.columns(2)
    with csv_col:
        st.download_button(
            "CSV",
            pd.DataFrame(papers).to_csv(index=False),
            "papers.csv",
            "text/csv",
        )
    with pdf_col:
        st.download_button(
            "PDF",
            _pdf(papers),
            "papers.pdf",
            "application/pdf",
        )

    if st.button("💾 Save"):
        save_query(st.session_state.last_query, res)
        st.success("Saved to workspace")

    st.subheader("UMLS concepts")
    for concept in res["umls"]:
        # Only concepts that carry a CUI are listed.
        if concept and concept.get("cui"):
            name = concept.get("name") or concept["cui"]
            st.write(f"- **{name}** ({concept['cui']})")

    st.subheader("OpenFDA safety signals")
    for signal in res["drug_safety"]:
        st.json(signal)

    st.subheader("AI summary")
    # "ai_summary" is not one of the list keys that get defaulted, so it
    # may be absent from a partial result.
    st.info(res.get("ai_summary") or "No AI summary available.")


def _genes_tab(res):
    """Render gene labels, DisGeNET associations and MeSH definitions."""
    st.header("Gene / Variant signals")
    for g in res["genes"]:
        lab = g.get("name") or g.get("symbol") or g.get("geneid")
        st.write(f"- **{lab}**")
    if res["gene_disease"]:
        st.markdown("### DisGeNET associations")
        st.json(res["gene_disease"][:15])
    if res["mesh_defs"]:
        st.markdown("### MeSH definitions")
        for definition in res["mesh_defs"]:
            if definition:
                st.write("-", definition)


def _trials_tab(res):
    """Render the clinical-trial list, or a notice when it is empty."""
    st.header("Clinical trials")
    trials = res["clinical_trials"]
    if not trials:
        st.info("No trials (rate-limited or none found).")
    for t in trials:
        st.markdown(
            f"**{t.get('nctId', 'N/A')}** – {t.get('briefTitle', '')}"
        )
        st.write(f"Phase {t.get('phase')} | Status {t.get('status')}")


def _variants_tab(res):
    """Render up to 50 variant records, or a notice when there are none."""
    st.header("Cancer variants (cBioPortal)")
    if not res["variants"]:
        st.info("No variant data.")
    else:
        st.json(res["variants"][:50])


def _graph_tab(res):
    """Render the knowledge graph and return its ``(nodes, edges)``.

    When the "Highlight node" box is filled, nodes whose label contains the
    text (case-insensitive, literal match) are coloured yellow and all other
    nodes grey before the graph is drawn.
    """
    nodes, edges, cfg = build_agraph(
        res["papers"], res["umls"], res["drug_safety"]
    )
    hl = st.text_input("Highlight node:", key="hl")
    if hl:
        pat = re.compile(re.escape(hl), re.I)
        for n in nodes:
            n.color = "#f1c40f" if pat.search(n.label or "") else "#d3d3d3"
    agraph(nodes, edges, cfg)
    return nodes, edges


def _metrics_tab(nodes, edges):
    """Render graph density and the top hub nodes for the drawn graph."""
    G = build_nx(
        [n.__dict__ for n in nodes],
        [e.__dict__ for e in edges],
    )
    st.metric("Density", f"{get_density(G):.3f}")
    st.markdown("**Top hubs**")
    for nid, score in get_top_hubs(G):
        # Show the node label when the id matches a node, else the raw id.
        lab = next((n.label for n in nodes if n.id == nid), nid)
        st.write(f"- {lab} {score:.3f}")


def _visuals_tab(res):
    """Render a publication-year histogram when any paper has a date."""
    years = [
        p.get("published", "")[:4] for p in res["papers"] if p.get("published")
    ]
    if years:
        st.plotly_chart(
            px.histogram(years, nbins=12, title="Publication Year")
        )


def _on_ask():
    """Button callback: send the follow-up question and store the answer."""
    q = st.session_state.followup_input.strip()
    if not q:
        st.warning("Please type a question first.")
        return
    try:
        with st.spinner("Querying LLM …"):
            ans = asyncio.run(
                answer_ai_question(
                    q,
                    context=st.session_state.last_query,
                    llm=st.session_state.last_llm,
                )
            )
        st.session_state.followup_response = ans["answer"]
    except Exception as exc:  # UI boundary: show the failure, keep the app alive
        st.error(f"Follow-up question failed: {exc}")


def _followup_block():
    """Render the follow-up question input, its button and the last answer."""
    st.markdown("---")
    st.text_input("Ask follow-up question:", key="followup_input")
    st.button("Ask AI", on_click=_on_ask)
    if st.session_state.followup_response:
        st.write(st.session_state.followup_response)


def render_ui():
    """Build the whole Streamlit page.

    Order of work: page config, session-state defaults, workspace sidebar,
    header, search controls, then (once a result is stored in the session)
    the seven result tabs followed by the follow-up Q&A block. Without a
    stored result only a hint is shown.
    """
    st.set_page_config("MedGenesis AI", layout="wide")
    _init_session_state()
    _workspace_sidebar()
    _render_header()

    # Controls
    llm = st.radio("LLM engine", ["openai", "gemini"],
                   horizontal=True, index=0)
    query = st.text_input("Enter biomedical question",
                          placeholder="e.g. CRISPR glioblastoma therapy")

    if st.button("Run Search 🚀") and query:
        _run_search(query, llm)

    res = st.session_state.query_result
    if not res:
        st.info("Enter a question and press **Run Search 🚀**")
        return

    _fill_missing_lists(res)

    tabs = st.tabs([
        "Results", "Genes", "Trials", "Variants",
        "Graph", "Metrics", "Visuals",
    ])
    with tabs[0]:
        _results_tab(res)
    with tabs[1]:
        _genes_tab(res)
    with tabs[2]:
        _trials_tab(res)
    with tabs[3]:
        _variants_tab(res)
    with tabs[4]:
        nodes, edges = _graph_tab(res)
    with tabs[5]:
        # Metrics are computed from the graph built for the Graph tab.
        _metrics_tab(nodes, edges)
    with tabs[6]:
        _visuals_tab(res)

    _followup_block()
# Build the page when this file is executed as a script.
if __name__ == "__main__":
    render_ui()