# Scraped page header (not part of the module): Spaces status "Sleeping";
# file size 3,547 bytes; revision 5632bad.
# genesis/graph_tools.py
import os
import logging
from neo4j import GraphDatabase, basic_auth
# =========================
# CONFIG
# =========================
# Connection settings are taken from the process environment; each one is
# None when the corresponding variable is not set.
NEO4J_URI = os.environ.get("NEO4J_URI")
NEO4J_USER = os.environ.get("NEO4J_USER")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")

# Module-wide driver handle; stays None until init_driver() creates one.
driver = None
# =========================
# INIT CONNECTION
# =========================
def init_driver():
    """Create the module-level Neo4j driver from the NEO4J_* settings.

    Reads the module constants ``NEO4J_URI``, ``NEO4J_USER`` and
    ``NEO4J_PASSWORD`` and stores the resulting driver in the global
    ``driver``. When any of the three is unset/empty, or when constructing
    the driver raises, ``driver`` is left as ``None`` and the outcome is
    logged; no exception propagates to the caller.

    A driver left over from an earlier call is closed before it is replaced,
    so calling this function more than once does not leak the old driver.
    """
    global driver

    # Release any previously created driver before replacing the global.
    previous, driver = driver, None
    if previous is not None:
        try:
            previous.close()
        except Exception as e:
            # Best effort: a failed close must not block re-initialisation.
            logging.warning(f"[Neo4j] Failed to close previous driver: {e}")

    if not (NEO4J_URI and NEO4J_USER and NEO4J_PASSWORD):
        logging.info("[Neo4j] No URI/user/password set — skipping connection.")
        return

    try:
        driver = GraphDatabase.driver(
            NEO4J_URI,
            auth=basic_auth(NEO4J_USER, NEO4J_PASSWORD),
        )
    except Exception as e:
        logging.error(f"[Neo4j] Connection failed: {e}")
        driver = None
    else:
        # NOTE(review): this only shows that the driver object was built;
        # whether the server was actually reached at this point should be
        # confirmed against the neo4j driver documentation.
        logging.info("[Neo4j] Connected successfully.")
# Runs once when this module is imported, so that the global `driver` is
# populated (or left as None) before any helper below is called.
init_driver()
def is_connected():
    """Report whether a usable driver object is currently held.

    Returns ``True`` only when the module-level ``driver`` is set and exposes
    a ``session`` attribute. Only the object itself is inspected; no query is
    sent to the database.
    """
    if driver is None:
        return False
    return hasattr(driver, "session")
# =========================
# QUERY FUNCTIONS
# =========================
def run_query(cypher, params=None):
    """Execute a Cypher statement and return its records as a list.

    Args:
        cypher: Cypher statement text.
        params: Optional query parameters; a falsy value sends an empty dict.

    Returns:
        A list of the records produced by the statement, or an empty list
        when there is no active driver or the statement raised (the error is
        logged rather than propagated).
    """
    if not is_connected():
        logging.warning("[Neo4j] No active connection β returning empty result.")
        return []

    try:
        with driver.session() as session:
            # Materialise the records while the session is still open.
            records = [record for record in session.run(cypher, params if params else {})]
    except Exception as e:
        logging.error(f"[Neo4j] Query failed: {e}")
        return []
    return records
def write_data(cypher, params=None):
    """Run a write Cypher statement (e.g. CREATE/MERGE) and report success.

    Args:
        cypher: Cypher statement text.
        params: Optional query parameters; a falsy value sends an empty dict.

    Returns:
        ``True`` when the statement ran to completion, ``False`` when there
        is no active driver or the statement raised (the error is logged
        rather than propagated).
    """
    if not is_connected():
        logging.warning("[Neo4j] No active connection — skipping write.")
        return False
    try:
        with driver.session() as session:
            # Drain the result inside the ``try``. Results are fetched
            # lazily, so without this a server-side failure could be deferred
            # until the session closes and this function would still
            # report success.
            session.run(cypher, params or {}).consume()
        return True
    except Exception as e:
        logging.error(f"[Neo4j] Write failed: {e}")
        return False
# =========================
# BULK GRAPH CREATION
# =========================
def save_graph_data(nodes, edges):
    """Save nodes and edges to Neo4j.

    Args:
        nodes: list of dicts with keys ``id``, ``label`` and ``type``. Each
            is merged as an ``Entity`` node keyed on ``id``; ``label`` and
            ``type`` are (re)set on it.
        edges: list of dicts with keys ``source``, ``target`` and ``type``.
            Each is merged as a ``RELATION`` relationship between the two
            ``Entity`` nodes with those ids. An edge whose endpoints do not
            match existing nodes creates nothing.

    Returns:
        ``True`` when every statement ran to completion, ``False`` when there
        is no active driver or any statement raised (the error is logged
        rather than propagated).

    NOTE(review): statements are issued one at a time via ``session.run``,
    so a failure part-way through presumably leaves the earlier nodes/edges
    stored — confirm whether callers need all-or-nothing behaviour.
    """
    if not is_connected():
        logging.warning("[Neo4j] No active connection — skipping graph save.")
        return False

    node_query = """
        MERGE (n:Entity {id: $id})
        SET n.label = $label, n.type = $type
    """
    edge_query = """
        MATCH (a:Entity {id: $source})
        MATCH (b:Entity {id: $target})
        MERGE (a)-[r:RELATION {type: $type}]->(b)
    """

    try:
        with driver.session() as session:
            # Nodes go first so the edge MATCH clauses can find them.
            # Every result is drained inside the ``try``: results are fetched
            # lazily, so without this a failure (notably of the final
            # statement) could be deferred until the session closes and go
            # unreported.
            for node in nodes:
                session.run(node_query, node).consume()
            for edge in edges:
                session.run(edge_query, edge).consume()
        logging.info("[Neo4j] Graph data saved successfully.")
        return True
    except Exception as e:
        logging.error(f"[Neo4j] Error saving graph data: {e}")
        return False
# =========================
# CLEANUP
# =========================
def close_driver():
    """Shut down the module-level Neo4j driver, if one exists.

    Does nothing when ``driver`` is falsy. Otherwise the driver is closed,
    the global is reset to ``None`` and the closure is logged.
    """
    global driver
    if not driver:
        return
    driver.close()
    driver = None
    logging.info("[Neo4j] Connection closed.")
|