File size: 1,915 Bytes
64eb751 2a448c1 64eb751 2a448c1 64eb751 2a448c1 20e7231 64eb751 2a448c1 64eb751 20e7231 2a448c1 20e7231 2a448c1 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
"""
MedGenesis v2 β core modules synchronised with the new helper API clients.
Contains three topβlevel files:
β’ mcp/cbio.py β cancer cohort variants via cBioPortal REST v3
β’ mcp/orchestrator.py β async pipeline coordinator
β’ app.py β Streamlit UI (CPUβonly)
All modules assume the new helper files (mygene.py, ensembl.py, opentargets.py,
drugcentral_ext.py, pubchem_ext.py, gene_hub.py, clinicaltrials.py, etc.) are
present in the mcp package as created earlier in the conversation.
"""
# βββββββββββββββββββββββββ mcp/cbio.py ββββββββββββββββββββββββββββββββ
"""cBioPortal REST helper β returns detailed mutation calls for a gene.
The public instance does **not** require an APIβkey; however, if you have a
personal or GENIE instance token set as CBIO_KEY, it will be used.
"""
import os, httpx
from functools import lru_cache
from typing import List, Dict
_CBIO_BASE = os.getenv("CBIO_BASE_URL", "https://www.cbioportal.org/api")
_CBIO_TOKEN = os.getenv("CBIO_KEY")
_HEADERS = {"accept": "application/json"}
if _CBIO_TOKEN:
_HEADERS["Authorization"] = f"Bearer {_CBIO_TOKEN}"
@lru_cache(maxsize=256)
async def fetch_cbio_variants(
gene_symbol: str,
study: str = "brca_tcga_pan_can_atlas_2018",
*,
timeout: float = 15.0,
) -> List[Dict]:
"""Return list of variant dicts (DETAILED projection)."""
profile = f"{study}_mutations"
url = (
f"{_CBIO_BASE}/molecular-profiles/{profile}/genes/{gene_symbol}/"
"mutations"
)
params = {"projection": "DETAILED"}
async with httpx.AsyncClient(timeout=timeout, headers=_HEADERS, follow_redirects=True) as client:
r = await client.get(url, params=params)
r.raise_for_status()
return r.json() |