|
"""Convenience orchestrator that fans out to MyGene, CT.gov v2, and Open Targets.

Used by downstream modules when they need a single call for full enrichment.
"""
|
|
|
import asyncio |
|
from typing import Dict, Any, List |
|
|
|
from mcp.mygene import fetch_gene_info |
|
from mcp.ctgov import search_trials_v2 |
|
from mcp.targets import fetch_ot_associations |
|
|
|
async def enrich(query: str, keywords: List[str]) -> Dict[str, Any]:
    """Run the gene, clinical-trial and Open Targets lookups concurrently.

    All lookups are scheduled as tasks up front so they overlap, then their
    results are collected into a single dict.

    Args:
        query: Passed unchanged to ``fetch_gene_info`` and
            ``search_trials_v2``.
        keywords: Only the first entry is used, as the argument to
            ``fetch_ot_associations``. When empty, that lookup is skipped.

    Returns:
        A dict with the keys ``"gene"``, ``"clinical_trials"`` and
        ``"ot_associations"``. ``"ot_associations"`` is ``[]`` when
        ``keywords`` is empty.

    Raises:
        Any exception raised by one of the lookups. Results are awaited in
        the order gene, trials, Open Targets, so the first failure in that
        order is the one that propagates.
    """
    gene_task = asyncio.create_task(fetch_gene_info(query))
    trials_task = asyncio.create_task(search_trials_v2(query))
    ot_task = (
        asyncio.create_task(fetch_ot_associations(keywords[0]))
        if keywords
        else None
    )
    tasks = [t for t in (gene_task, trials_task, ot_task) if t is not None]

    try:
        gene = await gene_task
        trials = await trials_task
        ot = await ot_task if ot_task is not None else []
    except BaseException:
        # BaseException (not Exception) so that cancellation of enrich()
        # itself also cleans up. Without this, sibling tasks would keep
        # running in the background with nobody awaiting them.
        for task in tasks:
            if not task.done():
                task.cancel()
        # Drain every task so cancellations complete and any secondary
        # exceptions are retrieved rather than logged as "never retrieved".
        await asyncio.gather(*tasks, return_exceptions=True)
        raise

    return {"gene": gene, "clinical_trials": trials, "ot_associations": ot}