import os
import logging

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.exc import SQLAlchemyError, OperationalError
from dotenv import load_dotenv
# Configure logging
logger = logging.getLogger(__name__)

# Load environment variables from .env
load_dotenv()
# Default PostgreSQL connection string (read from the environment)
DEFAULT_DB_URL = os.getenv("AIVEN_DB_URL")

# Get DB connection mode from environment
DB_CONNECTION_MODE = os.getenv("DB_CONNECTION_MODE", "aiven")

# Set connection string based on mode
if DB_CONNECTION_MODE == "aiven":
    DATABASE_URL = os.getenv("AIVEN_DB_URL", DEFAULT_DB_URL)
else:
    # Other connection modes can be added here; fall back to the Aiven URL for now
    DATABASE_URL = os.getenv("AIVEN_DB_URL", DEFAULT_DB_URL)

if not DATABASE_URL:
    # DEFAULT_DB_URL is read from the same variable, so this branch only records the
    # problem; engine creation below will fail until AIVEN_DB_URL is set.
    logger.error("No database URL configured (AIVEN_DB_URL is not set).")
    DATABASE_URL = DEFAULT_DB_URL
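# Illustrative only: the exact URL is deployment-specific, but an Aiven PostgreSQL
# connection string typically follows the (fictional) pattern below, set in .env:
#
#   AIVEN_DB_URL=postgresql://avnadmin:<password>@<service>.aivencloud.com:12345/defaultdb?sslmode=require
#   DB_CONNECTION_MODE=aiven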
# Create SQLAlchemy engine with optimized settings
try:
    engine = create_engine(
        DATABASE_URL,
        pool_size=10,        # Limit max persistent connections in the pool
        max_overflow=5,      # Allow temporary overflow connections
        pool_timeout=30,     # Timeout waiting for a connection from the pool
        pool_recycle=300,    # Recycle connections every 5 minutes
        pool_pre_ping=True,  # Verify a connection is still valid before using it
        connect_args={       # Driver-level (libpq/psycopg2) options
            "connect_timeout": 5,       # Connection timeout in seconds
            "keepalives": 1,            # Enable TCP keepalives
            "keepalives_idle": 30,      # Idle time before sending keepalives
            "keepalives_interval": 10,  # Time between keepalive probes
            "keepalives_count": 5,      # Number of keepalive probes before dropping
            "application_name": "pixagent_api"  # Identify app in PostgreSQL logs
        },
        # Performance optimizations
        isolation_level="READ COMMITTED",  # Lower isolation level for better performance
        echo=False,                        # Disable SQL echo to reduce overhead
        echo_pool=False,                   # Disable pool logging
        future=True,                       # Use SQLAlchemy 2.0-style behaviour
        # Execution options for common queries
        execution_options={
            "compiled_cache": {},    # Dict used for compiled query caching
            "logging_token": "SQL",  # Tag for query logging
        }
    )
    logger.info("PostgreSQL engine initialized with optimized settings")
except Exception as e:
    logger.error(f"Failed to initialize PostgreSQL engine: {e}")
    # Don't raise, to avoid crashing on startup; downstream code must handle engine=None
    engine = None
# Create optimized session factory
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,
    expire_on_commit=False  # Prevent automatic reloading of attributes after commit
)

# Base class for declarative models
# (declarative_base is imported from sqlalchemy.orm above for SQLAlchemy 2.0 compatibility)
Base = declarative_base()
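# Hypothetical sketch for illustration (not the project's actual model definitions):
# models elsewhere in the codebase are expected to subclass Base so that create_tables()
# and create_indexes() below can see them. Column names are inferred from the index DDL.
#
#   from sqlalchemy import Column, Integer, Boolean, DateTime
#
#   class EventItem(Base):
#       __tablename__ = "event_item"
#       id = Column(Integer, primary_key=True)
#       featured = Column(Boolean, default=False)
#       is_active = Column(Boolean, default=True)
#       date_start = Column(DateTime)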
# Check PostgreSQL connection
def check_db_connection():
    """Check PostgreSQL connection status"""
    try:
        # Simple query to verify the connection
        with engine.connect() as connection:
            connection.execute(text("SELECT 1")).fetchone()
        logger.info("PostgreSQL connection successful")
        return True
    except OperationalError as e:
        logger.error(f"PostgreSQL connection failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Unknown error checking PostgreSQL connection: {e}")
        return False
# Dependency to get a DB session with improved error handling
def get_db():
    """Get PostgreSQL database session"""
    db = SessionLocal()
    try:
        # Test the connection before handing the session out
        db.execute(text("SELECT 1")).fetchone()
        yield db
    except Exception as e:
        logger.error(f"DB connection error: {e}")
        raise
    finally:
        db.close()  # Ensure the connection is closed and returned to the pool
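# Usage sketch (assumes the consuming app is FastAPI, which this module does not
# confirm): get_db() is a generator dependency, so it would typically be wired in as:
#
#   from fastapi import Depends, FastAPI
#   from sqlalchemy.orm import Session
#
#   app = FastAPI()
#
#   @app.get("/health/db")
#   def db_health(db: Session = Depends(get_db)):
#       db.execute(text("SELECT 1"))
#       return {"database": "ok"}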
# Create tables in the database if they don't exist
def create_tables():
    """Create tables in database"""
    try:
        Base.metadata.create_all(bind=engine)
        logger.info("Database tables created or already exist")
        return True
    except SQLAlchemyError as e:
        logger.error(f"Failed to create database tables (SQLAlchemy error): {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to create database tables (unexpected error): {e}")
        return False
# Function to create indexes for better performance
def create_indexes():
    """Create indexes for better query performance"""
    # CREATE INDEX IF NOT EXISTS is used instead of wrapping each statement in
    # try/except: in PostgreSQL a failed statement aborts the whole transaction,
    # so catching an "already exists" error and continuing on the same connection
    # would make every following statement fail.
    index_statements = [
        # Index for featured events
        "CREATE INDEX IF NOT EXISTS idx_event_featured ON event_item(featured)",
        # Index for active events
        "CREATE INDEX IF NOT EXISTS idx_event_active ON event_item(is_active)",
        # Index for date filtering
        "CREATE INDEX IF NOT EXISTS idx_event_date_start ON event_item(date_start)",
        # Composite index for combined filtering
        "CREATE INDEX IF NOT EXISTS idx_event_featured_active ON event_item(featured, is_active)",
        # FAQ active flag index
        "CREATE INDEX IF NOT EXISTS idx_faq_active ON faq_item(is_active)",
        # Emergency contact active flag and priority indexes
        "CREATE INDEX IF NOT EXISTS idx_emergency_active ON emergency_item(is_active)",
        "CREATE INDEX IF NOT EXISTS idx_emergency_priority ON emergency_item(priority)",
    ]
    try:
        with engine.connect() as conn:
            for statement in index_statements:
                conn.execute(text(statement))
            conn.commit()
        logger.info("Database indexes created or verified")
        return True
    except SQLAlchemyError as e:
        logger.error(f"Failed to create indexes: {e}")
        return False
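# Optional smoke test (a minimal sketch; the real startup flow may differ): running this
# file directly checks connectivity and bootstraps the schema using the helpers above.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    if check_db_connection():
        create_tables()
        create_indexes()
    else:
        logger.error("Skipping table/index creation because the database is unreachable")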