Spaces:
Runtime error
Runtime error
from fastapi import Depends, HTTPException | |
from schemas.chat import QueryRequest, AnswerResponse, SourceDocument | |
from schemas.user import UserOut | |
from dependencies import get_current_user | |
import time | |
import json | |
from utils.utils import save_chat_to_redis, search_term_in_dictionary, minimal_preprocess_for_llm, save_chat_to_mongo, get_langchain_chat_history | |
import os | |
import logging | |
from db.mongoDB import mongo_db | |
from datetime import datetime, timezone | |
import asyncio | |
logger = logging.getLogger(__name__) | |
async def ask_question_service(app_state, request: QueryRequest, user: UserOut = Depends(get_current_user)): | |
chat_id = request.chat_id | |
question_content = request.input # Giữ lại câu hỏi gốc của user để lưu | |
# --- 1. Xác thực và kiểm tra metadata từ Redis --- | |
meta_key = f"conversation_meta:{chat_id}" | |
if not await app_state.redis.exists(meta_key): # Dùng await nếu redis client là async | |
logger.warning(f"Metadata cho chat_id {chat_id} không tìm thấy trong Redis.") | |
raise HTTPException(status_code=404, detail="Chat ID not found or session expired. Please reload the conversation.") | |
user_in_redis = await app_state.redis.hget(meta_key, "user_id") # Key đã đổi thành user_id | |
if not user_in_redis: | |
logger.error(f"user_id không có trong metadata của chat {chat_id}.") | |
raise HTTPException(status_code=404, detail="Chat metadata corrupted.") | |
if user_in_redis != user.email: | |
logger.warning(f"User {user.email} không được phép truy cập chat {chat_id} (thuộc về {user_in_redis}).") | |
raise HTTPException(status_code=403, detail="Unauthorized to access this chat.") | |
start_time = time.time() | |
current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC cho timestamp | |
# --- 2. Tiền xử lý câu hỏi --- | |
cleaned_question = minimal_preprocess_for_llm(question_content) | |
# --- 3. Kiểm tra từ điển thuật ngữ (nếu có) --- | |
if hasattr(app_state, 'dict') and app_state.dict: | |
term_result = search_term_in_dictionary(cleaned_question, app_state.dict) | |
if term_result: | |
answer_def = term_result.get("definition", "Không thể tìm thấy định nghĩa.") | |
assistant_response_time = datetime.now(timezone.utc) | |
# Lưu vào Redis và MongoDB | |
await save_chat_to_redis( | |
app_state.redis, chat_id, question_content, answer_def, current_utc_time, assistant_response_time | |
) | |
await save_chat_to_mongo( | |
mongo_db.conversations, chat_id, user.email, question_content, answer_def, current_utc_time, assistant_response_time | |
) | |
friendly_answer = f"Xin chào! Về câu hỏi '{question_content}' của bạn, tôi đã tìm thấy thông tin sau:\n\n{answer_def}\n\nHy vọng thông tin này hữu ích cho bạn. Bạn có muốn tìm hiểu thêm về chủ đề này hoặc có câu hỏi nào khác không? 😊" | |
return AnswerResponse( | |
answer=friendly_answer, | |
sources=[ | |
SourceDocument( | |
source="Thuật ngữ pháp lý", | |
page_content_preview=f"Định nghĩa thuật ngữ từ cơ sở dữ liệu" | |
) | |
], | |
processing_time=round(time.time() - start_time, 2) | |
) | |
if not app_state.qa_chain: | |
logger.error("QA Chain chưa được khởi tạo.") | |
raise HTTPException(status_code=503, detail="Service Unavailable: QA Chain not ready.") | |
try: | |
redis_url = os.environ.get("REDIS_URL_LANGCHAIN", os.environ.get("REDIS_URL")) # Ưu tiên URL riêng cho Langchain nếu có | |
if not redis_url: | |
logger.error("REDIS_URL or REDIS_URL_LANGCHAIN not set for RedisChatMessageHistory.") | |
raise ValueError("Redis URL for chat history is required.") | |
chat_history_messages = await prepare_chat_history_optimized( | |
app_state.redis, | |
chat_id, | |
max_messages=10 | |
) | |
chat_history_string = format_chat_history_for_prompt(chat_history_messages) | |
input_data_for_chain = { | |
# "chat_history": langchain_chat_history.messages, # Lấy messages đã được đồng bộ | |
"chat_history": chat_history_string, # Lấy messages đã được đồng bộ | |
"input": cleaned_question | |
} | |
except Exception as e: | |
logger.error(f"Lỗi khi chuẩn bị chat history cho Langchain (chat_id: {chat_id}): {e}", exc_info=True) | |
raise HTTPException(status_code=500, detail="Lỗi xử lý lịch sử chat.") | |
# --- 5. Gọi QA Chain --- | |
try: | |
logger.debug(f"Input to QA Chain (chat_id: {chat_id}): {input_data_for_chain}") | |
# Metadata cho LangSmith trace | |
langsmith_metadata = { | |
"user_email": user.email, | |
"chat_id": chat_id, | |
"original_question": question_content, | |
"cleaned_question": cleaned_question, | |
"request_id": request.request_id if hasattr(request, 'request_id') else "N/A" # Nếu bạn có request ID | |
} | |
chain_result = app_state.qa_chain.invoke(input_data_for_chain, config={ | |
"metadata": langsmith_metadata, | |
"run_name": f"AskService_QA_Invoke_ChatID_{chat_id[:8]}" | |
# "tags": ["production", "qa_service"] | |
}) | |
# logger.info(f"QA Chain raw result (chat_id: {chat_id}): {chain_result}") | |
# Xử lý kết quả từ chain (logic của bạn để trích xuất câu trả lời) | |
assistant_response_content = "" | |
if isinstance(chain_result, dict) and "answer" in chain_result: | |
assistant_response_content = str(chain_result["answer"]) | |
elif isinstance(chain_result, str): # Một số chain có thể trả về string trực tiếp | |
assistant_response_content = chain_result | |
else: | |
logger.error(f"QA Chain result không hợp lệ (chat_id: {chat_id}): {chain_result}") | |
assistant_response_content = "Xin lỗi, tôi không thể xử lý yêu cầu này vào lúc này." | |
# Không raise lỗi ở đây ngay, mà trả về thông báo lỗi cho user và log lại. | |
if not assistant_response_content.strip(): | |
assistant_response_content = "Tôi không tìm thấy câu trả lời phù hợp." | |
except Exception as chain_error: | |
logger.error(f"Lỗi QA Chain (chat_id: {chat_id}): {chain_error}", exc_info=True) | |
raise HTTPException(status_code=500, detail=f"Lỗi xử lý từ QA chain: {str(chain_error)[:100]}") | |
assistant_response_time = datetime.now(timezone.utc) | |
# --- 6. Lưu tin nhắn mới (câu hỏi của user và trả lời của AI) --- | |
# Lưu vào key "conversation_messages:{chat_id}" của chúng ta | |
await save_chat_to_redis( | |
app_state.redis, chat_id, question_content, assistant_response_content, current_utc_time, assistant_response_time | |
) | |
# Lưu vào MongoDB | |
# Chạy ngầm hoặc sau khi trả lời user để không làm chậm response (nếu có thể) | |
await save_chat_to_mongo( | |
mongo_db.conversations, chat_id, user.email, question_content, assistant_response_content, current_utc_time, assistant_response_time | |
) | |
end_time = time.time() | |
logger.info(f"Trả lời cho chat {chat_id} bởi user {user.email}: {assistant_response_content[:100]}...") | |
return AnswerResponse( | |
answer=assistant_response_content, | |
processing_time=round(end_time - start_time, 2) | |
) | |
async def stream_chat_generator( | |
app_state, | |
chat_id: str, | |
question_content: str, | |
user_email: str | |
): | |
""" | |
Generator function to stream chat responses. | |
Yields data in Server-Sent Events (SSE) format. | |
""" | |
start_time_total = time.time() | |
current_utc_time = datetime.now(timezone.utc) | |
full_answer_for_saving = "" # Để lưu toàn bộ câu trả lời vào DB | |
try: | |
# --- 1. Xác thực và kiểm tra metadata từ Redis (Tương tự ask_question_service) --- | |
meta_key = f"conversation_meta:{chat_id}" | |
if not await app_state.redis.exists(meta_key): | |
logger.warning(f"Stream: Metadata cho chat_id {chat_id} không tìm thấy.") | |
error_payload = {"error": "Chat ID not found or session expired. Please reload."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
user_in_redis_bytes =await app_state.redis.hget(meta_key, "user_id") | |
if not user_in_redis_bytes: | |
logger.error(f"Stream: user_id không có trong metadata của chat {chat_id}.") | |
error_payload = {"error": "Chat metadata corrupted."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
user_in_redis = user_in_redis_bytes | |
if user_in_redis != user_email: | |
logger.warning(f"Stream: User {user_email} không được phép truy cập chat {chat_id}.") | |
error_payload = {"error": "Unauthorized to access this chat."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
# --- 2. Tiền xử lý câu hỏi (Tương tự) --- | |
cleaned_question = minimal_preprocess_for_llm(question_content) | |
initial_processing_done_time = time.time() | |
logger.info(f"Stream: Initial processing for {chat_id} took {initial_processing_done_time - start_time_total:.2f}s") | |
# --- 3. Kiểm tra từ điển thuật ngữ (nếu có, và nó nhanh) --- | |
if hasattr(app_state, 'dict') and app_state.dict: | |
term_result = search_term_in_dictionary(cleaned_question, app_state.dict) | |
if term_result: | |
answer_def = term_result.get("definition", "Không thể tìm thấy định nghĩa.") | |
assistant_response_time_dict = datetime.now(timezone.utc) | |
full_answer_for_saving = answer_def # Gán cho lưu trữ | |
# Stream toàn bộ định nghĩa như một chunk | |
data_payload = {"token": answer_def, "is_final": True, "source": "dictionary"} | |
yield f"data: {json.dumps(data_payload)}\n\n" | |
# Có thể gửi event kết thúc riêng | |
yield f"event: end_stream\ndata: {{}}\n\n" # Event kết thúc tùy chỉnh | |
# Lưu vào Redis và MongoDB (sau khi stream) | |
await save_chat_to_redis( | |
app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict | |
) | |
asyncio.create_task(save_chat_to_mongo( # Chạy nền | |
mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict | |
)) | |
processing_time_dict = round(time.time() - start_time_total, 2) | |
logger.info(f"Stream: Dictionary answer for {chat_id} sent in {processing_time_dict:.2f}s.") | |
return # Kết thúc generator ở đây | |
if not app_state.qa_chain: # qa_chain phải hỗ trợ streaming | |
logger.error("Stream: QA Chain chưa được khởi tạo hoặc không hỗ trợ streaming.") | |
error_payload = {"error": "Service Unavailable: QA Chain not ready for streaming."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
# --- 4. Lấy lịch sử chat cho Langchain Chain (Tương tự) --- | |
try: | |
langchain_chat_history = await get_langchain_chat_history(app_state, chat_id) | |
input_data_for_chain = { | |
"chat_history": langchain_chat_history.messages, | |
"input": cleaned_question | |
} | |
except Exception as e: | |
logger.error(f"Stream: Lỗi khi chuẩn bị chat history (chat_id: {chat_id}): {e}", exc_info=True) | |
error_payload = {"error": "Error processing chat history."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
# --- 5. Gọi QA Chain với streaming --- | |
if not (hasattr(app_state.qa_chain, 'astream') or hasattr(app_state.qa_chain, 'stream')): | |
logger.error(f"Stream: QA Chain (type: {type(app_state.qa_chain)}) không có phương thức astream hoặc stream.") | |
error_payload = {"error": "QA Chain does not support streaming."} | |
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n" | |
return | |
chain_stream_method = app_state.qa_chain.astream if hasattr(app_state.qa_chain, 'astream') else app_state.qa_chain.stream | |
logger.info(f"Stream: Invoking chain stream for {chat_id}...") | |
stream_start_time = time.time() | |
chunk_count = 0 | |
sources_streamed = False # Cờ để chỉ stream sources một lần | |
async for chunk in chain_stream_method(input_data_for_chain): | |
token = "" | |
current_sources = None | |
if isinstance(chunk, str): | |
token = chunk | |
elif hasattr(chunk, 'content'): # Giống AIMessageChunk | |
token = chunk.content | |
elif isinstance(chunk, dict): | |
token = chunk.get("answer") or chunk.get("token") or chunk.get("content") or "" | |
# Kiểm tra sources nếu chunk là dict và chưa stream sources | |
if not sources_streamed and "source" in chunk: | |
current_sources = chunk["source"] | |
if token: | |
full_answer_for_saving += token | |
data_payload = {"token": token, "is_final": False} | |
yield f"data: {json.dumps(data_payload)}\n\n" | |
chunk_count += 1 | |
# Stream sources nếu có và chưa được stream | |
if current_sources and not sources_streamed: | |
sources_list = [] | |
for doc in current_sources: | |
if hasattr(doc, 'metadata') and hasattr(doc, 'page_content'): | |
sources_list.append(SourceDocument( | |
source=doc.metadata.get('source', 'N/A'), | |
page_content_preview=doc.page_content[:200] + "..." | |
).dict()) # Chuyển sang dict để JSON serialize | |
if sources_list: | |
source_payload = {"sources": sources_list} | |
yield f"event: sources\ndata: {json.dumps(source_payload)}\n\n" # Event riêng cho sources | |
sources_streamed = True # Đánh dấu đã stream | |
stream_end_time = time.time() | |
logger.info(f"Stream: Chain streaming for {chat_id} completed in {stream_end_time - stream_start_time:.2f}s with {chunk_count} chunks.") | |
# --- Gửi event kết thúc stream --- | |
# Frontend có thể dùng event này để biết stream đã hoàn tất. | |
# Hoặc, frontend có thể dựa vào một chunk đặc biệt như `{"is_final": true}` | |
# Hoặc đơn giản là khi `EventSource.onmessage` không nhận được gì nữa sau một timeout. | |
yield f"event: end_stream\ndata: {{ \"message\": \"Stream ended\" }}\n\n" | |
# --- 6. Lưu tin nhắn hoàn chỉnh (sau khi stream xong) --- | |
assistant_response_time = datetime.now(timezone.utc) | |
if not full_answer_for_saving.strip() and chunk_count == 0: # Nếu không có token nào được stream | |
full_answer_for_saving = "Tôi không tìm thấy câu trả lời phù hợp." | |
# Stream câu trả lời mặc định này nếu chưa có gì | |
data_payload = {"token": full_answer_for_saving, "is_final": True} | |
yield f"data: {json.dumps(data_payload)}\n\n" | |
yield f"event: end_stream\ndata: {{ \"message\": \"Stream ended with default message\" }}\n\n" | |
logger.info(f"Stream: Full answer for {chat_id} to be saved: {full_answer_for_saving[:100]}...") | |
await save_chat_to_redis( | |
app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time | |
) | |
# Chạy lưu MongoDB ngầm để không block | |
asyncio.create_task(save_chat_to_mongo( | |
mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time | |
)) | |
# Cập nhật Langchain history (nếu chain memory không tự làm) | |
# await langchain_chat_history.aadd_user_message(question_for_chain) | |
# await langchain_chat_history.aadd_ai_message(full_answer_for_saving) | |
except HTTPException as e: # Bắt HTTPException đã được raise từ các hàm con | |
logger.error(f"Stream: HTTPException for chat_id {chat_id}: {e.detail}", exc_info=True) | |
error_payload = {"error": e.detail, "status_code": e.status_code} | |
yield f"event: error_stream\ndata: {json.dumps(error_payload)}\n\n" | |
except Exception as e: | |
logger.error(f"Stream: Unhandled exception for chat_id {chat_id}: {e}", exc_info=True) | |
error_payload = {"error": "An unexpected server error occurred during streaming."} | |
yield f"event: error_stream\ndata: {json.dumps(error_payload)}\n\n" | |
finally: | |
# Đảm bảo generator kết thúc đúng cách. | |
# EventSource trên client sẽ tự động đóng khi generator kết thúc. | |
# Hoặc bạn có thể gửi một tín hiệu đóng rõ ràng nếu cần. | |
# yield "event: close\ndata: Connection closed by server\n\n" # Không chuẩn SSE, nhưng một số client có thể hiểu | |
logger.info(f"Stream: Generator for chat_id {chat_id} finished. Total time: {time.time() - start_time_total:.2f}s") | |
# Sử dụng GET cho EventSource theo chuẩn, truyền params qua query string | |
# EventSource chỉ hỗ trợ GET. Nếu bạn BẮT BUỘC phải dùng POST (ví dụ, câu hỏi quá dài cho URL), | |
# bạn sẽ cần một giải pháp phức tạp hơn, không dùng EventSource trực tiếp trên client | |
# mà dùng fetch API với ReadableStream và POST. | |
#helper | |
from typing import List, Optional,Any | |
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage | |
async def prepare_chat_history_optimized( | |
redis:Any, | |
chat_id: str, | |
max_messages: int = 10, # Số lượng cặp tin nhắn (user+AI) tối đa để lấy | |
max_tokens: Optional[int] = None, # (Tùy chọn nâng cao) Giới hạn token | |
tokenizer: Optional[Any] = None # (Tùy chọn nâng cao) Tokenizer để đếm token | |
) -> List[BaseMessage]: | |
""" | |
CẢI TIẾN: Lấy N tin nhắn gần nhất từ Redis để làm lịch sử chat. | |
- Hiệu quả hơn bằng cách chỉ lấy một phần lịch sử. | |
- An toàn hơn bằng cách kiểm soát độ dài ngữ cảnh. | |
Args: | |
redis: Client Redis bất đồng bộ. | |
chat_id: ID của cuộc trò chuyện. | |
max_messages: Số lượng tin nhắn tối đa để lấy từ cuối (ví dụ: 10 tin nhắn gần nhất). | |
max_tokens: (Nâng cao) Giới hạn tổng số token của lịch sử. | |
tokenizer: (Nâng cao) Tokenizer để sử dụng với max_tokens. | |
Returns: | |
Một danh sách các đối tượng tin nhắn của LangChain (HumanMessage, AIMessage). | |
""" | |
messages_key = f"conversation_messages:{chat_id}" | |
# 1. Lấy N tin nhắn gần nhất từ Redis | |
# lrange(key, -N, -1) sẽ lấy N phần tử cuối cùng của list. | |
# Lấy nhiều hơn một chút để đảm bảo có cặp user/ai hoàn chỉnh. | |
num_to_fetch = max_messages + 2 | |
try: | |
# Sử dụng lrange để lấy các tin nhắn gần nhất, hiệu quả hơn nhiều so với lấy tất cả | |
raw_messages_json = await redis.lrange(messages_key, -num_to_fetch, -1) | |
if not raw_messages_json: | |
return [] | |
except Exception as e: | |
logger.error(f"Lỗi khi đọc lịch sử chat từ Redis cho chat_id {chat_id}: {e}") | |
return [] | |
# 2. Xây dựng danh sách tin nhắn cho LangChain | |
langchain_messages: List[BaseMessage] = [] | |
total_tokens = 0 | |
# Lặp ngược từ cuối (tin nhắn mới nhất) để xử lý | |
for msg_json_str in reversed(raw_messages_json): | |
try: | |
msg_data = json.loads(msg_json_str) | |
content = msg_data.get("content", "") | |
# (Tùy chọn nâng cao) Kiểm tra giới hạn token | |
if max_tokens and tokenizer: | |
num_tokens = len(tokenizer.encode(content)) | |
if total_tokens + num_tokens > max_tokens: | |
logger.warning(f"Đã đạt giới hạn token ({max_tokens}) cho lịch sử chat. Dừng lại.") | |
break # Dừng thêm tin nhắn | |
total_tokens += num_tokens | |
# Tạo đối tượng tin nhắn phù hợp | |
if msg_data.get("role") == "user": | |
langchain_messages.append(HumanMessage(content=content)) | |
elif msg_data.get("role") == "assistant": | |
langchain_messages.append(AIMessage(content=content)) | |
except (json.JSONDecodeError, KeyError) as e: | |
logger.warning(f"Lỗi khi parse tin nhắn từ Redis: {e}. Bỏ qua tin nhắn này.") | |
continue | |
# 3. Đảo ngược lại danh sách để có đúng thứ tự (cũ -> mới) | |
langchain_messages.reverse() | |
# Cắt lại theo max_messages cuối cùng để đảm bảo số lượng chính xác | |
return langchain_messages[-max_messages:] | |
def format_chat_history_for_prompt(chat_history: List[BaseMessage]) -> str: | |
""" | |
Chuyển đổi danh sách đối tượng tin nhắn thành một chuỗi văn bản duy nhất. | |
""" | |
if not chat_history: | |
return "Không có lịch sử trò chuyện." | |
formatted_history = [] | |
for message in chat_history: | |
role = "Người dùng" if isinstance(message, HumanMessage) else "Trợ lý" | |
formatted_history.append(f"{role}: {message.content}") | |
return "\n".join(formatted_history) |