import os
import textwrap
from functools import lru_cache

import httpx
# GraphQL endpoint of the Open Targets Platform API (v4).
_URL = "https://api.platform.opentargets.org/api/v4/graphql"

# Headers attached to every request: JSON request body, JSON response.
_HDR = {"Content-Type": "application/json", "Accept": "application/json"}

# Read the optional API key once so the presence check and the header value
# cannot disagree.
_OT_KEY = os.getenv("OT_KEY")
if _OT_KEY:  # optional higher quota
    _HDR["Authorization"] = f"Bearer {_OT_KEY}"

# Query document: $g is the gene symbol, $n the number of rows requested.
# textwrap.dedent strips the common indentation, so the text that goes over
# the wire starts at column 0.
# NOTE(review): confirm that the `associations(geneSymbol:, size:)` root field
# exists in the current Open Targets schema -- not verifiable from this file.
_QUERY = textwrap.dedent("""
    query Assoc($g:String!, $n:Int!) {
      associations(geneSymbol:$g, size:$n) {
        rows { score datasourceId disease { id name } target { id symbol } }
      }
    }""")
async def fetch_ot(sym: str, n: int = 30):
    """Fetch association rows for a gene symbol from the Open Targets API.

    Posts the module-level GraphQL query to ``_URL`` with ``sym`` and ``n``
    bound to the query variables ``$g`` and ``$n``.

    Args:
        sym: Gene symbol, sent as the ``$g`` variable.
        n: Number of rows requested, sent as the ``$n`` variable.

    Returns:
        The value found at ``data.associations.rows`` in the JSON response;
        presumably a list of row dicts shaped like the query's selection set
        (verify against the live schema).

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        httpx.HTTPError: On transport-level failures (a 10 second timeout is
            configured on the client).
        RuntimeError: If the response body contains no ``data``, i.e. the
            GraphQL query itself failed.
    """
    payload = {"query": _QUERY, "variables": {"g": sym, "n": n}}
    async with httpx.AsyncClient(timeout=10, headers=_HDR) as client:
        response = await client.post(_URL, json=payload)
        response.raise_for_status()
        body = response.json()

    # A GraphQL server can report a failed query inside the response body
    # ("data": null plus an "errors" list) without an HTTP error status, so
    # raise_for_status() alone does not catch it. Surface the server's own
    # message instead of an opaque TypeError/KeyError from the lookup below.
    if body.get("data") is None:
        raise RuntimeError(
            f"Open Targets GraphQL query failed: {body.get('errors')!r}"
        )
    return body["data"]["associations"]["rows"]