"""
MedGenesis v2 - core modules synchronised with the new helper API clients.
|
Contains three top-level files:

- mcp/cbio.py: cancer cohort variants via cBioPortal REST v3
- mcp/orchestrator.py: async pipeline coordinator
- app.py: Streamlit UI (CPU-only)
|
|
|
All modules assume the new helper files (mygene.py, ensembl.py, opentargets.py,
drugcentral_ext.py, pubchem_ext.py, gene_hub.py, clinicaltrials.py, etc.) are
present in the mcp package as created earlier in the conversation.
"""
|
|
|
|

"""cBioPortal REST helper - returns detailed mutation calls for a gene.

The public instance does **not** require an API key; however, if you have a
personal or GENIE instance token set as CBIO_KEY, it will be used.
"""
|
import os
from collections import OrderedDict
from functools import lru_cache
from typing import Dict, List

import httpx
|
|
|
# Base URL of the cBioPortal REST API; the CBIO_BASE_URL environment variable
# overrides the public default.
_CBIO_BASE = os.environ.get("CBIO_BASE_URL", "https://www.cbioportal.org/api")

# Optional access token read from CBIO_KEY; None when the variable is unset.
_CBIO_TOKEN = os.environ.get("CBIO_KEY")

# Headers sent with every request. The Authorization entry is included only
# when a non-empty token was found above.
_HEADERS = {
    "accept": "application/json",
    **({"Authorization": f"Bearer {_CBIO_TOKEN}"} if _CBIO_TOKEN else {}),
}
|
|
|
# Completed responses keyed by (gene_symbol, study), least recently used first.
# functools.lru_cache cannot be used on a coroutine function: it would store
# the coroutine object itself, and awaiting that object a second time raises
# "RuntimeError: cannot reuse already awaited coroutine".
_VARIANT_CACHE = OrderedDict()
_VARIANT_CACHE_MAX = 256


async def fetch_cbio_variants(
    gene_symbol: str,
    study: str = "brca_tcga_pan_can_atlas_2018",
    *,
    timeout: float = 15.0,
) -> List[Dict]:
    """Return list of variant dicts (DETAILED projection).

    Successful responses are memoised per ``(gene_symbol, study)`` in a
    bounded LRU cache (256 entries), so repeated lookups do not hit the
    network again. Failed requests are never cached.

    Args:
        gene_symbol: Gene identifier inserted into the request path.
        study: Study identifier; the molecular profile queried is
            ``f"{study}_mutations"``.
        timeout: Timeout in seconds handed to ``httpx.AsyncClient``. It does
            not take part in the cache key because it cannot change the
            payload.

    Returns:
        The decoded JSON body of the response. Cache hits return the same
        object that was stored, so callers should treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the server answers with an error status
            (raised by ``raise_for_status``).
    """
    cache_key = (gene_symbol, study)
    try:
        cached = _VARIANT_CACHE[cache_key]
    except KeyError:
        pass
    else:
        # Mark the entry as most recently used before handing it back.
        _VARIANT_CACHE.move_to_end(cache_key)
        return cached

    profile = f"{study}_mutations"
    # NOTE(review): confirm this route against the cBioPortal API version
    # that is actually deployed at _CBIO_BASE.
    url = (
        f"{_CBIO_BASE}/molecular-profiles/{profile}/genes/{gene_symbol}/"
        "mutations"
    )
    params = {"projection": "DETAILED"}
    async with httpx.AsyncClient(
        timeout=timeout, headers=_HEADERS, follow_redirects=True
    ) as client:
        r = await client.get(url, params=params)
        r.raise_for_status()
        variants = r.json()

    # Store only after a successful fetch, then evict the oldest entries.
    _VARIANT_CACHE[cache_key] = variants
    _VARIANT_CACHE.move_to_end(cache_key)
    while len(_VARIANT_CACHE) > _VARIANT_CACHE_MAX:
        _VARIANT_CACHE.popitem(last=False)
    return variants


# Keep the ``cache_clear()`` handle that the former lru_cache wrapper exposed.
fetch_cbio_variants.cache_clear = _VARIANT_CACHE.clear