|
import os |
|
from pymongo import MongoClient |
|
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError |
|
from dotenv import load_dotenv |
|
from datetime import datetime, timedelta |
|
import pytz |
|
import logging |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
MONGODB_URL = os.getenv("MONGODB_URL") |
|
DB_NAME = os.getenv("DB_NAME", "Telegram") |
|
COLLECTION_NAME = os.getenv("COLLECTION_NAME", "session_chat") |
|
|
|
|
|
MONGODB_TIMEOUT = int(os.getenv("MONGODB_TIMEOUT", "5000")) |
|
|
|
|
|
HISTORY_CACHE_TTL = int(os.getenv("HISTORY_CACHE_TTL", "3600")) |
|
HISTORY_QUEUE_SIZE = int(os.getenv("HISTORY_QUEUE_SIZE", "10")) |
|
|
|
|
|
try: |
|
client = MongoClient(MONGODB_URL, serverSelectionTimeoutMS=MONGODB_TIMEOUT) |
|
db = client[DB_NAME] |
|
|
|
|
|
session_collection = db[COLLECTION_NAME] |
|
logger.info(f"MongoDB connection initialized to {DB_NAME}.{COLLECTION_NAME}") |
|
|
|
except Exception as e: |
|
logger.error(f"Failed to initialize MongoDB connection: {e}") |
|
|
|
|
|
|
|
def check_db_connection(): |
|
"""Check MongoDB connection""" |
|
try: |
|
|
|
client.admin.command('ping') |
|
logger.info("MongoDB connection is working") |
|
return True |
|
except (ConnectionFailure, ServerSelectionTimeoutError) as e: |
|
logger.error(f"MongoDB connection failed: {e}") |
|
return False |
|
except Exception as e: |
|
logger.error(f"Unknown error when checking MongoDB connection: {e}") |
|
return False |
|
|
|
|
|
asia_tz = pytz.timezone('Asia/Ho_Chi_Minh') |
|
|
|
def get_local_time(): |
|
"""Get current time in Asia/Ho_Chi_Minh timezone""" |
|
return datetime.now(asia_tz).strftime("%Y-%m-%d %H:%M:%S") |
|
|
|
def get_local_datetime(): |
|
"""Get current datetime object in Asia/Ho_Chi_Minh timezone""" |
|
return datetime.now(asia_tz) |
|
|
|
|
|
get_vietnam_time = get_local_time |
|
get_vietnam_datetime = get_local_datetime |
|
|
|
|
|
def save_session(session_id, factor, action, first_name, last_name, message, user_id, username, response=None): |
|
"""Save user session to MongoDB""" |
|
try: |
|
session_data = { |
|
"session_id": session_id, |
|
"factor": factor, |
|
"action": action, |
|
"created_at": get_local_time(), |
|
"created_at_datetime": get_local_datetime(), |
|
"first_name": first_name, |
|
"last_name": last_name, |
|
"message": message, |
|
"user_id": user_id, |
|
"username": username, |
|
"response": response |
|
} |
|
result = session_collection.insert_one(session_data) |
|
logger.info(f"Session saved with ID: {result.inserted_id}") |
|
|
|
return { |
|
"acknowledged": result.acknowledged, |
|
"inserted_id": str(result.inserted_id), |
|
"session_data": session_data |
|
} |
|
except Exception as e: |
|
logger.error(f"Error saving session: {e}") |
|
raise |
|
|
|
def update_session_response(session_id, response): |
|
"""Update a session with response""" |
|
try: |
|
|
|
existing_session = session_collection.find_one({"session_id": session_id}) |
|
|
|
if not existing_session: |
|
logger.warning(f"No session found with ID: {session_id}") |
|
return False |
|
|
|
result = session_collection.update_one( |
|
{"session_id": session_id}, |
|
{"$set": {"response": response}} |
|
) |
|
|
|
logger.info(f"Session {session_id} updated with response") |
|
return True |
|
except Exception as e: |
|
logger.error(f"Error updating session response: {e}") |
|
raise |
|
|
|
def get_recent_sessions(user_id, action, n=3): |
|
"""Get n most recent sessions for a specific user and action""" |
|
try: |
|
|
|
result = list( |
|
session_collection.find( |
|
{"user_id": user_id, "action": action}, |
|
{"_id": 0, "message": 1, "response": 1} |
|
).sort("created_at_datetime", -1).limit(n) |
|
) |
|
|
|
logger.debug(f"Retrieved {len(result)} recent sessions for user {user_id}, action {action}") |
|
return result |
|
except Exception as e: |
|
logger.error(f"Error getting recent sessions: {e}") |
|
return [] |
|
|
|
def get_chat_history(user_id, n = 5) -> str: |
|
""" |
|
Lấy lịch sử chat cho user_id từ MongoDB và ghép thành chuỗi theo định dạng: |
|
|
|
User: ... |
|
Bot: ... |
|
User: ... |
|
Bot: ... |
|
|
|
Chỉ lấy history sau lệnh /start hoặc /clear mới nhất |
|
""" |
|
try: |
|
|
|
reset_session = session_collection.find_one( |
|
{ |
|
"user_id": str(user_id), |
|
"$or": [ |
|
{"action": "start"}, |
|
{"action": "clear"} |
|
] |
|
}, |
|
sort=[("created_at_datetime", -1)] |
|
) |
|
|
|
|
|
if reset_session: |
|
reset_time = reset_session["created_at_datetime"] |
|
|
|
docs = list( |
|
session_collection.find({ |
|
"user_id": str(user_id), |
|
"created_at_datetime": {"$gt": reset_time} |
|
}).sort("created_at_datetime", 1) |
|
) |
|
logger.info(f"Lấy {len(docs)} session sau lệnh {reset_session['action']} lúc {reset_time}") |
|
else: |
|
|
|
docs = list(session_collection.find({"user_id": str(user_id)}).sort("created_at", -1).limit(n)) |
|
|
|
docs.reverse() |
|
logger.info(f"Không tìm thấy session reset, lấy {len(docs)} session gần nhất") |
|
|
|
if not docs: |
|
logger.info(f"Không tìm thấy dữ liệu cho user_id: {user_id}") |
|
return "" |
|
|
|
conversation_lines = [] |
|
|
|
for doc in docs: |
|
factor = doc.get("factor", "").lower() |
|
action = doc.get("action", "").lower() |
|
message = doc.get("message", "") |
|
response = doc.get("response", "") |
|
|
|
|
|
if action in ["start", "clear"]: |
|
continue |
|
|
|
if factor == "user" and action == "asking_freely": |
|
conversation_lines.append(f"User: {message}") |
|
conversation_lines.append(f"Bot: {response}") |
|
|
|
|
|
return "\n".join(conversation_lines) |
|
except Exception as e: |
|
logger.error(f"Lỗi khi lấy lịch sử chat cho user_id {user_id}: {e}") |
|
return "" |
|
|
|
def get_request_history(user_id, n=3): |
|
"""Get the most recent user requests to use as context for retrieval""" |
|
try: |
|
|
|
history = get_chat_history(user_id, n) |
|
|
|
|
|
requests = [] |
|
for line in history.split('\n'): |
|
if line.startswith("User: "): |
|
requests.append(line[6:]) |
|
|
|
|
|
return " ".join(requests) |
|
except Exception as e: |
|
logger.error(f"Error getting request history: {e}") |
|
return "" |