Spaces:
Runtime error
Runtime error
import os | |
import hashlib | |
from typing import List | |
from io import BytesIO | |
from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, BackgroundTasks | |
from fastapi.concurrency import run_in_threadpool | |
from services.document_service import check_if_hash_exists | |
from services.document_service import full_process_and_ingest_pipeline,convert_to_text_content | |
from config import ALLOWED_EXTENSIONS | |
from dependencies import get_current_user | |
from schemas.user import UserOut | |
import logging | |
logger = logging.getLogger(__name__) | |
router = APIRouter() | |
def calculate_hash_from_content(content: bytes) -> str: | |
"""Tính toán SHA256 hash từ nội dung bytes của file.""" | |
return hashlib.sha256(content).hexdigest() | |
async def upload_and_ingest_documents( | |
background_tasks: BackgroundTasks, | |
current_user: UserOut = Depends(get_current_user), | |
files: List[UploadFile] = File(..., description="Một hoặc nhiều file tài liệu cần upload.") | |
): | |
""" | |
Endpoint đã được tối ưu để upload và xử lý tài liệu. | |
- Xử lý file trong bộ nhớ để tránh lỗi permission và tăng tốc. | |
- Trích xuất text ngay trong request để báo lỗi sớm (fail-fast). | |
- Đẩy các tác vụ xử lý nặng (NLP, embedding, ingest) vào nền. | |
""" | |
if not files: | |
raise HTTPException(status_code=400, detail="No files were uploaded.") | |
accepted_files = [] | |
skipped_files = [] | |
for file in files: | |
try: | |
# 1. KIỂM TRA ĐỊNH DẠNG | |
file_extension = os.path.splitext(file.filename)[1].lower() | |
if file_extension not in ALLOWED_EXTENSIONS: | |
skipped_files.append({"filename": file.filename, "reason": "Unsupported file type"}) | |
continue | |
# 2. ĐỌC NỘI DUNG FILE VÀO BỘ NHỚ | |
file_content = await file.read() | |
if not file_content: | |
skipped_files.append({"filename": file.filename, "reason": "Empty file"}) | |
continue | |
# 3. TÍNH HASH & KIỂM TRA TRÙNG LẶP | |
file_hash = calculate_hash_from_content(file_content) | |
if await check_if_hash_exists(file_hash): | |
skipped_files.append({"filename": file.filename, "reason": "Duplicate file (content already exists)"}) | |
continue | |
# 4. TRÍCH XUẤT TEXT (FAIL-FAST) - Chạy trong thread pool để không block | |
# Hàm convert_to_text_content cần được sửa để nhận BytesIO | |
try: | |
raw_content = await run_in_threadpool(convert_to_text_content, BytesIO(file_content), file.filename) | |
if not raw_content or not raw_content.strip(): | |
raise ValueError("Extracted content is empty.") | |
except Exception as extraction_error: | |
logger.error(f"Failed to extract text from {file.filename}: {extraction_error}") | |
skipped_files.append({"filename": file.filename, "reason": f"Failed to extract content: {extraction_error}"}) | |
continue | |
# 5. THÊM TÁC VỤ NỀN VỚI DỮ LIỆU TEXT | |
# Chỉ truyền dữ liệu cơ bản, không truyền đối tượng phức tạp | |
background_tasks.add_task( | |
full_process_and_ingest_pipeline, | |
raw_content=raw_content, | |
filename=file.filename, | |
file_hash=file_hash | |
) | |
accepted_files.append({"filename": file.filename, "hash": file_hash}) | |
except Exception as e: | |
logger.error(f"Error preparing {file.filename} for background processing: {e}", exc_info=True) | |
skipped_files.append({"filename": file.filename, "reason": f"Unexpected server error: {str(e)}"}) | |
if not accepted_files: | |
raise HTTPException( | |
status_code=400, | |
detail={"message": "No valid new files were accepted for processing.", "skipped_files": skipped_files} | |
) | |
return { | |
"message": f"Request received. Accepted {len(accepted_files)} files for background processing.", | |
"accepted_files": accepted_files, | |
"skipped_files": skipped_files | |
} |