MCP_Res / mcp /orchestrator.py
mgbam's picture
Update mcp/orchestrator.py
57eeff5 verified
raw
history blame
1.86 kB
# mcp/orchestrator.py
import asyncio
from mcp.mygene import mygene
from mcp.opentargets import ot
from mcp.cbio import cbio
# … import pubmed, umls, clinicaltrials, etc …
from typing import Dict, Any
async def orchestrate_search(query: str, *, llm: str="openai") -> Dict[str,Any]:
# 1) fetch papers + abstracts
papers_task = asyncio.create_task(fetch_papers(query))
# 2) pull UMLS concepts
from mcp.nlp import extract_keywords
kws = extract_keywords(query)[:5]
umls_tasks = [lookup_umls(k) for k in kws]
# 3) fetch gene info + associations
gene_task = asyncio.create_task(mygene.fetch(query))
ot_task = asyncio.create_task(ot.fetch(query))
# 4) fetch variants
cbio_task = asyncio.create_task(cbio.fetch_variants(query))
# 5) clinical trials
trials_task = asyncio.create_task(search_trials(query))
# wait all
papers = await papers_task
umls = await asyncio.gather(*umls_tasks, return_exceptions=True)
gene, assoc, vars_, trials = await asyncio.gather(
gene_task, ot_task, cbio_task, trials_task, return_exceptions=True
)
# 6) call your chosen LLM
from mcp.ai import ai_summarize, gemini_summarize
if llm=="openai":
summary = await ai_summarize("\n\n".join(p["summary"] for p in papers))
else:
summary = await gemini_summarize("\n\n".join(p["summary"] for p in papers))
return {
"papers": papers,
"umls": [u for u in umls if not isinstance(u, Exception)],
"gene": gene if not isinstance(gene, Exception) else {},
"associations": assoc if not isinstance(assoc, Exception) else [],
"variants": vars_ if not isinstance(vars_, Exception) else [],
"trials": trials if not isinstance(trials, Exception) else [],
"ai_summary": summary,
"llm_used": llm
}