Spaces:
Sleeping
Sleeping
import logging | |
import json | |
import traceback | |
from datetime import datetime, timedelta, timezone | |
import time | |
from functools import lru_cache | |
from pathlib import Path as pathlib_Path # Import Path from pathlib with a different name | |
from fastapi import APIRouter, HTTPException, Depends, Query, Body, Response, File, UploadFile, Form, BackgroundTasks | |
from fastapi.params import Path # Import Path explicitly from fastapi.params instead | |
from sqlalchemy.orm import Session | |
from sqlalchemy.exc import SQLAlchemyError | |
from typing import List, Optional, Dict, Any | |
import logging | |
import traceback | |
from datetime import datetime | |
from sqlalchemy import text, inspect, func | |
from sqlalchemy.exc import SQLAlchemyError | |
from sqlalchemy import desc, func | |
from cachetools import TTLCache | |
import uuid | |
from app.database.postgresql import get_db | |
from app.database.models import FAQItem, EmergencyItem, EventItem, AboutPixity, SolanaSummit, DaNangBucketList, ApiKey, VectorDatabase, Document, VectorStatus, TelegramBot, ChatEngine, BotEngine, EngineVectorDb, DocumentContent | |
from pydantic import BaseModel, Field, ConfigDict | |
# Configure logging
logger = logging.getLogger(__name__)

# Create router; its routes share the "/postgres" prefix and "PostgreSQL" tag
router = APIRouter(
    prefix="/postgres",
    tags=["PostgreSQL"],
)

# Initialize caches for frequently used data.
# Entries expire after 5 minutes (300 seconds).
#
# The FAQ and emergency caches are keyed both by list-query parameters
# (skip/limit/filters) and by individual item ID, so they need room for more
# than one key; with a single slot every insert evicted the previous entry.
faqs_cache = TTLCache(maxsize=64, ttl=300)
emergencies_cache = TTLCache(maxsize=64, ttl=300)
events_cache = TTLCache(maxsize=10, ttl=300)  # Cache for different page sizes
about_pixity_cache = TTLCache(maxsize=1, ttl=300)
solana_summit_cache = TTLCache(maxsize=1, ttl=300)
danang_bucket_list_cache = TTLCache(maxsize=1, ttl=300)
# --- Pydantic models for request/response --- | |
# Information models | |
# Field shared by the informational-content request and response schemas.
class InfoContentBase(BaseModel):
    content: str  # Required text body


# Creation payload; adds nothing to InfoContentBase.
class InfoContentCreate(InfoContentBase):
    pass


# Update payload; adds nothing to InfoContentBase, so `content` stays required.
class InfoContentUpdate(InfoContentBase):
    pass


# Response schema: the base field plus the record's ID and timestamps.
class InfoContentResponse(InfoContentBase):
    id: int
    created_at: datetime
    updated_at: datetime

    # from_attributes allows building the model from an object's attributes
    # (e.g. an ORM instance) instead of only from a dict.
    model_config = ConfigDict(from_attributes=True)
# FAQ models | |
# Fields shared by the FAQ creation and response schemas.
class FAQBase(BaseModel):
    question: str
    answer: str
    is_active: bool = True  # New FAQs are active unless stated otherwise


# Creation payload; adds nothing to FAQBase.
class FAQCreate(FAQBase):
    pass


# Partial-update payload: every field is optional. The update handler dumps it
# with exclude_unset=True, so only fields the client actually sent are applied.
class FAQUpdate(BaseModel):
    question: Optional[str] = None
    answer: Optional[str] = None
    is_active: Optional[bool] = None


# Response schema: the base fields plus the record's ID and timestamps.
class FAQResponse(FAQBase):
    id: int
    created_at: datetime
    updated_at: datetime

    # from_attributes allows building the model from an object's attributes
    # (e.g. an FAQItem instance) instead of only from a dict.
    model_config = ConfigDict(from_attributes=True)
# Emergency contact models | |
# Fields shared by the emergency-contact creation and response schemas.
class EmergencyBase(BaseModel):
    name: str
    phone_number: str
    description: Optional[str] = None
    address: Optional[str] = None
    location: Optional[str] = None
    priority: int = 0  # List endpoints sort by this value in descending order
    is_active: bool = True
    section: Optional[str] = None  # Section label used by the list filter
    section_id: Optional[int] = None  # Numeric section used by the by-section lookup


# Creation payload; adds nothing to EmergencyBase.
class EmergencyCreate(EmergencyBase):
    pass


# Partial-update payload: every field is optional. The update handler dumps it
# with exclude_unset=True, so only fields the client actually sent are applied.
class EmergencyUpdate(BaseModel):
    name: Optional[str] = None
    phone_number: Optional[str] = None
    description: Optional[str] = None
    address: Optional[str] = None
    location: Optional[str] = None
    priority: Optional[int] = None
    is_active: Optional[bool] = None
    section: Optional[str] = None
    section_id: Optional[int] = None


# Response schema: the base fields plus the record's ID and timestamps.
class EmergencyResponse(EmergencyBase):
    id: int
    created_at: datetime
    updated_at: datetime

    # from_attributes allows building the model from an object's attributes
    # (e.g. an EmergencyItem instance) instead of only from a dict.
    model_config = ConfigDict(from_attributes=True)
# Event models | |
# Fields shared by the event creation and response schemas.
class EventBase(BaseModel):
    name: str
    description: str
    address: str
    location: Optional[str] = None
    date_start: datetime  # The events list is sorted by this, newest first
    date_end: Optional[datetime] = None
    price: Optional[List[dict]] = None  # Free-form list of price entries
    url: Optional[str] = None
    is_active: bool = True
    featured: bool = False


# Creation payload; adds nothing to EventBase.
class EventCreate(EventBase):
    pass


# Partial-update payload: every field is optional. The update handler dumps it
# with exclude_unset=True, so only fields the client actually sent are applied.
class EventUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    address: Optional[str] = None
    location: Optional[str] = None
    date_start: Optional[datetime] = None
    date_end: Optional[datetime] = None
    price: Optional[List[dict]] = None
    url: Optional[str] = None
    is_active: Optional[bool] = None
    featured: Optional[bool] = None


# Response schema: the base fields plus the record's ID and timestamps.
class EventResponse(EventBase):
    id: int
    created_at: datetime
    updated_at: datetime

    # from_attributes allows building the model from an object's attributes
    # (e.g. an EventItem instance) instead of only from a dict.
    model_config = ConfigDict(from_attributes=True)
# --- Batch operations for better performance --- | |
# Request body for creating several events in one call.
class BatchEventCreate(BaseModel):
    events: List[EventCreate]


# Outcome summary returned by the batch update/delete handlers.
class BatchUpdateResult(BaseModel):
    success_count: int  # Number of rows the statement actually affected
    failed_ids: List[int] = []  # Requested IDs that matched no row
    message: str  # Human-readable summary
