# Uploaded by mgbam: "Create graphdb.py" (commit df3294b, verified; 2.52 kB)
# genesis/graphdb.py
from __future__ import annotations
import os
from typing import Dict, List
from neo4j import GraphDatabase
# Neo4j connection settings, read from the environment once at import time.
# A variable that is not set yields None.
NEO4J_URI = os.environ.get("NEO4J_URI")
NEO4J_USER = os.environ.get("NEO4J_USER")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")
def _require_config():
    """Ensure the Neo4j URI, user and password are all set.

    Raises:
        RuntimeError: if any of NEO4J_URI, NEO4J_USER or NEO4J_PASSWORD is
            missing or empty.
    """
    if all((NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD)):
        return
    raise RuntimeError(
        "Neo4j not configured. Set NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD in Space Secrets."
    )
async def write_topic_and_papers(topic: str, citations: List[Dict[str, str]]) -> Dict[str, int]:
    """
    Create/merge a Topic node and Paper nodes, and connect them with MENTIONS edges.

    Schema:
        (t:Topic {id})-[:MENTIONS]->(p:Paper {url})
        p.title is set when the Paper is created; on a match an existing
        non-null title is kept (coalesce) and only a missing one is filled in.

    Args:
        topic: Topic text. Falls back to "Topic" when empty/None. Its first
            200 characters become the Topic id, its first 500 the title.
        citations: Dicts read through their "url" and "title" keys. May be
            None or empty. Entries without a URL are skipped: url is the
            Paper merge key, so an empty url would collapse every such
            citation into one shared Paper node. A missing title defaults
            to "citation". URLs are cut to 1000 characters, titles to 500.

    Returns:
        {"nodes": <int>, "rels": <int>} -- the number of node / relationship
        MERGE statements issued (1 for the topic plus 1 per written paper;
        1 relationship per written paper). These are not counts of newly
        created graph entities.

    Raises:
        RuntimeError: if the Neo4j settings are not configured.

    Note:
        The body contains no ``await``: all driver calls are synchronous and
        run to completion before the coroutine yields control.
    """
    _require_config()

    # Loop-invariant topic values, computed once.
    topic_text = topic or "Topic"
    topic_id = topic_text[:200]
    topic_title = topic_text[:500]

    nodes = 0
    rels = 0
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as sess:
            # Topic
            sess.run(
                "MERGE (t:Topic {id:$id}) SET t.title=$title",
                id=topic_id,
                title=topic_title,
            )
            nodes += 1

            # Papers + edges
            for c in citations or []:
                url = (c.get("url") or "")[:1000]
                if not url:
                    # No merge key: skip rather than merge into Paper {url: ""}.
                    continue
                title = (c.get("title") or "citation")[:500]
                sess.run(
                    """
                    MERGE (p:Paper {url:$url})
                    ON CREATE SET p.title=$title
                    ON MATCH SET p.title = coalesce(p.title, $title)
                    WITH p
                    MATCH (t:Topic {id:$topic})
                    MERGE (t)-[:MENTIONS]->(p)
                    """,
                    url=url,
                    title=title,
                    topic=topic_id,
                )
                nodes += 1
                rels += 1
        return {"nodes": nodes, "rels": rels}
    finally:
        driver.close()
# Optional helper for debugging connectivity in a notebook/Space shell
def verify_connection() -> str:
    """Probe the configured Neo4j instance with ``RETURN 1 AS ok``.

    Returns:
        "Neo4j connection OK." when the probe row comes back with ok == 1.

    Raises:
        RuntimeError: if the Neo4j settings are not configured.
        AssertionError: if the probe returns no row or an unexpected value.
            Raised explicitly instead of via ``assert`` so the check is not
            stripped when Python runs with -O; the exception type is kept
            for backward compatibility.
    """
    _require_config()
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
    try:
        with driver.session() as sess:
            res = sess.run("RETURN 1 AS ok").single()
            if not res or res["ok"] != 1:
                raise AssertionError(
                    "Neo4j connectivity probe returned an unexpected result."
                )
        return "Neo4j connection OK."
    finally:
        driver.close()