mgbam committed on
Commit 72a7fea · verified · 1 Parent(s): aae312e

Update mcp/alerts.py

Files changed (1)
  1. mcp/alerts.py +86 -28
mcp/alerts.py CHANGED
@@ -1,48 +1,106 @@
  # mcp/alerts.py
  """
- Saved-query alert helper.
- • Stores the last 30 PubMed/arXiv links per query.
- • Returns a dict of {query: [new_links]} when fresh papers appear.
- Implementation is intentionally simple: JSON on disk + orchestrate_search.
- """

- import json, asyncio
  from pathlib import Path
- from typing import List, Dict

  from mcp.orchestrator import orchestrate_search

- _ALERT_DB = Path("saved_alerts.json")
- _MAX_IDS = 30   # keep last N links per query


  def _read_db() -> Dict[str, List[str]]:
-     if _ALERT_DB.exists():
-         return json.loads(_ALERT_DB.read_text())
      return {}


  def _write_db(data: Dict[str, List[str]]):
-     _ALERT_DB.write_text(json.dumps(data, indent=2))
-

  async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
-     """
-     For each saved query, run a quick orchestrate_search and detect new paper links.
-     Returns {query: [fresh_links]} (empty dict if nothing new).
-     """
-     db = _read_db()
-     new_map = {}
-
-     async def _check(q: str):
-         res = await orchestrate_search(q)
          links = [p["link"] for p in res["papers"]]
          prev = set(db.get(q, []))
          fresh = [l for l in links if l not in prev]
          if fresh:
-             new_map[q] = fresh
-             db[q] = links[:_MAX_IDS]   # save trimmed
-     # run in parallel
-     await asyncio.gather(*[_check(q) for q in queries])
-     _write_db(db)
-     return new_map
  # mcp/alerts.py
+ #!/usr/bin/env python3
+ """MedGenesis – saved‑query alert helper (async).
+
+ Monitors previously saved queries (PubMed/arXiv) and notifies when new
+ papers appear. Stores a lightweight JSON DB on disk (**locally inside
+ Space**), keeping the latest *N* paper links per query.
+
+ Changes vs. legacy version
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~
+ * **Thread‑safe JSON IO** with `asyncio.Lock` (handles simultaneous
+   Streamlit sessions).
+ * Exponential‑back‑off retry around `orchestrate_search` so one bad
+   request doesn’t kill the whole loop.
+ * Maximum DB size of 100 queries; oldest evicted automatically.
+ * Atomic file writes: JSON is dumped to a temp file, then swapped in via `Path.replace`.
+
+ DB location: `<project_root>/data/medgen_alerts.json` (created on first run).
  """
+ from __future__ import annotations

+ import asyncio, json, tempfile
  from pathlib import Path
+ from typing import Dict, List

  from mcp.orchestrator import orchestrate_search

+ _DB_PATH = (Path(__file__).parent / "data" / "medgen_alerts.json").resolve()
+ _DB_PATH.parent.mkdir(exist_ok=True)

+ _MAX_IDS = 30       # keep last N paper links per query
+ _MAX_QUERIES = 100  # protect runaway DB growth
+
+ _lock = asyncio.Lock()
+
+ # ---------------------------------------------------------------------
+ # Internal JSON helpers
+ # ---------------------------------------------------------------------

  def _read_db() -> Dict[str, List[str]]:
+     if _DB_PATH.exists():
+         try:
+             return json.loads(_DB_PATH.read_text())
+         except json.JSONDecodeError:
+             return {}
      return {}


  def _write_db(data: Dict[str, List[str]]):
+     # atomic write: write to tmp then replace
+     tmp = tempfile.NamedTemporaryFile("w", delete=False, dir=_DB_PATH.parent)
+     json.dump(data, tmp, indent=2)
+     tmp.flush()
+     Path(tmp.name).replace(_DB_PATH)

+ # ---------------------------------------------------------------------
+ # Public API
+ # ---------------------------------------------------------------------
  async def check_alerts(queries: List[str]) -> Dict[str, List[str]]:
+     """Return `{query: [fresh_links]}` for queries with new papers."""
+     async with _lock:          # serialize DB read/write
+         db = _read_db()
+
+     new_links_map: Dict[str, List[str]] = {}
+
+     async def _process(q: str):
+         # Retry orchestrate_search up to 3× (2 s back‑off)
+         delay = 2
+         for _ in range(3):
+             try:
+                 res = await orchestrate_search(q)
+                 break
+             except Exception:
+                 await asyncio.sleep(delay)
+                 delay *= 2
+         else:
+             return  # give up on this query
+
          links = [p["link"] for p in res["papers"]]
          prev = set(db.get(q, []))
          fresh = [l for l in links if l not in prev]
          if fresh:
+             new_links_map[q] = fresh
+             db[q] = links[:_MAX_IDS]
+
+     await asyncio.gather(*[_process(q) for q in queries])
+
+     # Trim DB if exceeding query cap
+     if len(db) > _MAX_QUERIES:
+         for key in list(db.keys())[_MAX_QUERIES:]:
+             db.pop(key, None)
+
+     async with _lock:
+         _write_db(db)
+
+     return new_links_map
+
+
+ # ---------------------------------------------------------------------
+ # CLI demo
+ # ---------------------------------------------------------------------
+ if __name__ == "__main__":
+     async def _demo():
+         demo_map = await check_alerts(["glioblastoma CRISPR"])
+         print("Fresh:", demo_map)
+     asyncio.run(_demo())
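
For context, a minimal sketch of how a caller might poll the updated `check_alerts` coroutine. The `SAVED_QUERIES` list, the `poll_alerts.py` script name, and the one-hour interval are illustrative assumptions only; they are not part of this commit.

    # poll_alerts.py: illustrative sketch only (not part of this commit)
    import asyncio

    from mcp.alerts import check_alerts

    # Hypothetical saved queries; in the app these would come from user state.
    SAVED_QUERIES = ["glioblastoma CRISPR", "long COVID biomarkers"]

    async def poll(interval_s: int = 3600) -> None:
        """Check alerts periodically and print any newly seen paper links."""
        while True:
            fresh = await check_alerts(SAVED_QUERIES)
            for query, links in fresh.items():
                print(f"New papers for {query!r}:")
                for link in links:
                    print("   ", link)
            await asyncio.sleep(interval_s)

    if __name__ == "__main__":
        asyncio.run(poll())

Since the helper handles retries and DB persistence internally, the caller only decides how often to poll and what to do with the returned `{query: [fresh_links]}` map.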