#!/usr/bin/env python3
"""mcp/ctgov.py – ClinicalTrials helper (Modernized July‑2025‑ready)

Strategy
========
1. **Primary**    → Modernized OAS v2 endpoint (beta‑ut) announced by CT.gov for July 2025.
2. **Fallback‑1** → Production v2 (`/api/v2/studies`).
3. **Fallback‑2** → Legacy v1 (`/api/query/study_fields`).
4. If all fail    → return empty list so UI never crashes.

Features
--------
* 12‑second timeout; up to 3 attempts per endpoint with back‑off
  (2 s, then 4 s) on `403/429/5xx` and on transport errors.
* Explicit `Accept: application/json` header (passes WAF).
* Realistic Chrome UA.
* In‑process LRU cache (512 entries) whose entries expire after 24 h;
  only successful look‑ups are cached.
* Exports `search_trials` **and** `search_trials_v2` for back‑compat.
"""
from __future__ import annotations

import asyncio
import time
from collections import OrderedDict
from functools import lru_cache  # noqa: F401 – no longer used here; kept for existing importers
from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple

import httpx

# ── endpoints ───────────────────────────────────────────────────────
_BETA = "https://beta-ut.clinicaltrials.gov/api/v2/studies"  # modernized ingest
_V2 = "https://clinicaltrials.gov/api/v2/studies"  # prod v2
_V1 = "https://clinicaltrials.gov/api/query/study_fields"  # legacy JSON

_HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/126.0 Safari/537.36"
    ),
    "Accept": "application/json",
}
_TIMEOUT = 12  # hard cap to protect quotas
_MAX = 100  # upper bound applied to the caller's max_studies

_V2_FIELDS = "nctId,briefTitle,phase,status,startDate,conditions,interventions"
_V1_FIELDS = (
    "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
)

# ── retry / cache policy ────────────────────────────────────────────
_RETRY_STATUSES = frozenset({403, 429, 500, 502, 503, 504})
_ATTEMPTS = 3  # tries per endpoint before falling through to the next one
_CACHE_TTL = 24 * 60 * 60  # seconds
_CACHE_SIZE = 512

# (term, clamped limit) → (monotonic timestamp, studies); oldest entry first.
_cache: "OrderedDict[Tuple[str, int], Tuple[float, List[Dict]]]" = OrderedDict()


# ── helpers ─────────────────────────────────────────────────────────
async def _get(url: str, params: Dict[str, Any]) -> httpx.Response:
    """GET *url* with *params* using the module's timeout and headers.

    A fresh ``AsyncClient`` is opened and closed for every request.
    """
    async with httpx.AsyncClient(timeout=_TIMEOUT, headers=_HEADERS) as cli:
        return await cli.get(url, params=params)


async def _fetch_v2_style(url: str, label: str, term: str, limit: int) -> List[Dict]:
    """Query a v2-shaped endpoint and return its ``studies`` array.

    Raises:
        httpx.HTTPStatusError: for any non-200 response; the message is
            ``"<label> failed"`` and the response is attached.
    """
    # NOTE(review): the public v2 reference documents `query.term`; confirm
    # that a bare `query` parameter is accepted by both hosts.
    params = {"query": term, "pageSize": limit, "fields": _V2_FIELDS}
    r = await _get(url, params)
    if r.status_code == 200:
        # `or []` also covers an explicit JSON null for the key.
        return r.json().get("studies") or []
    raise httpx.HTTPStatusError(f"{label} failed", request=r.request, response=r)


async def _try_beta(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies for *term* from the beta-ut v2 endpoint."""
    return await _fetch_v2_style(_BETA, "beta", term, limit)


async def _try_v2(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies for *term* from the production v2 endpoint."""
    return await _fetch_v2_style(_V2, "v2", term, limit)


async def _try_v1(term: str, limit: int) -> List[Dict]:
    """Fetch up to *limit* studies for *term* from the legacy v1 endpoint.

    Returns the ``StudyFieldsResponse.StudyFields`` array of the payload.

    Raises:
        httpx.HTTPStatusError: for any non-200 response.
    """
    params = {
        "expr": term,
        "fields": _V1_FIELDS,
        "max_rnk": limit,
        "min_rnk": 1,
        "fmt": "json",
    }
    r = await _get(_V1, params)
    if r.status_code == 200:
        return r.json().get("StudyFieldsResponse", {}).get("StudyFields") or []
    raise httpx.HTTPStatusError("v1 failed", request=r.request, response=r)


async def _fetch_with_retry(
    fetch: Callable[[str, int], Awaitable[List[Dict]]],
    term: str,
    limit: int,
) -> Optional[List[Dict]]:
    """Run *fetch* with back-off; return its result, or ``None`` on failure.

    ``None`` (rather than ``[]``) signals failure so the caller can tell a
    broken endpoint apart from a successful query with zero matches.
    """
    delay = 0
    for _ in range(_ATTEMPTS):
        if delay:
            await asyncio.sleep(delay)
        try:
            return await fetch(term, limit)
        except httpx.HTTPStatusError as exc:
            # Any status outside the retry set will not improve on retry.
            if exc.response.status_code not in _RETRY_STATUSES:
                return None
        except httpx.TransportError:
            # Timeouts, connection resets, DNS failures: worth another try.
            pass
        except ValueError:
            # Body was not valid JSON; retrying the same endpoint is pointless.
            return None
        delay = 2 if delay == 0 else delay * 2
    return None


# ── public --------------------------------------------------------------------
async def search_trials(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Return ≤ *max_studies* trials using the BETA→V2→V1 cascade.

    Args:
        term: Free-text search expression forwarded to the endpoint.
        max_studies: Requested number of studies; clamped to ``1.._MAX``.

    Returns:
        The list of study records from the first endpoint that answers, or
        ``[]`` when every endpoint fails. HTTP, transport and JSON-decoding
        failures are absorbed rather than raised.

    Successful results are cached per ``(term, clamped limit)`` for 24 h.
    The previous ``functools.lru_cache`` decorator could not do this for a
    coroutine function: it memoised the coroutine object itself, so a
    repeated call raised ``RuntimeError`` (coroutine already awaited).
    """
    limit = max(1, min(max_studies, _MAX))
    key = (term, limit)

    hit = _cache.get(key)
    if hit is not None:
        stamp, studies = hit
        if time.monotonic() - stamp < _CACHE_TTL:
            _cache.move_to_end(key)  # mark as most recently used
            return list(studies)  # copy so callers cannot mutate the cache
        del _cache[key]  # expired

    # Order: beta, prod v2, legacy v1
    for fetch in (_try_beta, _try_v2, _try_v1):
        studies = await _fetch_with_retry(fetch, term, limit)
        if studies is None:
            continue
        _cache[key] = (time.monotonic(), studies)
        _cache.move_to_end(key)
        while len(_cache) > _CACHE_SIZE:
            _cache.popitem(last=False)  # evict least recently used
        return list(studies)

    return []  # graceful fallback; deliberately not cached


# Keep the attribute that the former lru_cache wrapper exposed.
search_trials.cache_clear = _cache.clear  # type: ignore[attr-defined]


# back‑compat alias for old imports
async def search_trials_v2(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Alias of :func:`search_trials` kept for older import sites."""
    return await search_trials(term, max_studies=max_studies)