File size: 11,777 Bytes
a6fd1a3
 
 
 
 
 
 
 
 
 
 
3e7b272
a6fd1a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3eb8bf
a6fd1a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4e001e2
a6fd1a3
 
 
d627b5f
a6fd1a3
 
 
4e001e2
a6fd1a3
d3eb8bf
a6fd1a3
 
 
 
 
 
 
 
 
 
 
a93cb8f
a6fd1a3
 
 
 
05debeb
 
a6fd1a3
 
 
 
 
 
 
05debeb
a6fd1a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d3eb8bf
a6fd1a3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
from fastapi import APIRouter, Depends, HTTPException, Request
from schemas.chat import QueryRequest, AnswerResponse, ChatHistoryResponse
from schemas.user import UserOut
from dependencies import get_current_user
from services.chat_service import ask_question_service, stream_chat_generator
from utils.utils import delete_chat_from_redis
from dependencies import get_app_state
import logging
import uuid
from redis.asyncio import Redis
from datetime import datetime, timezone
from db.mongoDB import mongo_db
from fastapi.responses import StreamingResponse
from schemas.chat import Message,ConversationResponse
from typing import List
# Thiết lập logger
logger = logging.getLogger(__name__)
router = APIRouter()


@router.post("/create-chat")
async def create_chat(
    fastapi_request: Request, # Sử dụng Request từ FastAPI
    current_user: UserOut = Depends(get_current_user) # Sử dụng User model của bạn
):
    app_state = get_app_state(request=fastapi_request)
    redis_client: Redis = app_state.redis # Nên đặt tên rõ ràng là redis_client


    chat_id = str(uuid.uuid4())
    current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC

    # --- Lưu metadata vào Redis với key đã thống nhất ---
    meta_key = f"conversation_meta:{chat_id}"
    conversation_meta_data = {
        "user_id": current_user.email, # Sử dụng key 'user_id' cho nhất quán
        "created_at": current_utc_time.isoformat(), # Lưu dưới dạng ISO string
        "updated_at": current_utc_time.isoformat(), # Ban đầu giống created_at
        "message_count": 0 # Số lượng tin nhắn ban đầu
    }

    try:
        # Sử dụng await nếu redis_client là async
        if hasattr(redis_client, 'hmset_async'): # Kiểm tra phương thức async (ví dụ)
            await redis_client.hmset(meta_key, conversation_meta_data)
            await redis_client.expire(meta_key, 86400) # TTL: 24 giờ
        else: # Client đồng bộ
            await redis_client.hmset(meta_key, conversation_meta_data)
            await redis_client.expire(meta_key, 86400) # TTL: 24 giờ

        logger.info(f"Đã tạo metadata cho chat_id {chat_id} trong Redis với key {meta_key}.")
    except Exception as e:
        logger.error(f"Lỗi khi lưu metadata vào Redis cho chat {chat_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Lỗi khi tạo metadata cho cuộc hội thoại.")

    # --- Lưu hội thoại rỗng vào MongoDB ---
    # (Đảm bảo messages ban đầu là list rỗng cho key messages chính của bạn)
    messages_key_in_mongo = "messages" # Key lưu trữ danh sách tin nhắn trong MongoDB

    conversation_doc = {
        "user_id": current_user.email,
        "conversation_id": chat_id,
        messages_key_in_mongo: [], # Danh sách tin nhắn rỗng
        "created_at": current_utc_time, # Lưu kiểu datetime object
        "updated_at": current_utc_time  # Lưu kiểu datetime object
    }

    try:
        await mongo_db.conversations.insert_one(conversation_doc)
        logger.info(f"Đã tạo hội thoại rỗng {chat_id} trong MongoDB cho user {current_user.email}.")
    except Exception as e:
        logger.error(f"Lỗi khi tạo hội thoại rỗng trong MongoDB cho chat {chat_id}: {e}", exc_info=True)
        # Cân nhắc xóa key meta trong Redis nếu MongoDB thất bại để tránh trạng thái không nhất quán
        try:
            if hasattr(redis_client, 'delete_async'):
                await redis_client.delete(meta_key)
            else:
                await redis_client.delete(meta_key)
            logger.info(f"Đã xóa meta key {meta_key} khỏi Redis do lỗi MongoDB.")
        except Exception as redis_del_err:
            logger.error(f"Lỗi khi xóa meta key {meta_key} khỏi Redis: {redis_del_err}")

        raise HTTPException(status_code=500, detail="Lỗi khi tạo cuộc hội thoại trong cơ sở dữ liệu.")

    return {"chat_id": chat_id}


