# Pix-Agent / app/database/mongodb.py
# Uploaded by Cuong2004 - commit "version 1.1" (e83f5e9) - 6.83 kB
import os
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from dotenv import load_dotenv
from datetime import datetime, timedelta
import pytz
import logging
# Module-level logger
logger = logging.getLogger(__name__)

# Load environment variables (python-dotenv) before any setting is read
load_dotenv()

# Connection target: MongoDB URL, database name and collection name
MONGODB_URL = os.environ.get("MONGODB_URL")
DB_NAME = os.environ.get("DB_NAME", "Telegram")
COLLECTION_NAME = os.environ.get("COLLECTION_NAME", "session_chat")

# Timeout for the MongoDB connection, in milliseconds (5 seconds by default)
MONGODB_TIMEOUT = int(os.environ.get("MONGODB_TIMEOUT", "5000"))

# Legacy cache settings - now only used for configuration purposes
HISTORY_CACHE_TTL = int(os.environ.get("HISTORY_CACHE_TTL", "3600"))  # 1 hour by default
HISTORY_QUEUE_SIZE = int(os.environ.get("HISTORY_QUEUE_SIZE", "10"))  # 10 items by default
# Create MongoDB connection with timeout.
# The handles are pre-bound to None so the module-level names always exist:
# if client construction fails (e.g. a malformed MONGODB_URL), importing
# `client`, `db` or `session_collection` from this module still succeeds, and
# the functions in this module reach their own error handling instead of the
# import itself failing.
client = None
db = None
session_collection = None
try:
    client = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=MONGODB_TIMEOUT)
    db = client[DB_NAME]
    # Collections
    session_collection = db[COLLECTION_NAME]
    logger.info(f"MongoDB connection initialized to {DB_NAME}.{COLLECTION_NAME}")
except Exception as e:
    logger.error(f"Failed to initialize MongoDB connection: {e}")
    # Don't raise exception to avoid crash during startup, error handling will be done in functions
# Check MongoDB connection
def check_db_connection():
    """Ping the MongoDB server and report whether it answered.

    Returns:
        bool: True when the ``ping`` admin command succeeds, False when it
        raises (the failure is logged, never propagated).
    """
    try:
        # A successful ping confirms the connection is usable
        client.admin.command('ping')
    except (ConnectionFailure, ServerSelectionTimeoutError) as conn_err:
        logger.error(f"MongoDB connection failed: {conn_err}")
        return False
    except Exception as other_err:
        logger.error(f"Unknown error when checking MongoDB connection: {other_err}")
        return False
    else:
        logger.info("MongoDB connection is working")
        return True
# Timezone for Asia/Ho_Chi_Minh, shared by the helpers below
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh')


def get_local_datetime():
    """Return the current moment as a timezone-aware datetime in Asia/Ho_Chi_Minh."""
    return datetime.now(tz=asia_tz)


def get_local_time():
    """Return the current Asia/Ho_Chi_Minh time formatted as "YYYY-MM-DD HH:MM:SS"."""
    now = datetime.now(tz=asia_tz)
    return now.strftime("%Y-%m-%d %H:%M:%S")


# For backward compatibility: older names kept as aliases of the helpers above
get_vietnam_time = get_local_time
get_vietnam_datetime = get_local_datetime
# Utility functions
def save_session(session_id, factor, action, first_name, last_name, message, user_id, username, response=None):
    """Insert one chat-session record into the session collection.

    The record stores every argument under a key of the same name, plus two
    timestamps: ``created_at`` (from ``get_local_time()``) and
    ``created_at_datetime`` (from ``get_local_datetime()``).

    Returns:
        dict: ``acknowledged`` (taken from the insert result), ``inserted_id``
        (the new document id converted to ``str``) and ``session_data`` (the
        record that was inserted).

    Raises:
        Exception: any error raised while building or inserting the record is
        logged and re-raised unchanged.
    """
    try:
        record = dict(
            session_id=session_id,
            factor=factor,
            action=action,
            created_at=get_local_time(),
            created_at_datetime=get_local_datetime(),
            first_name=first_name,
            last_name=last_name,
            message=message,
            user_id=user_id,
            username=username,
            response=response,
        )
        # NOTE: pymongo's insert_one adds an "_id" key to `record` in place,
        # so the "session_data" returned below carries that ObjectId as well.
        outcome = session_collection.insert_one(record)
        new_id = outcome.inserted_id
        logger.info(f"Session saved with ID: {new_id}")
        return {
            "acknowledged": outcome.acknowledged,
            "inserted_id": str(new_id),
            "session_data": record,
        }
    except Exception as e:
        logger.error(f"Error saving session: {e}")
        raise
