|
import os |
|
import shutil |
|
import uuid |
|
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, BackgroundTasks |
|
from fastapi.responses import JSONResponse |
|
from typing import Optional, List, Dict, Any |
|
|
|
from app.utils.pdf_processor import PDFProcessor |
|
from app.models.pdf_models import PDFResponse, DeleteDocumentRequest, DocumentsListResponse |
|
from app.api.pdf_websocket import ( |
|
send_pdf_upload_started, |
|
send_pdf_upload_progress, |
|
send_pdf_upload_completed, |
|
send_pdf_upload_failed, |
|
send_pdf_delete_started, |
|
send_pdf_delete_completed, |
|
send_pdf_delete_failed |
|
) |
|
|
|
|
|
# Router collecting the PDF endpoints defined in this module; every route
# registered on it is served under the "/pdf" prefix and tagged
# "PDF Processing".
router = APIRouter(
    prefix="/pdf",
    tags=["PDF Processing"],
)
|
|
|
|
|
# Directory where an uploaded PDF is first written (as "<file_id>.pdf")
# while it is being processed.
TEMP_UPLOAD_DIR = "/tmp/uploads/temp"
# Directory a PDF is moved into once processing reports success.
STORAGE_DIR = "/tmp/uploads/pdfs"

# Both directories are created when this module is imported; exist_ok=True
# makes repeated imports / restarts a no-op.
os.makedirs(TEMP_UPLOAD_DIR, exist_ok=True)
os.makedirs(STORAGE_DIR, exist_ok=True)
|
|
|
|
|
@router.post("/upload", response_model=PDFResponse)
async def upload_pdf(
    file: UploadFile = File(...),
    namespace: str = Form("Default"),
    index_name: str = Form("testbot768"),
    title: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None),
    background_tasks: BackgroundTasks = None
):
    """
    Upload and process a PDF file to create embeddings and store them in Pinecone.

    - **file**: PDF file to process
    - **namespace**: Pinecone namespace to store embeddings in (default: "Default")
    - **index_name**: Pinecone index name (default: "testbot768")
    - **title**: Title of the document (optional)
    - **description**: Description of the document (optional)
    - **user_id**: ID of the user to send status updates to over WebSocket

    `background_tasks` is accepted for interface compatibility but is not
    used by this handler.

    Returns the result produced by `PDFProcessor.process_pdf`, or a
    `PDFResponse(success=False, error=...)` if processing raised.

    Raises:
        HTTPException: 400 when the uploaded file has no name or the name
            does not end in ".pdf".
    """
    # Validate before entering the try block: the broad `except Exception`
    # below would otherwise swallow the HTTPException and the client would
    # never see the 400. `file.filename` may be None, so normalise it first.
    upload_name = file.filename or ""
    if not upload_name.lower().endswith('.pdf'):
        raise HTTPException(status_code=400, detail="Chỉ chấp nhận file PDF")

    # Bound before the try so the error handler can always reference them
    # (no need for `in locals()` checks).
    file_id = str(uuid.uuid4())
    temp_file_path = os.path.join(TEMP_UPLOAD_DIR, f"{file_id}.pdf")

    try:
        if user_id:
            await send_pdf_upload_started(user_id, file.filename, file_id)

        # Persist the upload to the temp directory for processing.
        with open(temp_file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        metadata = {
            "filename": file.filename,
            "content_type": file.content_type
        }
        if title:
            metadata["title"] = title
        if description:
            metadata["description"] = description

        if user_id:
            await send_pdf_upload_progress(
                user_id,
                file_id,
                "file_preparation",
                0.2,
                "File saved, preparing for processing"
            )

        processor = PDFProcessor(index_name=index_name, namespace=namespace)

        if user_id:
            await send_pdf_upload_progress(
                user_id,
                file_id,
                "embedding_start",
                0.4,
                "Starting to process PDF and create embeddings"
            )

        # Adapter handed to the processor so it can report progress for
        # this particular user/file pair.
        async def progress_callback_wrapper(step, progress, message):
            if user_id:
                await send_progress_update(user_id, file_id, step, progress, message)

        result = await processor.process_pdf(
            file_path=temp_file_path,
            document_id=file_id,
            metadata=metadata,
            progress_callback=progress_callback_wrapper
        )

        if result.get('success'):
            # Keep the processed file: move it from temp to permanent storage.
            storage_path = os.path.join(STORAGE_DIR, f"{file_id}.pdf")
            shutil.move(temp_file_path, storage_path)

            if user_id:
                await send_pdf_upload_completed(
                    user_id,
                    file_id,
                    file.filename,
                    result.get('chunks_processed', 0)
                )
        else:
            if user_id:
                await send_pdf_upload_failed(
                    user_id,
                    file_id,
                    file.filename,
                    result.get('error', 'Unknown error')
                )

        # On success the file was moved, so this only fires on failure.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        return result
    except Exception as e:
        # Do not leave a partially written / unprocessed temp file behind.
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

        if user_id:
            await send_pdf_upload_failed(
                user_id,
                file_id,
                file.filename,
                str(e)
            )

        return PDFResponse(
            success=False,
            error=str(e)
        )
|
|
|
|
|
async def send_progress_update(user_id, document_id, step, progress, message):
    """Forward a progress update to `send_pdf_upload_progress`.

    Does nothing when `user_id` is falsy; otherwise passes all five
    arguments through unchanged.
    """
    if not user_id:
        return
    await send_pdf_upload_progress(user_id, document_id, step, progress, message)
|
|
|
|
|
@router.delete("/namespace", response_model=PDFResponse)
async def delete_namespace(
    namespace: str = "Default",
    index_name: str = "testbot768",
    user_id: Optional[str] = None
):
    """
    Delete all embeddings in a namespace from Pinecone (i.e. delete the namespace).

    - **namespace**: Pinecone namespace (default: "Default")
    - **index_name**: Pinecone index name (default: "testbot768")
    - **user_id**: ID of the user to send status updates to over WebSocket

    Returns the result of `PDFProcessor.delete_namespace`, or a
    `PDFResponse(success=False, error=...)` if anything raised.
    """
    try:
        if user_id:
            await send_pdf_delete_started(user_id, namespace)

        outcome = await PDFProcessor(
            index_name=index_name, namespace=namespace
        ).delete_namespace()

        # Notifications are only sent (and the result only inspected)
        # when a user id was supplied.
        if user_id and outcome.get('success'):
            await send_pdf_delete_completed(user_id, namespace)
        elif user_id:
            await send_pdf_delete_failed(
                user_id, namespace, outcome.get('error', 'Unknown error')
            )

        return outcome
    except Exception as exc:
        failure_text = str(exc)
        if user_id:
            await send_pdf_delete_failed(user_id, namespace, failure_text)
        return PDFResponse(success=False, error=failure_text)
|
|
|
|
|
@router.get("/documents", response_model=DocumentsListResponse)
async def get_documents(namespace: str = "Default", index_name: str = "testbot768"):
    """
    Get information about all documents that have been embedded.

    - **namespace**: Pinecone namespace (default: "Default")
    - **index_name**: Pinecone index name (default: "testbot768")

    Returns the result of `PDFProcessor.list_documents`, or a
    `DocumentsListResponse(success=False, error=...)` if it raised.
    """
    try:
        return await PDFProcessor(
            index_name=index_name, namespace=namespace
        ).list_documents()
    except Exception as exc:
        return DocumentsListResponse(success=False, error=str(exc))