|
|
|
"""MedGenesis – ClinicalTrials.gov **v2** async wrapper. |
|
|
|
The legacy v1 JSON endpoint often throttles (HTTP 403/503). This helper |
|
switches to the production **v2** REST API, adds retry/back‑off, and |
|
mirrors the interface expected by existing code (`search_trials`). |
|
|
|
Key points |
|
~~~~~~~~~~ |
|
* Endpoint: `https://clinicaltrials.gov/api/v2/studies` (public, no key). |
|
* Exponential back-off between retries (2 s, then 4 s) on HTTP 429/5xx.
|
* Results cached 12 h (512 queries). |
|
* Returns list of dicts with minimal fields: `nctId`, `briefTitle`, |
|
`phase`, `status`, `startDate`, `conditions`, `interventions`. |
|
""" |
|
from __future__ import annotations |
|
|
|
import asyncio
import time
from collections import OrderedDict
from functools import lru_cache
from typing import List, Dict, Any

import httpx
|
|
|
# URL every request in this module is sent to (the v2 ``/studies`` endpoint).
_BASE = "https://clinicaltrials.gov/api/v2/studies"
# Passed as ``timeout=`` to ``httpx.AsyncClient`` (httpx interprets it as seconds).
_TIMEOUT = 15
# Default headers attached to every request made by the client.
_HEADERS = {"User-Agent": "MedGenesis/1.0 (https://huggingface.co/spaces)"}
# Upper bound that ``search_trials`` clamps ``max_studies`` (the page size) to.
_MAX_PAGE = 50
|
|
|
|
|
|
|
|
|
async def _fetch(params: Dict[str, Any], *, retries: int = 3) -> List[Dict]:
    """GET the v2 ``/studies`` endpoint and return its ``studies`` array.

    At most *retries* attempts are made over one shared client. HTTP 429 and
    every 5xx reply are treated as transient: the coroutine sleeps (2 s, then
    4 s, doubling each time) and tries again. No sleep happens after the
    final attempt, since nothing would follow it.

    :param params: Query-string parameters sent with the request.
    :param retries: Maximum number of attempts; values below 1 send nothing.
    :returns: The list stored under ``"studies"`` in the JSON body, or ``[]``
        when that key is missing, the reply is a non-200 success, or every
        attempt ended in a transient error (best-effort behaviour).
    :raises httpx.HTTPStatusError: On a non-retryable error status.
    """
    delay = 2
    # One client for all attempts so retries can reuse the connection.
    async with httpx.AsyncClient(timeout=_TIMEOUT, headers=_HEADERS) as client:
        for attempt in range(retries):
            resp = await client.get(_BASE, params=params)
            status = resp.status_code
            if status == 200:
                return resp.json().get("studies", [])

            if status != 429 and status < 500:
                # Not transient: surface the error instead of retrying.
                resp.raise_for_status()
                # Reached only for statuses httpx does not treat as errors;
                # such a reply carries no study list to return.
                return []

            # Back off only when another attempt will actually follow.
            if attempt < retries - 1:
                await asyncio.sleep(delay)
                delay *= 2
    return []
|
|
|
|
|
|
|
|
|
# Result cache for ``search_trials``. ``functools.lru_cache`` cannot be used on
# a coroutine function: it would store the coroutine object itself, which can
# only be awaited once, so the second identical call would raise RuntimeError.
_CACHE_TTL = 12 * 60 * 60  # seconds (12 h)
_CACHE_MAX = 512  # maximum number of cached queries
_CACHE: OrderedDict = OrderedDict()  # (term, page size) -> (stored-at, studies)


async def search_trials(term: str, *, max_studies: int = 10) -> List[Dict]:
    """Return up to *max_studies* clinical-trial dicts for free-text *term*.

    *max_studies* is clamped to the range ``1.._MAX_PAGE``. Non-empty results
    are kept for 12 hours in a 512-entry least-recently-used cache keyed on
    ``(term, clamped max_studies)``; ``search_trials.cache_clear()`` empties it.

    :param term: Free-text search expression.
    :param max_studies: Requested number of studies (page size).
    :returns: A fresh list of study dicts exactly as returned by ``_fetch``.
    :raises httpx.HTTPStatusError: Propagated from ``_fetch``.
    """
    max_studies = max(1, min(max_studies, _MAX_PAGE))
    key = (term, max_studies)

    entry = _CACHE.get(key)
    if entry is not None:
        stored_at, cached = entry
        if time.monotonic() - stored_at < _CACHE_TTL:
            _CACHE.move_to_end(key)  # mark as most recently used
            # Copy so callers cannot mutate the cached list in place.
            return list(cached)
        del _CACHE[key]  # expired

    params = {
        # The v2 API's free-text search parameter is ``query.term``.
        "query.term": term,
        "pageSize": max_studies,
        # NOTE(review): the v2 reference lists field names such as NCTId,
        # BriefTitle, OverallStatus, Condition -- confirm the API accepts
        # the names below before relying on them.
        "fields": "nctId,briefTitle,phase,status,startDate,conditions,interventions",
    }
    studies = await _fetch(params)

    # Empty lists are not cached: ``_fetch`` also returns [] after exhausting
    # its retries, and a transient outage must not be remembered for 12 h.
    if studies:
        _CACHE[key] = (time.monotonic(), studies)
        _CACHE.move_to_end(key)
        while len(_CACHE) > _CACHE_MAX:
            _CACHE.popitem(last=False)  # evict least recently used
    return list(studies)


# Keep the ``cache_clear`` hook that the former ``lru_cache`` wrapper exposed.
search_trials.cache_clear = _CACHE.clear  # type: ignore[attr-defined]
|
|