MCP_Res / app.py
mgbam's picture
Update app.py
590f907 verified
raw
history blame
10.1 kB
#!/usr/bin/env python3
# MedGenesis AI · CPU-only Streamlit front-end (OpenAI / Gemini)
from __future__ import annotations

import asyncio
import logging
import os
import pathlib
import re
from pathlib import Path

import pandas as pd
import plotly.express as px
import streamlit as st
from fpdf import FPDF
from streamlit_agraph import agraph

from mcp.orchestrator import orchestrate_search, answer_ai_question
from mcp.workspace import get_workspace, save_query, clear_workspace
from mcp.knowledge_graph import build_agraph
from mcp.graph_utils import build_nx, get_top_hubs, get_density
from mcp.alerts import check_alerts
# ── Streamlit telemetry dir fix ──────────────────────────────────────
# Environment overrides applied at import time, followed by creation of the
# directory that STREAMLIT_DATA_DIR points at.
# NOTE(review): these are assigned after ``import streamlit`` has already run
# above — confirm Streamlit reads them lazily rather than at import.
os.environ.update(
    {
        "STREAMLIT_DATA_DIR": "/tmp/.streamlit",
        "XDG_STATE_HOME": "/tmp",
        "STREAMLIT_BROWSER_GATHERUSAGESTATS": "false",
    }
)
os.makedirs("/tmp/.streamlit", exist_ok=True)

# Locations resolved relative to this source file.
ROOT = Path(__file__).parent
LOGO = ROOT.joinpath("assets", "logo.png")
# -------------------------------------------------------------------#
# Utility helpers #
# -------------------------------------------------------------------#
def _latin1_safe(txt: str) -> str:
    """Return *txt* with every non-Latin-1 character replaced by ``?``.

    The text is round-tripped through the ``latin-1`` codec with the
    ``replace`` error handler, so the result can always be encoded as
    Latin-1 without raising.
    """
    as_bytes = txt.encode("latin-1", errors="replace")
    return as_bytes.decode("latin-1")
def _pdf(papers: list[dict]) -> bytes:
    """Render *papers* as a simple PDF report and return the document bytes.

    The report starts with a centred heading; each paper then contributes a
    bold, numbered title followed by its authors, summary and link in a
    smaller regular font.  All text is passed through ``_latin1_safe`` before
    being handed to FPDF.

    Args:
        papers: Paper records.  Each must provide the ``title``, ``authors``,
            ``summary`` and ``link`` keys.

    Returns:
        The finished PDF as ``bytes``.

    Raises:
        KeyError: If a paper record lacks one of the keys listed above.
    """
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Helvetica", size=11)
    pdf.cell(200, 8, _latin1_safe("MedGenesis AI – Results"), ln=True, align="C")
    pdf.ln(3)

    for i, p in enumerate(papers, 1):
        pdf.set_font("Helvetica", "B", 11)
        pdf.multi_cell(0, 7, _latin1_safe(f"{i}. {p['title']}"))
        pdf.set_font("Helvetica", "", 9)
        body = f"{p['authors']}\n{p['summary']}\n{p['link']}\n"
        pdf.multi_cell(0, 6, _latin1_safe(body))
        pdf.ln(1)

    raw = pdf.output(dest="S")
    # Legacy PyFPDF returns a latin-1 ``str`` here, while fpdf2 returns a
    # ``bytearray``; calling ``.encode`` unconditionally breaks on the latter.
    if isinstance(raw, str):
        return raw.encode("latin-1", "replace")
    return bytes(raw)
def _workspace_sidebar() -> None:
    """Draw the saved-queries workspace panel inside the Streamlit sidebar.

    When the workspace is empty only a hint is shown.  Otherwise a
    "Clear workspace" button is rendered, followed by one expander per saved
    item that displays the AI summary stored with that item.
    """
    with st.sidebar:
        st.header("🗂️ Workspace")
        ws = get_workspace()
        if not ws:
            st.info("Run a search then press **Save** to populate this list.")
            return

        if st.button("Clear workspace 🗑️"):
            clear_workspace()
            # ``st.experimental_rerun`` has been superseded by ``st.rerun``
            # and is gone from newer Streamlit; prefer the new name and fall
            # back to the old one on releases that predate it.
            rerun = getattr(st, "rerun", None) or st.experimental_rerun
            rerun()

        for i, item in enumerate(ws, 1):
            with st.expander(f"{i}. {item['query']}"):
                st.write(item["result"]["ai_summary"])
