File size: 2,299 Bytes
0dab0c0
 
 
 
 
 
 
 
 
 
 
78f4a71
0dab0c0
78f4a71
0dab0c0
 
 
78f4a71
 
 
0dab0c0
 
 
 
 
 
78f4a71
0dab0c0
78f4a71
 
0dab0c0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
#!/usr/bin/env python3
"""WHO Global Health Observatory (GHO) async helper.

Key features
------------
* **Essential medicines list** – `list_essential_medicines()`
* **COVID‑19 country stats**  – `fetch_covid_timeseries(iso_code)`
* Built on `httpx.AsyncClient`, 20‑second timeout, no API‑key required.
* Caches responses for 24 h in memory to spare quota.

GHO OData API docs: https://www.who.int/data/gho/info/gho-odata-api
"""
from __future__ import annotations

import asyncio
import datetime
import time
from functools import lru_cache
from typing import List, Dict
from urllib.parse import quote

import httpx

# Root URL of the GHO OData API; endpoint/dataset names are appended as a path segment.
BASE = "https://ghoapi.azureedge.net/api"


# ---------------------------------------------------------------------
# 📦 Essential medicines – EML endpoint
# ---------------------------------------------------------------------

# ``functools.lru_cache`` must not wrap a coroutine function: it memoises the
# coroutine *object*, and awaiting that object a second time raises
# ``RuntimeError``.  The decoded payload is cached here instead, together with
# an expiry time so that entries are refreshed after 24 hours.
_EML_CACHE_TTL_SECONDS = 24 * 60 * 60
_eml_cache = {}  # "entry" -> (monotonic expiry time, decoded rows)


async def list_essential_medicines() -> List[Dict]:
    """Return current WHO Model List of Essential Medicines (EML).

    The decoded response is kept in memory for 24 hours; calls made within
    that window return the cached rows without touching the network.

    Returns:
        The list stored under the ``"value"`` key of the JSON response, or an
        empty list when that key is absent.  The same list object is handed
        to every caller while the entry is fresh, so treat it as read-only.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        Other ``httpx`` transport errors (timeouts, connection failures)
        propagate unchanged; failed requests are never cached.
    """
    entry = _eml_cache.get("entry")
    if entry is not None and time.monotonic() < entry[0]:
        return entry[1]

    url = f"{BASE}/EML"
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        rows = resp.json().get("value", [])

    # Start the 24 h window once the response has actually arrived.
    _eml_cache["entry"] = (time.monotonic() + _EML_CACHE_TTL_SECONDS, rows)
    return rows


# Kept for callers that used the ``cache_clear()`` hook of the old wrapper.
list_essential_medicines.cache_clear = _eml_cache.clear


# ---------------------------------------------------------------------
# 🦠 COVID‑19 time‑series by ISO country code
# ---------------------------------------------------------------------

COVID_DATASET = "COVID19CasesByCountries"

# ``functools.lru_cache`` must not wrap a coroutine function (it would cache
# the single-use coroutine object), so results are cached explicitly:
# at most 128 countries, each entry valid for 24 hours.
_COVID_CACHE_TTL_SECONDS = 24 * 60 * 60
_COVID_CACHE_MAX_ENTRIES = 128
_covid_cache = {}  # upper-cased ISO code -> (monotonic expiry time, rows)


async def fetch_covid_timeseries(iso_code: str) -> List[Dict]:
    """Return the COVID-19 dataset rows for a 3-letter ISO code (e.g. 'USA').

    Args:
        iso_code: Country code matched against the dataset's ``Code`` field.
            It is upper-cased before use, so ``"nga"`` and ``"NGA"`` are
            equivalent and share one cache entry.

    Returns:
        The list stored under the ``"value"`` key of the JSON response, or an
        empty list when that key is absent.  Results are cached in memory for
        24 hours; the cached list object is shared between callers, so treat
        it as read-only.

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        Other ``httpx`` transport errors propagate unchanged; failed requests
        are never cached.
    """
    code = iso_code.upper()

    # Popping and re-inserting a fresh hit moves it to the end of the dict,
    # which keeps the eviction order below least-recently-used first.
    # A stale entry is simply dropped by the pop.
    cached = _covid_cache.pop(code, None)
    if cached is not None and time.monotonic() < cached[0]:
        _covid_cache[code] = cached
        return cached[1]

    # OData string literals escape a single quote by doubling it; the whole
    # filter is then percent-encoded so characters such as '&' or '#' in the
    # argument cannot add query parameters or truncate the URL.  For ordinary
    # codes the resulting URL is identical to a plain interpolation.
    literal = code.replace("'", "''")
    odata_filter = quote(f"Code eq '{literal}'", safe="'")
    url = f"{BASE}/{COVID_DATASET}?$filter={odata_filter}"

    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(url)
        resp.raise_for_status()
        rows = resp.json().get("value", [])

    _covid_cache[code] = (time.monotonic() + _COVID_CACHE_TTL_SECONDS, rows)
    while len(_covid_cache) > _COVID_CACHE_MAX_ENTRIES:
        del _covid_cache[next(iter(_covid_cache))]  # oldest entry first
    return rows


# Kept for callers that used the ``cache_clear()`` hook of the old wrapper.
fetch_covid_timeseries.cache_clear = _covid_cache.clear


# ---------------------------------------------------------------------
# Example CLI usage (for testing) – run: `python -m mcp.who`
# ---------------------------------------------------------------------
if __name__ == "__main__":

    async def _run_smoke_check() -> None:
        """Call both public helpers once and print how many rows each returned."""
        medicine_rows = await list_essential_medicines()
        medicine_total = len(medicine_rows)
        print(f"Essential medicines fetched: {medicine_total:,}")

        nigeria_rows = await fetch_covid_timeseries("NGA")
        nigeria_total = len(nigeria_rows)
        print(f"Nigeria COVID rows: {nigeria_total:,}")

    asyncio.run(_run_smoke_check())