# File size: 16,839 Bytes (revision 0af0679)
from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import requests
from pypdf import PdfReader
import gradio as gr
import neo4j
from neo4j import GraphDatabase
import numpy as np
load_dotenv(override=True)
def push(text):
    """Send ``text`` as a Pushover notification.

    Credentials are read from the ``PUSHOVER_TOKEN`` and ``PUSHOVER_USER``
    environment variables at call time.

    Args:
        text: Message body to deliver.

    Raises:
        requests.exceptions.RequestException: If the HTTP request fails or
            does not complete within the timeout.
    """
    requests.post(
        "https://api.pushover.net/1/messages.json",
        data={
            "token": os.getenv("PUSHOVER_TOKEN"),
            "user": os.getenv("PUSHOVER_USER"),
            "message": text,
        },
        # requests never times out by default; without a bound, a stalled
        # connection would block the chat turn that triggered this call.
        timeout=10,
    )
def record_user_details(email, name="Name not provided", notes="not provided"):
    """Notify (via ``push``) that a visitor left contact details.

    Args:
        email: The visitor's email address.
        name: The visitor's name, if given.
        notes: Free-form context about the conversation.

    Returns:
        The fixed acknowledgement ``{"recorded": "ok"}``.
    """
    notification = f"Recording {name} with email {email} and notes {notes}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
def record_unknown_question(question):
    """Notify (via ``push``) about a question that could not be answered.

    Args:
        question: The question text to report.

    Returns:
        The fixed acknowledgement ``{"recorded": "ok"}``.
    """
    notification = f"Recording {question}"
    push(notification)
    acknowledgement = {"recorded": "ok"}
    return acknowledgement
def store_conversation_info(information, context=""):
    """Acknowledge a piece of information learned during a conversation.

    This function persists nothing itself: it only echoes ``information``
    back in the acknowledgement. ``context`` is accepted but unused here.

    Args:
        information: The information to acknowledge.
        context: Optional description of how the information was learned.

    Returns:
        ``{"stored": "ok", "info": information}``.
    """
    receipt = dict(stored="ok", info=information)
    return receipt
