"""MedGenesis – saved‑query alert helper (async). |
|
|
|
Monitors previously saved queries (PubMed/arXiv) and notifies when new |
|
papers appear. Stores a lightweight JSON DB on disk (**locally inside |
|
Space**), keeping the latest *N* paper links per query. |
|
|
|
Changes vs. legacy version |
|
~~~~~~~~~~~~~~~~~~~~~~~~~~ |
|
* **Thread‑safe JSON IO** with `asyncio.Lock` (handles simultaneous |
|
Streamlit sessions). |
|
* Exponential‑back‑off retry around `orchestrate_search` so one bad |
|
request doesn’t kill the whole loop. |
|
* Maximum DB size of 100 queries; oldest evicted automatically. |
|
* Ensures atomic file write via `Path.write_text` to a temp file. |
|
|
|
DB location: `<project_root>/data/medgen_alerts.json` (created on first run). |
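
Usage (inside any running event loop, e.g. a Streamlit session)::

    fresh = await check_alerts(["glioblastoma CRISPR"])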
|
""" |
|
from __future__ import annotations

import asyncio
import json
import tempfile
from pathlib import Path
from typing import Dict, List

from mcp.orchestrator import orchestrate_search

_DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve()
_DB_PATH.parent.mkdir(parents=True, exist_ok=True)

_MAX_IDS = 30       # latest N paper links kept per query
_MAX_QUERIES = 100  # DB-wide cap; oldest queries are evicted first

_lock = asyncio.Lock()  # serialises all JSON DB access across sessions
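
# On-disk DB shape (illustrative): {"<query>": ["<paper link>", ...]}
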
def _read_db() -> Dict[str, List[str]]:
    """Load the alert DB, tolerating a missing or corrupt file."""
    if _DB_PATH.exists():
        try:
            return json.loads(_DB_PATH.read_text())
        except json.JSONDecodeError:
            return {}
    return {}


def _write_db(data: Dict[str, List[str]]) -> None:
    """Atomically persist the DB: dump to a temp file, then `Path.replace`."""
    # Write to a sibling temp file so the rename stays on the same filesystem.
    with tempfile.NamedTemporaryFile("w", delete=False, dir=_DB_PATH.parent) as tmp:
        json.dump(data, tmp, indent=2)
    # The temp file is closed on leaving the block; the replace is then atomic
    # (and does not fail on Windows, where replacing an open file raises).
    Path(tmp.name).replace(_DB_PATH)


async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
    """Return `{query: [fresh_links]}` for queries with new papers."""
    new_links_map: Dict[str, List[str]] = {}

    async def _process(q: str) -> None:
        # Exponential back-off: up to 3 attempts with 2 s / 4 s pauses, so one
        # bad request doesn't kill the whole loop.
        delay = 2
        for attempt in range(3):
            try:
                res = await orchestrate_search(q)
                break
            except Exception:
                if attempt == 2:  # last attempt failed; give up on this query
                    return
                await asyncio.sleep(delay)
                delay *= 2

        links = [p["link"] for p in res.get("papers", []) if p.get("link")]
        prev = set(db.get(q, []))
        fresh = [link for link in links if link not in prev]
        if fresh:
            new_links_map[q] = fresh
        db[q] = links[:_MAX_IDS]

    # Hold the lock across the whole read-modify-write cycle; locking only the
    # read and the write separately would let concurrent sessions clobber each
    # other's updates in between.
    async with _lock:
        db = _read_db()
        await asyncio.gather(*(_process(q) for q in queries))

        # Dicts preserve insertion order, so the oldest queries sit at the
        # front; evict from there once the cap is exceeded.
        if len(db) > _MAX_QUERIES:
            for key in list(db)[: len(db) - _MAX_QUERIES]:
                db.pop(key, None)

        _write_db(db)

    return new_links_map

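# A minimal polling-loop sketch (illustrative only; the query source and the
# 30-minute interval are assumptions, not part of this module):
#
#     async def monitor(queries: List[str]) -> None:
#         while True:
#             fresh = await check_alerts(queries)
#             if fresh:
#                 ...  # surface notifications to the user
#             await asyncio.sleep(30 * 60)
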
if __name__ == "__main__":

    async def _demo() -> None:
        demo_map = await check_alerts(["glioblastoma CRISPR"])
        print("Fresh:", demo_map)

    asyncio.run(_demo())
|
|