| import os, httpx, textwrap | |
| from functools import lru_cache | |
# GraphQL endpoint that every request in this module is POSTed to.
_URL = "https://api.platform.opentargets.org/api/v4/graphql"

# Default request headers: JSON is both sent and expected back.
_HDR = dict.fromkeys(("Content-Type", "Accept"), "application/json")

# An OT_KEY environment variable, when set and non-empty, is forwarded as a
# bearer token (optional higher quota).
if os.environ.get("OT_KEY"):
    _HDR.update(Authorization="Bearer " + str(os.environ.get("OT_KEY")))

# Query document: takes a gene symbol ($g) and a row limit ($n) and selects
# score, datasourceId, disease {id name} and target {id symbol} for each row.
# NOTE(review): confirm that the `associations(geneSymbol:, size:)` root field
# exists in the currently deployed Open Targets schema.
_QUERY = textwrap.dedent("""
query Assoc($g:String!, $n:Int!) {
associations(geneSymbol:$g, size:$n) {
rows { score datasourceId disease { id name } target { id symbol } }
}
}""")
async def fetch_ot(sym: str, n: int = 30):
    """Fetch association rows for a gene symbol from the Open Targets GraphQL API.

    POSTs ``_QUERY`` to ``_URL`` with ``sym`` and ``n`` as the ``$g`` and
    ``$n`` variables, using a short-lived ``httpx.AsyncClient`` with a
    10-second timeout and the module-level ``_HDR`` headers.

    Args:
        sym: Gene symbol sent as the ``$g`` query variable.
        n: Row limit sent as the ``$n`` query variable (default 30).

    Returns:
        The value found at ``data.associations.rows`` in the JSON response.

    Raises:
        httpx.HTTPStatusError: The server answered with a 4xx/5xx status.
        ValueError: The response carried no ``data.associations.rows``;
            the message includes any GraphQL ``errors`` the server reported.
    """
    payload = {"query": _QUERY, "variables": {"g": sym, "n": n}}
    async with httpx.AsyncClient(timeout=10, headers=_HDR) as client:
        response = await client.post(_URL, json=payload)
    response.raise_for_status()
    body = response.json()

    try:
        return body["data"]["associations"]["rows"]
    except (KeyError, TypeError):
        # GraphQL can report failures in an "errors" array with "data" null or
        # absent, which raise_for_status() does not catch. Surface those
        # messages instead of an opaque subscript error.
        errors = body.get("errors") if isinstance(body, dict) else None
        if isinstance(errors, list) and errors:
            detail = "; ".join(
                str(err.get("message", err)) if isinstance(err, dict) else str(err)
                for err in errors
            )
        else:
            detail = "response has no data.associations.rows"
        raise ValueError(
            f"Open Targets query for {sym!r} failed: {detail}"
        ) from None