entidi2608 commited on
Commit
3e7b272
·
1 Parent(s): 0bf745a

new update

Browse files
db/mongoDB.py CHANGED
@@ -1,31 +1,83 @@
1
- from pymongo import MongoClient, errors
2
- import config
3
  import logging
 
 
 
 
 
4
  logger = logging.getLogger(__name__)
5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
 
7
- try:
8
- # Kết nối MongoDB
9
- client = MongoClient(config.MONGODB_CLOUD_URI, serverSelectionTimeoutMS=30000)
 
 
10
 
11
- # Trigger kết nối thử
12
- client.server_info() # Gây lỗi nếu không kết nối được
 
 
 
 
 
13
 
14
- db = client[config.DB_NAME]
15
- user_collection = db["users"]
16
- blacklist_collection = db["token_blacklist"]
17
- conversations_collection = db["conversations"]
18
 
19
- # ⚠️ Tạo TTL index để MongoDB tự động xoá token khi tới hạn
20
- # Chỉ tạo index nếu chưa tồn tại
21
- if "expires_at_1" not in blacklist_collection.index_information():
22
- blacklist_collection.create_index("expires_at", expireAfterSeconds=0)
23
- logger.info("🔸Đã tạo TTL index cho 'expires_at' trong 'token_blacklist'.")
 
 
 
 
24
 
25
- logger.info("🔸Đã kết nối tới MongoDB Cloud thành công!")
26
 
27
- except errors.ServerSelectionTimeoutError as e:
28
- logger.error("🔸Không thể kết nối tới MongoDB Cloud:")
29
- logger.error(f"🔸Error:{e}")
30
- user_collection = None
31
- blacklist_collection = None
 
1
+ # db/mongoDB.py
2
+
3
  import logging
4
+ from typing import Optional
5
+ from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
6
+ from pymongo.errors import ConnectionFailure, ConfigurationError
7
+ import config # Import từ file config tập trung
8
+
9
  logger = logging.getLogger(__name__)
10
 
11
+ class MongoDatabase:
12
+ """
13
+ Một lớp singleton để quản lý kết nối và các collection của MongoDB.
14
+ Điều này đảm bảo chúng ta chỉ có một kết nối duy nhất trong toàn bộ ứng dụng.
15
+ """
16
+ client: Optional[AsyncIOMotorClient] = None
17
+ db: Optional[AsyncIOMotorDatabase] = None
18
+
19
+ # Khai báo các collection bạn sẽ sử dụng
20
+ users: Optional[AsyncIOMotorCollection] = None
21
+ token_blacklist: Optional[AsyncIOMotorCollection] = None
22
+ conversations: Optional[AsyncIOMotorCollection] = None
23
+
24
+ # Tạo một instance duy nhất để import và sử dụng trong toàn bộ ứng dụng
25
+ mongo_db = MongoDatabase()
26
+
27
+ async def connect_to_mongo():
28
+ """
29
+ Hàm khởi tạo kết nối đến MongoDB Atlas và gán vào object mongo_db.
30
+ Hàm này sẽ được gọi từ lifespan của FastAPI.
31
+ """
32
+ if mongo_db.client:
33
+ logger.info("MongoDB connection already established.")
34
+ return
35
+
36
+ logger.info(f"🔸 Connecting to MongoDB Atlas...")
37
+
38
+ if not config.MONGO_URI or not config.MONGO_DB_NAME:
39
+ logger.error("❌ MONGO_URI hoặc MONGO_DB_NAME chưa được thiết lập trong biến môi trường.")
40
+ raise ConfigurationError("MONGO_URI and MONGO_DB_NAME must be set.")
41
+
42
+ try:
43
+ # 1. Khởi tạo client bất đồng bộ
44
+ mongo_db.client = AsyncIOMotorClient(
45
+ config.MONGO_URI,
46
+ serverSelectionTimeoutMS=5000 # Thời gian chờ kết nối là 5 giây
47
+ )
48
+
49
+ # 2. Kiểm tra kết nối một cách rõ ràng
50
+ await mongo_db.client.admin.command('ping')
51
 
52
+ # 3. Gán các đối tượng database và collection
53
+ mongo_db.db = mongo_db.client[config.MONGO_DB_NAME]
54
+ mongo_db.user = mongo_db.db["users"]
55
+ mongo_db.token_blacklist = mongo_db.db["token_blacklist"]
56
+ mongo_db.conversations = mongo_db.db["conversations"]
57
 
58
+ # 4. Tạo TTL index một cách an toàn
59
+ # Lấy danh sách index hiện
60
+ index_info = await mongo_db.token_blacklist.index_information()
61
+ if "expires_at_1" not in index_info:
62
+ # Chỉ tạo index nếu nó chưa tồn tại
63
+ await mongo_db.token_blacklist.create_index("expires_at", expireAfterSeconds=0)
64
+ logger.info("🔸 Successfully created TTL index for 'expires_at' in 'token_blacklist'.")
65
 
66
+ logger.info("✅ MongoDB connection successful and collections are ready.")
 
 
 
67
 
68
+ except ConnectionFailure as e:
69
+ logger.error(f"❌ Failed to connect to MongoDB: Connection Failure. Check your URI and network access rules in Atlas. Error: {e}", exc_info=True)
70
+ raise e
71
+ except ConfigurationError as e:
72
+ logger.error(f" Failed to connect to MongoDB: Configuration Error. Check your connection string format. Error: {e}", exc_info=True)
73
+ raise e
74
+ except Exception as e:
75
+ logger.error(f"❌ An unexpected error occurred while connecting to MongoDB: {e}", exc_info=True)
76
+ raise e
77
 
 
78
 
