#!/usr/bin/env python3
# mcp/alerts.py
"""MedGenesis – saved-query alert helper (async).

Monitors previously saved queries (PubMed/arXiv) and notifies when new
papers appear.  Stores a lightweight JSON DB on disk (**locally inside the
Space**), keeping the latest *N* paper links per query.

Changes vs. legacy version
~~~~~~~~~~~~~~~~~~~~~~~~~~
* **Thread-safe JSON IO** with `asyncio.Lock` (handles simultaneous
  Streamlit sessions).
* Exponential back-off retry around `orchestrate_search`, so one bad
  request doesn't kill the whole loop.
* Maximum DB size of 100 queries; oldest entries evicted automatically.
* Atomic file write: dump to a temp file, then `Path.replace` into place.

DB location: `mcp/data/medgen_alerts.json` (created on first run).
"""
from __future__ import annotations

import asyncio
import json
import tempfile
from pathlib import Path
from typing import Dict, List

from mcp.orchestrator import orchestrate_search

_DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve()
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)

_MAX_IDS = 30       # keep last N paper links per query
_MAX_QUERIES = 100  # protect against runaway DB growth

_lock = asyncio.Lock()

# ---------------------------------------------------------------------
# Internal JSON helpers
# ---------------------------------------------------------------------
def _read_db() -> Dict[str, List[str]]:
    """Load the alert DB; return an empty dict on a missing or corrupt file."""
    if _DB_PATH.exists():
        try:
            return json.loads(_DB_PATH.read_text())
        except json.JSONDecodeError:
            return {}
    return {}


def _write_db(data: Dict[str, List[str]]) -> None:
    """Atomic write: dump to a temp file in the same directory, then replace."""
    with tempfile.NamedTemporaryFile(
        "w", delete=False, dir=_DB_PATH.parent
    ) as tmp:
        json.dump(data, tmp, indent=2)
    Path(tmp.name).replace(_DB_PATH)


# ---------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------
async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
    """Return `{query: [fresh_links]}` for queries with new papers."""
    async with _lock:  # serialise DB read
        db = _read_db()

    new_links_map: Dict[str, List[str]] = {}

    async def _process(q: str) -> None:
        # Retry orchestrate_search up to 3 times with exponential back-off
        # (2 s, then 4 s) so one flaky request doesn't abort the whole run.
        delay = 2
        for _ in range(3):
            try:
                res = await orchestrate_search(q)
                break
            except Exception:
                await asyncio.sleep(delay)
                delay *= 2
        else:
            return  # give up on this query

        links = [p["link"] for p in res["papers"]]
        prev = set(db.get(q, []))
        fresh = [link for link in links if link not in prev]
        if fresh:
            new_links_map[q] = fresh
        db[q] = links[:_MAX_IDS]

    await asyncio.gather(*[_process(q) for q in queries])

    # Trim DB if it exceeds the query cap (evict oldest-inserted entries first)
    if len(db) > _MAX_QUERIES:
        for key in list(db.keys())[: len(db) - _MAX_QUERIES]:
            db.pop(key, None)

    async with _lock:  # serialise DB write
        _write_db(db)

    return new_links_map


# ---------------------------------------------------------------------
# CLI demo
# ---------------------------------------------------------------------
if __name__ == "__main__":

    async def _demo() -> None:
        demo_map = await check_alerts(["glioblastoma CRISPR"])
        print("Fresh:", demo_map)

    asyncio.run(_demo())
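
# ---------------------------------------------------------------------
# Usage sketch (comment-only, not executed on import)
# ---------------------------------------------------------------------
# A minimal sketch of how a Streamlit page might surface fresh alerts for a
# user's saved queries.  The session-state key "saved_queries" is an assumed
# name used only for illustration; it is not defined by this module.
#
#   import asyncio
#   import streamlit as st
#   from mcp.alerts import check_alerts
#
#   saved = st.session_state.get("saved_queries", [])
#   fresh = asyncio.run(check_alerts(saved))
#   for query, links in fresh.items():
#       st.info(f"New papers for '{query}'")
#       for url in links:
#           st.markdown(f"- {url}")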