entidi2608's picture
update: check file uploaded
d3e9dc7
# utils.py
import os
import logging
import regex as re
import json
from typing import List, Optional
from schemas.chat import Message
from redis.asyncio import Redis
import bcrypt
from datetime import datetime, timedelta
from jose import jwt
from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
from typing import List, Dict, Optional
from unidecode import unidecode
from db.mongoDB import mongo_db
import secrets
from fastapi import HTTPException, status
from langchain_community.chat_message_histories import RedisChatMessageHistory
from redis.exceptions import RedisError
logger = logging.getLogger(__name__)
async def save_chat_to_redis(
r: Redis, # Hoặc redis.asyncio.Redis
chat_id: str,
user_question_content: str, # Nội dung câu hỏi gốc hoặc đã xử lý (tùy bạn)
assistant_answer_content: str,
user_question_timestamp: datetime, # Cung cấp timestamp
assistant_answer_timestamp: datetime # Cung cấp timestamp
) -> bool:
"""
Lưu tin nhắn mới của người dùng và trợ lý vào Redis với định dạng chuẩn hóa.
Cập nhật 'updated_at' và 'message_count' trong metadata.
"""
if not all([chat_id, user_question_content, assistant_answer_content]):
logger.error("chat_id, user_question_content, và assistant_answer_content không được rỗng.")
# raise ValueError("Đầu vào không hợp lệ.") # Hoặc trả về False
return False
messages_key = f"conversation_messages:{chat_id}"
meta_key = f"conversation_meta:{chat_id}"
try:
# Tạo Pydantic models cho tin nhắn
user_message = Message(role="user", content=user_question_content, timestamp=user_question_timestamp)
assistant_message = Message(role="assistant", content=assistant_answer_content, timestamp=assistant_answer_timestamp)
# Sử dụng pipeline cho các thao tác Redis
pipe = await r.pipeline()
await pipe.rpush(messages_key, user_message.model_dump_json()) # Pydantic V2
await pipe.rpush(messages_key, assistant_message.model_dump_json()) # Pydantic V2
# Hoặc .json() cho Pydantic V1
# Đặt TTL cho key messages nếu nó mới được tạo (hoặc luôn refresh TTL)
# Nếu bạn muốn TTL chỉ đặt một lần, bạn cần kiểm tra sự tồn tại của key trước
# hoặc kiểm tra llen trước khi push (phức tạp hơn với pipeline).
# Cách đơn giản là luôn đặt lại TTL.
await pipe.expire(messages_key, 86400) # 24 giờ
# Cập nhật metadata
await pipe.hset(meta_key, "updated_at", assistant_answer_timestamp.isoformat())
await pipe.hincrby(meta_key, "message_count", 2) # Tăng số lượng tin nhắn
await pipe.expire(meta_key, 86400) # Refresh TTL cho meta
await pipe.execute()
logger.info(f"Đã lưu 2 tin nhắn mới vào {messages_key} và cập nhật {meta_key}.")
return True
except RedisError as e:
logger.error(f"Lỗi Redis khi lưu tin nhắn cho chat_id {chat_id}: {e}", exc_info=True)
raise # Re-raise để service xử lý HTTPException
except Exception as e:
logger.error(f"Lỗi không mong muốn khi lưu vào Redis cho chat_id {chat_id}: {e}", exc_info=True)
# raise # Hoặc trả về False tùy theo cách bạn muốn xử lý
return False
async def get_redis_history(r: Redis, chat_id: str, max_messages: int = 100) -> List[Message]:
"""
Lấy lịch sử hội thoại từ Redis với định dạng chuẩn hóa.
Args:
r (Redis): Đối tượng Redis client.
chat_id (str): ID của hội thoại.
max_messages (int): Số tin nhắn tối đa trả về (mặc định 100).
Returns:
List[Message]: Danh sách tin nhắn (role, content, timestamp).
Raises:
ValueError: Nếu chat_id rỗng.
redis.RedisError: Nếu có lỗi khi tương tác với Redis.
"""
# Kiểm tra đầu vào
if not chat_id:
logger.error("chat_id không được rỗng")
raise ValueError("chat_id là bắt buộc")
messages_key = f"conversation_messages:{chat_id}"
try:
# Kiểm tra kết nối Redis
r.ping()
# Lấy tin nhắn (giới hạn max_messages từ cuối)
history_raw =await r.lrange(messages_key, -max_messages, -1)
chat_history = []
for item in history_raw:
try:
parsed = json.loads(item)
if not isinstance(parsed, dict):
logger.warning(f"Tin nhắn không phải dict trong {messages_key}: {item}")
continue
# Kiểm tra các trường bắt buộc
role = parsed.get("role")
content = parsed.get("content")
timestamp = parsed.get("timestamp")
if not all([role, content, timestamp]):
logger.warning(f"Tin nhắn thiếu trường trong {messages_key}: {parsed}")
continue
# Đảm bảo role hợp lệ
if role not in ["user", "assistant"]:
logger.warning(f"Role không hợp lệ trong {messages_key}: {role}")
continue
chat_history.append(Message(
role=role,
content=content,
timestamp=timestamp
))
except json.JSONDecodeError as e:
logger.error(f"Lỗi parse JSON trong {messages_key}: {item}, lỗi: {e}")
continue
except Exception as e:
logger.error(f"Lỗi xử lý tin nhắn trong {messages_key}: {item}, lỗi: {e}")
continue
logger.info(f"Lấy {len(chat_history)} tin nhắn từ {messages_key}")
return chat_history
except r.RedisError as e:
logger.error(f"Lỗi khi lấy lịch sử từ Redis cho chat_id {chat_id}: {e}")
raise
except Exception as e:
logger.error(f"Lỗi không mong muốn khi lấy lịch sử từ Redis cho chat_id {chat_id}: {e}")
return []
async def delete_chat_from_redis(r: Redis, chat_id: str) -> bool:
"""
Xóa dữ liệu hội thoại và metadata từ Redis.
Args:
r (Redis): Đối tượng Redis client.
chat_id (str): ID của hội thoại.
Returns:
bool: True nếu xóa thành công, False nếu thất bại.
Raises:
ValueError: Nếu chat_id rỗng.
redis.RedisError: Nếu có lỗi khi tương tác với Redis.
"""
# Kiểm tra đầu vào
if not chat_id:
logger.error("chat_id không được rỗng")
raise ValueError("chat_id là bắt buộc")
redis_key = f"conversation:{chat_id}"
meta_key = f"chat:{chat_id}:meta"
try:
# Kiểm tra kết nối Redis
await r.ping()
# Kiểm tra sự tồn tại của các key
keys_to_delete = []
if await r.exists(redis_key):
keys_to_delete.append(redis_key)
if await r.exists(meta_key):
keys_to_delete.append(meta_key)
if not keys_to_delete:
logger.info(f"Không tìm thấy dữ liệu cho chat_id {chat_id} trong Redis")
return True # Không có gì để xóa, coi như thành công
# Xóa các key
deleted_count = await r.delete(*keys_to_delete)
logger.info(f"Đã xóa {deleted_count} key cho chat_id {chat_id}: {keys_to_delete}")
return True
except r.RedisError as e:
logger.error(f"Lỗi khi xóa dữ liệu từ Redis cho chat_id {chat_id}: {e}")
raise
except Exception as e:
logger.error(f"Lỗi không mong muốn khi xóa dữ liệu từ Redis cho chat_id {chat_id}: {e}")
return False
def hash_password(password: str) -> str:
return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8')
def verify_password(password: str, hashed: str) -> bool:
return bcrypt.checkpw(password.encode('utf-8'), hashed.encode('utf-8'))
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.now() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def create_refresh_token(email: str) -> str:
"""
Tạo và lưu refresh token vào cơ sở dữ liệu.
Args:
email (str): Địa chỉ email của người dùng.
Returns:
str: Refresh token được tạo.
Raises:
HTTPException: Nếu có lỗi khi lưu token.
"""
try:
# Generate refresh token
refresh_token = secrets.token_urlsafe(32)
expire = datetime.now() + timedelta(days=7)
# Store refresh token in database
result = await mongo_db.users.update_one(
{"email": email.lower()},
{
"$set": {
"refresh_token": refresh_token,
"refresh_token_expiry": expire,
"refresh_token_timestamp": datetime.now(),
"revoked": False
}
}
)
if result.modified_count != 1:
logger.error(f"Không thể lưu refresh token cho email: {email}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Lỗi khi lưu refresh token."
)
logger.info(f"Refresh token created for email: {email}")
return refresh_token
except Exception as e:
logger.error(f"Lỗi khi tạo refresh token: {str(e)}")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Lỗi hệ thống khi tạo refresh token."
)
def load_legal_dictionary(path: str = 'legal_terms.json') -> list:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data['dictionary']
def is_definition_question(query: str) -> bool:
definition_keywords = ["là gì", "định nghĩa", "nghĩa là gì", "hiểu thế nào", "khái niệm"]
query_lower = query.lower()
return any(keyword in query_lower for keyword in definition_keywords)
def normalize_text_for_matching(text: str) -> str:
"""
Chuẩn hóa text cho việc so khớp: chữ thường, loại bỏ ký tự đặc biệt (chỉ giữ chữ và số),
loại bỏ dấu tiếng Việt, chuẩn hóa khoảng trắng.
"""
if not text or not isinstance(text, str):
return ""
text_no_diacritics = unidecode(text.lower()) # Chuyển không dấu và chữ thường
# Loại bỏ tất cả ký tự không phải là chữ cái hoặc số hoặc khoảng trắng
text_alphanumeric = re.sub(r'[^\w\s]', '', text_no_diacritics, flags=re.UNICODE)
return re.sub(r'\s+', ' ', text_alphanumeric).strip()
def search_term_in_dictionary(query: str, dictionary: List[Dict]) -> Optional[Dict]:
"""
Tìm kiếm thuật ngữ trong từ điển.
Chỉ tìm nếu là câu hỏi định nghĩa.
Cải thiện logic so khớp.
"""
if not is_definition_question(query):
logger.debug(f"'{query}' không phải câu hỏi định nghĩa, bỏ qua tìm từ điển.")
return None
if not dictionary:
logger.warning("Từ điển rỗng, không thể tìm kiếm.")
return None
# Cố gắng trích xuất thuật ngữ chính từ câu hỏi định nghĩa
# Ví dụ: "Khái niệm hợp đồng lao động là gì?" -> "hợp đồng lao động"
# Đây là một regex đơn giản, có thể cần tinh chỉnh
term_to_search_raw = query
match = re.match(r"^(.*?)\s+(là gì|định nghĩa|nghĩa là gì|hiểu thế nào|khái niệm)\??$", query.lower().strip(), re.IGNORECASE)
if match:
term_to_search_raw = match.group(1).strip()
logger.info(f"Trích xuất thuật ngữ từ câu hỏi định nghĩa: '{term_to_search_raw}'")
query_normalized_for_match = normalize_text_for_matching(term_to_search_raw)
if not query_normalized_for_match:
logger.debug("Thuật ngữ tìm kiếm rỗng sau khi chuẩn hóa.")
return None
logger.info(f"Tìm kiếm thuật ngữ đã chuẩn hóa (không dấu): '{query_normalized_for_match}'")
# Sắp xếp từ điển theo độ dài thuật ngữ giảm dần (để ưu tiên khớp cụm dài hơn)
# và chuẩn hóa thuật ngữ từ điển một lần
normalized_dictionary = []
for entry in dictionary:
term = entry.get("term")
if term and isinstance(term, str):
normalized_dictionary.append({
"original_entry": entry,
"normalized_term": normalize_text_for_matching(term)
})
# Sắp xếp theo độ dài thuật ngữ đã chuẩn hóa giảm dần
# Điều này giúp "an toàn lao động" được khớp trước "an toàn" hoặc "lao động"
# nếu query là "an toàn lao động là gì"
normalized_dictionary.sort(key=lambda x: len(x["normalized_term"]), reverse=True)
# Tìm kiếm khớp chính xác (sau khi chuẩn hóa cả query và term từ điển)
for item in normalized_dictionary:
if item["normalized_term"] == query_normalized_for_match:
logger.info(f"Tìm thấy khớp chính xác (sau chuẩn hóa): '{item['original_entry']['term']}'")
return item["original_entry"]
# Tìm kiếm "chứa" (thuật ngữ từ điển là một phần của query đã chuẩn hóa)
# Điều này hữu ích nếu query_normalized_for_match dài hơn thuật ngữ từ điển
# Ví dụ: query_normalized = "dinh nghia an toan lao dong", term_normalized = "an toan lao dong"
for item in normalized_dictionary:
if item["normalized_term"] and item["normalized_term"] in query_normalized_for_match:
logger.info(f"Tìm thấy khớp 'chứa' (từ điển trong query): '{item['original_entry']['term']}' (query norm: '{query_normalized_for_match}')")
return item["original_entry"]
logger.info(f"Không tìm thấy thuật ngữ '{query_normalized_for_match}' trong từ điển.")
return None
def minimal_preprocess_for_llm(text: str) -> str:
"""
Thực hiện tiền xử lý tối thiểu trước khi đưa vào LLM.
Chỉ chuẩn hóa khoảng trắng và chuyển thành chữ thường.
"""
if not text or not text.strip():
# Vẫn cần kiểm tra input rỗng
raise ValueError("Input không được rỗng")
# 1. Chuẩn hóa khoảng trắng
processed_text = re.sub(r'\s+', ' ', text).strip()
# 2. Chuyển thành chữ thường để nhất quán
processed_text = processed_text.lower()
return processed_text
async def save_chat_to_mongo(conversations_collection,chat_id: str, user_email: str,user_question_content: str, # Nội dung câu hỏi
assistant_answer_content: str, # Nội dung trả lời
user_question_timestamp: datetime,
assistant_answer_timestamp: datetime):
user_message = {
"role": "user",
"content": user_question_content,
"timestamp": user_question_timestamp
}
assistant_message = {
"role": "assistant",
"content": assistant_answer_content,
"timestamp": assistant_answer_timestamp
}
conversation = conversations_collection.find_one({"conversation_id": chat_id})
if not conversation:
conversation = {
"user_id": user_email,
"conversation_id": chat_id,
"messages": [user_message, assistant_message],
"created_at": datetime.now(),
"updated_at": datetime.now()
}
conversations_collection.insert_one(conversation)
else:
conversations_collection.update_one(
{"conversation_id": chat_id},
{
"$push": {"messages": {"$each": [user_message, assistant_message]}},
"$set": {"updated_at": datetime.now()}
}
)
async def get_langchain_chat_history(app_state, chat_id: str) -> RedisChatMessageHistory:
"""
Retrieves and synchronizes chat history for Langchain.
"""
redis_url = os.environ.get("REDIS_URL_LANGCHAIN", os.environ.get("REDIS_URL"))
if not redis_url:
raise ValueError("Redis URL for chat history is required.")
# Đây là history mà Langchain sẽ sử dụng để đọc/ghi
langchain_chat_history = RedisChatMessageHistory( # Hoặc RedisChatMessageHistoryAsync
url=redis_url,
session_id=chat_id,
ttl=86400, # 1 day
)
# Đồng bộ hóa: Lấy từ key "source of truth" của chúng ta và nạp vào key của Langchain
messages_key = f"conversation_messages:{chat_id}"
# Sử dụng await nếu redis client của app_state là async
raw_messages_from_our_redis = app_state.redis.lrange(messages_key, 0, -1)
# Xóa history cũ trong key của Langchain để tránh trùng lặp khi đồng bộ
# Nếu dùng RedisChatMessageHistoryAsync: await langchain_chat_history.aclear()
langchain_chat_history.clear() # Cho bản đồng bộ
for msg_json_bytes in raw_messages_from_our_redis:
msg_data = json.loads(msg_json_bytes)
message = Message(**msg_data) # Validate
if message.role == "user":
# Nếu dùng RedisChatMessageHistoryAsync: await langchain_chat_history.aadd_user_message(message.content)
langchain_chat_history.add_user_message(message.content)
elif message.role == "assistant":
# await langchain_chat_history.aadd_ai_message(message.content)
langchain_chat_history.add_ai_message(message.content)
return langchain_chat_history
# api/utils.py
import hashlib
logger = logging.getLogger(__name__)
def calculate_file_hash(filepath: str) -> str:
sha256_hash = hashlib.sha256()
with open(filepath, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
# def check_if_hash_exists(file_hash: str) -> bool:
# if not os.path.exists(config.PROCESSED_HASH_LOG):
# return False
# try:
# with open(config.PROCESSED_HASH_LOG, "r") as f:
# processed_hashes = {line.strip() for line in f}
# return file_hash in processed_hashes
# except IOError as e:
# logger.error(f"Could not read hash log file: {e}")
# return False