79
+ async def close_mongo_connection():
80
+ """Hàm đóng kết nối MongoDB khi ứng dụng tắt."""
81
+ if mongo_db.client:
82
+ mongo_db.client.close()
83
+ logger.info("✅ MongoDB connection closed.")
dependencies.py CHANGED
@@ -3,7 +3,8 @@ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
3
  from jose import jwt, JWTError, ExpiredSignatureError
4
  import os
5
  from dotenv import load_dotenv
6
- from db.mongoDB import user_collection, blacklist_collection
 
7
  import torch
8
  import rag_components
9
  from schemas.chat import AppState
@@ -69,7 +70,7 @@ async def initialize_api_components(app_state: AppState):
69
  app_state.dict = load_legal_dictionary(config.LEGAL_DIC_FOLDER+ "/legal_terms.json")
70
  app_state.weaviateDB = connect_to_weaviate(run_diagnostics=False)
71
  # --- Kiểm tra kết nối tới MongoDB ---
72
- if user_collection is None or app_state.weaviateDB is None:
73
  logger.error("🔸Lỗi kết nối tới MongoDB hoặc Weaviate.")
74
  raise HTTPException(status_code=500, detail="Lỗi kết nối tới database.")
75
 
@@ -188,7 +189,7 @@ async def get_current_user(
188
  # 1. Kiểm tra token trong blacklist
189
  try:
190
  logger.info("GET_CURRENT_USER: Đang kiểm tra blacklist...")
191
- is_blacklisted = blacklist_collection.find_one({"token": token_to_verify})
192
  if is_blacklisted:
193
  logger.error(f"GET_CURRENT_USER: *** TOKEN TRONG BLACKLIST - RAISING 401 ***")
194
  raise HTTPException( # Sử dụng credentials_exception hoặc một cái cụ thể hơn
@@ -265,7 +266,7 @@ async def get_current_user(
265
  user_data: Optional[dict] = None # Khởi tạo để tránh UnboundLocalError
266
  try:
267
  logger.info(f"GET_CURRENT_USER: Đang tìm user trong DB: {email.lower()}") # email đã được validate là str
268
- user_data = user_collection.find_one({"email": email.lower()}, {"password": 0, "_id": 0})
269
  # print(user_data) # Bỏ print trong production
270
 
271
  if user_data is None:
 
3
  from jose import jwt, JWTError, ExpiredSignatureError
4
  import os
5
  from dotenv import load_dotenv
6
+ # from db.mongoDB import user_collection, blacklist_collection
7
+ from db.mongoDB import mongo_db
8
  import torch
9
  import rag_components
10
  from schemas.chat import AppState
 
70
  app_state.dict = load_legal_dictionary(config.LEGAL_DIC_FOLDER+ "/legal_terms.json")
71
  app_state.weaviateDB = connect_to_weaviate(run_diagnostics=False)
72
  # --- Kiểm tra kết nối tới MongoDB ---
73
+ if mongo_db.users is None or app_state.weaviateDB is None:
74
  logger.error("🔸Lỗi kết nối tới MongoDB hoặc Weaviate.")
75
  raise HTTPException(status_code=500, detail="Lỗi kết nối tới database.")
76
 
 
189
  # 1. Kiểm tra token trong blacklist
190
  try:
191
  logger.info("GET_CURRENT_USER: Đang kiểm tra blacklist...")
192
+ is_blacklisted = await mongo_db.token_blacklist.find_one({"token": token_to_verify})
193
  if is_blacklisted:
194
  logger.error(f"GET_CURRENT_USER: *** TOKEN TRONG BLACKLIST - RAISING 401 ***")
195
  raise HTTPException( # Sử dụng credentials_exception hoặc một cái cụ thể hơn
 
266
  user_data: Optional[dict] = None # Khởi tạo để tránh UnboundLocalError
267
  try:
268
  logger.info(f"GET_CURRENT_USER: Đang tìm user trong DB: {email.lower()}") # email đã được validate là str
269
+ user_data =await mongo_db.users.find_one({"email": email.lower()}, {"password": 0, "_id": 0})
270
  # print(user_data) # Bỏ print trong production
271
 
272
  if user_data is None:
main.py CHANGED
@@ -15,6 +15,7 @@ import os
15
  import traceback
16
  from core.logging_config import setup_logging
17
  setup_logging()
 
18
 
19
  logger = logging.getLogger(__name__)
20
 
@@ -22,6 +23,10 @@ logger = logging.getLogger(__name__)
22
  @asynccontextmanager
23
  async def lifespan(app: FastAPI):
24
  logger.info("✅ [Lifespan] STARTING UP...")
 
 
 
 
25
  current_app_state_instance = AppState()
26
  initialization_successful = False
27
  try:
@@ -32,6 +37,7 @@ async def lifespan(app: FastAPI):
32
  logger.info("✅ [Lifespan] SUCCESSFULLY set app.state.app_state.")
33
  yield
34
  logger.info("✅ [Lifespan] SHUTTING DOWN (after yield)...")
 
35
  except Exception as e:
36
  logger.error(f"❌ [Lifespan] FATAL ERROR DURING STARTUP: {type(e).__name__} - {e}")
37
  logger.error(traceback.format_exc())
 
15
  import traceback
16
  from core.logging_config import setup_logging
17
  setup_logging()
18
+ from db.mongoDB import connect_to_mongo, close_mongo_connection
19
 
20
  logger = logging.getLogger(__name__)
21
 
 
23
  @asynccontextmanager
24
  async def lifespan(app: FastAPI):
25
  logger.info("✅ [Lifespan] STARTING UP...")
26
+
27
+ await connect_to_mongo()
28
+
29
+
30
  current_app_state_instance = AppState()
31
  initialization_successful = False
32
  try:
 
37
  logger.info("✅ [Lifespan] SUCCESSFULLY set app.state.app_state.")
38
  yield
39
  logger.info("✅ [Lifespan] SHUTTING DOWN (after yield)...")
40
+ await close_mongo_connection()
41
  except Exception as e:
42
  logger.error(f"❌ [Lifespan] FATAL ERROR DURING STARTUP: {type(e).__name__} - {e}")
43
  logger.error(traceback.format_exc())
requirements.txt CHANGED
@@ -37,4 +37,5 @@ itsdangerous
37
  Authlib[fastapi]
38
  httpx
39
 
40
- gunicorn
 
 
37
  Authlib[fastapi]
38
  httpx
39
 
40
+ gunicorn
41
+ motor
routers/chat.py CHANGED
@@ -9,7 +9,7 @@ import logging
9
  import uuid
10
  from redis.asyncio import Redis
11
  from datetime import datetime, timezone
12
- from db.mongoDB import conversations_collection
13
  from fastapi.responses import StreamingResponse
14
  from schemas.chat import Message,ConversationResponse
15
  from typing import List
@@ -135,7 +135,7 @@ async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(ge
135
  # Xóa chat
136
  delete_chat_from_redis(redis, chat_id)
137
  # Xóa hội thoại trong MongoDB
138
- result = conversations_collection.delete_one({"conversation_id": chat_id, "user_id": user.email})
139
  if result.deleted_count == 0:
140
  raise HTTPException(status_code=404, detail="Chat not found in MongoDB")
141
 
@@ -147,7 +147,7 @@ async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(ge
147
  async def get_conversations(user: UserOut = Depends(get_current_user)):
148
  try:
149
  logger.info(f"Attempting to get conversations for user: {user.email}")
150
- db_conversations_cursor = conversations_collection.find({"user_id": user.email})
151
 
152
  response_list = []
153
 
@@ -182,7 +182,7 @@ async def load_conversation_and_sync_redis(
182
 
183
  # 1. Kiểm tra hội thoại trong MongoDB
184
  try:
185
- conversation_doc = conversations_collection.find_one(
186
  {"conversation_id": chat_id, "user_id": current_user.email}
187
  )
188
  if not conversation_doc:
 
9
  import uuid
10
  from redis.asyncio import Redis
11
  from datetime import datetime, timezone
12
+ from db.mongoDB import mongo_db
13
  from fastapi.responses import StreamingResponse
14
  from schemas.chat import Message,ConversationResponse
15
  from typing import List
 
135
  # Xóa chat
136
  delete_chat_from_redis(redis, chat_id)
137
  # Xóa hội thoại trong MongoDB
138
+ result = await mongo_db.conversations.delete_one({"conversation_id": chat_id, "user_id": user.email})
139
  if result.deleted_count == 0:
140
  raise HTTPException(status_code=404, detail="Chat not found in MongoDB")
141
 
 
147
  async def get_conversations(user: UserOut = Depends(get_current_user)):
148
  try:
149
  logger.info(f"Attempting to get conversations for user: {user.email}")
150
+ db_conversations_cursor = await mongo_db.conversations.find({"user_id": user.email})
151
 
152
  response_list = []
153
 
 
182
 
183
  # 1. Kiểm tra hội thoại trong MongoDB
184
  try:
185
+ conversation_doc =await mongo_db.conversations.find_one(
186
  {"conversation_id": chat_id, "user_id": current_user.email}
187
  )
188
  if not conversation_doc:
routers/user.py CHANGED
@@ -12,7 +12,7 @@ from starlette.config import Config
12
  from authlib.integrations.starlette_client import OAuth
13
  import os
14
  import config
15
- from db.mongoDB import user_collection
16
  import uuid
17
  from passlib.context import CryptContext
18
  from datetime import datetime, timedelta, timezone
@@ -304,7 +304,7 @@ async def auth_google_callback(request: Request):
304
 
305
 
306
  # Kiểm tra xem user đã tồn tại trong DB chưa
307
- db_user = user_collection.find_one({"email": user_email})
308
 
309
  if not db_user:
310
  placeholder_password = f"google-oauth2|{uuid.uuid4()}"
@@ -314,7 +314,7 @@ async def auth_google_callback(request: Request):
314
 
315
 
316
  # Tạo user mới với thông tin từ Google
317
- user_collection.insert_one({
318
  "email": user_email,
319
  "username": username,
320
  "password": hashed_password, # Mật khẩu tạm thời, sẽ không dùng đến
@@ -324,7 +324,7 @@ async def auth_google_callback(request: Request):
324
  })
325
 
326
  # Lấy lại user vừa tạo để đảm bảo có _id và các trường khác
327
- db_user = user_collection.find_one({"email": user_email})
328
  if not db_user: # Kiểm tra lại sau khi insert
329
  raise HTTPException(status_code=500, detail="Could not create and retrieve new user account.")
330
 
@@ -362,7 +362,7 @@ async def exchange_google_code_for_token(request: Request,response: Response, co
362
  redis_client.delete(redis_key) # Dùng một lần
363
  user_email = user_email_bytes.decode()
364
 
365
- user_collection.update_one({
366
  "email": user_email
367
  }, {
368
  "$set": {
@@ -408,7 +408,7 @@ async def exchange_google_code_for_token(request: Request,response: Response, co
408
  path="/api/user/refresh-token",
409
  )
410
 
411
- user_info = user_collection.find_one({"email": user_email})
412
  user = {
413
  "email": user_info.get("email"),
414
  "username": user_info.get("username"),
 
12
  from authlib.integrations.starlette_client import OAuth
13
  import os
14
  import config
15
+ from db.mongoDB import mongo_db
16
  import uuid
17
  from passlib.context import CryptContext
18
  from datetime import datetime, timedelta, timezone
 
304
 
305
 
306
  # Kiểm tra xem user đã tồn tại trong DB chưa
307
+ db_user = await mongo_db.users.find_one({"email": user_email})
308
 
309
  if not db_user:
310
  placeholder_password = f"google-oauth2|{uuid.uuid4()}"
 
314
 
315
 
316
  # Tạo user mới với thông tin từ Google
317
+ await mongo_db.users.insert_one({
318
  "email": user_email,
319
  "username": username,
320
  "password": hashed_password, # Mật khẩu tạm thời, sẽ không dùng đến
 
324
  })
325
 
326
  # Lấy lại user vừa tạo để đảm bảo có _id và các trường khác
327
+ db_user =await mongo_db.users.find_one({"email": user_email})
328
  if not db_user: # Kiểm tra lại sau khi insert
329
  raise HTTPException(status_code=500, detail="Could not create and retrieve new user account.")
330
 
 
362
  redis_client.delete(redis_key) # Dùng một lần
363
  user_email = user_email_bytes.decode()
364
 
365
+ await mongo_db.users.update_one({
366
  "email": user_email
367
  }, {
368
  "$set": {
 
408
  path="/api/user/refresh-token",
409
  )
410
 
411
+ user_info =await mongo_db.users.find_one({"email": user_email})
412
  user = {
413
  "email": user_info.get("email"),
414
  "username": user_info.get("username"),
services/auth_service.py CHANGED
@@ -3,7 +3,7 @@ from jose import JWTError, jwt
3
  from datetime import datetime, timedelta, timezone
4
  from fastapi import HTTPException, Depends, Request, status
5
  from schemas.user import RegisterRequest, LoginRequest
6
- from db.mongoDB import user_collection, blacklist_collection
7
  from utils.utils import verify_password
8
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
9
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
@@ -31,7 +31,7 @@ async def register_user(user: RegisterRequest):
31
 
32
  try:
33
  # Kiểm tra username đã tồn tại chưa
34
- existing_user = user_collection.find_one({"email": user.email})
35
  if existing_user:
36
  raise HTTPException(status_code=400, detail="Tài khoản đã tồn tại.")
37
 
@@ -40,7 +40,7 @@ async def register_user(user: RegisterRequest):
40
 
41
  avatar = await get_random_unsplash_image()
42
  # Lưu user mới
43
- user_collection.insert_one({
44
  "username": user.username,
45
  "password": hashed_password,
46
  "email": user.email,
@@ -53,8 +53,8 @@ async def register_user(user: RegisterRequest):
53
  except HTTPException as e:
54
  raise HTTPException(status_code=e.status_code, detail=e.detail)
55
  # Hàm xác thực đăng nhập
56
- def authenticate_user(request: LoginRequest):
57
- user = user_collection.find_one({"email": request.email})
58
  if not user:
59
  raise HTTPException(status_code=401, detail="Sai tài khoản hoặc mật khẩu")
60
 
@@ -64,8 +64,8 @@ def authenticate_user(request: LoginRequest):
64
  return user
65
 
66
  # Hàm tạo và trả về JWT token sau khi đăng nhập thành công
67
- def login_user(request: LoginRequest):
68
- user = authenticate_user(request)
69
 
70
  accessToken = create_access_token(data={"sub": request.email})
71
 
@@ -101,7 +101,7 @@ async def logout_user(req: Request, credentials: HTTPAuthorizationCredentials =
101
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Access token không hợp lệ")
102
 
103
  # Blacklist access token
104
- await blacklist_collection.insert_one({
105
  "token": token,
106
  "expires_at": datetime.fromtimestamp(exp, tz=timezone.utc)
107
  })
@@ -110,9 +110,9 @@ async def logout_user(req: Request, credentials: HTTPAuthorizationCredentials =
110
  refresh_token = req.cookies.get("refresh_token")
111
  if refresh_token:
112
  # Invalidate refresh token in database
113
- user = await user_collection.find_one({"refresh_token": refresh_token})
114
  if user:
115
- await user_collection.update_one(
116
  {"_id": user["_id"]},
117
  {"$set": {"refresh_token": None, "refresh_token_expiry": None, "revoked": True}}
118
  )
 
3
  from datetime import datetime, timedelta, timezone
4
  from fastapi import HTTPException, Depends, Request, status
5
  from schemas.user import RegisterRequest, LoginRequest
6
+ from db.mongoDB import mongo_db
7
  from utils.utils import verify_password
8
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
9
  from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
 
31
 
32
  try:
33
  # Kiểm tra username đã tồn tại chưa
34
+ existing_user =await mongo_db.users.find_one({"email": user.email})
35
  if existing_user:
36
  raise HTTPException(status_code=400, detail="Tài khoản đã tồn tại.")
37
 
 
40
 
41
  avatar = await get_random_unsplash_image()
42
  # Lưu user mới
43
+ await mongo_db.users.insert_one({
44
  "username": user.username,
45
  "password": hashed_password,
46
  "email": user.email,
 
53
  except HTTPException as e:
54
  raise HTTPException(status_code=e.status_code, detail=e.detail)
55
  # Hàm xác thực đăng nhập
56
+ async def authenticate_user(request: LoginRequest):
57
+ user =await mongo_db.users.find_one({"email": request.email})
58
  if not user:
59
  raise HTTPException(status_code=401, detail="Sai tài khoản hoặc mật khẩu")
60
 
 
64
  return user
65
 
66
  # Hàm tạo và trả về JWT token sau khi đăng nhập thành công
67
+ async def login_user(request: LoginRequest):
68
+ user = await authenticate_user(request)
69
 
70
  accessToken = create_access_token(data={"sub": request.email})
71
 
 
101
  raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Access token không hợp lệ")
102
 
103
  # Blacklist access token
104
+ await mongo_db.token_blacklist.insert_one({
105
  "token": token,
106
  "expires_at": datetime.fromtimestamp(exp, tz=timezone.utc)
107
  })
 
110
  refresh_token = req.cookies.get("refresh_token")
111
  if refresh_token:
112
  # Invalidate refresh token in database
113
+ user = await mongo_db.users.find_one({"refresh_token": refresh_token})
114
  if user:
115
+ await mongo_db.users.update_one(
116
  {"_id": user["_id"]},
117
  {"$set": {"refresh_token": None, "refresh_token_expiry": None, "revoked": True}}
118
  )
services/chat_service.py CHANGED
@@ -7,7 +7,7 @@ import json
7
  from utils.utils import save_chat_to_redis, search_term_in_dictionary, minimal_preprocess_for_llm, save_chat_to_mongo, get_langchain_chat_history
8
  import os
9
  import logging
10
- from db.mongoDB import conversations_collection
11
  from datetime import datetime, timezone
12
  import asyncio
13
 
@@ -51,7 +51,7 @@ async def ask_question_service(app_state, request: QueryRequest, user: UserOut =
51
  app_state.redis, chat_id, question_content, answer_def, current_utc_time, assistant_response_time
52
  )
53
  await save_chat_to_mongo(
54
- conversations_collection, chat_id, user.email, question_content, answer_def, current_utc_time, assistant_response_time
55
  )
56
  friendly_answer = f"Xin chào! Về câu hỏi '{question_content}' của bạn, tôi đã tìm thấy thông tin sau:\n\n{answer_def}\n\nHy vọng thông tin này hữu ích cho bạn. Bạn có muốn tìm hiểu thêm về chủ đề này hoặc có câu hỏi nào khác không? 😊"
57
  return AnswerResponse(
@@ -146,7 +146,7 @@ async def ask_question_service(app_state, request: QueryRequest, user: UserOut =
146
  # Lưu vào MongoDB
147
  # Chạy ngầm hoặc sau khi trả lời user để không làm chậm response (nếu có thể)
148
  await save_chat_to_mongo(
149
- conversations_collection, chat_id, user.email, question_content, assistant_response_content, current_utc_time, assistant_response_time
150
  )
151
 
152
  end_time = time.time()
@@ -219,7 +219,7 @@ async def stream_chat_generator(
219
  app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
220
  )
221
  asyncio.create_task(save_chat_to_mongo( # Chạy nền
222
- conversations_collection, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
223
  ))
224
  processing_time_dict = round(time.time() - start_time_total, 2)
225
  logger.info(f"Stream: Dictionary answer for {chat_id} sent in {processing_time_dict:.2f}s.")
@@ -321,7 +321,7 @@ async def stream_chat_generator(
321
  )
322
  # Chạy lưu MongoDB ngầm để không block
323
  asyncio.create_task(save_chat_to_mongo(
324
- conversations_collection, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time
325
  ))
326
 
327
  # Cập nhật Langchain history (nếu chain memory không tự làm)
 
7
  from utils.utils import save_chat_to_redis, search_term_in_dictionary, minimal_preprocess_for_llm, save_chat_to_mongo, get_langchain_chat_history
8
  import os
9
  import logging
10
+ from db.mongoDB import mongo_db
11
  from datetime import datetime, timezone
12
  import asyncio
13
 
 
51
  app_state.redis, chat_id, question_content, answer_def, current_utc_time, assistant_response_time
52
  )
53
  await save_chat_to_mongo(
54
+ mongo_db.conversations, chat_id, user.email, question_content, answer_def, current_utc_time, assistant_response_time
55
  )
56
  friendly_answer = f"Xin chào! Về câu hỏi '{question_content}' của bạn, tôi đã tìm thấy thông tin sau:\n\n{answer_def}\n\nHy vọng thông tin này hữu ích cho bạn. Bạn có muốn tìm hiểu thêm về chủ đề này hoặc có câu hỏi nào khác không? 😊"
57
  return AnswerResponse(
 
146
  # Lưu vào MongoDB
147
  # Chạy ngầm hoặc sau khi trả lời user để không làm chậm response (nếu có thể)
148
  await save_chat_to_mongo(
149
+ mongo_db.conversations, chat_id, user.email, question_content, assistant_response_content, current_utc_time, assistant_response_time
150
  )
151
 
152
  end_time = time.time()
 
219
  app_state.redis, chat_id, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
220
  )
221
  asyncio.create_task(save_chat_to_mongo( # Chạy nền
222
+ mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time_dict
223
  ))
224
  processing_time_dict = round(time.time() - start_time_total, 2)
225
  logger.info(f"Stream: Dictionary answer for {chat_id} sent in {processing_time_dict:.2f}s.")
 
321
  )
322
  # Chạy lưu MongoDB ngầm để không block
323
  asyncio.create_task(save_chat_to_mongo(
324
+ mongo_db.conversations, chat_id, user_email, question_content, full_answer_for_saving, current_utc_time, assistant_response_time
325
  ))
326
 
327
  # Cập nhật Langchain history (nếu chain memory không tự làm)
services/user_service.py CHANGED
@@ -1,4 +1,4 @@
1
- from db.mongoDB import user_collection
2
  from fastapi import HTTPException, Request, Response
3
  import math
4
  from datetime import datetime, timedelta, timezone
@@ -17,7 +17,7 @@ logger = logging.getLogger(__name__)
17
  # Initialize password hashing context
18
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
19
 
20
- def get_users(skip: int = 0, limit: int = 100, query: dict = None):
21
  """
22
  Lấy danh sách người dùng với phân trang và tìm kiếm nâng cao
23
 
@@ -37,7 +37,7 @@ def get_users(skip: int = 0, limit: int = 100, query: dict = None):
37
  search_query = query or {}
38
 
39
  # Thực hiện truy vấn với phân trang
40
- users_cursor = user_collection.find(
41
  search_query,
42
  {"_id": 0, "password": 0} # Loại bỏ các trường nhạy cảm
43
  ).skip(skip).limit(limit)
@@ -56,7 +56,7 @@ def get_users(skip: int = 0, limit: int = 100, query: dict = None):
56
  detail=f"Lỗi khi lấy danh sách người dùng: {str(e)}"
57
  )
58
 
59
- def count_users(query: dict = None):
60
  """
61
  Đếm tổng số người dùng thỏa mãn điều kiện tìm kiếm
62
 
@@ -68,7 +68,7 @@ def count_users(query: dict = None):
68
  """
69
  try:
70
  search_query = query or {}
71
- return user_collection.count_documents(search_query)
72
  except Exception as e:
73
  logger.error(f"Lỗi khi đếm người dùng: {str(e)}")
74
  raise HTTPException(
@@ -112,7 +112,7 @@ async def get_paginated_users(
112
  sort_criteria = [(sort_by, sort_order)]
113
 
114
  # Thực hiện truy vấn
115
- users_cursor = user_collection.find(
116
  query,
117
  {"_id": 0, "password": 0} # Loại bỏ các trường nhạy cảm
118
  ).sort(sort_criteria).skip(skip).limit(limit)
@@ -121,7 +121,7 @@ async def get_paginated_users(
121
  user_list = users_cursor.to_list(length=limit)
122
 
123
  # Đếm tổng số bản ghi
124
- total = user_collection.count_documents(query)
125
 
126
  # Tính toán thông tin phân trang
127
  total_pages = math.ceil(total / limit) if limit > 0 else 0
@@ -156,7 +156,7 @@ async def get_current_user_profile(email: str):
156
  dict: Thông tin profile của người dùng
157
  """
158
  try:
159
- user = user_collection.find_one({"email": email}, {"_id": 0, "password": 0})
160
  logger.info(f"check user user_service: {user}")
161
  if not user:
162
  raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Người dùng không tồn tại")
@@ -171,7 +171,7 @@ async def get_current_user_profile(email: str):
171
  )
172
 
173
 
174
- def delete_user(user_id: str):
175
  """
176
  Xóa người dùng theo ID
177
 
@@ -182,7 +182,7 @@ def delete_user(user_id: str):
182
  bool: True nếu xóa thành công, False nếu không tìm thấy người dùng
183
  """
184
  try:
185
- result = user_collection.delete_one({"_id": user_id})
186
  return result.deleted_count > 0
187
  except Exception as e:
188
  logger.error(f"Lỗi khi xóa người dùng: {str(e)}")
@@ -191,7 +191,7 @@ def delete_user(user_id: str):
191
  detail=f"Lỗi khi xóa người dùng: {str(e)}"
192
  )
193
 
194
- def change_password(email: str, current_password: str, new_password: str):
195
  """
196
  Đổi mật khẩu của người dùng
197
 
@@ -204,7 +204,7 @@ def change_password(email: str, current_password: str, new_password: str):
204
  bool: True nếu đổi mật khẩu thành công, False nếu không thành công
205
  """
206
  try:
207
- user = user_collection.find_one({"email": email})
208
  if not user:
209
  raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Người dùng không tồn tại")
210
 
@@ -216,7 +216,7 @@ def change_password(email: str, current_password: str, new_password: str):
216
  # Hash the new password
217
  hashed_password = pwd_context.hash(new_password)
218
  # Update the password in the database
219
- result = user_collection.update_one({"email": email}, {"$set": {"password": hashed_password}})
220
  return result.modified_count > 0
221
  except HTTPException as he:
222
  raise he
@@ -242,13 +242,13 @@ async def reset_password_request(email: str) -> bool:
242
  """
243
  try:
244
  # Check if user exists
245
- user = user_collection.find_one({"email": email.lower()})
246
  if not user:
247
  # Return True even if user not found to prevent email enumeration
248
  return True
249
 
250
  # Check for rate limiting (e.g., max 3 requests per hour)
251
- reset_requests = user_collection.count_documents({
252
  "email": email.lower(),
253
  "reset_password_timestamp": {
254
  "$gte": datetime.now(tz=timezone.utc) - timedelta(hours=1)
@@ -265,7 +265,7 @@ async def reset_password_request(email: str) -> bool:
265
  expiry = datetime.now() + timedelta(minutes=10)
266
 
267
  # Store reset token and timestamp in database
268
- user_collection.update_one(
269
  {"_id": ObjectId(user["_id"])},
270
  {
271
  "$set": {
@@ -312,7 +312,7 @@ async def reset_password(code: str, newPassword: str) -> bool:
312
  """
313
  try:
314
  # Find user by reset token
315
- user = user_collection.find_one({"reset_password_code": code})
316
  if not user:
317
  logger.warning(f"Code không hợp lệ: {code}")
318
  return False
@@ -334,7 +334,7 @@ async def reset_password(code: str, newPassword: str) -> bool:
334
  hashed_password = pwd_context.hash(newPassword)
335
 
336
  # Update user's password and clear reset token
337
- result = user_collection.update_one(
338
  {"_id": ObjectId(user["_id"])},
339
  {
340
  "$set": {"password": hashed_password},
@@ -380,7 +380,7 @@ async def generate_and_store_verification_code(email: str) -> bool:
380
  expiry = datetime.now() + timedelta(minutes=10)
381
 
382
  # Store code and expiry in database
383
- result = user_collection.update_one(
384
  {"email": email.lower()},
385
  {
386
  "$set": {
@@ -425,12 +425,12 @@ async def verify_login_code(email: str, code: str, res: Response): # Bỏ kiểu
425
  HTTPException: Nếu mã không hợp lệ, đã hết hạn hoặc có lỗi hệ thống.
426
  """
427
  try:
428
- user = user_collection.find_one({ # Sử dụng await nếu user_collection là async (ví dụ Motor)
429
  "email": email.lower(),
430
  "login_verification_code": code
431
  })
432
- # Nếu user_collection là đồng bộ (ví dụ PyMongo)
433
- # user = user_collection.find_one({ ... })
434
 
435
  if not user:
436
  logger.warning(f"Mã xác minh không hợp lệ: {code} cho email: {email}")
@@ -443,7 +443,7 @@ async def verify_login_code(email: str, code: str, res: Response): # Bỏ kiểu
443
  if not expiry or expiry < datetime.now(expiry.tzinfo if expiry.tzinfo else None): # So sánh aware với aware, naive với naive
444
  logger.warning(f"Mã xác minh đã hết hạn cho email: {email}")
445
  # Xóa mã đã hết hạn để tránh sử dụng lại
446
- user_collection.update_one(
447
  {"_id": ObjectId(user["_id"])},
448
  {
449
  "$unset": {
@@ -509,7 +509,7 @@ async def verify_login_code(email: str, code: str, res: Response): # Bỏ kiểu
509
  # --------------------
510
 
511
  # Clear verification code và cập nhật last_login
512
- update_result = user_collection.update_one( # Sử dụng await nếu là async
513
  {"_id": ObjectId(user["_id"])},
514
  {
515
  "$unset": {
@@ -567,7 +567,7 @@ async def authenticate_user(request: LoginRequest) -> dict:
567
  Raises:
568
  HTTPException: Nếu thông tin đăng nhập không hợp lệ.
569
  """
570
- user = user_collection.find_one({"email": request.email.lower()})
571
  if not user or not pwd_context.verify(request.password, user["password"]):
572
  logger.warning(f"Xác thực thất bại cho email: {request.email}")
573
  raise HTTPException(
@@ -603,7 +603,7 @@ async def refresh_access_token(req: Request , res: Response ) -> dict:
603
  )
604
 
605
  # Find user by refresh token in database
606
- user = user_collection.find_one({"refresh_token": refresh_token})
607
 
608
 
609
  if not user:
@@ -640,7 +640,7 @@ async def refresh_access_token(req: Request , res: Response ) -> dict:
640
  new_refresh_token = await create_refresh_token(email)
641
 
642
  # Update database with new refresh token
643
- user_collection.update_one(
644
  {"_id": user["_id"]},
645
  {
646
  "$set": {
@@ -702,7 +702,7 @@ async def verify_forgot_password_code(email: str, code: str) -> dict:
702
 
703
  try:
704
  # Find user by email and code
705
- user = user_collection.find_one({
706
  "email": email.lower(),
707
  "reset_password_code": code
708
  })
 
1
+ from db.mongoDB import mongo_db
2
  from fastapi import HTTPException, Request, Response
3
  import math
4
  from datetime import datetime, timedelta, timezone
 
17
  # Initialize password hashing context
18
  pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
19
 
20
+ async def get_users(skip: int = 0, limit: int = 100, query: dict = None):
21
  """
22
  Lấy danh sách người dùng với phân trang và tìm kiếm nâng cao
23
 
 
37
  search_query = query or {}
38
 
39
  # Thực hiện truy vấn với phân trang
40
+ users_cursor =await mongo_db.users.find(
41
  search_query,
42
  {"_id": 0, "password": 0} # Loại bỏ các trường nhạy cảm
43
  ).skip(skip).limit(limit)
 
56
  detail=f"Lỗi khi lấy danh sách người dùng: {str(e)}"
57
  )
58
 
59
+ async def count_users(query: dict = None):
60
  """
61
  Đếm tổng số người dùng thỏa mãn điều kiện tìm kiếm
62
 
 
68
  """
69
  try:
70
  search_query = query or {}
71
+ return await mongo_db.users.count_documents(search_query)
72
  except Exception as e:
73
  logger.error(f"Lỗi khi đếm người dùng: {str(e)}")
74
  raise HTTPException(
 
112
  sort_criteria = [(sort_by, sort_order)]
113
 
114
  # Thực hiện truy vấn
115
+ users_cursor =await mongo_db.users.find(
116
  query,
117
  {"_id": 0, "password": 0} # Loại bỏ các trường nhạy cảm
118
  ).sort(sort_criteria).skip(skip).limit(limit)
 
121
  user_list = users_cursor.to_list(length=limit)
122
 
123
  # Đếm tổng số bản ghi
124
+ total =await mongo_db.users.count_documents(query)
125
 
126
  # Tính toán thông tin phân trang
127
  total_pages = math.ceil(total / limit) if limit > 0 else 0
 
156
  dict: Thông tin profile của người dùng
157
  """
158
  try:
159
+ user =await mongo_db.users.find_one({"email": email}, {"_id": 0, "password": 0})
160
  logger.info(f"check user user_service: {user}")
161
  if not user:
162
  raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Người dùng không tồn tại")
 
171
  )
172
 
173
 
174
+ async def delete_user(user_id: str):
175
  """
176
  Xóa người dùng theo ID
177
 
 
182
  bool: True nếu xóa thành công, False nếu không tìm thấy người dùng
183
  """
184
  try:
185
+ result =await mongo_db.users.delete_one({"_id": user_id})
186
  return result.deleted_count > 0
187
  except Exception as e:
188
  logger.error(f"Lỗi khi xóa người dùng: {str(e)}")
 
191
  detail=f"Lỗi khi xóa người dùng: {str(e)}"
192
  )
193
 
194
+ async def change_password(email: str, current_password: str, new_password: str):
195
  """
196
  Đổi mật khẩu của người dùng
197
 
 
204
  bool: True nếu đổi mật khẩu thành công, False nếu không thành công
205
  """
206
  try:
207
+ user =await mongo_db.users.find_one({"email": email})
208
  if not user:
209
  raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Người dùng không tồn tại")
210
 
 
216
  # Hash the new password
217
  hashed_password = pwd_context.hash(new_password)
218
  # Update the password in the database
219
+ result =await mongo_db.users.update_one({"email": email}, {"$set": {"password": hashed_password}})
220
  return result.modified_count > 0
221
  except HTTPException as he:
222
  raise he
 
242
  """
243
  try:
244
  # Check if user exists
245
+ user = await mongo_db.users.find_one({"email": email.lower()})
246
  if not user:
247
  # Return True even if user not found to prevent email enumeration
248
  return True
249
 
250
  # Check for rate limiting (e.g., max 3 requests per hour)
251
+ reset_requests = await mongo_db.users.count_documents({
252
  "email": email.lower(),
253
  "reset_password_timestamp": {
254
  "$gte": datetime.now(tz=timezone.utc) - timedelta(hours=1)
 
265
  expiry = datetime.now() + timedelta(minutes=10)
266
 
267
  # Store reset token and timestamp in database
268
+ await mongo_db.users.update_one(
269
  {"_id": ObjectId(user["_id"])},
270
  {
271
  "$set": {
 
312
  """
313
  try:
314
  # Find user by reset token
315
+ user =await mongo_db.users.find_one({"reset_password_code": code})
316
  if not user:
317
  logger.warning(f"Code không hợp lệ: {code}")
318
  return False
 
334
  hashed_password = pwd_context.hash(newPassword)
335
 
336
  # Update user's password and clear reset token
337
+ result =await mongo_db.users.update_one(
338
  {"_id": ObjectId(user["_id"])},
339
  {
340
  "$set": {"password": hashed_password},
 
380
  expiry = datetime.now() + timedelta(minutes=10)
381
 
382
  # Store code and expiry in database
383
+ result = await mongo_db.users.update_one(
384
  {"email": email.lower()},
385
  {
386
  "$set": {
 
425
  HTTPException: Nếu mã không hợp lệ, đã hết hạn hoặc có lỗi hệ thống.
426
  """
427
  try:
428
+ user = await mongo_db.users.find_one({ # Sử dụng await nếu mongo_db.users là async (ví dụ Motor)
429
  "email": email.lower(),
430
  "login_verification_code": code
431
  })
432
+ # Nếu mongo_db.users là đồng bộ (ví dụ PyMongo)
433
+ # user = mongo_db.users.find_one({ ... })
434
 
435
  if not user:
436
  logger.warning(f"Mã xác minh không hợp lệ: {code} cho email: {email}")
 
443
  if not expiry or expiry < datetime.now(expiry.tzinfo if expiry.tzinfo else None): # So sánh aware với aware, naive với naive
444
  logger.warning(f"Mã xác minh đã hết hạn cho email: {email}")
445
  # Xóa mã đã hết hạn để tránh sử dụng lại
446
+ await mongo_db.users.update_one(
447
  {"_id": ObjectId(user["_id"])},
448
  {
449
  "$unset": {
 
509
  # --------------------
510
 
511
  # Clear verification code và cập nhật last_login
512
+ await mongo_db.users.update_one( # Sử dụng await nếu là async
513
  {"_id": ObjectId(user["_id"])},
514
  {
515
  "$unset": {
 
567
  Raises:
568
  HTTPException: Nếu thông tin đăng nhập không hợp lệ.
569
  """
570
+ user = await mongo_db.users.find_one({"email": request.email.lower()})
571
  if not user or not pwd_context.verify(request.password, user["password"]):
572
  logger.warning(f"Xác thực thất bại cho email: {request.email}")
573
  raise HTTPException(
 
603
  )
604
 
605
  # Find user by refresh token in database
606
+ user = await mongo_db.users.find_one({"refresh_token": refresh_token})
607
 
608
 
609
  if not user:
 
640
  new_refresh_token = await create_refresh_token(email)
641
 
642
  # Update database with new refresh token
643
+ await mongo_db.users.update_one(
644
  {"_id": user["_id"]},
645
  {
646
  "$set": {
 
702
 
703
  try:
704
  # Find user by email and code
705
+ user =await mongo_db.users.find_one({
706
  "email": email.lower(),
707
  "reset_password_code": code
708
  })
utils/utils.py CHANGED
@@ -12,7 +12,7 @@ from jose import jwt
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
14
  from unidecode import unidecode
15
- from db.mongoDB import user_collection
16
  import secrets
17
  from fastapi import HTTPException, status
18
  from langchain_community.chat_message_histories import RedisChatMessageHistory
@@ -233,7 +233,7 @@ async def create_refresh_token(email: str) -> str:
233
 
234
 
235
  # Store refresh token in database
236
- result = user_collection.update_one(
237
  {"email": email.lower()},
238
  {
239
  "$set": {
 
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
14
  from unidecode import unidecode
15
+ from db.mongoDB import mongo_db
16
  import secrets
17
  from fastapi import HTTPException, status
18
  from langchain_community.chat_message_histories import RedisChatMessageHistory
 
233
 
234
 
235
  # Store refresh token in database
236
+ result = await mongo_db.users.update_one(
237
  {"email": email.lower()},
238
  {
239
  "$set": {