Spaces:
Sleeping
Sleeping
import os | |
import shutil | |
import uuid | |
import sys | |
import traceback | |
from fastapi import APIRouter, UploadFile, File, Form, HTTPException, BackgroundTasks, Depends, Query | |
from fastapi.responses import JSONResponse | |
from typing import Optional, List, Dict, Any | |
from sqlalchemy.orm import Session | |
import os.path | |
import logging | |
import tempfile | |
import time | |
import json | |
from datetime import datetime | |
from app.utils.pdf_processor import PDFProcessor | |
from app.models.pdf_models import PDFResponse, DeleteDocumentRequest, DocumentsListResponse | |
from app.database.postgresql import get_db | |
from app.database.models import VectorDatabase, Document, VectorStatus, ApiKey, DocumentContent | |
from app.api.pdf_websocket import ( | |
send_pdf_upload_started, | |
send_pdf_upload_progress, | |
send_pdf_upload_completed, | |
send_pdf_upload_failed, | |
send_pdf_delete_started, | |
send_pdf_delete_completed, | |
send_pdf_delete_failed | |
) | |
# Module-wide logger for general API messages
logger = logging.getLogger(__name__)

# Dedicated logger for verbose PDF diagnostics; the logger itself accepts DEBUG and up
pdf_debug_logger = logging.getLogger("pdf_debug_api")
pdf_debug_logger.setLevel(logging.DEBUG)

# Attach a stdout handler only when the logger has no stream handler yet, so a
# second handler is never stacked on top of an existing one.
# NOTE: the handler filters at INFO, so DEBUG records are not written by it.
if all(not isinstance(existing, logging.StreamHandler) for existing in pdf_debug_logger.handlers):
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setLevel(logging.INFO)
    pdf_debug_logger.addHandler(stream_handler)
# Router for the PDF endpoints, mounted under the /pdf prefix
router = APIRouter(prefix="/pdf", tags=["PDF Processing"])

# Both the upload scratch area and the "storage" area are the system temp
# directory; no application-specific directory is created.
TEMP_UPLOAD_DIR = tempfile.gettempdir()
STORAGE_DIR = tempfile.gettempdir()

