File size: 2,026 Bytes
1ec3999 4764268 1ec3999 80779c4 0cd23e6 1ec3999 80779c4 1ec3999 0cd23e6 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 80779c4 1ec3999 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
# mcp/clinicaltrials.py
import logging
from typing import List, Dict

import httpx
# Endpoint used by fetch_trials_v2.
BASE_V2 = "https://clinicaltrials.gov/api/v2/studies"
# Endpoint used by fetch_trials_v1 (legacy study_fields query).
BASE_V1 = "https://clinicaltrials.gov/api/query/study_fields"

# Default headers sent with every request: a desktop-browser User-Agent.
# NOTE(review): presumably present to reduce 403 responses - confirm.
HEADERS = {
    "User-Agent": "".join(
        (
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ",
            "AppleWebKit/537.36 (KHTML, like Gecko) ",
            "Chrome/124.0 Safari/537.36",
        )
    ),
}
async def fetch_trials_v2(term: str, limit: int = 10) -> List[Dict]:
    """Fetch clinical trials matching *term* from the ClinicalTrials.gov v2 API.

    Args:
        term: Free-text search expression, sent as the ``query.term`` parameter.
        limit: Maximum number of studies to request (sent as ``pageSize``).

    Returns:
        The list stored under the ``"studies"`` key of the JSON response, or
        an empty list when the server answers 403 or the key is absent/null.

    Raises:
        httpx.HTTPStatusError: For any error status other than 403.
        httpx.HTTPError: For transport-level failures such as timeouts.
        ValueError: If the response body is not valid JSON.
    """
    params = {
        # The v2 /studies endpoint takes the free-text search as "query.term";
        # a bare "query" key is not one of its documented parameters.
        "query.term": term,
        "pageSize": limit,
        # NOTE(review): field names kept as originally written - confirm they
        # match the field/piece names the v2 API accepts.
        "fields": "nctId,briefTitle,phase,status,startDate,conditions,interventions",
    }

    async with httpx.AsyncClient(timeout=20, headers=HEADERS) as client:
        response = await client.get(BASE_V2, params=params)

    if response.status_code == 403:
        return []  # API blocked or IP rate-limited

    response.raise_for_status()
    payload = response.json()
    # Studies live under ['studies']; treat a missing or null value as "none".
    return payload.get("studies") or []
async def fetch_trials_v1(term: str, limit: int = 10) -> List[Dict]:
    """Fallback fetch of clinical trials through the legacy v1 API.

    Args:
        term: Search expression, sent as the ``expr`` parameter.
        limit: Highest rank to request (``max_rnk``); ranks start at 1.

    Returns:
        The ``StudyFields`` list nested under ``StudyFieldsResponse`` in the
        JSON response, or an empty list on a 403 or when either key is absent.
    """
    # NOTE(review): the legacy endpoint may no longer be served - confirm.
    query = {
        "expr": term,
        "fields": "NCTId,BriefTitle,Condition,InterventionName,Phase,OverallStatus,StartDate",
        "max_rnk": limit,
        "min_rnk": 1,
        "fmt": "json",
    }

    async with httpx.AsyncClient(timeout=20, headers=HEADERS) as client:
        response = await client.get(BASE_V1, params=query)

    if response.status_code == 403:
        return []

    response.raise_for_status()
    envelope = response.json().get("StudyFieldsResponse", {})
    return envelope.get("StudyFields", [])
async def search_trials(term: str, max_studies: int = 10) -> List[Dict]:
    """Unified entry point: try the v2 API, fall back to v1 if needed.

    The v1 fallback runs when the v2 call raises or returns no studies.
    This function is best-effort and never raises for lookup failures;
    failures are logged with their traceback instead of being discarded.

    Args:
        term: Search expression forwarded to the fetchers.
        max_studies: Maximum number of studies to request.

    Returns:
        The studies from the first source that yields any, otherwise whatever
        the v1 fallback returns, or an empty list if both lookups fail.
    """
    log = logging.getLogger(__name__)

    try:
        trials = await fetch_trials_v2(term, max_studies)
    except Exception:
        # Deliberately broad: any v2 failure should trigger the fallback.
        log.warning(
            "ClinicalTrials v2 lookup failed for %r; falling back to v1",
            term,
            exc_info=True,
        )
    else:
        if trials:
            return trials

    try:
        return await fetch_trials_v1(term, max_studies)
    except Exception:
        log.warning(
            "ClinicalTrials v1 lookup failed for %r; returning no results",
            term,
            exc_info=True,
        )
        return []
|