entidi2608 committed on
Commit
7f9bd61
·
1 Parent(s): e6cba72
Files changed (4) hide show
  1. config.py +1 -1
  2. db/mongoDB.py +3 -0
  3. services/document_service.py +2 -2
  4. utils/utils.py +28 -15
config.py CHANGED
@@ -17,7 +17,7 @@ CORE_DATA_FOLDER = os.path.join(BASE_DIR, "data", "core")
17
  # PENDING_UPLOADS_FOLDER = os.path.join(BASE_DIR, "data", "pending_uploads")
18
  # PROCESSED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "processed_files")
19
  # FAILED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "failed_files")
20
- PROCESSED_HASH_LOG = os.path.join(BASE_DIR, "data", "processed_hashes.log")
21
  PENDING_UPLOADS_FOLDER = '/tmp/pending_uploads'
22
  LEGAL_DIC_FOLDER = os.path.join(BASE_DIR, "data", "dictionary")
23
 
 
17
  # PENDING_UPLOADS_FOLDER = os.path.join(BASE_DIR, "data", "pending_uploads")
18
  # PROCESSED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "processed_files")
19
  # FAILED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "failed_files")
20
+ # PROCESSED_HASH_LOG = os.path.join(BASE_DIR, "data", "processed_hashes.log")
21
  PENDING_UPLOADS_FOLDER = '/tmp/pending_uploads'
22
  LEGAL_DIC_FOLDER = os.path.join(BASE_DIR, "data", "dictionary")
23
 
db/mongoDB.py CHANGED
@@ -20,6 +20,7 @@ class MongoDatabase:
20
  users: Optional[AsyncIOMotorCollection] = None
21
  token_blacklist: Optional[AsyncIOMotorCollection] = None
22
  conversations: Optional[AsyncIOMotorCollection] = None
 
23
 
24
  # Tạo một instance duy nhất để import và sử dụng trong toàn bộ ứng dụng
25
  mongo_db = MongoDatabase()
@@ -54,6 +55,8 @@ async def connect_to_mongo():
54
  mongo_db.users = mongo_db.db["users"]
55
  mongo_db.token_blacklist = mongo_db.db["token_blacklist"]
56
  mongo_db.conversations = mongo_db.db["conversations"]
 
 
57
 
58
  # 4. Tạo TTL index một cách an toàn
59
  # Lấy danh sách index hiện có
 
20
  users: Optional[AsyncIOMotorCollection] = None
21
  token_blacklist: Optional[AsyncIOMotorCollection] = None
22
  conversations: Optional[AsyncIOMotorCollection] = None
23
+ processed_documents: Optional[AsyncIOMotorCollection] = None
24
 
25
  # Tạo một instance duy nhất để import và sử dụng trong toàn bộ ứng dụng
26
  mongo_db = MongoDatabase()
 
55
  mongo_db.users = mongo_db.db["users"]
56
  mongo_db.token_blacklist = mongo_db.db["token_blacklist"]
57
  mongo_db.conversations = mongo_db.db["conversations"]
58
+ mongo_db.processed_documents = mongo_db.db["processed_documents"]
59
+
60
 
61
  # 4. Tạo TTL index một cách an toàn
62
  # Lấy danh sách index hiện có
services/document_service.py CHANGED
@@ -38,7 +38,7 @@ def convert_to_text_content(source_path: str) -> str:
38
  logger.info(f"✅ Successfully extracted content from {source_file.name}.")
39
  return content
40
 
41
- def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_model):
42
  filename = os.path.basename(filepath)
43
  logger.info(f"BACKGROUND TASK: Starting full pipeline for: {filename} (Hash: {file_hash[:10]}...)")
44
  weaviate_client = None
@@ -66,7 +66,7 @@ def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_mo
66
 
67
  ingest_chunks_with_native_batching(weaviate_client, collection_name, processed_chunks, embeddings_model)
68
 
69
- utils.log_processed_hash(file_hash)
70
  logger.info(f"✅ Successfully ingested '{filename}'.")