# -------------------------------------------------------------------#
# Streamlit main UI #
# -------------------------------------------------------------------#
def render_ui() -> None:
    """Render the complete MedGenesis AI page for one Streamlit script run.

    The page is built top to bottom in this order:

    1. page config and ``st.session_state`` defaults,
    2. the workspace sidebar and the header (logo, title, caption),
    3. the LLM selector and the query input,
    4. a best-effort "new papers" alert for queries saved in the workspace,
    5. the search trigger, which stores its result in ``st.session_state``,
    6. the result tabs (Results, Genes, Trials, Graph, Metrics, Visuals),
    7. the follow-up question box.

    If no search result is stored yet, a hint is shown and rendering stops
    after step 5.
    """
    log = logging.getLogger(__name__)

    st.set_page_config("MedGenesis AI", layout="wide")

    # ── session_state bootstrap ────────────────────────────────────
    for key, default in {
        "query_result": None,
        "followup_input": "",
        "followup_response": None,
        "last_query": "",
        "last_llm": "",
    }.items():
        st.session_state.setdefault(key, default)

    _workspace_sidebar()

    # ── header ─────────────────────────────────────────────────────
    c1, c2 = st.columns([0.15, 0.85])
    with c1:
        if LOGO.exists():
            st.image(str(LOGO), width=105)
    with c2:
        st.markdown("## 🧬 **MedGenesis AI**")
        st.caption("Multi-source biomedical assistant · OpenAI / Gemini")

    llm = st.radio("LLM engine", ["openai", "gemini"], horizontal=True)
    query = st.text_input("Enter biomedical question",
                          placeholder="e.g. CRISPR glioblastoma therapy")

    # ── alerts for saved queries ───────────────────────────────────
    if ws := get_workspace():
        try:
            news = asyncio.run(check_alerts([w["query"] for w in ws]))
            if news:
                with st.sidebar:
                    st.subheader("🔔 New papers")
                    for q, lnks in news.items():
                        st.write(f"**{q}** – {len(lnks)} new")
        except Exception:
            # Alerts are best-effort and must never break the page, but the
            # failure is logged instead of being silently discarded.
            log.exception("Alert check for saved queries failed")

    # ── primary search trigger ─────────────────────────────────────
    if st.button("Run Search 🚀") and query.strip():
        with st.spinner("Collecting literature & biomedical data …"):
            res = asyncio.run(orchestrate_search(query, llm=llm))
        st.success(f"Completed with **{res['llm_used'].title()}**")
        # A fresh search also resets any previous follow-up exchange.
        st.session_state.update({
            "query_result": res,
            "last_query": query,
            "last_llm": llm,
            "followup_input": "",
            "followup_response": None,
        })

    # Always read the result back from session state so that it survives
    # reruns triggered by other widgets.
    res = st.session_state.query_result
    if not res:
        st.info("Enter a question and press **Run Search 🚀**")
        return

    # ── tabs ───────────────────────────────────────────────────────
    tabs = st.tabs(["Results", "Genes", "Trials", "Graph", "Metrics", "Visuals"])

    # Results ---------------------------------------------------------
    with tabs[0]:
        for i, p in enumerate(res["papers"], 1):
            st.markdown(f"**{i}. [{p['title']}]({p['link']})** *{p['authors']}*")
            st.write(p["summary"])

        c1, c2 = st.columns(2)
        with c1:
            st.download_button("CSV",
                               pd.DataFrame(res["papers"]).to_csv(index=False),
                               "papers.csv", "text/csv")
        with c2:
            st.download_button("PDF", _pdf(res["papers"]),
                               "papers.pdf", "application/pdf")

        if st.button("💾 Save this result"):
            save_query(st.session_state.last_query, res)
            st.success("Saved to workspace")

        st.subheader("UMLS concepts")
        for c in res["umls"]:
            # Entries without a CUI are skipped.
            if c.get("cui"):
                st.write(f"- **{c['name']}** ({c['cui']})")

        st.subheader("OpenFDA safety")
        for d in res["drug_safety"]:
            st.json(d)

        st.subheader("AI summary")
        st.info(res["ai_summary"])

    # Genes -----------------------------------------------------------
    with tabs[1]:
        st.header("Gene / Variant signals")
        if not res["genes"]:
            st.info("No gene hits (rate-limited or none found).")
        for g in res["genes"]:
            # ``summary`` may be present but None; ``or ""`` keeps the slice
            # from raising TypeError in that case.
            summary = g.get("summary") or ""
            st.write(f"- **{g.get('symbol', g.get('name', ''))}** "
                     f"{summary[:120]}…")
        if res["gene_disease"]:
            st.markdown("### DisGeNET links")
            st.json(res["gene_disease"][:15])
        if res["mesh_defs"]:
            st.markdown("### MeSH definitions")
            for d in res["mesh_defs"]:
                if d:
                    st.write("-", d)

    # Trials ----------------------------------------------------------
    with tabs[2]:
        st.header("Clinical trials")
        trials = res["clinical_trials"]
        if not trials:
            st.info("No trials (rate-limited or none found).")
        for t in trials:
            st.markdown(f"**{t['nctId']}** – {t['briefTitle']}")
            st.write(f"Phase {t.get('phase','')} | Status {t.get('status')}")

    # Graph -----------------------------------------------------------
    with tabs[3]:
        nodes, edges, cfg = build_agraph(
            res["papers"], res["umls"], res["drug_safety"],
            res["genes"], res["clinical_trials"], res.get("ot_associations", [])
        )
        hl = st.text_input("Highlight node:")
        if hl:
            # Literal, case-insensitive match of the typed text against
            # each node label; non-matching nodes are greyed out.
            pat = re.compile(re.escape(hl), re.I)
            for n in nodes:
                n.color = "#f1c40f" if pat.search(n.label) else "#d3d3d3"
        agraph(nodes, edges, cfg)

    # Metrics ---------------------------------------------------------
    with tabs[4]:
        # Reuses the nodes/edges built for the Graph tab above.
        G = build_nx([n.__dict__ for n in nodes], [e.__dict__ for e in edges])
        st.metric("Density", f"{get_density(G):.3f}")
        st.markdown("**Top hubs**")
        for nid, sc in get_top_hubs(G):
            # Fall back to the raw node id when no node carries that id.
            lab = next((n.label for n in nodes if n.id == nid), nid)
            st.write(f"- {lab} {sc:.3f}")

    # Visuals ---------------------------------------------------------
    with tabs[5]:
        years = [p["published"] for p in res["papers"] if p.get("published")]
        if years:
            st.plotly_chart(px.histogram(years, nbins=12,
                                         title="Publication Year"))

    # ── follow-up Q & A ────────────────────────────────────────────
    st.markdown("---")
    st.text_input("Ask follow-up question:",
                  key="followup_input",
                  placeholder="e.g. Any phase III trials recruiting now?")

    def _on_ask() -> None:
        """Button callback: answer the follow-up question and store the reply."""
        q = st.session_state.followup_input.strip()
        if not q:
            st.warning("Please type a question first.")
            return
        with st.spinner("Querying LLM …"):
            ans = asyncio.run(
                answer_ai_question(q,
                                   context=st.session_state.last_query,
                                   llm=st.session_state.last_llm))
        st.session_state.followup_response = ans["answer"]

    st.button("Ask AI", on_click=_on_ask)

    if st.session_state.followup_response:
        st.write(st.session_state.followup_response)
# -------------------------------------------------------------------#
# Script entry point: build the page only when this file is executed as
# ``__main__``, not when it is imported as a module.
if __name__ == "__main__":
    render_ui()