from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import requests
from pypdf import PdfReader
import gradio as gr
import numpy as np
import pickle

load_dotenv(override=True)


def push(text):
    requests.post(
        "https://api.pushover.net/1/messages.json",
        data={
            "token": os.getenv("PUSHOVER_TOKEN"),
            "user": os.getenv("PUSHOVER_USER"),
            "message": text,
        }
    )


def record_user_details(email, name="Name not provided", notes="not provided"):
    push(f"Recording {name} with email {email} and notes {notes}")
    return {"recorded": "ok"}


def record_unknown_question(question):
    push(f"Recording {question}")
    return {"recorded": "ok"}


def store_conversation_info(information, context=""):
    """Store new information from conversations"""
    # Actual persistence happens in Me.handle_tool_call, which writes to the vector store
    return {"stored": "ok", "info": information}


record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {
                "type": "string",
                "description": "The email address of this user"
            },
            "name": {
                "type": "string",
                "description": "The user's name, if they provided it"
            },
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context"
            }
        },
        "required": ["email"],
        "additionalProperties": False
    }
}

record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question that couldn't be answered"
            },
        },
        "required": ["question"],
        "additionalProperties": False
    }
}

store_conversation_info_json = {
    "name": "store_conversation_info",
    "description": "Store new information learned during conversations for future reference",
    "parameters": {
        "type": "object",
        "properties": {
            "information": {
                "type": "string",
                "description": "The new information to store"
            },
            "context": {
                "type": "string",
                "description": "Context about when/how this information was learned"
            }
        },
        "required": ["information"],
        "additionalProperties": False
    }
}

tools = [
    {"type": "function", "function": record_user_details_json},
    {"type": "function", "function": record_unknown_question_json},
    {"type": "function", "function": store_conversation_info_json},
]


class Me:

    def __init__(self):
        self.openai = OpenAI()
        self.name = "Alexandre Saadoun"
        # Initialize simple vector store
        self.vector_store_path = "./vector_store.pkl"
        self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []}
        # Initialize RAG system - this will auto-load all files in me/
        self._setup_vector_store()
        self._populate_initial_data()

    def _setup_vector_store(self):
        """Setup simple vector store for RAG"""
        try:
            if os.path.exists(self.vector_store_path):
                with open(self.vector_store_path, 'rb') as f:
                    self.knowledge_base = pickle.load(f)
                print("✅ Loaded existing knowledge base")
            else:
                print("✅ Created new knowledge base")
        except Exception as e:
            print(f"Error loading knowledge base: {e}")
            self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []}

    def _save_vector_store(self):
        """Save vector store to disk"""
        try:
            with open(self.vector_store_path, 'wb') as f:
                pickle.dump(self.knowledge_base, f)
        except Exception as e:
            print(f"Error saving knowledge base: {e}")
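    # --- Hedged addition (not in the original): a sketch of batch embedding. ---
    # The OpenAI embeddings endpoint accepts a list of inputs, so bulk loads could
    # embed many chunks per request instead of one at a time. The method name and
    # batch size are illustrative assumptions, not part of the original design.
    def _get_embeddings_batch(self, texts, batch_size=100):
        """Embed many texts in batched API calls (sketch)."""
        embeddings = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start:start + batch_size]
            response = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=batch,
            )
            # response.data preserves input order; collect each vector in turn
            embeddings.extend(item.embedding for item in response.data)
        return embeddings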
model="text-embedding-3-small", input=text ) return response.data[0].embedding def _populate_initial_data(self): """Store initial knowledge in vector store""" # Check if data already exists count = len(self.knowledge_base["documents"]) if count == 0: # Only populate if empty print("Auto-loading all files from me/ directory...") self._auto_load_me_directory() def _auto_load_me_directory(self): """Automatically load and process all files in the me/ directory""" import glob me_dir = "me/" if not os.path.exists(me_dir): print(f"Directory {me_dir} not found") return # Find all files in me/ directory all_files = glob.glob(os.path.join(me_dir, "*")) processed_files = [] for file_path in all_files: if os.path.isfile(file_path): # Skip directories filename = os.path.basename(file_path) print(f"Auto-processing: {filename}") try: # Handle different file types if file_path.endswith('.pdf'): reader = PdfReader(file_path) content = "" for page in reader.pages: page_text = page.extract_text() if page_text: content += page_text elif file_path.endswith(('.txt', '.md')): with open(file_path, 'r', encoding='utf-8') as f: content = f.read() else: print(f"Skipping unsupported file type: {filename}") continue if content.strip(): # Only process if content exists self.bulk_load_text_content(content, f"me_{filename}") processed_files.append(filename) except Exception as e: print(f"Error processing {filename}: {e}") if processed_files: print(f"✅ Auto-loaded {len(processed_files)} files: {', '.join(processed_files)}") else: print("No files found to process in me/ directory") def reload_me_directory(self): """Reload all files from me/ directory (useful when you add new files)""" print("Reloading me/ directory...") # Clear existing me/ content try: indices_to_remove = [] for i, metadata in enumerate(self.knowledge_base["metadata"]): if metadata.get("source", "").startswith("me_"): indices_to_remove.append(i) # Remove in reverse order to maintain indices for i in reversed(indices_to_remove): del self.knowledge_base["documents"][i] del self.knowledge_base["embeddings"][i] del self.knowledge_base["metadata"][i] if indices_to_remove: print(f"Cleared {len(indices_to_remove)} existing files from me/") self._save_vector_store() except Exception as e: print(f"Error clearing existing data: {e}") # Reload everything self._auto_load_me_directory() print("✅ me/ directory reloaded!") def _search_knowledge(self, query, limit=3): """Search for relevant knowledge using vector similarity""" try: if not self.knowledge_base["documents"]: return [] # Get query embedding query_embedding = self._get_embedding(query) query_vector = np.array(query_embedding) # Calculate cosine similarities similarities = [] for i, doc_embedding in enumerate(self.knowledge_base["embeddings"]): doc_vector = np.array(doc_embedding) # Cosine similarity dot_product = np.dot(query_vector, doc_vector) norm_query = np.linalg.norm(query_vector) norm_doc = np.linalg.norm(doc_vector) if norm_query > 0 and norm_doc > 0: similarity = dot_product / (norm_query * norm_doc) else: similarity = 0.0 similarities.append((similarity, i)) # Sort by similarity and get top results similarities.sort(reverse=True) search_results = [] for similarity, idx in similarities[:limit]: search_results.append({ "content": self.knowledge_base["documents"][idx], "type": self.knowledge_base["metadata"][idx].get("type", "unknown"), "score": similarity }) return search_results except Exception as e: print(f"Search error: {e}") return [] def _store_new_knowledge(self, information, context=""): 
"""Store new information in vector store""" try: embedding = self._get_embedding(information) self.knowledge_base["documents"].append(information) self.knowledge_base["embeddings"].append(embedding) self.knowledge_base["metadata"].append({ "type": "conversation", "context": context, "timestamp": str(np.datetime64('now')) }) self._save_vector_store() except Exception as e: print(f"Error storing knowledge: {e}") def bulk_load_text_content(self, text_content, source_name="raw_text", chunk_size=800): """ Load raw text content into the vector database Args: text_content: Raw text string (summary, report, etc.) source_name: Name/identifier for this content chunk_size: Size of chunks to split text into """ print(f"Processing text content: {source_name}") # Split into chunks chunks = [] for i in range(0, len(text_content), chunk_size): chunk = text_content[i:i+chunk_size].strip() if chunk: # Skip empty chunks chunks.append(chunk) print(f"Created {len(chunks)} chunks") # Store each chunk try: for i, chunk in enumerate(chunks): embedding = self._get_embedding(chunk) self.knowledge_base["documents"].append(chunk) self.knowledge_base["embeddings"].append(embedding) self.knowledge_base["metadata"].append({ "type": "text_content", "source": source_name, "chunk_index": i, "timestamp": str(np.datetime64('now')) }) self._save_vector_store() except Exception as e: print(f"Error storing chunks: {e}") print(f"Loaded {len(chunks)} chunks from {source_name}") def load_text_files(self, file_paths, chunk_size=800): """ Load raw text files (summaries, reports) into the database Args: file_paths: List of text file paths chunk_size: Size of chunks to split text into """ for file_path in file_paths: print(f"Loading {file_path}...") try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() # Use filename as source name source_name = os.path.basename(file_path) self.bulk_load_text_content(content, source_name, chunk_size) except Exception as e: print(f"Error loading {file_path}: {e}") def load_directory(self, directory_path, chunk_size=800): """ Load all .txt files from a directory Args: directory_path: Path to directory containing text files chunk_size: Size of chunks to split text into """ import glob txt_files = glob.glob(os.path.join(directory_path, "*.txt")) if txt_files: print(f"Found {len(txt_files)} text files in {directory_path}") self.load_text_files(txt_files, chunk_size) else: print(f"No .txt files found in {directory_path}") def clear_knowledge_base(self, knowledge_type=None): """ Clear all or specific type of knowledge from the database Args: knowledge_type: If specified, only delete documents of this type """ try: if knowledge_type: # Remove documents of specific type indices_to_remove = [] for i, metadata in enumerate(self.knowledge_base["metadata"]): if metadata.get("type") == knowledge_type: indices_to_remove.append(i) # Remove in reverse order to maintain indices for i in reversed(indices_to_remove): del self.knowledge_base["documents"][i] del self.knowledge_base["embeddings"][i] del self.knowledge_base["metadata"][i] if indices_to_remove: print(f"Deleted {len(indices_to_remove)} {knowledge_type} documents") self._save_vector_store() else: print(f"No {knowledge_type} documents found") else: # Clear entire knowledge base count = len(self.knowledge_base["documents"]) self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []} if count > 0: print(f"Deleted {count} documents") self._save_vector_store() else: print("No documents to delete") except Exception as e: 
print(f"Error clearing knowledge base: {e}") def get_knowledge_stats(self): """Get statistics about the knowledge base""" try: stats = {} total = len(self.knowledge_base["documents"]) for metadata in self.knowledge_base["metadata"]: doc_type = metadata.get("type", "unknown") stats[doc_type] = stats.get(doc_type, 0) + 1 print(f"Knowledge Base Stats (Total: {total} documents):") for doc_type, count in sorted(stats.items(), key=lambda x: x[1], reverse=True): print(f" {doc_type}: {count}") return stats except Exception as e: print(f"Error getting stats: {e}") return {} def handle_tool_call(self, tool_calls): results = [] for tool_call in tool_calls: tool_name = tool_call.function.name arguments = json.loads(tool_call.function.arguments) print(f"Tool called: {tool_name}", flush=True) if tool_name == "store_conversation_info": # Store in Neo4j when this tool is called self._store_new_knowledge(arguments["information"], arguments.get("context", "")) result = {"stored": "ok", "info": arguments["information"]} else: tool = globals().get(tool_name) result = tool(**arguments) if tool else {} results.append({"role": "tool","content": json.dumps(result),"tool_call_id": tool_call.id}) return results def system_prompt(self, relevant_knowledge=""): system_prompt = f"You are acting as {self.name}. You are answering questions on {self.name}'s website, \ particularly questions related to {self.name}'s career, background, skills and experience. \ Your responsibility is to represent {self.name} for interactions on the website as faithfully as possible. \ Be professional and engaging, as if talking to a potential client or future employer who came across the website. \ If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \ If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. \ If you learn new relevant information during conversations, use the store_conversation_info tool to remember it for future interactions." if relevant_knowledge: system_prompt += f"\n\n## Relevant Background Information:\n{relevant_knowledge}" system_prompt += f"\n\nWith this context, please chat with the user, always staying in character as {self.name}." return system_prompt def chat(self, message, history): # Search for relevant knowledge relevant_docs = self._search_knowledge(message) relevant_knowledge = "\n".join([f"- {doc['content'][:200]}..." for doc in relevant_docs if doc['score'] > 0.7]) messages = [{"role": "system", "content": self.system_prompt(relevant_knowledge)}] + history + [{"role": "user", "content": message}] done = False while not done: response = self.openai.chat.completions.create(model="gpt-4o-mini", messages=messages, tools=tools) if response.choices[0].finish_reason=="tool_calls": message_obj = response.choices[0].message tool_calls = message_obj.tool_calls results = self.handle_tool_call(tool_calls) messages.append(message_obj) messages.extend(results) else: done = True return response.choices[0].message.content def __del__(self): """Clean up Chroma connection""" # Chroma client doesn't need explicit closing pass if __name__ == "__main__": me = Me() gr.ChatInterface(me.chat, type="messages").launch()