bk-anupam
feat: Enhance document processing with HTML support and improve language detection. Documents indexed from 1976 to 1980 in both english and hindi.
5bda5ed
# /home/bk_anupam/code/LLM_agents/RAG_BOT/bot.py
import telebot
import sys
from telebot.types import Message, Update
from datetime import datetime
import re
import os
from flask import Flask, request, jsonify
# Add the project root to the Python path
project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, project_root)
from config import Config
from RAG_BOT.logger import logger
from vector_store import VectorStore
# Updated import for build_agent
from RAG_BOT.agent.graph_builder import build_agent
from langchain_core.messages import HumanMessage
from message_handler import MessageHandler
from RAG_BOT.utils import detect_document_language
from RAG_BOT.file_manager import FileManager
from RAG_BOT.document_indexer import DocumentIndexer
from RAG_BOT.pdf_processor import PdfProcessor
from RAG_BOT.htm_processor import HtmProcessor # Added import
class TelegramBotApp:
def __init__(self, config: Config, vector_store_instance: VectorStore, agent,
handler: MessageHandler, pdf_processor: PdfProcessor = None, htm_processor: HtmProcessor = None):
# Initialize Flask app
self.app = Flask(__name__)
self.config = config
self.vector_store_instance = vector_store_instance
self.pdf_processor = pdf_processor or PdfProcessor()
self.htm_processor = htm_processor or HtmProcessor()
# Use injected dependencies
self.vectordb = vector_store_instance.get_vectordb()
self.agent = agent
self.handler = handler
# Assumes a 'data' folder path exists in .env
self.DATA_DIRECTORY = self.config.DATA_PATH
logger.info(f"Data directory set to: {self.DATA_DIRECTORY}")
if not self.config.TELEGRAM_BOT_TOKEN:
logger.error("TELEGRAM_BOT_TOKEN is not set. Please set it in your environment variables.")
exit(1)
try:
# Create Telegram bot instance
self.bot = telebot.TeleBot(self.config.TELEGRAM_BOT_TOKEN)
logger.info("Telegram bot initialized successfully")
# Setup webhook route after initializing bot and config
self._setup_webhook_route()
logger.info("Webhook route set up successfully")
# Register message handlers after bot initialization
self.bot.register_message_handler(self.send_welcome, commands=['start'])
self.bot.register_message_handler(self.send_help, commands=['help'])
self.bot.register_message_handler(self.handle_language_command, commands=['language']) # Register new command
self.bot.register_message_handler(self.handle_document, content_types=['document'])
self.bot.register_message_handler(self.handle_all_messages, func=lambda message: True)
logger.info("Message handlers registered successfully")
except Exception as e:
logger.critical(f"Failed during application startup: {str(e)}", exc_info=True)
exit(1)
def _setup_webhook_route(self):
"""Sets up the webhook endpoint for Telegram."""
@self.app.route(f'/{self.config.TELEGRAM_BOT_TOKEN}', methods=['POST'])
def webhook():
"""Handle incoming webhook requests from Telegram"""
if request.headers.get('content-type') == 'application/json':
logger.info("Received webhook request") # Changed level to debug for less noise
try:
json_data = request.get_json()
update = Update.de_json(json_data)
self.bot.process_new_updates([update])
return jsonify({"status": "ok"})
except Exception as e:
logger.error(f"Error processing webhook update: {e}", exc_info=True)
return jsonify({"status": "error", "message": "Internal server error"}), 500
else:
logger.warning(f"Received invalid content type for webhook: {request.headers.get('content-type')}")
return jsonify({"status": "error", "message": "Invalid content type"}), 400
def send_response(self, message, user_id, response_text):
"""
Sends a response to the user, handling potential message length limits.
"""
if not response_text:
logger.warning(f"Attempted to send empty response to user {user_id}")
response_text = "Sorry, I could not generate a response."
# Maximum allowed message length in Telegram (adjust if needed)
max_telegram_length = 4096
chunks = [response_text[i:i + max_telegram_length] for i in range(0, len(response_text), max_telegram_length)]
try:
# Send first chunk as reply, subsequent as regular messages to the chat
if chunks:
logger.info(f"Sending response to user {user_id}: {chunks[0][:100]}...")
self.bot.reply_to(message, chunks[0])
for chunk in chunks[1:]:
self.bot.send_message(message.chat.id, chunk)
except telebot.apihelper.ApiException as e:
logger.error(f"Error sending message chunk to user {user_id} in chat {message.chat.id}: {str(e)}")
# Maybe try sending a generic error message if the main response failed
try:
self.bot.reply_to(message, "Sorry, there was an error sending the full response.")
except Exception:
logger.error(f"Failed even to send error notification to user {user_id}")
except Exception as e:
logger.error(f"Unexpected error in send_response for user {user_id}: {e}", exc_info=True)
# Telegram message handlers
@property
def message_handlers(self):
"""Returns a list of message handlers for the bot."""
return [
self.send_welcome,
self.send_help,
self.handle_language_command,
self.handle_document,
self.handle_all_messages,
]
def send_welcome(self, message):
logger.info(f"Received /start command from user {message.from_user.id}")
self.bot.reply_to(message, "Welcome to the spiritual chatbot! Ask me questions about the indexed documents, or use /help for commands.")
def send_help(self, message):
logger.info(f"Received /help command from user {message.from_user.id}")
self.bot.reply_to(message,
"""
Available Commands:
/start - Show welcome message.
/help - Show this help message.
/language <lang> - Set bot language (english or hindi). Example: /language hindi
/query <your question> [date:YYYY-MM-DD] - Ask a question about the documents. Optionally filter by date.
You can also just type your question directly.
"""
)
def handle_language_command(self, message: Message):
"""Handles the /language command to set user preference."""
user_id = message.from_user.id
parts = message.text.split(maxsplit=1)
if len(parts) < 2:
# Fetch usage help from config
usage_text = self.config.get_user_message('language_usage_help',
"Usage: /language <language>\nSupported languages: english, hindi")
self.bot.reply_to(message, usage_text)
return
lang_input = parts[1].strip().lower()
lang_code = None
if lang_input == 'english':
lang_code = 'en'
elif lang_input == 'hindi':
lang_code = 'hi'
else:
unsupported_text = self.config.get_user_message('language_unsupported',
"Unsupported language. Please use 'english' or 'hindi'.")
self.bot.reply_to(message, unsupported_text)
return
# Initialize session for the user if it doesn't exist
self.config.USER_SESSIONS.setdefault(user_id, {})
# Store the language preference
self.config.USER_SESSIONS[user_id]['language'] = lang_code
logger.info(f"Set language preference for user {user_id} to '{lang_code}'")
# Get confirmation message in the selected language (fetch from prompts or use defaults)
confirmation_prompt_key = f"language_set_{lang_code}"
# Define defaults just in case the keys are missing from prompts.yaml
default_confirmations = {'en': "Language set to English.", 'hi': "भाषा हिंदी में सेट कर दी गई है।"}
# Use the new config method to get the message
reply_text = self.config.get_user_message(confirmation_prompt_key, default_confirmations[lang_code])
self.bot.reply_to(message, reply_text)
def _cleanup_uploaded_file(self, file_path, processed_successfully):
"""Handles cleanup of uploaded files after processing."""
if processed_successfully and os.path.exists(file_path):
try:
os.remove(file_path)
logger.info(f"Successfully processed and removed '{file_path}' from uploads directory.")
except OSError as e:
logger.error(f"Error removing processed file '{file_path}' from uploads: {e}")
elif not processed_successfully and os.path.exists(file_path):
logger.info(f"File '{file_path}' was not successfully processed/indexed and will remain in the uploads directory.")
elif not os.path.exists(file_path) and processed_successfully:
logger.warning(f"Attempted to remove '{file_path}', but it was already deleted (or never saved properly).")
def _determine_file_name(self, message, file_ext, default_doc_name):
"""Determines the correct file name for the uploaded document."""
original_file_name = message.document.file_name
file_name = original_file_name or default_doc_name
# Ensure the filename has the correct extension if it was defaulted
if not file_name.lower().endswith(file_ext) and original_file_name is None:
file_name = os.path.splitext(file_name)[0] + file_ext
return file_name
def _process_document_metadata(self, message: Message):
"""
Determines file extension, default name, and processing mime type
based on the uploaded document's mime type and filename.
Returns a tuple: (file_ext, default_doc_name, processing_mime_type)
Raises ValueError if the file type is unsupported.
"""
mime_type = message.document.mime_type
file_id = message.document.file_id
original_file_name = message.document.file_name
file_ext = None
processing_mime_type = mime_type # Default to original mime type
if mime_type == 'application/pdf':
file_ext = ".pdf"
default_doc_name = f"doc_{file_id}.pdf"
elif mime_type in ['text/html', 'application/xhtml+xml']:
file_ext = ".htm"
default_doc_name = f"doc_{file_id}.htm"
elif mime_type == 'application/octet-stream':
# If generic binary, try to determine type from file name
if original_file_name:
name, ext = os.path.splitext(original_file_name)
if ext.lower() in ['.htm', '.html']:
file_ext = ".htm"
default_doc_name = original_file_name
processing_mime_type = 'text/html' # Treat as html for processing
elif ext.lower() == '.pdf':
file_ext = ".pdf"
default_doc_name = original_file_name
processing_mime_type = 'application/pdf' # Treat as pdf for processing
if file_ext is None: # If still no specific type determined
raise ValueError(f"Unsupported file type or unable to determine type from '{original_file_name or 'uploaded file'}'.")
else: # Handle other explicit unsupported mime types
raise ValueError(f"Unsupported file type ({mime_type}).")
return file_ext, default_doc_name, processing_mime_type
# --- Document Upload Handling (Consider if needed with startup indexing) ---
def handle_document(self, message: Message):
"""
Handles incoming document messages. Checks for PDF, saves, and indexes.
Detects language using utility function.
"""
user_id = message.from_user.id
if not message.document:
self.bot.reply_to(message, "No document provided.")
return
file_id = message.document.file_id
mime_type = message.document.mime_type # Keep original mime_type for logging initially
logger.info(f"Received document from user mime_type: {mime_type} (file_id: {file_id})")
file_path = None # Initialize file_path
documents = [] # Initialize documents list
processed_successfully = False
try:
# Use the new helper method to process metadata
file_ext, default_doc_name, processing_mime_type = self._process_document_metadata(message)
file_name = self._determine_file_name(message, file_ext, default_doc_name)
logger.info(f"User {user_id} uploaded {mime_type} (processed as {processing_mime_type}): {file_name}")
# Define a specific upload directory
upload_dir = os.path.join(project_root, "uploads")
os.makedirs(upload_dir, exist_ok=True)
file_path = os.path.join(upload_dir, file_name)
file_info = self.bot.get_file(file_id)
downloaded_file = self.bot.download_file(file_info.file_path)
with open(file_path, 'wb') as new_file:
new_file.write(downloaded_file)
logger.info(f"Document saved to: {file_path}")
# Load the document using the appropriate processor based on processing_mime_type
if processing_mime_type == 'application/pdf':
documents = self.pdf_processor.load_pdf(file_path)
elif processing_mime_type in ['text/html', 'application/xhtml+xml']:
# HtmProcessor.load_htm returns a single Document or None
doc = self.htm_processor.load_htm(file_path)
if doc:
documents.append(doc)
if not documents:
logger.warning(f"No documents loaded from: {file_path}. Skipping indexing.")
self.bot.reply_to(message, f"Could not load content from '{file_name}'.")
# File remains in uploads dir if loading fails
return
# Detect language using the utility function with loaded documents
language = detect_document_language(documents, file_name_for_logging=file_name)
# Add detected language metadata
for doc in documents:
doc.metadata['language'] = language
logger.info(f"Added language metadata '{language}' to uploaded document: {file_name}")
# Index the document list
was_indexed = self.vector_store_instance.index_document(documents, semantic_chunk=self.config.SEMANTIC_CHUNKING)
if was_indexed:
self.bot.reply_to(message, f"Document '{file_name}' uploaded and indexed successfully.")
processed_successfully = True
else:
self.bot.reply_to(message, f"Document '{file_name}' was not indexed (possibly already exists or an error occurred).")
# File remains in uploads dir if indexing fails or it's a duplicate
except ValueError as ve: # Catch unsupported file type errors from _process_document_metadata
logger.warning(f"Unsupported file type for user {user_id}: {ve}")
self.bot.reply_to(message, str(ve))
# No file was saved in this case, so no cleanup needed
return
except Exception as e:
logger.error(f"Error handling document upload from user {user_id} for {file_name}: {str(e)}", exc_info=True)
self.bot.reply_to(message, "Sorry, I encountered an error processing your document.")
finally:
# Delete the file from upload_dir ONLY if processed and indexed successfully
# Ensure file_path is not None before attempting cleanup
if file_path:
self._cleanup_uploaded_file(file_path, processed_successfully)
# --- End Document Upload Handling ---
def handle_all_messages(self, message: Message):
"""
Handles all non-command text messages.
"""
user_id = message.from_user.id
# Get user's preferred language from session, default to 'en' if not set
user_lang = self.config.USER_SESSIONS.get(user_id, {}).get('language', 'en')
logger.info(f"Received message from user {user_id}: '{message.text[:100]}...'")
try:
# Process the message using the handler (which might invoke the agent or query directly)
response_text = self.handler.process_message(message, user_lang)
self.send_response(message, user_id, response_text)
except Exception as e:
logger.error(f"Error processing message from user {user_id}: {str(e)}", exc_info=True)
self.bot.reply_to(message, "Sorry, I encountered an error processing your request.")
# Setup and webhook configuration functions
def setup_webhook(self, url):
"""Set up the webhook for the Telegram bot"""
if not url:
logger.error("WEBHOOK_URL is not configured. Cannot set webhook.")
return False # Indicate failure
try:
webhook_url = f"{url.rstrip('/')}/{self.config.TELEGRAM_BOT_TOKEN}"
logger.info("Removing existing webhook (if any)...")
self.bot.remove_webhook()
logger.info(f"Setting webhook to: {webhook_url}")
success = self.bot.set_webhook(url=webhook_url)
if success:
logger.info("Webhook set successfully.")
return True
else:
logger.error("Failed to set webhook.")
return False
except Exception as e:
logger.error(f"Error setting up webhook: {e}", exc_info=True)
return False
def run(self):
"""Runs the Flask application."""
WEBHOOK_URL = self.config.WEBHOOK_URL
if not WEBHOOK_URL:
logger.error("WEBHOOK_URL is not set in config. Cannot start Flask server with webhook.")
exit(1)
if self.setup_webhook(WEBHOOK_URL):
logger.info(f"Starting Flask server on port {self.config.PORT}")
self.app.run(host='0.0.0.0', port=self.config.PORT, debug=False)
else:
logger.critical("Failed to set up webhook. Aborting Flask server start.")
exit(1)
if __name__ == "__main__":
try:
# Initialize dependencies
config = Config()
# Assumes a 'data' folder path exists in .env
DATA_DIRECTORY = config.DATA_PATH
logger.info(f"Data directory set to: {DATA_DIRECTORY}")
# Instantiate the vector store instance
logger.info("Initializing VectorStore...")
vector_store_instance = VectorStore(config.VECTOR_STORE_PATH)
vectordb = vector_store_instance.get_vectordb() # Get the db instance after init
logger.info("VectorStore initialized.")
# --- Index data directory on startup ---
# Instantiate FileManager and DocumentIndexer
file_manager_instance = FileManager()
document_indexer_instance = DocumentIndexer(vector_store_instance=vector_store_instance, file_manager_instance=file_manager_instance)
# Call index_directory on the DocumentIndexer instance
document_indexer_instance.index_directory(DATA_DIRECTORY)
# --- End Indexing ---
# Log the final state of indexed metadata after potential indexing
logger.info("Logging final indexed metadata...")
vector_store_instance.log_all_indexed_metadata()
# Create rag agent instance
logger.info("Initializing RAG agent...")
# Ensure vectordb is valid before passing to agent
if vectordb is None:
logger.error("VectorDB instance is None after initialization and indexing. Cannot build agent.")
exit(1)
agent = build_agent(vectordb=vectordb, model_name=config.LLM_MODEL_NAME)
logger.info("RAG agent initialized successfully")
# Initialize message handler (for non-command messages)
handler = MessageHandler(agent=agent, config=config)
pdf_processor = PdfProcessor() # Initialize PDF processor
htm_processor = HtmProcessor() # Initialize HTM processor
# Create an instance of the TelegramBotApp and run it
bot_app = TelegramBotApp(config=config, vector_store_instance=vector_store_instance, agent=agent,
handler=handler, pdf_processor=pdf_processor, htm_processor=htm_processor) # Pass htm_processor
bot_app.run()
except Exception as e:
logger.critical(f"Failed during application startup: {str(e)}", exc_info=True)
exit(1)
# Keep start_bot for potential polling mode if needed, but it's not used with webhook
# def start_bot():
# logger.info("Starting bot in polling mode...")
# bot.infinity_polling()