from fastapi import APIRouter, Depends, HTTPException, Request from schemas.chat import QueryRequest, AnswerResponse, ChatHistoryResponse from schemas.user import UserOut from dependencies import get_current_user from services.chat_service import ask_question_service, stream_chat_generator from utils.utils import delete_chat_from_redis from dependencies import get_app_state import logging import uuid from redis.asyncio import Redis from datetime import datetime, timezone from db.mongoDB import mongo_db from fastapi.responses import StreamingResponse from schemas.chat import Message,ConversationResponse from typing import List # Thiết lập logger logger = logging.getLogger(__name__) router = APIRouter() @router.post("/create-chat") async def create_chat( fastapi_request: Request, # Sử dụng Request từ FastAPI current_user: UserOut = Depends(get_current_user) # Sử dụng User model của bạn ): app_state = get_app_state(request=fastapi_request) redis_client: Redis = app_state.redis # Nên đặt tên rõ ràng là redis_client chat_id = str(uuid.uuid4()) current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC # --- Lưu metadata vào Redis với key đã thống nhất --- meta_key = f"conversation_meta:{chat_id}" conversation_meta_data = { "user_id": current_user.email, # Sử dụng key 'user_id' cho nhất quán "created_at": current_utc_time.isoformat(), # Lưu dưới dạng ISO string "updated_at": current_utc_time.isoformat(), # Ban đầu giống created_at "message_count": 0 # Số lượng tin nhắn ban đầu } try: # Sử dụng await nếu redis_client là async if hasattr(redis_client, 'hmset_async'): # Kiểm tra phương thức async (ví dụ) await redis_client.hmset(meta_key, conversation_meta_data) await redis_client.expire(meta_key, 86400) # TTL: 24 giờ else: # Client đồng bộ await redis_client.hmset(meta_key, conversation_meta_data) await redis_client.expire(meta_key, 86400) # TTL: 24 giờ logger.info(f"Đã tạo metadata cho chat_id {chat_id} trong Redis với key {meta_key}.") except Exception as e: logger.error(f"Lỗi khi lưu metadata vào Redis cho chat {chat_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Lỗi khi tạo metadata cho cuộc hội thoại.") # --- Lưu hội thoại rỗng vào MongoDB --- # (Đảm bảo messages ban đầu là list rỗng cho key messages chính của bạn) messages_key_in_mongo = "messages" # Key lưu trữ danh sách tin nhắn trong MongoDB conversation_doc = { "user_id": current_user.email, "conversation_id": chat_id, messages_key_in_mongo: [], # Danh sách tin nhắn rỗng "created_at": current_utc_time, # Lưu kiểu datetime object "updated_at": current_utc_time # Lưu kiểu datetime object } try: await mongo_db.conversations.insert_one(conversation_doc) logger.info(f"Đã tạo hội thoại rỗng {chat_id} trong MongoDB cho user {current_user.email}.") except Exception as e: logger.error(f"Lỗi khi tạo hội thoại rỗng trong MongoDB cho chat {chat_id}: {e}", exc_info=True) # Cân nhắc xóa key meta trong Redis nếu MongoDB thất bại để tránh trạng thái không nhất quán try: if hasattr(redis_client, 'delete_async'): await redis_client.delete(meta_key) else: await redis_client.delete(meta_key) logger.info(f"Đã xóa meta key {meta_key} khỏi Redis do lỗi MongoDB.") except Exception as redis_del_err: logger.error(f"Lỗi khi xóa meta key {meta_key} khỏi Redis: {redis_del_err}") raise HTTPException(status_code=500, detail="Lỗi khi tạo cuộc hội thoại trong cơ sở dữ liệu.") return {"chat_id": chat_id} @router.post("", response_model=AnswerResponse) async def chat_message(request_body: QueryRequest,request: Request, user:UserOut=Depends(get_current_user)): app_state = get_app_state(request=request) result = await ask_question_service(app_state,request_body, user) if not result: raise HTTPException(status_code=500, detail="Error during QA Chain invocation") return result @router.get("/stream") # Đổi thành GET async def stream_chat_endpoint( chat_id: str, # Lấy từ query param input: str, # Lấy từ query param (tên param này phải khớp với FE) request: Request, user: UserOut = Depends(get_current_user) # Sửa kiểu user ): app_state = get_app_state(request=request) user_email = getattr(user, 'email', str(user)) # Lấy email an toàn # Kiểm tra input cơ bản if not chat_id or not input: raise HTTPException(status_code=400, detail="chat_id and input are required.") # Sử dụng EventSourceResponse (từ sse-starlette, cài đặt: pip install sse-starlette) # Nó xử lý các chi tiết của SSE tốt hơn StreamingResponse thô. # return EventSourceResponse(stream_chat_generator(app_state, chat_id, input, user_email)) # Hoặc dùng StreamingResponse trực tiếp (đơn giản hơn nhưng ít tính năng SSE hơn) return StreamingResponse( stream_chat_generator(app_state, chat_id, input, user_email), media_type="text/event-stream" ) @router.delete("/chats/{chat_id}") async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(get_current_user)): app_state = get_app_state(request=request) redis = app_state.redis meta_key = f"conversation_meta:{chat_id}" # Kiểm tra quyền trước khi xóa user_in_chat = await redis.hget(meta_key, "user_id") if user_in_chat is None: raise HTTPException(status_code=404, detail="Chat not found") if user_in_chat != user.email: raise HTTPException(status_code=403, detail="Unauthorized") # Xóa chat await delete_chat_from_redis(redis, chat_id) # Xóa hội thoại trong MongoDB result =await mongo_db.conversations.delete_one({"conversation_id": chat_id, "user_id": user.email}) if result.deleted_count == 0: raise HTTPException(status_code=404, detail="Chat not found in MongoDB") return {"message": "Chat deleted successfully"} @router.get("/conversations", response_model=List[ConversationResponse]) async def get_conversations(user: UserOut = Depends(get_current_user)): try: logger.info(f"Attempting to get conversations for user: {user.email}") db_conversations_cursor = mongo_db.conversations.find({"user_id": user.email}) response_list = [] async for conv_doc in db_conversations_cursor: logger.debug(f"Processing conversation doc: {conv_doc}") all_messages = conv_doc.get("messages", []) logger.debug(f"Messages for this conversation: {all_messages}") response_list.append({ "conversation_id": conv_doc["conversation_id"], "created_at": conv_doc["created_at"], "updated_at": conv_doc["updated_at"], "messages": all_messages }) logger.info(f"Successfully processed {len(response_list)} conversations.") return response_list except Exception as e: logger.error(f"Error in get_conversations for user {user.email}: {e}", exc_info=True) # exc_info=True sẽ log cả traceback raise HTTPException(status_code=500, detail="An error occurred while fetching conversations.") @router.get("/c/{chat_id}", response_model=ChatHistoryResponse) async def load_conversation_and_sync_redis( fastapi_request: Request, # Đổi tên biến request chat_id: str, # Lấy trực tiếp từ path param current_user: UserOut = Depends(get_current_user) # Sử dụng User model ): app_state = get_app_state(request=fastapi_request) redis_client = app_state.redis # Nên là client async nếu có thể # 1. Kiểm tra hội thoại trong MongoDB try: conversation_doc = await mongo_db.conversations.find_one( {"conversation_id": chat_id, "user_id": current_user.email} ) if not conversation_doc: logger.warning(f"Hội thoại {chat_id} không tồn tại hoặc không thuộc user {current_user.email}") raise HTTPException( status_code=404, detail="Hội thoại không tồn tại hoặc bạn không có quyền truy cập" ) except Exception as e: logger.error(f"Lỗi MongoDB khi kiểm tra hội thoại {chat_id}: {e}", exc_info=True) raise HTTPException(status_code=500, detail="Lỗi kết nối cơ sở dữ liệu MongoDB") # 2. Chuẩn bị lịch sử tin nhắn từ MongoDB để trả về và nạp vào Redis raw_messages_from_db = conversation_doc.get("messages", []) validated_history_for_response: List[Message] = [] for msg_data in raw_messages_from_db: try: # Validate và chuyển đổi timestamp nếu cần (Pydantic sẽ tự làm nếu input là datetime obj) validated_history_for_response.append(Message(**msg_data)) except Exception as p_err: logger.warning(f"Bỏ qua message không hợp lệ trong chat {chat_id} từ DB: {msg_data}. Lỗi: {p_err}") # 3. Nạp/Đồng bộ tin nhắn và metadata vào Redis (Sử dụng key thống nhất) messages_redis_key = f"conversation_messages:{chat_id}" meta_redis_key = f"conversation_meta:{chat_id}" try: # Sử dụng pipeline cho hiệu quả # Giả sử redis_client là async with redis_client.pipeline() as pipe: pipe.delete(messages_redis_key) # Xóa messages cũ để nạp lại toàn bộ if validated_history_for_response: for msg_model in validated_history_for_response: # Đảm bảo lưu trữ theo cấu trúc Pydantic `Message` pipe.rpush(messages_redis_key, msg_model.model_dump_json()) # Pydantic V2 # hoặc .json() cho Pydantic V1 pipe.expire(messages_redis_key, 86400) # TTL: 24 giờ # Cập nhật/Tạo mới metadata # Lấy created_at, updated_at từ document MongoDB created_at_iso = conversation_doc["created_at"].isoformat() updated_at_iso = conversation_doc["updated_at"].isoformat() conversation_meta_data = { "user_id": current_user.email, "created_at": created_at_iso, "updated_at": updated_at_iso, "message_count": len(validated_history_for_response) } # Xóa meta cũ và đặt lại, hoặc dùng hmset để cập nhật pipe.delete(meta_redis_key) pipe.hset(meta_redis_key, conversation_meta_data) pipe.expire(meta_redis_key, 86400) pipe.execute() logger.info(f"Đã nạp và đồng bộ hội thoại {chat_id} vào Redis với {len(validated_history_for_response)} tin nhắn.") except Exception as e: logger.error(f"Lỗi khi nạp hội thoại {chat_id} vào Redis: {e}", exc_info=True) # 4. Trả về response return ChatHistoryResponse( chat_id=chat_id, history=validated_history_for_response, created_at=conversation_doc["created_at"], # Lấy từ doc MongoDB updated_at=conversation_doc["updated_at"], # Lấy từ doc MongoDB user_id=current_user.email )