File size: 6,029 Bytes
525ef5c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#!/usr/bin/env python3
"""
Test script to verify RAG functionality fixes
"""

import os
import tempfile
import warnings
from pathlib import Path

# Silence warnings that are already known and not actionable for this script.
# Each ``message`` value is a regular expression matched against the warning
# text.
warnings.filterwarnings(action="ignore", message=".*use_auth_token.*")
warnings.filterwarnings(action="ignore", message=".*urllib3.*")
warnings.filterwarnings(action="ignore", message=".*resource_tracker.*")

# Switch tokenizer parallelism off to prevent multiprocessing issues.
os.environ.update({"TOKENIZERS_PARALLELISM": "false"})

def test_rag_dependencies():
    """Check that the packages the RAG pipeline relies on can be imported.

    ``sentence_transformers`` and ``faiss`` are treated as required: if either
    import fails, the failure is printed and the function returns at once.
    ``fitz`` (PyMuPDF) and ``docx`` (python-docx) are treated as optional: a
    failed import only prints a warning.

    Returns:
        bool: ``True`` when both required packages import, ``False`` otherwise.
    """
    print("Testing RAG dependencies...")

    # Required packages: a failed import ends the check immediately.
    try:
        import sentence_transformers  # noqa: F401 - imported only to probe availability
        print("✅ sentence-transformers available")
    except ImportError:
        print("❌ sentence-transformers not available")
        return False

    try:
        import faiss  # noqa: F401 - imported only to probe availability
        print("✅ faiss-cpu available")
    except ImportError:
        print("❌ faiss-cpu not available")
        return False

    # Optional packages: a failed import is reported but does not fail the check.
    try:
        import fitz  # noqa: F401 - PyMuPDF
        print("✅ PyMuPDF available")
    except ImportError:
        print("⚠️  PyMuPDF not available (PDF processing disabled)")

    try:
        from docx import Document  # noqa: F401 - imported only to probe availability
        print("✅ python-docx available")
    except ImportError:
        print("⚠️  python-docx not available (DOCX processing disabled)")

    return True

def test_vector_store_initialization():
    """Build a ``VectorStore`` and embed two short sentences.

    ``vector_store.VectorStore`` is imported inside the function so that an
    import failure is reported as a failed check instead of aborting the
    script.

    Returns:
        bool: ``True`` when the store is constructed and
        ``create_embeddings`` returns; ``False`` if any step raises.
    """
    print("\nTesting vector store initialization...")

    try:
        from vector_store import VectorStore

        # Only the embedding model name is passed; every other setting is
        # left to VectorStore's defaults.
        store = VectorStore(embedding_model="all-MiniLM-L6-v2")
        print("✅ VectorStore created successfully")

        # A small embedding operation is enough to exercise the model.
        test_texts = ["This is a test sentence.", "Another test sentence."]
        embeddings = store.create_embeddings(test_texts)
        print(f"✅ Created embeddings: shape {embeddings.shape}")

        return True

    except Exception as e:
        # Boundary of a diagnostic script: report the failure, do not crash.
        print(f"❌ VectorStore initialization failed: {e}")
        return False

def test_document_processing():
    """Run ``DocumentProcessor`` over a small temporary text file.

    A three-sentence ``.txt`` file is written to a temporary location, passed
    to ``DocumentProcessor(chunk_size=50, chunk_overlap=10).process_file`` and
    removed again whether or not processing succeeds.

    Returns:
        bool: ``True`` when ``process_file`` returns without raising (even if
        it yields no chunks); ``False`` if any step raises.
    """
    print("\nTesting document processing...")

    try:
        from document_processor import DocumentProcessor

        # delete=False keeps the file on disk after the ``with`` block closes
        # it, so the processor can reopen it by name.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write("This is a test document for RAG processing. ")
            f.write("It contains multiple sentences that should be processed into chunks. ")
            f.write("Each chunk should have proper metadata and be ready for embedding.")
            test_file = f.name

        try:
            processor = DocumentProcessor(chunk_size=50, chunk_overlap=10)
            chunks = processor.process_file(test_file)

            print(f"✅ Created {len(chunks)} chunks from test document")
            if chunks:
                print(f"   First chunk: {chunks[0].text[:50]}...")
                print(f"   Metadata keys: {list(chunks[0].metadata.keys())}")

            return True

        finally:
            # Clean up the temporary file on success and on failure alike.
            os.unlink(test_file)

    except Exception as e:
        # Boundary of a diagnostic script: report the failure, do not crash.
        print(f"❌ Document processing failed: {e}")
        return False

def test_rag_tool_integration():
    """Exercise ``RAGTool`` end to end on a small temporary text file.

    The file is handed to ``RAGTool().process_uploaded_files``; when the
    returned dict reports ``success``, a ``get_relevant_context`` query is
    issued as well. The temporary file is removed whether or not processing
    succeeds.

    Returns:
        bool: ``True`` when processing reports success (an empty search result
        only prints a warning); ``False`` when processing reports failure or
        any step raises.
    """
    print("\nTesting complete RAG tool integration...")

    try:
        from rag_tool import RAGTool

        # delete=False keeps the file on disk after the ``with`` block closes
        # it, so the tool can reopen it by name.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f:
            f.write("RAG integration test document. ")
            f.write("This document tests the complete RAG pipeline from file processing to vector search. ")
            f.write("The system should handle this without crashing the server.")
            test_file = f.name

        try:
            rag_tool = RAGTool()
            result = rag_tool.process_uploaded_files([test_file])

            if result['success']:
                print(f"✅ RAG processing succeeded: {result['message']}")
                print(f"   Files processed: {len(result['summary']['files_processed'])}")
                print(f"   Total chunks: {result['summary']['total_chunks']}")

                # Search is informational: an empty result does not fail the check.
                context = rag_tool.get_relevant_context("test document")
                if context:
                    print(f"✅ Search functionality working: {context[:100]}...")
                else:
                    print("⚠️  Search returned no results")

                return True
            else:
                print(f"❌ RAG processing failed: {result['message']}")
                return False

        finally:
            # Clean up the temporary file on success and on failure alike.
            os.unlink(test_file)

    except Exception as e:
        # Boundary of a diagnostic script: report the failure, do not crash.
        print(f"❌ RAG tool integration failed: {e}")
        return False

def main():
    """Run all RAG checks in order and print a pass/fail summary.

    A check counts as passed only when it returns a truthy value. A check that
    raises is reported and counted as failed, and the remaining checks still
    run.

    Returns:
        bool: ``True`` when every check passed, ``False`` otherwise.
    """
    print("🚀 Testing RAG functionality fixes...")
    print("=" * 50)

    tests = [
        test_rag_dependencies,
        test_vector_store_initialization,
        test_document_processing,
        test_rag_tool_integration
    ]

    passed = 0
    total = len(tests)

    for test in tests:
        try:
            if test():
                passed += 1
        except Exception as e:
            # Keep going so one crashing check cannot hide later results.
            print(f"❌ Test failed with exception: {e}")

    print("\n" + "=" * 50)
    print(f"📊 Test Results: {passed}/{total} tests passed")

    if passed == total:
        print("🎉 All tests passed! RAG functionality should work correctly.")
        return True
    else:
        print("⚠️  Some tests failed. Check error messages above.")
        return False

if __name__ == "__main__":
    # Turn main()'s boolean result into the process exit status (0 when every
    # check passed, 1 otherwise) so shells and CI can detect a failed run.
    raise SystemExit(0 if main() else 1)