Spaces:
Sleeping
Sleeping
# genesis/utils/neo4j_utils.py
import os
from typing import Optional

from neo4j import GraphDatabase
# Connection settings are read from the environment once, at import time.
# NEO4J_URI and NEO4J_PASSWORD have no fallback, so they are None when unset;
# only the user name defaults (to "neo4j").
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")

# Single module-level driver shared by run_query() and released by
# close_driver().
driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
def run_query(query: str, params: Optional[dict] = None):
    """Run a Cypher query against the Neo4j database.

    Args:
        query: Cypher statement handed to ``session.run``.
        params: Parameters for the query. ``None`` (or any other falsy
            value) is sent as an empty dict.

    Returns:
        A list containing ``record.data()`` for each record in the result.
    """
    with driver.session() as session:
        result = session.run(query, params or {})
        # Build the list inside the ``with`` block so the records are read
        # while the session is still open.
        return [record.data() for record in result]
def close_driver() -> None:
    """Close the module-level Neo4j ``driver``."""
    driver.close()