MCP_Res / mcp /ctgov.py
mgbam's picture
Update mcp/ctgov.py
e816b33 verified
#!/usr/bin/env python3
"""mcp/ctgov.py – ClinicalTrials helper (Modernized July‑2025‑ready)
Strategy
========
1. **Primary** → Modernized OAS v2 endpoint (beta‑ut) announced by CT.gov for July 2025.
2. **Fallback‑1** → Production v2 (`/api/v2/studies`).
3. **Fallback‑2** → Legacy v1 (`/api/query/study_fields`).
4. If all fail → return empty list so UI never crashes.
Features
--------
* 12‑second timeout, 3‑step back‑off (2 → 4 → 8 s) on `403/429/5xx`.
* Explicit `Accept: application/json` header (passes WAF).
* Realistic Chrome UA.
* LRU‑cached for 24 h.
* Exports `search_trials` **and** `search_trials_v2` for back‑compat.
"""
from __future__ import annotations
import asyncio, httpx
from functools import lru_cache
from typing import List, Dict, Any
# ── endpoints ───────────────────────────────────────────────────────
# Modernized ingest host (tried first).
_BETA = "https://beta-ut.clinicaltrials.gov/api/v2/studies"
# Production v2 host (first fallback).
_V2 = "https://clinicaltrials.gov/api/v2/studies"
# Legacy JSON study-fields endpoint (last fallback).
_V1 = "https://clinicaltrials.gov/api/query/study_fields"

# Browser-like User-Agent string sent with every request.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0 Safari/537.36"
)

# Default headers attached to every outgoing request.
_HEADERS = {
    "User-Agent": _USER_AGENT,
    "Accept": "application/json",
}

# Per-request timeout handed to httpx, in seconds.
_TIMEOUT = 12

# Hard cap on the number of studies requested, to protect quotas.
_MAX = 100
# ── helpers ─────────────────────────────────────────────────────────
async def _get(url: str, params: Dict[str, Any]) -> httpx.Response:
    """Send a single GET request to *url* with *params* as the query string.

    A fresh ``httpx.AsyncClient`` configured with the module's timeout and
    default headers is opened for the request and closed before returning.
    """
    client = httpx.AsyncClient(timeout=_TIMEOUT, headers=_HEADERS)
    async with client:
        response = await client.get(url, params=params)
    return response
# Fields requested from both v2 hosts, spelled as v2 "piece names" (the same
# names the legacy v1 request in this module uses).
_V2_FIELDS = (
    "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
)


async def _fetch_v2(url: str, term: str, limit: int, label: str) -> List[Dict]:
    """Query one v2-style ``/studies`` endpoint and return its ``studies`` list.

    Args:
        url: Full URL of the ``/studies`` endpoint to query.
        term: Free-text search expression.
        limit: Page size, i.e. the maximum number of studies requested.
        label: Short endpoint name used in the error message.

    Returns:
        The value stored under ``"studies"`` in the JSON body, or an empty
        list when that key is absent.

    Raises:
        httpx.HTTPStatusError: If the response status is anything but 200.
    """
    params = {
        # v2 has no bare ``query`` parameter; full-text search is ``query.term``.
        "query.term": term,
        "pageSize": limit,
        "fields": _V2_FIELDS,
    }
    r = await _get(url, params)
    if r.status_code == 200:
        return r.json().get("studies", [])
    raise httpx.HTTPStatusError(f"{label} failed", request=r.request, response=r)


async def _try_beta(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies matching *term* from the beta v2 host."""
    return await _fetch_v2(_BETA, term, limit, "beta")


async def _try_v2(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies matching *term* from the production v2 host."""
    return await _fetch_v2(_V2, term, limit, "v2")
async def _try_v1(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies matching *term* from the legacy v1 endpoint.

    Returns the ``StudyFieldsResponse.StudyFields`` list from the JSON body
    (an empty list when either key is missing).

    Raises:
        httpx.HTTPStatusError: If the response status is anything but 200.
    """
    wanted = (
        "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
    )
    query = {
        "expr": term,
        "fields": wanted,
        "max_rnk": limit,
        "min_rnk": 1,
        "fmt": "json",
    }
    resp = await _get(_V1, query)

    # Guard clause: anything other than 200 is reported to the caller.
    if resp.status_code != 200:
        raise httpx.HTTPStatusError(
            "v1 failed", request=resp.request, response=resp
        )

    envelope = resp.json().get("StudyFieldsResponse", {})
    return envelope.get("StudyFields", [])
# ── public --------------------------------------------------------------------
# Result cache for search_trials: (term, limit) -> (stored_at, studies).
# ``functools.lru_cache`` cannot be used on a coroutine function: it would
# cache the coroutine object itself, which can only be awaited once.
_CACHE_TTL = 24 * 60 * 60  # seconds a successful lookup stays fresh
_CACHE_MAX = 512  # maximum number of cached lookups
_CACHE: Dict[tuple, tuple] = {}

# Statuses that are retried against the same endpoint before moving on.
_RETRY_STATUSES = frozenset({403, 429, 500, 502, 503, 504})


async def search_trials(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Return up to *max_studies* trials using the BETA → V2 → V1 cascade.

    Each endpoint gets at most three attempts, sleeping 2 s and then 4 s
    between them. A retry happens only after a read/connect timeout or one of
    the statuses in ``_RETRY_STATUSES``; any other HTTP, transport or JSON
    decoding error moves straight on to the next endpoint.

    Successful results are cached per ``(term, limit)`` for ``_CACHE_TTL``
    seconds. Failures are never cached, so a transient outage is not
    remembered.

    Args:
        term: Free-text search expression.
        max_studies: Requested number of studies; clamped to ``1.._MAX``.

    Returns:
        A list of study dicts, or an empty list when every endpoint failed.
        HTTP, transport and JSON errors are swallowed rather than raised.
    """
    import time  # local import: only this function needs a clock

    limit = max(1, min(max_studies, _MAX))
    key = (term, limit)

    cached = _CACHE.get(key)
    if cached is not None:
        stored_at, studies = cached
        if time.monotonic() - stored_at < _CACHE_TTL:
            # Hand out a copy so callers cannot mutate the cached list.
            return list(studies)
        del _CACHE[key]  # expired

    # Order: beta, prod v2, legacy v1
    for fetch in (_try_beta, _try_v2, _try_v1):
        delay = 0
        for _ in range(3):
            if delay:
                await asyncio.sleep(delay)
            try:
                studies = await fetch(term, limit)
            except httpx.HTTPStatusError as exc:
                # Retry only on 403/429/5xx; otherwise try the next endpoint.
                if exc.response.status_code not in _RETRY_STATUSES:
                    break
            except (httpx.ReadTimeout, httpx.ConnectTimeout):
                pass  # retry after back-off
            except (httpx.HTTPError, ValueError):
                # Other transport failures, or a 200 body that is not JSON
                # (json.JSONDecodeError is a ValueError): next endpoint.
                break
            else:
                studies = list(studies or [])
                if len(_CACHE) >= _CACHE_MAX:
                    # Dicts keep insertion order, so this drops the oldest entry.
                    _CACHE.pop(next(iter(_CACHE)))
                _CACHE[key] = (time.monotonic(), studies)
                return list(studies)
            delay = 2 if delay == 0 else delay * 2

    return []  # graceful fallback
# back‑compat alias for old imports
async def search_trials_v2(term: str, *, max_studies: int = 20):
    """Delegate to ``search_trials``; kept so older imports keep working."""
    trials = await search_trials(term, max_studies=max_studies)
    return trials