"""mcp/ctgov.py – ClinicalTrials helper (Modernized July-2025-ready)

Strategy
========
1. **Primary** – Modernized OAS v2 endpoint (beta-ut) announced by CT.gov for July 2025.
2. **Fallback-1** – Production v2 (`/api/v2/studies`).
3. **Fallback-2** – Legacy v1 (`/api/query/study_fields`).
4. If all fail → return empty list so UI never crashes.

Features
--------
* 12-second timeout, 3-step back-off (2 → 4 → 8 s) on `403/429/5xx`.
* Explicit `Accept: application/json` header (passes WAF).
* Realistic Chrome UA.
* LRU-cached for 24 h.
* Exports `search_trials` **and** `search_trials_v2` for back-compat.
"""
|
from __future__ import annotations

import asyncio
import time
from functools import lru_cache
from typing import Any, Dict, List

import httpx
|
|
|
|
|
# Endpoints used by the search cascade, listed from most to least preferred.
_BETA = "https://beta-ut.clinicaltrials.gov/api/v2/studies"
_V2 = "https://clinicaltrials.gov/api/v2/studies"
_V1 = "https://clinicaltrials.gov/api/query/study_fields"

# Browser-like User-Agent string sent with every request.
_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/126.0 Safari/537.36"
)
_HEADERS = {"User-Agent": _USER_AGENT, "Accept": "application/json"}

_TIMEOUT = 12  # handed to the httpx client as its timeout (seconds)
_MAX = 100  # ceiling applied to the number of studies a caller may request
|
|
|
|
|
async def _get(url: str, params: Dict[str, Any]) -> httpx.Response:
    """Issue a single GET to *url* and return the raw response.

    A fresh ``httpx.AsyncClient`` is created for the call, configured with the
    module-level ``_TIMEOUT`` and ``_HEADERS``, and closed again before the
    response is handed back.  The status code is not inspected here; callers
    decide what counts as a failure.

    Args:
        url: Absolute endpoint URL.
        params: Query-string parameters, passed to httpx unchanged.

    Returns:
        The ``httpx.Response`` produced by the request.
    """
    client = httpx.AsyncClient(timeout=_TIMEOUT, headers=_HEADERS)
    async with client:
        response = await client.get(url, params=params)
    return response
|
|
|
async def _try_beta(term: str, limit: int) -> List[Dict]:
    """Search the beta (``_BETA``) v2 endpoint for *term*.

    Args:
        term: Search expression, sent as the ``query`` parameter.
        limit: Page size requested from the server.

    Returns:
        The list stored under ``"studies"`` in the JSON body, or an empty
        list when that key is absent.

    Raises:
        httpx.HTTPStatusError: with message ``"beta failed"`` for any status
            other than 200.
    """
    # NOTE(review): parameter and field names are sent exactly as written
    # here - confirm them against the current CT.gov API v2 reference.
    wanted_fields = (
        "nctId,briefTitle,phase,status,startDate,conditions,interventions"
    )
    response = await _get(
        _BETA,
        {"query": term, "pageSize": limit, "fields": wanted_fields},
    )
    if response.status_code != 200:
        raise httpx.HTTPStatusError(
            "beta failed", request=response.request, response=response
        )
    return response.json().get("studies", [])
|
|
|
async def _try_v2(term: str, limit: int) -> List[Dict]:
    """Search the production (``_V2``) v2 endpoint for *term*.

    Args:
        term: Search expression, sent as the ``query`` parameter.
        limit: Page size requested from the server.

    Returns:
        The list stored under ``"studies"`` in the JSON body, or an empty
        list when that key is absent.

    Raises:
        httpx.HTTPStatusError: with message ``"v2 failed"`` for any status
            other than 200.
    """
    # NOTE(review): parameter and field names are sent exactly as written
    # here - confirm them against the current CT.gov API v2 reference.
    wanted_fields = (
        "nctId,briefTitle,phase,status,startDate,conditions,interventions"
    )
    response = await _get(
        _V2,
        {"query": term, "pageSize": limit, "fields": wanted_fields},
    )
    if response.status_code != 200:
        raise httpx.HTTPStatusError(
            "v2 failed", request=response.request, response=response
        )
    return response.json().get("studies", [])
