Spaces:
Sleeping
Sleeping
# genesis/graphdb.py
import os
import re
from typing import List, Dict, Optional

from neo4j import GraphDatabase
# Connection settings are read once, at import time. Each value is None when
# the corresponding environment variable is unset.
NEO4J_URI = os.getenv("NEO4J_URI")
NEO4J_USER = os.getenv("NEO4J_USER")
NEO4J_PASSWORD = os.getenv("NEO4J_PASSWORD")
class GraphDB:
    """Thin wrapper around a Neo4j driver for the knowledge graph.

    Nodes are keyed by their ``name`` property: the ``add_*`` helpers MERGE on
    ``(label, name)`` and the ``link_*`` helpers MATCH both endpoints by name
    before MERGE-ing the relationship between them.

    Instances may be used as context managers; the driver is closed on exit.
    """

    # Labels and relationship types are interpolated into the Cypher text
    # rather than passed as ``$`` parameters, so they are restricted to plain
    # identifiers to keep that interpolation from becoming an injection vector.
    _IDENTIFIER_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

    def __init__(self) -> None:
        """Open a driver using the module-level ``NEO4J_*`` settings.

        Raises:
            ValueError: If the URI, user, or password is unset or empty.
        """
        if not all([NEO4J_URI, NEO4J_USER, NEO4J_PASSWORD]):
            raise ValueError("Neo4j credentials are missing from environment variables.")
        self.driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

    def close(self) -> None:
        """Close the underlying driver."""
        self.driver.close()

    def __enter__(self) -> "GraphDB":
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    # ---------- Basic Helpers ----------

    @classmethod
    def _validate_identifier(cls, value: str, kind: str) -> str:
        """Return *value* unchanged if it is a plain identifier.

        Args:
            value: Candidate label or relationship type.
            kind: Human-readable description used in the error message.

        Raises:
            ValueError: If *value* is not a string matching
                ``[A-Za-z_][A-Za-z0-9_]*``.
        """
        if not isinstance(value, str) or cls._IDENTIFIER_RE.fullmatch(value) is None:
            raise ValueError(f"Invalid Neo4j {kind}: {value!r}")
        return value

    def run_query(self, query: str, params: Optional[Dict] = None) -> List[Dict]:
        """Run *query* in a fresh session and return each record as a dict.

        Args:
            query: Cypher text.
            params: Query parameters; ``None`` is treated as no parameters.
        """
        with self.driver.session() as session:
            result = session.run(query, params or {})
            # Materialise inside the ``with`` block, while the session is open.
            return [record.data() for record in result]

    def create_node(self, label: str, properties: Dict) -> List[Dict]:
        """MERGE a node on ``(label, name)`` and set its other properties.

        Properties whose value is ``None`` are dropped before being written.

        Args:
            label: Node label; must be a plain identifier.
            properties: Node properties; must include a non-``None`` ``name``.

        Returns:
            The records produced by ``RETURN n``.

        Raises:
            ValueError: If *label* is not a plain identifier, or if
                *properties* has no usable ``name``.
        """
        label = self._validate_identifier(label, "label")
        props = {k: v for k, v in properties.items() if v is not None}
        if "name" not in props:
            # Fail before the round-trip: the MERGE below keys on ``name`` and
            # would otherwise be sent with a null value.
            raise ValueError("Node properties must include a non-null 'name'.")
        query = f"MERGE (n:{label} {{name: $name}}) SET n += $props RETURN n"
        return self.run_query(query, {"name": props["name"], "props": props})

    def create_relationship(self, start_label: str, start_name: str,
                            rel_type: str, end_label: str, end_name: str) -> List[Dict]:
        """MERGE a ``rel_type`` relationship between two existing nodes.

        Both endpoints are MATCHed by label and name, so if either node does
        not exist no relationship is created and the result is empty.

        Raises:
            ValueError: If a label or *rel_type* is not a plain identifier.
        """
        start_label = self._validate_identifier(start_label, "label")
        end_label = self._validate_identifier(end_label, "label")
        rel_type = self._validate_identifier(rel_type, "relationship type")
        query = f"""
        MATCH (a:{start_label} {{name: $start_name}})
        MATCH (b:{end_label} {{name: $end_name}})
        MERGE (a)-[r:{rel_type}]->(b)
        RETURN r
        """
        return self.run_query(query, {
            "start_name": start_name,
            "end_name": end_name,
        })

    # ---------- Domain-Specific Functions ----------

    def add_molecule(self, name: str, description: Optional[str] = None) -> List[Dict]:
        """Create or update a ``Molecule`` node."""
        return self.create_node("Molecule", {"name": name, "description": description})

    def add_pathway(self, name: str, description: Optional[str] = None) -> List[Dict]:
        """Create or update a ``Pathway`` node."""
        return self.create_node("Pathway", {"name": name, "description": description})

    def add_company(self, name: str, country: Optional[str] = None,
                    funding: Optional[float] = None) -> List[Dict]:
        """Create or update a ``Company`` node."""
        return self.create_node("Company", {"name": name, "country": country, "funding": funding})

    def add_trial(self, trial_id: str, title: Optional[str] = None,
                  status: Optional[str] = None) -> List[Dict]:
        """Create or update a ``Trial`` node; *trial_id* is stored as ``name``."""
        return self.create_node("Trial", {"name": trial_id, "title": title, "status": status})

    def add_regulation(self, region: str, title: str,
                       summary: Optional[str] = None) -> List[Dict]:
        """Create or update a ``Regulation`` node; *title* is stored as ``name``."""
        return self.create_node("Regulation", {"name": title, "region": region, "summary": summary})

    def add_biosecurity_alert(self, title: str, severity: str,
                              details: Optional[str] = None) -> List[Dict]:
        """Create or update a ``BiosecurityAlert`` node; *title* is stored as ``name``."""
        return self.create_node("BiosecurityAlert", {"name": title, "severity": severity, "details": details})

    # ---------- Relationships ----------

    def link_molecule_pathway(self, molecule: str, pathway: str) -> List[Dict]:
        """Link ``(Molecule)-[:INVOLVED_IN]->(Pathway)``."""
        return self.create_relationship("Molecule", molecule, "INVOLVED_IN", "Pathway", pathway)

    def link_company_molecule(self, company: str, molecule: str) -> List[Dict]:
        """Link ``(Company)-[:DEVELOPS]->(Molecule)``."""
        return self.create_relationship("Company", company, "DEVELOPS", "Molecule", molecule)

    def link_trial_molecule(self, trial_id: str, molecule: str) -> List[Dict]:
        """Link ``(Trial)-[:TESTS]->(Molecule)``."""
        return self.create_relationship("Trial", trial_id, "TESTS", "Molecule", molecule)

    def link_company_funder(self, company: str, funder: str) -> List[Dict]:
        """Link ``(Company)-[:FUNDED_BY]->(Funder)``.

        This class has no helper that creates ``Funder`` nodes, so the funder
        must already exist (e.g. via ``create_node("Funder", {...})``);
        otherwise the MATCH finds nothing and no relationship is created.
        """
        return self.create_relationship("Company", company, "FUNDED_BY", "Funder", funder)

    def link_regulation_pathway(self, regulation: str, pathway: str) -> List[Dict]:
        """Link ``(Regulation)-[:REGULATES]->(Pathway)``."""
        return self.create_relationship("Regulation", regulation, "REGULATES", "Pathway", pathway)

    def link_biosecurity_molecule(self, alert: str, molecule: str) -> List[Dict]:
        """Link ``(BiosecurityAlert)-[:ASSOCIATED_WITH]->(Molecule)``."""
        return self.create_relationship("BiosecurityAlert", alert, "ASSOCIATED_WITH", "Molecule", molecule)

    # ---------- Queries ----------

    def get_global_funding_network(self) -> List[Dict]:
        """Return one row per ``Company``-``FUNDED_BY``-``Funder`` edge.

        Each row has ``company``, ``funder``, ``country`` and ``amount`` keys,
        ordered by ``amount`` descending.

        NOTE(review): Cypher sorts null first under DESC, so companies without
        a ``funding`` property should appear at the top -- confirm that this
        is the intended ordering.
        """
        query = """
        MATCH (c:Company)-[r:FUNDED_BY]->(f:Funder)
        RETURN c.name AS company, f.name AS funder, c.country AS country, c.funding AS amount
        ORDER BY amount DESC
        """
        return self.run_query(query)

    def get_pathway_network(self, pathway_name: str) -> List[Dict]:
        """Return the molecules involved in a pathway and who develops them.

        Each row has ``molecule`` and ``company`` keys. The company match is
        optional, so ``company`` is ``None`` for a molecule with no developer.
        """
        query = """
        MATCH (m:Molecule)-[:INVOLVED_IN]->(p:Pathway {name: $pathway})
        OPTIONAL MATCH (m)<-[:DEVELOPS]-(c:Company)
        RETURN m.name AS molecule, c.name AS company
        """
        return self.run_query(query, {"pathway": pathway_name})

    def get_all_entities(self) -> List[Dict]:
        """Return the distinct ``(labels, name)`` pairs across all nodes."""
        query = """
        MATCH (n) RETURN DISTINCT labels(n) AS labels, n.name AS name
        """
        return self.run_query(query)
# Singleton instance for app-wide use. It is constructed at import time, so
# importing this module raises ValueError when the Neo4j settings are missing.
graphdb = GraphDB()