# Function-calling schema for the record_user_details tool; only "email"
# is mandatory and no extra properties are permitted.
record_user_details_json = dict(
    name="record_user_details",
    description="Use this tool to record that a user is interested in being in touch and provided an email address",
    parameters=dict(
        type="object",
        properties=dict(
            email=dict(
                type="string",
                description="The email address of this user",
            ),
            name=dict(
                type="string",
                description="The user's name, if they provided it",
            ),
            notes=dict(
                type="string",
                description="Any additional information about the conversation that's worth recording to give context",
            ),
        ),
        required=["email"],
        additionalProperties=False,
    ),
)
# Function-calling schema for the record_unknown_question tool; the single
# "question" property is mandatory and no extra properties are permitted.
record_unknown_question_json = dict(
    name="record_unknown_question",
    description="Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    parameters=dict(
        type="object",
        properties=dict(
            question=dict(
                type="string",
                description="The question that couldn't be answered",
            ),
        ),
        required=["question"],
        additionalProperties=False,
    ),
)
# Function-calling schema for the store_conversation_info tool; only
# "information" is mandatory and no extra properties are permitted.
store_conversation_info_json = dict(
    name="store_conversation_info",
    description="Store new information learned during conversations for future reference",
    parameters=dict(
        type="object",
        properties=dict(
            information=dict(
                type="string",
                description="The new information to store",
            ),
            context=dict(
                type="string",
                description="Context about when/how this information was learned",
            ),
        ),
        required=["information"],
        additionalProperties=False,
    ),
)
# Tool list passed to the chat-completions call: each schema is wrapped in
# the {"type": "function", "function": ...} envelope, in declaration order.
tools = [
    {"type": "function", "function": schema}
    for schema in (
        record_user_details_json,
        record_unknown_question_json,
        store_conversation_info_json,
    )
]
class Me:
    """Chat persona backed by OpenAI chat completions and a Neo4j vector store.

    On construction the instance connects to Neo4j, ensures the vector index
    over ``Knowledge.embedding`` exists and, if no ``Knowledge`` nodes are
    present, loads every supported file found in ``me/``. ``chat`` retrieves
    similar chunks for each user message, injects them into the system
    prompt and runs the tool-calling loop.

    Relies on module-level names defined elsewhere in this file: ``OpenAI``,
    ``GraphDatabase``, ``PdfReader``, ``tools`` and the tool functions.
    """

    # Module-level functions the model is allowed to trigger by name. Anything
    # else is ignored, so a model-supplied name can never reach an arbitrary
    # global callable.
    _GLOBAL_TOOL_NAMES = frozenset({"record_user_details", "record_unknown_question"})

    def __init__(self):
        """Create the OpenAI client and Neo4j driver, then prepare the store."""
        self.openai = OpenAI()
        self.name = "Alexandre Saadoun"
        # Connection settings come from the environment, with local defaults.
        self.neo4j_driver = GraphDatabase.driver(
            os.getenv("NEO4J_URI", "bolt://localhost:7687"),
            auth=(os.getenv("NEO4J_USER", "neo4j"), os.getenv("NEO4J_PASSWORD", "password")),
        )
        # Initialize RAG system - this will auto-load all files in me/
        self._setup_neo4j_schema()
        self._populate_initial_data()

    def _setup_neo4j_schema(self):
        """Create the cosine vector index (1536 dimensions) if it is missing.

        Any failure is printed rather than raised, so start-up continues.
        """
        with self.neo4j_driver.session() as session:
            try:
                session.run("""
                    CREATE VECTOR INDEX knowledge_embeddings IF NOT EXISTS
                    FOR (n:Knowledge) ON (n.embedding)
                    OPTIONS {indexConfig: {
                    `vector.dimensions`: 1536,
                    `vector.similarity_function`: 'cosine'
                    }}
                """)
            except Exception as e:
                print(f"Index might already exist: {e}")

    def _get_embedding(self, text):
        """Return the ``text-embedding-3-small`` embedding of ``text``."""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def _populate_initial_data(self):
        """Load the ``me/`` directory when no ``Knowledge`` node exists yet."""
        with self.neo4j_driver.session() as session:
            record = session.run("MATCH (n:Knowledge) RETURN count(n) as count").single()
            count = record["count"]
        if count == 0:  # Only populate if empty
            print("Auto-loading all files from me/ directory...")
            self._auto_load_me_directory()

    @staticmethod
    def _read_document(file_path):
        """Return the text of a supported file, or ``None`` if unsupported.

        PDFs are read with ``PdfReader`` (page texts concatenated without a
        separator); ``.txt`` and ``.md`` files are read as UTF-8. Extension
        matching is case-insensitive so ``CV.PDF`` is not skipped.
        """
        lowered = file_path.lower()
        if lowered.endswith(".pdf"):
            reader = PdfReader(file_path)
            page_texts = (page.extract_text() for page in reader.pages)
            return "".join(text for text in page_texts if text)
        if lowered.endswith((".txt", ".md")):
            with open(file_path, "r", encoding="utf-8") as f:
                return f.read()
        return None

    def _auto_load_me_directory(self):
        """Load every supported file directly inside ``me/`` into the store.

        Each file is stored under the source name ``me_<filename>``. Errors
        on one file are printed and do not stop the remaining files.
        """
        import glob

        me_dir = "me/"
        if not os.path.exists(me_dir):
            print(f"Directory {me_dir} not found")
            return

        processed_files = []
        # Sorted so the load order does not depend on filesystem ordering.
        for file_path in sorted(glob.glob(os.path.join(me_dir, "*"))):
            if not os.path.isfile(file_path):  # Skip directories
                continue
            filename = os.path.basename(file_path)
            print(f"Auto-processing: {filename}")
            try:
                content = self._read_document(file_path)
                if content is None:
                    print(f"Skipping unsupported file type: {filename}")
                    continue
                if content.strip():  # Only process if content exists
                    self.bulk_load_text_content(content, f"me_{filename}")
                    processed_files.append(filename)
            except Exception as e:
                print(f"Error processing {filename}: {e}")

        if processed_files:
            print(f"✅ Auto-loaded {len(processed_files)} files: {', '.join(processed_files)}")
        else:
            print("No files found to process in me/ directory")

    def reload_me_directory(self):
        """Delete all nodes whose source starts with ``me_`` and reload ``me/``."""
        print("Reloading me/ directory...")
        with self.neo4j_driver.session() as session:
            result = session.run("""
                MATCH (n:Knowledge)
                WHERE n.source STARTS WITH 'me_'
                DELETE n
                RETURN count(n) as deleted
            """)
            deleted = result.single()["deleted"]
        if deleted > 0:
            print(f"Cleared {deleted} existing files from me/")
        self._auto_load_me_directory()
        print("✅ me/ directory reloaded!")

    def _search_knowledge(self, query, limit=3):
        """Return the ``limit`` stored chunks most similar to ``query``.

        Returns:
            A list of ``{"content", "type", "score"}`` dicts ordered by
            descending score.
        """
        query_embedding = self._get_embedding(query)
        with self.neo4j_driver.session() as session:
            result = session.run("""
                CALL db.index.vector.queryNodes('knowledge_embeddings', $limit, $query_embedding)
                YIELD node, score
                RETURN node.content as content, node.type as type, score
                ORDER BY score DESC
            """, query_embedding=query_embedding, limit=limit)
            # Records must be consumed while the session is still open.
            return [
                {"content": record["content"], "type": record["type"], "score": record["score"]}
                for record in result
            ]

    def _store_new_knowledge(self, information, context=""):
        """Embed ``information`` and store it as a ``conversation`` node."""
        embedding = self._get_embedding(information)
        with self.neo4j_driver.session() as session:
            session.run("""
                CREATE (n:Knowledge {
                content: $content,
                type: 'conversation',
                context: $context,
                embedding: $embedding,
                timestamp: datetime()
                })
            """, content=information, context=context, embedding=embedding)

    @staticmethod
    def _chunk_text(text_content, chunk_size):
        """Split text into stripped, non-empty pieces of ``chunk_size`` chars."""
        pieces = (
            text_content[start:start + chunk_size].strip()
            for start in range(0, len(text_content), chunk_size)
        )
        return [piece for piece in pieces if piece]

    def bulk_load_text_content(self, text_content, source_name="raw_text", chunk_size=800):
        """
        Load raw text content into the vector database
        Args:
            text_content: Raw text string (summary, report, etc.)
            source_name: Name/identifier for this content
            chunk_size: Size of chunks to split text into
        Raises:
            ValueError: If chunk_size is not positive. A negative size would
                otherwise silently produce no chunks at all.
        """
        if chunk_size <= 0:
            raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

        print(f"Processing text content: {source_name}")
        chunks = self._chunk_text(text_content, chunk_size)
        print(f"Created {len(chunks)} chunks")

        with self.neo4j_driver.session() as session:
            for chunk_index, chunk in enumerate(chunks):
                embedding = self._get_embedding(chunk)
                session.run("""
                    CREATE (n:Knowledge {
                    content: $content,
                    type: 'text_content',
                    source: $source,
                    chunk_index: $chunk_index,
                    embedding: $embedding,
                    timestamp: datetime()
                    })
                """,
                    content=chunk,
                    source=source_name,
                    chunk_index=chunk_index,
                    embedding=embedding)
        print(f"Loaded {len(chunks)} chunks from {source_name}")

    def load_text_files(self, file_paths, chunk_size=800):
        """
        Load raw text files (summaries, reports) into the database
        Args:
            file_paths: List of text file paths
            chunk_size: Size of chunks to split text into
        Errors on one file are printed and do not stop the remaining files.
        """
        for file_path in file_paths:
            print(f"Loading {file_path}...")
            try:
                with open(file_path, "r", encoding="utf-8") as f:
                    content = f.read()
                # Use filename as source name
                source_name = os.path.basename(file_path)
                self.bulk_load_text_content(content, source_name, chunk_size)
            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    def load_directory(self, directory_path, chunk_size=800):
        """
        Load all .txt files from a directory
        Args:
            directory_path: Path to directory containing text files
            chunk_size: Size of chunks to split text into
        """
        import glob

        txt_files = glob.glob(os.path.join(directory_path, "*.txt"))
        if txt_files:
            print(f"Found {len(txt_files)} text files in {directory_path}")
            self.load_text_files(txt_files, chunk_size)
        else:
            print(f"No .txt files found in {directory_path}")

    def clear_knowledge_base(self, knowledge_type=None):
        """
        Clear all or specific type of knowledge from the database
        Args:
            knowledge_type: If specified, only delete nodes of this type
        """
        with self.neo4j_driver.session() as session:
            if knowledge_type:
                result = session.run(
                    "MATCH (n:Knowledge {type: $type}) DELETE n RETURN count(n) as deleted",
                    type=knowledge_type,
                )
            else:
                result = session.run("MATCH (n:Knowledge) DELETE n RETURN count(n) as deleted")
            deleted_count = result.single()["deleted"]
        print(f"Deleted {deleted_count} knowledge nodes")

    def get_knowledge_stats(self):
        """Print and return the number of ``Knowledge`` nodes per type.

        Returns:
            A dict mapping each node type to its count, largest first.
        """
        with self.neo4j_driver.session() as session:
            result = session.run("""
                MATCH (n:Knowledge)
                RETURN n.type as type, count(n) as count
                ORDER BY count DESC
            """)
            stats = {record["type"]: record["count"] for record in result}

        total = sum(stats.values())
        print(f"Knowledge Base Stats (Total: {total} documents):")
        for doc_type, count in stats.items():
            print(f" {doc_type}: {count}")
        return stats

    def _run_tool(self, tool_name, raw_arguments):
        """Execute one tool request and return its JSON-serialisable result.

        Bad input from the model (invalid JSON, missing or unexpected
        arguments) yields an ``{"error": ...}`` payload instead of raising,
        so one bad tool call cannot abort the whole chat turn. Unknown tool
        names yield ``{}``.
        """
        try:
            arguments = json.loads(raw_arguments) if raw_arguments else {}
        except json.JSONDecodeError as exc:
            return {"error": f"Invalid JSON arguments for {tool_name}: {exc}"}
        if not isinstance(arguments, dict):
            return {"error": f"Arguments for {tool_name} must be a JSON object"}

        if tool_name == "store_conversation_info":
            information = arguments.get("information")
            if not isinstance(information, str) or not information.strip():
                return {"error": "store_conversation_info requires a non-empty 'information' string"}
            # Store in Neo4j when this tool is called
            self._store_new_knowledge(information, arguments.get("context", ""))
            return {"stored": "ok", "info": information}

        if tool_name not in self._GLOBAL_TOOL_NAMES:
            return {}
        tool = globals().get(tool_name)
        if tool is None:
            return {}
        try:
            return tool(**arguments)
        except TypeError as exc:
            # Typically a missing or unexpected keyword supplied by the model.
            return {"error": f"Invalid arguments for {tool_name}: {exc}"}

    def handle_tool_call(self, tool_calls):
        """Run each requested tool and build the matching ``tool`` messages.

        Args:
            tool_calls: Tool-call objects exposing ``id``, ``function.name``
                and ``function.arguments`` (a JSON string).

        Returns:
            One ``{"role": "tool", "content", "tool_call_id"}`` dict per
            call, in the same order as ``tool_calls``.
        """
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            print(f"Tool called: {tool_name}", flush=True)
            result = self._run_tool(tool_name, tool_call.function.arguments)
            results.append({
                "role": "tool",
                "content": json.dumps(result),
                "tool_call_id": tool_call.id,
            })
        return results

    def system_prompt(self, relevant_knowledge=""):
        """Build the system prompt, optionally embedding retrieved knowledge.

        Args:
            relevant_knowledge: Pre-formatted background text; when empty the
                background section is omitted.
        """
        name = self.name
        prompt = (
            f"You are acting as {name}. You are answering questions on {name}'s website, "
            f"particularly questions related to {name}'s career, background, skills and experience. "
            f"Your responsibility is to represent {name} for interactions on the website as faithfully as possible. "
            "Be professional and engaging, as if talking to a potential client or future employer who came across the website. "
            "If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. "
            "If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. "
            "If you learn new relevant information during conversations, use the store_conversation_info tool to remember it for future interactions."
        )
        if relevant_knowledge:
            prompt += f"\n\n## Relevant Background Information:\n{relevant_knowledge}"
        prompt += f"\n\nWith this context, please chat with the user, always staying in character as {name}."
        return prompt

    def chat(self, message, history):
        """Answer ``message`` given prior ``history``, resolving tool calls.

        Retrieved chunks scoring above 0.7 are trimmed to 200 characters and
        added to the system prompt. The model is called repeatedly until it
        stops requesting tools; its final message content is returned.
        """
        relevant_docs = self._search_knowledge(message)
        relevant_knowledge = "\n".join(
            f"- {doc['content'][:200]}..." for doc in relevant_docs if doc["score"] > 0.7
        )
        messages = (
            [{"role": "system", "content": self.system_prompt(relevant_knowledge)}]
            + history
            + [{"role": "user", "content": message}]
        )
        while True:
            response = self.openai.chat.completions.create(
                model="gpt-4o-mini", messages=messages, tools=tools
            )
            choice = response.choices[0]
            if choice.finish_reason != "tool_calls":
                return choice.message.content
            # The assistant message carrying the tool calls must precede
            # the tool results in the transcript.
            tool_results = self.handle_tool_call(choice.message.tool_calls)
            messages.append(choice.message)
            messages.extend(tool_results)

    def close(self):
        """Close the Neo4j driver if one was created."""
        driver = getattr(self, "neo4j_driver", None)
        if driver is not None:
            driver.close()

    def __del__(self):
        """Close Neo4j connection"""
        try:
            self.close()
        except Exception:
            # Finalizers may run during interpreter shutdown, when the
            # driver's dependencies are already torn down; never raise here.
            pass
if __name__ == "__main__":
    # Build the persona (connects to Neo4j and prepares the knowledge store),
    # then serve its chat method through a Gradio chat interface.
    me = Me()
    gr.ChatInterface(me.chat, type="messages").launch()