File size: 1,915 Bytes
64eb751
2a448c1
 
 
 
 
64eb751
2a448c1
 
 
64eb751
 
2a448c1
 
 
 
 
 
20e7231
 
64eb751
2a448c1
 
 
 
 
64eb751
20e7231
2a448c1
 
 
 
 
20e7231
2a448c1
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
"""
MedGenesis v2 – core modules synchronised with the new helper API clients.
Contains three top‑level files:
    • mcp/cbio.py         – cancer cohort variants via cBioPortal REST v3
    • mcp/orchestrator.py – async pipeline coordinator
    • app.py              – Streamlit UI (CPU‑only)

All modules assume the new helper files (mygene.py, ensembl.py, opentargets.py,
drugcentral_ext.py, pubchem_ext.py, gene_hub.py, clinicaltrials.py, etc.) are
present in the mcp package as created earlier in the conversation.
"""

# ───────────────────────── mcp/cbio.py ────────────────────────────────
"""cBioPortal REST helper – returns detailed mutation calls for a gene.
The public instance does **not** require an API‑key; however, if you have a
personal or GENIE instance token set as CBIO_KEY, it will be used.
"""
import os, httpx
from functools import lru_cache
from typing import List, Dict

# Base URL of the cBioPortal REST API. An unset *or empty* CBIO_BASE_URL falls
# back to the public instance, and any trailing "/" is removed so that the
# request URL built as f"{_CBIO_BASE}/molecular-profiles/..." never contains
# a double slash.
_CBIO_BASE = (
    os.getenv("CBIO_BASE_URL") or "https://www.cbioportal.org/api"
).rstrip("/")

# Optional bearer token read from CBIO_KEY. Surrounding whitespace (e.g. a
# trailing newline from a secrets file) is stripped; a blank value is treated
# the same as "not set" and leaves the token as None.
_CBIO_TOKEN = (os.getenv("CBIO_KEY") or "").strip() or None

# Headers sent with every request; Authorization is added only when a token
# is configured.
_HEADERS = {"accept": "application/json"}
if _CBIO_TOKEN:
    _HEADERS["Authorization"] = f"Bearer {_CBIO_TOKEN}"

# functools.lru_cache must not wrap a coroutine function: it would memoise the
# one-shot coroutine object, and awaiting that object a second time raises
# "RuntimeError: cannot reuse already awaited coroutine". The *resolved*
# results are therefore memoised by hand in this bounded, insertion-ordered
# dict (oldest entry first, most recently used last).
_VARIANT_CACHE_MAX = 256
_VARIANT_CACHE: Dict[tuple, List[Dict]] = {}


async def fetch_cbio_variants(
    gene_symbol: str,
    study: str = "brca_tcga_pan_can_atlas_2018",
    *,
    timeout: float = 15.0,
) -> List[Dict]:
    """Return the list of variant dicts for a gene (DETAILED projection).

    Sends a GET request to
    ``{_CBIO_BASE}/molecular-profiles/{study}_mutations/genes/{gene_symbol}/mutations``
    and returns the decoded JSON body. Successful results are cached per
    ``(gene_symbol, study)`` pair, keeping at most ``_VARIANT_CACHE_MAX``
    entries; failed requests are never cached.

    Args:
        gene_symbol: Gene identifier inserted into the request path.
        study: Study identifier; ``"_mutations"`` is appended to form the
            molecular-profile id.
        timeout: Timeout in seconds passed to ``httpx.AsyncClient``. It does
            not take part in the cache key because it cannot change the result.

    Returns:
        The decoded JSON response. On a cache hit the same list object is
        returned again, so callers should treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
    """
    key = (gene_symbol, study)
    if key in _VARIANT_CACHE:
        # Pop and re-insert so this entry becomes the most recently used one.
        variants = _VARIANT_CACHE.pop(key)
        _VARIANT_CACHE[key] = variants
        return variants

    profile = f"{study}_mutations"
    url = (
        f"{_CBIO_BASE}/molecular-profiles/{profile}/genes/{gene_symbol}/"
        "mutations"
    )
    params = {"projection": "DETAILED"}
    async with httpx.AsyncClient(
        timeout=timeout, headers=_HEADERS, follow_redirects=True
    ) as client:
        r = await client.get(url, params=params)
        r.raise_for_status()
        variants = r.json()

    _VARIANT_CACHE[key] = variants
    # Evict least recently used entries (front of the dict) once over the cap.
    while len(_VARIANT_CACHE) > _VARIANT_CACHE_MAX:
        _VARIANT_CACHE.pop(next(iter(_VARIANT_CACHE)))
    return variants


# Keep the cache_clear() hook that the former lru_cache wrapper exposed.
fetch_cbio_variants.cache_clear = _VARIANT_CACHE.clear