import logging
import os
from datetime import datetime, timedelta

import pytz
from dotenv import load_dotenv
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError

# Configure logging
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# MongoDB connection string from .env
MONGODB_URL = os.getenv("MONGODB_URL")
DB_NAME = os.getenv("DB_NAME", "Telegram")
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "session_chat")

# Timeout passed as serverSelectionTimeoutMS (milliseconds; 5 seconds by default)
MONGODB_TIMEOUT = int(os.getenv("MONGODB_TIMEOUT", "5000"))

# Legacy cache settings - now only used for configuration purposes
HISTORY_CACHE_TTL = int(os.getenv("HISTORY_CACHE_TTL", "3600"))  # 1 hour by default
HISTORY_QUEUE_SIZE = int(os.getenv("HISTORY_QUEUE_SIZE", "10"))  # 10 items by default

# Pre-bind the handles so that a failed initialization leaves them as None
# instead of undefined names (which would surface later as NameError).
client = None
db = None
session_collection = None

# Create MongoDB connection with timeout
try:
    client = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    db = client[DB_NAME]

    # Collections
    session_collection = db[COLLECTION_NAME]

    logger.info(f"MongoDB connection initialized to {DB_NAME}.{COLLECTION_NAME}")
except Exception as e:
    logger.error(f"Failed to initialize MongoDB connection: {e}")
    # Don't raise exception to avoid crash during startup,
    # error handling will be done in functions


def check_db_connection():
    """Check whether the MongoDB server answers a ping.

    Returns:
        True when the ``ping`` admin command succeeds, False when the client
        was never initialized or the command raises any exception.
    """
    if client is None:
        logger.error("MongoDB connection failed: client was not initialized")
        return False
    try:
        # Issue a ping to confirm a successful connection
        client.admin.command('ping')
        logger.info("MongoDB connection is working")
        return True
    except (ConnectionFailure, ServerSelectionTimeoutError) as e:
        logger.error(f"MongoDB connection failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Unknown error when checking MongoDB connection: {e}")
        return False


# Timezone for Asia/Ho_Chi_Minh
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')


def get_local_time():
    """Return the current Asia/Ho_Chi_Minh time as a ``YYYY-MM-DD HH:MM:SS`` string."""
    return datetime.now(asia_tz).strftime("%Y-%m-%d %H:%M:%S")


def get_local_datetime():
    """Return the current time as a timezone-aware datetime in Asia/Ho_Chi_Minh."""
    return datetime.now(asia_tz)


# For backward compatibility
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime


# Utility functions
def save_session(session_id, factor, action, first_name, last_name, message,
                 user_id, username, response=None):
    """Insert one session document into the session collection.

    The document stores every argument under a key of the same name, plus
    ``created_at`` (formatted local-time string) and ``created_at_datetime``
    (timezone-aware datetime).

    Returns:
        A dict with ``acknowledged``, ``inserted_id`` (as a string) and
        ``session_data`` (the inserted document).

    Raises:
        Exception: any error from the insert is logged and re-raised.
    """
    try:
        session_data = {
            "session_id": session_id,
            "factor": factor,
            "action": action,
            "created_at": get_local_time(),
            "created_at_datetime": get_local_datetime(),
            "first_name": first_name,
            "last_name": last_name,
            "message": message,
            "user_id": user_id,
            "username": username,
            "response": response,
        }
        # NOTE(review): PyMongo's insert_one adds an "_id" key to the dict it
        # is given, so the returned session_data presumably carries a raw
        # ObjectId - confirm callers do not try to JSON-serialize it as is.
        result = session_collection.insert_one(session_data)
        logger.info(f"Session saved with ID: {result.inserted_id}")
        return {
            "acknowledged": result.acknowledged,
            "inserted_id": str(result.inserted_id),
            "session_data": session_data,
        }
    except Exception as e:
        logger.error(f"Error saving session: {e}")
        raise


def update_session_response(session_id, response):
    """Set the ``response`` field on the session matching ``session_id``.

    A single ``update_one`` both locates and updates the document, so there
    is no window between an existence check and the write.

    Returns:
        True when the update was issued against an existing session, False
        when no document has that ``session_id``.

    Raises:
        Exception: any error from the update is logged and re-raised.
    """
    try:
        result = session_collection.update_one(
            {"session_id": session_id},
            {"$set": {"response": response}},
        )
        # matched_count is only available for acknowledged writes.
        if result.acknowledged and result.matched_count == 0:
            logger.warning(f"No session found with ID: {session_id}")
            return False

        logger.info(f"Session {session_id} updated with response")
        return True
    except Exception as e:
        logger.error(f"Error updating session response: {e}")
        raise


