import os
from typing import List
import logging
import traceback
import asyncio

# Configure logging
logging.basicConfig(level=logging.INFO, 
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Make sure the directory containing aimakerspace is on the import path
import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Import from local aimakerspace module
from aimakerspace.text_utils import CharacterTextSplitter, TextFileLoader, PDFLoader
from aimakerspace.vectordatabase import VectorDatabase
from aimakerspace.openai_utils.embedding import EmbeddingModel
from openai import OpenAI

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
logger.info(f"Initialized OpenAI client with API key: {'valid key' if os.getenv('OPENAI_API_KEY') else 'API KEY MISSING!'}")

class RetrievalAugmentedQAPipeline:
    def __init__(self, vector_db_retriever: VectorDatabase) -> None:
        self.vector_db_retriever = vector_db_retriever
    
    async def arun_pipeline(self, user_query: str):
        """
        Run the RAG pipeline with the given user query.
        Returns a stream of response chunks.
        """
        try:
            # 1. Retrieve relevant documents
            logger.info(f"RAG Pipeline: Retrieving documents for query: '{user_query}'")
            relevant_docs = self.vector_db_retriever.search_by_text(user_query, k=4)
            
            if not relevant_docs:
                logger.warning("No relevant documents found in vector database")
                documents_context = "No relevant information found in the document."
            else:
                logger.info(f"Found {len(relevant_docs)} relevant document chunks")
                # Format documents
                documents_context = "\n\n".join([doc[0] for doc in relevant_docs])
            
            # Debug similarity scores
            doc_scores = [f"{i+1}. Score: {doc[1]:.4f}" for i, doc in enumerate(relevant_docs)]
            logger.info(f"Document similarity scores: {', '.join(doc_scores) if doc_scores else 'No documents'}")
            
            # 2. Create messaging payload
            messages = [
                {"role": "system", "content": f"""You are a helpful AI assistant that answers questions based on the provided document context.
                If the answer is not in the context, say that you don't know based on the available information. 
                Use the following document extracts to answer the user's question:
                
                {documents_context}"""},
                {"role": "user", "content": user_query}
            ]
            
            # 3. Call LLM and stream the output
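            # NOTE: the synchronous OpenAI client is used inside an async
            # generator here, so each chunk read blocks the event loop;
            # AsyncOpenAI would avoid that, but the sync client is kept to
            # match the module-level setup above.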
            async def generate_response():
                try:
                    logger.info("Initiating streaming completion from OpenAI")
                    stream = client.chat.completions.create(
                        model="gpt-3.5-turbo",
                        messages=messages,
                        temperature=0.2,
                        stream=True
                    )
                    
                    for chunk in stream:
                        if chunk.choices and chunk.choices[0].delta.content:
                            yield chunk.choices[0].delta.content
                except Exception as e:
                    logger.error(f"Error generating stream: {str(e)}")
                    yield f"\n\nI apologize, but I encountered an error while generating a response: {str(e)}"
            
            return {
                "response": generate_response()
            }
        
        except Exception as e:
            logger.error(f"Error in RAG pipeline: {str(e)}")
            logger.error(traceback.format_exc())
            # Bind the message now (`e` is unbound once this except block
            # exits) and return an async generator to match the success path.
            error_message = f"I apologize, but an error occurred: {str(e)}"

            async def error_response():
                yield error_message
            return {
                "response": error_response()
            }

def process_file(file_path: str, file_name: str) -> List[str]:
    """Process an uploaded file and convert it to text chunks"""
    logger.info(f"Processing file: {file_name} at path: {file_path}")
    
    try:
        # Determine loader based on file extension
        if file_name.lower().endswith('.txt'):
            logger.info(f"Using TextFileLoader for {file_name}")
            loader = TextFileLoader(file_path)
            loader.load()
        elif file_name.lower().endswith('.pdf'):
            logger.info(f"Using PDFLoader for {file_name}")
            loader = PDFLoader(file_path)
            loader.load()
        else:
            logger.warning(f"Unsupported file type: {file_name}")
            return ["Unsupported file format. Please upload a .txt or .pdf file."]
        
        # Get documents from loader
        documents = loader.documents
        if documents:
            logger.info(f"Loaded document with {len(documents[0])} characters")
        else:
            logger.warning("No document content loaded")
            return ["No content found in the document"]
        
        # Split text into chunks
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
        text_chunks = text_splitter.split_texts(documents)
        
        logger.info(f"Split document into {len(text_chunks)} chunks")
        return text_chunks
        
    except Exception as e:
        logger.error(f"Error processing file: {str(e)}")
        logger.error(traceback.format_exc())
        return [f"Error processing file: {str(e)}"]

async def setup_vector_db(texts: List[str]) -> VectorDatabase:
    """Create vector database from text chunks"""
    logger.info(f"Setting up vector database with {len(texts)} text chunks")
    
    embedding_model = EmbeddingModel()
    vector_db = VectorDatabase(embedding_model=embedding_model)
    
    try:
        await vector_db.abuild_from_list(texts)
        
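        # Also keep the raw chunks on the instance, presumably so callers
        # can read back the source text without querying the index.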
        vector_db.documents = texts
        
        logger.info(f"Vector database built with {len(texts)} documents")
        return vector_db
    except Exception as e:
        logger.error(f"Error setting up vector database: {str(e)}")
        logger.error(traceback.format_exc())
        
        fallback_db = VectorDatabase(embedding_model=embedding_model)
        error_text = "I'm sorry, but there was an error processing the document."
        fallback_db.insert(error_text, [0.0] * 1536)
        fallback_db.documents = [error_text]
        return fallback_db
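
# Example wiring (a minimal sketch): runs the pipeline end to end against a
# local text file. "sample.txt" is a hypothetical path used purely for
# illustration, and OPENAI_API_KEY must be set in the environment.
async def _demo(file_path: str, question: str) -> None:
    chunks = process_file(file_path, os.path.basename(file_path))
    vector_db = await setup_vector_db(chunks)
    pipeline = RetrievalAugmentedQAPipeline(vector_db)
    result = await pipeline.arun_pipeline(question)
    async for token in result["response"]:
        print(token, end="", flush=True)
    print()


if __name__ == "__main__":
    asyncio.run(_demo("sample.txt", "What is this document about?"))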