Create targets.py
Browse files- mcp/targets.py +32 -0
mcp/targets.py
ADDED
@@ -0,0 +1,32 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""Open Targets GraphQL client – fetch gene ↔ disease ↔ drug associations.
|
2 |
+
Docs: https://platform.opentargets.org/
|
3 |
+
No API‑key required (public endpoint).
|
4 |
+
"""
|
5 |
+
|
6 |
+
import httpx, textwrap, asyncio
|
7 |
+
from typing import List, Dict
|
8 |
+
|
9 |
+
_OT_URL = "https://api.platform.opentargets.org/api/v4/graphql"
|
10 |
+
|
11 |
+
_QUERY_TEMPLATE = textwrap.dedent(
|
12 |
+
"""
|
13 |
+
query Assoc($gene: String!, $size: Int!) {
|
14 |
+
associations(geneSymbol: $gene, size: $size) {
|
15 |
+
rows {
|
16 |
+
score
|
17 |
+
datatypeId
|
18 |
+
datasourceId
|
19 |
+
disease { id name }
|
20 |
+
target { id symbol }
|
21 |
+
}
|
22 |
+
}
|
23 |
+
}
|
24 |
+
"""
|
25 |
+
)
|
26 |
+
|
27 |
+
async def fetch_ot_associations(gene_symbol: str, *, size: int = 30) -> List[Dict]:
|
28 |
+
payload = {"query": _QUERY_TEMPLATE, "variables": {"gene": gene_symbol, "size": size}}
|
29 |
+
async with httpx.AsyncClient(timeout=15) as client:
|
30 |
+
resp = await client.post(_OT_URL, json=payload)
|
31 |
+
resp.raise_for_status()
|
32 |
+
return resp.json()["data"]["associations"]["rows"]
|