diff --git "a/app/api/postgresql_routes.py" "b/app/api/postgresql_routes.py" new file mode 100644--- /dev/null +++ "b/app/api/postgresql_routes.py" @@ -0,0 +1,4029 @@ +import logging +import json +import traceback +from datetime import datetime, timedelta, timezone +import time +from functools import lru_cache +from pathlib import Path as pathlib_Path # Import Path from pathlib with a different name + +from fastapi import APIRouter, HTTPException, Depends, Query, Body, Response, File, UploadFile, Form, BackgroundTasks +from fastapi.params import Path # Import Path explicitly from fastapi.params instead +from sqlalchemy.orm import Session +from sqlalchemy.exc import SQLAlchemyError +from typing import List, Optional, Dict, Any +import logging +import traceback +from datetime import datetime +from sqlalchemy import text, inspect, func +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy import desc, func +from cachetools import TTLCache +import uuid +import asyncio +import httpx # Import httpx for HTTP requests + +from app.database.postgresql import get_db +from app.database.models import FAQItem, EmergencyItem, EventItem, AboutPixity, SolanaSummit, DaNangBucketList, ApiKey, VectorDatabase, Document, VectorStatus, TelegramBot, ChatEngine, BotEngine, EngineVectorDb, DocumentContent +from pydantic import BaseModel, Field, ConfigDict + +# Configure logging +logger = logging.getLogger(__name__) + +# Create router +router = APIRouter( + prefix="/postgres", + tags=["PostgreSQL"], +) + +# Initialize caches for frequently used data +# Cache for 5 minutes (300 seconds) +faqs_cache = TTLCache(maxsize=1, ttl=300) +emergencies_cache = TTLCache(maxsize=1, ttl=300) +events_cache = TTLCache(maxsize=10, ttl=300) # Cache for different page sizes +about_pixity_cache = TTLCache(maxsize=1, ttl=300) +solana_summit_cache = TTLCache(maxsize=1, ttl=300) +danang_bucket_list_cache = TTLCache(maxsize=1, ttl=300) + +# --- Pydantic models for request/response --- + +# Information models +class InfoContentBase(BaseModel): + content: str + +class InfoContentCreate(InfoContentBase): + pass + +class InfoContentUpdate(InfoContentBase): + pass + +class InfoContentResponse(InfoContentBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +# FAQ models +class FAQBase(BaseModel): + question: str + answer: str + is_active: bool = True + +class FAQCreate(FAQBase): + pass + +class FAQUpdate(BaseModel): + question: Optional[str] = None + answer: Optional[str] = None + is_active: Optional[bool] = None + +class FAQResponse(FAQBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +# Emergency contact models +class EmergencyBase(BaseModel): + name: str + phone_number: str + description: Optional[str] = None + address: Optional[str] = None + location: Optional[str] = None + priority: int = 0 + is_active: bool = True + section: Optional[str] = None + section_id: Optional[int] = None + +class EmergencyCreate(EmergencyBase): + pass + +class EmergencyUpdate(BaseModel): + name: Optional[str] = None + phone_number: Optional[str] = None + description: Optional[str] = None + address: Optional[str] = None + location: Optional[str] = None + priority: Optional[int] = None + is_active: Optional[bool] = None + section: Optional[str] = None + section_id: Optional[int] = None + +class EmergencyResponse(EmergencyBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +# Event models +class EventBase(BaseModel): + name: str + description: str + address: str + location: Optional[str] = None + date_start: datetime + date_end: Optional[datetime] = None + price: Optional[List[dict]] = None + url: Optional[str] = None + is_active: bool = True + featured: bool = False + +class EventCreate(EventBase): + pass + +class EventUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + address: Optional[str] = None + location: Optional[str] = None + date_start: Optional[datetime] = None + date_end: Optional[datetime] = None + price: Optional[List[dict]] = None + url: Optional[str] = None + is_active: Optional[bool] = None + featured: Optional[bool] = None + +class EventResponse(EventBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +# --- Batch operations for better performance --- + +class BatchEventCreate(BaseModel): + events: List[EventCreate] + +class BatchUpdateResult(BaseModel): + success_count: int + failed_ids: List[int] = [] + message: str + +# --- FAQ endpoints --- + +@router.get("/faq", response_model=List[FAQResponse]) +async def get_faqs( + skip: int = 0, + limit: int = 100, + active_only: bool = False, + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get all FAQ items. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **active_only**: If true, only return active items + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key based on query parameters + cache_key = f"faqs_{skip}_{limit}_{active_only}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = faqs_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Build query directly without excessive logging or inspection + query = db.query(FAQItem) + + # Add filter if needed + if active_only: + query = query.filter(FAQItem.is_active == True) + + # Get total count for pagination + count_query = query.with_entities(func.count(FAQItem.id)) + total_count = count_query.scalar() + + # Execute query with pagination + faqs = query.offset(skip).limit(limit).all() + + # Convert to Pydantic models + result = [FAQResponse.model_validate(faq, from_attributes=True) for faq in faqs] + + # Store in cache if caching is enabled + if use_cache: + faqs_cache[cache_key] = result + + return result + except SQLAlchemyError as e: + logger.error(f"Database error in get_faqs: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_faqs: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") + +@router.post("/faq", response_model=FAQResponse) +async def create_faq( + faq: FAQCreate, + db: Session = Depends(get_db) +): + """ + Create a new FAQ item. + + - **question**: Question text + - **answer**: Answer text + - **is_active**: Whether the FAQ is active (default: True) + """ + try: + # Create new FAQ item + db_faq = FAQItem(**faq.model_dump()) + db.add(db_faq) + db.commit() + db.refresh(db_faq) + + # Invalidate FAQ cache after creating a new item + faqs_cache.clear() + + # Convert to Pydantic model + return FAQResponse.model_validate(db_faq, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in create_faq: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.get("/faq/{faq_id}", response_model=FAQResponse) +async def get_faq( + faq_id: int = Path(..., gt=0), + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get a single FAQ item by ID. + + - **faq_id**: ID of the FAQ item + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key + cache_key = f"faq_{faq_id}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = faqs_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Use direct SQL query for better performance on single item lookup + stmt = text("SELECT * FROM faq_item WHERE id = :id") + result = db.execute(stmt, {"id": faq_id}).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="FAQ item not found") + + # Create a FAQItem model instance manually + faq = FAQItem() + for key, value in result._mapping.items(): + if hasattr(faq, key): + setattr(faq, key, value) + + # Convert to Pydantic model + response = FAQResponse.model_validate(faq, from_attributes=True) + + # Store in cache if caching is enabled + if use_cache: + faqs_cache[cache_key] = response + + return response + except SQLAlchemyError as e: + logger.error(f"Database error in get_faq: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/faq/{faq_id}", response_model=FAQResponse) +async def update_faq( + faq_id: int = Path(..., gt=0), + faq_update: FAQUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update an existing FAQ item. + + - **faq_id**: ID of the FAQ item to update + - **question**: Updated question text (optional) + - **answer**: Updated answer text (optional) + - **is_active**: Updated active status (optional) + """ + try: + # Check if FAQ exists + faq = db.query(FAQItem).filter(FAQItem.id == faq_id).first() + if faq is None: + raise HTTPException(status_code=404, detail=f"FAQ with ID {faq_id} not found") + + # Update fields with optimized dict handling + update_data = faq_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(faq, key, value) + + # Commit changes + db.commit() + db.refresh(faq) + + # Invalidate specific cache entries + faqs_cache.delete(f"faq_{faq_id}") + faqs_cache.clear() # Clear all list caches + + # Convert to Pydantic model + return FAQResponse.model_validate(faq, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in update_faq: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/faq/{faq_id}", response_model=dict) +async def delete_faq( + faq_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete an FAQ item. + + - **faq_id**: ID of the FAQ item to delete + """ + try: + # Use optimized query with proper error handling + result = db.execute( + text("DELETE FROM faq_item WHERE id = :id RETURNING id"), + {"id": faq_id} + ).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="FAQ item not found") + + db.commit() + + # Invalidate cache entries + faqs_cache.delete(f"faq_{faq_id}") + faqs_cache.clear() # Clear all list caches + + return {"status": "success", "message": f"FAQ item {faq_id} deleted"} + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in delete_faq: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# --- Emergency contact endpoints --- + +@router.get("/emergency", response_model=List[EmergencyResponse]) +async def get_emergency_contacts( + skip: int = 0, + limit: int = 100, + active_only: bool = False, + section: Optional[str] = None, + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get all emergency contacts. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **active_only**: If true, only return active items + - **section**: Filter by section (16.1, 16.2.1, 16.2.2, 16.3) + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key based on query parameters + cache_key = f"emergency_{skip}_{limit}_{active_only}_{section}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = emergencies_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Build query directly without excessive inspection and logging + query = db.query(EmergencyItem) + + # Add filters if needed + if active_only: + query = query.filter(EmergencyItem.is_active == True) + + if section: + query = query.filter(EmergencyItem.section == section) + + # Get total count for pagination info + count_query = query.with_entities(func.count(EmergencyItem.id)) + total_count = count_query.scalar() + + # Order by priority for proper sorting + emergency_contacts = query.order_by(EmergencyItem.priority.desc()).offset(skip).limit(limit).all() + + # Convert to Pydantic models efficiently + result = [EmergencyResponse.model_validate(contact, from_attributes=True) for contact in emergency_contacts] + + # Store in cache if caching is enabled + if use_cache: + emergencies_cache[cache_key] = result + + return result + except SQLAlchemyError as e: + logger.error(f"Database error in get_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") + +@router.get("/emergency/sections", response_model=List[Dict[str, Any]]) +async def get_emergency_sections( + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get all available emergency sections. + + Returns a list of section information including ID and name. + """ + try: + # Generate cache key + cache_key = "emergency_sections" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = emergencies_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Query distinct sections with their IDs + stmt = text(""" + SELECT DISTINCT section_id, section + FROM emergency_item + WHERE section IS NOT NULL + ORDER BY section_id + """) + result = db.execute(stmt) + + # Extract section info + sections = [{"id": row[0], "name": row[1]} for row in result] + + # Store in cache if caching is enabled + if use_cache: + emergencies_cache[cache_key] = sections + + return sections + except SQLAlchemyError as e: + logger.error(f"Database error in get_emergency_sections: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_emergency_sections: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") + +@router.get("/emergency/section/{section_id}", response_model=List[EmergencyResponse]) +async def get_emergency_contacts_by_section_id( + section_id: int = Path(..., description="Section ID (1, 2, 3, or 4)"), + active_only: bool = True, + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get emergency contacts for a specific section ID. + + - **section_id**: Section ID (1: Tourist support, 2: Emergency numbers, 3: Emergency situations, 4: Tourist scams) + - **active_only**: If true, only return active items + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key based on query parameters + cache_key = f"emergency_section_id_{section_id}_{active_only}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = emergencies_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Build query + query = db.query(EmergencyItem).filter(EmergencyItem.section_id == section_id) + + # Add active filter if needed + if active_only: + query = query.filter(EmergencyItem.is_active == True) + + # Order by priority for proper sorting + emergency_contacts = query.order_by(EmergencyItem.priority.desc()).all() + + # Convert to Pydantic models efficiently + result = [EmergencyResponse.model_validate(contact, from_attributes=True) for contact in emergency_contacts] + + # Store in cache if caching is enabled + if use_cache: + emergencies_cache[cache_key] = result + + return result + except SQLAlchemyError as e: + logger.error(f"Database error in get_emergency_contacts_by_section_id: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_emergency_contacts_by_section_id: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") + +@router.post("/emergency", response_model=EmergencyResponse) +async def create_emergency_contact( + emergency: EmergencyCreate, + db: Session = Depends(get_db) +): + """ + Create a new emergency contact. + + - **name**: Contact name + - **phone_number**: Phone number + - **description**: Description (optional) + - **address**: Address (optional) + - **location**: Location coordinates (optional) + - **priority**: Priority order (default: 0) + - **is_active**: Whether the contact is active (default: True) + """ + try: + db_emergency = EmergencyItem(**emergency.model_dump()) + db.add(db_emergency) + db.commit() + db.refresh(db_emergency) + + # Invalidate emergency cache after creating a new item + emergencies_cache.clear() + + # Convert SQLAlchemy model to Pydantic model before returning + result = EmergencyResponse.model_validate(db_emergency, from_attributes=True) + return result + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in create_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.get("/emergency/{emergency_id}", response_model=EmergencyResponse) +async def get_emergency_contact( + emergency_id: int = Path(..., gt=0), + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get a single emergency contact by ID. + + - **emergency_id**: ID of the emergency contact + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key + cache_key = f"emergency_{emergency_id}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = emergencies_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Use direct SQL query for better performance on single item lookup + stmt = text("SELECT * FROM emergency_item WHERE id = :id") + result = db.execute(stmt, {"id": emergency_id}).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + # Create an EmergencyItem model instance manually + emergency = EmergencyItem() + for key, value in result._mapping.items(): + if hasattr(emergency, key): + setattr(emergency, key, value) + + # Convert to Pydantic model + response = EmergencyResponse.model_validate(emergency, from_attributes=True) + + # Store in cache if caching is enabled + if use_cache: + emergencies_cache[cache_key] = response + + return response + except SQLAlchemyError as e: + logger.error(f"Database error in get_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/emergency/{emergency_id}", response_model=EmergencyResponse) +async def update_emergency_contact( + emergency_id: int = Path(..., gt=0), + emergency_update: EmergencyUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update a specific emergency contact. + + - **emergency_id**: ID of the emergency contact to update + - **name**: New name (optional) + - **phone_number**: New phone number (optional) + - **description**: New description (optional) + - **address**: New address (optional) + - **location**: New location coordinates (optional) + - **priority**: New priority order (optional) + - **is_active**: New active status (optional) + """ + try: + emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first() + if not emergency: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + # Update fields if provided + update_data = emergency_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(emergency, key, value) + + db.commit() + db.refresh(emergency) + + # Invalidate specific cache entries + emergencies_cache.delete(f"emergency_{emergency_id}") + emergencies_cache.clear() # Clear all list caches + + # Convert to Pydantic model + return EmergencyResponse.model_validate(emergency, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in update_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/emergency/{emergency_id}", response_model=dict) +async def delete_emergency_contact( + emergency_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete a specific emergency contact. + + - **emergency_id**: ID of the emergency contact to delete + """ + try: + # Use optimized direct SQL with RETURNING for better performance + result = db.execute( + text("DELETE FROM emergency_item WHERE id = :id RETURNING id"), + {"id": emergency_id} + ).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + db.commit() + + # Invalidate cache entries + emergencies_cache.delete(f"emergency_{emergency_id}") + emergencies_cache.clear() # Clear all list caches + + return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"} + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in delete_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/emergency/batch-update-status", response_model=BatchUpdateResult) +async def batch_update_emergency_status( + emergency_ids: List[int] = Body(..., embed=True), + is_active: bool = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Update the active status of multiple emergency contacts at once. + + This is much more efficient than updating emergency contacts one at a time. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare the update statement + stmt = text(""" + UPDATE emergency_item + SET is_active = :is_active, updated_at = NOW() + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + # Execute the update in a single query + result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) + updated_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in updated_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(updated_ids), + failed_ids=failed_ids, + message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_update_emergency_status: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/emergency/batch", response_model=BatchUpdateResult) +async def batch_delete_emergency_contacts( + emergency_ids: List[int] = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Delete multiple emergency contacts at once. + + This is much more efficient than deleting emergency contacts one at a time with separate API calls. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare and execute the delete statement with RETURNING to get deleted IDs + stmt = text(""" + DELETE FROM emergency_item + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + result = db.execute(stmt, {"emergency_ids": emergency_ids}) + deleted_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in deleted_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(deleted_ids), + failed_ids=failed_ids, + message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_delete_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# --- Event endpoints --- + +@router.get("/events", response_model=List[EventResponse]) +async def get_events( + skip: int = 0, + limit: int = 100, + active_only: bool = False, + featured_only: bool = False, + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get all events. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **active_only**: If true, only return active items + - **featured_only**: If true, only return featured items + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key based on query parameters + cache_key = f"events_{skip}_{limit}_{active_only}_{featured_only}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = events_cache.get(cache_key) + if cached_result: + return cached_result + + # Build query directly without excessive inspection and logging + query = db.query(EventItem) + + # Add filters if needed + if active_only: + query = query.filter(EventItem.is_active == True) + if featured_only: + query = query.filter(EventItem.featured == True) + + # To improve performance, first fetch just IDs with COUNT + count_query = query.with_entities(func.count(EventItem.id)) + total_count = count_query.scalar() + + # Now get the actual data with pagination + events = query.order_by(EventItem.date_start.desc()).offset(skip).limit(limit).all() + + # Convert to Pydantic models efficiently + result = [EventResponse.model_validate(event, from_attributes=True) for event in events] + + # Store in cache if caching is enabled (30 seconds TTL for events list) + if use_cache: + events_cache[cache_key] = result + + return result + except SQLAlchemyError as e: + logger.error(f"Database error in get_events: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Unexpected error in get_events: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}") + +@router.post("/events", response_model=EventResponse) +async def create_event( + event: EventCreate, + db: Session = Depends(get_db) +): + """ + Create a new event. + + - **name**: Event name + - **description**: Event description + - **address**: Event address + - **location**: Location coordinates (optional) + - **date_start**: Start date and time + - **date_end**: End date and time (optional) + - **price**: Price information (optional JSON object) + - **is_active**: Whether the event is active (default: True) + - **featured**: Whether the event is featured (default: False) + """ + try: + db_event = EventItem(**event.model_dump()) + db.add(db_event) + db.commit() + db.refresh(db_event) + + # Invalidate relevant caches on create + events_cache.clear() + + # Convert SQLAlchemy model to Pydantic model before returning + result = EventResponse.model_validate(db_event, from_attributes=True) + return result + except SQLAlchemyError as e: + logger.error(f"Database error in create_event: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.get("/events/{event_id}", response_model=EventResponse) +async def get_event( + event_id: int = Path(..., gt=0), + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get a single event by ID. + + - **event_id**: ID of the event + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key + cache_key = f"event_{event_id}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = events_cache.get(cache_key) + if cached_result: + return cached_result + + # Use direct SQL query for better performance on single item lookup + # This avoids SQLAlchemy overhead and takes advantage of primary key lookup + stmt = text("SELECT * FROM event_item WHERE id = :id") + result = db.execute(stmt, {"id": event_id}).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Event not found") + + # Create an EventItem model instance manually from the result + event = EventItem() + for key, value in result._mapping.items(): + if hasattr(event, key): + setattr(event, key, value) + + # Convert SQLAlchemy model to Pydantic model + response = EventResponse.model_validate(event, from_attributes=True) + + # Store in cache if caching is enabled (60 seconds TTL for single event) + if use_cache: + events_cache[cache_key] = response + + return response + except SQLAlchemyError as e: + logger.error(f"Database error in get_event: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/events/{event_id}", response_model=EventResponse) +def update_event( + event_id: int, + event: EventUpdate, + db: Session = Depends(get_db) +): + """Update an existing event.""" + try: + db_event = db.query(EventItem).filter(EventItem.id == event_id).first() + if not db_event: + raise HTTPException(status_code=404, detail="Event not found") + + # Update event fields + for key, value in event.model_dump(exclude_unset=True).items(): + setattr(db_event, key, value) + + db.commit() + db.refresh(db_event) + + # Invalidate specific cache entries + events_cache.delete(f"event_{event_id}") + events_cache.clear() # Clear all list caches + + # Convert SQLAlchemy model to Pydantic model before returning + result = EventResponse.model_validate(db_event, from_attributes=True) + return result + except SQLAlchemyError as e: + logger.error(f"Database error in update_event: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/events/{event_id}", response_model=dict) +async def delete_event( + event_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """Delete a specific event.""" + try: + event = db.query(EventItem).filter(EventItem.id == event_id).first() + if event is None: + raise HTTPException(status_code=404, detail=f"Event with ID {event_id} not found") + + db.delete(event) + db.commit() + + # Invalidate cache entries + events_cache.delete(f"event_{event_id}") + events_cache.clear() # Clear all list caches + + return {"status": "success", "message": f"Event {event_id} deleted"} + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# --- Batch operations for better performance --- + +@router.post("/events/batch", response_model=List[EventResponse]) +async def batch_create_events( + batch: BatchEventCreate, + db: Session = Depends(get_db) +): + """ + Create multiple events in a single database transaction. + + This is much more efficient than creating events one at a time with separate API calls. + """ + try: + db_events = [] + for event_data in batch.events: + db_event = EventItem(**event_data.model_dump()) + db.add(db_event) + db_events.append(db_event) + + # Commit all events in a single transaction + db.commit() + + # Refresh all events to get their IDs and other generated fields + for db_event in db_events: + db.refresh(db_event) + + # Convert SQLAlchemy models to Pydantic models + result = [EventResponse.model_validate(event, from_attributes=True) for event in db_events] + return result + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_create_events: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/events/batch-update-status", response_model=BatchUpdateResult) +async def batch_update_event_status( + event_ids: List[int] = Body(..., embed=True), + is_active: bool = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Update the active status of multiple events at once. + + This is much more efficient than updating events one at a time. + """ + try: + if not event_ids: + raise HTTPException(status_code=400, detail="No event IDs provided") + + # Prepare the update statement + stmt = text(""" + UPDATE event_item + SET is_active = :is_active, updated_at = NOW() + WHERE id = ANY(:event_ids) + RETURNING id + """) + + # Execute the update in a single query + result = db.execute(stmt, {"is_active": is_active, "event_ids": event_ids}) + updated_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in event_ids if id not in updated_ids] + + return BatchUpdateResult( + success_count=len(updated_ids), + failed_ids=failed_ids, + message=f"Updated {len(updated_ids)} events" if updated_ids else "No events were updated" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_update_event_status: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/events/batch", response_model=BatchUpdateResult) +async def batch_delete_events( + event_ids: List[int] = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Delete multiple events at once. + + This is much more efficient than deleting events one at a time with separate API calls. + """ + try: + if not event_ids: + raise HTTPException(status_code=400, detail="No event IDs provided") + + # Prepare and execute the delete statement with RETURNING to get deleted IDs + stmt = text(""" + DELETE FROM event_item + WHERE id = ANY(:event_ids) + RETURNING id + """) + + result = db.execute(stmt, {"event_ids": event_ids}) + deleted_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in event_ids if id not in deleted_ids] + + return BatchUpdateResult( + success_count=len(deleted_ids), + failed_ids=failed_ids, + message=f"Deleted {len(deleted_ids)} events" if deleted_ids else "No events were deleted" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_delete_events: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# Health check endpoint +@router.get("/health") +async def health_check(db: Session = Depends(get_db)): + """ + Check health of PostgreSQL connection. + """ + try: + # Perform a simple database query to check health + # Use text() to wrap the SQL query for SQLAlchemy 2.0 compatibility + db.execute(text("SELECT 1")).first() + return {"status": "healthy", "message": "PostgreSQL connection is working", "timestamp": datetime.now().isoformat()} + except Exception as e: + logger.error(f"PostgreSQL health check failed: {e}") + raise HTTPException(status_code=503, detail=f"PostgreSQL connection failed: {str(e)}") + +# Add BatchFAQCreate class to model definitions +class BatchFAQCreate(BaseModel): + faqs: List[FAQCreate] + +# Add after delete_faq endpoint +@router.post("/faqs/batch", response_model=List[FAQResponse]) +async def batch_create_faqs( + batch: BatchFAQCreate, + db: Session = Depends(get_db) +): + """ + Create multiple FAQ items in a single database transaction. + + This is much more efficient than creating FAQ items one at a time with separate API calls. + """ + try: + db_faqs = [] + for faq_data in batch.faqs: + db_faq = FAQItem(**faq_data.model_dump()) + db.add(db_faq) + db_faqs.append(db_faq) + + # Commit all FAQ items in a single transaction + db.commit() + + # Refresh all FAQ items to get their IDs and other generated fields + for db_faq in db_faqs: + db.refresh(db_faq) + + # Invalidate FAQ cache + faqs_cache.clear() + + # Convert SQLAlchemy models to Pydantic models + result = [FAQResponse.model_validate(faq, from_attributes=True) for faq in db_faqs] + return result + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_create_faqs: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/faqs/batch-update-status", response_model=BatchUpdateResult) +async def batch_update_faq_status( + faq_ids: List[int] = Body(..., embed=True), + is_active: bool = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Update the active status of multiple FAQ items at once. + + This is much more efficient than updating FAQ items one at a time. + """ + try: + if not faq_ids: + raise HTTPException(status_code=400, detail="No FAQ IDs provided") + + # Prepare the update statement + stmt = text(""" + UPDATE faq_item + SET is_active = :is_active, updated_at = NOW() + WHERE id = ANY(:faq_ids) + RETURNING id + """) + + # Execute the update in a single query + result = db.execute(stmt, {"is_active": is_active, "faq_ids": faq_ids}) + updated_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in faq_ids if id not in updated_ids] + + # Invalidate FAQ cache + faqs_cache.clear() + + return BatchUpdateResult( + success_count=len(updated_ids), + failed_ids=failed_ids, + message=f"Updated {len(updated_ids)} FAQ items" if updated_ids else "No FAQ items were updated" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_update_faq_status: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/faqs/batch", response_model=BatchUpdateResult) +async def batch_delete_faqs( + faq_ids: List[int] = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Delete multiple FAQ items at once. + + This is much more efficient than deleting FAQ items one at a time with separate API calls. + """ + try: + if not faq_ids: + raise HTTPException(status_code=400, detail="No FAQ IDs provided") + + # Prepare and execute the delete statement with RETURNING to get deleted IDs + stmt = text(""" + DELETE FROM faq_item + WHERE id = ANY(:faq_ids) + RETURNING id + """) + + result = db.execute(stmt, {"faq_ids": faq_ids}) + deleted_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in faq_ids if id not in deleted_ids] + + # Invalidate FAQ cache + faqs_cache.clear() + + return BatchUpdateResult( + success_count=len(deleted_ids), + failed_ids=failed_ids, + message=f"Deleted {len(deleted_ids)} FAQ items" if deleted_ids else "No FAQ items were deleted" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_delete_faqs: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# Add BatchEmergencyCreate class to the Pydantic models section +class BatchEmergencyCreate(BaseModel): + emergency_contacts: List[EmergencyCreate] + +@router.post("/emergency/batch", response_model=List[EmergencyResponse]) +async def batch_create_emergency_contacts( + batch: BatchEmergencyCreate, + db: Session = Depends(get_db) +): + """ + Create multiple emergency contacts in a single database transaction. + + This is much more efficient than creating emergency contacts one at a time with separate API calls. + """ + try: + db_emergency_contacts = [] + for emergency_data in batch.emergency_contacts: + db_emergency = EmergencyItem(**emergency_data.model_dump()) + db.add(db_emergency) + db_emergency_contacts.append(db_emergency) + + # Commit all emergency contacts in a single transaction + db.commit() + + # Refresh all items to get their IDs and other generated fields + for db_emergency in db_emergency_contacts: + db.refresh(db_emergency) + + # Invalidate emergency cache + emergencies_cache.clear() + + # Convert SQLAlchemy models to Pydantic models + result = [EmergencyResponse.model_validate(emergency, from_attributes=True) for emergency in db_emergency_contacts] + return result + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_create_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/emergency/batch-update-status", response_model=BatchUpdateResult) +async def batch_update_emergency_status( + emergency_ids: List[int] = Body(..., embed=True), + is_active: bool = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Update the active status of multiple emergency contacts at once. + + This is much more efficient than updating emergency contacts one at a time. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare the update statement + stmt = text(""" + UPDATE emergency_item + SET is_active = :is_active, updated_at = NOW() + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + # Execute the update in a single query + result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) + updated_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in updated_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(updated_ids), + failed_ids=failed_ids, + message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_update_emergency_status: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/emergency/batch", response_model=BatchUpdateResult) +async def batch_delete_emergency_contacts( + emergency_ids: List[int] = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Delete multiple emergency contacts at once. + + This is much more efficient than deleting emergency contacts one at a time with separate API calls. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare and execute the delete statement with RETURNING to get deleted IDs + stmt = text(""" + DELETE FROM emergency_item + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + result = db.execute(stmt, {"emergency_ids": emergency_ids}) + deleted_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in deleted_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(deleted_ids), + failed_ids=failed_ids, + message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_delete_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# --- About Pixity endpoints --- + +@router.get("/about-pixity", response_model=InfoContentResponse) +async def get_about_pixity( + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get the About Pixity information. + + - **use_cache**: If true, use cached results when available + """ + try: + # Try to get from cache if caching is enabled + if use_cache: + cached_result = about_pixity_cache.get("about_pixity") + if cached_result: + logger.info("Cache hit for about_pixity") + return cached_result + + # Get the first record (or create if none exists) + about = db.query(AboutPixity).first() + + if not about: + # Create default content if none exists + about = AboutPixity( + content="""PiXity is your smart, AI-powered local companion designed to help foreigners navigate life in any city of Vietnam with ease, starting with Da Nang. From finding late-night eats to handling visas, housing, and healthcare, PiXity bridges the gap in language, culture, and local know-how — so you can explore the city like a true insider. + +PiXity is proudly built by PiX.teq, the tech team behind PiX — a multidisciplinary collective based in Da Nang. + +X: x.com/pixity_bot +Instagram: instagram.com/pixity.aibot/ +Tiktok: tiktok.com/@pixity.aibot""" + ) + db.add(about) + db.commit() + db.refresh(about) + + # Convert to Pydantic model + response = InfoContentResponse( + id=about.id, + content=about.content, + created_at=about.created_at, + updated_at=about.updated_at + ) + + # Store in cache if caching is enabled + if use_cache: + about_pixity_cache["about_pixity"] = response + + return response + except SQLAlchemyError as e: + logger.error(f"Database error in get_about_pixity: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/about-pixity", response_model=InfoContentResponse) +async def update_about_pixity( + data: InfoContentUpdate, + db: Session = Depends(get_db) +): + """ + Update the About Pixity information. + + - **content**: New content text + """ + try: + # Get the first record (or create if none exists) + about = db.query(AboutPixity).first() + + if not about: + # Create new record if none exists + about = AboutPixity(content=data.content) + db.add(about) + else: + # Update existing record + about.content = data.content + + db.commit() + db.refresh(about) + + # Invalidate cache + about_pixity_cache.clear() + + # Convert to Pydantic model + response = InfoContentResponse( + id=about.id, + content=about.content, + created_at=about.created_at, + updated_at=about.updated_at + ) + + return response + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in update_about_pixity: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +# --- Da Nang Bucket List Pydantic models --- +class DaNangBucketListBase(BaseModel): + content: str + +class DaNangBucketListResponse(DaNangBucketListBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +class DaNangBucketListCreate(DaNangBucketListBase): + pass + +class DaNangBucketListUpdate(BaseModel): + content: str + +# --- Da Nang Bucket List Endpoints --- +@router.get("/danang-bucket-list", response_model=DaNangBucketListResponse) +async def get_danang_bucket_list( + db: Session = Depends(get_db), + use_cache: bool = True +): + """ + Retrieve the Da Nang Bucket List information. + If none exists, creates a default entry. + """ + cache_key = "danang_bucket_list" + + # Try to get from cache if caching is enabled + if use_cache and cache_key in danang_bucket_list_cache: + cached_result = danang_bucket_list_cache[cache_key] + logger.info(f"Cache hit for {cache_key}") + return cached_result + + try: + # Try to get the first bucket list entry + db_bucket_list = db.query(DaNangBucketList).first() + + # If no entry exists, create a default one + if not db_bucket_list: + default_content = json.dumps({ + "title": "Da Nang Bucket List", + "description": "Must-visit places and experiences in Da Nang", + "items": [ + {"name": "Ba Na Hills", "description": "Visit the famous Golden Bridge"}, + {"name": "Marble Mountains", "description": "Explore caves and temples"}, + {"name": "My Khe Beach", "description": "Relax at one of the most beautiful beaches in Vietnam"}, + {"name": "Dragon Bridge", "description": "Watch the fire-breathing show on weekends"}, + {"name": "Son Tra Peninsula", "description": "See the Lady Buddha statue and lookout point"} + ] + }) + + new_bucket_list = DaNangBucketList(content=default_content) + db.add(new_bucket_list) + db.commit() + db.refresh(new_bucket_list) + db_bucket_list = new_bucket_list + + # Convert to Pydantic model + response = DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True) + + # Store in cache if caching is enabled + if use_cache: + danang_bucket_list_cache[cache_key] = response + + return response + + except SQLAlchemyError as e: + error_msg = f"Database error in get_danang_bucket_list: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + +@router.put("/danang-bucket-list", response_model=DaNangBucketListResponse) +async def update_danang_bucket_list( + bucket_list_data: DaNangBucketListUpdate, + db: Session = Depends(get_db) +): + """ + Update the Da Nang Bucket List information. + If none exists, creates a new entry. + """ + try: + # Try to get the first bucket list entry + db_bucket_list = db.query(DaNangBucketList).first() + + # If no entry exists, create a new one + if not db_bucket_list: + db_bucket_list = DaNangBucketList(content=bucket_list_data.content) + db.add(db_bucket_list) + else: + # Update existing entry + db_bucket_list.content = bucket_list_data.content + db_bucket_list.updated_at = datetime.utcnow() + + db.commit() + db.refresh(db_bucket_list) + + # Clear cache + if "danang_bucket_list" in danang_bucket_list_cache: + del danang_bucket_list_cache["danang_bucket_list"] + + # Convert to Pydantic model + return DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True) + + except SQLAlchemyError as e: + error_msg = f"Database error in update_danang_bucket_list: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + +# --- Solana Summit Pydantic models --- +class SolanaSummitBase(BaseModel): + content: str + +class SolanaSummitResponse(SolanaSummitBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +class SolanaSummitCreate(SolanaSummitBase): + pass + +class SolanaSummitUpdate(BaseModel): + content: str + +# --- Solana Summit Endpoints --- +@router.get("/solana-summit", response_model=SolanaSummitResponse) +async def get_solana_summit( + db: Session = Depends(get_db), + use_cache: bool = True +): + """ + Retrieve the Solana Summit information. + If none exists, creates a default entry. + """ + cache_key = "solana_summit" + + # Try to get from cache if caching is enabled + if use_cache and cache_key in solana_summit_cache: + cached_result = solana_summit_cache[cache_key] + logger.info(f"Cache hit for {cache_key}") + return cached_result + + try: + # Try to get the first solana summit entry + db_solana_summit = db.query(SolanaSummit).first() + + # If no entry exists, create a default one + if not db_solana_summit: + default_content = json.dumps({ + "title": "Solana Summit Vietnam", + "description": "Information about Solana Summit Vietnam event in Da Nang", + "date": "2023-11-04T09:00:00+07:00", + "location": "Hyatt Regency, Da Nang", + "details": "The Solana Summit is a gathering of developers, entrepreneurs, and enthusiasts in the Solana ecosystem.", + "agenda": [ + {"time": "09:00", "activity": "Registration & Networking"}, + {"time": "10:00", "activity": "Opening Keynote"}, + {"time": "12:00", "activity": "Lunch Break"}, + {"time": "13:30", "activity": "Developer Workshops"}, + {"time": "17:00", "activity": "Closing Remarks & Networking"} + ], + "registration_url": "https://example.com/solana-summit-registration" + }) + + new_solana_summit = SolanaSummit(content=default_content) + db.add(new_solana_summit) + db.commit() + db.refresh(new_solana_summit) + db_solana_summit = new_solana_summit + + # Convert to Pydantic model + response = SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True) + + # Store in cache if caching is enabled + if use_cache: + solana_summit_cache[cache_key] = response + + return response + + except SQLAlchemyError as e: + error_msg = f"Database error in get_solana_summit: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + +@router.put("/solana-summit", response_model=SolanaSummitResponse) +async def update_solana_summit( + summit_data: SolanaSummitUpdate, + db: Session = Depends(get_db) +): + """ + Update the Solana Summit information. + If none exists, creates a new entry. + """ + try: + # Try to get the first solana summit entry + db_solana_summit = db.query(SolanaSummit).first() + + # If no entry exists, create a new one + if not db_solana_summit: + db_solana_summit = SolanaSummit(content=summit_data.content) + db.add(db_solana_summit) + else: + # Update existing entry + db_solana_summit.content = summit_data.content + db_solana_summit.updated_at = datetime.utcnow() + + db.commit() + db.refresh(db_solana_summit) + + # Clear cache + if "solana_summit" in solana_summit_cache: + del solana_summit_cache["solana_summit"] + + # Convert to Pydantic model + return SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True) + + except SQLAlchemyError as e: + error_msg = f"Database error in update_solana_summit: {str(e)}" + logger.error(error_msg) + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=error_msg) + +# --- API Key models and endpoints --- +class ApiKeyBase(BaseModel): + key_type: str + key_value: str + description: Optional[str] = None + is_active: bool = True + +class ApiKeyCreate(ApiKeyBase): + pass + +class ApiKeyUpdate(BaseModel): + key_type: Optional[str] = None + key_value: Optional[str] = None + description: Optional[str] = None + is_active: Optional[bool] = None + +class ApiKeyResponse(ApiKeyBase): + id: int + created_at: datetime + last_used: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + +@router.get("/api-keys", response_model=List[ApiKeyResponse]) +async def get_api_keys( + skip: int = 0, + limit: int = 100, + active_only: bool = False, + db: Session = Depends(get_db) +): + """ + Get all API keys. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **active_only**: If true, only return active keys + """ + try: + query = db.query(ApiKey) + + if active_only: + query = query.filter(ApiKey.is_active == True) + + api_keys = query.offset(skip).limit(limit).all() + return [ApiKeyResponse.model_validate(key, from_attributes=True) for key in api_keys] + except SQLAlchemyError as e: + logger.error(f"Database error retrieving API keys: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving API keys: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving API keys: {str(e)}") + +@router.post("/api-keys", response_model=ApiKeyResponse) +async def create_api_key( + api_key: ApiKeyCreate, + db: Session = Depends(get_db) +): + """ + Create a new API key. + """ + try: + # Create API key object + db_api_key = ApiKey( + key_type=api_key.key_type, + key_value=api_key.key_value, + description=api_key.description, + is_active=api_key.is_active + ) + + db.add(db_api_key) + db.commit() + db.refresh(db_api_key) + + return ApiKeyResponse.model_validate(db_api_key, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error creating API key: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error creating API key: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error creating API key: {str(e)}") + +@router.get("/api-keys/{api_key_id}", response_model=ApiKeyResponse) +async def get_api_key( + api_key_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get API key by ID. + """ + try: + api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() + if not api_key: + raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") + + return ApiKeyResponse.model_validate(api_key, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving API key: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving API key: {str(e)}") + +@router.put("/api-keys/{api_key_id}", response_model=ApiKeyResponse) +async def update_api_key( + api_key_id: int = Path(..., gt=0), + api_key_update: ApiKeyUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update API key details. + """ + try: + db_api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() + if not db_api_key: + raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") + + # Update fields if provided + if api_key_update.key_type is not None: + db_api_key.key_type = api_key_update.key_type + if api_key_update.key_value is not None: + db_api_key.key_value = api_key_update.key_value + if api_key_update.description is not None: + db_api_key.description = api_key_update.description + if api_key_update.is_active is not None: + db_api_key.is_active = api_key_update.is_active + + db.commit() + db.refresh(db_api_key) + + return ApiKeyResponse.model_validate(db_api_key, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating API key: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating API key: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating API key: {str(e)}") + +@router.delete("/api-keys/{api_key_id}", response_model=dict) +async def delete_api_key( + api_key_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete API key. + """ + try: + db_api_key = db.query(ApiKey).filter(ApiKey.id == api_key_id).first() + if not db_api_key: + raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found") + + db.delete(db_api_key) + db.commit() + + return {"message": f"API key with ID {api_key_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting API key: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting API key: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting API key: {str(e)}") + +@router.get("/api-keys/validate/{key}", response_model=dict) +async def validate_api_key( + key: str, + db: Session = Depends(get_db) +): + """ + Validate an API key and update its last_used timestamp. + """ + try: + db_api_key = db.query(ApiKey).filter(ApiKey.key_value == key, ApiKey.is_active == True).first() + if not db_api_key: + return {"valid": False, "message": "Invalid or inactive API key"} + + # Update last used timestamp + db_api_key.last_used = datetime.now() + db.commit() + + return { + "valid": True, + "key_type": db_api_key.key_type, + "id": db_api_key.id, + "message": "API key is valid" + } + except Exception as e: + logger.error(f"Error validating API key: {e}") + logger.error(traceback.format_exc()) + return {"valid": False, "message": f"Error validating API key: {str(e)}"} + +# --- Vector Database models and endpoints --- +class VectorDatabaseBase(BaseModel): + name: str + description: Optional[str] = None + pinecone_index: str + api_key_id: Optional[int] = None # Make api_key_id optional to handle NULL values + status: str = "active" + +class VectorDatabaseCreate(VectorDatabaseBase): + api_key_id: int # Keep this required for new databases + pass + +class VectorDatabaseUpdate(BaseModel): + name: Optional[str] = None + description: Optional[str] = None + pinecone_index: Optional[str] = None + api_key_id: Optional[int] = None + status: Optional[str] = None + +class VectorDatabaseResponse(BaseModel): + name: str + description: Optional[str] = None + pinecone_index: str + api_key_id: Optional[int] = None # Make api_key_id optional to handle NULL values + status: str + id: int + created_at: datetime + updated_at: datetime + message: Optional[str] = None # Add message field for notifications + + model_config = ConfigDict(from_attributes=True) + +class VectorDatabaseDetailResponse(BaseModel): + id: int + name: str + description: Optional[str] = None + pinecone_index: str + status: str + created_at: datetime + updated_at: datetime + document_count: int + embedded_count: int + pending_count: int + message: Optional[str] = None # Add message field for notifications + + model_config = ConfigDict(from_attributes=True) + +@router.get("/vector-databases", response_model=List[VectorDatabaseResponse]) +async def get_vector_databases( + skip: int = 0, + limit: int = 100, + status: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get all vector databases. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **status**: Filter by status (e.g., 'active', 'inactive') + """ + try: + query = db.query(VectorDatabase) + + if status: + query = query.filter(VectorDatabase.status == status) + + vector_dbs = query.offset(skip).limit(limit).all() + return [VectorDatabaseResponse.model_validate(db_item, from_attributes=True) for db_item in vector_dbs] + except SQLAlchemyError as e: + logger.error(f"Database error retrieving vector databases: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving vector databases: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving vector databases: {str(e)}") + +@router.post("/vector-databases", response_model=VectorDatabaseResponse) +async def create_vector_database( + vector_db: VectorDatabaseCreate, + db: Session = Depends(get_db) +): + """ + Create a new vector database. If the specified Pinecone index doesn't exist, it will be created automatically. + """ + try: + # Check if a database with the same name already exists + existing_db = db.query(VectorDatabase).filter(VectorDatabase.name == vector_db.name).first() + if existing_db: + raise HTTPException(status_code=400, detail=f"Vector database with name '{vector_db.name}' already exists") + + # Check if the API key exists + api_key = db.query(ApiKey).filter(ApiKey.id == vector_db.api_key_id).first() + if not api_key: + raise HTTPException(status_code=400, detail=f"API key with ID {vector_db.api_key_id} not found") + + # Initialize Pinecone client with the API key + from pinecone import Pinecone, ServerlessSpec + pc_client = Pinecone(api_key=api_key.key_value) + + # Check if the index exists + index_list = pc_client.list_indexes() + index_names = index_list.names() if hasattr(index_list, 'names') else [] + + index_exists = vector_db.pinecone_index in index_names + index_created = False + + if not index_exists: + # Index doesn't exist - try to create it + try: + logger.info(f"Pinecone index '{vector_db.pinecone_index}' does not exist. Attempting to create it automatically.") + + # Create the index with standard parameters + pc_client.create_index( + name=vector_db.pinecone_index, + dimension=1536, # Standard OpenAI embedding dimension + metric="cosine", # Most common similarity metric + spec=ServerlessSpec( + cloud="aws", + region="us-east-1" # Use a standard region that works with the free tier + ) + ) + + logger.info(f"Successfully created Pinecone index '{vector_db.pinecone_index}'") + index_created = True + + # Allow some time for the index to initialize + import time + time.sleep(5) + + except Exception as create_error: + logger.error(f"Failed to create Pinecone index '{vector_db.pinecone_index}': {create_error}") + raise HTTPException( + status_code=400, + detail=f"Failed to create Pinecone index '{vector_db.pinecone_index}': {str(create_error)}" + ) + + # Verify we can connect to the index (whether existing or newly created) + try: + index = pc_client.Index(vector_db.pinecone_index) + # Try to get stats to verify connection + stats = index.describe_index_stats() + + # Create success message based on whether we created the index or used an existing one + if index_created: + success_message = f"Successfully created and connected to new Pinecone index '{vector_db.pinecone_index}'" + else: + success_message = f"Successfully connected to existing Pinecone index '{vector_db.pinecone_index}'" + + logger.info(f"{success_message}: {stats}") + + except Exception as e: + error_message = f"Error connecting to Pinecone index '{vector_db.pinecone_index}': {str(e)}" + logger.error(error_message) + raise HTTPException(status_code=400, detail=error_message) + + # Create new vector database + db_vector_db = VectorDatabase(**vector_db.model_dump()) + + db.add(db_vector_db) + db.commit() + db.refresh(db_vector_db) + + # Return response with additional info about index creation + response_data = VectorDatabaseResponse.model_validate(db_vector_db, from_attributes=True).model_dump() + + # Add a message to the response indicating whether the index was created or existed + if index_created: + response_data["message"] = f"Created new Pinecone index '{vector_db.pinecone_index}' automatically" + else: + response_data["message"] = f"Using existing Pinecone index '{vector_db.pinecone_index}'" + + return VectorDatabaseResponse.model_validate(response_data) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error creating vector database: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error creating vector database: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error creating vector database: {str(e)}") + +@router.get("/vector-databases/{vector_db_id}", response_model=VectorDatabaseResponse) +async def get_vector_database( + vector_db_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get vector database by ID. + """ + try: + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() + if not vector_db: + raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") + + return VectorDatabaseResponse.model_validate(vector_db, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving vector database: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving vector database: {str(e)}") + +@router.put("/vector-databases/{vector_db_id}", response_model=VectorDatabaseResponse) +async def update_vector_database( + vector_db_id: int = Path(..., gt=0), + vector_db_update: VectorDatabaseUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update vector database details. + """ + try: + db_vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() + if not db_vector_db: + raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") + + # Check name uniqueness if updating name + if vector_db_update.name and vector_db_update.name != db_vector_db.name: + existing_db = db.query(VectorDatabase).filter(VectorDatabase.name == vector_db_update.name).first() + if existing_db: + raise HTTPException(status_code=400, detail=f"Vector database with name '{vector_db_update.name}' already exists") + + # Check if API key exists if updating API key ID + if vector_db_update.api_key_id: + api_key = db.query(ApiKey).filter(ApiKey.id == vector_db_update.api_key_id).first() + if not api_key: + raise HTTPException(status_code=400, detail=f"API key with ID {vector_db_update.api_key_id} not found") + + # Update fields if provided + update_data = vector_db_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + if value is not None: + setattr(db_vector_db, key, value) + + db.commit() + db.refresh(db_vector_db) + + return VectorDatabaseResponse.model_validate(db_vector_db, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating vector database: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating vector database: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating vector database: {str(e)}") + +@router.delete("/vector-databases/{vector_db_id}", response_model=dict) +async def delete_vector_database( + vector_db_id: int = Path(..., gt=0), + force: bool = Query(False, description="Force deletion even if documents exist"), + db: Session = Depends(get_db) +): + """ + Delete vector database. + + - **force**: If true, will delete all associated documents first + """ + try: + db_vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() + if not db_vector_db: + raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found") + + # Check if there are documents associated with this database + doc_count = db.query(func.count(Document.id)).filter(Document.vector_database_id == vector_db_id).scalar() + if doc_count > 0 and not force: + raise HTTPException( + status_code=400, + detail=f"Cannot delete vector database with {doc_count} documents. Use force=true to delete anyway." + ) + + # If force=true, delete all associated documents first + if force and doc_count > 0: + # Delete all documents associated with this database + db.query(Document).filter(Document.vector_database_id == vector_db_id).delete() + + # Delete all vector statuses associated with this database + db.query(VectorStatus).filter(VectorStatus.vector_database_id == vector_db_id).delete() + + # Delete all engine-vector-db associations + db.query(EngineVectorDb).filter(EngineVectorDb.vector_database_id == vector_db_id).delete() + + # Delete the vector database + db.delete(db_vector_db) + db.commit() + + return {"message": f"Vector database with ID {vector_db_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting vector database: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting vector database: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting vector database: {str(e)}") + +@router.get("/vector-databases/{vector_db_id}/info", response_model=VectorDatabaseDetailResponse) +async def get_vector_database_info( + vector_db_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get detailed information about a vector database including document counts. + Also verifies connectivity to the Pinecone index. + """ + try: + # Get the vector database + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first() + if not vector_db: + raise HTTPException(status_code=404, detail="Vector database not found") + + # Count total documents + total_docs = db.query(func.count(Document.id)).filter( + Document.vector_database_id == vector_db_id + ).scalar() + + # Count embedded documents + embedded_docs = db.query(func.count(Document.id)).filter( + Document.vector_database_id == vector_db_id, + Document.is_embedded == True + ).scalar() + + # Count pending documents (not embedded) + pending_docs = db.query(func.count(Document.id)).filter( + Document.vector_database_id == vector_db_id, + Document.is_embedded == False + ).scalar() + + # Verify Pinecone index connectivity if API key is available + message = None + if vector_db.api_key_id: + try: + # Get the API key + api_key = db.query(ApiKey).filter(ApiKey.id == vector_db.api_key_id).first() + if api_key: + # Initialize Pinecone client with the API key + from pinecone import Pinecone + pc_client = Pinecone(api_key=api_key.key_value) + + # Check if the index exists + index_list = pc_client.list_indexes() + index_names = index_list.names() if hasattr(index_list, 'names') else [] + + if vector_db.pinecone_index in index_names: + # Try to connect to the index + index = pc_client.Index(vector_db.pinecone_index) + stats = index.describe_index_stats() + message = f"Pinecone index '{vector_db.pinecone_index}' is operational with {stats.get('total_vector_count', 0)} vectors" + logger.info(f"Successfully connected to Pinecone index '{vector_db.pinecone_index}': {stats}") + else: + message = f"Pinecone index '{vector_db.pinecone_index}' does not exist. Available indexes: {', '.join(index_names)}" + logger.warning(message) + else: + message = f"API key with ID {vector_db.api_key_id} not found" + logger.warning(message) + except Exception as e: + message = f"Error connecting to Pinecone: {str(e)}" + logger.error(message) + else: + message = "No API key associated with this vector database" + logger.warning(message) + + # Create response with added counts + result = VectorDatabaseDetailResponse( + id=vector_db.id, + name=vector_db.name, + description=vector_db.description, + pinecone_index=vector_db.pinecone_index, + status=vector_db.status, + created_at=vector_db.created_at, + updated_at=vector_db.updated_at, + document_count=total_docs or 0, + embedded_count=embedded_docs or 0, + pending_count=pending_docs or 0, + message=message + ) + + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting vector database info: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error getting vector database info: {str(e)}") + +# --- Document models and endpoints --- +class DocumentBase(BaseModel): + name: str + vector_database_id: int + +class DocumentCreate(BaseModel): + name: str + vector_database_id: int + +class DocumentUpdate(BaseModel): + name: Optional[str] = None + +class DocumentResponse(BaseModel): + id: int + name: str + file_type: str + content_type: Optional[str] = None + size: int + created_at: datetime + updated_at: datetime + vector_database_id: int + vector_database_name: Optional[str] = None + is_embedded: bool + + model_config = ConfigDict(from_attributes=True) + +@router.get("/documents", response_model=List[DocumentResponse]) +async def get_documents( + skip: int = 0, + limit: int = 100, + vector_database_id: Optional[int] = None, + is_embedded: Optional[bool] = None, + file_type: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get all documents with optional filtering. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **vector_database_id**: Filter by vector database ID + - **is_embedded**: Filter by embedding status + - **file_type**: Filter by file type + """ + try: + query = db.query(Document) + + # Apply filters if provided + if vector_database_id is not None: + query = query.filter(Document.vector_database_id == vector_database_id) + + if is_embedded is not None: + query = query.filter(Document.is_embedded == is_embedded) + + if file_type is not None: + query = query.filter(Document.file_type == file_type) + + # Execute query with pagination + documents = query.offset(skip).limit(limit).all() + + # Add vector database name + result = [] + for doc in documents: + # Create a dictionary from the document for easier manipulation + doc_dict = { + "id": doc.id, + "name": doc.name, + "file_type": doc.file_type, + "content_type": doc.content_type, + "size": doc.size, + "created_at": doc.created_at, + "updated_at": doc.updated_at, + "vector_database_id": doc.vector_database_id or 0, # Handle NULL values + "is_embedded": doc.is_embedded + } + + # Get vector database name if not already populated + vector_db_name = None + if doc.vector_database_id is not None: + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == doc.vector_database_id).first() + vector_db_name = vector_db.name if vector_db else f"db_{doc.vector_database_id}" + else: + vector_db_name = "No Database" + + doc_dict["vector_database_name"] = vector_db_name + + # Create Pydantic model from dictionary + doc_response = DocumentResponse(**doc_dict) + result.append(doc_response) + + return result + except SQLAlchemyError as e: + logger.error(f"Database error retrieving documents: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving documents: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving documents: {str(e)}") + +@router.get("/documents/{document_id}", response_model=DocumentResponse) +async def get_document( + document_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get document by ID. + """ + try: + document = db.query(Document).filter(Document.id == document_id).first() + if not document: + raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") + + # Get vector database name + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first() + vector_db_name = vector_db.name if vector_db else f"db_{document.vector_database_id}" + + # Create response with vector database name + result = DocumentResponse.model_validate(document, from_attributes=True) + result.vector_database_name = vector_db_name + + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving document: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving document: {str(e)}") + +@router.get("/documents/{document_id}/content", response_class=Response) +async def get_document_content( + document_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get document content (file) by document ID. + Returns the binary content with the appropriate Content-Type header. + """ + try: + # Get document to check if it exists and get metadata + document = db.query(Document).filter(Document.id == document_id).first() + if not document: + raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") + + # Get document content from document_content table + document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first() + if not document_content or not document_content.file_content: + raise HTTPException(status_code=404, detail=f"Content for document with ID {document_id} not found") + + # Determine content type + content_type = document.content_type if hasattr(document, 'content_type') and document.content_type else "application/octet-stream" + + # Return binary content with correct content type + return Response( + content=document_content.file_content, + media_type=content_type, + headers={"Content-Disposition": f"attachment; filename=\"{document.name}\""} + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving document content: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving document content: {str(e)}") + +# --- Telegram Bot models and endpoints --- +class TelegramBotBase(BaseModel): + name: str + username: str + token: str + status: str = "inactive" + +class TelegramBotCreate(TelegramBotBase): + pass + +class TelegramBotUpdate(BaseModel): + name: Optional[str] = None + username: Optional[str] = None + token: Optional[str] = None + status: Optional[str] = None + +class TelegramBotResponse(TelegramBotBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = ConfigDict(from_attributes=True) + +@router.get("/telegram-bots/{bot_id}", response_model=TelegramBotResponse) +async def get_telegram_bot( + bot_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get Telegram bot by ID. + """ + try: + bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() + if not bot: + raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") + + return TelegramBotResponse.model_validate(bot, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving Telegram bot: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving Telegram bot: {str(e)}") + +@router.put("/telegram-bots/{bot_id}", response_model=TelegramBotResponse) +async def update_telegram_bot( + bot_id: int = Path(..., gt=0), + bot_update: TelegramBotUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update Telegram bot details. + """ + try: + db_bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() + if not db_bot: + raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") + + # Check if new username conflicts with existing bots + if bot_update.username and bot_update.username != db_bot.username: + existing_bot = db.query(TelegramBot).filter(TelegramBot.username == bot_update.username).first() + if existing_bot: + raise HTTPException( + status_code=400, + detail=f"Telegram bot with username '{bot_update.username}' already exists" + ) + + # Update fields if provided + update_data = bot_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + if value is not None: + setattr(db_bot, key, value) + + db.commit() + db.refresh(db_bot) + + return TelegramBotResponse.model_validate(db_bot, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating Telegram bot: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating Telegram bot: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating Telegram bot: {str(e)}") + +@router.delete("/telegram-bots/{bot_id}", response_model=dict) +async def delete_telegram_bot( + bot_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete Telegram bot. + """ + try: + db_bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() + if not db_bot: + raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") + + # Check if bot is associated with any engines + bot_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all() + if bot_engines: + raise HTTPException( + status_code=400, + detail="Cannot delete bot as it is associated with chat engines. Remove associations first." + ) + + # Delete bot + db.delete(db_bot) + db.commit() + + return {"message": f"Telegram bot with ID {bot_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting Telegram bot: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting Telegram bot: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting Telegram bot: {str(e)}") + +@router.get("/telegram-bots/{bot_id}/engines", response_model=List[dict]) +async def get_bot_engines_info( + bot_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get all chat engines associated with a Telegram bot. + """ + try: + # Verify bot exists + bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first() + if not bot: + raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found") + + # Get associated engines through BotEngine + bot_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all() + + result = [] + for association in bot_engines: + engine = db.query(ChatEngine).filter(ChatEngine.id == association.engine_id).first() + if engine: + result.append({ + "association_id": association.id, + "engine_id": engine.id, + "engine_name": engine.name, + "answer_model": engine.answer_model, + "status": engine.status, + "created_at": association.created_at + }) + + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving bot engines: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving bot engines: {str(e)}") + +# --- Chat Engine models and endpoints --- +class ChatEngineBase(BaseModel): + name: str + answer_model: str + system_prompt: Optional[str] = None + empty_response: Optional[str] = None + similarity_top_k: int = 3 + vector_distance_threshold: float = 0.75 + grounding_threshold: float = 0.2 + use_public_information: bool = False + status: str = "active" + +class ChatEngineCreate(ChatEngineBase): + pass + +class ChatEngineUpdate(BaseModel): + name: Optional[str] = None + answer_model: Optional[str] = None + system_prompt: Optional[str] = None + empty_response: Optional[str] = None + similarity_top_k: Optional[int] = None + vector_distance_threshold: Optional[float] = None + grounding_threshold: Optional[float] = None + use_public_information: Optional[bool] = None + status: Optional[str] = None + +class ChatEngineResponse(ChatEngineBase): + id: int + created_at: datetime + last_modified: datetime + + model_config = ConfigDict(from_attributes=True) + +@router.get("/chat-engines", response_model=List[ChatEngineResponse]) +async def get_chat_engines( + skip: int = 0, + limit: int = 100, + status: Optional[str] = None, + db: Session = Depends(get_db) +): + """ + Get all chat engines. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **status**: Filter by status (e.g., 'active', 'inactive') + """ + try: + query = db.query(ChatEngine) + + if status: + query = query.filter(ChatEngine.status == status) + + engines = query.offset(skip).limit(limit).all() + return [ChatEngineResponse.model_validate(engine, from_attributes=True) for engine in engines] + except SQLAlchemyError as e: + logger.error(f"Database error retrieving chat engines: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving chat engines: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving chat engines: {str(e)}") + +@router.post("/chat-engines", response_model=ChatEngineResponse) +async def create_chat_engine( + engine: ChatEngineCreate, + db: Session = Depends(get_db) +): + """ + Create a new chat engine. + """ + try: + # Create chat engine + db_engine = ChatEngine(**engine.model_dump()) + + db.add(db_engine) + db.commit() + db.refresh(db_engine) + + return ChatEngineResponse.model_validate(db_engine, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error creating chat engine: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error creating chat engine: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error creating chat engine: {str(e)}") + +@router.get("/chat-engines/{engine_id}", response_model=ChatEngineResponse) +async def get_chat_engine( + engine_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get chat engine by ID. + """ + try: + engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() + if not engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") + + return ChatEngineResponse.model_validate(engine, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving chat engine: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving chat engine: {str(e)}") + +@router.put("/chat-engines/{engine_id}", response_model=ChatEngineResponse) +async def update_chat_engine( + engine_id: int = Path(..., gt=0), + engine_update: ChatEngineUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update chat engine details. + """ + try: + db_engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() + if not db_engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") + + # Update fields if provided + update_data = engine_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + if value is not None: + setattr(db_engine, key, value) + + # Update last_modified timestamp + db_engine.last_modified = datetime.utcnow() + + db.commit() + db.refresh(db_engine) + + return ChatEngineResponse.model_validate(db_engine, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating chat engine: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating chat engine: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating chat engine: {str(e)}") + +@router.delete("/chat-engines/{engine_id}", response_model=dict) +async def delete_chat_engine( + engine_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete chat engine. + """ + try: + db_engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() + if not db_engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") + + # Check if engine has associated bots or vector databases + bot_engine_count = db.query(func.count(BotEngine.id)).filter(BotEngine.engine_id == engine_id).scalar() + vector_db_count = db.query(func.count(EngineVectorDb.id)).filter(EngineVectorDb.engine_id == engine_id).scalar() + + if bot_engine_count > 0 or vector_db_count > 0: + raise HTTPException( + status_code=400, + detail="Cannot delete chat engine as it has associated bots or vector databases. Remove associations first." + ) + + # Delete engine + db.delete(db_engine) + db.commit() + + return {"message": f"Chat engine with ID {engine_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting chat engine: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting chat engine: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting chat engine: {str(e)}") + +@router.get("/chat-engines/{engine_id}/vector-databases", response_model=List[dict]) +async def get_engine_vector_databases( + engine_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get all vector databases associated with a chat engine. + """ + try: + # Verify engine exists + engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first() + if not engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found") + + # Get associated vector databases through EngineVectorDb + engine_vector_dbs = db.query(EngineVectorDb).filter(EngineVectorDb.engine_id == engine_id).all() + + result = [] + for association in engine_vector_dbs: + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == association.vector_database_id).first() + if vector_db: + result.append({ + "association_id": association.id, + "vector_database_id": vector_db.id, + "name": vector_db.name, + "pinecone_index": vector_db.pinecone_index, + "priority": association.priority, + "status": vector_db.status + }) + + return result + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving engine vector databases: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving engine vector databases: {str(e)}") + +# --- Bot Engine Association models and endpoints --- +class BotEngineCreate(BaseModel): + bot_id: int + engine_id: int + +class BotEngineResponse(BotEngineCreate): + id: int + created_at: datetime + + model_config = ConfigDict(from_attributes=True) + +@router.get("/bot-engines", response_model=List[BotEngineResponse]) +async def get_bot_engines( + skip: int = 0, + limit: int = 100, + bot_id: Optional[int] = None, + engine_id: Optional[int] = None, + db: Session = Depends(get_db) +): + """ + Get all bot-engine associations. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **bot_id**: Filter by bot ID + - **engine_id**: Filter by engine ID + """ + try: + query = db.query(BotEngine) + + if bot_id is not None: + query = query.filter(BotEngine.bot_id == bot_id) + + if engine_id is not None: + query = query.filter(BotEngine.engine_id == engine_id) + + bot_engines = query.offset(skip).limit(limit).all() + return [BotEngineResponse.model_validate(association, from_attributes=True) for association in bot_engines] + except SQLAlchemyError as e: + logger.error(f"Database error retrieving bot-engine associations: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving bot-engine associations: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine associations: {str(e)}") + +@router.post("/bot-engines", response_model=BotEngineResponse) +async def create_bot_engine( + bot_engine: BotEngineCreate, + db: Session = Depends(get_db) +): + """ + Create a new bot-engine association. + """ + try: + # Check if bot exists + bot = db.query(TelegramBot).filter(TelegramBot.id == bot_engine.bot_id).first() + if not bot: + raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_engine.bot_id} not found") + + # Check if engine exists + engine = db.query(ChatEngine).filter(ChatEngine.id == bot_engine.engine_id).first() + if not engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {bot_engine.engine_id} not found") + + # Check if association already exists + existing_association = db.query(BotEngine).filter( + BotEngine.bot_id == bot_engine.bot_id, + BotEngine.engine_id == bot_engine.engine_id + ).first() + + if existing_association: + raise HTTPException( + status_code=400, + detail=f"Association between bot ID {bot_engine.bot_id} and engine ID {bot_engine.engine_id} already exists" + ) + + # Create association + db_bot_engine = BotEngine(**bot_engine.model_dump()) + + db.add(db_bot_engine) + db.commit() + db.refresh(db_bot_engine) + + return BotEngineResponse.model_validate(db_bot_engine, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error creating bot-engine association: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error creating bot-engine association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error creating bot-engine association: {str(e)}") + +@router.get("/bot-engines/{association_id}", response_model=BotEngineResponse) +async def get_bot_engine( + association_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get bot-engine association by ID. + """ + try: + association = db.query(BotEngine).filter(BotEngine.id == association_id).first() + if not association: + raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found") + + return BotEngineResponse.model_validate(association, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving bot-engine association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine association: {str(e)}") + +@router.delete("/bot-engines/{association_id}", response_model=dict) +async def delete_bot_engine( + association_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete bot-engine association. + """ + try: + association = db.query(BotEngine).filter(BotEngine.id == association_id).first() + if not association: + raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found") + + db.delete(association) + db.commit() + + return {"message": f"Bot-engine association with ID {association_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting bot-engine association: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting bot-engine association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting bot-engine association: {str(e)}") + +# --- Engine-Vector DB Association models and endpoints --- +class EngineVectorDbCreate(BaseModel): + engine_id: int + vector_database_id: int + priority: int = 0 + +class EngineVectorDbResponse(EngineVectorDbCreate): + id: int + + model_config = ConfigDict(from_attributes=True) + +@router.get("/engine-vector-dbs", response_model=List[EngineVectorDbResponse]) +async def get_engine_vector_dbs( + skip: int = 0, + limit: int = 100, + engine_id: Optional[int] = None, + vector_database_id: Optional[int] = None, + db: Session = Depends(get_db) +): + """ + Get all engine-vector-db associations. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **engine_id**: Filter by engine ID + - **vector_database_id**: Filter by vector database ID + """ + try: + query = db.query(EngineVectorDb) + + if engine_id is not None: + query = query.filter(EngineVectorDb.engine_id == engine_id) + + if vector_database_id is not None: + query = query.filter(EngineVectorDb.vector_database_id == vector_database_id) + + associations = query.offset(skip).limit(limit).all() + return [EngineVectorDbResponse.model_validate(association, from_attributes=True) for association in associations] + except SQLAlchemyError as e: + logger.error(f"Database error retrieving engine-vector-db associations: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving engine-vector-db associations: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db associations: {str(e)}") + +@router.post("/engine-vector-dbs", response_model=EngineVectorDbResponse) +async def create_engine_vector_db( + engine_vector_db: EngineVectorDbCreate, + db: Session = Depends(get_db) +): + """ + Create a new engine-vector-db association. + """ + try: + # Check if engine exists + engine = db.query(ChatEngine).filter(ChatEngine.id == engine_vector_db.engine_id).first() + if not engine: + raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_vector_db.engine_id} not found") + + # Check if vector database exists + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == engine_vector_db.vector_database_id).first() + if not vector_db: + raise HTTPException(status_code=404, detail=f"Vector database with ID {engine_vector_db.vector_database_id} not found") + + # Check if association already exists + existing_association = db.query(EngineVectorDb).filter( + EngineVectorDb.engine_id == engine_vector_db.engine_id, + EngineVectorDb.vector_database_id == engine_vector_db.vector_database_id + ).first() + + if existing_association: + raise HTTPException( + status_code=400, + detail=f"Association between engine ID {engine_vector_db.engine_id} and vector database ID {engine_vector_db.vector_database_id} already exists" + ) + + # Create association + db_engine_vector_db = EngineVectorDb(**engine_vector_db.model_dump()) + + db.add(db_engine_vector_db) + db.commit() + db.refresh(db_engine_vector_db) + + return EngineVectorDbResponse.model_validate(db_engine_vector_db, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error creating engine-vector-db association: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error creating engine-vector-db association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error creating engine-vector-db association: {str(e)}") + +@router.get("/engine-vector-dbs/{association_id}", response_model=EngineVectorDbResponse) +async def get_engine_vector_db( + association_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Get engine-vector-db association by ID. + """ + try: + association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() + if not association: + raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") + + return EngineVectorDbResponse.model_validate(association, from_attributes=True) + except HTTPException: + raise + except Exception as e: + logger.error(f"Error retrieving engine-vector-db association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db association: {str(e)}") + +@router.put("/engine-vector-dbs/{association_id}", response_model=EngineVectorDbResponse) +async def update_engine_vector_db( + association_id: int = Path(..., gt=0), + update_data: dict = Body(...), + db: Session = Depends(get_db) +): + """ + Update engine-vector-db association details (only priority can be updated). + """ + try: + association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() + if not association: + raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") + + # Only priority can be updated + if "priority" in update_data: + association.priority = update_data["priority"] + + db.commit() + db.refresh(association) + + return EngineVectorDbResponse.model_validate(association, from_attributes=True) + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating engine-vector-db association: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating engine-vector-db association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating engine-vector-db association: {str(e)}") + +@router.delete("/engine-vector-dbs/{association_id}", response_model=dict) +async def delete_engine_vector_db( + association_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete engine-vector-db association. + """ + try: + association = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first() + if not association: + raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found") + + db.delete(association) + db.commit() + + return {"message": f"Engine-vector-db association with ID {association_id} deleted successfully"} + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting engine-vector-db association: {e}") + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting engine-vector-db association: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting engine-vector-db association: {str(e)}") + +# --- VectorStatus models and endpoints --- +class VectorStatusBase(BaseModel): + document_id: int + vector_database_id: int + vector_id: Optional[str] = None + document_name: Optional[str] = None # Added to match database schema + status: str = "pending" + error_message: Optional[str] = None + +class VectorStatusCreate(VectorStatusBase): + pass + +class VectorStatusResponse(VectorStatusBase): + id: int + embedded_at: Optional[datetime] = None + + model_config = ConfigDict(from_attributes=True) + +@router.get("/vector-statuses", response_model=List[VectorStatusResponse]) +async def get_vector_statuses( + skip: int = 0, + limit: int = 100, + status: Optional[str] = None, + document_id: Optional[int] = None, + vector_database_id: Optional[int] = None, + db: Session = Depends(get_db) +): + """ + Get all vector statuses with optional filtering. + + - **skip**: Number of items to skip + - **limit**: Maximum number of items to return + - **status**: Filter by status (e.g., 'pending', 'completed', 'error') + - **document_id**: Filter by document ID + - **vector_database_id**: Filter by vector database ID + """ + try: + query = db.query(VectorStatus) + + # Apply filters if provided + if status is not None: + query = query.filter(VectorStatus.status == status) + + if document_id is not None: + query = query.filter(VectorStatus.document_id == document_id) + + if vector_database_id is not None: + query = query.filter(VectorStatus.vector_database_id == vector_database_id) + + # Execute query with pagination + vector_statuses = query.offset(skip).limit(limit).all() + + # Convert to Pydantic models + return [VectorStatusResponse.model_validate(status, from_attributes=True) for status in vector_statuses] + except SQLAlchemyError as e: + logger.error(f"Database error in get_vector_statuses: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + logger.error(f"Error retrieving vector statuses: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error retrieving vector statuses: {str(e)}") + +@router.post("/emergency", response_model=EmergencyResponse) +async def create_emergency_contact( + emergency: EmergencyCreate, + db: Session = Depends(get_db) +): + """ + Create a new emergency contact. + + - **name**: Contact name + - **phone_number**: Phone number + - **description**: Description (optional) + - **address**: Address (optional) + - **location**: Location coordinates (optional) + - **priority**: Priority order (default: 0) + - **is_active**: Whether the contact is active (default: True) + """ + try: + db_emergency = EmergencyItem(**emergency.model_dump()) + db.add(db_emergency) + db.commit() + db.refresh(db_emergency) + + # Invalidate emergency cache after creating a new item + emergencies_cache.clear() + + # Convert SQLAlchemy model to Pydantic model before returning + result = EmergencyResponse.model_validate(db_emergency, from_attributes=True) + return result + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in create_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.get("/emergency/{emergency_id}", response_model=EmergencyResponse) +async def get_emergency_contact( + emergency_id: int = Path(..., gt=0), + use_cache: bool = True, + db: Session = Depends(get_db) +): + """ + Get a single emergency contact by ID. + + - **emergency_id**: ID of the emergency contact + - **use_cache**: If true, use cached results when available + """ + try: + # Generate cache key + cache_key = f"emergency_{emergency_id}" + + # Try to get from cache if caching is enabled + if use_cache: + cached_result = emergencies_cache.get(cache_key) + if cached_result: + logger.info(f"Cache hit for {cache_key}") + return cached_result + + # Use direct SQL query for better performance on single item lookup + stmt = text("SELECT * FROM emergency_item WHERE id = :id") + result = db.execute(stmt, {"id": emergency_id}).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + # Create an EmergencyItem model instance manually + emergency = EmergencyItem() + for key, value in result._mapping.items(): + if hasattr(emergency, key): + setattr(emergency, key, value) + + # Convert to Pydantic model + response = EmergencyResponse.model_validate(emergency, from_attributes=True) + + # Store in cache if caching is enabled + if use_cache: + emergencies_cache[cache_key] = response + + return response + except SQLAlchemyError as e: + logger.error(f"Database error in get_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/emergency/{emergency_id}", response_model=EmergencyResponse) +async def update_emergency_contact( + emergency_id: int = Path(..., gt=0), + emergency_update: EmergencyUpdate = Body(...), + db: Session = Depends(get_db) +): + """ + Update a specific emergency contact. + + - **emergency_id**: ID of the emergency contact to update + - **name**: New name (optional) + - **phone_number**: New phone number (optional) + - **description**: New description (optional) + - **address**: New address (optional) + - **location**: New location coordinates (optional) + - **priority**: New priority order (optional) + - **is_active**: New active status (optional) + """ + try: + emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first() + if not emergency: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + # Update fields if provided + update_data = emergency_update.model_dump(exclude_unset=True) + for key, value in update_data.items(): + setattr(emergency, key, value) + + db.commit() + db.refresh(emergency) + + # Invalidate specific cache entries + emergencies_cache.delete(f"emergency_{emergency_id}") + emergencies_cache.clear() # Clear all list caches + + # Convert to Pydantic model + return EmergencyResponse.model_validate(emergency, from_attributes=True) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in update_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/emergency/{emergency_id}", response_model=dict) +async def delete_emergency_contact( + emergency_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete a specific emergency contact. + + - **emergency_id**: ID of the emergency contact to delete + """ + try: + # Use optimized direct SQL with RETURNING for better performance + result = db.execute( + text("DELETE FROM emergency_item WHERE id = :id RETURNING id"), + {"id": emergency_id} + ).fetchone() + + if not result: + raise HTTPException(status_code=404, detail="Emergency contact not found") + + db.commit() + + # Invalidate cache entries + emergencies_cache.delete(f"emergency_{emergency_id}") + emergencies_cache.clear() # Clear all list caches + + return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"} + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in delete_emergency_contact: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.put("/emergency/batch-update-status", response_model=BatchUpdateResult) +async def batch_update_emergency_status( + emergency_ids: List[int] = Body(..., embed=True), + is_active: bool = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Update the active status of multiple emergency contacts at once. + + This is much more efficient than updating emergency contacts one at a time. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare the update statement + stmt = text(""" + UPDATE emergency_item + SET is_active = :is_active, updated_at = NOW() + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + # Execute the update in a single query + result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids}) + updated_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in updated_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(updated_ids), + failed_ids=failed_ids, + message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_update_emergency_status: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.delete("/emergency/batch", response_model=BatchUpdateResult) +async def batch_delete_emergency_contacts( + emergency_ids: List[int] = Body(..., embed=True), + db: Session = Depends(get_db) +): + """ + Delete multiple emergency contacts at once. + + This is much more efficient than deleting emergency contacts one at a time with separate API calls. + """ + try: + if not emergency_ids: + raise HTTPException(status_code=400, detail="No emergency contact IDs provided") + + # Prepare and execute the delete statement with RETURNING to get deleted IDs + stmt = text(""" + DELETE FROM emergency_item + WHERE id = ANY(:emergency_ids) + RETURNING id + """) + + result = db.execute(stmt, {"emergency_ids": emergency_ids}) + deleted_ids = [row[0] for row in result] + + # Commit the transaction + db.commit() + + # Determine which IDs weren't found + failed_ids = [id for id in emergency_ids if id not in deleted_ids] + + # Invalidate emergency cache + emergencies_cache.clear() + + return BatchUpdateResult( + success_count=len(deleted_ids), + failed_ids=failed_ids, + message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted" + ) + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error in batch_delete_emergency_contacts: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + +@router.post("/documents", response_model=DocumentResponse) +async def upload_document( + name: str = Form(...), + vector_database_id: int = Form(...), + file: UploadFile = File(...), + db: Session = Depends(get_db) +): + """ + Upload a new document and associate it with a vector database. + + - **name**: Document name + - **vector_database_id**: ID of the vector database to associate with + - **file**: The file to upload + """ + try: + # Check if vector database exists + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_database_id).first() + if not vector_db: + raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_database_id} not found") + + # Read file content + file_content = await file.read() + file_size = len(file_content) + + # Determine file type from extension + filename = file.filename + file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else "" + + # Create document record + document = Document( + name=name, + vector_database_id=vector_database_id, + file_type=file_extension, + content_type=file.content_type, + size=file_size, + is_embedded=False + ) + + db.add(document) + db.flush() # Get ID without committing + + # Create document content record + document_content = DocumentContent( + document_id=document.id, + file_content=file_content + ) + + db.add(document_content) + db.commit() + db.refresh(document) + + # Create vector status record for tracking embedding + vector_status = VectorStatus( + document_id=document.id, + vector_database_id=vector_database_id, + status="pending" + ) + + db.add(vector_status) + db.commit() + + # Get vector database name for response + vector_db_name = vector_db.name if vector_db else f"db_{vector_database_id}" + + # Create response + result = DocumentResponse( + id=document.id, + name=document.name, + file_type=document.file_type, + content_type=document.content_type, + size=document.size, + created_at=document.created_at, + updated_at=document.updated_at, + vector_database_id=document.vector_database_id, + vector_database_name=vector_db_name, + is_embedded=document.is_embedded + ) + + return result + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error uploading document: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error uploading document: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error uploading document: {str(e)}") + +@router.put("/documents/{document_id}", response_model=DocumentResponse) +async def update_document( + document_id: int, + name: Optional[str] = Form(None), + file: Optional[UploadFile] = File(None), + background_tasks: BackgroundTasks = None, + db: Session = Depends(get_db) +): + """ + Update an existing document. Can update name, file content, or both. + + - **document_id**: ID of the document to update + - **name**: New document name (optional) + - **file**: New file content (optional) + """ + try: + # Validate document_id + if document_id <= 0: + raise HTTPException(status_code=400, detail="document_id must be greater than 0") + + # Check if document exists + document = db.query(Document).filter(Document.id == document_id).first() + if not document: + raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") + + # Get vector database information for later use + vector_db = None + if document.vector_database_id: + vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first() + + # Update name if provided + if name: + document.name = name + + # Update file if provided + if file: + # Read new file content + file_content = await file.read() + file_size = len(file_content) + + # Determine file type from extension + filename = file.filename + file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else "" + + # Update document record + document.file_type = file_extension + document.content_type = file.content_type + document.size = file_size + document.is_embedded = False # Reset embedding status + document.updated_at = datetime.now() + + # Update document content + document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first() + if document_content: + document_content.file_content = file_content + else: + # Create new document content if it doesn't exist + document_content = DocumentContent( + document_id=document_id, + file_content=file_content + ) + db.add(document_content) + + # Get vector status for Pinecone cleanup + vector_status = db.query(VectorStatus).filter( + VectorStatus.document_id == document_id, + VectorStatus.vector_database_id == document.vector_database_id + ).first() + + # Store old vector_id for cleanup + old_vector_id = None + if vector_status and vector_status.vector_id: + old_vector_id = vector_status.vector_id + logger.info(f"Found old vector_id {old_vector_id} for document {document_id}, planning to delete") + + # Delete old vector status and create a new one + if vector_status: + # Instead of updating the status, delete the old one and create a new one + # This avoids validation errors with constrains on the vector_status table + db.delete(vector_status) + db.flush() # Ensure the delete is processed before creating a new one + logger.info(f"Deleted old vector status for document {document_id}") + + # Create new vector status + vector_status = VectorStatus( + document_id=document_id, + vector_database_id=document.vector_database_id, + status="pending", + document_name=document.name + ) + db.add(vector_status) + db.flush() + logger.info(f"Created new vector status for document {document_id} with status 'pending'") + + # Delete old vectors from Pinecone if we have vector_id and vector_db + if old_vector_id and vector_db and document.vector_database_id: + try: + import httpx + + # Call PDF API to delete document using HTTP request (avoids circular imports) + base_url = "http://localhost:8000" + delete_url = f"{base_url}/pdf/document" + + params = { + "document_id": old_vector_id, # Use the vector_id instead of document ID + "namespace": f"vdb-{document.vector_database_id}", + "index_name": vector_db.pinecone_index, + "vector_database_id": document.vector_database_id + } + + logger.info(f"Deleting old vectors for document {document_id} with params: {params}") + + # Run deletion synchronously to ensure completion before proceeding + async with httpx.AsyncClient() as client: + response = await client.delete(delete_url, params=params) + if response.status_code == 200: + result = response.json() + vectors_deleted = result.get('vectors_deleted', 0) + logger.info(f"Successfully deleted {vectors_deleted} old vectors for document {document_id}") + else: + logger.warning(f"Failed to delete old vectors: {response.status_code} - {response.text}") + except Exception as e: + logger.error(f"Error deleting old vectors: {str(e)}") + # Continue with the update even if vector deletion fails + + # Now start a background task to upload and process the new document + if document.vector_database_id: + try: + # Use httpx to call the PDF API to upload the new document + # This ensures we reuse all the existing upload and vector creation logic + import tempfile + import os + + logger.info(f"Starting background task to re-upload document {document_id}") + + # Define an async function for uploading in background + async def upload_and_process_document(): + try: + # Create temporary file with the document content + with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_extension}") as temp_file: + temp_file.write(file_content) + temp_path = temp_file.name + + # Prepare multipart form data for the upload + import aiofiles + from aiofiles import os as aio_os + + async with httpx.AsyncClient(timeout=300) as client: # Increased timeout to 300 seconds + # Open the temp file for async reading + async with aiofiles.open(temp_path, "rb") as f: + file_data = await f.read() + + # Create form data + files = {"file": (filename, file_data, document.content_type)} + form_data = { + "title": document.name, + "vector_database_id": str(document.vector_database_id), + "namespace": f"vdb-{document.vector_database_id}" + } + + # Call the PDF upload API + upload_url = f"{base_url}/pdf/upload" + logger.info(f"Calling PDF upload API for document {document_id}") + + response = await client.post(upload_url, files=files, data=form_data) + + # Process the response + if response.status_code == 200: + result = response.json() + logger.info(f"Successfully uploaded document {document_id}: {result}") + + # Get the new vector_id from the result + new_vector_id = result.get('document_id') + + # If upload was successful, update the vector status in PostgreSQL + if result.get('success') and new_vector_id: + # Get the latest vector status + await asyncio.sleep(1) # Small delay to ensure DB consistency + + # Use a new DB session for this update + from app.database.postgresql import SessionLocal + async_db = SessionLocal() + try: + vs = async_db.query(VectorStatus).filter( + VectorStatus.document_id == document_id, + VectorStatus.vector_database_id == document.vector_database_id + ).first() + + if vs: + vs.vector_id = new_vector_id + vs.status = "completed" + vs.embedded_at = datetime.now() + + # Also update document embedded status + doc = async_db.query(Document).filter(Document.id == document_id).first() + if doc: + doc.is_embedded = True + + async_db.commit() + logger.info(f"Updated vector status with new vector_id {new_vector_id}") + finally: + async_db.close() + else: + logger.error(f"Failed to upload document: {response.status_code} - {response.text}") + + # Clean up temporary file + try: + await aio_os.remove(temp_path) + except Exception as cleanup_error: + logger.error(f"Error cleaning up temporary file: {str(cleanup_error)}") + except Exception as e: + logger.error(f"Error in background upload task: {str(e)}") + logger.error(traceback.format_exc()) + + # Add the task to background tasks + if background_tasks: + background_tasks.add_task(upload_and_process_document) + logger.info("Added document upload to background tasks") + else: + logger.warning("Background tasks not available, skipping document upload") + except Exception as e: + logger.error(f"Error setting up document re-upload: {str(e)}") + # Continue with the update even if re-upload setup fails + + # Commit changes to document record + db.commit() + db.refresh(document) + + # Get vector database name for response + vector_db_name = "No Database" + if vector_db: + vector_db_name = vector_db.name + elif document.vector_database_id: + vector_db_name = f"db_{document.vector_database_id}" + + # Create response + result = DocumentResponse( + id=document.id, + name=document.name, + file_type=document.file_type, + content_type=document.content_type, + size=document.size, + created_at=document.created_at, + updated_at=document.updated_at, + vector_database_id=document.vector_database_id or 0, + vector_database_name=vector_db_name, + is_embedded=document.is_embedded + ) + + return result + except HTTPException: + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error updating document: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error updating document: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error updating document: {str(e)}") + +@router.delete("/documents/{document_id}", response_model=dict) +async def delete_document( + document_id: int = Path(..., gt=0), + db: Session = Depends(get_db) +): + """ + Delete a document and its associated content. + + - **document_id**: ID of the document to delete + """ + try: + logger.info(f"Starting deletion process for document ID {document_id}") + + # Check if document exists + document = db.query(Document).filter(Document.id == document_id).first() + if not document: + logger.warning(f"Document with ID {document_id} not found for deletion") + raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found") + + vector_database_id = document.vector_database_id + logger.info(f"Found document to delete: name={document.name}, vector_database_id={vector_database_id}") + + # Get the vector_id from VectorStatus before deletion + vector_status = db.query(VectorStatus).filter( + VectorStatus.document_id == document_id, + VectorStatus.vector_database_id == vector_database_id + ).first() + + # Store the vector_id for Pinecone deletion + vector_id = None + pinecone_deletion_success = False + pinecone_error = None + + if vector_status and vector_status.vector_id: + vector_id = vector_status.vector_id + logger.info(f"Found vector_id {vector_id} for document {document_id}") + + # Get vector database info + vector_db = db.query(VectorDatabase).filter( + VectorDatabase.id == vector_database_id + ).first() + + if vector_db: + logger.info(f"Found vector database: name={vector_db.name}, index={vector_db.pinecone_index}") + + # Create namespace for vector database + namespace = f"vdb-{vector_database_id}" + + try: + import httpx + + # Call PDF API to delete from Pinecone using an HTTP request + # This avoids circular import issues + base_url = "http://localhost:8000" # Adjust this to match your actual base URL + delete_url = f"{base_url}/pdf/document" + + params = { + "document_id": vector_id, # Use the vector_id instead of the PostgreSQL document ID + "namespace": namespace, + "index_name": vector_db.pinecone_index, + "vector_database_id": vector_database_id + } + + logger.info(f"Calling PDF API to delete vectors with params: {params}") + + # Add retry logic for better reliability + max_retries = 3 + retry_delay = 2 # seconds + success = False + last_error = None + + for retry in range(max_retries): + try: + async with httpx.AsyncClient(timeout=300) as client: # Increased timeout to 300 seconds + response = await client.delete(delete_url, params=params) + + if response.status_code == 200: + result = response.json() + pinecone_deletion_success = result.get('success', False) + vectors_deleted = result.get('vectors_deleted', 0) + logger.info(f"Vector deletion API call response: success={pinecone_deletion_success}, vectors_deleted={vectors_deleted}") + success = True + break + else: + last_error = f"Failed with status code {response.status_code}: {response.text}" + logger.warning(f"Deletion attempt {retry+1}/{max_retries} failed: {last_error}") + except Exception as e: + last_error = str(e) + logger.warning(f"Deletion attempt {retry+1}/{max_retries} failed with exception: {last_error}") + + # Wait before retrying + if retry < max_retries - 1: # Don't sleep after the last attempt + logger.info(f"Retrying in {retry_delay} seconds...") + await asyncio.sleep(retry_delay) + retry_delay *= 2 # Exponential backoff + + if not success: + pinecone_error = f"All deletion attempts failed. Last error: {last_error}" + logger.warning(pinecone_error) + # Continue with PostgreSQL deletion even if Pinecone deletion fails + except Exception as e: + pinecone_error = f"Error setting up Pinecone deletion: {str(e)}" + logger.error(pinecone_error) + # Continue with PostgreSQL deletion even if Pinecone deletion fails + else: + logger.warning(f"No vector_id found for document {document_id}, skipping Pinecone deletion") + + # Delete vector status + result_vs = db.query(VectorStatus).filter(VectorStatus.document_id == document_id).delete() + logger.info(f"Deleted {result_vs} vector status records for document {document_id}") + + # Delete document content + result_dc = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).delete() + logger.info(f"Deleted {result_dc} document content records for document {document_id}") + + # Delete document + db.delete(document) + db.commit() + logger.info(f"Document with ID {document_id} successfully deleted from PostgreSQL") + + # Prepare response with information about what happened + response = { + "status": "success", + "message": f"Document with ID {document_id} deleted successfully", + "postgresql_deletion": { + "document_deleted": True, + "vector_status_deleted": result_vs > 0, + "document_content_deleted": result_dc > 0 + } + } + + # Add Pinecone deletion information + if vector_id: + response["pinecone_deletion"] = { + "attempted": True, + "vector_id": vector_id, + "success": pinecone_deletion_success, + } + if pinecone_error: + response["pinecone_deletion"]["error"] = pinecone_error + else: + response["pinecone_deletion"] = { + "attempted": False, + "reason": "No vector_id found for document" + } + + return response + except HTTPException: + logger.warning(f"HTTP exception in delete_document for ID {document_id}") + raise + except SQLAlchemyError as e: + db.rollback() + logger.error(f"Database error deleting document {document_id}: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Database error: {str(e)}") + except Exception as e: + db.rollback() + logger.error(f"Error deleting document {document_id}: {e}") + logger.error(traceback.format_exc()) + raise HTTPException(status_code=500, detail=f"Error deleting document: {str(e)}") \ No newline at end of file