@router.post("", response_model=AnswerResponse)
async def chat_message(request_body: QueryRequest,request: Request, user:UserOut=Depends(get_current_user)):
    app_state = get_app_state(request=request)
    result = await ask_question_service(app_state,request_body, user)

    if not result:
        raise HTTPException(status_code=500, detail="Error during QA Chain invocation")
    return result

@router.get("/stream") # Đổi thành GET
async def stream_chat_endpoint(
    chat_id: str,  # Lấy từ query param
    input: str,    # Lấy từ query param (tên param này phải khớp với FE)
    request: Request,
    user: UserOut = Depends(get_current_user) # Sửa kiểu user
):
    app_state = get_app_state(request=request)
    user_email = getattr(user, 'email', str(user)) # Lấy email an toàn

    # Kiểm tra input cơ bản
    if not chat_id or not input:
        raise HTTPException(status_code=400, detail="chat_id and input are required.")

    # Sử dụng EventSourceResponse (từ sse-starlette, cài đặt: pip install sse-starlette)
    # Nó xử lý các chi tiết của SSE tốt hơn StreamingResponse thô.
    # return EventSourceResponse(stream_chat_generator(app_state, chat_id, input, user_email))

    # Hoặc dùng StreamingResponse trực tiếp (đơn giản hơn nhưng ít tính năng SSE hơn)
    return StreamingResponse(
        stream_chat_generator(app_state, chat_id, input, user_email),
        media_type="text/event-stream"
    )

@router.delete("/chats/{chat_id}")
async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(get_current_user)):
    app_state = get_app_state(request=request)
    redis = app_state.redis

    meta_key = f"conversation_meta:{chat_id}"
    # Kiểm tra quyền trước khi xóa
    user_in_chat = await redis.hget(meta_key, "user_id")
    if user_in_chat is None:
        raise HTTPException(status_code=404, detail="Chat not found")

    if user_in_chat != user.email:
        raise HTTPException(status_code=403, detail="Unauthorized")

    # Xóa chat
    await delete_chat_from_redis(redis, chat_id)
    # Xóa hội thoại trong MongoDB
    result =await mongo_db.conversations.delete_one({"conversation_id": chat_id, "user_id": user.email})
    if result.deleted_count == 0:
        raise HTTPException(status_code=404, detail="Chat not found in MongoDB")

    return {"message": "Chat deleted successfully"}



@router.get("/conversations", response_model=List[ConversationResponse])
async def get_conversations(user: UserOut = Depends(get_current_user)):
    try:
        logger.info(f"Attempting to get conversations for user: {user.email}")
        db_conversations_cursor =  mongo_db.conversations.find({"user_id": user.email})

        response_list = []


        async for conv_doc in db_conversations_cursor:
            logger.debug(f"Processing conversation doc: {conv_doc}")
            all_messages = conv_doc.get("messages", [])
            logger.debug(f"Messages for this conversation: {all_messages}")

            response_list.append({
                "conversation_id": conv_doc["conversation_id"],
                "created_at": conv_doc["created_at"],
                "updated_at": conv_doc["updated_at"],
                "messages": all_messages
            })
        logger.info(f"Successfully processed {len(response_list)} conversations.")
        return response_list
    except Exception as e:
        logger.error(f"Error in get_conversations for user {user.email}: {e}", exc_info=True) # exc_info=True sẽ log cả traceback
        raise HTTPException(status_code=500, detail="An error occurred while fetching conversations.")


