File size: 21,168 Bytes
4623a33
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
import os
import sys
import shutil
import unittest
import threading
import time
import requests
import json
from unittest.mock import patch, MagicMock
from typing import List, Dict, Any, Optional
from telebot import types # Import types for message matching
from flask import request

# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..', '..'))
sys.path.insert(0, project_root)

# Ensure RAG_BOT modules are importable
from RAG_BOT.config import Config
from RAG_BOT.logger import logger
from RAG_BOT.vector_store import VectorStore
from RAG_BOT.agent.graph_builder import build_agent
from RAG_BOT.message_handler import MessageHandler
from RAG_BOT.bot import TelegramBotApp # Import the app class
from RAG_BOT import utils
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage

# --- Global variables for mock communication ---
# Using a simple list to capture responses. Ensure thread safety if needed.
mock_bot_responses: List[str] = []
mock_bot_lock = threading.Lock()

# --- Mock TeleBot ---
class MockTeleBot:
    """Mocks telebot.TeleBot to capture outgoing messages and simulate handler dispatch."""
    def __init__(self, token, *args, **kwargs):
        logger.info(f"MockTeleBot initialized with token: {token[:5]}...")
        self.token = token
        # Store handlers registered via register_message_handler
        self.handlers = []

    def reply_to(self, message, text, *args, **kwargs):
        logger.info(f"MockTeleBot captured reply_to: {text[:100]}...")
        with mock_bot_lock:
            mock_bot_responses.append(text)

    def send_message(self, chat_id, text, *args, **kwargs):
        logger.info(f"MockTeleBot captured send_message to {chat_id}: {text[:100]}...")
        with mock_bot_lock:
            mock_bot_responses.append(text)

    def set_webhook(self, *args, **kwargs):
        logger.info("MockTeleBot: set_webhook called")
        return True # Simulate success

    def remove_webhook(self, *args, **kwargs):
        logger.info("MockTeleBot: remove_webhook called")
        return True # Simulate success

    def get_file(self, file_id):
        logger.warning(f"MockTeleBot: get_file called for {file_id} - returning dummy")
        mock_file = MagicMock()
        mock_file.file_path = "dummy/path/file.pdf"
        return mock_file

    def download_file(self, file_path):
        logger.warning(f"MockTeleBot: download_file called for {file_path} - returning dummy bytes")
        return b"dummy pdf content"

    def register_message_handler(self, callback, commands=None, regexp=None, content_types=None, func=None, **kwargs):
        """Stores the handler function and its filters."""
        logger.info(f"MockTeleBot: Registering handler: {callback.__name__} (Commands: {commands}, ContentTypes: {content_types}, Func: {func is not None})")
        self.handlers.append({
            'callback': callback,
            'filters': {
                'commands': commands,
                'regexp': regexp,
                'content_types': content_types or ['text'], # Default to text if None
                'func': func
            }
        })

    def process_new_updates(self, updates: List[types.Update]):
        """
        Simulates pyTelegramBotAPI's update processing by finding and calling
        the appropriate registered handler.
        """
        logger.info(f"MockTeleBot: process_new_updates called with {len(updates)} update(s)")
        for update in updates:
            if update.message:
                message = update.message
                logger.info(f"MockTeleBot: Processing message update (ID: {message.message_id}, Text: '{message.text[:50]}...', Type: {message.content_type})")
                handler_called = False
                # Iterate through registered handlers to find a match
                for handler_reg in self.handlers:
                    filters = handler_reg['filters']
                    callback = handler_reg['callback']

                    # --- Basic Filter Matching Logic ---
                    match = True

                    # Match content_type
                    if filters['content_types'] and message.content_type not in filters['content_types']:
                        match = False

                    # Match commands (only if content_type is text)
                    if match and filters['commands'] and message.content_type == 'text':
                        is_command = False
                        if message.text and message.text.startswith('/'):
                            command = message.text.split()[0][1:]
                            if command in filters['commands']:
                                is_command = True
                        if not is_command:
                             match = False # Command filter exists, but message isn't that command

                    # Match func (lambda filter)
                    if match and filters['func']:
                        if not filters['func'](message):
                            match = False

                    # TODO: Add regexp matching if needed

                    # If all filters pass, call the handler
                    if match:
                        logger.info(f"MockTeleBot: Matched handler '{callback.__name__}'. Calling it.")
                        try:
                            callback(message)
                            handler_called = True
                            break # Stop after first match (like pyTelegramBotAPI often does)
                        except Exception as e:
                            logger.error(f"MockTeleBot: Error executing handler '{callback.__name__}': {e}", exc_info=True)
                            # Decide if you want to stop or continue checking other handlers
                            break
                if not handler_called:
                     logger.warning(f"MockTeleBot: No suitable handler found for message (ID: {message.message_id}, Text: '{message.text[:50]}...')")

            # TODO: Add handling for other update types (callback_query, etc.) if needed
            else:
                 logger.debug(f"MockTeleBot: Skipping update without message: {update}")