# Mock mode is switched off: the real database is used
USE_MOCK_MODE = False
logger.info(f"PDF API starting with USE_MOCK_MODE={USE_MOCK_MODE}")
# Helper function to log with timestamp
def log_with_timestamp(message: str, level: str = "info", error: Optional[Exception] = None):
    """Log *message*, prefixed with the current local time, to both loggers.

    The record is written to the module logger and to the PDF debug logger.

    Args:
        message: Text to log.
        level: One of "debug", "info", "warning" or "error" (case-insensitive).
            Any other value is logged at info level.
        error: Optional exception. When given together with level "error",
            the traceback of that exception is logged after the message.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    full_message = f"{timestamp} - {message}"

    method_name = level.lower()
    if method_name not in ("debug", "info", "warning", "error"):
        # Unknown levels fall back to info rather than being dropped
        method_name = "info"

    targets = (logger, pdf_debug_logger)
    for target in targets:
        getattr(target, method_name)(full_message)

    if method_name == "error" and error is not None:
        # Format the traceback of the exception that was passed in. Using
        # traceback.format_exc() here would depend on being called inside an
        # active ``except`` block and otherwise log "NoneType: None".
        trace_text = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        for target in targets:
            target.error(trace_text)
# Helper function to log debug information during upload
def log_upload_debug(correlation_id: str, message: str, error: Optional[Exception] = None):
    """Write an upload diagnostic, tagged with *correlation_id*, to the PDF debug logger.

    Args:
        correlation_id: Short request identifier placed in brackets before the text.
        message: Text to log at debug level.
        error: Optional exception; when given, its message and traceback are
            additionally logged at error level.
    """
    pdf_debug_logger.debug(f"[{correlation_id}] {message}")
    if error is not None:
        pdf_debug_logger.error(f"[{correlation_id}] Error: {str(error)}")
        # Use the traceback attached to the passed exception so the output is
        # correct even when this helper is called outside an ``except`` block.
        pdf_debug_logger.error(
            "".join(traceback.format_exception(type(error), error, error.__traceback__))
        )
# Helper that relays progress events and never lets a delivery failure escape
async def send_progress_update(user_id, file_id, step, progress=0.0, message=""):
    """Forward one PDF-processing progress event to send_pdf_upload_progress.

    All arguments are passed through unchanged. Any exception raised while
    sending is logged (message plus traceback) and suppressed, so a broken
    WebSocket cannot interrupt the caller.
    """
    try:
        await send_pdf_upload_progress(user_id, file_id, step, progress, message)
    except Exception as send_error:
        failure_trace = traceback.format_exc()
        logger.error(f"Error sending progress update: {send_error}")
        logger.error(failure_trace)
# Helper for handle_pdf_processing_result: reset the session after a failed commit
def _rollback_after_db_error(db, correlation_id):
    """Roll back *db* so the session stays usable after a failed commit; never raises."""
    try:
        db.rollback()
    except Exception as rollback_error:
        log_upload_debug(correlation_id, f"Error rolling back database session: {rollback_error}", rollback_error)


# Post-processing step shared by the upload flow
async def handle_pdf_processing_result(result, correlation_id, user_id, file_id, filename, document, vector_status,
                                       vector_database_id, temp_file_path, db, is_pdf):
    """Process the result of PDF processing and update database records.

    On success the uploaded file is moved into STORAGE_DIR, the vector status
    is marked "completed" and the document is flagged as embedded; on failure
    the vector status is marked "failed" with the error text. In both cases
    the user is notified over WebSocket when *user_id* is set, and the
    temporary file is removed if it still exists. Every step is best-effort:
    a failure is logged and the remaining steps still run.

    Args:
        result: Dict returned by the PDF processor; the keys read here are
            'success', 'document_id', 'chunks_processed' and 'error'.
        correlation_id: Request identifier used in log lines.
        user_id: Recipient of WebSocket notifications, or a falsy value to skip them.
        file_id: UUID string generated for this upload.
        filename: Original file name, reported in notifications.
        document: Document record, or None when no database records were created.
        vector_status: VectorStatus record, or None.
        vector_database_id: ID of the vector database, or None.
        temp_file_path: Path of the temporary upload file.
        db: Database session used for commits.
        is_pdf: True when the upload is a PDF; selects the stored file extension.

    Returns:
        The same *result* dict; on success with a document it additionally
        carries "document_database_id".
    """
    has_db_records = vector_database_id and document and vector_status

    if result.get('success'):
        # Move the file from the temp location to storage
        try:
            storage_path = os.path.join(STORAGE_DIR, f"{file_id}{'.pdf' if is_pdf else '.txt'}")
            shutil.move(temp_file_path, storage_path)
            log_upload_debug(correlation_id, f"Moved file to storage at {storage_path}")
        except Exception as move_error:
            log_upload_debug(correlation_id, f"Error moving file to storage: {move_error}", move_error)

        # Update status in PostgreSQL
        if has_db_records:
            try:
                log_upload_debug(correlation_id, f"Updating vector status to 'completed' for document ID {document.id}")
                result_document_id = result.get('document_id')
                vector_status.status = "completed"
                vector_status.embedded_at = datetime.now()

                # Store the identifier that a later deletion must use: the ID
                # reported by the processor when it differs from file_id,
                # otherwise file_id itself.
                if result_document_id and result_document_id != file_id:
                    vector_status.vector_id = result_document_id
                    log_upload_debug(correlation_id, f"Updated vector_id to {result_document_id} (from result)")
                elif file_id:
                    vector_status.vector_id = file_id
                    log_upload_debug(correlation_id, f"Updated vector_id to {file_id} (from file_id)")

                # Keep the document name as a secondary identifier
                vector_status.document_name = document.name
                log_upload_debug(correlation_id, f"Stored document_name '{document.name}' in vector status for backup")

                document.is_embedded = True
                db.commit()
                log_upload_debug(correlation_id, f"Database status updated successfully")
            except Exception as db_error:
                log_upload_debug(correlation_id, f"Error updating database status: {db_error}", db_error)
                # Without a rollback the session rejects every later statement
                _rollback_after_db_error(db, correlation_id)

        # Send completion notification via WebSocket
        if user_id:
            try:
                await send_pdf_upload_completed(
                    user_id,
                    file_id,
                    filename,
                    result.get('chunks_processed', 0)
                )
                log_upload_debug(correlation_id, f"Sent upload completed notification to user {user_id}")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending WebSocket notification: {ws_error}", ws_error)

        # Add document information to the result
        if document:
            result["document_database_id"] = document.id
    else:
        log_upload_debug(correlation_id, f"PDF processing failed: {result.get('error', 'Unknown error')}")

        # Update error status in PostgreSQL
        if has_db_records:
            try:
                log_upload_debug(correlation_id, f"Updating vector status to 'failed' for document ID {document.id}")
                vector_status.status = "failed"
                vector_status.error_message = result.get('error', 'Unknown error')
                db.commit()
                log_upload_debug(correlation_id, f"Database status updated for failure")
            except Exception as db_error:
                log_upload_debug(correlation_id, f"Error updating database status for failure: {db_error}", db_error)
                _rollback_after_db_error(db, correlation_id)

        # Send failure notification via WebSocket
        if user_id:
            try:
                await send_pdf_upload_failed(
                    user_id,
                    file_id,
                    filename,
                    result.get('error', 'Unknown error')
                )
                log_upload_debug(correlation_id, f"Sent upload failed notification to user {user_id}")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending WebSocket notification: {ws_error}", ws_error)

    # Cleanup: the temp file is still present when processing failed or the move failed
    if temp_file_path and os.path.exists(temp_file_path):
        try:
            os.remove(temp_file_path)
            log_upload_debug(correlation_id, f"Removed temporary file {temp_file_path}")
        except Exception as cleanup_error:
            log_upload_debug(correlation_id, f"Error removing temporary file: {cleanup_error}", cleanup_error)

    log_upload_debug(correlation_id, f"Upload request completed with success={result.get('success', False)}")
    return result
# Helper for upload_pdf: best-effort removal of the temporary upload file
def _remove_temp_upload(temp_file_path, correlation_id):
    """Delete *temp_file_path* when it is set and exists; log instead of raising."""
    if not temp_file_path or not os.path.exists(temp_file_path):
        return
    try:
        os.remove(temp_file_path)
        log_upload_debug(correlation_id, f"Cleaned up temp file after error: {temp_file_path}")
    except Exception as cleanup_error:
        log_upload_debug(correlation_id, f"Error cleaning up temporary file: {cleanup_error}", cleanup_error)


# Endpoint for uploading and processing PDFs
async def upload_pdf(
    file: UploadFile = File(...),
    namespace: str = Form("Default"),
    index_name: str = Form("testbot768"),
    title: Optional[str] = Form(None),
    description: Optional[str] = Form(None),
    user_id: Optional[str] = Form(None),
    vector_database_id: Optional[int] = Form(None),
    content_type: Optional[str] = Form(None),  # Add content_type parameter
    background_tasks: BackgroundTasks = None,
    db: Session = Depends(get_db)
):
    """
    Upload and process PDF file to create embeddings and store in Pinecone
    - **file**: PDF file to process (.txt, .md and .html files are accepted as well)
    - **namespace**: Namespace in Pinecone to store embeddings (default: "Default")
    - **index_name**: Name of Pinecone index (default: "testbot768")
    - **title**: Document title (optional)
    - **description**: Document description (optional)
    - **user_id**: User ID for WebSocket status updates
    - **vector_database_id**: ID of vector database in PostgreSQL (optional)
    - **content_type**: Content type of the file (optional)
    Note: Mock mode has been permanently removed and the system always operates in real mode

    Returns the processing result dict on completion, or a PDFResponse with
    success=False when an unexpected error occurs. Raises HTTPException 400
    for an unsupported file type and 404 for an unknown or inactive vector
    database.
    """
    # Generate request ID for tracking
    correlation_id = str(uuid.uuid4())[:8]
    logger.info(f"[{correlation_id}] PDF upload request received: ns={namespace}, index={index_name}, user={user_id}")
    log_upload_debug(correlation_id, f"Upload request: vector_db_id={vector_database_id}")

    # Everything the error handler reads is initialized up front so that a
    # failure early in the request cannot hit an unbound name.
    temp_file_path = None
    document = None
    vector_status = None
    file_id = None

    try:
        # Check file type - accept both PDF and plaintext for testing.
        # The filename can be missing, so fall back to an empty string.
        lowered_name = (file.filename or "").lower()
        is_pdf = lowered_name.endswith('.pdf')
        is_text = lowered_name.endswith(('.txt', '.md', '.html'))
        log_upload_debug(correlation_id, f"File type check: is_pdf={is_pdf}, is_text={is_text}, filename={file.filename}")
        if not (is_pdf or is_text):
            log_upload_debug(correlation_id, f"Rejecting non-PDF file: {file.filename}")
            raise HTTPException(status_code=400, detail="Only PDF files are accepted")

        # If vector_database_id provided, get info from PostgreSQL
        api_key = None
        vector_db = None
        if vector_database_id:
            log_upload_debug(correlation_id, f"Looking up vector database ID {vector_database_id}")
            vector_db = db.query(VectorDatabase).filter(
                VectorDatabase.id == vector_database_id,
                VectorDatabase.status == "active"
            ).first()
            if not vector_db:
                log_upload_debug(correlation_id, f"Vector database {vector_database_id} not found or inactive")
                raise HTTPException(status_code=404, detail="Vector database not found or inactive")
            log_upload_debug(correlation_id, f"Found vector database: id={vector_db.id}, name={vector_db.name}, index={vector_db.pinecone_index}")

            # Resolve the API key: prefer the api_key_ref relationship, fall
            # back to the deprecated direct api_key attribute.
            log_upload_debug(correlation_id, f"Trying to get API key for vector database {vector_database_id}")
            log_upload_debug(correlation_id, f"Vector DB attributes: {dir(vector_db)}")
            if hasattr(vector_db, 'api_key_ref') and vector_db.api_key_ref:
                log_upload_debug(correlation_id, f"Using API key from relationship for vector database ID {vector_database_id}")
                log_upload_debug(correlation_id, f"api_key_ref type: {type(vector_db.api_key_ref)}")
                log_upload_debug(correlation_id, f"api_key_ref attributes: {dir(vector_db.api_key_ref)}")
                if hasattr(vector_db.api_key_ref, 'key_value'):
                    api_key = vector_db.api_key_ref.key_value
                    # NOTE(review): the first characters of the key are written to the log
                    key_prefix = api_key[:4] + "..." if api_key and len(api_key) > 4 else "invalid/empty"
                    log_upload_debug(correlation_id, f"API key retrieved: {key_prefix}, length: {len(api_key) if api_key else 0}")
                    logger.info(f"[{correlation_id}] Using API key from relationship for vector database ID {vector_database_id}")
                else:
                    log_upload_debug(correlation_id, f"api_key_ref does not have key_value attribute")
            elif hasattr(vector_db, 'api_key') and vector_db.api_key:
                api_key = vector_db.api_key
                key_prefix = api_key[:4] + "..." if api_key and len(api_key) > 4 else "invalid/empty"
                log_upload_debug(correlation_id, f"Using deprecated direct api_key: {key_prefix}")
                logger.warning(f"[{correlation_id}] Using deprecated direct api_key for vector database ID {vector_database_id}")
            else:
                log_upload_debug(correlation_id, "No API key found in vector database")

            # The index stored on the vector database overrides the form value
            index_name = vector_db.pinecone_index
            log_upload_debug(correlation_id, f"Using index name '{index_name}' from vector database")
            logger.info(f"[{correlation_id}] Using index name '{index_name}' from vector database")

        # Generate file_id and save temporary file
        file_id = str(uuid.uuid4())
        temp_file_path = os.path.join(TEMP_UPLOAD_DIR, f"{file_id}{'.pdf' if is_pdf else '.txt'}")
        log_upload_debug(correlation_id, f"Generated file_id: {file_id}, temp path: {temp_file_path}")

        # Send notification of upload start via WebSocket if user_id provided
        if user_id:
            try:
                await send_pdf_upload_started(user_id, file.filename, file_id)
                log_upload_debug(correlation_id, f"Sent upload started notification to user {user_id}")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending WebSocket notification: {ws_error}", ws_error)

        # Save file
        log_upload_debug(correlation_id, f"Reading file content")
        file_content = await file.read()
        log_upload_debug(correlation_id, f"File size: {len(file_content)} bytes")
        with open(temp_file_path, "wb") as buffer:
            buffer.write(file_content)
        log_upload_debug(correlation_id, f"File saved to {temp_file_path}")

        # Create metadata
        metadata = {
            "filename": file.filename,
            "content_type": file.content_type
        }

        # Use provided content_type or fallback to file.content_type
        actual_content_type = content_type or file.content_type
        log_upload_debug(correlation_id, f"Using content_type: {actual_content_type}")
        if not actual_content_type:
            # Fallback content type based on file extension
            if is_pdf:
                actual_content_type = "application/pdf"
            elif is_text:
                actual_content_type = "text/plain"
            else:
                actual_content_type = "application/octet-stream"
            log_upload_debug(correlation_id, f"No content_type provided, using fallback: {actual_content_type}")
        metadata["content_type"] = actual_content_type

        # Use provided title or filename as document name
        document_name = title or file.filename

        # Verify document name is unique within this vector database
        if vector_database_id:
            existing_doc = db.query(Document).filter(
                Document.name == document_name,
                Document.vector_database_id == vector_database_id
            ).first()
            if existing_doc:
                # Make the name unique by appending timestamp
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                base_name, extension = os.path.splitext(document_name)
                document_name = f"{base_name}_{timestamp}{extension}"
                log_upload_debug(correlation_id, f"Document name already exists, using unique name: {document_name}")

        metadata["title"] = document_name
        if description:
            metadata["description"] = description

        # Send progress update via WebSocket
        if user_id:
            try:
                await send_progress_update(
                    user_id,
                    file_id,
                    "file_preparation",
                    0.2,
                    "File saved, preparing for processing"
                )
                log_upload_debug(correlation_id, f"Sent file preparation progress to user {user_id}")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending progress update: {ws_error}", ws_error)

        # Create the PostgreSQL records (document, content, vector status)
        if vector_database_id and vector_db:
            log_upload_debug(correlation_id, f"Creating PostgreSQL records for document with vector_database_id={vector_database_id}")

            # Create document record without file content
            try:
                document = Document(
                    name=document_name,  # Use the (potentially) modified document name
                    file_type="pdf" if is_pdf else "text",
                    content_type=actual_content_type,
                    size=len(file_content),
                    is_embedded=False,
                    vector_database_id=vector_database_id
                )
                db.add(document)
                db.commit()
                db.refresh(document)
                log_upload_debug(correlation_id, f"Created document record: id={document.id}")
            except Exception as doc_error:
                log_upload_debug(correlation_id, f"Error creating document record: {doc_error}", doc_error)
                raise

            # Create document content record to store binary data separately
            try:
                document_content = DocumentContent(
                    document_id=document.id,
                    file_content=file_content
                )
                db.add(document_content)
                db.commit()
                log_upload_debug(correlation_id, f"Created document content record for document ID {document.id}")
            except Exception as content_error:
                log_upload_debug(correlation_id, f"Error creating document content: {content_error}", content_error)
                raise

            # Create vector status record - store file_id as the vector_id for deletion later
            try:
                pending_status = VectorStatus(
                    document_id=document.id,
                    vector_database_id=vector_database_id,
                    status="pending",
                    vector_id=file_id  # Store the document UUID as vector_id for later deletion
                )
                db.add(pending_status)
                db.commit()
                # Publish the record only once it is persisted, so the error
                # handler never tries to update a row that was not saved.
                vector_status = pending_status
                log_upload_debug(correlation_id, f"Created vector status record for document ID {document.id} with vector_id={file_id}")
            except Exception as status_error:
                log_upload_debug(correlation_id, f"Error creating vector status: {status_error}", status_error)
                raise

            logger.info(f"[{correlation_id}] Created document ID {document.id} and vector status in PostgreSQL")

        # Initialize PDF processor with correct parameters
        log_upload_debug(correlation_id, f"Initializing PDFProcessor: index={index_name}, vector_db_id={vector_database_id}")
        processor = PDFProcessor(
            index_name=index_name,
            namespace=namespace,
            api_key=api_key,
            vector_db_id=vector_database_id,
            correlation_id=correlation_id
        )

        # Send embedding start notification via WebSocket
        if user_id:
            try:
                await send_progress_update(
                    user_id,
                    file_id,
                    "embedding_start",
                    0.4,
                    "Starting to process PDF and create embeddings"
                )
                log_upload_debug(correlation_id, f"Sent embedding start notification to user {user_id}")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending WebSocket notification: {ws_error}", ws_error)

        # Process PDF and create embeddings with progress callback
        log_upload_debug(correlation_id, f"Processing PDF with file_path={temp_file_path}, document_id={file_id}")
        result = await processor.process_pdf(
            file_path=temp_file_path,
            document_id=file_id,  # Use UUID as document_id for Pinecone
            metadata=metadata,
            progress_callback=send_progress_update if user_id else None
        )
        log_upload_debug(correlation_id, f"PDF processing result: {result}")

        # Handle PDF processing result
        return await handle_pdf_processing_result(result, correlation_id, user_id, file_id, file.filename, document, vector_status,
                                                  vector_database_id, temp_file_path, db, is_pdf)
    except HTTPException as http_error:
        # Deliberate 4xx rejections must reach the client with their status
        # code instead of being folded into the generic failure path below.
        log_upload_debug(correlation_id, f"Upload rejected with HTTP {http_error.status_code}: {http_error.detail}")
        _remove_temp_upload(temp_file_path, correlation_id)
        raise
    except Exception as e:
        log_upload_debug(correlation_id, f"Error in upload_pdf: {str(e)}", e)
        logger.exception(f"[{correlation_id}] Error in upload_pdf: {str(e)}")

        # Cleanup on error (the path is None when the failure came before the file was written)
        _remove_temp_upload(temp_file_path, correlation_id)

        if vector_database_id:
            # A failed commit leaves the session unusable until it is rolled back
            try:
                db.rollback()
            except Exception as rollback_error:
                log_upload_debug(correlation_id, f"Error rolling back database session: {rollback_error}", rollback_error)

            # Update error status in PostgreSQL
            if vector_status:
                try:
                    vector_status.status = "failed"
                    vector_status.error_message = str(e)
                    db.commit()
                    log_upload_debug(correlation_id, f"Updated database with error status")
                except Exception as db_error:
                    log_upload_debug(correlation_id, f"Error updating database with error status: {db_error}", db_error)

        # Send failure notification via WebSocket
        if user_id and file_id:
            try:
                await send_pdf_upload_failed(
                    user_id,
                    file_id,
                    file.filename,
                    str(e)
                )
                log_upload_debug(correlation_id, f"Sent failure notification for exception")
            except Exception as ws_error:
                log_upload_debug(correlation_id, f"Error sending WebSocket notification for failure: {ws_error}", ws_error)

        log_upload_debug(correlation_id, f"Upload request failed with exception: {str(e)}")
        return PDFResponse(
            success=False,
            error=str(e)
        )
# Helper for delete_namespace: best-effort WebSocket notification
async def _notify_namespace_delete(notify, *args):
    """Await ``notify(*args)``; log and suppress any failure so it cannot affect the delete."""
    try:
        await notify(*args)
    except Exception as ws_error:
        logger.error(f"Error sending WebSocket delete notification: {ws_error}")


# Endpoint for deleting a namespace
async def delete_namespace(
    namespace: str = "Default",
    index_name: str = "testbot768",
    vector_database_id: Optional[int] = None,
    user_id: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Delete all embeddings in a namespace from Pinecone (i.e. delete the namespace)
    - **namespace**: Namespace in Pinecone (default: "Default")
    - **index_name**: Name of Pinecone index (default: "testbot768")
    - **vector_database_id**: ID of vector database in PostgreSQL (if any)
    - **user_id**: User ID for WebSocket status updates

    Returns the processor's result dict, extended with "updated_records" or
    "postgresql_update_error" when a vector database ID is given, or a
    PDFResponse with success=False when the vector database is not found or
    an unexpected error occurs.
    """
    logger.info(f"Delete namespace request: namespace={namespace}, index={index_name}, vector_db_id={vector_database_id}")
    try:
        # If vector_database_id is given, get connection info from PostgreSQL
        api_key = None
        vector_db = None
        if vector_database_id:
            vector_db = db.query(VectorDatabase).filter(
                VectorDatabase.id == vector_database_id,
                VectorDatabase.status == "active"
            ).first()
            if not vector_db:
                return PDFResponse(
                    success=False,
                    error=f"Vector database with ID {vector_database_id} not found or inactive"
                )
            # Use index from vector database
            index_name = vector_db.pinecone_index
            # Get API key
            if hasattr(vector_db, 'api_key_ref') and vector_db.api_key_ref:
                api_key = vector_db.api_key_ref.key_value
            elif hasattr(vector_db, 'api_key') and vector_db.api_key:
                api_key = vector_db.api_key

        # Use namespace based on vector database ID
        namespace = f"vdb-{vector_database_id}" if vector_database_id else namespace
        logger.info(f"Using namespace '{namespace}' based on vector database ID")

        # Announce the start of the deletion; a notification failure must not stop it
        if user_id:
            await _notify_namespace_delete(send_pdf_delete_started, user_id, namespace)

        processor = PDFProcessor(
            index_name=index_name,
            namespace=namespace,
            api_key=api_key,
            vector_db_id=vector_database_id
        )
        result = await processor.delete_namespace()

        # If successful and vector_database_id, update PostgreSQL to reflect the deletion
        if result.get('success') and vector_database_id:
            try:
                # Update vector statuses for this database
                affected_count = db.query(VectorStatus).filter(
                    VectorStatus.vector_database_id == vector_database_id,
                    VectorStatus.status != "deleted"
                ).update({"status": "deleted", "updated_at": datetime.now()})
                # Update document embedding status
                db.query(Document).filter(
                    Document.vector_database_id == vector_database_id,
                    Document.is_embedded == True
                ).update({"is_embedded": False})
                db.commit()
                logger.info(f"Updated {affected_count} vector statuses to 'deleted'")
                # Include this info in the result
                result["updated_records"] = affected_count
            except Exception as db_error:
                logger.error(f"Error updating PostgreSQL records after namespace deletion: {db_error}")
                result["postgresql_update_error"] = str(db_error)
                # Discard the partial update so the session remains usable
                try:
                    db.rollback()
                except Exception as rollback_error:
                    logger.error(f"Error rolling back database session: {rollback_error}")

        # Report the outcome via WebSocket; the deletion result stands regardless
        if user_id:
            if result.get('success'):
                await _notify_namespace_delete(send_pdf_delete_completed, user_id, namespace)
            else:
                await _notify_namespace_delete(
                    send_pdf_delete_failed, user_id, namespace, result.get('error', 'Unknown error')
                )
        return result
    except Exception as e:
        logger.exception(f"Error in delete_namespace: {str(e)}")
        # Report the error via WebSocket without letting the notifier raise out of the handler
        if user_id:
            await _notify_namespace_delete(send_pdf_delete_failed, user_id, namespace, str(e))
        return PDFResponse(
            success=False,
            error=str(e)
        )
