# genesis/utils/graph_tools.py """ Graph Utilities for GENESIS-AI Handles writing research topics and papers into the Neo4j graph database. """ import logging from typing import List, Dict, Optional from neo4j import GraphDatabase import os logging.basicConfig(level=logging.INFO) # Optional Neo4j connection NEO4J_URI = os.getenv("NEO4J_URI") NEO4J_USER = os.getenv("NEO4J_USER") NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD") driver = None if NEO4J_URI and NEO4J_USER and NEO4J_PASSWORD: try: driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) logging.info("[Neo4j] Connected in graph_tools.") except Exception as e: logging.error(f"[Neo4j] Connection failed in graph_tools: {e}") else: logging.info("[Neo4j] No URI/user/password set — skipping graph_tools connection.") def write_topic_and_papers(topic: str, papers: List[Dict], db_driver: Optional[GraphDatabase.driver] = None): """ Store a topic node and its related papers in Neo4j. Args: topic (str): Research topic name papers (List[Dict]): List of paper dicts with keys: title, authors, link db_driver: Optional Neo4j driver instance """ if not (db_driver or driver): logging.warning("[GraphTools] Skipping write — no Neo4j connection.") return neo_driver = db_driver or driver try: with neo_driver.session() as session: # Create topic node session.run( """ MERGE (t:Topic {name: $topic}) """, topic=topic ) # Create paper nodes and relationships for paper in papers: session.run( """ MERGE (p:Paper {title: $title}) ON CREATE SET p.authors = $authors, p.link = $link MERGE (t:Topic {name: $topic}) MERGE (t)-[:HAS_PAPER]->(p) """, title=paper.get("title", ""), authors=paper.get("authors", []), link=paper.get("link", ""), topic=topic ) logging.info(f"[GraphTools] Added topic '{topic}' and {len(papers)} papers.") except Exception as e: logging.error(f"[GraphTools] Error writing to Neo4j: {e}") __all__ = ["write_topic_and_papers"]