MCP_Res / mcp /clinicaltrials.py
mgbam's picture
Update mcp/clinicaltrials.py
4764268 verified
raw
history blame
3.78 kB
"""
clinicaltrials.py · Modernised helper (July-2025-ready)
Order of endpoints
──────────────────
1. beta-ut (modern ingest) https://beta-ut.clinicaltrials.gov/api/v2/studies
2. prod v2 https://clinicaltrials.gov/api/v2/studies
3. legacy v1 https://clinicaltrials.gov/api/query/study_fields
4. WHO ICTRP mirror https://trialsearch.who.int/api/StudyFields (JSON)
All calls are GET, JSON; no API-key required.
Returns [] on any failure so orchestrator never raises.
"""
from __future__ import annotations
import asyncio, httpx
from functools import lru_cache
from typing import List, Dict
# Browser-like User-Agent sent with every request (pieces are space-joined).
_UA = " ".join((
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
    "AppleWebKit/537.36 (KHTML, like Gecko)",
    "Chrome/126.0 Safari/537.36",
))

# Default request headers: identify as a browser and ask for JSON.
_HDR = {
    "User-Agent": _UA,
    "Accept": "application/json",
}

_TIMEOUT = 12   # handed to httpx.AsyncClient(timeout=...)
_RETRY = 1      # extra attempts per endpoint after the first failure

# Endpoints, listed in the order the search cascade tries them.
_BETA = "https://beta-ut.clinicaltrials.gov/api/v2/studies"
_V2 = "https://clinicaltrials.gov/api/v2/studies"
_V1 = "https://clinicaltrials.gov/api/query/study_fields"
_WHO = "https://trialsearch.who.int/api/StudyFields"
# ────────────────────────────────────────────────────────────────────
async def _get(url: str, params: Dict) -> Dict:
    """
    Issue one GET to *url* with *params* and return the decoded JSON body.

    A fresh ``httpx.AsyncClient`` is opened for the call, configured with the
    module's timeout and headers and with redirect following enabled.
    ``raise_for_status()`` runs before decoding, so an error status surfaces
    as an exception instead of a parsed body.
    """
    client_opts = {
        "timeout": _TIMEOUT,
        "headers": _HDR,
        "follow_redirects": True,
    }
    async with httpx.AsyncClient(**client_opts) as http:
        response = await http.get(url, params=params)
        response.raise_for_status()
        payload = response.json()
    return payload
# --- individual endpoint helpers -----------------------------------
async def _try_beta(term: str, n: int) -> List[Dict]:
    """
    Query the beta-ut v2 endpoint for *term*, asking for a page of *n* studies.

    Returns the ``studies`` entry of the JSON response, or ``[]`` when that
    key is absent.
    """
    # NOTE(review): the public v2 API documents ``query.term`` and PascalCase
    # field names - confirm this endpoint accepts ``query`` / camelCase fields.
    wanted = "nctId,briefTitle,phase,status,startDate,conditions,interventions"
    params = dict(query=term, pageSize=n, fields=wanted)
    payload = await _get(_BETA, params)
    return payload.get("studies", [])
async def _try_v2(term: str, n: int) -> List[Dict]:
    """
    Query the production v2 endpoint for *term*, asking for a page of *n* studies.

    Returns the ``studies`` entry of the JSON response, or ``[]`` when that
    key is absent.
    """
    # NOTE(review): the public v2 API documents ``query.term`` and PascalCase
    # field names - confirm this endpoint accepts ``query`` / camelCase fields.
    wanted = "nctId,briefTitle,phase,status,startDate,conditions,interventions"
    params = dict(query=term, pageSize=n, fields=wanted)
    payload = await _get(_V2, params)
    return payload.get("studies", [])
async def _try_v1(term: str, n: int) -> List[Dict]:
    """
    Query the legacy v1 ``study_fields`` endpoint for *term*, ranks 1..*n*.

    Returns ``StudyFieldsResponse.StudyFields`` from the JSON response, or
    ``[]`` when either level is missing.
    """
    columns = ",".join((
        "NCTId", "BriefTitle", "Phase", "OverallStatus", "StartDate",
        "Condition", "InterventionName",
    ))
    params = {
        "expr": term,
        "fields": columns,
        "min_rnk": 1,
        "max_rnk": n,
        "fmt": "json",
    }
    body = await _get(_V1, params)
    envelope = body.get("StudyFieldsResponse", {})
    return envelope.get("StudyFields", [])
async def _try_who(term: str, n: int) -> List[Dict]:
    """
    Query the WHO ICTRP mirror for *term*, ranks 1..*n*.

    Returns ``StudyFieldsResponse.StudyFields`` from the JSON response, or
    ``[]`` when either level is missing.
    """
    params = {
        "expr": term,
        "fields": "URL,HealthCondition,PublicTitle",
        "min_rnk": 1,
        "max_rnk": n,
        "fmt": "json",
    }
    body = await _get(_WHO, params)
    envelope = body.get("StudyFieldsResponse", {})
    return envelope.get("StudyFields", [])
# ────────────────────────────────────────────────────────────────────
# Memo of successful lookups keyed by (term, limit).  functools.lru_cache is
# not usable on a coroutine function: it stores the one-shot coroutine object,
# and awaiting that object a second time raises RuntimeError.  Results are
# cached here instead, after they have been awaited.
_SEARCH_CACHE: Dict[tuple, List[Dict]] = {}
_SEARCH_CACHE_MAX = 256


async def search_trials(term: str, *, max_studies: int = 20) -> List[Dict]:
    """
    Return at most max_studies trial records using the
    BETA -> V2 -> V1 -> WHO cascade.

    ``max_studies`` is clamped to the range 1..100.  Each endpoint is tried
    up to ``_RETRY + 1`` times, pausing 0.8 s between attempts; the first
    endpoint that answers without error wins (even if its list is empty).
    Successful answers are memoised per (term, clamped limit) in a bounded
    in-process cache.

    Returns an empty list on total failure; transport errors, HTTP error
    statuses and undecodable JSON are swallowed rather than raised.
    """
    limit = max(1, min(max_studies, 100))
    key = (term, limit)

    cached = _SEARCH_CACHE.get(key)
    if cached is not None:
        # Re-insert so this key becomes the most recently used entry.
        _SEARCH_CACHE[key] = _SEARCH_CACHE.pop(key)
        return list(cached)

    for fn in (_try_beta, _try_v2, _try_v1, _try_who):
        for attempt in range(_RETRY + 1):
            try:
                studies = await fn(term, limit)
            except (httpx.HTTPError, ValueError):
                # httpx.HTTPError is the common base of HTTPStatusError and
                # all transport/timeout errors; ValueError covers a body
                # that is not valid JSON.
                if attempt < _RETRY:
                    await asyncio.sleep(0.8)
                continue

            if isinstance(studies, list):
                if len(_SEARCH_CACHE) >= _SEARCH_CACHE_MAX:
                    # dicts keep insertion order, so the first key is the
                    # least recently used one.
                    _SEARCH_CACHE.pop(next(iter(_SEARCH_CACHE)))
                # Store a copy so callers mutating their result cannot
                # corrupt the cached value.
                _SEARCH_CACHE[key] = list(studies)
            return studies
    return []


# Keep the hook that the former lru_cache wrapper exposed.
search_trials.cache_clear = _SEARCH_CACHE.clear
# Back-compat alias (some older code calls search_trials_v2)
async def search_trials_v2(term: str, *, max_studies: int = 20):
    """Older name for ``search_trials``; forwards both arguments unchanged."""
    records = await search_trials(term, max_studies=max_studies)
    return records