# Endpoint for listing documents
async def get_documents(
    namespace: str = "Default",
    index_name: str = "testbot768",
    vector_database_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    """
    Get information about all documents that have been embedded
    - **namespace**: Namespace in Pinecone (default: "Default")
    - **index_name**: Name of Pinecone index (default: "testbot768")
    - **vector_database_id**: ID of vector database in PostgreSQL (if any)
    """
    logger.info(f"Get documents request: namespace={namespace}, index={index_name}, vector_db_id={vector_database_id}")
    try:
        # With a vector database ID, connection details come from PostgreSQL
        api_key = None
        vector_db = None
        if vector_database_id:
            active_db_query = db.query(VectorDatabase).filter(
                VectorDatabase.id == vector_database_id,
                VectorDatabase.status == "active"
            )
            vector_db = active_db_query.first()
            if not vector_db:
                return DocumentsListResponse(
                    success=False,
                    error=f"Vector database with ID {vector_database_id} not found or inactive"
                )

            # The stored index overrides the caller-supplied one
            index_name = vector_db.pinecone_index

            # Prefer the key behind api_key_ref, else the direct api_key attribute
            key_ref = getattr(vector_db, 'api_key_ref', None)
            if key_ref:
                api_key = key_ref.key_value
            else:
                direct_key = getattr(vector_db, 'api_key', None)
                if direct_key:
                    api_key = direct_key

        # A vector database ID dictates the namespace; otherwise keep the caller's
        if vector_database_id:
            namespace = f"vdb-{vector_database_id}"
        logger.info(f"Using namespace '{namespace}' based on vector database ID")

        # Initialize the PDF processor and fetch the document list from Pinecone
        processor = PDFProcessor(
            index_name=index_name,
            namespace=namespace,
            api_key=api_key,
            vector_db_id=vector_database_id
        )
        pinecone_result = await processor.list_documents()

        # Without a vector database ID there is nothing to add from PostgreSQL
        if not vector_database_id:
            return pinecone_result

        try:
            # Documents flagged as embedded whose vector status is "completed"
            embedded_docs = (
                db.query(Document)
                .join(VectorStatus, Document.id == VectorStatus.document_id)
                .filter(
                    Document.vector_database_id == vector_database_id,
                    Document.is_embedded == True,
                    VectorStatus.status == "completed"
                )
                .all()
            )
            if embedded_docs:
                summaries = []
                for doc in embedded_docs:
                    created_at = doc.created_at.isoformat() if doc.created_at else None
                    summaries.append({
                        "id": doc.id,
                        "name": doc.name,
                        "file_type": doc.file_type,
                        "content_type": doc.content_type,
                        "created_at": created_at
                    })
                pinecone_result["postgresql_documents"] = summaries
                pinecone_result["postgresql_document_count"] = len(embedded_docs)
        except Exception as db_error:
            logger.error(f"Error fetching PostgreSQL documents: {db_error}")
            pinecone_result["postgresql_error"] = str(db_error)
        return pinecone_result
    except Exception as e:
        logger.exception(f"Error in get_documents: {str(e)}")
        return DocumentsListResponse(
            success=False,
            error=str(e)
        )
# Health check endpoint for PDF API
async def health_check():
    """Return a fixed payload with the status, version and a short message."""
    payload = dict(
        status="healthy",
        version="1.0.0",
        message="PDF API is running",
    )
    return payload
# Document deletion endpoint | |
async def delete_document( | |
document_id: str, | |
namespace: str = "Default", | |
index_name: str = "testbot768", | |
vector_database_id: Optional[int] = None, | |
user_id: Optional[str] = None, | |
db: Session = Depends(get_db) | |
): | |
""" | |
Delete vectors for a specific document from the vector database | |
This endpoint can be called in two ways: | |
1. With the PostgreSQL document ID - will look up the actual vector_id first | |
2. With the actual vector_id directly - when called from the PostgreSQL document deletion endpoint | |
- **document_id**: ID of the document to delete (can be PostgreSQL document ID or Pinecone vector_id) | |
- **namespace**: Namespace in the vector database (default: "Default") | |
- **index_name**: Name of the vector index (default: "testbot768") | |
- **vector_database_id**: ID of vector database in PostgreSQL (optional) | |
- **user_id**: User ID for WebSocket status updates (optional) | |
""" | |
logger.info(f"Delete document request: document_id={document_id}, namespace={namespace}, index={index_name}, vector_db_id={vector_database_id}") | |
try: | |
# If vector_database_id is provided, get info from PostgreSQL | |
api_key = None | |
vector_db = None | |
pinecone_document_id = document_id # Default to the provided document_id | |
document_to_delete = None | |
vector_status_to_update = None | |
document_found = False # Flag to track if document was found | |
vector_id_found = False # Flag to track if a valid vector ID was found | |
if vector_database_id: | |
vector_db = db.query(VectorDatabase).filter( | |
VectorDatabase.id == vector_database_id, | |
VectorDatabase.status == "active" | |
).first() | |
if not vector_db: | |
return PDFResponse( | |
success=False, | |
error=f"Vector database with ID {vector_database_id} not found or inactive" | |
) | |
# Use index from vector database | |
index_name = vector_db.pinecone_index | |
# Get API key | |
if hasattr(vector_db, 'api_key_ref') and vector_db.api_key_ref: | |
api_key = vector_db.api_key_ref.key_value | |
elif hasattr(vector_db, 'api_key') and vector_db.api_key: | |
api_key = vector_db.api_key | |
# Use namespace based on vector database ID | |
namespace = f"vdb-{vector_database_id}" if vector_database_id else namespace | |
logger.info(f"Using namespace '{namespace}' based on vector database ID") | |
# Check if document_id is a numeric database ID or document name | |
if document_id.isdigit(): | |
# Try to find the document in PostgreSQL by its ID | |
db_document_id = int(document_id) | |
document_to_delete = db.query(Document).filter(Document.id == db_document_id).first() | |
if document_to_delete: | |
document_found = True | |
logger.info(f"Found document in database: id={document_to_delete.id}, name={document_to_delete.name}") | |
# Look for vector status to find the Pinecone vector_id | |
vector_status_to_update = db.query(VectorStatus).filter( | |
VectorStatus.document_id == document_to_delete.id, | |
VectorStatus.vector_database_id == vector_database_id | |
).first() | |
if vector_status_to_update and vector_status_to_update.vector_id: | |
pinecone_document_id = vector_status_to_update.vector_id | |
vector_id_found = True | |
logger.info(f"Using vector_id '{pinecone_document_id}' from vector status") | |
else: | |
# Fallback options if vector_id is not directly found | |
pinecone_document_id = document_to_delete.name | |
logger.info(f"Vector ID not found in status, using document name '{pinecone_document_id}' as fallback") | |
else: | |
logger.warning(f"Document with ID {db_document_id} not found in database. Using ID as is.") | |
else: | |
# Try to find document by name/title | |
document_to_delete = db.query(Document).filter( | |
Document.name == document_id, | |
Document.vector_database_id == vector_database_id | |
).first() | |
if document_to_delete: | |
document_found = True | |
logger.info(f"Found document by name: id={document_to_delete.id}, name={document_to_delete.name}") | |
# Get vector status for this document | |
vector_status_to_update = db.query(VectorStatus).filter( | |
VectorStatus.document_id == document_to_delete.id, | |
VectorStatus.vector_database_id == vector_database_id | |
).first() | |
if vector_status_to_update and vector_status_to_update.vector_id: | |
pinecone_document_id = vector_status_to_update.vector_id | |
vector_id_found = True | |
logger.info(f"Using vector_id '{pinecone_document_id}' from vector status") | |
# Send notification of deletion start via WebSocket if user_id provided | |
if user_id: | |
try: | |
await send_pdf_delete_started(user_id, pinecone_document_id) | |
except Exception as ws_error: | |
logger.error(f"Error sending WebSocket notification: {ws_error}") | |
# Initialize PDF processor | |
processor = PDFProcessor( | |
index_name=index_name, | |
namespace=namespace, | |
api_key=api_key, | |
vector_db_id=vector_database_id | |
) | |
# Delete document vectors using the pinecone_document_id and additional metadata | |
additional_metadata = {} | |
if document_to_delete: | |
# Add document name as title for searching | |
additional_metadata["document_name"] = document_to_delete.name | |
result = await processor.delete_document(pinecone_document_id, additional_metadata) | |
# Check if vectors were actually deleted or found | |
vectors_deleted = result.get('vectors_deleted', 0) | |
vectors_found = result.get('vectors_found', False) | |
# If no document was found in PostgreSQL and no vectors were found/deleted in Pinecone | |
if not document_found and not vectors_found: | |
result['success'] = False # Override success to false | |
result['error'] = f"Document ID {document_id} not found in PostgreSQL or Pinecone" | |
# Send notification of deletion failure via WebSocket if user_id provided | |
if user_id: | |
try: | |
await send_pdf_delete_failed(user_id, document_id, result['error']) | |
except Exception as ws_error: | |
logger.error(f"Error sending WebSocket notification: {ws_error}") | |
return result | |
# If successful and vector_database_id is provided, update PostgreSQL records | |
if result.get('success') and vector_database_id: | |
try: | |
# Update vector status if we found it earlier | |
if vector_status_to_update: | |
vector_status_to_update.status = "deleted" | |
db.commit() | |
result["postgresql_updated"] = True | |
logger.info(f"Updated vector status for document ID {document_to_delete.id if document_to_delete else document_id} to 'deleted'") | |
else: | |
# If we didn't find it earlier, try again with more search options | |
document = None | |
if document_id.isdigit(): | |
# If the original document_id was numeric, use it directly | |
document = db.query(Document).filter(Document.id == int(document_id)).first() | |
if not document: | |
# Find document by vector ID if it exists | |
document = db.query(Document).join( | |
VectorStatus, Document.id == VectorStatus.document_id | |
).filter( | |
Document.vector_database_id == vector_database_id, | |
VectorStatus.vector_id == pinecone_document_id | |
).first() | |
if not document: | |
# Try finding by name | |
document = db.query(Document).filter( | |
Document.vector_database_id == vector_database_id, | |
Document.name == pinecone_document_id | |
).first() | |
if document: | |
# Update vector status | |
vector_status = db.query(VectorStatus).filter( | |
VectorStatus.document_id == document.id, | |
VectorStatus.vector_database_id == vector_database_id | |
).first() | |
if vector_status: | |
vector_status.status = "deleted" | |
db.commit() | |
result["postgresql_updated"] = True | |
logger.info(f"Updated vector status for document ID {document.id} to 'deleted'") | |
else: | |
logger.warning(f"Could not find document record for deletion confirmation. Document ID: {document_id}, Vector ID: {pinecone_document_id}") | |
except Exception as db_error: | |
logger.error(f"Error updating PostgreSQL records: {db_error}") | |
result["postgresql_error"] = str(db_error) | |
# Add information about what was found and deleted | |
result["document_found_in_db"] = document_found | |
result["vector_id_found"] = vector_id_found | |
result["vectors_deleted"] = vectors_deleted | |
# Send notification of deletion completion via WebSocket if user_id provided | |
if user_id: | |
try: | |
if result.get('success'): | |
await send_pdf_delete_completed(user_id, pinecone_document_id) | |
else: | |
await send_pdf_delete_failed(user_id, pinecone_document_id, result.get('error', 'Unknown error')) | |
except Exception as ws_error: | |
logger.error(f"Error sending WebSocket notification: {ws_error}") | |
return result | |
except Exception as e: | |
logger.exception(f"Error in delete_document: {str(e)}") | |
# Send notification of deletion failure via WebSocket if user_id provided | |
if user_id: | |
try: | |
await send_pdf_delete_failed(user_id, document_id, str(e)) | |
except Exception as ws_error: | |
logger.error(f"Error sending WebSocket notification: {ws_error}") | |
return PDFResponse( | |
success=False, | |
error=str(e) | |
) | |