MCP_Res / mcp /orchestrator.py
mgbam's picture
Update mcp/orchestrator.py
08c0325 verified
raw
history blame
3.86 kB
# ──────────────────────── mcp/orchestrator.py ─────────────────────────
"""Dual‑LLM orchestrator coordinating literature ↔ annotation ↔ trials.
Adds gene/variant enrichment with MyGene.info → Ensembl → OpenTargets → cBio.
"""
import asyncio
import logging
from typing import Dict, Any, List

from mcp.arxiv import fetch_arxiv
from mcp.pubmed import fetch_pubmed
from mcp.nlp import extract_keywords
from mcp.umls import lookup_umls
from mcp.openfda import fetch_drug_safety
from mcp.clinicaltrials import search_trials
from mcp.gene_hub import resolve_gene # MyGene→Ensembl→OT
from mcp.cbio import fetch_cbio_variants
from mcp.openai_utils import ai_summarize, ai_qa
from mcp.gemini import gemini_summarize, gemini_qa
_DEF = "openai"


# ------------ light LLM router ------------
def _llm_router(llm: str):
    """Select the helper functions for the requested LLM backend.

    The comparison is case-insensitive. ``"gemini"`` selects the Gemini
    helpers; every other value selects the OpenAI helpers.

    Returns:
        A ``(summarise_fn, qa_fn, engine_name)`` triple.
    """
    engine = llm.lower()
    if engine != "gemini":
        return ai_summarize, ai_qa, "openai"
    return gemini_summarize, gemini_qa, "gemini"
# ---------------- gene / variant enrichment --------------------------
async def _enrich_one_keyword(kw: str) -> tuple:
    """Resolve one keyword to a gene record plus its variants.

    Returns:
        ``(gene, variants)``. ``gene`` is ``None`` when the keyword did not
        resolve or the lookup raised; ``variants`` is then ``None`` as well.
        When the gene resolves but the variant fetch raises, ``variants`` is
        an empty list.
    """
    log = logging.getLogger(__name__)
    try:
        gene = await resolve_gene(kw)
    except Exception:
        # One bad keyword must not abort enrichment for all the others.
        log.warning("Gene lookup failed for %r", kw, exc_info=True)
        return None, None
    if not gene:
        return None, None

    # NOTE(review): variants are fetched only for keywords that resolved to a
    # gene - confirm this matches the intended nesting.
    try:
        variants = await fetch_cbio_variants(kw)
    except Exception:
        # Best-effort: a variant outage degrades to "no variants".
        log.warning("Variant lookup failed for %r", kw, exc_info=True)
        variants = []
    return gene, variants


async def _enrich_gene_block(keywords: List[str]) -> Dict[str, Any]:
    """Collect gene records and variants for the given keywords.

    Keywords are looked up concurrently; results are assembled in keyword
    order. A lookup failure for one keyword is logged and skipped rather
    than propagated.

    Args:
        keywords: Candidate gene symbols / terms to resolve.

    Returns:
        ``{"genes": [...], "variants": {keyword: [...]}}`` where ``genes``
        holds every truthy ``resolve_gene`` result and ``variants`` has one
        entry per resolved keyword.
    """
    results = await asyncio.gather(*(_enrich_one_keyword(kw) for kw in keywords))

    genes: List[Dict] = []
    variants: Dict[str, List[Dict]] = {}
    for kw, (gene, hits) in zip(keywords, results):
        if gene:
            genes.append(gene)
            variants[kw] = hits
    return {"genes": genes, "variants": variants}
# ---------------- orchestrator entry‑points --------------------------
def _drop_failed(results: Any) -> List[Any]:
    """Return the non-exception entries of a ``return_exceptions=True`` gather.

    If ``results`` is itself an exception (the whole step failed), an empty
    list is returned.
    """
    if isinstance(results, BaseException):
        return []
    return [item for item in results if not isinstance(item, BaseException)]


