# BioChat2 / app.py
from dotenv import load_dotenv
from openai import OpenAI
import json
import os
import requests
from pypdf import PdfReader
import gradio as gr
import numpy as np
import pickle
load_dotenv(override=True)
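# Expected environment variables (usually loaded from a .env file):
#   OPENAI_API_KEY                - read implicitly by the OpenAI client
#   PUSHOVER_TOKEN, PUSHOVER_USER - Pushover credentials for notifications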
def push(text):
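    """Send a push notification via the Pushover API"""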
requests.post(
"https://api.pushover.net/1/messages.json",
data={
"token": os.getenv("PUSHOVER_TOKEN"),
"user": os.getenv("PUSHOVER_USER"),
"message": text,
}
)
def record_user_details(email, name="Name not provided", notes="not provided"):
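    """Record a user's contact details and send a Pushover notification"""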
push(f"Recording {name} with email {email} and notes {notes}")
return {"recorded": "ok"}
def record_unknown_question(question):
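    """Record a question the assistant couldn't answer"""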
push(f"Recording {question}")
return {"recorded": "ok"}
def store_conversation_info(information, context=""):
"""Store new information from conversations"""
return {"stored": "ok", "info": information}
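# Note: Me.handle_tool_call intercepts store_conversation_info calls and persists the
# information in the vector store itself, so this stub mainly documents the tool's shape.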
record_user_details_json = {
"name": "record_user_details",
"description": "Use this tool to record that a user is interested in being in touch and provided an email address",
"parameters": {
"type": "object",
"properties": {
"email": {
"type": "string",
"description": "The email address of this user"
},
"name": {
"type": "string",
"description": "The user's name, if they provided it"
            },
            "notes": {
"type": "string",
"description": "Any additional information about the conversation that's worth recording to give context"
}
},
"required": ["email"],
"additionalProperties": False
}
}
record_unknown_question_json = {
"name": "record_unknown_question",
"description": "Always use this tool to record any question that couldn't be answered as you didn't know the answer",
"parameters": {
"type": "object",
"properties": {
"question": {
"type": "string",
"description": "The question that couldn't be answered"
},
},
"required": ["question"],
"additionalProperties": False
}
}
store_conversation_info_json = {
"name": "store_conversation_info",
"description": "Store new information learned during conversations for future reference",
"parameters": {
"type": "object",
"properties": {
"information": {
"type": "string",
"description": "The new information to store"
},
"context": {
"type": "string",
"description": "Context about when/how this information was learned"
}
},
"required": ["information"],
"additionalProperties": False
}
}
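# Tool schemas in the OpenAI function-calling format, passed to
# chat.completions.create via the `tools` parameter.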
tools = [{"type": "function", "function": record_user_details_json},
{"type": "function", "function": record_unknown_question_json},
{"type": "function", "function": store_conversation_info_json}]
class Me:
def __init__(self):
self.openai = OpenAI()
self.name = "Alexandre Saadoun"
# Initialize simple vector store
self.vector_store_path = "./vector_store.pkl"
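        # Parallel lists: entry i of "documents", "embeddings" and "metadata"
        # all describe the same stored chunk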
self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []}
# Initialize RAG system - this will auto-load all files in me/
self._setup_vector_store()
self._populate_initial_data()
def _setup_vector_store(self):
"""Setup simple vector store for RAG"""
try:
if os.path.exists(self.vector_store_path):
with open(self.vector_store_path, 'rb') as f:
self.knowledge_base = pickle.load(f)
print("✅ Loaded existing knowledge base")
else:
print("✅ Created new knowledge base")
except Exception as e:
print(f"Error loading knowledge base: {e}")
self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []}
def _save_vector_store(self):
"""Save vector store to disk"""
try:
with open(self.vector_store_path, 'wb') as f:
pickle.dump(self.knowledge_base, f)
except Exception as e:
print(f"Error saving knowledge base: {e}")
def _get_embedding(self, text):
"""Get embedding for text using OpenAI"""
response = self.openai.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
def _populate_initial_data(self):
"""Store initial knowledge in vector store"""
# Check if data already exists
count = len(self.knowledge_base["documents"])
if count == 0: # Only populate if empty
print("Auto-loading all files from me/ directory...")
self._auto_load_me_directory()
def _auto_load_me_directory(self):
"""Automatically load and process all files in the me/ directory"""
import glob
me_dir = "me/"
if not os.path.exists(me_dir):
print(f"Directory {me_dir} not found")
return
# Find all files in me/ directory
all_files = glob.glob(os.path.join(me_dir, "*"))
processed_files = []
for file_path in all_files:
if os.path.isfile(file_path): # Skip directories
filename = os.path.basename(file_path)
print(f"Auto-processing: {filename}")
try:
# Handle different file types
if file_path.endswith('.pdf'):
reader = PdfReader(file_path)
content = ""
for page in reader.pages:
page_text = page.extract_text()
if page_text:
content += page_text
elif file_path.endswith(('.txt', '.md')):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
else:
print(f"Skipping unsupported file type: {filename}")
continue
if content.strip(): # Only process if content exists
self.bulk_load_text_content(content, f"me_{filename}")
processed_files.append(filename)
except Exception as e:
print(f"Error processing {filename}: {e}")
if processed_files:
print(f"✅ Auto-loaded {len(processed_files)} files: {', '.join(processed_files)}")
else:
print("No files found to process in me/ directory")
def reload_me_directory(self):
"""Reload all files from me/ directory (useful when you add new files)"""
print("Reloading me/ directory...")
# Clear existing me/ content
try:
indices_to_remove = []
for i, metadata in enumerate(self.knowledge_base["metadata"]):
if metadata.get("source", "").startswith("me_"):
indices_to_remove.append(i)
# Remove in reverse order to maintain indices
for i in reversed(indices_to_remove):
del self.knowledge_base["documents"][i]
del self.knowledge_base["embeddings"][i]
del self.knowledge_base["metadata"][i]
if indices_to_remove:
print(f"Cleared {len(indices_to_remove)} existing files from me/")
self._save_vector_store()
except Exception as e:
print(f"Error clearing existing data: {e}")
# Reload everything
self._auto_load_me_directory()
print("✅ me/ directory reloaded!")
def _search_knowledge(self, query, limit=3):
"""Search for relevant knowledge using vector similarity"""
try:
if not self.knowledge_base["documents"]:
return []
# Get query embedding
query_embedding = self._get_embedding(query)
query_vector = np.array(query_embedding)
# Calculate cosine similarities
similarities = []
for i, doc_embedding in enumerate(self.knowledge_base["embeddings"]):
doc_vector = np.array(doc_embedding)
# Cosine similarity
dot_product = np.dot(query_vector, doc_vector)
norm_query = np.linalg.norm(query_vector)
norm_doc = np.linalg.norm(doc_vector)
if norm_query > 0 and norm_doc > 0:
similarity = dot_product / (norm_query * norm_doc)
else:
similarity = 0.0
similarities.append((similarity, i))
# Sort by similarity and get top results
similarities.sort(reverse=True)
search_results = []
for similarity, idx in similarities[:limit]:
search_results.append({
"content": self.knowledge_base["documents"][idx],
"type": self.knowledge_base["metadata"][idx].get("type", "unknown"),
"score": similarity
})
return search_results
except Exception as e:
print(f"Search error: {e}")
return []
def _store_new_knowledge(self, information, context=""):
"""Store new information in vector store"""
try:
embedding = self._get_embedding(information)
self.knowledge_base["documents"].append(information)
self.knowledge_base["embeddings"].append(embedding)
self.knowledge_base["metadata"].append({
"type": "conversation",
"context": context,
"timestamp": str(np.datetime64('now'))
})
self._save_vector_store()
except Exception as e:
print(f"Error storing knowledge: {e}")
def bulk_load_text_content(self, text_content, source_name="raw_text", chunk_size=800):
"""
Load raw text content into the vector database
Args:
text_content: Raw text string (summary, report, etc.)
source_name: Name/identifier for this content
chunk_size: Size of chunks to split text into
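
        Example (hypothetical file name):
            me.bulk_load_text_content(open("summary.txt").read(), "summary")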
"""
print(f"Processing text content: {source_name}")
# Split into chunks
chunks = []
for i in range(0, len(text_content), chunk_size):
chunk = text_content[i:i+chunk_size].strip()
if chunk: # Skip empty chunks
chunks.append(chunk)
print(f"Created {len(chunks)} chunks")
# Store each chunk
try:
for i, chunk in enumerate(chunks):
embedding = self._get_embedding(chunk)
self.knowledge_base["documents"].append(chunk)
self.knowledge_base["embeddings"].append(embedding)
self.knowledge_base["metadata"].append({
"type": "text_content",
"source": source_name,
"chunk_index": i,
"timestamp": str(np.datetime64('now'))
})
self._save_vector_store()
except Exception as e:
print(f"Error storing chunks: {e}")
print(f"Loaded {len(chunks)} chunks from {source_name}")
def load_text_files(self, file_paths, chunk_size=800):
"""
Load raw text files (summaries, reports) into the database
Args:
file_paths: List of text file paths
chunk_size: Size of chunks to split text into
"""
for file_path in file_paths:
print(f"Loading {file_path}...")
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Use filename as source name
source_name = os.path.basename(file_path)
self.bulk_load_text_content(content, source_name, chunk_size)
except Exception as e:
print(f"Error loading {file_path}: {e}")
def load_directory(self, directory_path, chunk_size=800):
"""
Load all .txt files from a directory
Args:
directory_path: Path to directory containing text files
chunk_size: Size of chunks to split text into
"""
import glob
txt_files = glob.glob(os.path.join(directory_path, "*.txt"))
if txt_files:
print(f"Found {len(txt_files)} text files in {directory_path}")
self.load_text_files(txt_files, chunk_size)
else:
print(f"No .txt files found in {directory_path}")
def clear_knowledge_base(self, knowledge_type=None):
"""
Clear all or specific type of knowledge from the database
Args:
knowledge_type: If specified, only delete documents of this type
"""
try:
if knowledge_type:
# Remove documents of specific type
indices_to_remove = []
for i, metadata in enumerate(self.knowledge_base["metadata"]):
if metadata.get("type") == knowledge_type:
indices_to_remove.append(i)
# Remove in reverse order to maintain indices
for i in reversed(indices_to_remove):
del self.knowledge_base["documents"][i]
del self.knowledge_base["embeddings"][i]
del self.knowledge_base["metadata"][i]
if indices_to_remove:
print(f"Deleted {len(indices_to_remove)} {knowledge_type} documents")
self._save_vector_store()
else:
print(f"No {knowledge_type} documents found")
else:
# Clear entire knowledge base
count = len(self.knowledge_base["documents"])
self.knowledge_base = {"documents": [], "embeddings": [], "metadata": []}
if count > 0:
print(f"Deleted {count} documents")
self._save_vector_store()
else:
print("No documents to delete")
except Exception as e:
print(f"Error clearing knowledge base: {e}")
def get_knowledge_stats(self):
"""Get statistics about the knowledge base"""
try:
stats = {}
total = len(self.knowledge_base["documents"])
for metadata in self.knowledge_base["metadata"]:
doc_type = metadata.get("type", "unknown")
stats[doc_type] = stats.get(doc_type, 0) + 1
print(f"Knowledge Base Stats (Total: {total} documents):")
for doc_type, count in sorted(stats.items(), key=lambda x: x[1], reverse=True):
print(f" {doc_type}: {count}")
return stats
except Exception as e:
print(f"Error getting stats: {e}")
return {}
def handle_tool_call(self, tool_calls):
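        """Execute each requested tool call and return tool-role messages for the API"""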
results = []
for tool_call in tool_calls:
tool_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Tool called: {tool_name}", flush=True)
if tool_name == "store_conversation_info":
                # Persist the information in the local vector store when this tool is called
self._store_new_knowledge(arguments["information"], arguments.get("context", ""))
result = {"stored": "ok", "info": arguments["information"]}
else:
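                # Fall back to the module-level function with the same name as the tool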
tool = globals().get(tool_name)
result = tool(**arguments) if tool else {}
results.append({"role": "tool","content": json.dumps(result),"tool_call_id": tool_call.id})
return results
def system_prompt(self, relevant_knowledge=""):
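        """Build the persona system prompt, optionally augmented with retrieved background"""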
system_prompt = f"You are acting as {self.name}. You are answering questions on {self.name}'s website, \
particularly questions related to {self.name}'s career, background, skills and experience. \
Your responsibility is to represent {self.name} for interactions on the website as faithfully as possible. \
Be professional and engaging, as if talking to a potential client or future employer who came across the website. \
If you don't know the answer to any question, use your record_unknown_question tool to record the question that you couldn't answer, even if it's about something trivial or unrelated to career. \
If the user is engaging in discussion, try to steer them towards getting in touch via email; ask for their email and record it using your record_user_details tool. \
If you learn new relevant information during conversations, use the store_conversation_info tool to remember it for future interactions."
if relevant_knowledge:
system_prompt += f"\n\n## Relevant Background Information:\n{relevant_knowledge}"
system_prompt += f"\n\nWith this context, please chat with the user, always staying in character as {self.name}."
return system_prompt
def chat(self, message, history):
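        """Gradio chat handler: retrieve relevant knowledge, then loop until the model stops requesting tools"""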
# Search for relevant knowledge
relevant_docs = self._search_knowledge(message)
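        # Keep only sufficiently similar chunks (cosine similarity > 0.7),
        # truncated to 200 characters each to keep the prompt compact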
relevant_knowledge = "\n".join([f"- {doc['content'][:200]}..." for doc in relevant_docs if doc['score'] > 0.7])
messages = [{"role": "system", "content": self.system_prompt(relevant_knowledge)}] + history + [{"role": "user", "content": message}]
done = False
while not done:
response = self.openai.chat.completions.create(model="gpt-4o-mini", messages=messages, tools=tools)
            if response.choices[0].finish_reason == "tool_calls":
message_obj = response.choices[0].message
tool_calls = message_obj.tool_calls
results = self.handle_tool_call(tool_calls)
messages.append(message_obj)
messages.extend(results)
else:
done = True
return response.choices[0].message.content
    def __del__(self):
        """No explicit cleanup needed; the pickle-backed store is saved after every write"""
        pass
if __name__ == "__main__":
me = Me()
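    # type="messages" makes Gradio pass history as OpenAI-style role/content dicts,
    # which chat() can forward to the API unchanged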
gr.ChatInterface(me.chat, type="messages").launch()