Spaces:
Sleeping
Sleeping
# genesis/graphdb.py | |
from __future__ import annotations | |
import os | |
from typing import Dict, List | |
from neo4j import GraphDatabase | |
NEO4J_URI = os.getenv("NEO4J_URI") | |
NEO4J_USER = os.getenv("NEO4J_USER") | |
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD") | |
def _require_config(): | |
if not (NEO4J_URI and NEO4J_USER and NEO4J_PASSWORD): | |
raise RuntimeError( | |
"Neo4j not configured. Set NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD in Space Secrets." | |
) | |
async def write_topic_and_papers(topic: str, citations: List[Dict[str, str]]) -> Dict[str, int]: | |
""" | |
Create/merge a Topic node and Paper nodes, and connect them with MENTIONS edges. | |
Schema: | |
(t:Topic {id})-[:MENTIONS]->(p:Paper {url}) with p.title set/updated. | |
Returns: {"nodes": <int>, "rels": <int>} | |
""" | |
_require_config() | |
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) | |
nodes = 0 | |
rels = 0 | |
try: | |
with driver.session() as sess: | |
# Topic | |
sess.run( | |
"MERGE (t:Topic {id:$id}) SET t.title=$title", | |
id=(topic or "Topic")[:200], | |
title=(topic or "Topic")[:500], | |
) | |
nodes += 1 | |
# Papers + edges | |
for c in citations or []: | |
title = (c.get("title") or "citation")[:500] | |
url = (c.get("url") or "")[:1000] | |
if not url and not title: | |
continue | |
sess.run( | |
""" | |
MERGE (p:Paper {url:$url}) | |
ON CREATE SET p.title=$title | |
ON MATCH SET p.title = coalesce(p.title, $title) | |
WITH p | |
MATCH (t:Topic {id:$topic}) | |
MERGE (t)-[:MENTIONS]->(p) | |
""", | |
url=url, | |
title=title, | |
topic=(topic or "Topic")[:200], | |
) | |
nodes += 1 | |
rels += 1 | |
return {"nodes": nodes, "rels": rels} | |
finally: | |
driver.close() | |
# Optional helper for debugging connectivity in a notebook/Space shell | |
def verify_connection() -> str: | |
_require_config() | |
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) | |
try: | |
with driver.session() as sess: | |
res = sess.run("RETURN 1 AS ok").single() | |
assert res and res["ok"] == 1 | |
return "Neo4j connection OK." | |
finally: | |
driver.close() | |