Spaces:
Building
Building
# /home/bk_anupam/code/LLM_agents/RAG_BOT/message_handler.py | |
import datetime | |
from config import Config | |
from RAG_BOT.logger import logger # Corrected import path based on other files | |
from langchain_core.messages import HumanMessage | |
from telebot.types import Message | |
from langgraph.graph import StateGraph | |
from RAG_BOT.utils import parse_json_answer | |
class MessageHandler: | |
def __init__(self, agent: StateGraph, config: Config): | |
# Store user session data (could be moved to a database for persistence) | |
self.config = config | |
self.sessions = config.USER_SESSIONS | |
self.agent = agent | |
def _get_user_session(self, user_id): | |
if user_id not in self.sessions: | |
logger.info(f"Creating new session for user {user_id}") | |
self.sessions[user_id] = { | |
'last_interaction': datetime.datetime.now(), | |
'conversation': [], | |
'context': {}, | |
'language': 'en' # Default language | |
} | |
# Ensure essential keys are always present using setdefault | |
if 'language' not in self.sessions[user_id]: | |
self.sessions[user_id]['language'] = 'en' | |
# *** Ensure 'conversation' key exists *** | |
self.sessions[user_id].setdefault('conversation', []) | |
return self.sessions[user_id] | |
def _update_session(self, user_id, message, response): | |
"""Update the user session with new interaction""" | |
session = self._get_user_session(user_id) | |
session['last_interaction'] = datetime.datetime.now() | |
session['conversation'].append({ | |
'user': message, | |
'bot': response, | |
'timestamp': datetime.datetime.now().isoformat() | |
}) | |
# Limit conversation history (optional) | |
# Consider making the limit configurable | |
history_limit = self.config.CONVERSATION_HISTORY_LIMIT or 10 | |
if len(session['conversation']) > history_limit: | |
session['conversation'] = session['conversation'][-history_limit:] | |
def process_message(self, incoming_message: Message, language_code: str): | |
""" | |
Process the incoming message and generate a response | |
This is where you implement your custom logic | |
""" | |
# language_code is now passed as an argument | |
user_id = incoming_message.from_user.id | |
message = incoming_message.text | |
if not message: # Handle cases like stickers or empty messages if needed | |
logger.warning(f"Received empty message text from user {user_id}") | |
return "Sorry, I didn't receive any text." | |
logger.info(f"Processing message from {user_id}: {message[:100]}...") # Log snippet | |
# Get user session (still useful for history, etc.) | |
session = self._get_user_session(user_id) | |
# Convert message to lowercase for easier matching | |
message_lower = message.lower().strip() | |
# --- Updated Greeting Check --- | |
# Check for exact matches for greetings | |
greetings = {'hello', 'hi', 'hey'} | |
if message_lower in greetings: | |
response = "π Hello! I'm your Telegram assistant. How can I help you today?" | |
# --- End Updated Check --- | |
elif "help" == message_lower: # Check for exact match for help too | |
response = ("Here's what I can do:\n" | |
"- Answer your query (on pdf documents uploaded). Use /query command followed by the query for this\n" | |
"- Index and store in vector DB uploaded pdf documents. Just send the pdf document as a message\n" | |
"- Answer any general query \n" | |
"- Last message - to see your last message\n" | |
"Just let me know what you need!") | |
elif "last message" == message_lower: # Exact match | |
if session['conversation']: # Check if list is not empty | |
last_user_message = session['conversation'][-1]['user'] | |
response = f"Your last message was: '{last_user_message}'" | |
else: | |
response = "You haven't sent any previous messages in this session." | |
else: | |
# --- Agent Invocation Logic --- | |
try: | |
logger.info(f"Invoking agent for thread_id={str(incoming_message.chat.id)}, lang='{language_code}', query='{message[:50]}...'") | |
config_thread = {"configurable": {"thread_id": str(incoming_message.chat.id)}} | |
# Build the initial state for the agent | |
initial_state = { | |
"messages": [HumanMessage(content=message)], | |
"language_code": language_code | |
} | |
# It's good practice to stream or use async invoke if available and the agent call might take time | |
# Using synchronous invoke for now as per the original code | |
final_state = self.agent.invoke(initial_state, config_thread) | |
# Extract the answer more robustly | |
answer = None | |
if isinstance(final_state, dict) and "messages" in final_state: | |
final_messages = final_state["messages"] | |
if isinstance(final_messages, list) and final_messages: | |
# Get the last message, assuming it's the agent's response | |
last_msg = final_messages[-1] | |
if hasattr(last_msg, 'content'): | |
json_result = parse_json_answer(last_msg.content) | |
answer = json_result.get("answer") if json_result else None | |
else: | |
logger.warning(f"Last message in agent response has no 'content' attribute: {last_msg}") | |
else: | |
logger.warning(f"Agent response 'messages' is not a non-empty list: {final_messages}") | |
else: | |
logger.warning(f"Unexpected agent response format: {final_state}") | |
if not answer: | |
answer = "Sorry, I couldn't retrieve an answer for that." # More specific default | |
response = answer | |
except Exception as e: | |
logger.error(f"Error invoking RAG agent for user {user_id}: {str(e)}", exc_info=True) | |
response = "Sorry, I encountered an internal error while processing your query." | |
# --- End Agent Invocation --- | |
# Update session with this interaction | |
self._update_session(user_id, message, response) | |
logger.info(f"Generated response for user {user_id}: {response[:100]}...") # Log response snippet | |
return response | |