async def orchestrate_search(query: str, llm: str = _DEF) -> Dict[str, Any]:
    """Run search, summarise and join annotations for the UI.

    Steps: fetch papers from arXiv and PubMed, extract up to 12 keywords from
    the concatenated summaries (capped at 60,000 characters), enrich them
    (UMLS, openFDA, genes/variants, clinical trials) concurrently, then
    summarise the text with the selected LLM.

    Failure handling:
        * One failing literature source is logged and skipped; if *both*
          fail the first error is re-raised.
        * Failed UMLS / openFDA lookups are dropped from their lists instead
          of being returned as exception objects.
        * A failing gene or trials step is logged and replaced by an empty
          result.
        * A failing summariser yields a fixed fallback message.

    Args:
        query: Free-text search query.
        llm: Backend selector passed to ``_llm_router``.

    Returns:
        Dict with keys ``papers``, ``umls``, ``drug_safety``, ``genes``,
        ``variants``, ``clinical_trials``, ``ai_summary`` and ``llm_used``.
    """
    log = logging.getLogger(__name__)

    # literature ------------------------------------------------------
    batches = await asyncio.gather(
        fetch_arxiv(query, max_results=20),
        fetch_pubmed(query, max_results=20),
        return_exceptions=True,
    )
    if all(isinstance(batch, BaseException) for batch in batches):
        # Nothing could be fetched at all: surface the error to the caller.
        raise batches[0]
    papers: List[Dict[str, Any]] = []
    for source, batch in zip(("arXiv", "PubMed"), batches):
        if isinstance(batch, BaseException):
            log.warning("%s search failed: %r", source, batch)
            continue
        papers.extend(batch or [])

    # NLP keyword extraction -----------------------------------------
    # `or ""` guards against papers whose summary is present but None.
    blob = " ".join((p.get("summary") or "") for p in papers)[:60_000]
    keywords = extract_keywords(blob)[:12]

    # enrichment (in parallel) ---------------------------------------
    umls_raw, fda_raw, gene_data, trials = await asyncio.gather(
        asyncio.gather(*(lookup_umls(k) for k in keywords), return_exceptions=True),
        asyncio.gather(*(fetch_drug_safety(k) for k in keywords), return_exceptions=True),
        _enrich_gene_block(keywords),
        search_trials(query, max_studies=20),
        return_exceptions=True,
    )
    if isinstance(gene_data, BaseException):
        log.warning("Gene enrichment failed: %r", gene_data)
        gene_data = {"genes": [], "variants": {}}
    if isinstance(trials, BaseException):
        log.warning("Clinical-trials search failed: %r", trials)
        # NOTE(review): assumes search_trials normally returns a list - confirm.
        trials = []

    # summarise via LLM ----------------------------------------------
    summarise, _, engine_name = _llm_router(llm)
    try:
        summary = await summarise(blob)
    except Exception:
        log.warning("LLM summarisation failed", exc_info=True)
        summary = "LLM summarisation unavailable."  # graceful fallback

    return {
        "papers": papers,
        "umls": _drop_failed(umls_raw),
        "drug_safety": _drop_failed(fda_raw),
        "genes": gene_data["genes"],
        "variants": gene_data["variants"],
        "clinical_trials": trials,
        "ai_summary": summary,
        "llm_used": engine_name,
    }
async def answer_ai_question(
    question: str, *, context: str, llm: str = _DEF
) -> Dict[str, str]:
    """Answer a follow-up question with the selected LLM backend.

    Args:
        question: The user's question.
        context: Text handed to the Q&A function alongside the question.
        llm: Backend selector passed to ``_llm_router``.

    Returns:
        ``{"answer": <text>}``. If the Q&A call raises, the text is a fixed
        "unavailable" message instead of the error.
    """
    ask = _llm_router(llm)[1]
    try:
        return {"answer": await ask(question, context)}
    except Exception:
        return {"answer": "LLM unavailable or quota exceeded."}