def get_recent_sessions(user_id, action, n=3):
    """Get the n most recent sessions for a specific user and action.

    Returns:
        A list of dicts holding only ``message`` and ``response``, newest
        first, or an empty list when the query raises.
    """
    try:
        # Query MongoDB directly
        result = list(
            session_collection.find(
                {"user_id": user_id, "action": action},
                {"_id": 0, "message": 1, "response": 1},
            ).sort("created_at_datetime", -1).limit(n)
        )

        logger.debug(f"Retrieved {len(result)} recent sessions for user {user_id}, action {action}")
        return result
    except Exception as e:
        logger.error(f"Error getting recent sessions: {e}")
        return []


def _text(value):
    """Return ``value`` as a string, mapping None to the empty string."""
    return "" if value is None else str(value)


def _load_history_docs(user_id, n):
    """Fetch the user's session documents for history building, oldest first.

    When a ``start`` or ``clear`` session exists, every session created after
    the most recent one is returned (``n`` is not applied in that case).
    Otherwise the ``n`` most recent sessions are returned.
    """
    user_key = str(user_id)

    # Find the most recent /start or /clear session
    reset_session = session_collection.find_one(
        {
            "user_id": user_key,
            "$or": [
                {"action": "start"},
                {"action": "clear"},
            ],
        },
        sort=[("created_at_datetime", -1)],
    )

    if reset_session:
        reset_time = reset_session["created_at_datetime"]
        # Fetch the sessions created after reset_time
        docs = list(
            session_collection.find({
                "user_id": user_key,
                "created_at_datetime": {"$gt": reset_time},
            }).sort("created_at_datetime", 1)
        )
        logger.info(f"Lấy {len(docs)} session sau lệnh {reset_session['action']} lúc {reset_time}")
    else:
        # No reset session found: take the n most recent sessions.
        # NOTE(review): this branch sorts on the string field "created_at"
        # while the other queries sort on "created_at_datetime" - confirm
        # whether that is deliberate.
        docs = list(
            session_collection.find({"user_id": user_key}).sort("created_at", -1).limit(n)
        )
        # Reverse so the order runs from oldest to newest
        docs.reverse()
        logger.info(f"Không tìm thấy session reset, lấy {len(docs)} session gần nhất")

    if not docs:
        logger.info(f"Không tìm thấy dữ liệu cho user_id: {user_id}")
    return docs


def _iter_chat_turns(docs):
    """Yield ``(message, response)`` strings for each free-form user question.

    Only documents with factor ``user`` and action ``asking_freely`` (compared
    case-insensitively) qualify, which also excludes ``start``/``clear``
    sessions. Missing or None fields become empty strings, so a session saved
    without a response does not show up as the text ``None``.
    """
    for doc in docs:
        factor = _text(doc.get("factor")).lower()
        action = _text(doc.get("action")).lower()
        if factor == "user" and action == "asking_freely":
            yield _text(doc.get("message")), _text(doc.get("response"))


def get_chat_history(user_id, n=5) -> str:
    """Build the chat transcript for ``user_id`` from MongoDB.

    The transcript has the form::

        User: ...
        Bot: ...
        User: ...
        Bot: ...

    Only history after the most recent /start or /clear command is used;
    when no such command exists, the ``n`` most recent sessions are used.

    Returns:
        The transcript, or an empty string when there is no data or the
        lookup raises.
    """
    try:
        docs = _load_history_docs(user_id, n)

        conversation_lines = []
        for message, response in _iter_chat_turns(docs):
            conversation_lines.append(f"User: {message}")
            conversation_lines.append(f"Bot: {response}")

        # Join the lines into a single string
        return "\n".join(conversation_lines)
    except Exception as e:
        logger.error(f"Lỗi khi lấy lịch sử chat cho user_id {user_id}: {e}")
        return ""


def get_request_history(user_id, n=3):
    """Get the most recent user requests to use as context for retrieval.

    The questions are read straight from the session documents rather than
    parsed back out of the formatted transcript, so a multi-line message is
    kept whole and a response line can never be mistaken for a question.

    Returns:
        The non-empty user messages joined by single spaces, or an empty
        string when there is no data or the lookup raises.
    """
    try:
        docs = _load_history_docs(user_id, n)
        # Join all recent requests into a single string for context
        return " ".join(
            message for message, _ in _iter_chat_turns(docs) if message
        )
    except Exception as e:
        logger.error(f"Error getting request history: {e}")
        return ""