# ──────────────────────── mcp/orchestrator.py ─────────────────────────
"""Dual-LLM orchestrator coordinating literature → annotation → trials.
Adds gene/variant enrichment with MyGene.info → Ensembl → OpenTargets → cBio.
"""
import asyncio
from typing import Dict, Any, List
from mcp.arxiv import fetch_arxiv
from mcp.pubmed import fetch_pubmed
from mcp.nlp import extract_keywords
from mcp.umls import lookup_umls
from mcp.openfda import fetch_drug_safety
from mcp.clinicaltrials import search_trials
from mcp.gene_hub import resolve_gene  # MyGene → Ensembl → OT
from mcp.cbio import fetch_cbio_variants
from mcp.openai_utils import ai_summarize, ai_qa
from mcp.gemini import gemini_summarize, gemini_qa
_DEF = "openai"  # default LLM engine when the caller does not specify one
# ------------ light LLM router ------------
def _llm_router(llm: str):
    if llm.lower() == "gemini":
        return gemini_summarize, gemini_qa, "gemini"
    return ai_summarize, ai_qa, "openai"
# ---------------- gene / variant enrichment --------------------------
async def _enrich_gene_block(keywords: List[str]) -> Dict[str, Any]:
    """Resolve each keyword to a gene record and its tumour variants."""
    out: List[Dict] = []
    variants: Dict[str, List[Dict]] = {}
    for kw in keywords:
        g = await resolve_gene(kw)
        if g:
            out.append(g)
            # fetch tumour variants; errors are swallowed so enrichment never fails
            try:
                variants[kw] = await fetch_cbio_variants(kw)
            except Exception:
                variants[kw] = []
    return {"genes": out, "variants": variants}
# ---------------- orchestrator entry-points --------------------------
async def orchestrate_search(query: str, llm: str = _DEF) -> Dict[str, Any]:
    """Run search, summarise and join annotations for the UI."""
    # literature ------------------------------------------------------
    arxiv_task = asyncio.create_task(fetch_arxiv(query, max_results=20))
    pubmed_task = asyncio.create_task(fetch_pubmed(query, max_results=20))
    papers = sum(await asyncio.gather(arxiv_task, pubmed_task), [])
    # NLP keyword extraction -----------------------------------------
    blob = " ".join(p.get("summary", "") for p in papers)[:60_000]
    keywords = extract_keywords(blob)[:12]
    # enrichment (in parallel) ---------------------------------------
    umls_f = [lookup_umls(k) for k in keywords]
    fda_f = [fetch_drug_safety(k) for k in keywords]
    gene_block = asyncio.create_task(_enrich_gene_block(keywords))
    trials_task = asyncio.create_task(search_trials(query, max_studies=20))
    umls, fda, gene_data, trials = await asyncio.gather(
        asyncio.gather(*umls_f, return_exceptions=True),
        asyncio.gather(*fda_f, return_exceptions=True),
        gene_block,
        trials_task,
    )
    # summarise via LLM ----------------------------------------------
    summarise, _, engine_name = _llm_router(llm)
    try:
        summary = await summarise(blob)
    except Exception:
        summary = "LLM summarisation unavailable."  # graceful fallback
    return {
        "papers": papers,
        "umls": umls,
        "drug_safety": fda,
        "genes": gene_data["genes"],
        "variants": gene_data["variants"],
        "clinical_trials": trials,
        "ai_summary": summary,
        "llm_used": engine_name,
    }
async def answer_ai_question(question: str, *, context: str, llm: str = _DEF) -> Dict[str, str]:
    """Follow-up Q&A via the selected LLM."""
    _, qa_fn, _ = _llm_router(llm)
    try:
        answer = await qa_fn(question, context)
    except Exception:
        answer = "LLM unavailable or quota exceeded."
    return {"answer": answer}