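"""Streamlit front-end for MedGenesis AI.

Multi-source biomedical research assistant: runs an orchestrated search
(papers, UMLS concepts, genes, clinical trials, drug safety), then renders
the results, a knowledge graph with basic metrics, publication-year visuals
and an AI follow-up box.
"""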
from __future__ import annotations

import asyncio
import os
import pathlib
import re
from pathlib import Path

import streamlit as st
import pandas as pd
import plotly.express as px
from fpdf import FPDF
from streamlit_agraph import agraph

from mcp.orchestrator import orchestrate_search, answer_ai_question
from mcp.workspace import get_workspace, save_query, clear_workspace
from mcp.knowledge_graph import build_agraph
from mcp.graph_utils import build_nx, get_top_hubs, get_density
from mcp.alerts import check_alerts

# Keep Streamlit's state/data files and the usage-stats flag under /tmp so the
# app also runs where the home directory is not writable (e.g. in containers).
os.environ["STREAMLIT_DATA_DIR"] = "/tmp/.streamlit"
os.environ["XDG_STATE_HOME"] = "/tmp"
os.environ["STREAMLIT_BROWSER_GATHERUSAGESTATS"] = "false"
pathlib.Path("/tmp/.streamlit").mkdir(parents=True, exist_ok=True)

ROOT = Path(__file__).parent
LOGO = ROOT / "assets" / "logo.png"


def _latin1_safe(txt: str) -> str:
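    """Replace characters that FPDF's latin-1 core fonts cannot encode."""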
    return txt.encode("latin-1", "replace").decode("latin-1")


def _pdf(papers: list[dict]) -> bytes:
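    """Render the paper list as a simple one-column PDF and return its bytes."""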
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Helvetica", size=11)
    pdf.cell(200, 8, _latin1_safe("MedGenesis AI – Results"), ln=True, align="C")
    pdf.ln(3)
    for i, p in enumerate(papers, 1):
        pdf.set_font("Helvetica", "B", 11)
        pdf.multi_cell(0, 7, _latin1_safe(f"{i}. {p['title']}"))
        pdf.set_font("Helvetica", "", 9)
        body = f"{p['authors']}\n{p['summary']}\n{p['link']}\n"
        pdf.multi_cell(0, 6, _latin1_safe(body))
        pdf.ln(1)
    # PyFPDF returns a str from output(dest="S"); newer fpdf2 releases return a
    # bytearray instead, so handle both and always hand back bytes.
    out = pdf.output(dest="S")
    return out.encode("latin-1", "replace") if isinstance(out, str) else bytes(out)


def _workspace_sidebar() -> None:
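    """Sidebar panel listing saved queries, with a button to clear them all."""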
    with st.sidebar:
        st.header("🗂️ Workspace")
        ws = get_workspace()
        if not ws:
            st.info("Run a search, then press **Save** to populate this list.")
            return
        if st.button("Clear workspace 🗑️"):
            clear_workspace()
            st.experimental_rerun()  # st.rerun() on Streamlit >= 1.27
        for i, item in enumerate(ws, 1):
            with st.expander(f"{i}. {item['query']}"):
                st.write(item["result"]["ai_summary"])


def render_ui() -> None:
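    """Top-level page: search controls, result tabs and follow-up Q&A."""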
    st.set_page_config("MedGenesis AI", layout="wide")

    # Seed session-state keys once so values survive Streamlit reruns.
    for key, default in {
        "query_result": None,
        "followup_input": "",
        "followup_response": None,
        "last_query": "",
        "last_llm": "",
    }.items():
        st.session_state.setdefault(key, default)

    _workspace_sidebar()

    # Header: logo and title.
    c1, c2 = st.columns([0.15, 0.85])
    with c1:
        if LOGO.exists():
            st.image(str(LOGO), width=105)
    with c2:
        st.markdown("## 🧬 **MedGenesis AI**")
        st.caption("Multi-source biomedical assistant · OpenAI / Gemini")

    llm = st.radio("LLM engine", ["openai", "gemini"], horizontal=True)
    query = st.text_input("Enter biomedical question",
                          placeholder="e.g. CRISPR glioblastoma therapy")

    # Best-effort alert check for saved queries; failures must not break the UI.
    if ws := get_workspace():
        try:
            news = asyncio.run(check_alerts([w["query"] for w in ws]))
            if news:
                with st.sidebar:
                    st.subheader("🔔 New papers")
                    for q, lnks in news.items():
                        st.write(f"**{q}** – {len(lnks)} new")
        except Exception:
            pass

if st.button("Run Search π") and query.strip(): |
|
with st.spinner("Collecting literature & biomedical data β¦"): |
|
res = asyncio.run(orchestrate_search(query, llm=llm)) |
|
st.success(f"Completed with **{res['llm_used'].title()}**") |
|
st.session_state.update({ |
|
"query_result" : res, |
|
"last_query" : query, |
|
"last_llm" : llm, |
|
"followup_input" : "", |
|
"followup_response" : None, |
|
}) |
|
|
|
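    # Everything below renders the most recent result; it lives in
    # st.session_state, so it persists across Streamlit reruns.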
    res = st.session_state.query_result
    if not res:
        st.info("Enter a question and press **Run Search 🔍**")
        return

    tabs = st.tabs(["Results", "Genes", "Trials", "Graph", "Metrics", "Visuals"])

    # Results tab: paper list, CSV/PDF export, UMLS, OpenFDA and AI summary.
    with tabs[0]:
        for i, p in enumerate(res["papers"], 1):
            st.markdown(f"**{i}. [{p['title']}]({p['link']})** *{p['authors']}*")
            st.write(p["summary"])

        c1, c2 = st.columns(2)
        with c1:
            st.download_button("CSV",
                               pd.DataFrame(res["papers"]).to_csv(index=False),
                               "papers.csv", "text/csv")
        with c2:
            st.download_button("PDF", _pdf(res["papers"]),
                               "papers.pdf", "application/pdf")

        if st.button("💾 Save this result"):
            save_query(st.session_state.last_query, res)
            st.success("Saved to workspace")

        st.subheader("UMLS concepts")
        for c in res["umls"]:
            if c.get("cui"):
                st.write(f"- **{c['name']}** ({c['cui']})")

        st.subheader("OpenFDA safety")
        for d in res["drug_safety"]:
            st.json(d)

        st.subheader("AI summary")
        st.info(res["ai_summary"])

    # Genes tab: gene/variant hits, DisGeNET links and MeSH definitions.
    with tabs[1]:
        st.header("Gene / Variant signals")
        if not res["genes"]:
            st.info("No gene hits (rate-limited or none found).")
        for g in res["genes"]:
            st.write(f"- **{g.get('symbol', g.get('name', ''))}** "
                     f"{g.get('summary', '')[:120]}…")

        if res["gene_disease"]:
            st.markdown("### DisGeNET links")
            st.json(res["gene_disease"][:15])

        if res["mesh_defs"]:
            st.markdown("### MeSH definitions")
            for d in res["mesh_defs"]:
                if d:
                    st.write("-", d)

    # Trials tab: clinical-trial records (NCT id, phase, status).
    with tabs[2]:
        st.header("Clinical trials")
        trials = res["clinical_trials"]
        if not trials:
            st.info("No trials (rate-limited or none found).")
        for t in trials:
            st.markdown(f"**{t['nctId']}** – {t['briefTitle']}")
            st.write(f"Phase {t.get('phase', '')} | Status {t.get('status')}")

    # Graph tab: interactive knowledge graph with optional node highlighting.
    with tabs[3]:
        nodes, edges, cfg = build_agraph(
            res["papers"], res["umls"], res["drug_safety"],
            res["genes"], res["clinical_trials"], res.get("ot_associations", [])
        )
        hl = st.text_input("Highlight node:")
        if hl:
            # Case-insensitive substring match against node labels.
            pat = re.compile(re.escape(hl), re.I)
            for n in nodes:
                n.color = "#f1c40f" if pat.search(n.label) else "#d3d3d3"
        agraph(nodes, edges, cfg)

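    # `nodes` and `edges` built in the Graph tab above are still in scope here
    # (a `with` block does not introduce a new Python scope), so the Metrics
    # tab can reuse them.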
    with tabs[4]:
        G = build_nx([n.__dict__ for n in nodes], [e.__dict__ for e in edges])
        st.metric("Density", f"{get_density(G):.3f}")
        st.markdown("**Top hubs**")
        for nid, sc in get_top_hubs(G):
            lab = next((n.label for n in nodes if n.id == nid), nid)
            st.write(f"- {lab} {sc:.3f}")

    # Visuals tab: publication-year histogram.
    with tabs[5]:
        years = [p["published"] for p in res["papers"] if p.get("published")]
        if years:
            st.plotly_chart(px.histogram(years, nbins=12,
                                         title="Publication Year"))

st.markdown("---") |
|
st.text_input("Ask follow-up question:", |
|
key="followup_input", |
|
placeholder="e.g. Any phase III trials recruiting now?") |
|
|
|
def _on_ask() -> None: |
|
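        """Button callback: answer the follow-up with the LLM used for the last search."""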
        q = st.session_state.followup_input.strip()
        if not q:
            st.warning("Please type a question first.")
            return
        with st.spinner("Querying LLM …"):
            ans = asyncio.run(
                answer_ai_question(q,
                                   context=st.session_state.last_query,
                                   llm=st.session_state.last_llm))
        st.session_state.followup_response = ans["answer"]

    st.button("Ask AI", on_click=_on_ask)
    if st.session_state.followup_response:
        st.write(st.session_state.followup_response)

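# Entry point: the app is usually launched with `streamlit run <this file>`.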
if __name__ == "__main__":
    render_ui()