@router.get("/c/{chat_id}", response_model=ChatHistoryResponse)
async def load_conversation_and_sync_redis(
    fastapi_request: Request, # Đổi tên biến request
    chat_id: str, # Lấy trực tiếp từ path param
    current_user: UserOut = Depends(get_current_user) # Sử dụng User model
):
    app_state = get_app_state(request=fastapi_request)
    redis_client = app_state.redis # Nên là client async nếu có thể

    # 1. Kiểm tra hội thoại trong MongoDB
    try:
        conversation_doc = await mongo_db.conversations.find_one(
            {"conversation_id": chat_id, "user_id": current_user.email}
        )
        if not conversation_doc:
            logger.warning(f"Hội thoại {chat_id} không tồn tại hoặc không thuộc user {current_user.email}")
            raise HTTPException(
                status_code=404,
                detail="Hội thoại không tồn tại hoặc bạn không có quyền truy cập"
            )
    except Exception as e:
        logger.error(f"Lỗi MongoDB khi kiểm tra hội thoại {chat_id}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail="Lỗi kết nối cơ sở dữ liệu MongoDB")

    # 2. Chuẩn bị lịch sử tin nhắn từ MongoDB để trả về và nạp vào Redis
    raw_messages_from_db = conversation_doc.get("messages", [])
    validated_history_for_response: List[Message] = []
    for msg_data in raw_messages_from_db:
        try:
            # Validate và chuyển đổi timestamp nếu cần (Pydantic sẽ tự làm nếu input là datetime obj)
            validated_history_for_response.append(Message(**msg_data))
        except Exception as p_err:
            logger.warning(f"Bỏ qua message không hợp lệ trong chat {chat_id} từ DB: {msg_data}. Lỗi: {p_err}")


    # 3. Nạp/Đồng bộ tin nhắn và metadata vào Redis (Sử dụng key thống nhất)
    messages_redis_key = f"conversation_messages:{chat_id}"
    meta_redis_key = f"conversation_meta:{chat_id}"

    try:
        # Sử dụng pipeline cho hiệu quả
        # Giả sử redis_client là async
        with redis_client.pipeline() as pipe:
            pipe.delete(messages_redis_key) # Xóa messages cũ để nạp lại toàn bộ
            if validated_history_for_response:
                for msg_model in validated_history_for_response:
                    # Đảm bảo lưu trữ theo cấu trúc Pydantic `Message`
                    pipe.rpush(messages_redis_key, msg_model.model_dump_json()) # Pydantic V2
                    # hoặc .json() cho Pydantic V1
            pipe.expire(messages_redis_key, 86400)  # TTL: 24 giờ

            # Cập nhật/Tạo mới metadata
            # Lấy created_at, updated_at từ document MongoDB
            created_at_iso = conversation_doc["created_at"].isoformat()
            updated_at_iso = conversation_doc["updated_at"].isoformat()

            conversation_meta_data = {
                "user_id": current_user.email,
                "created_at": created_at_iso,
                "updated_at": updated_at_iso,
                "message_count": len(validated_history_for_response)
            }
            # Xóa meta cũ và đặt lại, hoặc dùng hmset để cập nhật
            pipe.delete(meta_redis_key)
            pipe.hset(meta_redis_key, conversation_meta_data)
            pipe.expire(meta_redis_key, 86400)


            pipe.execute()
        logger.info(f"Đã nạp và đồng bộ hội thoại {chat_id} vào Redis với {len(validated_history_for_response)} tin nhắn.")

    except Exception as e:
        logger.error(f"Lỗi khi nạp hội thoại {chat_id} vào Redis: {e}", exc_info=True)

    # 4. Trả về response
    return ChatHistoryResponse(
        chat_id=chat_id,
        history=validated_history_for_response,
        created_at=conversation_doc["created_at"], # Lấy từ doc MongoDB
        updated_at=conversation_doc["updated_at"], # Lấy từ doc MongoDB
        user_id=current_user.email
    )