# --- FAQ endpoints --- | |
async def get_faqs(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get all FAQ items.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **active_only**: If true, only return active items
    - **use_cache**: If true, use cached results when available

    Returns the requested page of FAQ items ordered by ID.
    Database and unexpected errors are reported as HTTP 500.
    """
    try:
        # One cache entry per distinct combination of query parameters
        cache_key = f"faqs_{skip}_{limit}_{active_only}"

        if use_cache:
            cached_result = faqs_cache.get(cache_key)
            # Compare against None so a cached empty list still counts as a hit
            if cached_result is not None:
                logger.info(f"Cache hit for {cache_key}")
                return cached_result

        query = db.query(FAQItem)
        if active_only:
            query = query.filter(FAQItem.is_active == True)

        # OFFSET/LIMIT only yields stable pages with an explicit ordering,
        # so sort by primary key before paginating.
        faqs = query.order_by(FAQItem.id).offset(skip).limit(limit).all()

        # Convert to Pydantic models
        result = [FAQResponse.model_validate(faq, from_attributes=True) for faq in faqs]

        if use_cache:
            faqs_cache[cache_key] = result
        return result
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_faqs: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in get_faqs: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
async def create_faq(
    faq: FAQCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new FAQ item.

    - **question**: Question text
    - **answer**: Answer text
    - **is_active**: Whether the FAQ is active (default: True)

    Returns the stored item. Database errors roll the transaction back and
    are reported as HTTP 500.
    """
    try:
        field_values = faq.model_dump()
        new_item = FAQItem(**field_values)

        # Persist, then reload so database-generated values are populated
        db.add(new_item)
        db.commit()
        db.refresh(new_item)

        # Any cached FAQ data may now be out of date
        faqs_cache.clear()

        created = FAQResponse.model_validate(new_item, from_attributes=True)
        return created
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error in create_faq: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
async def get_faq(
    faq_id: int = Path(..., gt=0),
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get a single FAQ item by ID.

    - **faq_id**: ID of the FAQ item
    - **use_cache**: If true, use cached results when available

    Responds with HTTP 404 when no row has that ID and HTTP 500 on
    database errors.
    """
    try:
        cache_key = f"faq_{faq_id}"

        # With caching disabled the lookup is skipped entirely
        cached = faqs_cache.get(cache_key) if use_cache else None
        if cached:
            logger.info(f"Cache hit for {cache_key}")
            return cached

        # Raw primary-key lookup
        row = db.execute(
            text("SELECT * FROM faq_item WHERE id = :id"),
            {"id": faq_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="FAQ item not found")

        # Copy the columns the model knows about onto an FAQItem instance
        # that is never added to the session.
        item = FAQItem()
        for column, value in row._mapping.items():
            if not hasattr(item, column):
                continue
            setattr(item, column, value)

        payload = FAQResponse.model_validate(item, from_attributes=True)
        if use_cache:
            faqs_cache[cache_key] = payload
        return payload
    except SQLAlchemyError as exc:
        logger.error(f"Database error in get_faq: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
async def update_faq(
    faq_id: int = Path(..., gt=0),
    faq_update: FAQUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Update an existing FAQ item.

    - **faq_id**: ID of the FAQ item to update
    - **question**: Updated question text (optional)
    - **answer**: Updated answer text (optional)
    - **is_active**: Updated active status (optional)

    Returns the updated item. Responds with HTTP 404 when the FAQ does not
    exist and HTTP 500 (after a rollback) on database errors.
    """
    try:
        # Check if FAQ exists
        faq = db.query(FAQItem).filter(FAQItem.id == faq_id).first()
        if faq is None:
            raise HTTPException(status_code=404, detail=f"FAQ with ID {faq_id} not found")

        # Apply only the fields the client actually sent
        update_data = faq_update.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(faq, key, value)

        db.commit()
        db.refresh(faq)

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        faqs_cache.clear()

        return FAQResponse.model_validate(faq, from_attributes=True)
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in update_faq: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def delete_faq(
    faq_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Delete an FAQ item.

    - **faq_id**: ID of the FAQ item to delete

    Returns a status message. Responds with HTTP 404 when no row has that ID
    and HTTP 500 (after a rollback) on database errors.
    """
    try:
        # RETURNING tells us whether a row was actually removed
        result = db.execute(
            text("DELETE FROM faq_item WHERE id = :id RETURNING id"),
            {"id": faq_id}
        ).fetchone()
        if not result:
            raise HTTPException(status_code=404, detail="FAQ item not found")
        db.commit()

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        faqs_cache.clear()

        return {"status": "success", "message": f"FAQ item {faq_id} deleted"}
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in delete_faq: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# --- Emergency contact endpoints --- | |
async def get_emergency_contacts(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
    section: Optional[str] = None,
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get all emergency contacts.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **active_only**: If true, only return active items
    - **section**: Filter by section (16.1, 16.2.1, 16.2.2, 16.3)
    - **use_cache**: If true, use cached results when available

    Returns the requested page of contacts, highest priority first.
    Database and unexpected errors are reported as HTTP 500.
    """
    try:
        # One cache entry per distinct combination of query parameters
        cache_key = f"emergency_{skip}_{limit}_{active_only}_{section}"

        if use_cache:
            cached_result = emergencies_cache.get(cache_key)
            # Compare against None so a cached empty list still counts as a hit
            if cached_result is not None:
                logger.info(f"Cache hit for {cache_key}")
                return cached_result

        query = db.query(EmergencyItem)
        if active_only:
            query = query.filter(EmergencyItem.is_active == True)
        if section:
            query = query.filter(EmergencyItem.section == section)

        # Highest priority first; the ID tie-breaker keeps pages stable when
        # several contacts share a priority.
        emergency_contacts = (
            query.order_by(EmergencyItem.priority.desc(), EmergencyItem.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

        # Convert to Pydantic models
        result = [
            EmergencyResponse.model_validate(contact, from_attributes=True)
            for contact in emergency_contacts
        ]

        if use_cache:
            emergencies_cache[cache_key] = result
        return result
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in get_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
async def get_emergency_sections(
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get all available emergency sections.

    - **use_cache**: If true, use cached results when available

    Returns a list of section information including ID and name.
    Database and unexpected errors are reported as HTTP 500.
    """
    try:
        cache_key = "emergency_sections"

        if use_cache:
            cached_result = emergencies_cache.get(cache_key)
            # Compare against None so a cached empty list still counts as a hit
            if cached_result is not None:
                logger.info(f"Cache hit for {cache_key}")
                return cached_result

        # Query distinct sections with their IDs
        stmt = text("""
            SELECT DISTINCT section_id, section
            FROM emergency_item
            WHERE section IS NOT NULL
            ORDER BY section_id
        """)
        result = db.execute(stmt)

        # Column order follows the SELECT list: section_id, then section
        sections = [{"id": row[0], "name": row[1]} for row in result]

        if use_cache:
            emergencies_cache[cache_key] = sections
        return sections
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_emergency_sections: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in get_emergency_sections: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
async def get_emergency_contacts_by_section_id(
    section_id: int = Path(..., description="Section ID (1, 2, 3, or 4)"),
    active_only: bool = True,
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get emergency contacts for a specific section ID.

    - **section_id**: Section ID (1: Tourist support, 2: Emergency numbers, 3: Emergency situations, 4: Tourist scams)
    - **active_only**: If true, only return active items
    - **use_cache**: If true, use cached results when available

    Returns every matching contact, highest priority first.
    Database and unexpected errors are reported as HTTP 500.
    """
    try:
        # One cache entry per distinct combination of query parameters
        cache_key = f"emergency_section_id_{section_id}_{active_only}"

        if use_cache:
            cached_result = emergencies_cache.get(cache_key)
            # Compare against None so a cached empty list (a section with no
            # contacts) still counts as a hit.
            if cached_result is not None:
                logger.info(f"Cache hit for {cache_key}")
                return cached_result

        query = db.query(EmergencyItem).filter(EmergencyItem.section_id == section_id)
        if active_only:
            query = query.filter(EmergencyItem.is_active == True)

        # Order by priority for proper sorting
        emergency_contacts = query.order_by(EmergencyItem.priority.desc()).all()

        # Convert to Pydantic models
        result = [
            EmergencyResponse.model_validate(contact, from_attributes=True)
            for contact in emergency_contacts
        ]

        if use_cache:
            emergencies_cache[cache_key] = result
        return result
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_emergency_contacts_by_section_id: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in get_emergency_contacts_by_section_id: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
async def create_emergency_contact(
    emergency: EmergencyCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new emergency contact.

    - **name**: Contact name
    - **phone_number**: Phone number
    - **description**: Description (optional)
    - **address**: Address (optional)
    - **location**: Location coordinates (optional)
    - **priority**: Priority order (default: 0)
    - **is_active**: Whether the contact is active (default: True)
    - **section**: Section label (optional)
    - **section_id**: Section ID (optional)

    Returns the stored contact. Database errors roll the transaction back
    and are reported as HTTP 500.
    """
    try:
        field_values = emergency.model_dump()
        new_contact = EmergencyItem(**field_values)

        # Persist, then reload so database-generated values are populated
        db.add(new_contact)
        db.commit()
        db.refresh(new_contact)

        # Any cached emergency data may now be out of date
        emergencies_cache.clear()

        return EmergencyResponse.model_validate(new_contact, from_attributes=True)
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error in create_emergency_contact: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
async def get_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get a single emergency contact by ID.

    - **emergency_id**: ID of the emergency contact
    - **use_cache**: If true, use cached results when available

    Responds with HTTP 404 when no row has that ID and HTTP 500 on
    database errors.
    """
    try:
        cache_key = f"emergency_{emergency_id}"

        # With caching disabled the lookup is skipped entirely
        cached = emergencies_cache.get(cache_key) if use_cache else None
        if cached:
            logger.info(f"Cache hit for {cache_key}")
            return cached

        # Raw primary-key lookup
        row = db.execute(
            text("SELECT * FROM emergency_item WHERE id = :id"),
            {"id": emergency_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Emergency contact not found")

        # Copy the columns the model knows about onto an EmergencyItem
        # instance that is never added to the session.
        contact = EmergencyItem()
        for column, value in row._mapping.items():
            if not hasattr(contact, column):
                continue
            setattr(contact, column, value)

        payload = EmergencyResponse.model_validate(contact, from_attributes=True)
        if use_cache:
            emergencies_cache[cache_key] = payload
        return payload
    except SQLAlchemyError as exc:
        logger.error(f"Database error in get_emergency_contact: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
async def update_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    emergency_update: EmergencyUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Update a specific emergency contact.

    - **emergency_id**: ID of the emergency contact to update
    - **name**: New name (optional)
    - **phone_number**: New phone number (optional)
    - **description**: New description (optional)
    - **address**: New address (optional)
    - **location**: New location coordinates (optional)
    - **priority**: New priority order (optional)
    - **is_active**: New active status (optional)
    - **section**: New section label (optional)
    - **section_id**: New section ID (optional)

    Returns the updated contact. Responds with HTTP 404 when the contact
    does not exist and HTTP 500 (after a rollback) on database errors.
    """
    try:
        emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first()
        if not emergency:
            raise HTTPException(status_code=404, detail="Emergency contact not found")

        # Apply only the fields the client actually sent
        update_data = emergency_update.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(emergency, key, value)

        db.commit()
        db.refresh(emergency)

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        emergencies_cache.clear()

        return EmergencyResponse.model_validate(emergency, from_attributes=True)
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in update_emergency_contact: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def delete_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Delete a specific emergency contact.

    - **emergency_id**: ID of the emergency contact to delete

    Returns a status message. Responds with HTTP 404 when no row has that ID
    and HTTP 500 (after a rollback) on database errors.
    """
    try:
        # RETURNING tells us whether a row was actually removed
        result = db.execute(
            text("DELETE FROM emergency_item WHERE id = :id RETURNING id"),
            {"id": emergency_id}
        ).fetchone()
        if not result:
            raise HTTPException(status_code=404, detail="Emergency contact not found")
        db.commit()

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        emergencies_cache.clear()

        return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"}
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in delete_emergency_contact: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_update_emergency_status(
    emergency_ids: List[int] = Body(..., embed=True),
    is_active: bool = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """
    Update the active status of multiple emergency contacts at once.
    This is much more efficient than updating emergency contacts one at a time.

    - **emergency_ids**: IDs of the contacts to update (must not be empty)
    - **is_active**: Active status to apply to every listed contact

    Returns the number of updated contacts and the requested IDs that
    matched no row. Responds with HTTP 400 for an empty ID list and HTTP 500
    (after a rollback) on database errors.
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")

        # Single statement; RETURNING reports which rows were actually touched
        stmt = text("""
            UPDATE emergency_item
            SET is_active = :is_active, updated_at = NOW()
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)
        result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids})
        updated_ids = [row[0] for row in result]

        db.commit()

        # A set gives constant-time membership checks for the requested IDs
        updated_lookup = set(updated_ids)
        failed_ids = [
            requested_id
            for requested_id in emergency_ids
            if requested_id not in updated_lookup
        ]

        # Invalidate emergency cache
        emergencies_cache.clear()

        return BatchUpdateResult(
            success_count=len(updated_ids),
            failed_ids=failed_ids,
            message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_update_emergency_status: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_delete_emergency_contacts(
    emergency_ids: List[int] = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """
    Delete multiple emergency contacts at once.
    This is much more efficient than deleting emergency contacts one at a time with separate API calls.

    - **emergency_ids**: IDs of the contacts to delete (must not be empty)

    Returns the number of deleted contacts and the requested IDs that
    matched no row. Responds with HTTP 400 for an empty ID list and HTTP 500
    (after a rollback) on database errors.
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")

        # Single statement; RETURNING reports which rows were actually removed
        stmt = text("""
            DELETE FROM emergency_item
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)
        result = db.execute(stmt, {"emergency_ids": emergency_ids})
        deleted_ids = [row[0] for row in result]

        db.commit()

        # A set gives constant-time membership checks for the requested IDs
        deleted_lookup = set(deleted_ids)
        failed_ids = [
            requested_id
            for requested_id in emergency_ids
            if requested_id not in deleted_lookup
        ]

        # Invalidate emergency cache
        emergencies_cache.clear()

        return BatchUpdateResult(
            success_count=len(deleted_ids),
            failed_ids=failed_ids,
            message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_delete_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# --- Event endpoints --- | |
async def get_events(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
    featured_only: bool = False,
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get all events.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **active_only**: If true, only return active items
    - **featured_only**: If true, only return featured items
    - **use_cache**: If true, use cached results when available

    Returns the requested page of events, latest start date first.
    Database and unexpected errors are reported as HTTP 500.
    """
    try:
        # One cache entry per distinct combination of query parameters
        cache_key = f"events_{skip}_{limit}_{active_only}_{featured_only}"

        if use_cache:
            cached_result = events_cache.get(cache_key)
            # Compare against None so a cached empty list still counts as a hit
            if cached_result is not None:
                return cached_result

        query = db.query(EventItem)
        if active_only:
            query = query.filter(EventItem.is_active == True)
        if featured_only:
            query = query.filter(EventItem.featured == True)

        # Latest start date first; the ID tie-breaker keeps pages stable when
        # several events share a start date.
        events = (
            query.order_by(EventItem.date_start.desc(), EventItem.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

        # Convert to Pydantic models
        result = [EventResponse.model_validate(event, from_attributes=True) for event in events]

        # Entry lifetime is governed by the TTL configured on events_cache
        if use_cache:
            events_cache[cache_key] = result
        return result
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_events: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Unexpected error in get_events: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
async def create_event(
    event: EventCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new event.

    - **name**: Event name
    - **description**: Event description
    - **address**: Event address
    - **location**: Location coordinates (optional)
    - **date_start**: Start date and time
    - **date_end**: End date and time (optional)
    - **price**: Price information (optional list of JSON objects)
    - **url**: Event URL (optional)
    - **is_active**: Whether the event is active (default: True)
    - **featured**: Whether the event is featured (default: False)

    Returns the stored event. Database errors roll the transaction back and
    are reported as HTTP 500.
    """
    try:
        db_event = EventItem(**event.model_dump())
        db.add(db_event)
        db.commit()
        db.refresh(db_event)

        # Invalidate relevant caches on create
        events_cache.clear()

        # Convert SQLAlchemy model to Pydantic model before returning
        return EventResponse.model_validate(db_event, from_attributes=True)
    except SQLAlchemyError as e:
        # Roll back so the session is usable again after a failed
        # flush/commit, as the other create handlers do.
        db.rollback()
        logger.error(f"Database error in create_event: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def get_event(
    event_id: int = Path(..., gt=0),
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get a single event by ID.

    - **event_id**: ID of the event
    - **use_cache**: If true, use cached results when available

    Responds with HTTP 404 when no row has that ID and HTTP 500 on
    database errors.
    """
    try:
        cache_key = f"event_{event_id}"

        # With caching disabled the lookup is skipped entirely
        cached = events_cache.get(cache_key) if use_cache else None
        if cached:
            return cached

        # Raw primary-key lookup
        row = db.execute(
            text("SELECT * FROM event_item WHERE id = :id"),
            {"id": event_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Event not found")

        # Copy the columns the model knows about onto an EventItem instance
        # that is never added to the session.
        item = EventItem()
        for column, value in row._mapping.items():
            if not hasattr(item, column):
                continue
            setattr(item, column, value)

        payload = EventResponse.model_validate(item, from_attributes=True)

        # Entry lifetime is governed by the TTL configured on events_cache
        if use_cache:
            events_cache[cache_key] = payload
        return payload
    except SQLAlchemyError as exc:
        logger.error(f"Database error in get_event: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(exc)}")
def update_event(
    event_id: int,
    event: EventUpdate,
    db: Session = Depends(get_db)
):
    """
    Update an existing event.

    - **event_id**: ID of the event to update
    - **event**: Fields to change; fields that are not sent are left untouched

    Returns the updated event. Responds with HTTP 404 when the event does
    not exist and HTTP 500 (after a rollback) on database errors.
    """
    try:
        db_event = db.query(EventItem).filter(EventItem.id == event_id).first()
        if not db_event:
            raise HTTPException(status_code=404, detail="Event not found")

        # Apply only the fields the client actually sent
        for key, value in event.model_dump(exclude_unset=True).items():
            setattr(db_event, key, value)

        db.commit()
        db.refresh(db_event)

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        events_cache.clear()

        # Convert SQLAlchemy model to Pydantic model before returning
        return EventResponse.model_validate(db_event, from_attributes=True)
    except SQLAlchemyError as e:
        # Roll back so the session is usable again after a failed flush/commit
        db.rollback()
        logger.error(f"Database error in update_event: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def delete_event(
    event_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Delete a specific event.

    - **event_id**: ID of the event to delete

    Returns a status message. Responds with HTTP 404 when the event does not
    exist and HTTP 500 (after a rollback) on database errors.
    """
    try:
        event = db.query(EventItem).filter(EventItem.id == event_id).first()
        if event is None:
            raise HTTPException(status_code=404, detail=f"Event with ID {event_id} not found")

        db.delete(event)
        db.commit()

        # TTLCache is a mutable mapping with no delete() method; the old call
        # raised AttributeError after the commit. clear() drops the per-item
        # entry together with every cached list.
        events_cache.clear()

        return {"status": "success", "message": f"Event {event_id} deleted"}
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# --- Batch operations for better performance --- | |
async def batch_create_events(
    batch: BatchEventCreate,
    db: Session = Depends(get_db)
):
    """
    Create multiple events in a single database transaction.
    This is much more efficient than creating events one at a time with separate API calls.

    - **events**: List of events to create

    Returns the stored events. Database errors roll the whole batch back and
    are reported as HTTP 500.
    """
    try:
        db_events = [EventItem(**event_data.model_dump()) for event_data in batch.events]
        for db_event in db_events:
            db.add(db_event)

        # Commit all events in a single transaction
        db.commit()

        # Cached event data no longer reflects the table, so drop it as the
        # single-event create handler does.
        events_cache.clear()

        # Refresh all events to get their IDs and other generated fields
        for db_event in db_events:
            db.refresh(db_event)

        # Convert SQLAlchemy models to Pydantic models
        return [EventResponse.model_validate(event, from_attributes=True) for event in db_events]
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_create_events: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_update_event_status(
    event_ids: List[int] = Body(..., embed=True),
    is_active: bool = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Update the active status of multiple events with one UPDATE statement.

    Args:
        event_ids: IDs of the events to update; must not be empty.
        is_active: New value for the ``is_active`` column.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows updated, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``event_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not event_ids:
            raise HTTPException(status_code=400, detail="No event IDs provided")
        # Prepare the update statement
        stmt = text("""
            UPDATE event_item
            SET is_active = :is_active, updated_at = NOW()
            WHERE id = ANY(:event_ids)
            RETURNING id
        """)
        # Execute the update in a single query
        result = db.execute(stmt, {"is_active": is_active, "event_ids": event_ids})
        updated_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Invalidate cached event data so stale statuses are not served
        events_cache.clear()
        # Requested IDs that matched no row; a set keeps the check linear
        updated_lookup = set(updated_ids)
        failed_ids = [event_id for event_id in event_ids if event_id not in updated_lookup]
        return BatchUpdateResult(
            success_count=len(updated_ids),
            failed_ids=failed_ids,
            message=f"Updated {len(updated_ids)} events" if updated_ids else "No events were updated"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_update_event_status: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_delete_events(
    event_ids: List[int] = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Delete multiple events with one DELETE statement.

    Args:
        event_ids: IDs of the events to delete; must not be empty.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows deleted, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``event_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not event_ids:
            raise HTTPException(status_code=400, detail="No event IDs provided")
        # Prepare and execute the delete statement with RETURNING to get deleted IDs
        stmt = text("""
            DELETE FROM event_item
            WHERE id = ANY(:event_ids)
            RETURNING id
        """)
        result = db.execute(stmt, {"event_ids": event_ids})
        deleted_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Invalidate cached event data so deleted events are not served
        events_cache.clear()
        # Requested IDs that matched no row; a set keeps the check linear
        deleted_lookup = set(deleted_ids)
        failed_ids = [event_id for event_id in event_ids if event_id not in deleted_lookup]
        return BatchUpdateResult(
            success_count=len(deleted_ids),
            failed_ids=failed_ids,
            message=f"Deleted {len(deleted_ids)} events" if deleted_ids else "No events were deleted"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_delete_events: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# Health check endpoint | |
async def health_check(db: Session = Depends(get_db)):
    """Report whether the PostgreSQL connection answers a trivial query.

    Returns:
        A dict with ``status``, ``message`` and an ISO-formatted ``timestamp``.

    Raises:
        HTTPException: 503 when the probe query fails for any reason.
    """
    try:
        # text() wraps the raw SQL for SQLAlchemy 2.0 compatibility
        probe = text("SELECT 1")
        db.execute(probe).first()
    except Exception as e:
        logger.error(f"PostgreSQL health check failed: {e}")
        raise HTTPException(status_code=503, detail=f"PostgreSQL connection failed: {str(e)}")
    return {
        "status": "healthy",
        "message": "PostgreSQL connection is working",
        "timestamp": datetime.now().isoformat(),
    }
# Request model for batch FAQ creation
class BatchFAQCreate(BaseModel):
    """Request body for creating several FAQ items in one call."""

    # FAQ items to create, each validated as a FAQCreate payload
    faqs: List[FAQCreate]
# Batch FAQ endpoints (create, status update, delete)
async def batch_create_faqs(
    batch: BatchFAQCreate,
    db: Session = Depends(get_db)
):
    """Insert every FAQ item from the request in one transaction.

    Args:
        batch: Request body whose ``faqs`` list holds the items to create.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A list of ``FAQResponse`` objects, one per created item.

    Raises:
        HTTPException: 500 on a database error (the transaction is rolled
            back first).
    """
    try:
        new_rows = [FAQItem(**payload.model_dump()) for payload in batch.faqs]
        db.add_all(new_rows)
        # One commit covers the whole batch
        db.commit()
        # Reload each row so IDs and other generated fields are populated
        for row in new_rows:
            db.refresh(row)
        # Cached FAQ data is now out of date
        faqs_cache.clear()
        return [FAQResponse.model_validate(row, from_attributes=True) for row in new_rows]
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_create_faqs: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_update_faq_status(
    faq_ids: List[int] = Body(..., embed=True),
    is_active: bool = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Update the active status of multiple FAQ items with one UPDATE statement.

    Args:
        faq_ids: IDs of the FAQ items to update; must not be empty.
        is_active: New value for the ``is_active`` column.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows updated, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``faq_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not faq_ids:
            raise HTTPException(status_code=400, detail="No FAQ IDs provided")
        # Prepare the update statement
        stmt = text("""
            UPDATE faq_item
            SET is_active = :is_active, updated_at = NOW()
            WHERE id = ANY(:faq_ids)
            RETURNING id
        """)
        # Execute the update in a single query
        result = db.execute(stmt, {"is_active": is_active, "faq_ids": faq_ids})
        updated_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Requested IDs that matched no row; a set keeps the check linear and
        # the loop variable no longer shadows the builtin ``id``.
        updated_lookup = set(updated_ids)
        failed_ids = [faq_id for faq_id in faq_ids if faq_id not in updated_lookup]
        # Invalidate FAQ cache
        faqs_cache.clear()
        return BatchUpdateResult(
            success_count=len(updated_ids),
            failed_ids=failed_ids,
            message=f"Updated {len(updated_ids)} FAQ items" if updated_ids else "No FAQ items were updated"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_update_faq_status: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_delete_faqs(
    faq_ids: List[int] = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Delete multiple FAQ items with one DELETE statement.

    Args:
        faq_ids: IDs of the FAQ items to delete; must not be empty.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows deleted, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``faq_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not faq_ids:
            raise HTTPException(status_code=400, detail="No FAQ IDs provided")
        # Prepare and execute the delete statement with RETURNING to get deleted IDs
        stmt = text("""
            DELETE FROM faq_item
            WHERE id = ANY(:faq_ids)
            RETURNING id
        """)
        result = db.execute(stmt, {"faq_ids": faq_ids})
        deleted_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Requested IDs that matched no row; a set keeps the check linear and
        # the loop variable no longer shadows the builtin ``id``.
        deleted_lookup = set(deleted_ids)
        failed_ids = [faq_id for faq_id in faq_ids if faq_id not in deleted_lookup]
        # Invalidate FAQ cache
        faqs_cache.clear()
        return BatchUpdateResult(
            success_count=len(deleted_ids),
            failed_ids=failed_ids,
            message=f"Deleted {len(deleted_ids)} FAQ items" if deleted_ids else "No FAQ items were deleted"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_delete_faqs: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# Request model for batch emergency contact creation
class BatchEmergencyCreate(BaseModel):
    """Request body for creating several emergency contacts in one call."""

    # Emergency contacts to create, each validated as an EmergencyCreate payload
    emergency_contacts: List[EmergencyCreate]
async def batch_create_emergency_contacts(
    batch: BatchEmergencyCreate,
    db: Session = Depends(get_db)
):
    """Insert every emergency contact from the request in one transaction.

    Args:
        batch: Request body whose ``emergency_contacts`` list holds the
            contacts to create.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A list of ``EmergencyResponse`` objects, one per created contact.

    Raises:
        HTTPException: 500 on a database error (the transaction is rolled
            back first).
    """
    try:
        new_rows = [EmergencyItem(**payload.model_dump()) for payload in batch.emergency_contacts]
        db.add_all(new_rows)
        # One commit covers the whole batch
        db.commit()
        # Reload each row so IDs and other generated fields are populated
        for row in new_rows:
            db.refresh(row)
        # Cached emergency data is now out of date
        emergencies_cache.clear()
        return [EmergencyResponse.model_validate(row, from_attributes=True) for row in new_rows]
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_create_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_update_emergency_status(
    emergency_ids: List[int] = Body(..., embed=True),
    is_active: bool = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Update the active status of multiple emergency contacts with one UPDATE.

    Args:
        emergency_ids: IDs of the emergency contacts to update; must not be empty.
        is_active: New value for the ``is_active`` column.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows updated, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``emergency_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")
        # Prepare the update statement
        stmt = text("""
            UPDATE emergency_item
            SET is_active = :is_active, updated_at = NOW()
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)
        # Execute the update in a single query
        result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids})
        updated_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Requested IDs that matched no row; a set keeps the check linear and
        # the loop variable no longer shadows the builtin ``id``.
        updated_lookup = set(updated_ids)
        failed_ids = [
            emergency_id for emergency_id in emergency_ids
            if emergency_id not in updated_lookup
        ]
        # Invalidate emergency cache
        emergencies_cache.clear()
        return BatchUpdateResult(
            success_count=len(updated_ids),
            failed_ids=failed_ids,
            message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_update_emergency_status: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_delete_emergency_contacts(
    emergency_ids: List[int] = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """Delete multiple emergency contacts with one DELETE statement.

    Args:
        emergency_ids: IDs of the emergency contacts to delete; must not be empty.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``BatchUpdateResult`` with the number of rows deleted, the requested
        IDs that matched no row, and a summary message.

    Raises:
        HTTPException: 400 when ``emergency_ids`` is empty, 500 on a database
            error (the transaction is rolled back first).
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")
        # Prepare and execute the delete statement with RETURNING to get deleted IDs
        stmt = text("""
            DELETE FROM emergency_item
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)
        result = db.execute(stmt, {"emergency_ids": emergency_ids})
        deleted_ids = [row[0] for row in result]
        # Commit the transaction
        db.commit()
        # Requested IDs that matched no row; a set keeps the check linear and
        # the loop variable no longer shadows the builtin ``id``.
        deleted_lookup = set(deleted_ids)
        failed_ids = [
            emergency_id for emergency_id in emergency_ids
            if emergency_id not in deleted_lookup
        ]
        # Invalidate emergency cache
        emergencies_cache.clear()
        return BatchUpdateResult(
            success_count=len(deleted_ids),
            failed_ids=failed_ids,
            message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_delete_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# --- About Pixity endpoints --- | |
async def get_about_pixity(
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Get the About Pixity information.

    If no record exists yet, a default one is inserted and returned.

    - **use_cache**: If true, use cached results when available

    Returns:
        An ``InfoContentResponse`` built from the first ``AboutPixity`` row.

    Raises:
        HTTPException: 500 on a database error (any pending insert is rolled
            back first).
    """
    try:
        # Try to get from cache if caching is enabled
        if use_cache:
            cached_result = about_pixity_cache.get("about_pixity")
            if cached_result:
                logger.info("Cache hit for about_pixity")
                return cached_result
        # Get the first record (or create if none exists)
        about = db.query(AboutPixity).first()
        if not about:
            # Create default content if none exists
            about = AboutPixity(
                content="""PiXity is your smart, AI-powered local companion designed to help foreigners navigate life in any city of Vietnam with ease, starting with Da Nang. From finding late-night eats to handling visas, housing, and healthcare, PiXity bridges the gap in language, culture, and local know-how — so you can explore the city like a true insider.
PiXity is proudly built by PiX.teq, the tech team behind PiX — a multidisciplinary collective based in Da Nang.
X: x.com/pixity_bot
Instagram: instagram.com/pixity.aibot/
Tiktok: tiktok.com/@pixity.aibot"""
            )
            db.add(about)
            db.commit()
            db.refresh(about)
        # Convert to Pydantic model
        response = InfoContentResponse(
            id=about.id,
            content=about.content,
            created_at=about.created_at,
            updated_at=about.updated_at
        )
        # Store in cache if caching is enabled
        if use_cache:
            about_pixity_cache["about_pixity"] = response
        return response
    except SQLAlchemyError as e:
        # This handler can run after a failed insert/commit of the default
        # row, so roll back like the other write paths in this module.
        db.rollback()
        logger.error(f"Database error in get_about_pixity: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def update_about_pixity(
    data: InfoContentUpdate,
    db: Session = Depends(get_db)
):
    """
    Replace the About Pixity text, creating the record when none exists.

    - **content**: New content text

    Returns:
        An ``InfoContentResponse`` describing the stored record.

    Raises:
        HTTPException: 500 on a database error (the transaction is rolled
            back first).
    """
    try:
        record = db.query(AboutPixity).first()
        if record:
            # Overwrite the existing row in place
            record.content = data.content
        else:
            # Nothing stored yet: insert a fresh row
            record = AboutPixity(content=data.content)
            db.add(record)
        db.commit()
        db.refresh(record)
        # Cached copy is stale after the write
        about_pixity_cache.clear()
        return InfoContentResponse(
            id=record.id,
            content=record.content,
            created_at=record.created_at,
            updated_at=record.updated_at
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in update_about_pixity: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
# --- Da Nang Bucket List Pydantic models --- | |
class DaNangBucketListBase(BaseModel):
    """Shared fields for Da Nang Bucket List models."""

    # Bucket list body; the default entry created by get_danang_bucket_list is a JSON string
    content: str
class DaNangBucketListResponse(DaNangBucketListBase):
    """Response model for a stored Da Nang Bucket List record."""

    id: int
    created_at: datetime
    updated_at: datetime
    # from_attributes lets the model be validated directly from an ORM object
    model_config = ConfigDict(from_attributes=True)
class DaNangBucketListCreate(DaNangBucketListBase):
    """Creation payload for the Da Nang Bucket List; adds no fields to the base."""
class DaNangBucketListUpdate(BaseModel):
    """Request body for replacing the Da Nang Bucket List content."""

    # New content that overwrites the stored value
    content: str
# --- Da Nang Bucket List Endpoints --- | |
async def get_danang_bucket_list(
    db: Session = Depends(get_db),
    use_cache: bool = True
):
    """
    Retrieve the Da Nang Bucket List information.
    If none exists, creates a default entry.

    Args:
        db: Database session provided by the ``get_db`` dependency.
        use_cache: If true, serve from and populate ``danang_bucket_list_cache``.

    Returns:
        A ``DaNangBucketListResponse`` for the first stored record.

    Raises:
        HTTPException: 500 on a database error (any pending insert is rolled
            back first).
    """
    cache_key = "danang_bucket_list"
    # Try to get from cache if caching is enabled
    if use_cache and cache_key in danang_bucket_list_cache:
        cached_result = danang_bucket_list_cache[cache_key]
        logger.info(f"Cache hit for {cache_key}")
        return cached_result
    try:
        # Try to get the first bucket list entry
        db_bucket_list = db.query(DaNangBucketList).first()
        # If no entry exists, create a default one
        if not db_bucket_list:
            default_content = json.dumps({
                "title": "Da Nang Bucket List",
                "description": "Must-visit places and experiences in Da Nang",
                "items": [
                    {"name": "Ba Na Hills", "description": "Visit the famous Golden Bridge"},
                    {"name": "Marble Mountains", "description": "Explore caves and temples"},
                    {"name": "My Khe Beach", "description": "Relax at one of the most beautiful beaches in Vietnam"},
                    {"name": "Dragon Bridge", "description": "Watch the fire-breathing show on weekends"},
                    {"name": "Son Tra Peninsula", "description": "See the Lady Buddha statue and lookout point"}
                ]
            })
            new_bucket_list = DaNangBucketList(content=default_content)
            db.add(new_bucket_list)
            db.commit()
            db.refresh(new_bucket_list)
            db_bucket_list = new_bucket_list
        # Convert to Pydantic model
        response = DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True)
        # Store in cache if caching is enabled
        if use_cache:
            danang_bucket_list_cache[cache_key] = response
        return response
    except SQLAlchemyError as e:
        # This handler can run after a failed insert/commit of the default
        # entry, so roll back before reporting the error.
        db.rollback()
        error_msg = f"Database error in get_danang_bucket_list: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
async def update_danang_bucket_list(
    bucket_list_data: DaNangBucketListUpdate,
    db: Session = Depends(get_db)
):
    """
    Update the Da Nang Bucket List information.
    If none exists, creates a new entry.

    Args:
        bucket_list_data: Request body carrying the new ``content``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``DaNangBucketListResponse`` for the stored record.

    Raises:
        HTTPException: 500 on a database error (the transaction is rolled
            back first).
    """
    try:
        # Try to get the first bucket list entry
        db_bucket_list = db.query(DaNangBucketList).first()
        # If no entry exists, create a new one
        if not db_bucket_list:
            db_bucket_list = DaNangBucketList(content=bucket_list_data.content)
            db.add(db_bucket_list)
        else:
            # Update existing entry
            db_bucket_list.content = bucket_list_data.content
            db_bucket_list.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(db_bucket_list)
        # Clear cache; pop() with a default avoids a separate membership check
        danang_bucket_list_cache.pop("danang_bucket_list", None)
        # Convert to Pydantic model
        return DaNangBucketListResponse.model_validate(db_bucket_list, from_attributes=True)
    except SQLAlchemyError as e:
        # Roll back the failed write, as the other write endpoints do
        db.rollback()
        error_msg = f"Database error in update_danang_bucket_list: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
# --- Solana Summit Pydantic models --- | |
class SolanaSummitBase(BaseModel):
    """Shared fields for Solana Summit models."""

    # Summit body; the default entry created by get_solana_summit is a JSON string
    content: str
class SolanaSummitResponse(SolanaSummitBase):
    """Response model for a stored Solana Summit record."""

    id: int
    created_at: datetime
    updated_at: datetime
    # from_attributes lets the model be validated directly from an ORM object
    model_config = ConfigDict(from_attributes=True)
class SolanaSummitCreate(SolanaSummitBase):
    """Creation payload for the Solana Summit record; adds no fields to the base."""
class SolanaSummitUpdate(BaseModel):
    """Request body for replacing the Solana Summit content."""

    # New content that overwrites the stored value
    content: str
# --- Solana Summit Endpoints --- | |
async def get_solana_summit(
    db: Session = Depends(get_db),
    use_cache: bool = True
):
    """
    Retrieve the Solana Summit information.
    If none exists, creates a default entry.

    Args:
        db: Database session provided by the ``get_db`` dependency.
        use_cache: If true, serve from and populate ``solana_summit_cache``.

    Returns:
        A ``SolanaSummitResponse`` for the first stored record.

    Raises:
        HTTPException: 500 on a database error (any pending insert is rolled
            back first).
    """
    cache_key = "solana_summit"
    # Try to get from cache if caching is enabled
    if use_cache and cache_key in solana_summit_cache:
        cached_result = solana_summit_cache[cache_key]
        logger.info(f"Cache hit for {cache_key}")
        return cached_result
    try:
        # Try to get the first solana summit entry
        db_solana_summit = db.query(SolanaSummit).first()
        # If no entry exists, create a default one
        if not db_solana_summit:
            default_content = json.dumps({
                "title": "Solana Summit Vietnam",
                "description": "Information about Solana Summit Vietnam event in Da Nang",
                "date": "2023-11-04T09:00:00+07:00",
                "location": "Hyatt Regency, Da Nang",
                "details": "The Solana Summit is a gathering of developers, entrepreneurs, and enthusiasts in the Solana ecosystem.",
                "agenda": [
                    {"time": "09:00", "activity": "Registration & Networking"},
                    {"time": "10:00", "activity": "Opening Keynote"},
                    {"time": "12:00", "activity": "Lunch Break"},
                    {"time": "13:30", "activity": "Developer Workshops"},
                    {"time": "17:00", "activity": "Closing Remarks & Networking"}
                ],
                "registration_url": "https://example.com/solana-summit-registration"
            })
            new_solana_summit = SolanaSummit(content=default_content)
            db.add(new_solana_summit)
            db.commit()
            db.refresh(new_solana_summit)
            db_solana_summit = new_solana_summit
        # Convert to Pydantic model
        response = SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True)
        # Store in cache if caching is enabled
        if use_cache:
            solana_summit_cache[cache_key] = response
        return response
    except SQLAlchemyError as e:
        # This handler can run after a failed insert/commit of the default
        # entry, so roll back before reporting the error.
        db.rollback()
        error_msg = f"Database error in get_solana_summit: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
async def update_solana_summit(
    summit_data: SolanaSummitUpdate,
    db: Session = Depends(get_db)
):
    """
    Update the Solana Summit information.
    If none exists, creates a new entry.

    Args:
        summit_data: Request body carrying the new ``content``.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A ``SolanaSummitResponse`` for the stored record.

    Raises:
        HTTPException: 500 on a database error (the transaction is rolled
            back first).
    """
    try:
        # Try to get the first solana summit entry
        db_solana_summit = db.query(SolanaSummit).first()
        # If no entry exists, create a new one
        if not db_solana_summit:
            db_solana_summit = SolanaSummit(content=summit_data.content)
            db.add(db_solana_summit)
        else:
            # Update existing entry
            db_solana_summit.content = summit_data.content
            db_solana_summit.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(db_solana_summit)
        # Clear cache; pop() with a default avoids a separate membership check
        solana_summit_cache.pop("solana_summit", None)
        # Convert to Pydantic model
        return SolanaSummitResponse.model_validate(db_solana_summit, from_attributes=True)
    except SQLAlchemyError as e:
        # Roll back the failed write, as the other write endpoints do
        db.rollback()
        error_msg = f"Database error in update_solana_summit: {str(e)}"
        logger.error(error_msg)
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=error_msg)
# --- API Key models and endpoints --- | |
class ApiKeyBase(BaseModel):
    """Shared fields for API key models."""

    # Category label for the key; returned as "key_type" by validate_api_key
    key_type: str
    # The key string itself; validate_api_key matches incoming keys against it
    key_value: str
    description: Optional[str] = None
    # Only keys with is_active true pass validate_api_key
    is_active: bool = True
class ApiKeyCreate(ApiKeyBase):
    """Creation payload for an API key; adds no fields to the base."""
class ApiKeyUpdate(BaseModel):
    """Partial-update payload for an API key.

    Every field is optional; update_api_key only applies fields whose value
    is not None.
    """

    key_type: Optional[str] = None
    key_value: Optional[str] = None
    description: Optional[str] = None
    is_active: Optional[bool] = None
class ApiKeyResponse(ApiKeyBase):
    """Response model for a stored API key."""

    id: int
    created_at: datetime
    # Set by validate_api_key on each successful validation; None until then
    last_used: Optional[datetime] = None
    # from_attributes lets the model be validated directly from an ORM object
    model_config = ConfigDict(from_attributes=True)
async def get_api_keys(
    skip: int = 0,
    limit: int = 100,
    active_only: bool = False,
    db: Session = Depends(get_db)
):
    """
    List stored API keys with offset/limit paging.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **active_only**: If true, only return active keys

    Returns:
        A list of ``ApiKeyResponse`` objects.

    Raises:
        HTTPException: 500 on a database error or any other failure.
    """
    try:
        key_query = db.query(ApiKey)
        if active_only:
            # SQLAlchemy needs the explicit comparison to build the SQL filter
            key_query = key_query.filter(ApiKey.is_active == True)
        page = key_query.offset(skip).limit(limit).all()
        return [ApiKeyResponse.model_validate(row, from_attributes=True) for row in page]
    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving API keys: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Error retrieving API keys: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving API keys: {str(e)}")
async def create_api_key(
    api_key: ApiKeyCreate,
    db: Session = Depends(get_db)
):
    """
    Store a new API key and return the saved record.

    Returns:
        An ``ApiKeyResponse`` for the newly created key.

    Raises:
        HTTPException: 500 on a database error or any other failure (the
            transaction is rolled back first).
    """
    try:
        # ApiKeyCreate carries exactly key_type, key_value, description, is_active
        new_key = ApiKey(**api_key.model_dump())
        db.add(new_key)
        db.commit()
        db.refresh(new_key)
        return ApiKeyResponse.model_validate(new_key, from_attributes=True)
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error creating API key: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error creating API key: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error creating API key: {str(e)}")
async def get_api_key(
    api_key_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single API key by its ID.

    Returns:
        An ``ApiKeyResponse`` for the matching key.

    Raises:
        HTTPException: 404 when no key has the given ID, 500 on any other
            failure.
    """
    try:
        record = db.query(ApiKey).filter(ApiKey.id == api_key_id).first()
        if record:
            return ApiKeyResponse.model_validate(record, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found")
    except HTTPException:
        # Let the 404 pass through instead of being wrapped as a 500
        raise
    except Exception as e:
        logger.error(f"Error retrieving API key: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving API key: {str(e)}")
async def update_api_key(
    api_key_id: int = Path(..., gt=0),
    api_key_update: ApiKeyUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Apply a partial update to an API key.

    Only fields whose value in the request is not None are written.

    Returns:
        An ``ApiKeyResponse`` for the updated key.

    Raises:
        HTTPException: 404 when no key has the given ID, 500 on a database
            error or any other failure (the transaction is rolled back first).
    """
    try:
        record = db.query(ApiKey).filter(ApiKey.id == api_key_id).first()
        if not record:
            raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found")
        # Copy across each provided field, in the same order as the model declares them
        for field_name in ("key_type", "key_value", "description", "is_active"):
            new_value = getattr(api_key_update, field_name)
            if new_value is not None:
                setattr(record, field_name, new_value)
        db.commit()
        db.refresh(record)
        return ApiKeyResponse.model_validate(record, from_attributes=True)
    except HTTPException:
        # Let the 404 pass through instead of being wrapped as a 500
        raise
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error updating API key: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error updating API key: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating API key: {str(e)}")
async def delete_api_key(
    api_key_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove an API key by its ID.

    Returns:
        A dict with a ``message`` key confirming the deletion.

    Raises:
        HTTPException: 404 when no key has the given ID, 500 on a database
            error or any other failure (the transaction is rolled back first).
    """
    try:
        record = db.query(ApiKey).filter(ApiKey.id == api_key_id).first()
        if not record:
            raise HTTPException(status_code=404, detail=f"API key with ID {api_key_id} not found")
        db.delete(record)
        db.commit()
        confirmation = {"message": f"API key with ID {api_key_id} deleted successfully"}
        return confirmation
    except HTTPException:
        # Let the 404 pass through instead of being wrapped as a 500
        raise
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error deleting API key: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error deleting API key: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting API key: {str(e)}")
async def validate_api_key(
    key: str,
    db: Session = Depends(get_db)
):
    """
    Validate an API key and update its last_used timestamp.

    Args:
        key: The key value to look up among active API keys.
        db: Database session provided by the ``get_db`` dependency.

    Returns:
        A dict whose ``valid`` flag tells whether an active key matched. On
        success it also carries ``key_type``, ``id`` and ``message``. On
        failure or error it carries only ``message``; errors are reported in
        the payload rather than raised.
    """
    try:
        db_api_key = db.query(ApiKey).filter(ApiKey.key_value == key, ApiKey.is_active == True).first()
        if not db_api_key:
            return {"valid": False, "message": "Invalid or inactive API key"}
        # Update last used timestamp
        db_api_key.last_used = datetime.now()
        db.commit()
        return {
            "valid": True,
            "key_type": db_api_key.key_type,
            "id": db_api_key.id,
            "message": "API key is valid"
        }
    except Exception as e:
        # The last_used write may have failed mid-transaction; roll back as
        # the other write endpoints do.
        db.rollback()
        logger.error(f"Error validating API key: {e}")
        logger.error(traceback.format_exc())
        return {"valid": False, "message": f"Error validating API key: {str(e)}"}
# --- Vector Database models and endpoints --- | |
class VectorDatabaseBase(BaseModel):
    """Shared fields for vector database models."""

    name: str
    description: Optional[str] = None
    # Name of the Pinecone index associated with this vector database
    pinecone_index: str
    api_key_id: Optional[int] = None  # Make api_key_id optional to handle NULL values
    status: str = "active"
class VectorDatabaseCreate(VectorDatabaseBase):
    """Creation payload for a vector database; the API key reference is mandatory."""

    # Overrides the optional base field: new databases must name an API key
    api_key_id: int
class VectorDatabaseUpdate(BaseModel):
    """Partial-update payload for a vector database; every field is optional."""

    name: Optional[str] = None
    description: Optional[str] = None
    pinecone_index: Optional[str] = None
    api_key_id: Optional[int] = None
    status: Optional[str] = None
class VectorDatabaseResponse(BaseModel):
    """Response model for a stored vector database record."""

    name: str
    description: Optional[str] = None
    pinecone_index: str
    api_key_id: Optional[int] = None  # Make api_key_id optional to handle NULL values
    status: str
    id: int
    created_at: datetime
    updated_at: datetime
    message: Optional[str] = None  # Add message field for notifications
    # from_attributes lets the model be validated directly from an ORM object
    model_config = ConfigDict(from_attributes=True)
class VectorDatabaseDetailResponse(BaseModel):
    """Detailed response model for a vector database, including document counters."""

    id: int
    name: str
    description: Optional[str] = None
    pinecone_index: str
    status: str
    created_at: datetime
    updated_at: datetime
    # NOTE(review): presumably total / embedded / not-yet-embedded document
    # counts for this database — confirm against the endpoint that fills them.
    document_count: int
    embedded_count: int
    pending_count: int
    message: Optional[str] = None  # Add message field for notifications
    # from_attributes lets the model be validated directly from an ORM object
    model_config = ConfigDict(from_attributes=True)
async def get_vector_databases(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    List vector databases with offset/limit paging and an optional status filter.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **status**: Filter by status (e.g., 'active', 'inactive')

    Returns:
        A list of ``VectorDatabaseResponse`` objects.

    Raises:
        HTTPException: 500 on a database error or any other failure.
    """
    try:
        listing = db.query(VectorDatabase)
        if status:
            # A missing or empty status means "no filter"
            listing = listing.filter(VectorDatabase.status == status)
        page = listing.offset(skip).limit(limit).all()
        return [VectorDatabaseResponse.model_validate(record, from_attributes=True) for record in page]
    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving vector databases: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Error retrieving vector databases: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving vector databases: {str(e)}")
async def create_vector_database(
    vector_db: VectorDatabaseCreate,
    db: Session = Depends(get_db)
):
    """
    Create a new vector database. If the specified Pinecone index doesn't exist, it will be created automatically.

    - **vector_db**: Definition of the new database; its `name`, `api_key_id` and `pinecone_index` are checked here

    The response carries a `message` saying whether the Pinecone index was created or already existed.
    Responds with 400 when the name is taken, the API key is unknown, or the Pinecone index cannot be
    created or reached, and with 500 on database or unexpected errors.
    """
    try:
        # Check if a database with the same name already exists
        existing_db = db.query(VectorDatabase).filter(VectorDatabase.name == vector_db.name).first()
        if existing_db:
            raise HTTPException(status_code=400, detail=f"Vector database with name '{vector_db.name}' already exists")

        # Check if the API key exists
        api_key = db.query(ApiKey).filter(ApiKey.id == vector_db.api_key_id).first()
        if not api_key:
            raise HTTPException(status_code=400, detail=f"API key with ID {vector_db.api_key_id} not found")

        # Initialize Pinecone client with the API key
        from pinecone import Pinecone, ServerlessSpec
        pc_client = Pinecone(api_key=api_key.key_value)

        # Check if the index exists
        index_list = pc_client.list_indexes()
        index_names = index_list.names() if hasattr(index_list, 'names') else []
        index_exists = vector_db.pinecone_index in index_names
        index_created = False

        if not index_exists:
            # Index doesn't exist - try to create it
            try:
                logger.info(f"Pinecone index '{vector_db.pinecone_index}' does not exist. Attempting to create it automatically.")
                # Create the index with standard parameters
                pc_client.create_index(
                    name=vector_db.pinecone_index,
                    dimension=1536,  # Standard OpenAI embedding dimension
                    metric="cosine",  # Most common similarity metric
                    spec=ServerlessSpec(
                        cloud="aws",
                        region="us-east-1"  # Use a standard region that works with the free tier
                    )
                )
                logger.info(f"Successfully created Pinecone index '{vector_db.pinecone_index}'")
                index_created = True

                # Allow some time for the index to initialize. This is a coroutine, so the
                # wait must be awaited: a blocking time.sleep() here would stall the event
                # loop and every other request it is serving for the full five seconds.
                import asyncio
                await asyncio.sleep(5)
            except Exception as create_error:
                logger.error(f"Failed to create Pinecone index '{vector_db.pinecone_index}': {create_error}")
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to create Pinecone index '{vector_db.pinecone_index}': {str(create_error)}"
                )

        # Verify we can connect to the index (whether existing or newly created)
        try:
            index = pc_client.Index(vector_db.pinecone_index)
            # Try to get stats to verify connection
            stats = index.describe_index_stats()

            # Create success message based on whether we created the index or used an existing one
            if index_created:
                success_message = f"Successfully created and connected to new Pinecone index '{vector_db.pinecone_index}'"
            else:
                success_message = f"Successfully connected to existing Pinecone index '{vector_db.pinecone_index}'"
            logger.info(f"{success_message}: {stats}")
        except Exception as e:
            error_message = f"Error connecting to Pinecone index '{vector_db.pinecone_index}': {str(e)}"
            logger.error(error_message)
            raise HTTPException(status_code=400, detail=error_message)

        # Create new vector database
        db_vector_db = VectorDatabase(**vector_db.model_dump())
        db.add(db_vector_db)
        db.commit()
        db.refresh(db_vector_db)

        # Return response with additional info about index creation
        response_data = VectorDatabaseResponse.model_validate(db_vector_db, from_attributes=True).model_dump()

        # Add a message to the response indicating whether the index was created or existed
        if index_created:
            response_data["message"] = f"Created new Pinecone index '{vector_db.pinecone_index}' automatically"
        else:
            response_data["message"] = f"Using existing Pinecone index '{vector_db.pinecone_index}'"

        return VectorDatabaseResponse.model_validate(response_data)
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error creating vector database: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error creating vector database: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error creating vector database: {str(e)}")
async def get_vector_database(
    vector_db_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single vector database record.

    - **vector_db_id**: Positive ID of the vector database to look up

    Responds with 404 when no record has that ID and with 500 on any unexpected failure.
    """
    try:
        record = (
            db.query(VectorDatabase)
            .filter(VectorDatabase.id == vector_db_id)
            .first()
        )
        if record:
            return VectorDatabaseResponse.model_validate(record, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found")
    except HTTPException:
        # The 404 above is intentional; do not convert it into a 500.
        raise
    except Exception as exc:
        logger.error(f"Error retrieving vector database: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving vector database: {exc}")
async def update_vector_database(
    vector_db_id: int = Path(..., gt=0),
    vector_db_update: VectorDatabaseUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Apply a partial update to a vector database.

    - **vector_db_id**: Positive ID of the vector database to change
    - **vector_db_update**: Fields to change; fields left unset or sent as null are not touched

    Responds with 404 when the database does not exist, 400 when the new name is already used or the
    referenced API key is missing, and 500 on database or unexpected errors.
    """
    try:
        target = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found")

        # A rename must not collide with another record's name.
        new_name = vector_db_update.name
        if new_name and new_name != target.name:
            name_owner = db.query(VectorDatabase).filter(VectorDatabase.name == new_name).first()
            if name_owner:
                raise HTTPException(status_code=400, detail=f"Vector database with name '{new_name}' already exists")

        # A new API key reference must point at an existing key.
        new_key_id = vector_db_update.api_key_id
        if new_key_id:
            key_row = db.query(ApiKey).filter(ApiKey.id == new_key_id).first()
            if not key_row:
                raise HTTPException(status_code=400, detail=f"API key with ID {new_key_id} not found")

        # Copy over only the fields the client actually supplied with a non-null value.
        changes = vector_db_update.model_dump(exclude_unset=True)
        for field_name in changes:
            new_value = changes[field_name]
            if new_value is None:
                continue
            setattr(target, field_name, new_value)

        db.commit()
        db.refresh(target)
        return VectorDatabaseResponse.model_validate(target, from_attributes=True)
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error updating vector database: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error updating vector database: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating vector database: {exc}")
async def delete_vector_database(
    vector_db_id: int = Path(..., gt=0),
    force: bool = Query(False, description="Force deletion even if documents exist"),
    db: Session = Depends(get_db)
):
    """
    Remove a vector database.

    - **vector_db_id**: Positive ID of the vector database to delete
    - **force**: If true, will delete all associated documents first

    Responds with 404 when the database does not exist, 400 when it still has documents and
    `force` is false, and 500 on database or unexpected errors.
    """
    try:
        target = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_db_id} not found")

        # How many documents still belong to this database?
        doc_count = (
            db.query(func.count(Document.id))
            .filter(Document.vector_database_id == vector_db_id)
            .scalar()
        )
        has_documents = doc_count > 0

        if has_documents and not force:
            raise HTTPException(
                status_code=400,
                detail=f"Cannot delete vector database with {doc_count} documents. Use force=true to delete anyway."
            )

        if has_documents:
            # Reaching this point with documents present means force was requested.
            # NOTE(review): statuses and engine associations are purged only on this
            # forced path, matching the original flow - confirm that is intended.
            db.query(Document).filter(Document.vector_database_id == vector_db_id).delete()
            db.query(VectorStatus).filter(VectorStatus.vector_database_id == vector_db_id).delete()
            db.query(EngineVectorDb).filter(EngineVectorDb.vector_database_id == vector_db_id).delete()

        db.delete(target)
        db.commit()
        return {"message": f"Vector database with ID {vector_db_id} deleted successfully"}
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error deleting vector database: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting vector database: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting vector database: {exc}")
async def get_vector_database_info(
    vector_db_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Report a vector database together with its document counts and a Pinecone connectivity note.

    - **vector_db_id**: Positive ID of the vector database to inspect

    The `message` field of the response describes the outcome of the Pinecone check; a failed
    check is reported there and does not fail the request. Responds with 404 when the database
    does not exist and with 500 on unexpected errors.
    """
    try:
        record = db.query(VectorDatabase).filter(VectorDatabase.id == vector_db_id).first()
        if not record:
            raise HTTPException(status_code=404, detail="Vector database not found")

        def count_documents(*extra_criteria):
            # Count this database's documents, optionally narrowed by further criteria.
            return (
                db.query(func.count(Document.id))
                .filter(Document.vector_database_id == vector_db_id, *extra_criteria)
                .scalar()
            )

        total_docs = count_documents()
        embedded_docs = count_documents(Document.is_embedded == True)
        pending_docs = count_documents(Document.is_embedded == False)

        # Probe Pinecone; every branch below leaves a human-readable note in `message`.
        message = None
        if not record.api_key_id:
            message = "No API key associated with this vector database"
            logger.warning(message)
        else:
            try:
                key_row = db.query(ApiKey).filter(ApiKey.id == record.api_key_id).first()
                if not key_row:
                    message = f"API key with ID {record.api_key_id} not found"
                    logger.warning(message)
                else:
                    from pinecone import Pinecone
                    client = Pinecone(api_key=key_row.key_value)

                    listing = client.list_indexes()
                    known_indexes = listing.names() if hasattr(listing, 'names') else []

                    if record.pinecone_index not in known_indexes:
                        message = f"Pinecone index '{record.pinecone_index}' does not exist. Available indexes: {', '.join(known_indexes)}"
                        logger.warning(message)
                    else:
                        # Fetching stats doubles as the connectivity test.
                        stats = client.Index(record.pinecone_index).describe_index_stats()
                        vector_total = stats.get('total_vector_count', 0)
                        message = f"Pinecone index '{record.pinecone_index}' is operational with {vector_total} vectors"
                        logger.info(f"Successfully connected to Pinecone index '{record.pinecone_index}': {stats}")
            except Exception as exc:
                # Connectivity problems are reported, not raised.
                message = f"Error connecting to Pinecone: {exc}"
                logger.error(message)

        return VectorDatabaseDetailResponse(
            id=record.id,
            name=record.name,
            description=record.description,
            pinecone_index=record.pinecone_index,
            status=record.status,
            created_at=record.created_at,
            updated_at=record.updated_at,
            document_count=total_docs or 0,
            embedded_count=embedded_docs or 0,
            pending_count=pending_docs or 0,
            message=message
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.error(f"Error getting vector database info: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error getting vector database info: {exc}")
# --- Document models and endpoints --- | |
class DocumentBase(BaseModel):
    """Common document fields: a name and the ID of the vector database it belongs to."""
    # Document name (required).
    name: str
    # ID of the owning vector database (required).
    vector_database_id: int
class DocumentCreate(BaseModel):
    """Payload for creating a document; declares the same two fields as DocumentBase."""
    # Document name (required).
    name: str
    # ID of the vector database the document is attached to (required).
    vector_database_id: int
class DocumentUpdate(BaseModel):
    """Payload for updating a document; only the name can be supplied."""
    # New document name; defaults to None when omitted.
    name: Optional[str] = None
class DocumentResponse(BaseModel):
    """Document metadata returned by the document endpoints (no file content)."""
    id: int
    name: str
    file_type: str
    # Optional content type; defaults to None.
    content_type: Optional[str] = None
    # presumably the file size in bytes - TODO confirm against the Document model
    size: int
    created_at: datetime
    updated_at: datetime
    vector_database_id: int
    # Name of the owning vector database; the endpoints in this file fill it in after lookup.
    vector_database_name: Optional[str] = None
    is_embedded: bool
    # Allow building this model from attribute-bearing objects such as ORM rows.
    model_config = ConfigDict(from_attributes=True)
async def get_documents(
    skip: int = 0,
    limit: int = 100,
    vector_database_id: Optional[int] = None,
    is_embedded: Optional[bool] = None,
    file_type: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    Get all documents with optional filtering.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **vector_database_id**: Filter by vector database ID
    - **is_embedded**: Filter by embedding status
    - **file_type**: Filter by file type

    Each returned item carries `vector_database_name`: the database's name, `db_<id>` when the
    referenced database row is missing, or "No Database" when the document has no database ID
    (in which case `vector_database_id` is reported as 0). Responds with 500 on errors.
    """
    try:
        query = db.query(Document)

        # Apply filters if provided
        if vector_database_id is not None:
            query = query.filter(Document.vector_database_id == vector_database_id)
        if is_embedded is not None:
            query = query.filter(Document.is_embedded == is_embedded)
        if file_type is not None:
            query = query.filter(Document.file_type == file_type)

        # Execute query with pagination
        documents = query.offset(skip).limit(limit).all()

        # Resolve every referenced database name with ONE query instead of issuing a
        # separate lookup per document (the page can hold up to `limit` documents).
        referenced_ids = {
            doc.vector_database_id
            for doc in documents
            if doc.vector_database_id is not None
        }
        db_names = {}
        if referenced_ids:
            name_rows = (
                db.query(VectorDatabase.id, VectorDatabase.name)
                .filter(VectorDatabase.id.in_(list(referenced_ids)))
                .all()
            )
            db_names = {row_id: row_name for row_id, row_name in name_rows}

        result = []
        for doc in documents:
            if doc.vector_database_id is None:
                vector_db_name = "No Database"
            else:
                # Fall back to a synthetic label when the referenced row no longer exists.
                vector_db_name = db_names.get(doc.vector_database_id, f"db_{doc.vector_database_id}")

            result.append(DocumentResponse(
                id=doc.id,
                name=doc.name,
                file_type=doc.file_type,
                content_type=doc.content_type,
                size=doc.size,
                created_at=doc.created_at,
                updated_at=doc.updated_at,
                vector_database_id=doc.vector_database_id or 0,  # Handle NULL values
                vector_database_name=vector_db_name,
                is_embedded=doc.is_embedded,
            ))
        return result
    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving documents: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Error retrieving documents: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving documents: {str(e)}")
async def get_document(
    document_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Get document by ID.

    - **document_id**: Positive ID of the document to fetch

    The response includes `vector_database_name`, resolved the same way as in the document
    listing: the database's name, `db_<id>` when the referenced database row is missing, or
    "No Database" (with `vector_database_id` reported as 0) when the document has no database.
    Responds with 404 when the document does not exist and with 500 on unexpected errors.
    """
    try:
        document = db.query(Document).filter(Document.id == document_id).first()
        if not document:
            raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found")

        # A document without a database ID used to fail response validation (the field
        # is a required int) and surface as a 500; handle it like the list endpoint does.
        if document.vector_database_id is None:
            vector_db_name = "No Database"
        else:
            vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first()
            vector_db_name = vector_db.name if vector_db else f"db_{document.vector_database_id}"

        return DocumentResponse(
            id=document.id,
            name=document.name,
            file_type=document.file_type,
            content_type=document.content_type,
            size=document.size,
            created_at=document.created_at,
            updated_at=document.updated_at,
            vector_database_id=document.vector_database_id or 0,  # Handle NULL values
            vector_database_name=vector_db_name,
            is_embedded=document.is_embedded,
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving document: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving document: {str(e)}")
async def get_document_content(
    document_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Get document content (file) by document ID.
    Returns the binary content with the appropriate Content-Type header.

    - **document_id**: Positive ID of the document whose file is requested

    The file is sent as an attachment. The Content-Disposition header carries a sanitized ASCII
    `filename` plus a percent-encoded UTF-8 `filename*`, so names with non-ASCII characters or
    quotes are transmitted safely. Responds with 404 when the document or its content is missing
    and with 500 on unexpected errors.
    """
    # Local import keeps this block self-contained; urllib is part of the standard library.
    from urllib.parse import quote

    try:
        # Get document to check if it exists and get metadata
        document = db.query(Document).filter(Document.id == document_id).first()
        if not document:
            raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found")

        # Get document content from document_content table
        document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first()
        if not document_content or not document_content.file_content:
            raise HTTPException(status_code=404, detail=f"Content for document with ID {document_id} not found")

        # Determine content type, falling back to a generic binary type
        content_type = getattr(document, 'content_type', None) or "application/octet-stream"

        # Build a header value that is safe to emit. Interpolating the raw name would let
        # quotes or control characters corrupt the header, and characters outside latin-1
        # make header encoding fail. Keep printable ASCII (minus quote and backslash) for
        # the plain `filename`, and carry the exact name in the `filename*` parameter.
        raw_name = document.name or "document"
        ascii_name = "".join(
            ch for ch in raw_name if " " <= ch <= "~" and ch not in '"\\'
        ) or "document"
        disposition = (
            f"attachment; filename=\"{ascii_name}\"; "
            f"filename*=UTF-8''{quote(raw_name, safe='')}"
        )

        # Return binary content with correct content type
        return Response(
            content=document_content.file_content,
            media_type=content_type,
            headers={"Content-Disposition": disposition}
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving document content: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving document content: {str(e)}")
# --- Telegram Bot models and endpoints --- | |
class TelegramBotBase(BaseModel):
    """Fields shared by the Telegram bot create and response models."""
    name: str
    # update_telegram_bot rejects a username that another bot already uses.
    username: str
    # presumably the Telegram Bot API token - TODO confirm
    token: str
    # Defaults to "inactive" when not supplied.
    status: str = "inactive"
class TelegramBotCreate(TelegramBotBase):
    """Payload for creating a Telegram bot; adds nothing to TelegramBotBase."""
    pass
class TelegramBotUpdate(BaseModel):
    """Partial-update payload for a Telegram bot; every field is optional and defaults to None."""
    name: Optional[str] = None
    username: Optional[str] = None
    token: Optional[str] = None
    status: Optional[str] = None
class TelegramBotResponse(TelegramBotBase):
    """Telegram bot as returned by the API: the base fields plus ID and timestamps."""
    # NOTE(review): `token` is inherited from TelegramBotBase, so serialized responses
    # include the bot token - confirm this exposure is intended.
    id: int
    created_at: datetime
    updated_at: datetime
    # Allow building this model from attribute-bearing objects such as ORM rows.
    model_config = ConfigDict(from_attributes=True)
async def get_telegram_bot(
    bot_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single Telegram bot.

    - **bot_id**: Positive ID of the bot to look up

    Responds with 404 when no bot has that ID and with 500 on any unexpected failure.
    """
    try:
        found = (
            db.query(TelegramBot)
            .filter(TelegramBot.id == bot_id)
            .first()
        )
        if found:
            return TelegramBotResponse.model_validate(found, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found")
    except HTTPException:
        # The 404 above is intentional; let it through unchanged.
        raise
    except Exception as exc:
        logger.error(f"Error retrieving Telegram bot: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving Telegram bot: {exc}")
async def update_telegram_bot(
    bot_id: int = Path(..., gt=0),
    bot_update: TelegramBotUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Apply a partial update to a Telegram bot.

    - **bot_id**: Positive ID of the bot to change
    - **bot_update**: Fields to change; fields left unset or sent as null are not touched

    Responds with 404 when the bot does not exist, 400 when the new username belongs to another
    bot, and 500 on database or unexpected errors.
    """
    try:
        target = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found")

        # Only a username that actually changes needs a uniqueness check.
        wanted_username = bot_update.username
        if wanted_username and wanted_username != target.username:
            username_owner = (
                db.query(TelegramBot)
                .filter(TelegramBot.username == wanted_username)
                .first()
            )
            if username_owner:
                raise HTTPException(
                    status_code=400,
                    detail=f"Telegram bot with username '{wanted_username}' already exists"
                )

        # Copy over only the fields the client supplied with a non-null value.
        changes = bot_update.model_dump(exclude_unset=True)
        for field_name in changes:
            new_value = changes[field_name]
            if new_value is None:
                continue
            setattr(target, field_name, new_value)

        db.commit()
        db.refresh(target)
        return TelegramBotResponse.model_validate(target, from_attributes=True)
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error updating Telegram bot: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error updating Telegram bot: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating Telegram bot: {exc}")
async def delete_telegram_bot(
    bot_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove a Telegram bot.

    - **bot_id**: Positive ID of the bot to delete

    Responds with 404 when the bot does not exist, 400 while the bot is still linked to any chat
    engine, and 500 on database or unexpected errors.
    """
    try:
        target = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found")

        # Refuse to delete while bot-engine links still reference this bot.
        linked_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all()
        if linked_engines:
            raise HTTPException(
                status_code=400,
                detail="Cannot delete bot as it is associated with chat engines. Remove associations first."
            )

        db.delete(target)
        db.commit()
        return {"message": f"Telegram bot with ID {bot_id} deleted successfully"}
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error deleting Telegram bot: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting Telegram bot: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting Telegram bot: {exc}")
async def get_bot_engines_info(
    bot_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Get all chat engines associated with a Telegram bot.

    - **bot_id**: Positive ID of the bot whose engines are listed

    Returns one dict per association whose engine still exists, with the keys `association_id`,
    `engine_id`, `engine_name`, `answer_model`, `status` and `created_at` (the association's
    creation time). Responds with 404 when the bot does not exist and with 500 on unexpected errors.
    """
    try:
        # Verify bot exists
        bot = db.query(TelegramBot).filter(TelegramBot.id == bot_id).first()
        if not bot:
            raise HTTPException(status_code=404, detail=f"Telegram bot with ID {bot_id} not found")

        # Get associated engines through BotEngine
        bot_engines = db.query(BotEngine).filter(BotEngine.bot_id == bot_id).all()

        # Load every referenced engine with ONE query instead of one query per association.
        engine_ids = {
            association.engine_id
            for association in bot_engines
            if association.engine_id is not None
        }
        engines_by_id = {}
        if engine_ids:
            engine_rows = db.query(ChatEngine).filter(ChatEngine.id.in_(list(engine_ids))).all()
            engines_by_id = {row.id: row for row in engine_rows}

        result = []
        for association in bot_engines:
            engine = engines_by_id.get(association.engine_id)
            if engine:
                # Associations pointing at a missing engine are skipped, as before.
                result.append({
                    "association_id": association.id,
                    "engine_id": engine.id,
                    "engine_name": engine.name,
                    "answer_model": engine.answer_model,
                    "status": engine.status,
                    "created_at": association.created_at
                })
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving bot engines: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving bot engines: {str(e)}")
# --- Chat Engine models and endpoints --- | |
class ChatEngineBase(BaseModel):
    """Fields shared by the chat engine create and response models."""
    name: str
    # presumably the identifier of the model that generates answers - TODO confirm
    answer_model: str
    system_prompt: Optional[str] = None
    empty_response: Optional[str] = None
    # Defaults to 3.
    similarity_top_k: int = 3
    # Defaults to 0.75.
    vector_distance_threshold: float = 0.75
    # Defaults to 0.2.
    grounding_threshold: float = 0.2
    # Defaults to False.
    use_public_information: bool = False
    # Defaults to "active".
    status: str = "active"
class ChatEngineCreate(ChatEngineBase):
    """Payload for creating a chat engine; adds nothing to ChatEngineBase."""
    pass
class ChatEngineUpdate(BaseModel):
    """Partial-update payload for a chat engine; every field is optional and defaults to None."""
    name: Optional[str] = None
    answer_model: Optional[str] = None
    system_prompt: Optional[str] = None
    empty_response: Optional[str] = None
    similarity_top_k: Optional[int] = None
    vector_distance_threshold: Optional[float] = None
    grounding_threshold: Optional[float] = None
    use_public_information: Optional[bool] = None
    status: Optional[str] = None
class ChatEngineResponse(ChatEngineBase):
    """Chat engine as returned by the API: the base fields plus ID and timestamps."""
    id: int
    created_at: datetime
    # update_chat_engine refreshes this timestamp on every update.
    last_modified: datetime
    # Allow building this model from attribute-bearing objects such as ORM rows.
    model_config = ConfigDict(from_attributes=True)
async def get_chat_engines(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """
    List chat engines, optionally restricted to one status.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **status**: Filter by status (e.g., 'active', 'inactive')

    Responds with 500 on database or unexpected errors.
    """
    try:
        engines_query = db.query(ChatEngine)
        if status:
            # An empty or missing status means "no filter".
            engines_query = engines_query.filter(ChatEngine.status == status)

        page = engines_query.offset(skip).limit(limit).all()
        return [
            ChatEngineResponse.model_validate(row, from_attributes=True)
            for row in page
        ]
    except SQLAlchemyError as exc:
        logger.error(f"Database error retrieving chat engines: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        logger.error(f"Error retrieving chat engines: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving chat engines: {exc}")
async def create_chat_engine(
    engine: ChatEngineCreate,
    db: Session = Depends(get_db)
):
    """
    Store a new chat engine built from the request payload.

    - **engine**: Field values for the new chat engine

    Returns the stored engine. Responds with 500 on database or unexpected errors.
    """
    try:
        field_values = engine.model_dump()
        new_engine = ChatEngine(**field_values)

        db.add(new_engine)
        db.commit()
        # Reload so database-generated values are present on the returned object.
        db.refresh(new_engine)

        return ChatEngineResponse.model_validate(new_engine, from_attributes=True)
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error creating chat engine: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error creating chat engine: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error creating chat engine: {exc}")
async def get_chat_engine(
    engine_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single chat engine.

    - **engine_id**: Positive ID of the chat engine to look up

    Responds with 404 when no engine has that ID and with 500 on any unexpected failure.
    """
    try:
        found = (
            db.query(ChatEngine)
            .filter(ChatEngine.id == engine_id)
            .first()
        )
        if found:
            return ChatEngineResponse.model_validate(found, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found")
    except HTTPException:
        # The 404 above is intentional; let it through unchanged.
        raise
    except Exception as exc:
        logger.error(f"Error retrieving chat engine: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving chat engine: {exc}")
async def update_chat_engine(
    engine_id: int = Path(..., gt=0),
    engine_update: ChatEngineUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Apply a partial update to a chat engine and stamp its modification time.

    - **engine_id**: Positive ID of the chat engine to change
    - **engine_update**: Fields to change; fields left unset or sent as null are not touched

    Responds with 404 when the engine does not exist and with 500 on database or unexpected errors.
    """
    try:
        target = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found")

        # Copy over only the fields the client supplied with a non-null value.
        changes = engine_update.model_dump(exclude_unset=True)
        for field_name in changes:
            new_value = changes[field_name]
            if new_value is None:
                continue
            setattr(target, field_name, new_value)

        # The timestamp is refreshed on every update call (naive UTC).
        target.last_modified = datetime.utcnow()

        db.commit()
        db.refresh(target)
        return ChatEngineResponse.model_validate(target, from_attributes=True)
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error updating chat engine: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error updating chat engine: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating chat engine: {exc}")
async def delete_chat_engine(
    engine_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove a chat engine.

    - **engine_id**: Positive ID of the chat engine to delete

    Responds with 404 when the engine does not exist, 400 while any bot or vector database is
    still associated with it, and 500 on database or unexpected errors.
    """
    try:
        target = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found")

        # Count the association rows that still reference this engine.
        bot_links = (
            db.query(func.count(BotEngine.id))
            .filter(BotEngine.engine_id == engine_id)
            .scalar()
        )
        vector_db_links = (
            db.query(func.count(EngineVectorDb.id))
            .filter(EngineVectorDb.engine_id == engine_id)
            .scalar()
        )

        still_referenced = bot_links > 0 or vector_db_links > 0
        if still_referenced:
            raise HTTPException(
                status_code=400,
                detail="Cannot delete chat engine as it has associated bots or vector databases. Remove associations first."
            )

        db.delete(target)
        db.commit()
        return {"message": f"Chat engine with ID {engine_id} deleted successfully"}
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error deleting chat engine: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting chat engine: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting chat engine: {exc}")
async def get_engine_vector_databases(
    engine_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Get all vector databases associated with a chat engine.

    - **engine_id**: Positive ID of the chat engine whose vector databases are listed

    Returns one dict per association whose vector database still exists, with the keys
    `association_id`, `vector_database_id`, `name`, `pinecone_index`, `priority` (taken from the
    association) and `status`. Responds with 404 when the engine does not exist and with 500 on
    unexpected errors.
    """
    try:
        # Verify engine exists
        engine = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first()
        if not engine:
            raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found")

        # Get associated vector databases through EngineVectorDb
        engine_vector_dbs = db.query(EngineVectorDb).filter(EngineVectorDb.engine_id == engine_id).all()

        # Load every referenced vector database with ONE query instead of one per association.
        vector_db_ids = {
            association.vector_database_id
            for association in engine_vector_dbs
            if association.vector_database_id is not None
        }
        vector_dbs_by_id = {}
        if vector_db_ids:
            vector_db_rows = (
                db.query(VectorDatabase)
                .filter(VectorDatabase.id.in_(list(vector_db_ids)))
                .all()
            )
            vector_dbs_by_id = {row.id: row for row in vector_db_rows}

        result = []
        for association in engine_vector_dbs:
            vector_db = vector_dbs_by_id.get(association.vector_database_id)
            if vector_db:
                # Associations pointing at a missing database are skipped, as before.
                result.append({
                    "association_id": association.id,
                    "vector_database_id": vector_db.id,
                    "name": vector_db.name,
                    "pinecone_index": vector_db.pinecone_index,
                    "priority": association.priority,
                    "status": vector_db.status
                })
        return result
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error retrieving engine vector databases: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving engine vector databases: {str(e)}")
# --- Bot Engine Association models and endpoints --- | |
class BotEngineCreate(BaseModel):
    """Payload for linking a Telegram bot to a chat engine."""
    # ID of the Telegram bot; create_bot_engine responds 404 if it does not exist.
    bot_id: int
    # ID of the chat engine; create_bot_engine responds 404 if it does not exist.
    engine_id: int
class BotEngineResponse(BotEngineCreate):
    """Bot-engine association as returned by the API: both IDs plus its own ID and creation time."""
    id: int
    created_at: datetime
    # Allow building this model from attribute-bearing objects such as ORM rows.
    model_config = ConfigDict(from_attributes=True)
async def get_bot_engines(
    skip: int = 0,
    limit: int = 100,
    bot_id: Optional[int] = None,
    engine_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    """
    List bot-engine associations, optionally narrowed to one bot and/or one engine.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **bot_id**: Filter by bot ID
    - **engine_id**: Filter by engine ID

    Responds with 500 on database or unexpected errors.
    """
    try:
        links_query = db.query(BotEngine)

        # Each filter is applied only when its value was supplied.
        if bot_id is not None:
            links_query = links_query.filter(BotEngine.bot_id == bot_id)
        if engine_id is not None:
            links_query = links_query.filter(BotEngine.engine_id == engine_id)

        page = links_query.offset(skip).limit(limit).all()
        return [
            BotEngineResponse.model_validate(link, from_attributes=True)
            for link in page
        ]
    except SQLAlchemyError as exc:
        logger.error(f"Database error retrieving bot-engine associations: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        logger.error(f"Error retrieving bot-engine associations: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine associations: {exc}")
async def create_bot_engine(
    bot_engine: BotEngineCreate,
    db: Session = Depends(get_db)
):
    """
    Link a Telegram bot to a chat engine.

    - **bot_engine**: The `bot_id` / `engine_id` pair to associate

    Responds with 404 when the bot or the engine does not exist, 400 when the pair is already
    associated, and 500 on database or unexpected errors.
    """
    try:
        wanted_bot_id = bot_engine.bot_id
        wanted_engine_id = bot_engine.engine_id

        # Both ends of the link must exist.
        bot_row = db.query(TelegramBot).filter(TelegramBot.id == wanted_bot_id).first()
        if not bot_row:
            raise HTTPException(status_code=404, detail=f"Telegram bot with ID {wanted_bot_id} not found")

        engine_row = db.query(ChatEngine).filter(ChatEngine.id == wanted_engine_id).first()
        if not engine_row:
            raise HTTPException(status_code=404, detail=f"Chat engine with ID {wanted_engine_id} not found")

        # The same pair may be linked only once.
        duplicate = (
            db.query(BotEngine)
            .filter(
                BotEngine.bot_id == wanted_bot_id,
                BotEngine.engine_id == wanted_engine_id
            )
            .first()
        )
        if duplicate:
            raise HTTPException(
                status_code=400,
                detail=f"Association between bot ID {wanted_bot_id} and engine ID {wanted_engine_id} already exists"
            )

        new_link = BotEngine(**bot_engine.model_dump())
        db.add(new_link)
        db.commit()
        db.refresh(new_link)
        return BotEngineResponse.model_validate(new_link, from_attributes=True)
    except HTTPException:
        raise
    except SQLAlchemyError as exc:
        db.rollback()
        logger.error(f"Database error creating bot-engine association: {exc}")
        raise HTTPException(status_code=500, detail=f"Database error: {exc}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error creating bot-engine association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error creating bot-engine association: {exc}")
async def get_bot_engine(
    association_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single bot-engine association.

    - **association_id**: Positive ID of the association to look up

    Responds with 404 when no association has that ID and with 500 on any unexpected failure.
    """
    try:
        link = (
            db.query(BotEngine)
            .filter(BotEngine.id == association_id)
            .first()
        )
        if link:
            return BotEngineResponse.model_validate(link, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found")
    except HTTPException:
        # The 404 above is intentional; let it through unchanged.
        raise
    except Exception as exc:
        logger.error(f"Error retrieving bot-engine association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving bot-engine association: {exc}")
async def delete_bot_engine(
    association_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove a bot-engine association.

    - **association_id**: ID of the association to remove (must be greater than 0)

    Responds with 404 when the association does not exist. Database and
    unexpected errors roll the session back and respond with 500.
    """
    try:
        target = db.query(BotEngine).filter(BotEngine.id == association_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Bot-engine association with ID {association_id} not found")

        db.delete(target)
        db.commit()

        confirmation = f"Bot-engine association with ID {association_id} deleted successfully"
        return {"message": confirmation}
    except HTTPException:
        # Intentional HTTP errors are forwarded as-is.
        raise
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error deleting bot-engine association: {db_error}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting bot-engine association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting bot-engine association: {str(exc)}")
# --- Engine-Vector DB Association models and endpoints ---
class EngineVectorDbCreate(BaseModel):
    # Request payload for linking a chat engine to a vector database.
    engine_id: int  # matched against ChatEngine.id by the create endpoint
    vector_database_id: int  # matched against VectorDatabase.id by the create endpoint
    priority: int = Field(default=0)  # optional; 0 when omitted
class EngineVectorDbResponse(EngineVectorDbCreate):
    # Response shape: the create payload plus the ID of the stored association.
    # from_attributes lets the model be validated straight from an ORM object.
    model_config = ConfigDict(from_attributes=True)

    id: int
async def get_engine_vector_dbs(
    skip: int = 0,
    limit: int = 100,
    engine_id: Optional[int] = None,
    vector_database_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    """
    Get all engine-vector-db associations.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **engine_id**: Filter by engine ID
    - **vector_database_id**: Filter by vector database ID

    Results are ordered by association ID so that paging with skip/limit is
    deterministic. Responds with 500 on database or unexpected errors.
    """
    try:
        query = db.query(EngineVectorDb)

        if engine_id is not None:
            query = query.filter(EngineVectorDb.engine_id == engine_id)

        if vector_database_id is not None:
            query = query.filter(EngineVectorDb.vector_database_id == vector_database_id)

        # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
        # so the same page could return different rows between requests.
        associations = (
            query.order_by(EngineVectorDb.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return [EngineVectorDbResponse.model_validate(association, from_attributes=True) for association in associations]
    except SQLAlchemyError as e:
        logger.error(f"Database error retrieving engine-vector-db associations: {e}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Error retrieving engine-vector-db associations: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db associations: {str(e)}")
async def create_engine_vector_db(
    engine_vector_db: EngineVectorDbCreate,
    db: Session = Depends(get_db)
):
    """
    Link a chat engine to a vector database.

    - **engine_vector_db**: Payload with engine_id, vector_database_id and priority

    Responds with 404 when the engine or the vector database does not exist,
    with 400 when the same pair is already linked, and with 500 on database
    or unexpected errors (after rolling back).
    """
    try:
        engine_id = engine_vector_db.engine_id
        vector_database_id = engine_vector_db.vector_database_id

        # Both sides of the link must already exist.
        engine_row = db.query(ChatEngine).filter(ChatEngine.id == engine_id).first()
        if not engine_row:
            raise HTTPException(status_code=404, detail=f"Chat engine with ID {engine_id} not found")

        vector_db_row = db.query(VectorDatabase).filter(VectorDatabase.id == vector_database_id).first()
        if not vector_db_row:
            raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_database_id} not found")

        # Refuse to create a second link for the same pair.
        duplicate = (
            db.query(EngineVectorDb)
            .filter(
                EngineVectorDb.engine_id == engine_id,
                EngineVectorDb.vector_database_id == vector_database_id,
            )
            .first()
        )
        if duplicate:
            raise HTTPException(
                status_code=400,
                detail=f"Association between engine ID {engine_id} and vector database ID {vector_database_id} already exists"
            )

        new_link = EngineVectorDb(**engine_vector_db.model_dump())
        db.add(new_link)
        db.commit()
        db.refresh(new_link)
        return EngineVectorDbResponse.model_validate(new_link, from_attributes=True)
    except HTTPException:
        # Intentional HTTP errors are forwarded as-is.
        raise
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error creating engine-vector-db association: {db_error}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error creating engine-vector-db association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error creating engine-vector-db association: {str(exc)}")
async def get_engine_vector_db(
    association_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Fetch a single engine-vector-db association by its ID.

    - **association_id**: ID of the association (must be greater than 0)

    Responds with 404 when no association has that ID and with 500 on any
    unexpected error.
    """
    try:
        lookup = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id)
        record = lookup.first()
        if record:
            return EngineVectorDbResponse.model_validate(record, from_attributes=True)
        raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found")
    except HTTPException:
        # The 404 above is intentional; let it pass through unchanged.
        raise
    except Exception as exc:
        logger.error(f"Error retrieving engine-vector-db association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving engine-vector-db association: {str(exc)}")
async def update_engine_vector_db(
    association_id: int = Path(..., gt=0),
    update_data: dict = Body(...),
    db: Session = Depends(get_db)
):
    """
    Update an engine-vector-db association. Only the priority is editable.

    - **association_id**: ID of the association to update (must be greater than 0)
    - **update_data**: JSON object; only its "priority" key is applied, any
      other keys are ignored

    Responds with 404 when the association does not exist and with 500 on
    database or unexpected errors (after rolling back).
    """
    try:
        record = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first()
        if not record:
            raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found")

        # Whitelist of editable fields; everything else in the payload is dropped.
        for editable in ("priority",):
            if editable in update_data:
                setattr(record, editable, update_data[editable])

        db.commit()
        db.refresh(record)
        return EngineVectorDbResponse.model_validate(record, from_attributes=True)
    except HTTPException:
        # Intentional HTTP errors are forwarded as-is.
        raise
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error updating engine-vector-db association: {db_error}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error updating engine-vector-db association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating engine-vector-db association: {str(exc)}")
async def delete_engine_vector_db(
    association_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove an engine-vector-db association.

    - **association_id**: ID of the association to remove (must be greater than 0)

    Responds with 404 when the association does not exist. Database and
    unexpected errors roll the session back and respond with 500.
    """
    try:
        target = db.query(EngineVectorDb).filter(EngineVectorDb.id == association_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Engine-vector-db association with ID {association_id} not found")

        db.delete(target)
        db.commit()

        confirmation = f"Engine-vector-db association with ID {association_id} deleted successfully"
        return {"message": confirmation}
    except HTTPException:
        # Intentional HTTP errors are forwarded as-is.
        raise
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error deleting engine-vector-db association: {db_error}")
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting engine-vector-db association: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting engine-vector-db association: {str(exc)}")
# --- VectorStatus models and endpoints ---
class VectorStatusBase(BaseModel):
    # Fields shared by the vector-status request and response models.
    document_id: int
    vector_database_id: int
    vector_id: Optional[str] = Field(default=None)  # optional; None when omitted
    status: str = Field(default="pending")  # "pending" when omitted
    error_message: Optional[str] = Field(default=None)  # optional; None when omitted
class VectorStatusCreate(VectorStatusBase):
    # Creation payload; adds no fields beyond those inherited from VectorStatusBase.
    pass
class VectorStatusResponse(VectorStatusBase):
    # Response shape: the base fields plus the row ID and embedding timestamp.
    # from_attributes lets the model be validated straight from an ORM object.
    model_config = ConfigDict(from_attributes=True)

    id: int
    embedded_at: Optional[datetime] = Field(default=None)  # optional; None when omitted
async def get_vector_statuses(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
    document_id: Optional[int] = None,
    vector_database_id: Optional[int] = None,
    db: Session = Depends(get_db)
):
    """
    Get all vector statuses with optional filtering.

    - **skip**: Number of items to skip
    - **limit**: Maximum number of items to return
    - **status**: Filter by status (e.g., 'pending', 'completed', 'error')
    - **document_id**: Filter by document ID
    - **vector_database_id**: Filter by vector database ID

    Results are ordered by vector-status ID so that paging with skip/limit is
    deterministic. Responds with 500 on database or unexpected errors.
    """
    try:
        query = db.query(VectorStatus)

        # Apply filters if provided
        if status is not None:
            query = query.filter(VectorStatus.status == status)

        if document_id is not None:
            query = query.filter(VectorStatus.document_id == document_id)

        if vector_database_id is not None:
            query = query.filter(VectorStatus.vector_database_id == vector_database_id)

        # OFFSET/LIMIT without ORDER BY leaves the row order up to the database,
        # so the same page could return different rows between requests.
        vector_statuses = (
            query.order_by(VectorStatus.id)
            .offset(skip)
            .limit(limit)
            .all()
        )

        # Convert to Pydantic models. The loop variable is deliberately not
        # called "status" to avoid confusion with the filter parameter.
        return [
            VectorStatusResponse.model_validate(vector_status, from_attributes=True)
            for vector_status in vector_statuses
        ]
    except SQLAlchemyError as e:
        logger.error(f"Database error in get_vector_statuses: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        logger.error(f"Error retrieving vector statuses: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error retrieving vector statuses: {str(e)}")
async def create_emergency_contact(
    emergency: EmergencyCreate,
    db: Session = Depends(get_db)
):
    """
    Store a new emergency contact and return it.

    - **name**: Contact name
    - **phone_number**: Phone number
    - **description**: Description (optional)
    - **address**: Address (optional)
    - **location**: Location coordinates (optional)
    - **priority**: Priority order (default: 0)
    - **is_active**: Whether the contact is active (default: True)

    The emergency cache is cleared after the insert. Database errors roll the
    session back and respond with 500.
    """
    try:
        new_contact = EmergencyItem(**emergency.model_dump())
        db.add(new_contact)
        db.commit()
        db.refresh(new_contact)

        # Cached emergency data is stale once a new row exists.
        emergencies_cache.clear()

        return EmergencyResponse.model_validate(new_contact, from_attributes=True)
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error in create_emergency_contact: {db_error}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
async def get_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    use_cache: bool = True,
    db: Session = Depends(get_db)
):
    """
    Look up one emergency contact by ID.

    - **emergency_id**: ID of the emergency contact
    - **use_cache**: If true, serve from and store into the emergency cache

    Responds with 404 when no row has the given ID and with 500 on database
    errors.
    """
    try:
        cache_key = f"emergency_{emergency_id}"

        # The cache is consulted only when the caller allows it.
        cached_result = emergencies_cache.get(cache_key) if use_cache else None
        if cached_result:
            logger.info(f"Cache hit for {cache_key}")
            return cached_result

        # Raw SQL lookup of the single row.
        row = db.execute(
            text("SELECT * FROM emergency_item WHERE id = :id"),
            {"id": emergency_id},
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="Emergency contact not found")

        # Copy every returned column that the ORM class also exposes onto a
        # fresh, unattached EmergencyItem instance.
        emergency = EmergencyItem()
        columns = row._mapping
        for column_name in columns:
            if hasattr(emergency, column_name):
                setattr(emergency, column_name, columns[column_name])

        response = EmergencyResponse.model_validate(emergency, from_attributes=True)

        if use_cache:
            emergencies_cache[cache_key] = response
        return response
    except SQLAlchemyError as db_error:
        logger.error(f"Database error in get_emergency_contact: {db_error}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
async def update_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    emergency_update: EmergencyUpdate = Body(...),
    db: Session = Depends(get_db)
):
    """
    Update a specific emergency contact.

    - **emergency_id**: ID of the emergency contact to update
    - **name**: New name (optional)
    - **phone_number**: New phone number (optional)
    - **description**: New description (optional)
    - **address**: New address (optional)
    - **location**: New location coordinates (optional)
    - **priority**: New priority order (optional)
    - **is_active**: New active status (optional)

    Only fields present in the request body are changed. Responds with 404
    when the contact does not exist; database errors roll the session back
    and respond with 500.
    """
    try:
        emergency = db.query(EmergencyItem).filter(EmergencyItem.id == emergency_id).first()
        if not emergency:
            raise HTTPException(status_code=404, detail="Emergency contact not found")

        # Update fields if provided
        update_data = emergency_update.model_dump(exclude_unset=True)
        for key, value in update_data.items():
            setattr(emergency, key, value)

        db.commit()
        db.refresh(emergency)

        # TTLCache is a mutable mapping and has no delete() method; calling it
        # raised AttributeError after the commit had already succeeded.
        # clear() drops the per-item entry together with any list entries.
        emergencies_cache.clear()

        # Convert to Pydantic model
        return EmergencyResponse.model_validate(emergency, from_attributes=True)
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in update_emergency_contact: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def delete_emergency_contact(
    emergency_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Delete a specific emergency contact.

    - **emergency_id**: ID of the emergency contact to delete

    Responds with 404 when no row has the given ID; database errors roll the
    session back and respond with 500.
    """
    try:
        # DELETE ... RETURNING reports whether a row was removed without a
        # separate lookup query.
        result = db.execute(
            text("DELETE FROM emergency_item WHERE id = :id RETURNING id"),
            {"id": emergency_id}
        ).fetchone()

        if not result:
            raise HTTPException(status_code=404, detail="Emergency contact not found")

        db.commit()

        # TTLCache is a mutable mapping and has no delete() method; calling it
        # raised AttributeError after the row had already been deleted.
        # clear() drops the per-item entry together with any list entries.
        emergencies_cache.clear()

        return {"status": "success", "message": f"Emergency contact {emergency_id} deleted"}
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in delete_emergency_contact: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_update_emergency_status(
    emergency_ids: List[int] = Body(..., embed=True),
    is_active: bool = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """
    Update the active status of multiple emergency contacts at once.

    This is much more efficient than updating emergency contacts one at a time.

    - **emergency_ids**: IDs of the emergency contacts to update
    - **is_active**: New active status applied to every listed contact

    Returns a BatchUpdateResult with the number of updated rows and the
    requested IDs that matched no row. Responds with 400 when the ID list is
    empty; database errors roll the session back and respond with 500.
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")

        # Prepare the update statement
        stmt = text("""
            UPDATE emergency_item
            SET is_active = :is_active, updated_at = NOW()
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)

        # Execute the update in a single query
        result = db.execute(stmt, {"is_active": is_active, "emergency_ids": emergency_ids})
        updated_ids = [row[0] for row in result]

        # Commit the transaction
        db.commit()

        # Determine which IDs weren't found. A set keeps each membership test
        # constant-time instead of rescanning the list for every requested ID.
        updated_lookup = set(updated_ids)
        failed_ids = [
            emergency_id
            for emergency_id in emergency_ids
            if emergency_id not in updated_lookup
        ]

        # Invalidate emergency cache
        emergencies_cache.clear()

        return BatchUpdateResult(
            success_count=len(updated_ids),
            failed_ids=failed_ids,
            message=f"Updated {len(updated_ids)} emergency contacts" if updated_ids else "No emergency contacts were updated"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_update_emergency_status: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def batch_delete_emergency_contacts(
    emergency_ids: List[int] = Body(..., embed=True),
    db: Session = Depends(get_db)
):
    """
    Delete multiple emergency contacts at once.

    This is much more efficient than deleting emergency contacts one at a time
    with separate API calls.

    - **emergency_ids**: IDs of the emergency contacts to delete

    Returns a BatchUpdateResult with the number of deleted rows and the
    requested IDs that matched no row. Responds with 400 when the ID list is
    empty; database errors roll the session back and respond with 500.
    """
    try:
        if not emergency_ids:
            raise HTTPException(status_code=400, detail="No emergency contact IDs provided")

        # Prepare and execute the delete statement with RETURNING to get deleted IDs
        stmt = text("""
            DELETE FROM emergency_item
            WHERE id = ANY(:emergency_ids)
            RETURNING id
        """)

        result = db.execute(stmt, {"emergency_ids": emergency_ids})
        deleted_ids = [row[0] for row in result]

        # Commit the transaction
        db.commit()

        # Determine which IDs weren't found. A set keeps each membership test
        # constant-time instead of rescanning the list for every requested ID.
        deleted_lookup = set(deleted_ids)
        failed_ids = [
            emergency_id
            for emergency_id in emergency_ids
            if emergency_id not in deleted_lookup
        ]

        # Invalidate emergency cache
        emergencies_cache.clear()

        return BatchUpdateResult(
            success_count=len(deleted_ids),
            failed_ids=failed_ids,
            message=f"Deleted {len(deleted_ids)} emergency contacts" if deleted_ids else "No emergency contacts were deleted"
        )
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error in batch_delete_emergency_contacts: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
async def upload_document(
    name: str = Form(...),
    vector_database_id: int = Form(...),
    file: UploadFile = File(...),
    db: Session = Depends(get_db)
):
    """
    Upload a new document and associate it with a vector database.

    - **name**: Document name
    - **vector_database_id**: ID of the vector database to associate with
    - **file**: The file to upload

    The document row, its binary content and its "pending" vector status are
    written in a single transaction, so a failure leaves none of them behind.
    Responds with 404 when the vector database does not exist and with 500 on
    database or unexpected errors (after rolling back).
    """
    try:
        # Check if vector database exists
        vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == vector_database_id).first()
        if not vector_db:
            raise HTTPException(status_code=404, detail=f"Vector database with ID {vector_database_id} not found")

        # Read file content
        file_content = await file.read()
        file_size = len(file_content)

        # Determine file type from extension (lower-cased, without the leading dot)
        filename = file.filename
        file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else ""

        # Create document record
        document = Document(
            name=name,
            vector_database_id=vector_database_id,
            file_type=file_extension,
            content_type=file.content_type,
            size=file_size,
            is_embedded=False
        )
        db.add(document)
        db.flush()  # Get ID without committing

        # Create document content record
        document_content = DocumentContent(
            document_id=document.id,
            file_content=file_content
        )
        db.add(document_content)

        # Create vector status record for tracking embedding. It is added
        # before the commit so it is stored atomically with the document;
        # previously it was committed separately and could be lost on failure.
        vector_status = VectorStatus(
            document_id=document.id,
            vector_database_id=vector_database_id,
            status="pending"
        )
        db.add(vector_status)

        db.commit()
        db.refresh(document)

        # Create response. vector_db is known to exist here (checked above).
        result = DocumentResponse(
            id=document.id,
            name=document.name,
            file_type=document.file_type,
            content_type=document.content_type,
            size=document.size,
            created_at=document.created_at,
            updated_at=document.updated_at,
            vector_database_id=document.vector_database_id,
            vector_database_name=vector_db.name,
            is_embedded=document.is_embedded
        )
        return result
    except HTTPException:
        raise
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error uploading document: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error uploading document: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error uploading document: {str(e)}")
async def update_document(
    document_id: int,
    name: Optional[str] = Form(None),
    file: Optional[UploadFile] = File(None),
    background_tasks: BackgroundTasks = None,
    db: Session = Depends(get_db)
):
    """
    Update an existing document. Can update name, file content, or both.

    - **document_id**: ID of the document to update
    - **name**: New document name (optional)
    - **file**: New file content (optional)

    When a new file is supplied, the stored content is replaced, the document
    is marked as not embedded, its vector status is reset to "pending", and
    (when background tasks are available) deletion of the old vectors and
    re-embedding are scheduled. Responds with 400 for a non-positive ID, 404
    when the document does not exist, and 500 on database or unexpected
    errors (after rolling back).
    """
    try:
        # Validate document_id
        if document_id <= 0:
            raise HTTPException(status_code=400, detail="document_id must be greater than 0")

        # Check if document exists
        document = db.query(Document).filter(Document.id == document_id).first()
        if not document:
            raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found")

        # Get vector database information for later use
        vector_db = None
        if document.vector_database_id:
            vector_db = db.query(VectorDatabase).filter(VectorDatabase.id == document.vector_database_id).first()

        # Update name if provided (an empty string is treated as "not provided")
        if name:
            document.name = name

        # Update file if provided
        if file:
            # Read new file content
            file_content = await file.read()
            file_size = len(file_content)

            # Determine file type from extension (lower-cased, without the leading dot)
            filename = file.filename
            file_extension = pathlib_Path(filename).suffix.lower()[1:] if filename else ""

            # Update document record
            document.file_type = file_extension
            document.content_type = file.content_type
            document.size = file_size
            document.is_embedded = False  # Reset embedding status
            document.updated_at = datetime.now()

            # Update document content
            document_content = db.query(DocumentContent).filter(DocumentContent.document_id == document_id).first()
            if document_content:
                document_content.file_content = file_content
            else:
                # Create new document content if it doesn't exist
                document_content = DocumentContent(
                    document_id=document_id,
                    file_content=file_content
                )
                db.add(document_content)

            # Get vector status for Pinecone cleanup
            vector_status = db.query(VectorStatus).filter(VectorStatus.document_id == document_id).first()

            # Store old vector_id for cleanup; this must happen before the
            # reset below overwrites vector_status.vector_id with None
            old_vector_id = None
            if vector_status and vector_status.vector_id:
                old_vector_id = vector_status.vector_id

            # Update vector status to pending
            if vector_status:
                vector_status.status = "pending"
                vector_status.vector_id = None
                vector_status.embedded_at = None
                vector_status.error_message = None
            else:
                # Create new vector status if it doesn't exist
                vector_status = VectorStatus(
                    document_id=document_id,
                    vector_database_id=document.vector_database_id,
                    status="pending"
                )
                db.add(vector_status)

            # Schedule deletion of old vectors in Pinecone if we have all needed info
            if old_vector_id and vector_db and document.vector_database_id and background_tasks:
                try:
                    # Initialize PDFProcessor for vector deletion
                    from app.pdf.processor import PDFProcessor

                    processor = PDFProcessor(
                        index_name=vector_db.pinecone_index,
                        namespace=f"vdb-{document.vector_database_id}",
                        vector_db_id=document.vector_database_id
                    )

                    # Add deletion task to background tasks
                    background_tasks.add_task(
                        processor.delete_document_vectors,
                        old_vector_id
                    )

                    logger.info(f"Scheduled deletion of old vectors for document {document_id}")
                except Exception as e:
                    logger.error(f"Error scheduling vector deletion: {str(e)}")
                    # Continue with the update even if vector deletion scheduling fails

            # Schedule document for re-embedding if possible
            if background_tasks and document.vector_database_id:
                try:
                    # Import here to avoid circular imports
                    from app.pdf.tasks import process_document_for_embedding

                    # Schedule embedding
                    background_tasks.add_task(
                        process_document_for_embedding,
                        document_id=document_id,
                        vector_db_id=document.vector_database_id
                    )

                    logger.info(f"Scheduled re-embedding for document {document_id}")
                except Exception as e:
                    logger.error(f"Error scheduling document embedding: {str(e)}")
                    # Continue with the update even if embedding scheduling fails

        # Persist the name and/or file changes made above
        db.commit()
        db.refresh(document)

        # Get vector database name for response
        vector_db_name = "No Database"
        if vector_db:
            vector_db_name = vector_db.name
        elif document.vector_database_id:
            # The document references a database ID whose row was not found
            vector_db_name = f"db_{document.vector_database_id}"

        # Create response
        result = DocumentResponse(
            id=document.id,
            name=document.name,
            file_type=document.file_type,
            content_type=document.content_type,
            size=document.size,
            created_at=document.created_at,
            updated_at=document.updated_at,
            vector_database_id=document.vector_database_id or 0,
            vector_database_name=vector_db_name,
            is_embedded=document.is_embedded
        )
        return result
    except HTTPException:
        raise
    except SQLAlchemyError as e:
        db.rollback()
        logger.error(f"Database error updating document: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(e)}")
    except Exception as e:
        db.rollback()
        logger.error(f"Error updating document: {e}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error updating document: {str(e)}")
async def delete_document(
    document_id: int = Path(..., gt=0),
    db: Session = Depends(get_db)
):
    """
    Remove a document together with its vector status and stored content.

    - **document_id**: ID of the document to delete

    Responds with 404 when the document does not exist. Database and
    unexpected errors roll the session back and respond with 500.
    """
    try:
        target = db.query(Document).filter(Document.id == document_id).first()
        if not target:
            raise HTTPException(status_code=404, detail=f"Document with ID {document_id} not found")

        # Dependent rows go first (vector status, then content), then the
        # document itself; everything is committed together.
        for dependent_model in (VectorStatus, DocumentContent):
            db.query(dependent_model).filter(dependent_model.document_id == document_id).delete()

        db.delete(target)
        db.commit()

        confirmation = f"Document with ID {document_id} deleted successfully"
        return {"status": "success", "message": confirmation}
    except HTTPException:
        # Intentional HTTP errors are forwarded as-is.
        raise
    except SQLAlchemyError as db_error:
        db.rollback()
        logger.error(f"Database error deleting document: {db_error}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Database error: {str(db_error)}")
    except Exception as exc:
        db.rollback()
        logger.error(f"Error deleting document: {exc}")
        logger.error(traceback.format_exc())
        raise HTTPException(status_code=500, detail=f"Error deleting document: {str(exc)}")