# Source: MCP_Res repository, mcp/cbio.py (commit 2a448c1, "Update mcp/cbio.py", author mgbam).
"""
MedGenesis v2 – core modules synchronised with the new helper API clients.
Contains three top‑level files:
• mcp/cbio.py – cancer cohort variants via cBioPortal REST v3
• mcp/orchestrator.py – async pipeline coordinator
• app.py – Streamlit UI (CPU‑only)
All modules assume the new helper files (mygene.py, ensembl.py, opentargets.py,
drugcentral_ext.py, pubchem_ext.py, gene_hub.py, clinicaltrials.py, etc.) are
present in the mcp package as created earlier in the conversation.
"""
# ───────────────────────── mcp/cbio.py ────────────────────────────────
"""cBioPortal REST helper – returns detailed mutation calls for a gene.
The public instance does **not** require an API‑key; however, if you have a
personal or GENIE instance token set as CBIO_KEY, it will be used.
"""
import os, httpx
from functools import lru_cache
from typing import List, Dict
# Root URL of the cBioPortal REST API; overridable through CBIO_BASE_URL.
_CBIO_BASE = os.environ.get("CBIO_BASE_URL", "https://www.cbioportal.org/api")

# Optional access token taken from CBIO_KEY (None when the variable is unset).
_CBIO_TOKEN = os.environ.get("CBIO_KEY")

# Headers sent with every request. The Authorization entry is included only
# when a non-empty token is configured.
_HEADERS = {
    "accept": "application/json",
    **({"Authorization": f"Bearer {_CBIO_TOKEN}"} if _CBIO_TOKEN else {}),
}
# Bounded in-process memo of decoded responses, keyed by (gene_symbol, study).
# functools.lru_cache cannot be used on a coroutine function: it would store
# the coroutine object itself, which can only be awaited once, so a repeated
# call with the same arguments would fail with
# "RuntimeError: cannot reuse already awaited coroutine".
# Plain dicts preserve insertion order, so the first key is always the least
# recently used entry.
_VARIANT_CACHE: Dict[tuple, List[Dict]] = {}
_VARIANT_CACHE_MAX = 256


async def fetch_cbio_variants(
    gene_symbol: str,
    study: str = "brca_tcga_pan_can_atlas_2018",
    *,
    timeout: float = 15.0,
) -> List[Dict]:
    """Return list of variant dicts (DETAILED projection).

    Results are memoised per ``(gene_symbol, study)`` pair for the lifetime
    of the process (at most 256 entries, least recently used evicted first).
    Failed requests are never cached.

    Args:
        gene_symbol: Gene identifier inserted into the request path.
        study: Study identifier; ``"_mutations"`` is appended to form the
            molecular profile id used in the request path.
        timeout: Timeout in seconds handed to ``httpx.AsyncClient``. It does
            not influence the payload, so it is not part of the cache key.

    Returns:
        The decoded JSON body of the response. A cache hit returns the same
        object as earlier calls, so callers should treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        Other httpx request errors (e.g. timeouts) propagate unchanged.
    """
    key = (gene_symbol, study)
    try:
        cached = _VARIANT_CACHE.pop(key)
    except KeyError:
        pass
    else:
        # Re-insert at the end to mark the entry as most recently used.
        _VARIANT_CACHE[key] = cached
        return cached

    profile = f"{study}_mutations"
    url = (
        f"{_CBIO_BASE}/molecular-profiles/{profile}/genes/{gene_symbol}/"
        "mutations"
    )
    params = {"projection": "DETAILED"}
    async with httpx.AsyncClient(
        timeout=timeout, headers=_HEADERS, follow_redirects=True
    ) as client:
        r = await client.get(url, params=params)
        r.raise_for_status()
        variants = r.json()

    _VARIANT_CACHE[key] = variants
    # Evict least recently used entries once the bound is exceeded.
    while len(_VARIANT_CACHE) > _VARIANT_CACHE_MAX:
        del _VARIANT_CACHE[next(iter(_VARIANT_CACHE))]
    return variants


# Keep the cache-reset hook that the previous lru_cache wrapper exposed.
fetch_cbio_variants.cache_clear = _VARIANT_CACHE.clear