# --- Test Class ---
class TestEndToEnd(unittest.TestCase):

    bot_thread: Optional[threading.Thread] = None
    bot_app_instance: Optional[TelegramBotApp] = None
    webhook_url: Optional[str] = None
    config: Optional[Config] = None
    test_vector_store_dir: Optional[str] = None
    patcher: Optional[Any] = None # To hold the patch object

    @classmethod
    def setUpClass(cls):
        """Setup method called once before all tests."""
        cls.config = Config()
        cls.setup_test_vector_store() # Create and index test data

        # --- Start Mocking ---
        # Patch telebot.TeleBot where it's imported in RAG_BOT.bot
        cls.patcher = patch('RAG_BOT.bot.telebot.TeleBot', MockTeleBot)
        cls.patcher.start()
        logger.info("Telebot patch started.")

        # --- Initialize Bot Components ---
        # These need to be created *after* the patch is active
        try:
            logger.info("Initializing VectorStore for E2E test...")
            vector_store_instance = VectorStore(persist_directory=cls.test_vector_store_dir)
            vectordb = vector_store_instance.get_vectordb()
            if vectordb is None:
                raise ValueError("Failed to initialize vectordb for E2E test.")
            logger.info("VectorStore initialized.")

            logger.info("Initializing RAG agent for E2E test...")
            agent = build_agent(vectordb=vectordb, model_name=cls.config.LLM_MODEL_NAME)
            logger.info("RAG agent initialized.")

            logger.info("Initializing MessageHandler for E2E test...")
            handler = MessageHandler(agent=agent, config=cls.config)
            logger.info("MessageHandler initialized.")

            logger.info("Initializing TelegramBotApp for E2E test...")
            # Pass dependencies explicitly
            cls.bot_app_instance = TelegramBotApp(
                config=cls.config,
                vectordb=vectordb, # Pass the Chroma instance directly
                agent=agent,
                handler=handler
            )
            # Add a shutdown route for testing
            @cls.bot_app_instance.app.route('/shutdown', methods=['POST'])
            def shutdown():
                logger.warning("Shutdown route called, attempting to stop server.")
                func = request.environ.get('werkzeug.server.shutdown')
                if func is None:
                    logger.error('Not running with the Werkzeug Server or shutdown unavailable.')
                    # Fallback or error handling needed here if not using Werkzeug
                    # For simplicity, we might rely on thread termination, but graceful shutdown is better.
                    return 'Could not shut down server gracefully.', 500
                func()
                return 'Server shutting down...'

            logger.info("TelegramBotApp initialized.")

        except Exception as e:
            logger.critical(f"Failed to initialize bot components during setUpClass: {e}", exc_info=True)
            cls.tearDownClass() # Attempt cleanup
            raise # Re-raise the exception to fail the setup

        # --- Start Bot in Thread ---
        cls.webhook_url = f"http://127.0.0.1:{cls.config.PORT}/{cls.config.TELEGRAM_BOT_TOKEN}"
        cls.bot_thread = threading.Thread(
            target=cls.bot_app_instance.run, # Use the instance's run method
            daemon=True # Allows main thread to exit even if this thread is running
        )
        cls.bot_thread.start()
        logger.info(f"Bot thread started. Waiting for server at {cls.webhook_url}...")

        # --- Wait for Server Readiness ---
        max_wait = 30 # seconds
        start_time = time.time()
        server_ready = False
        while time.time() - start_time < max_wait:
            try:
                # Use a simple GET request to check if the base URL is responding
                # Note: The webhook URL expects POST, so GET might 404/405, but indicates the server is up.
                response = requests.get(f"http://127.0.0.1:{cls.config.PORT}/", timeout=1)
                # Check for any response, even errors like 404/405, means the server is listening
                if response.status_code:
                    logger.info(f"Server responded with status {response.status_code}. Assuming ready.")
                    server_ready = True
                    break
            except requests.exceptions.ConnectionError:
                time.sleep(0.5)
            except Exception as e:
                 logger.warning(f"Error checking server readiness: {e}")
                 time.sleep(0.5)

        if not server_ready:
            logger.error("Server did not become ready within the timeout period.")
            cls.tearDownClass() # Attempt cleanup
            raise ConnectionError("Flask server did not start in time for E2E tests.")

        logger.info("setUpClass completed.")


    @classmethod
    def tearDownClass(cls):
        """Teardown method called once after all tests."""
        logger.info("Starting tearDownClass...")
        if cls.bot_thread and cls.bot_thread.is_alive():
            logger.info("Attempting to shut down Flask server...")
            try:
                # Send shutdown request
                shutdown_url = f"http://127.0.0.1:{cls.config.PORT}/shutdown"
                requests.post(shutdown_url, timeout=5)
                logger.info("Shutdown request sent.")
            except Exception as e:
                logger.error(f"Failed to send shutdown request: {e}")
                # If shutdown request fails, thread might still be running.

            # Wait for the thread to finish
            cls.bot_thread.join(timeout=10)
            if cls.bot_thread.is_alive():
                logger.warning("Bot thread did not terminate gracefully after shutdown request and join timeout.")
                # Consider more forceful termination if necessary, but can leave resources hanging.
            else:
                logger.info("Bot thread terminated.")
        else:
             logger.info("Bot thread was not running or already stopped.")

        # Stop the patcher
        if cls.patcher:
            cls.patcher.stop()
            logger.info("Telebot patch stopped.")
            cls.patcher = None

        # Clean up test vector store
        cls.delete_test_vector_store()
        logger.info("tearDownClass completed.")

    @classmethod
    def setup_test_vector_store(cls):
        """Creates a clean vector store and indexes test PDFs."""
        current_dir = os.path.dirname(os.path.abspath(__file__))
        cls.test_vector_store_dir = os.path.join(current_dir, "..", "test_vector_store")
        pdf_dir = os.path.join(current_dir, "..", "data")

        cls.delete_test_vector_store() # Ensure clean start
        os.makedirs(cls.test_vector_store_dir, exist_ok=True)
        logger.info(f"Setting up test vector store for E2E in: {cls.test_vector_store_dir}")

        test_vector_store = VectorStore(persist_directory=cls.test_vector_store_dir)
        pdf_files = [
            os.path.join(pdf_dir, f)
            for f in os.listdir(pdf_dir)
            if f.endswith(".pdf")
        ]
        if not pdf_files:
             logger.warning(f"No PDF files found in {pdf_dir} for E2E indexing.")
             return

        for pdf_file in pdf_files:
            logger.info(f"Indexing E2E test file: {pdf_file}")
            # Use config for semantic chunking setting
            test_vector_store.build_index(pdf_file, semantic_chunk=cls.config.SEMANTIC_CHUNKING)
        logger.info("E2E test vector store setup complete.")


    @classmethod
    def delete_test_vector_store(cls):
        """Deletes the test vector store directory if it exists."""
        if cls.test_vector_store_dir and os.path.exists(cls.test_vector_store_dir):
            try:
                shutil.rmtree(cls.test_vector_store_dir)
                logger.info(f"Deleted E2E test vector store at: {cls.test_vector_store_dir}")
            except Exception as e:
                logger.error(f"Error deleting E2E test vector store: {e}")


    def setUp(self):
        """Called before each test method."""
        # Clear responses before each test
        with mock_bot_lock:
            mock_bot_responses.clear()
        # Ensure server is still alive (optional sanity check)
        self.assertTrue(self.bot_thread and self.bot_thread.is_alive(), "Bot thread is not alive at start of test.")


    def _send_message(self, text: str, user_id: int = 12345, chat_id: int = 12345):
        """Simulates sending a message to the bot's webhook."""
        if not self.webhook_url:
            self.fail("Webhook URL not set.")

        # Construct a basic Telegram Update JSON structure
        update_data = {
            "update_id": int(time.time() * 1000), # Unique enough for testing
            "message": {
                "message_id": int(time.time() * 1000) + 1,
                "from": {"id": user_id, "is_bot": False, "first_name": "Test", "last_name": "User", "username": "testuser"},
                "chat": {"id": chat_id, "type": "private", "first_name": "Test", "last_name": "User", "username": "testuser"},
                "date": int(time.time()),
                "text": text
            }
        }
        headers = {'Content-Type': 'application/json'}
        try:
            response = requests.post(self.webhook_url, headers=headers, json=update_data, timeout=20) # Increased timeout for agent processing
            response.raise_for_status() # Raise exception for bad status codes (4xx or 5xx)
            logger.info(f"Sent message to webhook: '{text[:50]}...'. Response status: {response.status_code}")
            # Give the bot thread some time to process and call the mock reply
            time.sleep(5) # Adjust as needed based on agent complexity
        except requests.exceptions.RequestException as e:
            self.fail(f"Failed to send message to webhook {self.webhook_url}: {e}")


    def _get_latest_response(self) -> Optional[str]:
        """Retrieves the latest response captured by the mock bot."""
        with mock_bot_lock:
            if mock_bot_responses:
                # Return the last captured response
                return mock_bot_responses[-1]
            return None


    def evaluate_response_with_llm(self, query: str, response: str) -> bool:
        """Uses an LLM to judge the quality of the agent's response."""
        # Note: Context is not easily available in E2E like in integration tests.
        # We judge based on query and response only for simplicity here.
        judge_llm = ChatGoogleGenerativeAI(model=Config.JUDGE_LLM_MODEL_NAME, temperature=0.0)
        # Simplified judge prompt for E2E (without context)
        judge_prompt_template_str = """
        You are an impartial judge evaluating the quality of an AI assistant's response to a user query.
        Consider if the response directly addresses the query and is informative.
        User Query:
        {query}

        Assistant Response:
        {response}

        Is the assistant's answer relevant and informative for the user query?
        Respond with only 'PASS' or 'FAIL'.
        """
        judge_prompt = judge_prompt_template_str.format(query=query, response=response)
        try:
            evaluation = judge_llm.invoke([HumanMessage(content=judge_prompt)]).content.strip().upper()
            logger.info(f"LLM Judge Evaluation for E2E query '{query[:50]}...': {evaluation}")
            return evaluation == 'PASS'
        except Exception as e:
            logger.error(f"LLM Judge call failed during E2E test: {e}")
            return False # Fail the test if judge fails


    # --- Test Cases ---

    def test_successful_query_e2e(self):
        """Tests a query expected to succeed with context retrieval."""
        query = "What is the title of the murli from 1969-01-23?"
        self._send_message(query)
        response = self._get_latest_response()

        self.assertIsNotNone(response, "Bot did not send any response.")
        logger.info(f"Raw response received: {response}")

        # Check for expected content (case-insensitive substring)
        self.assertIn("the ashes are to remind you of the stage", response.lower(),
                      f"Expected content not found in answer: {response}")

        # Evaluate with LLM Judge
        self.assertTrue(self.evaluate_response_with_llm(query, response),
                        f"LLM Judge evaluation failed for successful query. Response: {response}")


    def test_insufficient_context_query_e2e(self):
        """Tests a query expected to fail due to lack of relevant context."""
        query = "Can you summarize the murli from 1950-01-18?" # Date likely not in test data
        self._send_message(query)
        response = self._get_latest_response()

        self.assertIsNotNone(response, "Bot did not send any response for insufficient context query.")
        logger.info(f"Raw response received (insufficient context): {response}")

        # Check for "cannot find" message
        self.assertTrue(
            "cannot be found" in response.lower() or "cannot find" in response.lower(),
            f"Agent did not return a 'cannot find' message: {response}"
        )

        # Evaluate with LLM Judge (optional for 'cannot find', but can check relevance)
        # self.assertTrue(self.evaluate_response_with_llm(query, response),
        #                 f"LLM Judge evaluation failed for insufficient context query. Response: {response}")


    def test_general_knowledge_query_e2e(self):
        """Tests a general knowledge question not requiring retrieval."""
        query = "What is the capital of France?"
        self._send_message(query)
        response = self._get_latest_response()

        self.assertIsNotNone(response, "Bot did not send any response for general query.")
        logger.info(f"Raw response received (general query): {response}")

        # Check that it didn't say "cannot find"
        answer_lower = response.lower()
        self.assertNotIn("cannot be found", answer_lower)
        self.assertNotIn("cannot find", answer_lower)
        # Check for expected content (case-insensitive substring)
        self.assertIn("paris", answer_lower, f"Expected 'Paris' not found in answer: {response}")

        # Evaluate with LLM Judge
        self.assertTrue(self.evaluate_response_with_llm(query, response),
                        f"LLM Judge evaluation failed for general query. Response: {response}")


if __name__ == "__main__":
    # Ensure Flask and other dependencies are importable when running directly
    # This might require adjusting PYTHONPATH or running with `python -m unittest ...`
    # Add request explicitly for the shutdown route
    from flask import request
    unittest.main()