# Spaces: Runtime error  (page-status text captured along with the source; not part of the program)
import os
import tempfile
import time
import uuid
import zipfile

import gradio as gr
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WhatsAppChatLoader
from langchain_huggingface import HuggingFaceEmbeddings
from pinecone import Pinecone, ServerlessSpec
# Load environment variables (notably PINECONE_API_KEY) from a .env file, if one exists.
load_dotenv()

# The Pinecone client, index handle and embedding model are created once at
# import time so that every request handled by load_chat_content reuses them.
pinecone_key = os.getenv("PINECONE_API_KEY")
if not pinecone_key:
    # Fail fast with an actionable message instead of an opaque client error.
    raise RuntimeError(
        "PINECONE_API_KEY is not set; define it in the environment or in a .env file."
    )
pc = Pinecone(api_key=pinecone_key)

index_name = "whatsapp-chat-index"

# The index dimension must equal the embedding model's output size, so the two
# are declared side by side. all-mpnet-base-v2 produces 768-dimensional vectors.
EMBEDDING_MODEL_NAME = "sentence-transformers/all-mpnet-base-v2"
EMBEDDING_DIMENSION = 768

# Upper bound (seconds) on how long to wait for a freshly created index.
INDEX_READY_TIMEOUT_S = 120

if index_name not in pc.list_indexes().names():
    pc.create_index(
        name=index_name,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(
            cloud='aws',
            region='us-east-1'
        )
    )
    # A new serverless index is not usable immediately; poll until it reports
    # ready, but give up after a bounded time rather than hanging the app.
    deadline = time.monotonic() + INDEX_READY_TIMEOUT_S
    while not pc.describe_index(index_name).status["ready"]:
        if time.monotonic() > deadline:
            raise TimeoutError(
                f"Pinecone index '{index_name}' was not ready after "
                f"{INDEX_READY_TIMEOUT_S} seconds."
            )
        time.sleep(1)

index = pc.Index(index_name)

# Initialize Hugging Face embeddings with the model pinned explicitly so a
# change in the library default cannot silently break the index dimension.
embeddings = HuggingFaceEmbeddings(model_name=EMBEDDING_MODEL_NAME)
# Number of records sent per Pinecone request (upsert and fetch). Keeps each
# request small instead of sending the whole chat in a single call.
_PINECONE_BATCH_SIZE = 100


def _batched(items, size):
    """Yield consecutive slices of *items* holding at most *size* elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def load_chat_content(file) -> str:
    """Load chat content from the uploaded zip file and store it in Pinecone.

    The archive is extracted into a private temporary directory, the first
    top-level ``.txt`` file (in sorted order) is parsed as a WhatsApp export,
    split into overlapping chunks, embedded, and upserted under fresh UUIDs.
    The IDs are then fetched back to report how many chunks are present.

    Args:
        file: The uploaded archive: a filesystem path, or an object exposing
            the path through a ``name`` attribute.

    Returns:
        A human-readable status message describing the outcome.

    Raises:
        ValueError: If no file was supplied, the archive has no top-level
            ``.txt`` file, or the chat produced no text chunks.
        zipfile.BadZipFile: If the upload is not a valid zip archive.
    """
    if file is None:
        raise ValueError("No zip file was uploaded.")

    # Plain paths are used as-is; wrapper objects carry the path in `.name`.
    # (Path objects also have `.name`, but there it is only the basename.)
    if isinstance(file, (str, os.PathLike)):
        zip_path = file
    else:
        zip_path = getattr(file, "name", file)

    # A per-call temporary directory prevents .txt files left over from an
    # earlier upload from being picked up, and is removed automatically.
    with tempfile.TemporaryDirectory() as temp_dir:
        with zipfile.ZipFile(zip_path, 'r') as z:
            z.extractall(temp_dir)

        # Sorted so the chosen file does not depend on directory listing order.
        chat_files = sorted(f for f in os.listdir(temp_dir) if f.endswith('.txt'))
        if not chat_files:
            raise ValueError("No chat files found in the zip archive.")

        chat_file_path = os.path.join(temp_dir, chat_files[0])
        loader = WhatsAppChatLoader(path=chat_file_path)
        # Materialize while the extracted file still exists on disk.
        messages = list(loader.lazy_load())

    chat_content = "\n".join(doc.page_content for doc in messages)

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=200,
    )
    chunks = text_splitter.create_documents([chat_content])
    if not chunks:
        raise ValueError("The chat file contains no text to upsert.")

    # Embed every chunk in one call so the model can batch internally.
    texts = [chunk.page_content for chunk in chunks]
    vectors = embeddings.embed_documents(texts)
    unique_ids = [str(uuid.uuid4()) for _ in texts]
    records = [
        (uid, vector, {"text": text})
        for uid, vector, text in zip(unique_ids, vectors, texts)
    ]

    # Upsert vectors to Pinecone in bounded batches.
    for record_batch in _batched(records, _PINECONE_BATCH_SIZE):
        index.upsert(vectors=record_batch)

    # Verify insertion by fetching the IDs back. A fetch response maps the IDs
    # it found to their vectors under `vectors`.
    # NOTE(review): reads may lag behind writes, so an immediate fetch could
    # under-count — confirm whether a short retry is wanted here.
    inserted_count = 0
    for id_batch in _batched(unique_ids, _PINECONE_BATCH_SIZE):
        found = index.fetch(ids=id_batch).vectors
        inserted_count += sum(1 for uid in id_batch if uid in found)

    if inserted_count == len(unique_ids):
        return "All chat content has been successfully upserted to Pinecone."
    return f"Insertion incomplete. Only {inserted_count} out of {len(unique_ids)} chunks were inserted."
# Gradio UI: one file-upload input wired to load_chat_content, with the
# returned status message rendered as plain text.
_chat_zip_input = gr.File(label="Upload WhatsApp Chat Zip File")

interface = gr.Interface(
    title="WhatsApp Chat Upsert to Pinecone",
    description="Upload a zip file containing a WhatsApp chat file and upsert its content to Pinecone.",
    fn=load_chat_content,
    inputs=[_chat_zip_input],
    outputs="text",
)

# Start the web app only when this file is executed as a script.
if __name__ == "__main__":
    interface.launch()