juribot-backend / services /chat_service.py
entidi2608's picture
fix
d627b5f
from fastapi import Depends, HTTPException
from schemas.chat import QueryRequest, AnswerResponse, SourceDocument
from schemas.user import UserOut
from dependencies import get_current_user
import time
import json
from utils.utils import save_chat_to_redis, search_term_in_dictionary, minimal_preprocess_for_llm, save_chat_to_mongo, get_langchain_chat_history
import os
import logging
from db.mongoDB import mongo_db
from datetime import datetime, timezone
import asyncio
logger = logging.getLogger(__name__)
async def ask_question_service(app_state, request: QueryRequest, user: UserOut = Depends(get_current_user)):
chat_id = request.chat_id
question_content = request.input # Giữ lại câu hỏi gốc của user để lưu
# --- 1. Xác thực và kiểm tra metadata từ Redis ---
meta_key = f"conversation_meta:{chat_id}"
if not await app_state.redis.exists(meta_key): # Dùng await nếu redis client là async
logger.warning(f"Metadata cho chat_id {chat_id} không tìm thấy trong Redis.")
raise HTTPException(status_code=404, detail="Chat ID not found or session expired. Please reload the conversation.")
user_in_redis = await app_state.redis.hget(meta_key, "user_id") # Key đã đổi thành user_id
if not user_in_redis:
logger.error(f"user_id không có trong metadata của chat {chat_id}.")
raise HTTPException(status_code=404, detail="Chat metadata corrupted.")
if user_in_redis != user.email:
logger.warning(f"User {user.email} không được phép truy cập chat {chat_id} (thuộc về {user_in_redis}).")
raise HTTPException(status_code=403, detail="Unauthorized to access this chat.")
start_time = time.time()
current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC cho timestamp
# --- 2. Tiền xử lý câu hỏi ---
cleaned_question = minimal_preprocess_for_llm(question_content)
# --- 3. Kiểm tra từ điển thuật ngữ (nếu có) ---
if hasattr(app_state, 'dict') and app_state.dict:
term_result = search_term_in_dictionary(cleaned_question, app_state.dict)
if term_result:
answer_def = term_result.get("definition", "Không thể tìm thấy định nghĩa.")
assistant_response_time = datetime.now(timezone.utc)
# Lưu vào Redis và MongoDB
await save_chat_to_redis(
app_state.redis, chat_id, question_content, answer_def, current_utc_time, assistant_response_time
)
await save_chat_to_mongo(
mongo_db.conversations, chat_id, user.email, question_content, answer_def, current_utc_time, assistant_response_time
)
friendly_answer = f"Xin chào! Về câu hỏi '{question_content}' của bạn, tôi đã tìm thấy thông tin sau:\n\n{answer_def}\n\nHy vọng thông tin này hữu ích cho bạn. Bạn có muốn tìm hiểu thêm về chủ đề này hoặc có câu hỏi nào khác không? 😊"
return AnswerResponse(
answer=friendly_answer,
sources=[
SourceDocument(
source="Thuật ngữ pháp lý",
page_content_preview=f"Định nghĩa thuật ngữ từ cơ sở dữ liệu"
)
],
processing_time=round(time.time() - start_time, 2)
)
if not app_state.qa_chain:
logger.error("QA Chain chưa được khởi tạo.")
raise HTTPException(status_code=503, detail="Service Unavailable: QA Chain not ready.")
try:
redis_url = os.environ.get("REDIS_URL_LANGCHAIN", os.environ.get("REDIS_URL")) # Ưu tiên URL riêng cho Langchain nếu có
if not redis_url:
logger.error("REDIS_URL or REDIS_URL_LANGCHAIN not set for RedisChatMessageHistory.")
raise ValueError("Redis URL for chat history is required.")
chat_history_messages = await prepare_chat_history_optimized(
app_state.redis,
chat_id,
max_messages=10
)
chat_history_string = format_chat_history_for_prompt(chat_history_messages)
input_data_for_chain = {
# "chat_history": langchain_chat_history.messages, # Lấy messages đã được đồng bộ
"chat_history": chat_history_string, # Lấy messages đã được đồng bộ
"input": cleaned_question
}
except Exception as e:
logger.error(f"Lỗi khi chuẩn bị chat history cho Langchain (chat_id: {chat_id}): {e}", exc_info=True)
raise HTTPException(status_code=500, detail="Lỗi xử lý lịch sử chat.")
# --- 5. Gọi QA Chain ---
try:
logger.debug(f"Input to QA Chain (chat_id: {chat_id}): {input_data_for_chain}")
# Metadata cho LangSmith trace
langsmith_metadata = {
"user_email": user.email,
"chat_id": chat_id,
"original_question": question_content,
"cleaned_question": cleaned_question,
"request_id": request.request_id if hasattr(request, 'request_id') else "N/A" # Nếu bạn có request ID
}
chain_result = app_state.qa_chain.invoke(input_data_for_chain, config={
"metadata": langsmith_metadata,
"run_name": f"AskService_QA_Invoke_ChatID_{chat_id[:8]}"
# "tags": ["production", "qa_service"]
})
# logger.info(f"QA Chain raw result (chat_id: {chat_id}): {chain_result}")
# Xử lý kết quả từ chain (logic của bạn để trích xuất câu trả lời)
assistant_response_content = ""
if isinstance(chain_result, dict) and "answer" in chain_result:
assistant_response_content = str(chain_result["answer"])
elif isinstance(chain_result, str): # Một số chain có thể trả về string trực tiếp
assistant_response_content = chain_result
else:
logger.error(f"QA Chain result không hợp lệ (chat_id: {chat_id}): {chain_result}")
assistant_response_content = "Xin lỗi, tôi không thể xử lý yêu cầu này vào lúc này."
# Không raise lỗi ở đây ngay, mà trả về thông báo lỗi cho user và log lại.
if not assistant_response_content.strip():
assistant_response_content = "Tôi không tìm thấy câu trả lời phù hợp."
except Exception as chain_error:
logger.error(f"Lỗi QA Chain (chat_id: {chat_id}): {chain_error}", exc_info=True)
raise HTTPException(status_code=500, detail=f"Lỗi xử lý từ QA chain: {str(chain_error)[:100]}")
assistant_response_time = datetime.now(timezone.utc)
# --- 6. Lưu tin nhắn mới (câu hỏi của user và trả lời của AI) ---
# Lưu vào key "conversation_messages:{chat_id}" của chúng ta
await save_chat_to_redis(
app_state.redis, chat_id, question_content, assistant_response_content, current_utc_time, assistant_response_time
)
# Lưu vào MongoDB
# Chạy ngầm hoặc sau khi trả lời user để không làm chậm response (nếu có thể)
await save_chat_to_mongo(
mongo_db.conversations, chat_id, user.email, question_content, assistant_response_content, current_utc_time, assistant_response_time
)
end_time = time.time()
logger.info(f"Trả lời cho chat {chat_id} bởi user {user.email}: {assistant_response_content[:100]}...")
return AnswerResponse(
answer=assistant_response_content,
processing_time=round(end_time - start_time, 2)
)
async def stream_chat_generator(
app_state,
chat_id: str,
question_content: str,
user_email: str
):
"""
Generator function to stream chat responses.
Yields data in Server-Sent Events (SSE) format.
"""
start_time_total = time.time()
current_utc_time = datetime.now(timezone.utc)
full_answer_for_saving = "" # Để lưu toàn bộ câu trả lời vào DB
try:
# --- 1. Xác thực và kiểm tra metadata từ Redis (Tương tự ask_question_service) ---
meta_key = f"conversation_meta:{chat_id}"
if not await app_state.redis.exists(meta_key):
logger.warning(f"Stream: Metadata cho chat_id {chat_id} không tìm thấy.")
error_payload = {"error": "Chat ID not found or session expired. Please reload."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
user_in_redis_bytes =await app_state.redis.hget(meta_key, "user_id")
if not user_in_redis_bytes:
logger.error(f"Stream: user_id không có trong metadata của chat {chat_id}.")
error_payload = {"error": "Chat metadata corrupted."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
user_in_redis = user_in_redis_bytes
if user_in_redis != user_email:
logger.warning(f"Stream: User {user_email} không được phép truy cập chat {chat_id}.")
error_payload = {"error": "Unauthorized to access this chat."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
# --- 2. Tiền xử lý câu hỏi (Tương tự) ---
cleaned_question = minimal_preprocess_for_llm(question_content)
initial_processing_done_time = time.time()
logger.info(f"Stream: Initial processing for {chat_id} took {initial_processing_done_time - start_time_total:.2f}s")
# --- 3. Kiểm tra từ điển thuật ngữ (nếu có, và nó nhanh) ---
if hasattr(app_state, 'dict') and app_state.dict:
term_result = search_term_in_dictionary(cleaned_question, app_state.dict)
if term_result:
answer_def = term_result.get("definition", "Không thể tìm thấy định nghĩa.")
assistant_response_time_dict = datetime.now(timezone.utc)
full_answer_for_saving = answer_def # Gán cho lưu trữ
# Stream toàn bộ định nghĩa như một chunk
data_payload = {"token": answer_def, "is_final": True, "source": "dictionary"}
yield f"data: {json.dumps(data_payload)}\n\n"
# Có thể gửi event kết thúc riêng
yield f"event: end_stream\ndata: {{}}\n\n" # Event kết thúc tùy chỉnh
# Lưu vào Redis và MongoDB (sau khi stream)
await save_chat_to_redis(
app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
)
asyncio.create_task(save_chat_to_mongo( # Chạy nền
mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
))
processing_time_dict = round(time.time() - start_time_total, 2)
logger.info(f"Stream: Dictionary answer for {chat_id} sent in {processing_time_dict:.2f}s.")
return # Kết thúc generator ở đây
if not app_state.qa_chain: # qa_chain phải hỗ trợ streaming
logger.error("Stream: QA Chain chưa được khởi tạo hoặc không hỗ trợ streaming.")
error_payload = {"error": "Service Unavailable: QA Chain not ready for streaming."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
# --- 4. Lấy lịch sử chat cho Langchain Chain (Tương tự) ---
try:
langchain_chat_history = await get_langchain_chat_history(app_state, chat_id)
input_data_for_chain = {
"chat_history": langchain_chat_history.messages,
"input": cleaned_question
}
except Exception as e:
logger.error(f"Stream: Lỗi khi chuẩn bị chat history (chat_id: {chat_id}): {e}", exc_info=True)
error_payload = {"error": "Error processing chat history."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
# --- 5. Gọi QA Chain với streaming ---
if not (hasattr(app_state.qa_chain, 'astream') or hasattr(app_state.qa_chain, 'stream')):
logger.error(f"Stream: QA Chain (type: {type(app_state.qa_chain)}) không có phương thức astream hoặc stream.")
error_payload = {"error": "QA Chain does not support streaming."}
yield f"event: error\ndata: {json.dumps(error_payload)}\n\n"
return
chain_stream_method = app_state.qa_chain.astream if hasattr(app_state.qa_chain, 'astream') else app_state.qa_chain.stream
logger.info(f"Stream: Invoking chain stream for {chat_id}...")
stream_start_time = time.time()
chunk_count = 0
sources_streamed = False # Cờ để chỉ stream sources một lần
async for chunk in chain_stream_method(input_data_for_chain):
token = ""
current_sources = None
if isinstance(chunk, str):
token = chunk
elif hasattr(chunk, 'content'): # Giống AIMessageChunk
token = chunk.content
elif isinstance(chunk, dict):
token = chunk.get("answer") or chunk.get("token") or chunk.get("content") or ""
# Kiểm tra sources nếu chunk là dict và chưa stream sources
if not sources_streamed and "source" in chunk:
current_sources = chunk["source"]
if token:
full_answer_for_saving += token
data_payload = {"token": token, "is_final": False}
yield f"data: {json.dumps(data_payload)}\n\n"
chunk_count += 1
# Stream sources nếu có và chưa được stream
if current_sources and not sources_streamed:
sources_list = []
for doc in current_sources:
if hasattr(doc, 'metadata') and hasattr(doc, 'page_content'):
sources_list.append(SourceDocument(
source=doc.metadata.get('source', 'N/A'),
page_content_preview=doc.page_content[:200] + "..."
).dict()) # Chuyển sang dict để JSON serialize
if sources_list:
source_payload = {"sources": sources_list}
yield f"event: sources\ndata: {json.dumps(source_payload)}\n\n" # Event riêng cho sources
sources_streamed = True # Đánh dấu đã stream
stream_end_time = time.time()
logger.info(f"Stream: Chain streaming for {chat_id} completed in {stream_end_time - stream_start_time:.2f}s with {chunk_count} chunks.")
# --- Gửi event kết thúc stream ---
# Frontend có thể dùng event này để biết stream đã hoàn tất.
# Hoặc, frontend có thể dựa vào một chunk đặc biệt như `{"is_final": true}`
# Hoặc đơn giản là khi `EventSource.onmessage` không nhận được gì nữa sau một timeout.
yield f"event: end_stream\ndata: {{ \"message\": \"Stream ended\" }}\n\n"
# --- 6. Lưu tin nhắn hoàn chỉnh (sau khi stream xong) ---
assistant_response_time = datetime.now(timezone.utc)
if not full_answer_for_saving.strip() and chunk_count == 0: # Nếu không có token nào được stream
full_answer_for_saving = "Tôi không tìm thấy câu trả lời phù hợp."
# Stream câu trả lời mặc định này nếu chưa có gì
data_payload = {"token": full_answer_for_saving, "is_final": True}
yield f"data: {json.dumps(data_payload)}\n\n"
yield f"event: end_stream\ndata: {{ \"message\": \"Stream ended with default message\" }}\n\n"
logger.info(f"Stream: Full answer for {chat_id} to be saved: {full_answer_for_saving[:100]}...")
await save_chat_to_redis(
app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time
)
# Chạy lưu MongoDB ngầm để không block
asyncio.create_task(save_chat_to_mongo(
mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time
))
# Cập nhật Langchain history (nếu chain memory không tự làm)
# await langchain_chat_history.aadd_user_message(question_for_chain)
# await langchain_chat_history.aadd_ai_message(full_answer_for_saving)
except HTTPException as e: # Bắt HTTPException đã được raise từ các hàm con
logger.error(f"Stream: HTTPException for chat_id {chat_id}: {e.detail}", exc_info=True)
error_payload = {"error": e.detail, "status_code": e.status_code}
yield f"event: error_stream\ndata: {json.dumps(error_payload)}\n\n"
except Exception as e:
logger.error(f"Stream: Unhandled exception for chat_id {chat_id}: {e}", exc_info=True)
error_payload = {"error": "An unexpected server error occurred during streaming."}
yield f"event: error_stream\ndata: {json.dumps(error_payload)}\n\n"
finally:
# Đảm bảo generator kết thúc đúng cách.
# EventSource trên client sẽ tự động đóng khi generator kết thúc.
# Hoặc bạn có thể gửi một tín hiệu đóng rõ ràng nếu cần.
# yield "event: close\ndata: Connection closed by server\n\n" # Không chuẩn SSE, nhưng một số client có thể hiểu
logger.info(f"Stream: Generator for chat_id {chat_id} finished. Total time: {time.time() - start_time_total:.2f}s")
# Sử dụng GET cho EventSource theo chuẩn, truyền params qua query string
# EventSource chỉ hỗ trợ GET. Nếu bạn BẮT BUỘC phải dùng POST (ví dụ, câu hỏi quá dài cho URL),
# bạn sẽ cần một giải pháp phức tạp hơn, không dùng EventSource trực tiếp trên client
# mà dùng fetch API với ReadableStream và POST.
#helper
from typing import List, Optional,Any
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage
async def prepare_chat_history_optimized(
redis:Any,
chat_id: str,
max_messages: int = 10, # Số lượng cặp tin nhắn (user+AI) tối đa để lấy
max_tokens: Optional[int] = None, # (Tùy chọn nâng cao) Giới hạn token
tokenizer: Optional[Any] = None # (Tùy chọn nâng cao) Tokenizer để đếm token
) -> List[BaseMessage]:
"""
CẢI TIẾN: Lấy N tin nhắn gần nhất từ Redis để làm lịch sử chat.
- Hiệu quả hơn bằng cách chỉ lấy một phần lịch sử.
- An toàn hơn bằng cách kiểm soát độ dài ngữ cảnh.
Args:
redis: Client Redis bất đồng bộ.
chat_id: ID của cuộc trò chuyện.
max_messages: Số lượng tin nhắn tối đa để lấy từ cuối (ví dụ: 10 tin nhắn gần nhất).
max_tokens: (Nâng cao) Giới hạn tổng số token của lịch sử.
tokenizer: (Nâng cao) Tokenizer để sử dụng với max_tokens.
Returns:
Một danh sách các đối tượng tin nhắn của LangChain (HumanMessage, AIMessage).
"""
messages_key = f"conversation_messages:{chat_id}"
# 1. Lấy N tin nhắn gần nhất từ Redis
# lrange(key, -N, -1) sẽ lấy N phần tử cuối cùng của list.
# Lấy nhiều hơn một chút để đảm bảo có cặp user/ai hoàn chỉnh.
num_to_fetch = max_messages + 2
try:
# Sử dụng lrange để lấy các tin nhắn gần nhất, hiệu quả hơn nhiều so với lấy tất cả
raw_messages_json = await redis.lrange(messages_key, -num_to_fetch, -1)
if not raw_messages_json:
return []
except Exception as e:
logger.error(f"Lỗi khi đọc lịch sử chat từ Redis cho chat_id {chat_id}: {e}")
return []
# 2. Xây dựng danh sách tin nhắn cho LangChain
langchain_messages: List[BaseMessage] = []
total_tokens = 0
# Lặp ngược từ cuối (tin nhắn mới nhất) để xử lý
for msg_json_str in reversed(raw_messages_json):
try:
msg_data = json.loads(msg_json_str)
content = msg_data.get("content", "")
# (Tùy chọn nâng cao) Kiểm tra giới hạn token
if max_tokens and tokenizer:
num_tokens = len(tokenizer.encode(content))
if total_tokens + num_tokens > max_tokens:
logger.warning(f"Đã đạt giới hạn token ({max_tokens}) cho lịch sử chat. Dừng lại.")
break # Dừng thêm tin nhắn
total_tokens += num_tokens
# Tạo đối tượng tin nhắn phù hợp
if msg_data.get("role") == "user":
langchain_messages.append(HumanMessage(content=content))
elif msg_data.get("role") == "assistant":
langchain_messages.append(AIMessage(content=content))
except (json.JSONDecodeError, KeyError) as e:
logger.warning(f"Lỗi khi parse tin nhắn từ Redis: {e}. Bỏ qua tin nhắn này.")
continue
# 3. Đảo ngược lại danh sách để có đúng thứ tự (cũ -> mới)
langchain_messages.reverse()
# Cắt lại theo max_messages cuối cùng để đảm bảo số lượng chính xác
return langchain_messages[-max_messages:]
def format_chat_history_for_prompt(chat_history: List[BaseMessage]) -> str:
"""
Chuyển đổi danh sách đối tượng tin nhắn thành một chuỗi văn bản duy nhất.
"""
if not chat_history:
return "Không có lịch sử trò chuyện."
formatted_history = []
for message in chat_history:
role = "Người dùng" if isinstance(message, HumanMessage) else "Trợ lý"
formatted_history.append(f"{role}: {message.content}")
return "\n".join(formatted_history)