Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException, Depends, Query, status, Response | |
from typing import List, Optional, Dict | |
from pymongo.errors import PyMongoError | |
import logging | |
from datetime import datetime | |
import traceback | |
import asyncio | |
from app.database.mongodb import ( | |
save_session, | |
get_chat_history, | |
update_session_response, | |
check_db_connection, | |
session_collection | |
) | |
from app.models.mongodb_models import ( | |
SessionCreate, | |
SessionResponse, | |
HistoryRequest, | |
HistoryResponse, | |
QuestionAnswer | |
) | |
from app.api.websocket_routes import send_notification | |
# Configure logging | |
logger = logging.getLogger(__name__) | |
# Create router | |
router = APIRouter( | |
prefix="/mongodb", | |
tags=["MongoDB"], | |
) | |
async def create_session(session: SessionCreate, response: Response): | |
""" | |
Create a new session record in MongoDB. | |
- **session_id**: Unique identifier for the session (auto-generated if not provided) | |
- **factor**: Factor type (user, rag, etc.) | |
- **action**: Action type (start, events, faq, emergency, help, asking_freely, etc.) | |
- **first_name**: User's first name | |
- **last_name**: User's last name (optional) | |
- **message**: User's message (optional) | |
- **user_id**: User's ID from Telegram | |
- **username**: User's username (optional) | |
- **response**: Response from RAG (optional) | |
""" | |
try: | |
# Kiểm tra kết nối MongoDB | |
if not check_db_connection(): | |
logger.error("MongoDB connection failed when trying to create session") | |
raise HTTPException( | |
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
detail="MongoDB connection failed" | |
) | |
# Create new session in MongoDB | |
result = save_session( | |
session_id=session.session_id, | |
factor=session.factor, | |
action=session.action, | |
first_name=session.first_name, | |
last_name=session.last_name, | |
message=session.message, | |
user_id=session.user_id, | |
username=session.username, | |
response=session.response | |
) | |
# Chuẩn bị response object | |
session_response = SessionResponse( | |
**session.model_dump(), | |
created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
) | |
# Kiểm tra nếu session cần gửi thông báo (response bắt đầu bằng "I'm sorry") | |
if session.response and session.response.strip().lower().startswith("i'm sorry"): | |
# Gửi thông báo qua WebSocket | |
try: | |
notification_data = { | |
"session_id": session.session_id, | |
"factor": session.factor, | |
"action": session.action, | |
"message": session.message, | |
"user_id": session.user_id, | |
"username": session.username, | |
"first_name": session.first_name, | |
"last_name": session.last_name, | |
"response": session.response, | |
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
} | |
# Khởi tạo task để gửi thông báo - sử dụng asyncio.create_task để đảm bảo không block quá trình chính | |
asyncio.create_task(send_notification(notification_data)) | |
logger.info(f"Notification queued for session {session.session_id} - response starts with 'I'm sorry'") | |
except Exception as e: | |
logger.error(f"Error queueing notification: {e}") | |
# Không dừng xử lý chính khi gửi thông báo thất bại | |
# Return response | |
return session_response | |
except PyMongoError as e: | |
logger.error(f"MongoDB error creating session: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"MongoDB error: {str(e)}" | |
) | |
except Exception as e: | |
logger.error(f"Unexpected error creating session: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Failed to create session: {str(e)}" | |
) | |
async def update_session_with_response(session_id: str, response_text: str): | |
""" | |
Update a session with the response. | |
- **session_id**: ID of the session to update | |
- **response_text**: Response to add to the session | |
""" | |
try: | |
# Kiểm tra kết nối MongoDB | |
if not check_db_connection(): | |
logger.error("MongoDB connection failed when trying to update session response") | |
raise HTTPException( | |
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
detail="MongoDB connection failed" | |
) | |
# Update session in MongoDB | |
result = update_session_response(session_id, response_text) | |
if not result: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail=f"Session with ID {session_id} not found" | |
) | |
return {"status": "success", "message": "Response added to session"} | |
except PyMongoError as e: | |
logger.error(f"MongoDB error updating session response: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"MongoDB error: {str(e)}" | |
) | |
except HTTPException: | |
# Re-throw HTTP exceptions | |
raise | |
except Exception as e: | |
logger.error(f"Unexpected error updating session response: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Failed to update session: {str(e)}" | |
) | |
async def get_history(user_id: str, n: int = Query(3, ge=1, le=10)): | |
""" | |
Get user history for a specific user. | |
- **user_id**: User's ID from Telegram | |
- **n**: Number of most recent interactions to return (default: 3, min: 1, max: 10) | |
""" | |
try: | |
# Kiểm tra kết nối MongoDB | |
if not check_db_connection(): | |
logger.error("MongoDB connection failed when trying to get user history") | |
raise HTTPException( | |
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
detail="MongoDB connection failed" | |
) | |
# Get user history from MongoDB | |
history_data = get_chat_history(user_id=user_id, n=n) | |
# Convert to response model | |
return HistoryResponse(history=history_data) | |
except PyMongoError as e: | |
logger.error(f"MongoDB error getting user history: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"MongoDB error: {str(e)}" | |
) | |
except Exception as e: | |
logger.error(f"Unexpected error getting user history: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Failed to get user history: {str(e)}" | |
) | |
async def health_check(): | |
""" | |
Check health of MongoDB connection. | |
""" | |
try: | |
# Kiểm tra kết nối MongoDB | |
is_connected = check_db_connection() | |
if not is_connected: | |
return { | |
"status": "unhealthy", | |
"message": "MongoDB connection failed", | |
"timestamp": datetime.now().isoformat() | |
} | |
return { | |
"status": "healthy", | |
"message": "MongoDB connection is working", | |
"timestamp": datetime.now().isoformat() | |
} | |
except Exception as e: | |
logger.error(f"MongoDB health check failed: {e}") | |
logger.error(traceback.format_exc()) | |
return { | |
"status": "error", | |
"message": f"MongoDB health check error: {str(e)}", | |
"timestamp": datetime.now().isoformat() | |
} | |
async def get_session(session_id: str): | |
""" | |
Lấy thông tin session từ MongoDB theo session_id. | |
- **session_id**: ID của session cần lấy | |
""" | |
try: | |
# Kiểm tra kết nối MongoDB | |
if not check_db_connection(): | |
logger.error("MongoDB connection failed when trying to get session") | |
raise HTTPException( | |
status_code=status.HTTP_503_SERVICE_UNAVAILABLE, | |
detail="MongoDB connection failed" | |
) | |
# Lấy thông tin từ MongoDB | |
session_data = session_collection.find_one({"session_id": session_id}) | |
if not session_data: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, | |
detail=f"Session with ID {session_id} not found" | |
) | |
# Chuyển _id thành string để có thể JSON serialize | |
if "_id" in session_data: | |
session_data["_id"] = str(session_data["_id"]) | |
return session_data | |
except PyMongoError as e: | |
logger.error(f"MongoDB error getting session: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"MongoDB error: {str(e)}" | |
) | |
except HTTPException: | |
# Re-throw HTTP exceptions | |
raise | |
except Exception as e: | |
logger.error(f"Unexpected error getting session: {e}") | |
logger.error(traceback.format_exc()) | |
raise HTTPException( | |
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, | |
detail=f"Failed to get session: {str(e)}" | |
) |