MCP_Res / mcp /drugcentral_ext.py
mgbam's picture
Create drugcentral_ext.py
5373a3d verified
"""
DrugCentral SMART API helper
Docs: https://drugcentral.org/api (public; key optional)
• Returns {} on any error so orchestrator never fails.
• Optional DRUGCENTRAL_KEY (JWT) lifts the rate-limit.
"""
from __future__ import annotations
import os, httpx, asyncio
from functools import lru_cache
from typing import Dict
_TOKEN = os.getenv("DRUGCENTRAL_KEY") # optional
_BASE = "https://drugcentral.org/api/v1/drug"
_HDR = {"Accept": "application/json"}
if _TOKEN:
_HDR["Authorization"] = f"Bearer {_TOKEN}"
@lru_cache(maxsize=256)
async def fetch_drugcentral(name: str) -> Dict:
"""Return DrugCentral record or {}."""
try:
async with httpx.AsyncClient(timeout=10, headers=_HDR) as cli:
r = await cli.get(_BASE, params={"name": name})
if r.status_code == 404:
return {}
r.raise_for_status()
return r.json()
except Exception:
return {}