File size: 2,299 Bytes
0dab0c0 78f4a71 0dab0c0 78f4a71 0dab0c0 78f4a71 0dab0c0 78f4a71 0dab0c0 78f4a71 0dab0c0 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
#!/usr/bin/env python3
"""WHO Global Health Observatory (GHO) async helper.
Key features
------------
* **Essential medicines list** – `list_essential_medicines()`
* **COVID‑19 country stats** – `fetch_covid_timeseries(iso_code)`
* Built on `httpx.AsyncClient`, 20‑second timeout, no API‑key required.
* Caches responses for 24 h in memory to spare quota.
GHO OData API docs: https://www.who.int/data/gho/info/gho-odata-api
"""
from __future__ import annotations
import httpx, asyncio, datetime
from functools import lru_cache
from typing import List, Dict
BASE = "https://ghoapi.azureedge.net/api"
# ---------------------------------------------------------------------
# 📦 Essential medicines – EML endpoint
# ---------------------------------------------------------------------
# In-memory cache for the EML payload: fixed key -> (fetched_at_utc, rows).
_EML_CACHE_TTL = datetime.timedelta(hours=24)
_EML_CACHE: Dict[str, tuple] = {}


async def list_essential_medicines() -> List[Dict]:
    """Return the current WHO Model List of Essential Medicines (EML).

    The decoded rows are kept in memory for 24 hours, so repeated calls
    within that window do not hit the network again.

    Returns:
        The ``value`` array of the JSON response, or an empty list when the
        response has no ``value`` key. The returned list is the cached
        object itself; treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    # NOTE: ``functools.lru_cache`` must not wrap an ``async def``: it stores
    # the coroutine object, which can be awaited only once, so every call
    # after the first failed with "cannot reuse already awaited coroutine".
    # The awaited *result* is cached here instead.
    now = datetime.datetime.now(datetime.timezone.utc)
    entry = _EML_CACHE.get("EML")
    if entry is not None and now - entry[0] < _EML_CACHE_TTL:
        return entry[1]

    url = f"{BASE}/EML"
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        rows = resp.json().get("value", [])

    # Only successful responses reach this point, so errors are never cached.
    _EML_CACHE["EML"] = (now, rows)
    return rows


# The previous ``lru_cache`` wrapper exposed ``cache_clear()``; keep that entry
# point so existing callers that reset the cache continue to work.
list_essential_medicines.cache_clear = _EML_CACHE.clear
# ---------------------------------------------------------------------
# 🦠 COVID‑19 time‑series by ISO country code
# ---------------------------------------------------------------------
COVID_DATASET = "COVID19CasesByCountries"

# In-memory cache: upper-cased ISO code -> (fetched_at_utc, rows).
_COVID_CACHE_TTL = datetime.timedelta(hours=24)
_COVID_CACHE_MAX = 128
_COVID_CACHE: Dict[str, tuple] = {}


async def fetch_covid_timeseries(iso_code: str) -> List[Dict]:
    """Return daily cases & deaths for given 3-letter ISO code (e.g. 'USA').

    Results are cached in memory for 24 hours per country code. The code is
    upper-cased before use, so ``"nga"`` and ``"NGA"`` share one cache entry.
    At most 128 countries are cached; the oldest entry is dropped first.

    Args:
        iso_code: Country code to filter on; matched case-insensitively.

    Returns:
        The ``value`` array of the JSON response, or an empty list when the
        response has no ``value`` key. The returned list is the cached
        object itself; treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status.
    """
    from urllib.parse import quote

    # NOTE: ``functools.lru_cache`` must not wrap an ``async def``: it stores
    # the coroutine object, which can be awaited only once, so a repeated
    # call for the same country failed with "cannot reuse already awaited
    # coroutine". The awaited *result* is cached here instead.
    code = iso_code.upper()
    now = datetime.datetime.now(datetime.timezone.utc)
    entry = _COVID_CACHE.get(code)
    if entry is not None and now - entry[0] < _COVID_CACHE_TTL:
        return entry[1]

    # Escape the value before embedding it in the OData filter: a single
    # quote is doubled inside a string literal, and the result is
    # percent-encoded so characters such as '&' or '#' cannot alter the
    # query. For ordinary codes like 'NGA' the URL is unchanged.
    literal = quote(code.replace("'", "''"), safe="")
    url = f"{BASE}/{COVID_DATASET}?$filter=Code%20eq%20'{literal}'"
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        rows = resp.json().get("value", [])

    # Re-insert so a refreshed entry counts as the newest, then keep the
    # cache bounded by evicting the oldest entry (dicts preserve insertion
    # order).
    _COVID_CACHE.pop(code, None)
    if len(_COVID_CACHE) >= _COVID_CACHE_MAX:
        _COVID_CACHE.pop(next(iter(_COVID_CACHE)))
    _COVID_CACHE[code] = (now, rows)
    return rows


# The previous ``lru_cache`` wrapper exposed ``cache_clear()``; keep that entry
# point so existing callers that reset the cache continue to work.
fetch_covid_timeseries.cache_clear = _COVID_CACHE.clear
# ---------------------------------------------------------------------
# Example CLI usage (for testing) – run: `python -m mcp.who`
# ---------------------------------------------------------------------
if __name__ == "__main__":

    async def _main() -> None:
        """Fetch both datasets once, in order, and print their row counts."""
        # Essential medicines first ...
        meds = await list_essential_medicines()
        print(f"Essential medicines fetched: {len(meds):,}")

        # ... then the COVID series for Nigeria as a sample country.
        covid = await fetch_covid_timeseries("NGA")
        print(f"Nigeria COVID rows: {len(covid):,}")

    asyncio.run(_main())
|