def update_session_response(session_id, response):
    """Set the ``response`` field of the session identified by ``session_id``.

    The existence check and the write are a single ``update_one`` call: the
    number of matched documents tells whether the session exists, so no prior
    ``find_one`` round trip is needed and the session cannot disappear between
    the check and the write.

    Args:
        session_id: Value of the ``session_id`` field to look up.
        response: Value to store under ``response``.

    Returns:
        bool: True if a session matched the id (its response is now set),
        False if no session has that id (a warning is logged).

    Raises:
        Exception: any database error is logged and re-raised unchanged.
    """
    try:
        result = session_collection.update_one(
            {"session_id": session_id},
            {"$set": {"response": response}},
        )
        # Use matched_count, not modified_count: re-sending an identical
        # response matches the document without modifying it and must still
        # be reported as success.
        if result.matched_count == 0:
            logger.warning(f"No session found with ID: {session_id}")
            return False
        logger.info(f"Session {session_id} updated with response")
        return True
    except Exception as e:
        logger.error(f"Error updating session response: {e}")
        raise
def get_recent_sessions(user_id, action, n=3):
    """Fetch the newest sessions of one user for one action.

    Args:
        user_id: Value matched against the ``user_id`` field.
        action: Value matched against the ``action`` field.
        n: Limit passed to the query (default 3).

    Returns:
        list[dict]: Documents projected to ``message`` and ``response``,
        ordered by ``created_at_datetime`` descending; an empty list if the
        query raises (the error is logged).
    """
    criteria = {"user_id": user_id, "action": action}
    wanted_fields = {"_id": 0, "message": 1, "response": 1}
    try:
        # Query MongoDB directly
        cursor = session_collection.find(criteria, wanted_fields)
        cursor = cursor.sort("created_at_datetime", -1).limit(n)
        sessions = list(cursor)
        logger.debug(f"Retrieved {len(sessions)} recent sessions for user {user_id}, action {action}")
        return sessions
    except Exception as e:
        logger.error(f"Error getting recent sessions: {e}")
        return []
def get_chat_history(user_id, n = 5) -> str:
    """Build a plain-text transcript of a user's recent free-form chat.

    Fetches the ``n`` most recent documents whose ``user_id`` equals
    ``str(user_id)`` (newest first by ``created_at``), restores chronological
    order, and renders every document whose ``factor`` is "user" and whose
    ``action`` is "asking_freely" (both compared case-insensitively) as:

        User: <message>
        Bot: <response>

    The "Bot:" line is left out for a document that has no response stored
    yet, so the transcript never contains a literal "Bot: None".

    Args:
        user_id: User identifier; converted to ``str`` for the query.
        n: Number of documents fetched (default 5). The limit is applied
            before the factor/action filter, so fewer than ``n`` exchanges
            may be rendered.

    Returns:
        str: The transcript lines joined with newlines, or "" when no
        document is found or when any error occurs (the error is logged).
    """
    try:
        # Take the n newest documents, then reverse them so the transcript
        # reads oldest to newest.
        docs = list(
            session_collection.find({"user_id": str(user_id)})
            .sort("created_at", -1)
            .limit(n)
        )
        docs.reverse()
        if not docs:
            logger.info(f"Không tìm thấy dữ liệu cho user_id: {user_id}")
            return ""
        conversation_lines = []
        for doc in docs:
            # `or ""` also covers fields stored as None, which the default
            # argument of .get() does not; one such document must not make
            # .lower() raise and wipe out the whole history.
            factor = (doc.get("factor") or "").lower()
            action = (doc.get("action") or "").lower()
            if factor != "user" or action != "asking_freely":
                continue
            message = doc.get("message") or ""
            conversation_lines.append(f"User: {message}")
            response = doc.get("response")
            # A session saved before its reply exists has response=None;
            # skip the Bot line rather than emit "Bot: None".
            if response:
                conversation_lines.append(f"Bot: {response}")
        # Join the lines into a single string
        return "\n".join(conversation_lines)
    except Exception as e:
        logger.error(f"Lỗi khi lấy lịch sử chat cho user_id {user_id}: {e}")
        return ""
def get_request_history(user_id, n=3):
    """Concatenate a user's most recent questions into one context string.

    Looks the history up through ``get_user_history(user_id, n)`` and joins
    the ``question`` entry of every returned item with single spaces.

    NOTE(review): ``get_user_history`` is not defined in the part of the file
    visible here - confirm it exists elsewhere in this module; if it does not,
    the resulting NameError is swallowed below and "" is returned.

    Returns:
        str: The space-separated questions, or "" if anything raises (the
        error is logged).
    """
    try:
        # get_user_history is expected to read directly from MongoDB
        # (not verifiable from this part of the file)
        recent_items = get_user_history(user_id, n)
        # Only the questions are needed as retrieval context
        return " ".join(entry['question'] for entry in recent_items)
    except Exception as e:
        logger.error(f"Error getting request history: {e}")
        return ""