File size: 5,206 Bytes
86771dc 146b143 9965499 86771dc e33dfeb 146b143 e33dfeb 146b143 e33dfeb 86771dc e33dfeb 146b143 86771dc e33dfeb c72a1cd 3637999 e33dfeb 146b143 e33dfeb c72a1cd 146b143 c72a1cd 146b143 12007d6 146b143 12007d6 146b143 e33dfeb 9965499 c72a1cd 86771dc e33dfeb 86771dc 146b143 86771dc e33dfeb 146b143 c72a1cd e33dfeb 146b143 e33dfeb 146b143 c72a1cd 146b143 e33dfeb 86771dc 146b143 e33dfeb 146b143 e33dfeb 146b143 e33dfeb 9965499 146b143 e33dfeb 146b143 e33dfeb c72a1cd 146b143 9965499 e33dfeb 3d539ca 86771dc c72a1cd e33dfeb 86771dc e33dfeb 146b143 e33dfeb 146b143 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
#!/usr/bin/env python3
"""MedGenesis β orchestrator (v4.1, contextβsafe)
Runs an async pipeline that fetches literature, enriches with biomedical
APIs, and summarises via either OpenAI or Gemini. Fully resilient:
β’ HTTPS arXiv
β’ 403βproof ClinicalTrials.gov helper
β’ Filters out failed enrichment calls so UI never crashes
β’ Followβup QA passes `context=` kwarg (fixes TypeError)
"""
from __future__ import annotations
import asyncio
from typing import Any, Dict, List
# ββ async fetchers ββββββββββββββββββββββββββββββββββββββββββββββββββ
from mcp.arxiv import fetch_arxiv
from mcp.pubmed import fetch_pubmed
from mcp.nlp import extract_keywords
from mcp.umls import lookup_umls
from mcp.openfda import fetch_drug_safety
from mcp.ncbi import search_gene, get_mesh_definition
from mcp.disgenet import disease_to_genes
from mcp.mygene import fetch_gene_info
from mcp.ctgov import search_trials # v2βv1 helper
# ββ LLM helpers ββββββββββββββββββββββββββββββββββββββββββββββββββββ
from mcp.openai_utils import ai_summarize, ai_qa
from mcp.gemini import gemini_summarize, gemini_qa
# ------------------------------------------------------------------
# LLM router
# ------------------------------------------------------------------
_DEF_LLM = "openai"
def _llm_router(name: str | None):
if name and name.lower() == "gemini":
return gemini_summarize, gemini_qa, "gemini"
return ai_summarize, ai_qa, "openai"
# ------------------------------------------------------------------
# Keyword enrichment bundle (NCBI / MeSH / DisGeNET)
# ------------------------------------------------------------------
async def _enrich_keywords(keys: List[str]) -> Dict[str, Any]:
jobs: List[asyncio.Future] = []
for k in keys:
jobs += [search_gene(k), get_mesh_definition(k), disease_to_genes(k)]
res = await asyncio.gather(*jobs, return_exceptions=True)
genes, meshes, disg = [], [], []
for idx, r in enumerate(res):
if isinstance(r, Exception):
continue
bucket = idx % 3
if bucket == 0:
genes.extend(r)
elif bucket == 1:
meshes.append(r)
else:
disg.extend(r)
return {"genes": genes, "meshes": meshes, "disgenet": disg}
# ------------------------------------------------------------------
# Orchestrator main
# ------------------------------------------------------------------
async def orchestrate_search(query: str, *, llm: str = _DEF_LLM) -> Dict[str, Any]:
"""Fetch + enrich + summarise; returns dict for Streamlit UI."""
# 1) Literature --------------------------------------------------
arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=10))
pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=10))
papers: List[Dict] = []
for res in await asyncio.gather(arxiv_task, pubmed_task, return_exceptions=True):
if not isinstance(res, Exception):
papers.extend(res)
# 2) Keyword extraction -----------------------------------------
corpus = " ".join(p.get("summary", "") for p in papers)
keywords = extract_keywords(corpus)[:8]
# 3) Enrichment fanβout -----------------------------------------
umls_f = [lookup_umls(k) for k in keywords]
fda_f = [fetch_drug_safety(k) for k in keywords]
ncbi_f = asyncio.create_task(_enrich_keywords(keywords))
gene_f = asyncio.create_task(fetch_gene_info(query))
trials_f = asyncio.create_task(search_trials(query, max_studies=20))
umls, fda, ncbi, mygene, trials = await asyncio.gather(
asyncio.gather(*umls_f, return_exceptions=True),
asyncio.gather(*fda_f, return_exceptions=True),
ncbi_f,
gene_f,
trials_f,
)
# filter out failed calls --------------------------------------
umls = [u for u in umls if isinstance(u, dict)]
fda = [d for d in fda if isinstance(d, (dict, list))]
# 4) LLM summary -------------------------------------------------
summarize_fn, _, engine = _llm_router(llm)
ai_summary = await summarize_fn(corpus) if corpus else ""
# 5) Assemble payload -------------------------------------------
return {
"papers" : papers,
"umls" : umls,
"drug_safety" : fda,
"ai_summary" : ai_summary,
"llm_used" : engine,
"genes" : (ncbi["genes"] or []) + ([mygene] if mygene else []),
"mesh_defs" : ncbi["meshes"],
"gene_disease" : ncbi["disgenet"],
"clinical_trials": trials,
}
# ------------------------------------------------------------------
async def answer_ai_question(question: str, *, context: str, llm: str = _DEF_LLM) -> Dict[str, str]:
"""Followβup QA using selected LLM (context kwarg fixed)."""
_, qa_fn, _ = _llm_router(llm)
return {"answer": await qa_fn(question, context=context)}
|