entidi2608 commited on
Commit
d3e9dc7
·
1 Parent(s): 253c2e1

update: check file uploaded

Browse files
Files changed (8) hide show
  1. Dockerfile +18 -24
  2. config.py +2 -2
  3. db/mongoDB.py +1 -1
  4. dependencies.py +4 -20
  5. main.py +3 -3
  6. routers/documents.py +1 -111
  7. services/document_service.py +2 -67
  8. utils/utils.py +1 -22
Dockerfile CHANGED
@@ -1,41 +1,33 @@
1
  # =================================================================
2
  # STAGE 1: BUILDER - Stage để cài đặt các dependencies nặng
3
  # =================================================================
4
- # Sử dụng image đầy đủ để có các công cụ build cần thiết
5
  FROM python:3.10 as builder
6
 
7
  # Cập nhật và cài đặt các gói hệ thống cho việc build
8
- # Chỉ cài những gì thực sự cần để `pip install` hoạt động
9
  RUN apt-get update && apt-get install -y --no-install-recommends \
10
  build-essential \
11
  && apt-get clean \
12
  && rm -rf /var/lib/apt/lists/*
13
 
14
- # Thiết lập thư mục làm việc
15
  WORKDIR /app
16
 
17
- # Tạo một môi trường ảo (virtual environment) để quản lý dependencies
18
- # Đây là một thực hành tốt, giúp cô lập thư viện
19
  RUN python -m venv /opt/venv
20
- # Kích hoạt venv cho các lệnh RUN tiếp theo
21
  ENV PATH="/opt/venv/bin:$PATH"
22
 
23
  # Sao chép file requirements trước để tận dụng Docker layer caching
24
  COPY requirements.txt .
25
 
26
- # Cài đặt tất cả các thư viện Python trong một lệnh RUN duy nhất
27
- # Điều này giúp tối ưu hóa số lượng layer của Docker
28
  RUN pip install --no-cache-dir --upgrade pip && \
29
  pip install --no-cache-dir -r requirements.txt
30
 
31
  # =================================================================
32
  # STAGE 2: FINAL - Stage cuối cùng, nhỏ gọn để chạy ứng dụng
33
  # =================================================================
34
- # Bắt đầu từ một image slim siêu nhẹ
35
  FROM python:3.10-slim
36
 
37
  # Cài đặt chỉ các dependencies hệ thống cần thiết cho RUNTIME
38
- # Không cần `build-essential`, `git`, `curl` ở đây nữa
39
  RUN apt-get update && apt-get install -y --no-install-recommends \
40
  poppler-utils \
41
  libgl1-mesa-glx \
@@ -43,37 +35,39 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
43
  && apt-get clean \
44
  && rm -rf /var/lib/apt/lists/*
45
 
46
- # Thiết lập thư mục làm việc
47
  WORKDIR /app
48
 
49
- # Sao chép môi trường ảo đã được cài đặt sẵn từ stage builder
50
  COPY --from=builder /opt/venv /opt/venv
51
 
52
- # Kích hoạt virtual environment cho container
53
  ENV PATH="/opt/venv/bin:$PATH"
54
 
 
55
  # Thiết lập các biến môi trường quan trọng
56
- # Thư mục cache sẽ nằm bên trong container
57
- ENV HF_HOME=/app/cache
 
 
 
 
 
 
 
58
  ENV HF_HUB_DISABLE_SYMLINKS_WARNING=1
59
- # Đảm bảo log Python hiển thị ngay lập tức, rất quan trọng cho việc debug trên Render
60
  ENV PYTHONUNBUFFERED=1
 
61
 
62
  # Sao chép toàn bộ mã nguồn của ứng dụng
63
  COPY . .
64
 
65
  # Tải trước (pre-download/bake) các model vào trong image
66
- # Điều này giúp giảm đáng kể thời gian khởi động (cold start) trên Render.
67
- # Các model sẽ được lưu vào thư mục cache đã định nghĩa bởi HF_HOME.
68
- # **QUAN TRỌNG**: Đảm bảo tên model ở đây khớp chính xác với tên trong file config.py của bạn.
69
  RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('bkai-foundation-models/vietnamese-bi-encoder')"
70
  RUN python -c "from langchain_community.cross_encoders import HuggingFaceCrossEncoder; HuggingFaceCrossEncoder(model_name='cross-encoder/ms-marco-MiniLM-L-6-v2')"
71
 
72
- # Mở cổng mà ứng dụng sẽ lắng nghe bên trong container
73
- # Port này phải khớp với port trong lệnh CMD
74
  EXPOSE 7860
75
 
76
- # Lệnh chạy ứng dụng cho PRODUCTION sử dụng Gunicorn
77
- # Gunicorn ổn định và hiệu quả hơn Uvicorn --reload
78
- # Nó sẽ tự động sử dụng biến $PORT do Render cung cấp
79
  CMD ["gunicorn", "-w", "2", "-k", "uvicorn.workers.UvicornWorker", "main:app", "--bind", "0.0.0.0:7860", "--timeout", "120"]
 
1
  # =================================================================
2
  # STAGE 1: BUILDER - Stage để cài đặt các dependencies nặng
3
  # =================================================================
 
4
  FROM python:3.10 as builder
5
 
6
  # Cập nhật và cài đặt các gói hệ thống cho việc build
 
7
  RUN apt-get update && apt-get install -y --no-install-recommends \
8
  build-essential \
9
  && apt-get clean \
10
  && rm -rf /var/lib/apt/lists/*
11
 
 
12
  WORKDIR /app
13
 
14
+ # Tạo kích hoạt venv
 
15
  RUN python -m venv /opt/venv
 
16
  ENV PATH="/opt/venv/bin:$PATH"
17
 
18
  # Sao chép file requirements trước để tận dụng Docker layer caching
19
  COPY requirements.txt .
20
 
21
+ # Cài đặt thư viện Python
 
22
  RUN pip install --no-cache-dir --upgrade pip && \
23
  pip install --no-cache-dir -r requirements.txt
24
 
25
  # =================================================================
26
  # STAGE 2: FINAL - Stage cuối cùng, nhỏ gọn để chạy ứng dụng
27
  # =================================================================
 
28
  FROM python:3.10-slim
29
 
30
  # Cài đặt chỉ các dependencies hệ thống cần thiết cho RUNTIME
 
31
  RUN apt-get update && apt-get install -y --no-install-recommends \
32
  poppler-utils \
33
  libgl1-mesa-glx \
 
35
  && apt-get clean \
36
  && rm -rf /var/lib/apt/lists/*
37
 
 
38
  WORKDIR /app
39
 
40
+ # Sao chép môi trường ảo từ stage builder
41
  COPY --from=builder /opt/venv /opt/venv
42
 
43
+ # Kích hoạt virtual environment
44
  ENV PATH="/opt/venv/bin:$PATH"
45
 
46
+ # --- PHẦN SỬA ĐỔI QUAN TRỌNG ---
47
  # Thiết lập các biến môi trường quan trọng
48
+ # 1. **SỬA LẠI ĐÂY**: Trỏ thư mục cache vào /tmp, nơi ứng dụng có quyền ghi
49
+ # Điều này sẽ sửa lỗi "Permission denied"
50
+ ENV HF_HOME=/tmp/huggingface_cache
51
+ ENV SENTENCE_TRANSFORMERS_HOME=/tmp/huggingface_cache
52
+
53
+ # 2. Tạo thư mục cache và cấp quyền (thực hành tốt)
54
+ RUN mkdir -p /tmp/huggingface_cache && chmod 777 /tmp/huggingface_cache
55
+
56
+ # 3. Các biến môi trường khác giữ nguyên
57
  ENV HF_HUB_DISABLE_SYMLINKS_WARNING=1
 
58
  ENV PYTHONUNBUFFERED=1
59
+ # --- KẾT THÚC PHẦN SỬA ĐỔI ---
60
 
61
  # Sao chép toàn bộ mã nguồn của ứng dụng
62
  COPY . .
63
 
64
  # Tải trước (pre-download/bake) các model vào trong image
65
+ # Bây giờ các model sẽ được lưu vào /tmp/huggingface_cache bên trong image
 
 
66
  RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('bkai-foundation-models/vietnamese-bi-encoder')"
67
  RUN python -c "from langchain_community.cross_encoders import HuggingFaceCrossEncoder; HuggingFaceCrossEncoder(model_name='cross-encoder/ms-marco-MiniLM-L-6-v2')"
68
 
69
+ # Mở cổng
 
70
  EXPOSE 7860
71
 
72
+ # Lệnh chạy ứng dụng
 
 
73
  CMD ["gunicorn", "-w", "2", "-k", "uvicorn.workers.UvicornWorker", "main:app", "--bind", "0.0.0.0:7860", "--timeout", "120"]
config.py CHANGED
@@ -18,7 +18,7 @@ CORE_DATA_FOLDER = os.path.join(BASE_DIR, "data", "core")
18
  # PROCESSED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "processed_files")
19
  # FAILED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "failed_files")
20
  # PROCESSED_HASH_LOG = os.path.join(BASE_DIR, "data", "processed_hashes.log")
21
- PENDING_UPLOADS_FOLDER = '/tmp/pending_uploads'
22
  LEGAL_DIC_FOLDER = os.path.join(BASE_DIR, "data", "dictionary")
23
 
24
  # Cấu hình cho DB
@@ -53,7 +53,7 @@ FRONTEND_URL = os.environ.get("FRONTEND_URL")
53
 
54
  APP_ENVIRONMENT = os.environ.get("APP_ENVIRONMENT")
55
 
56
- CHECKPOINT_FILE = "processed_files.log"
57
 
58
  MONGODB_CLOUD_URI= os.environ.get("MONGODB_CLOUD_URI")
59
  DB_NAME= os.environ.get("DB_NAME")
 
18
  # PROCESSED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "processed_files")
19
  # FAILED_FILES_FOLDER = os.path.join(BASE_DIR, "data", "failed_files")
20
  # PROCESSED_HASH_LOG = os.path.join(BASE_DIR, "data", "processed_hashes.log")
21
+ # PENDING_UPLOADS_FOLDER = '/tmp/pending_uploads'
22
  LEGAL_DIC_FOLDER = os.path.join(BASE_DIR, "data", "dictionary")
23
 
24
  # Cấu hình cho DB
 
53
 
54
  APP_ENVIRONMENT = os.environ.get("APP_ENVIRONMENT")
55
 
56
+ # CHECKPOINT_FILE = "processed_files.log"
57
 
58
  MONGODB_CLOUD_URI= os.environ.get("MONGODB_CLOUD_URI")
59
  DB_NAME= os.environ.get("DB_NAME")
db/mongoDB.py CHANGED
@@ -31,7 +31,7 @@ async def connect_to_mongo():
31
  Hàm này sẽ được gọi từ lifespan của FastAPI.
32
  """
33
  if mongo_db.client:
34
- logger.info("MongoDB connection already established.")
35
  return
36
 
37
  logger.info(f"🔸 Connecting to MongoDB Atlas...")
 
31
  Hàm này sẽ được gọi từ lifespan của FastAPI.
32
  """
33
  if mongo_db.client:
34
+ logger.info("MongoDB connection already established.")
35
  return
36
 
37
  logger.info(f"🔸 Connecting to MongoDB Atlas...")
dependencies.py CHANGED
@@ -38,22 +38,6 @@ def get_app_state(request: Request):
38
  raise RuntimeError("Application state ('app_state') not found. Initialization failed?")
39
  return request.app.state.app_state
40
 
41
- # def initialize_redis_client():
42
- # redis_url = os.environ.get("REDIS_URL")
43
- # if not redis_url:
44
- # logger.error("🔸[Redis] REDIS_URL environment variable not set.")
45
- # raise ValueError("REDIS_URL is not configured.")
46
- # try:
47
- # logger.info(f"🔸[Redis] Attempting to connect to Redis at {redis_url}...")
48
- # client = redis.Redis.from_url(redis_url, socket_connect_timeout=5, socket_timeout=5)
49
- # logger.info("🔸[Redis] Connected successfully and pinged.")
50
- # return client
51
- # except redis.exceptions.ConnectionError as e:
52
- # logger.error(f"🔸[Redis] Connection failed for URL '{redis_url}': {e}")
53
- # raise ConnectionError(f"Failed to connect to Redis: {e}")
54
- # except Exception as e:
55
- # logger.error(f"🔸[Redis] Error initializing Redis from URL '{redis_url}': {e}")
56
- # raise RuntimeError(f"Error initializing Redis: {e}")
57
 
58
  async def initialize_api_components(app_state: AppState):
59
  """Khởi tạo các thành phần cần thiết cho API """
@@ -184,7 +168,7 @@ async def get_current_user(
184
  logger.error(f"GET_CURRENT_USER: *** KHÔNG TÌM THẤY TOKEN (Nguồn: {source_of_token}) - RAISING 401 ***")
185
  raise credentials_exception
186
 
187
- logger.info(f"GET_CURRENT_USER: Token để verify (nguồn: {source_of_token}): {token_to_verify[:20]}...")
188
 
189
  # 1. Kiểm tra token trong blacklist
190
  try:
@@ -221,7 +205,7 @@ async def get_current_user(
221
  email = payload.get("sub")
222
  exp = payload.get("exp")
223
 
224
- logger.info(f"GET_CURRENT_USER: JWT decode thành công - email: {email}, exp: {exp}")
225
 
226
  if not isinstance(email, str) or not email:
227
  logger.error("GET_CURRENT_USER: *** EMAIL KHÔNG HỢP LỆ TRONG TOKEN ***")
@@ -265,7 +249,7 @@ async def get_current_user(
265
  # 3. Lấy thông tin người dùng từ database
266
  user_data: Optional[dict] = None # Khởi tạo để tránh UnboundLocalError
267
  try:
268
- logger.info(f"GET_CURRENT_USER: Đang tìm user trong DB: {email.lower()}") # email đã được validate là str
269
  user_data = await mongo_db.users.find_one({"email": email.lower()}, {"password": 0, "_id": 0})
270
  # print(user_data) # Bỏ print trong production
271
 
@@ -273,7 +257,7 @@ async def get_current_user(
273
  logger.error(f"GET_CURRENT_USER: *** KHÔNG TÌM THẤY USER TRONG DB ({email.lower()}) - RAISING 401 ***")
274
  raise credentials_exception
275
 
276
- logger.info(f"GET_CURRENT_USER: Tìm thấy user - data: {user_data}")
277
 
278
  except HTTPException:
279
  raise
 
38
  raise RuntimeError("Application state ('app_state') not found. Initialization failed?")
39
  return request.app.state.app_state
40
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
  async def initialize_api_components(app_state: AppState):
43
  """Khởi tạo các thành phần cần thiết cho API """
 
168
  logger.error(f"GET_CURRENT_USER: *** KHÔNG TÌM THẤY TOKEN (Nguồn: {source_of_token}) - RAISING 401 ***")
169
  raise credentials_exception
170
 
171
+ # logger.info(f"GET_CURRENT_USER: Token để verify (nguồn: {source_of_token}): {token_to_verify[:20]}...")
172
 
173
  # 1. Kiểm tra token trong blacklist
174
  try:
 
205
  email = payload.get("sub")
206
  exp = payload.get("exp")
207
 
208
+ # logger.info(f"GET_CURRENT_USER: JWT decode thành công - email: {email}, exp: {exp}")
209
 
210
  if not isinstance(email, str) or not email:
211
  logger.error("GET_CURRENT_USER: *** EMAIL KHÔNG HỢP LỆ TRONG TOKEN ***")
 
249
  # 3. Lấy thông tin người dùng từ database
250
  user_data: Optional[dict] = None # Khởi tạo để tránh UnboundLocalError
251
  try:
252
+ # logger.info(f"GET_CURRENT_USER: Đang tìm user trong DB: {email.lower()}") # email đã được validate là str
253
  user_data = await mongo_db.users.find_one({"email": email.lower()}, {"password": 0, "_id": 0})
254
  # print(user_data) # Bỏ print trong production
255
 
 
257
  logger.error(f"GET_CURRENT_USER: *** KHÔNG TÌM THẤY USER TRONG DB ({email.lower()}) - RAISING 401 ***")
258
  raise credentials_exception
259
 
260
+ # logger.info(f"GET_CURRENT_USER: Tìm thấy user - data: {user_data}")
261
 
262
  except HTTPException:
263
  raise
main.py CHANGED
@@ -22,7 +22,7 @@ logger = logging.getLogger(__name__)
22
 
23
  @asynccontextmanager
24
  async def lifespan(app: FastAPI):
25
- logger.info(" [Lifespan] STARTING UP...")
26
 
27
  current_app_state_instance = AppState()
28
  initialization_successful = False
@@ -71,7 +71,7 @@ app = FastAPI(
71
 
72
  app.add_middleware(
73
  SessionMiddleware,
74
- secret_key=os.environ.get("SESSION_SECRET_KEY", "a_very_secret_key_for_development")
75
  )
76
 
77
 
@@ -93,7 +93,7 @@ app.include_router(health_router, prefix="/api", tags=["Status"]) # Hoặc chỉ
93
 
94
  # Run with Uvicorn
95
  if __name__ == "__main__":
96
- logger.info("=> Chạy FastAPI server với Uvicorn...")
97
  is_dev_mode = config.APP_ENVIRONMENT.lower() == "development"
98
  uvicorn.run(
99
  "main:app", # Đảm bảo "main" là tên file python của bạn
 
22
 
23
  @asynccontextmanager
24
  async def lifespan(app: FastAPI):
25
+ logger.info("🚀 [Lifespan] STARTING UP...")
26
 
27
  current_app_state_instance = AppState()
28
  initialization_successful = False
 
71
 
72
  app.add_middleware(
73
  SessionMiddleware,
74
+ secret_key=os.environ.get("SESSION_SECRET_KEY")
75
  )
76
 
77
 
 
93
 
94
  # Run with Uvicorn
95
  if __name__ == "__main__":
96
+ logger.info("🚀 Chạy FastAPI server với Uvicorn...")
97
  is_dev_mode = config.APP_ENVIRONMENT.lower() == "development"
98
  uvicorn.run(
99
  "main:app", # Đảm bảo "main" là tên file python của bạn
routers/documents.py CHANGED
@@ -1,113 +1,3 @@
1
- # from fastapi import APIRouter, UploadFile, File, BackgroundTasks, HTTPException, Depends, Request
2
- # import os
3
- # import time
4
- # import shutil
5
- # from schemas.user import UserOut
6
- # from dependencies import get_current_user
7
- # import logging
8
- # from typing import List
9
- # import config
10
- # from utils.utils import calculate_file_hash, check_if_hash_exists
11
- # from services.document_service import full_process_and_ingest_pipeline
12
- # from dependencies import get_app_state
13
- # logger = logging.getLogger(__name__)
14
-
15
- # router = APIRouter()
16
-
17
- # ALLOWED_EXTENSIONS = {".pdf", ".docx", ".doc"}
18
-
19
- # @router.post("/upload", status_code=202)
20
- # async def upload_and_ingest_documents(
21
- # fastapi_request: Request,
22
- # background_tasks: BackgroundTasks,
23
- # current_user: UserOut = Depends(get_current_user),
24
- # files: List[UploadFile] = File(..., description="Một hoặc nhiều file tài liệu cần upload.")
25
- # ):
26
- # """
27
- # Endpoint duy nhất để upload một hoặc nhiều tài liệu.
28
-
29
- # - **files**: Danh sách các file tài liệu cần upload.
30
- # - API sẽ xử lý từng file trong nền và trả về ngay một báo cáo tổng hợp.
31
- # - File trùng lặp (dựa trên nội dung) hoặc có định dạng không hỗ trợ sẽ bị bỏ qua.
32
- # """
33
-
34
-
35
- # # Dòng này giờ sẽ chạy thành công vì bạn có quyền ghi vào /tmp
36
- # os.makedirs(config.PENDING_UPLOADS_FOLDER , exist_ok=True)
37
-
38
- # app_state = get_app_state(request=fastapi_request)
39
- # embedding_model = app_state.embeddings
40
- # if not files:
41
- # raise HTTPException(status_code=400, detail="No files were uploaded.")
42
-
43
- # accepted_files = []
44
- # skipped_files = []
45
-
46
- # for file in files:
47
- # temp_file_path = None
48
- # try:
49
- # # Kiểm tra định dạng file
50
- # file_extension = os.path.splitext(file.filename)[1].lower()
51
- # if file_extension not in ALLOWED_EXTENSIONS:
52
- # skipped_files.append({"filename": file.filename, "reason": "Unsupported file type"})
53
- # continue
54
-
55
- # # 1. Lưu file tạm để tính hash
56
- # # Thêm timestamp để tránh xung đột tên file nếu upload nhiều file cùng tên trong 1 request
57
- # temp_filename = f"temp_{int(time.time()*1000)}_{file.filename}"
58
- # temp_file_path = os.path.join(config.PENDING_UPLOADS_FOLDER, temp_filename)
59
- # with open(temp_file_path, "wb") as buffer:
60
- # shutil.copyfileobj(file.file, buffer)
61
-
62
- # # 2. Tính toán hash
63
- # file_hash = calculate_file_hash(temp_file_path)
64
-
65
- # # 3. Kiểm tra trùng lặp
66
- # if await check_if_hash_exists(file_hash):
67
- # skipped_files.append({"filename": file.filename, "reason": "Duplicate file content"})
68
- # os.remove(temp_file_path)
69
- # continue
70
-
71
- # # 4. File hợp lệ, chuẩn bị để xử lý
72
- # final_filename = file.filename
73
- # final_file_path = os.path.join(config.PENDING_UPLOADS_FOLDER, final_filename)
74
- # # Xử lý nếu tên file đã tồn tại để tránh ghi đè
75
- # if os.path.exists(final_file_path):
76
- # base, ext = os.path.splitext(final_filename)
77
- # final_filename = f"{base}_{file_hash[:8]}{ext}"
78
- # final_file_path = os.path.join(config.PENDING_UPLOADS_FOLDER, final_filename)
79
-
80
- # os.rename(temp_file_path, final_file_path)
81
- # temp_file_path = None # Đánh dấu là đã di chuyển
82
-
83
- # # 5. Thêm tác vụ nền cho file này
84
- # background_tasks.add_task(full_process_and_ingest_pipeline, final_file_path, file_hash,embedding_model)
85
-
86
- # accepted_files.append({"filename": final_filename, "hash": file_hash})
87
-
88
- # except Exception as e:
89
- # logger.error(f"Error processing {file.filename} in upload batch: {e}", exc_info=True)
90
- # skipped_files.append({"filename": file.filename, "reason": f"Server error: {str(e)}"})
91
- # if temp_file_path and os.path.exists(temp_file_path):
92
- # os.remove(temp_file_path)
93
-
94
- # # Nếu không có file nào được chấp nhận sau khi lọc
95
- # if not accepted_files:
96
- # raise HTTPException(
97
- # status_code=400,
98
- # detail={"message": "No valid new files were accepted for processing.", "skipped_files": skipped_files}
99
- # )
100
-
101
- # # Trả về kết quả tổng hợp
102
- # return {
103
- # "message": f"Request completed. Accepted {len(accepted_files)} files for background processing.",
104
- # "accepted_files": accepted_files,
105
- # "skipped_files": skipped_files
106
- # }
107
-
108
-
109
- # routers/documents.py
110
-
111
  import os
112
  import hashlib
113
  from typing import List
@@ -116,7 +6,7 @@ from io import BytesIO
116
  from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, BackgroundTasks
117
  from fastapi.concurrency import run_in_threadpool
118
 
119
- from utils.utils import check_if_hash_exists
120
  from services.document_service import full_process_and_ingest_pipeline,convert_to_text_content
121
 
122
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import os
2
  import hashlib
3
  from typing import List
 
6
  from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, BackgroundTasks
7
  from fastapi.concurrency import run_in_threadpool
8
 
9
+ from services.document_service import check_if_hash_exists
10
  from services.document_service import full_process_and_ingest_pipeline,convert_to_text_content
11
 
12
 
services/document_service.py CHANGED
@@ -18,71 +18,6 @@ logger = logging.getLogger(__name__)
18
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
19
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
20
 
21
- # def convert_to_text_content(source_path: str) -> str:
22
- # source_file = Path(source_path)
23
- # file_extension = source_file.suffix.lower()
24
- # logger.info(f"Extracting content from: {source_file.name}")
25
- # content = ""
26
- # if file_extension == ".pdf":
27
- # parser = LlamaParse( api_key=config.LLAMA_CLOUD_API_KEY,
28
- # result_type="text",
29
- # verbose=True, # Giữ verbose để theo dõi
30
- # language="vi")
31
- # documents = parser.load_data([str(source_file)])
32
- # if documents: content = documents[0].text
33
- # elif file_extension == ".docx":
34
- # doc = docx.Document(source_path)
35
- # content = '\n'.join([para.text for para in doc.paragraphs])
36
- # elif file_extension == ".doc":
37
- # content = pypandoc.convert_file(source_path, 'plain', format='doc')
38
- # else:
39
- # raise ValueError(f"Unsupported file format: {file_extension}")
40
- # if not content.strip():
41
- # raise ValueError("Extracted content is empty.")
42
- # logger.info(f"✅ Successfully extracted content from {source_file.name}.")
43
- # return content
44
-
45
- # async def full_process_and_ingest_pipeline(filepath: str, file_hash: str, embedding_model):
46
- # filename = os.path.basename(filepath)
47
- # logger.info(f"BACKGROUND TASK: Starting full pipeline for: {filename} (Hash: {file_hash[:10]}...)")
48
- # weaviate_client = None
49
- # try:
50
- # raw_content = convert_to_text_content(filepath)
51
-
52
- # doc_metadata = extract_document_metadata(raw_content, filename)
53
- # doc_metadata["source"] = filename
54
- # cleaned_content = clean_document_text(raw_content)
55
- # doc_metadata["field"] = infer_field(cleaned_content, doc_metadata.get("ten_van_ban"))
56
- # doc_metadata["entity_type"] = infer_entity_type(cleaned_content, doc_metadata.get("field", ""))
57
-
58
- # doc_to_split = Document(page_content=cleaned_content, metadata=doc_metadata)
59
- # chunks_from_file = hierarchical_split_law_document(doc_to_split)
60
-
61
- # if not chunks_from_file:
62
- # raise ValueError("File did not yield any chunks after processing.")
63
-
64
- # processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
65
-
66
- # weaviate_client = connect_to_weaviate()
67
- # embeddings_model = embedding_model
68
- # collection_name = config.WEAVIATE_COLLECTION_NAME
69
- # create_weaviate_schema_if_not_exists(weaviate_client, collection_name)
70
-
71
- # ingest_chunks_with_native_batching(weaviate_client, collection_name, processed_chunks, embeddings_model)
72
-
73
- # await utils.log_processed_hash(file_hash, filename)
74
- # logger.info(f"✅ Successfully ingested '{filename}'.")
75
- # # shutil.move(filepath, os.path.join(config.PROCESSED_FILES_FOLDER, filename))
76
- # logger.info(f"Moved '{filename}' to processed folder.")
77
- # except Exception as e:
78
- # logger.error(f"❌ FAILED pipeline for '{filename}': {e}", exc_info=True)
79
- # # shutil.move(filepath, os.path.join(config.FAILED_FILES_FOLDER, filename))
80
- # logger.info(f"Moved '{filename}' to failed folder.")
81
- # finally:
82
- # if weaviate_client and weaviate_client.is_connected():
83
- # weaviate_client.close()
84
-
85
-
86
  # --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
87
  def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
88
  """Trích xuất nội dung text từ một stream trong bộ nhớ."""
@@ -147,7 +82,7 @@ async def full_process_and_ingest_pipeline(raw_content: str, filename: str, file
147
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
148
 
149
  # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
150
- weaviate_client = connect_to_weaviate()
151
 
152
 
153
  await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
@@ -191,4 +126,4 @@ async def log_failed_process(file_hash: str, filename: str, error_message: str):
191
  # Hàm kiểm tra trùng lặp
192
  async def check_if_hash_exists(file_hash: str) -> bool:
193
  count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
194
- return count > 0
 
18
  from rag_components import create_weaviate_schema_if_not_exists, ingest_chunks_with_native_batching
19
  from utils.process_data import hierarchical_split_law_document,extract_document_metadata,clean_document_text,infer_field, infer_entity_type, filter_and_serialize_complex_metadata
20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  # --- SỬA LẠI HÀM NÀY ĐỂ NHẬN STREAM ---
22
  def convert_to_text_content(source_stream: BytesIO, original_filename: str) -> str:
23
  """Trích xuất nội dung text từ một stream trong bộ nhớ."""
 
82
  processed_chunks = filter_and_serialize_complex_metadata(chunks_from_file)
83
 
84
  # Giai đoạn 2: Ingest vào Weaviate (I/O-bound và CPU-bound)
85
+ weaviate_client = connect_to_weaviate(run_diagnostics=False)
86
 
87
 
88
  await run_in_threadpool(create_weaviate_schema_if_not_exists, weaviate_client, config.WEAVIATE_COLLECTION_NAME)
 
126
  # Hàm kiểm tra trùng lặp
127
  async def check_if_hash_exists(file_hash: str) -> bool:
128
  count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash, "status": "SUCCESS"})
129
+ return count > 0
utils/utils.py CHANGED
@@ -7,7 +7,7 @@ from typing import List, Optional
7
  from schemas.chat import Message
8
  from redis.asyncio import Redis
9
  import bcrypt
10
- from datetime import datetime, timedelta, timezone
11
  from jose import jwt
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
@@ -444,7 +444,6 @@ async def get_langchain_chat_history(app_state, chat_id: str) -> RedisChatMessag
444
  # api/utils.py
445
 
446
  import hashlib
447
- import config
448
 
449
  logger = logging.getLogger(__name__)
450
 
@@ -465,23 +464,3 @@ def calculate_file_hash(filepath: str) -> str:
465
  # except IOError as e:
466
  # logger.error(f"Could not read hash log file: {e}")
467
  # return False
468
-
469
- async def check_if_hash_exists(file_hash: str) -> bool:
470
- # Đếm số document có hash tương ứng
471
- count = await mongo_db.processed_documents.count_documents({"file_hash": file_hash})
472
- return count > 0
473
-
474
- async def log_processed_hash(file_hash: str, filename: str):
475
- try:
476
- document_record = {
477
- "file_hash": file_hash, # Hash của file
478
- "original_filename": filename, # Tên file gốc
479
- "processed_at": datetime.now(timezone.utc), # Thời gian xử lý
480
- "status": "SUCCESS",
481
- # Thêm các thông tin khác nếu cần, ví dụ:
482
- # "source_url": "https://url_cua_file_tren_s3_hoac_cloudinary",
483
- # "user_uploader": user_email
484
- }
485
- await mongo_db.processed_documents.insert_one(document_record)
486
- except IOError as e:
487
- logger.error(f"Could not write to hash log file: {e}")
 
7
  from schemas.chat import Message
8
  from redis.asyncio import Redis
9
  import bcrypt
10
+ from datetime import datetime, timedelta
11
  from jose import jwt
12
  from config import SECRET_KEY, ALGORITHM, ACCESS_TOKEN_EXPIRE_MINUTES
13
  from typing import List, Dict, Optional
 
444
  # api/utils.py
445
 
446
  import hashlib
 
447
 
448
  logger = logging.getLogger(__name__)
449
 
 
464
  # except IOError as e:
465
  # logger.error(f"Could not read hash log file: {e}")
466
  # return False