File size: 4,273 Bytes
a6fd1a3
253c2e1
 
 
 
 
 
 
d3e9dc7
253c2e1
 
 
 
a6fd1a3
253c2e1
a6fd1a3
 
 
 
 
253c2e1
 
 
 
 
a6fd1a3
 
 
 
 
 
 
 
253c2e1
 
 
 
a6fd1a3
 
 
 
 
 
 
 
 
253c2e1
a6fd1a3
 
 
 
 
253c2e1
 
 
 
 
a6fd1a3
253c2e1
 
5ac785f
253c2e1
a6fd1a3
 
253c2e1
 
 
 
 
 
 
 
 
 
a6fd1a3
253c2e1
 
 
 
 
 
 
 
a6fd1a3
253c2e1
a6fd1a3
 
253c2e1
 
a6fd1a3
 
 
 
 
 
 
 
253c2e1
a6fd1a3
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import os
import hashlib
from typing import List
from io import BytesIO

from fastapi import APIRouter, UploadFile, File, HTTPException, Depends, BackgroundTasks
from fastapi.concurrency import run_in_threadpool

from services.document_service import  check_if_hash_exists
from services.document_service import full_process_and_ingest_pipeline,convert_to_text_content


from config import ALLOWED_EXTENSIONS
from dependencies import get_current_user
from schemas.user import UserOut
import logging
logger = logging.getLogger(__name__)

router = APIRouter()


def calculate_hash_from_content(content: bytes) -> str:
    """Tính toán SHA256 hash từ nội dung bytes của file."""
    return hashlib.sha256(content).hexdigest()


@router.post("/upload", status_code=202)
async def upload_and_ingest_documents(
    background_tasks: BackgroundTasks,
    current_user: UserOut = Depends(get_current_user),
    files: List[UploadFile] = File(..., description="Một hoặc nhiều file tài liệu cần upload.")
):
    """
    Endpoint đã được tối ưu để upload và xử lý tài liệu.
    - Xử lý file trong bộ nhớ để tránh lỗi permission và tăng tốc.
    - Trích xuất text ngay trong request để báo lỗi sớm (fail-fast).
    - Đẩy các tác vụ xử lý nặng (NLP, embedding, ingest) vào nền.
    """
    if not files:
        raise HTTPException(status_code=400, detail="No files were uploaded.")

    accepted_files = []
    skipped_files = []

    for file in files:
        try:
            # 1. KIỂM TRA ĐỊNH DẠNG
            file_extension = os.path.splitext(file.filename)[1].lower()
            if file_extension not in ALLOWED_EXTENSIONS:
                skipped_files.append({"filename": file.filename, "reason": "Unsupported file type"})
                continue

            # 2. ĐỌC NỘI DUNG FILE VÀO BỘ NHỚ
            file_content = await file.read()
            if not file_content:
                skipped_files.append({"filename": file.filename, "reason": "Empty file"})
                continue

            # 3. TÍNH HASH & KIỂM TRA TRÙNG LẶP
            file_hash = calculate_hash_from_content(file_content)
            if await check_if_hash_exists(file_hash):
                skipped_files.append({"filename": file.filename, "reason": "Duplicate file (content already exists)"})
                continue

            # 4. TRÍCH XUẤT TEXT (FAIL-FAST) - Chạy trong thread pool để không block
            # Hàm convert_to_text_content cần được sửa để nhận BytesIO
            try:
                raw_content = await run_in_threadpool(convert_to_text_content, BytesIO(file_content), file.filename)
                if not raw_content or not raw_content.strip():
                    raise ValueError("Extracted content is empty.")
            except Exception as extraction_error:
                logger.error(f"Failed to extract text from {file.filename}: {extraction_error}")
                skipped_files.append({"filename": file.filename, "reason": f"Failed to extract content: {extraction_error}"})
                continue

            # 5. THÊM TÁC VỤ NỀN VỚI DỮ LIỆU TEXT
            # Chỉ truyền dữ liệu cơ bản, không truyền đối tượng phức tạp
            background_tasks.add_task(
                full_process_and_ingest_pipeline,
                raw_content=raw_content,
                filename=file.filename,
                file_hash=file_hash
            )

            accepted_files.append({"filename": file.filename, "hash": file_hash})

        except Exception as e:
            logger.error(f"Error preparing {file.filename} for background processing: {e}", exc_info=True)
            skipped_files.append({"filename": file.filename, "reason": f"Unexpected server error: {str(e)}"})

    if not accepted_files:
        raise HTTPException(
            status_code=400,
            detail={"message": "No valid new files were accepted for processing.", "skipped_files": skipped_files}
        )

    return {
        "message": f"Request received. Accepted {len(accepted_files)} files for background processing.",
        "accepted_files": accepted_files,
        "skipped_files": skipped_files
    }