71
  # shutil.move(filepath, os.path.join(config.PROCESSED_FILES_FOLDER, filename))
72
  logger.info(f"Moved '{filename}' to processed folder.")
 
38
  logger.info(f"✅ Successfully extracted content from {source_file.name}.")
39
  return content
40
 
41
+ async def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_model):
42
  filename = os.path.basename(filepath)
43
  logger.info(f"BACKGROUND TASK: Starting full pipeline for: {filename} (Hash: {file_hash[:10]}...)")
44
  weaviate_client = None
 
66
 
67
  ingest_chunks_with_native_batching(weaviate_client, collection_name, processed_chunks, embeddings_model)
68
 
69
+ await utils.log_processed_hash(file_hash, filename)
70
  logger.info(f"✅ Successfully ingested '{filename}'.")
71
  # shutil.move(filepath, os.path.join(config.PROCESSED_FILES_FOLDER, filename))
72
  logger.info(f"Moved '{filename}' to processed folder.")
utils/utils.py CHANGED
@@ -7,7 +7,7 @@ from typing import List, Optional
7
  from schemas.chat import Message
8
  from redis.asyncio import Redis
9
  import bcrypt
10
- from datetime import datetime, timedelta
11
  from jose import jwt
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
@@ -455,20 +455,33 @@ def calculate_file_hash(filepath: str) -> str:
455
  sha256_hash.update(byte_block)
456
  return sha256_hash.hexdigest()
457
 
458
- def check_if_hash_exists(file_hash: str) -> bool:
459
- if not os.path.exists(config.PROCESSED_HASH_LOG):
460
- return False
461
- try:
462
- with open(config.PROCESSED_HASH_LOG, "r") as f:
463
- processed_hashes = {line.strip() for line in f}
464
- return file_hash in processed_hashes
465
- except IOError as e:
466
- logger.error(f"Could not read hash log file: {e}")
467
- return False
468
-
469
- def log_processed_hash(file_hash: str):
 
 
 
 
 
470
  try:
471
- with open(config.PROCESSED_HASH_LOG, "a") as f:
472
- f.write(file_hash + "\n")
 
 
 
 
 
 
 
 
473
  except IOError as e:
474
  logger.error(f"Could not write to hash log file: {e}")
 
7
  from schemas.chat import Message
8
  from redis.asyncio import Redis
9
  import bcrypt
10
+ from datetime import datetime, timedelta, timezone
11
  from jose import jwt
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
 
455
  sha256_hash.update(byte_block)
456
  return sha256_hash.hexdigest()
457
 
458
+ # def check_if_hash_exists(file_hash: str) -> bool:
459
+ # if not os.path.exists(config.PROCESSED_HASH_LOG):
460
+ # return False
461
+ # try:
462
+ # with open(config.PROCESSED_HASH_LOG, "r") as f:
463
+ # processed_hashes = {line.strip() for line in f}
464
+ # return file_hash in processed_hashes
465
+ # except IOError as e:
466
+ # logger.error(f"Could not read hash log file: {e}")
467
+ # return False
468
+
469
+ async def check_if_hash_exists(file_hash: str) -> bool:
470
+ # Đếm số document có hash tương ứng
471
+ count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash})
472
+ return count > 0
473
+
474
+ async def log_processed_hash(file_hash: str, filename: str):
475
  try:
476
+ document_record = {
477
+ "file_hash": file_hash, # Hash của file
478
+ "original_filename": filename, # Tên file gốc
479
+ "processed_at": datetime.now(timezone.utc), # Thời gian xử lý
480
+ "status": "SUCCESS",
481
+ # Thêm các thông tin khác nếu cần, ví dụ:
482
+ # "source_url": "https://url_cua_file_tren_s3_hoac_cloudinary",
483
+ # "user_uploader": user_email
484
+ }
485
+ await mongo_db.processed_documents.insert_one(document_record)
486
  except IOError as e:
487
  logger.error(f"Could not write to hash log file: {e}")