|
|
|
async def _try_v1(term: str, limit: int) -> List[Dict]:
    """Search the legacy (``_V1``) ``study_fields`` endpoint for *term*.

    Ranks 1 through *limit* are requested in JSON format.

    Args:
        term: Search expression, sent as the ``expr`` parameter.
        limit: Highest rank to request (``max_rnk``).

    Returns:
        The list stored under ``StudyFieldsResponse -> StudyFields`` in the
        JSON body, or an empty list when either key is absent.  The records
        are returned as received; they are not reshaped to match what the v2
        helpers return.

    Raises:
        httpx.HTTPStatusError: with message ``"v1 failed"`` for any status
            other than 200.
    """
    wanted_fields = (
        "NCTId,BriefTitle,Phase,OverallStatus,StartDate,Condition,InterventionName"
    )
    query = {
        "expr": term,
        "fields": wanted_fields,
        "max_rnk": limit,
        "min_rnk": 1,
        "fmt": "json",
    }
    response = await _get(_V1, query)
    if response.status_code != 200:
        raise httpx.HTTPStatusError(
            "v1 failed", request=response.request, response=response
        )
    envelope = response.json().get("StudyFieldsResponse", {})
    return envelope.get("StudyFields", [])
|
|
|
|
|
# functools.lru_cache cannot memoise a coroutine function: it stores the
# coroutine object returned by the call, so a repeated lookup hands back an
# already-awaited coroutine and fails with RuntimeError.  Awaited results are
# therefore kept in a plain dict with an explicit time-to-live instead.
_CACHE_TTL_SECONDS = 24 * 60 * 60
_CACHE_MAX_ENTRIES = 512
# (term, limit) -> (time.monotonic() when stored, list of studies)
_TRIAL_CACHE: Dict[Any, Any] = {}

# Statuses treated as transient and retried against the same endpoint.
_RETRYABLE_STATUS = frozenset({403, 429, 500, 502, 503, 504})
_ATTEMPTS_PER_ENDPOINT = 3


def _cache_lookup(key):
    """Return a copy of the studies cached under *key*, or None if absent/expired."""
    entry = _TRIAL_CACHE.get(key)
    if entry is None:
        return None
    stored_at, studies = entry
    if time.monotonic() - stored_at >= _CACHE_TTL_SECONDS:
        _TRIAL_CACHE.pop(key, None)
        return None
    # Hand out a copy so callers cannot mutate the cached list.
    return list(studies)


def _cache_store(key, studies):
    """Record *studies* under *key*, evicting the oldest entry when the cache is full."""
    _TRIAL_CACHE.pop(key, None)  # re-insert so this entry counts as the newest
    if len(_TRIAL_CACHE) >= _CACHE_MAX_ENTRIES:
        # dicts iterate in insertion order, so the first key is the oldest.
        _TRIAL_CACHE.pop(next(iter(_TRIAL_CACHE)))
    _TRIAL_CACHE[key] = (time.monotonic(), list(studies))


async def search_trials(term: str, *, max_studies: int = 20) -> List[Dict]:
    """Return ≤ *max_studies* trials using the BETA→V2→V1 cascade (never raises).

    Each endpoint gets up to three attempts.  A retryable status
    (403/429/5xx) or a timeout is retried after a 2 s, then 4 s pause; any
    other failure moves straight on to the next endpoint.  Successful results
    are cached per ``(term, limit)`` for 24 hours; failures are not cached, so
    a later call tries the network again.

    Args:
        term: Search expression forwarded to the endpoint helpers.
        max_studies: Desired number of studies; clamped to ``1.._MAX``.

    Returns:
        A list of study records (at most the clamped limit), or an empty list
        when every endpoint failed.
    """
    limit = max(1, min(max_studies, _MAX))
    key = (term, limit)

    cached = _cache_lookup(key)
    if cached is not None:
        return cached

    for fetch in (_try_beta, _try_v2, _try_v1):
        delay = 0
        for _ in range(_ATTEMPTS_PER_ENDPOINT):
            if delay:
                await asyncio.sleep(delay)
            try:
                studies = await fetch(term, limit)
            except httpx.HTTPStatusError as exc:
                if exc.response.status_code not in _RETRYABLE_STATUS:
                    break  # e.g. 400/404: retrying this endpoint cannot help
            except httpx.TimeoutException:
                pass  # transient by nature: fall through to the back-off below
            except (httpx.HTTPError, ValueError, AttributeError):
                # Connection/protocol failure, a 200 whose body is not JSON
                # (ValueError), or JSON that is not an object (AttributeError
                # from .get): give up on this endpoint, try the next one.
                break
            else:
                if not isinstance(studies, list):
                    break  # unexpected payload shape; try the next endpoint
                studies = studies[:limit]
                _cache_store(key, studies)
                return studies
            delay = 2 if delay == 0 else delay * 2
    return []


# The lru_cache wrapper this replaces exposed cache_clear(); keep it available.
search_trials.cache_clear = _TRIAL_CACHE.clear  # type: ignore[attr-defined]
|
|
|
|
|
async def search_trials_v2(term: str, *, max_studies: int = 20):
    """Back-compat alias: forward both arguments to ``search_trials`` unchanged."""
    trials = await search_trials(term, max_studies=max_studies)
    return trials
|
|