import logging
import os

from dotenv import load_dotenv
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError, SQLAlchemyError
# Legacy import path kept from the original file; the name is rebound by the
# sqlalchemy.orm import on the next line, which is the one actually used.
from sqlalchemy.ext.declarative import declarative_base  # noqa: F401
from sqlalchemy.orm import declarative_base, sessionmaker  # noqa: F811

# Configure logging
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# PostgreSQL connection string, read from the environment (None when unset).
DEFAULT_DB_URL = os.getenv("AIVEN_DB_URL")

# Get DB connection mode from environment
DB_CONNECTION_MODE = os.getenv("DB_CONNECTION_MODE", "aiven")

# Every connection mode currently resolves to the same AIVEN_DB_URL value;
# branch on DB_CONNECTION_MODE here when other modes are added.
DATABASE_URL = os.getenv("AIVEN_DB_URL", DEFAULT_DB_URL)

if not DATABASE_URL:
    # There is no built-in fallback URL: DEFAULT_DB_URL comes from the same
    # environment variable, so report the missing configuration plainly.
    logger.error(
        "No database URL configured. Set the AIVEN_DB_URL environment variable."
    )

# Bind the name up front so that a failed create_engine() call leaves a
# well-defined `engine = None` instead of a NameError further down the module.
engine = None

# Create SQLAlchemy engine with optimized settings
try:
    engine = create_engine(
        DATABASE_URL,
        pool_size=10,        # Limit max connections
        max_overflow=5,      # Allow temporary overflow of connections
        pool_timeout=30,     # Timeout waiting for connection from pool
        pool_recycle=300,    # Recycle connections every 5 minutes
        pool_pre_ping=True,  # Verify connection is still valid before using it
        connect_args={
            "connect_timeout": 5,        # Connection timeout in seconds
            "keepalives": 1,             # Enable TCP keepalives
            "keepalives_idle": 30,       # Time before sending keepalives
            "keepalives_interval": 10,   # Time between keepalives
            "keepalives_count": 5,       # Number of keepalive probes
            "application_name": "pixagent_api",  # Identify app in PostgreSQL logs
        },
        # Performance optimizations
        isolation_level="READ COMMITTED",  # Lower isolation level for better performance
        echo=False,       # Disable SQL echo to reduce overhead
        echo_pool=False,  # Disable pool logging
        future=True,      # Use SQLAlchemy 2.0 features
        # Execution options for common queries
        execution_options={
            # NOTE(review): a plain dict used as compiled cache has no size
            # bound - confirm this is preferred over the engine's built-in
            # statement cache.
            "compiled_cache": {},
            "logging_token": "SQL",  # Tag for query logging
        },
    )
    logger.info("PostgreSQL engine initialized with optimized settings")
except Exception as e:
    # Deliberately best-effort: log and continue with `engine = None` so that
    # importing this module does not crash the application on startup.
    logger.error(f"Failed to initialize PostgreSQL engine: {e}")

# Create optimized session factory
SessionLocal = sessionmaker(
    autocommit=False,
    autoflush=False,
    bind=engine,  # None when engine creation failed; sessions then fail on use
    expire_on_commit=False,  # Prevent automatic reloading after commit
)

# Base class for declarative models (sqlalchemy.orm variant for SQLAlchemy 2.0
# compatibility)
Base = declarative_base()


# Check PostgreSQL connection
def check_db_connection() -> bool:
    """Check PostgreSQL connection status.

    Opens a connection from the engine and runs ``SELECT 1``.

    Returns:
        True if the query succeeded; False if the engine was never
        initialized or any error occurred (the error is logged).
    """
    if engine is None:
        logger.error("PostgreSQL connection failed: engine is not initialized")
        return False

    try:
        # Simple query to verify connection
        with engine.connect() as connection:
            connection.execute(text("SELECT 1")).fetchone()
        logger.info("PostgreSQL connection successful")
        return True
    except OperationalError as e:
        logger.error(f"PostgreSQL connection failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Unknown error checking PostgreSQL connection: {e}")
        return False


# Dependency to get DB session with improved error handling
def get_db():
    """Yield a PostgreSQL database session and always close it afterwards.

    The session is probed with ``SELECT 1`` before being handed out. Any
    exception raised by the probe, or raised inside this generator while the
    session is in use, is logged and re-raised.

    Yields:
        A session created by ``SessionLocal``.
    """
    db = SessionLocal()
    try:
        # Test connection
        db.execute(text("SELECT 1")).fetchone()
        yield db
    except Exception as e:
        logger.error(f"DB connection error: {e}")
        raise
    finally:
        db.close()  # Ensure connection is closed and returned to pool


# Create tables in database if they don't exist
def create_tables() -> bool:
    """Create all tables registered on ``Base.metadata``.

    Returns:
        True on success; False if the engine was never initialized or table
        creation raised (the error is logged).
    """
    if engine is None:
        logger.error(
            "Failed to create database tables (unexpected error): "
            "engine is not initialized"
        )
        return False

    try:
        Base.metadata.create_all(bind=engine)
        logger.info("Database tables created or already exist")
        return True
    except SQLAlchemyError as e:
        logger.error(f"Failed to create database tables (SQLAlchemy error): {e}")
        return False
    except Exception as e:
        logger.error(f"Failed to create database tables (unexpected error): {e}")
        return False


# Function to create indexes for better
# performance
def create_indexes():
    """Create indexes for better query performance.

    Each index is created with ``CREATE INDEX IF NOT EXISTS`` and committed in
    its own transaction. On PostgreSQL a failed statement aborts the
    surrounding transaction, so sharing one transaction across all statements
    would make every later statement fail and discard the indexes already
    created when the first error occurs.

    A failure on a single index is logged with the real database error and
    the remaining indexes are still attempted.

    Returns:
        True when the pass over all indexes completed (individual failures
        are logged as warnings); False when the connection could not be used
        at all.
    """
    # (index name, table, indexed column list). These are static identifiers
    # defined here, never user input, so building the SQL text is safe.
    index_definitions = (
        # Event filtering: featured flag, active flag, start date
        ("idx_event_featured", "event_item", "featured"),
        ("idx_event_active", "event_item", "is_active"),
        ("idx_event_date_start", "event_item", "date_start"),
        # Composite index for combined filtering
        ("idx_event_featured_active", "event_item", "featured, is_active"),
        # FAQ active flag
        ("idx_faq_active", "faq_item", "is_active"),
        # Emergency contact active flag and priority
        ("idx_emergency_active", "emergency_item", "is_active"),
        ("idx_emergency_priority", "emergency_item", "priority"),
    )

    try:
        failed = 0
        with engine.connect() as conn:
            for index_name, table_name, columns in index_definitions:
                statement = text(
                    f"CREATE INDEX IF NOT EXISTS {index_name} "
                    f"ON {table_name}({columns})"
                )
                try:
                    conn.execute(statement)
                    conn.commit()
                except SQLAlchemyError as e:
                    # Reset the aborted transaction so the next index can
                    # still be created.
                    conn.rollback()
                    failed += 1
                    logger.warning(f"Could not create index {index_name}: {e}")

        if failed:
            logger.warning(
                f"Database index creation finished with {failed} failure(s)"
            )
        else:
            logger.info("Database indexes created or verified")
        return True
    except SQLAlchemyError as e:
        logger.error(f"Failed to create indexes: {e}")
        return False
    except Exception as e:
        # Same catch-all convention as create_tables(): report, don't crash.
        logger.error(f"Failed to create indexes (unexpected error): {e}")
        return False