Spaces:
Runtime error
Runtime error
from fastapi import APIRouter, Depends, HTTPException, Request | |
from schemas.chat import QueryRequest, AnswerResponse, ChatHistoryResponse | |
from schemas.user import UserOut | |
from dependencies import get_current_user | |
from services.chat_service import ask_question_service, stream_chat_generator | |
from utils.utils import delete_chat_from_redis | |
from dependencies import get_app_state | |
import logging | |
import uuid | |
from redis.asyncio import Redis | |
from datetime import datetime, timezone | |
from db.mongoDB import mongo_db | |
from fastapi.responses import StreamingResponse | |
from schemas.chat import Message,ConversationResponse | |
from typing import List | |
# Thiết lập logger | |
logger = logging.getLogger(__name__) | |
router = APIRouter() | |
async def create_chat( | |
fastapi_request: Request, # Sử dụng Request từ FastAPI | |
current_user: UserOut = Depends(get_current_user) # Sử dụng User model của bạn | |
): | |
app_state = get_app_state(request=fastapi_request) | |
redis_client: Redis = app_state.redis # Nên đặt tên rõ ràng là redis_client | |
chat_id = str(uuid.uuid4()) | |
current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC | |
# --- Lưu metadata vào Redis với key đã thống nhất --- | |
meta_key = f"conversation_meta:{chat_id}" | |
conversation_meta_data = { | |
"user_id": current_user.email, # Sử dụng key 'user_id' cho nhất quán | |
"created_at": current_utc_time.isoformat(), # Lưu dưới dạng ISO string | |
"updated_at": current_utc_time.isoformat(), # Ban đầu giống created_at | |
"message_count": 0 # Số lượng tin nhắn ban đầu | |
} | |
try: | |
# Sử dụng await nếu redis_client là async | |
if hasattr(redis_client, 'hmset_async'): # Kiểm tra phương thức async (ví dụ) | |
await redis_client.hmset(meta_key, conversation_meta_data) | |
await redis_client.expire(meta_key, 86400) # TTL: 24 giờ | |
else: # Client đồng bộ | |
await redis_client.hmset(meta_key, conversation_meta_data) | |
await redis_client.expire(meta_key, 86400) # TTL: 24 giờ | |
logger.info(f"Đã tạo metadata cho chat_id {chat_id} trong Redis với key {meta_key}.") | |
except Exception as e: | |
logger.error(f"Lỗi khi lưu metadata vào Redis cho chat {chat_id}: {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail="Lỗi khi tạo metadata cho cuộc hội thoại.") | |
# --- Lưu hội thoại rỗng vào MongoDB --- | |
# (Đảm bảo messages ban đầu là list rỗng cho key messages chính của bạn) | |
messages_key_in_mongo = "messages" # Key lưu trữ danh sách tin nhắn trong MongoDB | |
conversation_doc = { | |
"user_id": current_user.email, | |
"conversation_id": chat_id, | |
messages_key_in_mongo: [], # Danh sách tin nhắn rỗng | |
"created_at": current_utc_time, # Lưu kiểu datetime object | |
"updated_at": current_utc_time # Lưu kiểu datetime object | |
} | |
try: | |
await mongo_db.conversations.insert_one(conversation_doc) | |
logger.info(f"Đã tạo hội thoại rỗng {chat_id} trong MongoDB cho user {current_user.email}.") | |
except Exception as e: | |
logger.error(f"Lỗi khi tạo hội thoại rỗng trong MongoDB cho chat {chat_id}: {e}", exc_info=True) | |
# Cân nhắc xóa key meta trong Redis nếu MongoDB thất bại để tránh trạng thái không nhất quán | |
try: | |
if hasattr(redis_client, 'delete_async'): | |
await redis_client.delete(meta_key) | |
else: | |
await redis_client.delete(meta_key) | |
logger.info(f"Đã xóa meta key {meta_key} khỏi Redis do lỗi MongoDB.") | |
except Exception as redis_del_err: | |
logger.error(f"Lỗi khi xóa meta key {meta_key} khỏi Redis: {redis_del_err}") | |
raise HTTPException(status_code=500, detail="Lỗi khi tạo cuộc hội thoại trong cơ sở dữ liệu.") | |
return {"chat_id": chat_id} | |
async def chat_message(request_body: QueryRequest,request: Request, user:UserOut=Depends(get_current_user)): | |
app_state = get_app_state(request=request) | |
result = await ask_question_service(app_state,request_body, user) | |
if not result: | |
raise HTTPException(status_code=500, detail="Error during QA Chain invocation") | |
return result | |
# Đổi thành GET | |
async def stream_chat_endpoint( | |
chat_id: str, # Lấy từ query param | |
input: str, # Lấy từ query param (tên param này phải khớp với FE) | |
request: Request, | |
user: UserOut = Depends(get_current_user) # Sửa kiểu user | |
): | |
app_state = get_app_state(request=request) | |
user_email = getattr(user, 'email', str(user)) # Lấy email an toàn | |
# Kiểm tra input cơ bản | |
if not chat_id or not input: | |
raise HTTPException(status_code=400, detail="chat_id and input are required.") | |
# Sử dụng EventSourceResponse (từ sse-starlette, cài đặt: pip install sse-starlette) | |
# Nó xử lý các chi tiết của SSE tốt hơn StreamingResponse thô. | |
# return EventSourceResponse(stream_chat_generator(app_state, chat_id, input, user_email)) | |
# Hoặc dùng StreamingResponse trực tiếp (đơn giản hơn nhưng ít tính năng SSE hơn) | |
return StreamingResponse( | |
stream_chat_generator(app_state, chat_id, input, user_email), | |
media_type="text/event-stream" | |
) | |
async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(get_current_user)): | |
app_state = get_app_state(request=request) | |
redis = app_state.redis | |
meta_key = f"conversation_meta:{chat_id}" | |
# Kiểm tra quyền trước khi xóa | |
user_in_chat = await redis.hget(meta_key, "user_id") | |
if user_in_chat is None: | |
raise HTTPException(status_code=404, detail="Chat not found") | |
if user_in_chat != user.email: | |
raise HTTPException(status_code=403, detail="Unauthorized") | |
# Xóa chat | |
await delete_chat_from_redis(redis, chat_id) | |
# Xóa hội thoại trong MongoDB | |
result =await mongo_db.conversations.delete_one({"conversation_id": chat_id, "user_id": user.email}) | |
if result.deleted_count == 0: | |
raise HTTPException(status_code=404, detail="Chat not found in MongoDB") | |
return {"message": "Chat deleted successfully"} | |
async def get_conversations(user: UserOut = Depends(get_current_user)): | |
try: | |
logger.info(f"Attempting to get conversations for user: {user.email}") | |
db_conversations_cursor = mongo_db.conversations.find({"user_id": user.email}) | |
response_list = [] | |
async for conv_doc in db_conversations_cursor: | |
logger.debug(f"Processing conversation doc: {conv_doc}") | |
all_messages = conv_doc.get("messages", []) | |
logger.debug(f"Messages for this conversation: {all_messages}") | |
response_list.append({ | |
"conversation_id": conv_doc["conversation_id"], | |
"created_at": conv_doc["created_at"], | |
"updated_at": conv_doc["updated_at"], | |
"messages": all_messages | |
}) | |
logger.info(f"Successfully processed {len(response_list)} conversations.") | |
return response_list | |
except Exception as e: | |
logger.error(f"Error in get_conversations for user {user.email}: {e}", exc_info=True) # exc_info=True sẽ log cả traceback | |
raise HTTPException(status_code=500, detail="An error occurred while fetching conversations.") | |
async def load_conversation_and_sync_redis( | |
fastapi_request: Request, # Đổi tên biến request | |
chat_id: str, # Lấy trực tiếp từ path param | |
current_user: UserOut = Depends(get_current_user) # Sử dụng User model | |
): | |
app_state = get_app_state(request=fastapi_request) | |
redis_client = app_state.redis # Nên là client async nếu có thể | |
# 1. Kiểm tra hội thoại trong MongoDB | |
try: | |
conversation_doc = await mongo_db.conversations.find_one( | |
{"conversation_id": chat_id, "user_id": current_user.email} | |
) | |
if not conversation_doc: | |
logger.warning(f"Hội thoại {chat_id} không tồn tại hoặc không thuộc user {current_user.email}") | |
raise HTTPException( | |
status_code=404, | |
detail="Hội thoại không tồn tại hoặc bạn không có quyền truy cập" | |
) | |
except Exception as e: | |
logger.error(f"Lỗi MongoDB khi kiểm tra hội thoại {chat_id}: {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail="Lỗi kết nối cơ sở dữ liệu MongoDB") | |
# 2. Chuẩn bị lịch sử tin nhắn từ MongoDB để trả về và nạp vào Redis | |
raw_messages_from_db = conversation_doc.get("messages", []) | |
validated_history_for_response: List[Message] = [] | |
for msg_data in raw_messages_from_db: | |
try: | |
# Validate và chuyển đổi timestamp nếu cần (Pydantic sẽ tự làm nếu input là datetime obj) | |
validated_history_for_response.append(Message(**msg_data)) | |
except Exception as p_err: | |
logger.warning(f"Bỏ qua message không hợp lệ trong chat {chat_id} từ DB: {msg_data}. Lỗi: {p_err}") | |
# 3. Nạp/Đồng bộ tin nhắn và metadata vào Redis (Sử dụng key thống nhất) | |
messages_redis_key = f"conversation_messages:{chat_id}" | |
meta_redis_key = f"conversation_meta:{chat_id}" | |
try: | |
# Sử dụng pipeline cho hiệu quả | |
# Giả sử redis_client là async | |
with redis_client.pipeline() as pipe: | |
pipe.delete(messages_redis_key) # Xóa messages cũ để nạp lại toàn bộ | |
if validated_history_for_response: | |
for msg_model in validated_history_for_response: | |
# Đảm bảo lưu trữ theo cấu trúc Pydantic `Message` | |
pipe.rpush(messages_redis_key, msg_model.model_dump_json()) # Pydantic V2 | |
# hoặc .json() cho Pydantic V1 | |
pipe.expire(messages_redis_key, 86400) # TTL: 24 giờ | |
# Cập nhật/Tạo mới metadata | |
# Lấy created_at, updated_at từ document MongoDB | |
created_at_iso = conversation_doc["created_at"].isoformat() | |
updated_at_iso = conversation_doc["updated_at"].isoformat() | |
conversation_meta_data = { | |
"user_id": current_user.email, | |
"created_at": created_at_iso, | |
"updated_at": updated_at_iso, | |
"message_count": len(validated_history_for_response) | |
} | |
# Xóa meta cũ và đặt lại, hoặc dùng hmset để cập nhật | |
pipe.delete(meta_redis_key) | |
pipe.hset(meta_redis_key, conversation_meta_data) | |
pipe.expire(meta_redis_key, 86400) | |
pipe.execute() | |
logger.info(f"Đã nạp và đồng bộ hội thoại {chat_id} vào Redis với {len(validated_history_for_response)} tin nhắn.") | |
except Exception as e: | |
logger.error(f"Lỗi khi nạp hội thoại {chat_id} vào Redis: {e}", exc_info=True) | |
# 4. Trả về response | |
return ChatHistoryResponse( | |
chat_id=chat_id, | |
history=validated_history_for_response, | |
created_at=conversation_doc["created_at"], # Lấy từ doc MongoDB | |
updated_at=conversation_doc["updated_at"], # Lấy từ doc MongoDB | |
user_id=current_user.email | |
) |