from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import glob
import uuid
import requests
from pypdf import PdfReader
import gradio as gr
import chromadb
import numpy as np

load_dotenv(override=True)


def push(text):
    requests.post(
        "https://api.pushover.net/1/messages.json",
        data={
            "token": os.getenv("PUSHOVER_TOKEN"),
            "user": os.getenv("PUSHOVER_USER"),
            "message": text,
        },
    )
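
# Example (hypothetical message): push("New visitor left their email").
# Assumes PUSHOVER_TOKEN and PUSHOVER_USER are set in the environment (e.g. via .env).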


def record_user_details(email, name="Name not provided", notes="not provided"):
    push(f"Recording {name} with email {email} and notes {notes}")
    return {"recorded": "ok"}


def record_unknown_question(question):
    push(f"Recording {question}")
    return {"recorded": "ok"}


def store_conversation_info(information, context=""):
    """Stub for the store_conversation_info tool; handle_tool_call() intercepts
    this tool name and persists the information in Chroma instead."""
    return {"stored": "ok", "info": information}


record_user_details_json = {
    "name": "record_user_details",
    "description": "Use this tool to record that a user is interested in being in touch and provided an email address",
    "parameters": {
        "type": "object",
        "properties": {
            "email": {
                "type": "string",
                "description": "The email address of this user"
            },
            "name": {
                "type": "string",
                "description": "The user's name, if they provided it"
            },
            "notes": {
                "type": "string",
                "description": "Any additional information about the conversation that's worth recording to give context"
            }
        },
        "required": ["email"],
        "additionalProperties": False
    }
}

record_unknown_question_json = {
    "name": "record_unknown_question",
    "description": "Always use this tool to record any question that couldn't be answered because you didn't know the answer",
    "parameters": {
        "type": "object",
        "properties": {
            "question": {
                "type": "string",
                "description": "The question that couldn't be answered"
            }
        },
        "required": ["question"],
        "additionalProperties": False
    }
}

store_conversation_info_json = {
    "name": "store_conversation_info",
    "description": "Store new information learned during conversations for future reference",
    "parameters": {
        "type": "object",
        "properties": {
            "information": {
                "type": "string",
                "description": "The new information to store"
            },
            "context": {
                "type": "string",
                "description": "Context about when/how this information was learned"
            }
        },
        "required": ["information"],
        "additionalProperties": False
    }
}

tools = [
    {"type": "function", "function": record_user_details_json},
    {"type": "function", "function": record_unknown_question_json},
    {"type": "function", "function": store_conversation_info_json},
]


class Me:

    def __init__(self):
        self.openai = OpenAI()
        self.name = "Alexandre Saadoun"

        # Persistent on-disk vector store for the RAG knowledge base
        self.chroma_client = chromadb.PersistentClient(path="./chroma_db")

        self._setup_chroma_collection()
        self._populate_initial_data()

    def _setup_chroma_collection(self):
        """Set up the Chroma collection for RAG."""
        try:
            self.collection = self.chroma_client.get_collection(name="knowledge_base")
            print("✅ Loaded existing knowledge base")
        except Exception:
            self.collection = self.chroma_client.create_collection(name="knowledge_base")
            print("✅ Created new knowledge base")

    def _get_embedding(self, text):
        """Get an embedding for text using OpenAI."""
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
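
    # Note: _get_embedding is currently unused; collection.query(query_texts=...)
    # relies on Chroma's built-in default embedding function. To use OpenAI
    # embeddings instead, pass precomputed vectors via add(embeddings=...) and
    # query(query_embeddings=...).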

    def _populate_initial_data(self):
        """Seed the knowledge base from the me/ directory if the collection is empty."""
        count = self.collection.count()

        if count == 0:
            print("Auto-loading all files from me/ directory...")
            self._auto_load_me_directory()

    def _auto_load_me_directory(self):
        """Automatically load and process all files in the me/ directory."""
        me_dir = "me/"
        if not os.path.exists(me_dir):
            print(f"Directory {me_dir} not found")
            return

        all_files = glob.glob(os.path.join(me_dir, "*"))
        processed_files = []

        for file_path in all_files:
            if os.path.isfile(file_path):
                filename = os.path.basename(file_path)
                print(f"Auto-processing: {filename}")

                try:
                    if file_path.endswith('.pdf'):
                        reader = PdfReader(file_path)
                        content = ""
                        for page in reader.pages:
                            page_text = page.extract_text()
                            if page_text:
                                content += page_text
                    elif file_path.endswith(('.txt', '.md')):
                        with open(file_path, 'r', encoding='utf-8') as f:
                            content = f.read()
                    else:
                        print(f"Skipping unsupported file type: {filename}")
                        continue

                    if content.strip():
                        self.bulk_load_text_content(content, f"me_{filename}")
                        processed_files.append(filename)

                except Exception as e:
                    print(f"Error processing {filename}: {e}")

        if processed_files:
            print(f"✅ Auto-loaded {len(processed_files)} files: {', '.join(processed_files)}")
        else:
            print("No files found to process in me/ directory")

    def reload_me_directory(self):
        """Reload all files from the me/ directory (useful after adding new files)."""
        print("Reloading me/ directory...")

        try:
            # Drop previously ingested me/ documents before re-ingesting
            results = self.collection.get(include=["metadatas"])
            me_ids = [results["ids"][i] for i, metadata in enumerate(results["metadatas"])
                      if metadata.get("source", "").startswith("me_")]

            if me_ids:
                self.collection.delete(ids=me_ids)
                print(f"Cleared {len(me_ids)} existing files from me/")
        except Exception as e:
            print(f"Error clearing existing data: {e}")

        self._auto_load_me_directory()
        print("✅ me/ directory reloaded!")

    def _search_knowledge(self, query, limit=3):
        """Search for relevant knowledge using vector similarity."""
        try:
            results = self.collection.query(
                query_texts=[query],
                n_results=limit,
                include=["documents", "metadatas", "distances"]
            )

            search_results = []
            if results["documents"] and results["documents"][0]:
                for i, doc in enumerate(results["documents"][0]):
                    search_results.append({
                        "content": doc,
                        "type": results["metadatas"][0][i].get("type", "unknown") if results["metadatas"] else "unknown",
                        "score": 1 - results["distances"][0][i] if results["distances"] else 1.0
                    })

            return search_results
        except Exception as e:
            print(f"Search error: {e}")
            return []
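
    # Each result is shaped like {"content": ..., "type": ..., "score": ...};
    # score is 1 - distance, so a higher score means a closer match. chat() below
    # keeps only results scoring above 0.7.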

    def _store_new_knowledge(self, information, context=""):
        """Store new information from a conversation in Chroma."""
        try:
            # A count-based ID can collide after deletions, so use a UUID instead
            doc_id = f"conv_{uuid.uuid4().hex}"
            self.collection.add(
                documents=[information],
                metadatas=[{
                    "type": "conversation",
                    "context": context,
                    "timestamp": str(np.datetime64('now'))
                }],
                ids=[doc_id]
            )
        except Exception as e:
            print(f"Error storing knowledge: {e}")

    def bulk_load_text_content(self, text_content, source_name="raw_text", chunk_size=800):
        """
        Load raw text content into the vector database.

        Args:
            text_content: Raw text string (summary, report, etc.)
            source_name: Name/identifier for this content
            chunk_size: Size of the chunks to split the text into
        """
        print(f"Processing text content: {source_name}")

        # Naive fixed-width chunking: no overlap, no sentence awareness
        chunks = []
        for i in range(0, len(text_content), chunk_size):
            chunk = text_content[i:i + chunk_size].strip()
            if chunk:
                chunks.append(chunk)

        print(f"Created {len(chunks)} chunks")
        if not chunks:
            return

        try:
            documents = []
            metadatas = []
            ids = []

            for i, chunk in enumerate(chunks):
                documents.append(chunk)
                metadatas.append({
                    "type": "text_content",
                    "source": source_name,
                    "chunk_index": i,
                    "timestamp": str(np.datetime64('now'))
                })
                ids.append(f"{source_name}_chunk_{i}")

            self.collection.add(
                documents=documents,
                metadatas=metadatas,
                ids=ids
            )
            print(f"Loaded {len(chunks)} chunks from {source_name}")
        except Exception as e:
            print(f"Error storing chunks: {e}")

    def load_text_files(self, file_paths, chunk_size=800):
        """
        Load raw text files (summaries, reports) into the database.

        Args:
            file_paths: List of text file paths
            chunk_size: Size of the chunks to split the text into
        """
        for file_path in file_paths:
            print(f"Loading {file_path}...")

            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    content = f.read()

                source_name = os.path.basename(file_path)
                self.bulk_load_text_content(content, source_name, chunk_size)

            except Exception as e:
                print(f"Error loading {file_path}: {e}")

    def load_directory(self, directory_path, chunk_size=800):
        """
        Load all .txt files from a directory.

        Args:
            directory_path: Path to the directory containing text files
            chunk_size: Size of the chunks to split the text into
        """
        txt_files = glob.glob(os.path.join(directory_path, "*.txt"))
        if txt_files:
            print(f"Found {len(txt_files)} text files in {directory_path}")
            self.load_text_files(txt_files, chunk_size)
        else:
            print(f"No .txt files found in {directory_path}")

    def clear_knowledge_base(self, knowledge_type=None):
        """
        Clear all knowledge, or only a specific type, from the database.

        Args:
            knowledge_type: If specified, only delete documents of this type
        """
        try:
            if knowledge_type:
                results = self.collection.get(include=["metadatas"])
                type_ids = [results["ids"][i] for i, metadata in enumerate(results["metadatas"])
                            if metadata.get("type") == knowledge_type]

                if type_ids:
                    self.collection.delete(ids=type_ids)
                    print(f"Deleted {len(type_ids)} {knowledge_type} documents")
                else:
                    print(f"No {knowledge_type} documents found")
            else:
                all_ids = self.collection.get()["ids"]
                if all_ids:
                    self.collection.delete(ids=all_ids)
                    print(f"Deleted {len(all_ids)} documents")
                else:
                    print("No documents to delete")

        except Exception as e:
            print(f"Error clearing knowledge base: {e}")

    def get_knowledge_stats(self):
        """Get statistics about the knowledge base."""
        try:
            results = self.collection.get(include=["metadatas"])

            stats = {}
            total = len(results["ids"])

            for metadata in results["metadatas"]:
                doc_type = metadata.get("type", "unknown")
                stats[doc_type] = stats.get(doc_type, 0) + 1

            print(f"Knowledge Base Stats (Total: {total} documents):")
            for doc_type, count in sorted(stats.items(), key=lambda x: x[1], reverse=True):
                print(f"  {doc_type}: {count}")

            return stats

        except Exception as e:
            print(f"Error getting stats: {e}")
            return {}

    def handle_tool_call(self, tool_calls):
        results = []
        for tool_call in tool_calls:
            tool_name = tool_call.function.name
            arguments = json.loads(tool_call.function.arguments)
            print(f"Tool called: {tool_name}", flush=True)

            if tool_name == "store_conversation_info":
                # Intercept so the information is persisted in Chroma
                self._store_new_knowledge(arguments["information"], arguments.get("context", ""))
                result = {"stored": "ok", "info": arguments["information"]}
            else:
                tool = globals().get(tool_name)
                result = tool(**arguments) if tool else {}

            results.append({"role": "tool", "content": json.dumps(result), "tool_call_id": tool_call.id})
        return results

    def system_prompt(self, relevant_knowledge=""):
        system_prompt = f"You are acting as {self.name}. You are answering questions on {self.name}'s website, \
particularly questions related to {self.name}'s career, background, skills and experience. \
Your responsibility is to represent {self.name} for interactions on the website as faithfully as possible. \
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \
If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. \
If you learn new relevant information during conversations, use the store_conversation_info tool to remember it for future interactions."

        if relevant_knowledge:
            system_prompt += f"\n\n## Relevant Background Information:\n{relevant_knowledge}"

        system_prompt += f"\n\nWith this context, please chat with the user, always staying in character as {self.name}."
        return system_prompt

    def chat(self, message, history):
        # Retrieve context for this message and keep only close matches
        relevant_docs = self._search_knowledge(message)
        relevant_knowledge = "\n".join([f"- {doc['content'][:200]}..." for doc in relevant_docs if doc['score'] > 0.7])

        messages = [{"role": "system", "content": self.system_prompt(relevant_knowledge)}] + history + [{"role": "user", "content": message}]
        done = False
        while not done:
            response = self.openai.chat.completions.create(model="gpt-4o-mini", messages=messages, tools=tools)
            if response.choices[0].finish_reason == "tool_calls":
                message_obj = response.choices[0].message
                tool_calls = message_obj.tool_calls
                results = self.handle_tool_call(tool_calls)
                messages.append(message_obj)
                messages.extend(results)
            else:
                done = True
        return response.choices[0].message.content

    def __del__(self):
        """Clean up the Chroma connection. PersistentClient persists to disk
        automatically, so there is nothing to close explicitly."""
        pass


if __name__ == "__main__":
    me = Me()
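
    # Optional maintenance hooks (hypothetical usage; uncomment as needed):
    #   me.reload_me_directory()   # re-ingest after adding files to me/
    #   me.get_knowledge_stats()   # print document counts by type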
    gr.ChatInterface(me.chat, type="messages").launch()