Spaces:
Building
Building
bk-anupam
feat: Enhance document processing with HTML support and improve language detection. Documents from 1976 to 1980 are indexed in both English and Hindi.
5bda5ed
# /home/bk_anupam/code/LLM_agents/RAG_BOT/bot.py | |
import telebot | |
import sys | |
from telebot.types import Message, Update | |
from datetime import datetime | |
import re | |
import os | |
from flask import Flask, request, jsonify | |
# Add the project root to the Python path | |
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) | |
sys.path.insert(0, project_root) | |
from config import Config | |
from RAG_BOT.logger import logger | |
from vector_store import VectorStore | |
# Updated import for build_agent | |
from RAG_BOT.agent.graph_builder import build_agent | |
from langchain_core.messages import HumanMessage | |
from message_handler import MessageHandler | |
from RAG_BOT.utils import detect_document_language | |
from RAG_BOT.file_manager import FileManager | |
from RAG_BOT.document_indexer import DocumentIndexer | |
from RAG_BOT.pdf_processor import PdfProcessor | |
from RAG_BOT.htm_processor import HtmProcessor # Added import | |
class TelegramBotApp:
    """Telegram bot served through a Flask webhook.

    Wires a pyTelegramBotAPI ``TeleBot`` to a Flask app, registers command,
    document-upload and free-text handlers, indexes uploaded PDF/HTML files
    into the vector store, and delegates question answering to the injected
    ``MessageHandler``.
    """

    # Maximum length of a single outgoing Telegram text message.
    MAX_TELEGRAM_MESSAGE_LENGTH = 4096

    # User-facing language names accepted by /language, mapped to stored codes.
    _SUPPORTED_LANGUAGES = {'english': 'en', 'hindi': 'hi'}

    # Fallback confirmations used when prompts.yaml lacks "language_set_<code>".
    _DEFAULT_LANGUAGE_CONFIRMATIONS = {'en': "Language set to English.", 'hi': "भाषा हिंदी में सेट कर दी गई है।"}

    def __init__(self, config: Config, vector_store_instance: VectorStore, agent,
                 handler: MessageHandler, pdf_processor: PdfProcessor = None,
                 htm_processor: HtmProcessor = None):
        """Build the Flask app and the Telegram bot, and register all handlers.

        Args:
            config: Application configuration. ``TELEGRAM_BOT_TOKEN``,
                ``DATA_PATH``, ``WEBHOOK_URL``, ``PORT``, ``SEMANTIC_CHUNKING``
                and ``USER_SESSIONS`` are read from it.
            vector_store_instance: Store that uploaded documents are indexed into.
            agent: The RAG agent (stored on the instance; messages are answered
                through ``handler``).
            handler: Turns a free-text message into reply text.
            pdf_processor: PDF loader; a default ``PdfProcessor`` is created
                when omitted.
            htm_processor: HTML loader; a default ``HtmProcessor`` is created
                when omitted.

        Exits the process with status 1 if the bot token is missing or if bot
        setup fails.
        """
        self.app = Flask(__name__)
        self.config = config
        self.vector_store_instance = vector_store_instance
        self.pdf_processor = pdf_processor or PdfProcessor()
        self.htm_processor = htm_processor or HtmProcessor()
        self.vectordb = vector_store_instance.get_vectordb()
        self.agent = agent
        self.handler = handler
        # Assumes DATA_PATH is configured (e.g. via .env).
        self.DATA_DIRECTORY = self.config.DATA_PATH
        logger.info(f"Data directory set to: {self.DATA_DIRECTORY}")

        if not self.config.TELEGRAM_BOT_TOKEN:
            logger.error("TELEGRAM_BOT_TOKEN is not set. Please set it in your environment variables.")
            sys.exit(1)

        try:
            self.bot = telebot.TeleBot(self.config.TELEGRAM_BOT_TOKEN)
            logger.info("Telegram bot initialized successfully")
            # The webhook view uses self.bot, so register it only once the bot exists.
            self._setup_webhook_route()
            logger.info("Webhook route set up successfully")
            # pyTelegramBotAPI dispatches to the first matching handler, so the
            # catch-all text handler has to be registered last.
            self.bot.register_message_handler(self.send_welcome, commands=['start'])
            self.bot.register_message_handler(self.send_help, commands=['help'])
            self.bot.register_message_handler(self.handle_language_command, commands=['language'])
            self.bot.register_message_handler(self.handle_document, content_types=['document'])
            self.bot.register_message_handler(self.handle_all_messages, func=lambda message: True)
            logger.info("Message handlers registered successfully")
        except Exception as e:
            logger.critical(f"Failed during application startup: {str(e)}", exc_info=True)
            sys.exit(1)

    def _setup_webhook_route(self):
        """Register the POST endpoint that receives Telegram updates.

        The route path is ``/<TELEGRAM_BOT_TOKEN>``, which is the same URL that
        :meth:`setup_webhook` registers with Telegram.
        """
        def webhook():
            """Deserialize one Telegram update from the request and dispatch it to the bot."""
            # request.mimetype ignores parameters such as "; charset=utf-8".
            if request.mimetype != 'application/json':
                logger.warning(f"Received invalid content type for webhook: {request.headers.get('content-type')}")
                return jsonify({"status": "error", "message": "Invalid content type"}), 400
            logger.info("Received webhook request")
            try:
                update = Update.de_json(request.get_json())
                self.bot.process_new_updates([update])
                return jsonify({"status": "ok"})
            except Exception as e:
                logger.error(f"Error processing webhook update: {e}", exc_info=True)
                return jsonify({"status": "error", "message": "Internal server error"}), 500

        # Without an explicit registration the view above is never reachable and
        # every update Telegram posts would get a 404.
        self.app.add_url_rule(
            f"/{self.config.TELEGRAM_BOT_TOKEN}",
            endpoint="webhook",
            view_func=webhook,
            methods=['POST'],
        )

    def send_response(self, message, user_id, response_text):
        """Send ``response_text`` to the chat of ``message`` in length-limited chunks.

        The first chunk is sent as a reply to ``message``. Any further chunks are
        sent as plain messages to the same chat. An empty response is replaced by
        a fallback apology. Send failures are logged and are not raised.
        """
        if not response_text:
            logger.warning(f"Attempted to send empty response to user {user_id}")
            response_text = "Sorry, I could not generate a response."
        max_len = self.MAX_TELEGRAM_MESSAGE_LENGTH
        chunks = [response_text[start:start + max_len] for start in range(0, len(response_text), max_len)]
        try:
            if chunks:
                logger.info(f"Sending response to user {user_id}: {chunks[0][:100]}...")
                self.bot.reply_to(message, chunks[0])
                for chunk in chunks[1:]:
                    self.bot.send_message(message.chat.id, chunk)
        except telebot.apihelper.ApiException as e:
            logger.error(f"Error sending message chunk to user {user_id} in chat {message.chat.id}: {str(e)}")
            # Best effort: tell the user that the answer did not get through.
            try:
                self.bot.reply_to(message, "Sorry, there was an error sending the full response.")
            except Exception:
                logger.error(f"Failed even to send error notification to user {user_id}")
        except Exception as e:
            logger.error(f"Unexpected error in send_response for user {user_id}: {e}", exc_info=True)

    def message_handlers(self):
        """Return the bot's message-handler callables, in registration order."""
        return [
            self.send_welcome,
            self.send_help,
            self.handle_language_command,
            self.handle_document,
            self.handle_all_messages,
        ]

    def send_welcome(self, message):
        """Reply to /start with a short welcome message."""
        logger.info(f"Received /start command from user {message.from_user.id}")
        self.bot.reply_to(message, "Welcome to the spiritual chatbot! Ask me questions about the indexed documents, or use /help for commands.")

    def send_help(self, message):
        """Reply to /help with the list of supported commands."""
        logger.info(f"Received /help command from user {message.from_user.id}")
        help_text = (
            "\n"
            "Available Commands:\n"
            "/start - Show welcome message.\n"
            "/help - Show this help message.\n"
            "/language <lang> - Set bot language (english or hindi). Example: /language hindi\n"
            "/query <your question> [date:YYYY-MM-DD] - Ask a question about the documents. Optionally filter by date.\n"
            "You can also just type your question directly.\n"
        )
        self.bot.reply_to(message, help_text)

    def handle_language_command(self, message: Message):
        """Handle ``/language <english|hindi>`` by storing the user's preference.

        The choice is stored as ``'en'`` or ``'hi'`` in
        ``config.USER_SESSIONS[user_id]['language']`` and then confirmed with a
        message from the config, falling back to built-in defaults. Missing or
        unsupported arguments get a usage or error reply instead.
        """
        user_id = message.from_user.id
        parts = message.text.split(maxsplit=1)
        if len(parts) < 2:
            usage_text = self.config.get_user_message('language_usage_help',
                                                      "Usage: /language <language>\nSupported languages: english, hindi")
            self.bot.reply_to(message, usage_text)
            return

        lang_code = self._SUPPORTED_LANGUAGES.get(parts[1].strip().lower())
        if lang_code is None:
            unsupported_text = self.config.get_user_message('language_unsupported',
                                                            "Unsupported language. Please use 'english' or 'hindi'.")
            self.bot.reply_to(message, unsupported_text)
            return

        self.config.USER_SESSIONS.setdefault(user_id, {})['language'] = lang_code
        logger.info(f"Set language preference for user {user_id} to '{lang_code}'")
        reply_text = self.config.get_user_message(f"language_set_{lang_code}",
                                                  self._DEFAULT_LANGUAGE_CONFIRMATIONS[lang_code])
        self.bot.reply_to(message, reply_text)

    def _cleanup_uploaded_file(self, file_path, processed_successfully):
        """Delete an uploaded file once it has been indexed; otherwise keep it.

        Files that failed to load or index stay in the uploads directory and a
        log line records that.
        """
        exists = os.path.exists(file_path)
        if processed_successfully and exists:
            try:
                os.remove(file_path)
                logger.info(f"Successfully processed and removed '{file_path}' from uploads directory.")
            except OSError as e:
                logger.error(f"Error removing processed file '{file_path}' from uploads: {e}")
        elif exists:
            logger.info(f"File '{file_path}' was not successfully processed/indexed and will remain in the uploads directory.")
        elif processed_successfully:
            logger.warning(f"Attempted to remove '{file_path}', but it was already deleted (or never saved properly).")

    def _determine_file_name(self, message, file_ext, default_doc_name):
        """Return a safe, bare file name under which to save the uploaded document.

        Uses the client-supplied ``message.document.file_name`` when present,
        otherwise ``default_doc_name`` (given ``file_ext`` if it lacks it).
        Directory components are always stripped. The name is user-controlled
        and is joined onto the uploads directory, so a value such as
        ``../../x.pdf`` would otherwise write outside it.
        """
        original_file_name = message.document.file_name
        file_name = original_file_name or default_doc_name
        if original_file_name is None and not file_name.lower().endswith(file_ext):
            file_name = os.path.splitext(file_name)[0] + file_ext
        # Normalize Windows separators so basename strips them on every OS.
        safe_name = os.path.basename(file_name.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            safe_name = f"doc_{message.document.file_id}{file_ext}"
        return safe_name

    def _process_document_metadata(self, message: Message):
        """Classify an uploaded document as PDF or HTML.

        Returns:
            A tuple ``(file_ext, default_doc_name, processing_mime_type)``.
            ``file_ext`` is ``".pdf"`` or ``".htm"``, and
            ``processing_mime_type`` is the MIME type used to pick a loader.

        Raises:
            ValueError: The type is unsupported or cannot be determined. The
                message is intended to be shown to the user.

        A generic (``application/octet-stream``) or missing MIME type is
        resolved from the file name's extension. Telegram's ``mime_type`` is
        optional.
        """
        mime_type = message.document.mime_type
        file_id = message.document.file_id
        original_file_name = message.document.file_name

        if mime_type == 'application/pdf':
            return ".pdf", f"doc_{file_id}.pdf", mime_type
        if mime_type in ('text/html', 'application/xhtml+xml'):
            return ".htm", f"doc_{file_id}.htm", mime_type
        if mime_type is None or mime_type == 'application/octet-stream':
            ext = os.path.splitext(original_file_name)[1].lower() if original_file_name else ""
            if ext in ('.htm', '.html'):
                return ".htm", original_file_name, 'text/html'
            if ext == '.pdf':
                return ".pdf", original_file_name, 'application/pdf'
            raise ValueError(f"Unsupported file type or unable to determine type from '{original_file_name or 'uploaded file'}'.")
        raise ValueError(f"Unsupported file type ({mime_type}).")

    def handle_document(self, message: Message):
        """Save, load, language-tag and index an uploaded PDF or HTML document.

        The file is downloaded into ``<project_root>/uploads`` and loaded with the
        matching processor. Each loaded document is tagged with the language
        from ``detect_document_language`` and then indexed into the vector
        store. The file is deleted only after successful indexing. Otherwise it
        stays in the uploads directory. The user is told the outcome in every
        case.
        """
        user_id = message.from_user.id
        if not message.document:
            self.bot.reply_to(message, "No document provided.")
            return

        file_id = message.document.file_id
        mime_type = message.document.mime_type
        logger.info(f"Received document from user {user_id} with mime_type: {mime_type} (file_id: {file_id})")

        # Validate the type before any download. This keeps the ValueError reply
        # limited to our own "unsupported type" messages.
        try:
            file_ext, default_doc_name, processing_mime_type = self._process_document_metadata(message)
        except ValueError as ve:
            logger.warning(f"Unsupported file type for user {user_id}: {ve}")
            self.bot.reply_to(message, str(ve))
            return

        file_name = default_doc_name  # Always bound, so the error handler can log it.
        file_path = None
        processed_successfully = False
        try:
            file_name = self._determine_file_name(message, file_ext, default_doc_name)
            logger.info(f"User {user_id} uploaded {mime_type} (processed as {processing_mime_type}): {file_name}")

            upload_dir = os.path.join(project_root, "uploads")
            os.makedirs(upload_dir, exist_ok=True)
            file_path = os.path.join(upload_dir, file_name)

            file_info = self.bot.get_file(file_id)
            downloaded_file = self.bot.download_file(file_info.file_path)
            with open(file_path, 'wb') as new_file:
                new_file.write(downloaded_file)
            logger.info(f"Document saved to: {file_path}")

            documents = []
            if processing_mime_type == 'application/pdf':
                documents = self.pdf_processor.load_pdf(file_path)
            elif processing_mime_type in ('text/html', 'application/xhtml+xml'):
                # HtmProcessor.load_htm returns a single Document or None.
                doc = self.htm_processor.load_htm(file_path)
                if doc:
                    documents = [doc]

            if not documents:
                logger.warning(f"No documents loaded from: {file_path}. Skipping indexing.")
                self.bot.reply_to(message, f"Could not load content from '{file_name}'.")
                return  # The finally block keeps the file, since nothing was indexed.

            language = detect_document_language(documents, file_name_for_logging=file_name)
            for doc in documents:
                doc.metadata['language'] = language
            logger.info(f"Added language metadata '{language}' to uploaded document: {file_name}")

            was_indexed = self.vector_store_instance.index_document(documents, semantic_chunk=self.config.SEMANTIC_CHUNKING)
            if was_indexed:
                self.bot.reply_to(message, f"Document '{file_name}' uploaded and indexed successfully.")
                processed_successfully = True
            else:
                self.bot.reply_to(message, f"Document '{file_name}' was not indexed (possibly already exists or an error occurred).")
        except Exception as e:
            logger.error(f"Error handling document upload from user {user_id} for {file_name}: {str(e)}", exc_info=True)
            self.bot.reply_to(message, "Sorry, I encountered an error processing your document.")
        finally:
            # Only remove the file after a successful index; keep it otherwise.
            if file_path:
                self._cleanup_uploaded_file(file_path, processed_successfully)

    def handle_all_messages(self, message: Message):
        """Answer a non-command text message in the user's preferred language (default ``'en'``)."""
        user_id = message.from_user.id
        user_lang = self.config.USER_SESSIONS.get(user_id, {}).get('language', 'en')
        logger.info(f"Received message from user {user_id}: '{(message.text or '')[:100]}...'")
        try:
            response_text = self.handler.process_message(message, user_lang)
            self.send_response(message, user_id, response_text)
        except Exception as e:
            logger.error(f"Error processing message from user {user_id}: {str(e)}", exc_info=True)
            self.bot.reply_to(message, "Sorry, I encountered an error processing your request.")

    def setup_webhook(self, url):
        """Point Telegram at this server's webhook endpoint.

        Removes any existing webhook, then registers ``<url>/<TELEGRAM_BOT_TOKEN>``,
        which is the path served by the Flask route.

        Returns:
            True if Telegram accepted the webhook, False otherwise. Errors are
            logged and are not raised.
        """
        if not url:
            logger.error("WEBHOOK_URL is not configured. Cannot set webhook.")
            return False
        try:
            base_url = url.rstrip('/')
            webhook_url = f"{base_url}/{self.config.TELEGRAM_BOT_TOKEN}"
            logger.info("Removing existing webhook (if any)...")
            self.bot.remove_webhook()
            # Never log the full URL: its last path segment is the bot token.
            logger.info(f"Setting webhook to: {base_url}/<bot-token>")
            if self.bot.set_webhook(url=webhook_url):
                logger.info("Webhook set successfully.")
                return True
            logger.error("Failed to set webhook.")
            return False
        except Exception as e:
            logger.error(f"Error setting up webhook: {e}", exc_info=True)
            return False

    def run(self):
        """Register the webhook with Telegram and start the Flask server.

        Exits with status 1 if ``WEBHOOK_URL`` is not configured or if webhook
        registration fails.
        """
        webhook_url = self.config.WEBHOOK_URL
        if not webhook_url:
            logger.error("WEBHOOK_URL is not set in config. Cannot start Flask server with webhook.")
            sys.exit(1)
        if not self.setup_webhook(webhook_url):
            logger.critical("Failed to set up webhook. Aborting Flask server start.")
            sys.exit(1)
        logger.info(f"Starting Flask server on port {self.config.PORT}")
        self.app.run(host='0.0.0.0', port=self.config.PORT, debug=False)
if __name__ == "__main__":
    # Startup sequence: config, vector store, index DATA_PATH, build the agent
    # and message handler, then serve the bot through the Flask webhook. Any
    # failure is logged as critical and exits with status 1.
    try:
        config = Config()
        # Assumes DATA_PATH is configured (e.g. via .env).
        DATA_DIRECTORY = config.DATA_PATH
        logger.info(f"Data directory set to: {DATA_DIRECTORY}")

        logger.info("Initializing VectorStore...")
        vector_store_instance = VectorStore(config.VECTOR_STORE_PATH)
        vectordb = vector_store_instance.get_vectordb()
        logger.info("VectorStore initialized.")

        # Index everything in the data directory before serving any requests.
        file_manager_instance = FileManager()
        document_indexer_instance = DocumentIndexer(vector_store_instance=vector_store_instance,
                                                    file_manager_instance=file_manager_instance)
        document_indexer_instance.index_directory(DATA_DIRECTORY)

        logger.info("Logging final indexed metadata...")
        vector_store_instance.log_all_indexed_metadata()

        logger.info("Initializing RAG agent...")
        if vectordb is None:
            logger.error("VectorDB instance is None after initialization and indexing. Cannot build agent.")
            sys.exit(1)
        agent = build_agent(vectordb=vectordb, model_name=config.LLM_MODEL_NAME)
        logger.info("RAG agent initialized successfully")

        # Handles non-command text messages.
        handler = MessageHandler(agent=agent, config=config)
        pdf_processor = PdfProcessor()
        htm_processor = HtmProcessor()

        bot_app = TelegramBotApp(config=config, vector_store_instance=vector_store_instance, agent=agent,
                                 handler=handler, pdf_processor=pdf_processor, htm_processor=htm_processor)
        bot_app.run()
    except Exception as e:
        # sys.exit raises SystemExit, which is not an Exception subclass, so
        # deliberate exits above pass straight through this handler.
        logger.critical(f"Failed during application startup: {str(e)}", exc_info=True)
        sys.exit(1)
# Polling-mode alternative, unused while the bot runs behind a webhook. Kept for
# reference. The TeleBot instance lives on the app object (bot_app.bot), e.g.:
# def start_bot(bot_app):
#     logger.info("Starting bot in polling mode...")
#     bot_app.bot.infinity_polling()