File size: 7,443 Bytes
ac0f906 e83f5e9 ac0f906 0e5b8f8 ac0f906 0e5b8f8 ac0f906 0e5b8f8 ac0f906 0e5b8f8 ac0f906 e83f5e9 ac0f906 c8b8c9b ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 c8b8c9b ac0f906 c8b8c9b e83f5e9 ac0f906 c8b8c9b ac0f906 c8b8c9b ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 e83f5e9 ac0f906 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 |
import os
from sqlalchemy import create_engine, text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError, OperationalError
from dotenv import load_dotenv
import logging
# Module-level logger; handler and level configuration is left to the application.
logger = logging.getLogger(__name__)

# Load environment variables (python-dotenv) before any of them are read below.
load_dotenv()

# PostgreSQL connection string, read from the AIVEN_DB_URL environment variable.
# This is None when the variable is not set; there is no hard-coded fallback.
DEFAULT_DB_URL = os.getenv("AIVEN_DB_URL")

# Connection mode selector (defaults to "aiven"). Every mode currently resolves
# to the same AIVEN_DB_URL value, so no branching on the mode is needed yet;
# add mode-specific URLs here when another backend is introduced.
DB_CONNECTION_MODE = os.getenv("DB_CONNECTION_MODE", "aiven")

DATABASE_URL = os.getenv("AIVEN_DB_URL", DEFAULT_DB_URL)

if not DATABASE_URL:
    # The previous "fallback" re-assigned DEFAULT_DB_URL, which comes from the
    # same unset variable, so it could never supply a usable URL. Report the
    # misconfiguration accurately instead of claiming a default is in use.
    logger.error(
        "No database URL configured. Set the AIVEN_DB_URL environment variable."
    )
# Create SQLAlchemy engine with optimized settings
# Build the module-level engine. Any failure is logged rather than raised so
# that importing this module does not crash application startup; in that case
# ``engine`` is bound to None so later module-level code that references it
# (for example the session factory) does not fail with a NameError.
try:
    engine = create_engine(
        DATABASE_URL,
        pool_size=10,        # Limit max connections
        max_overflow=5,      # Allow temporary overflow of connections
        pool_timeout=30,     # Timeout waiting for connection from pool
        pool_recycle=300,    # Recycle connections every 5 minutes
        pool_pre_ping=True,  # Verify connection is still valid before using it
        connect_args={
            "connect_timeout": 5,                # Connection timeout in seconds
            "keepalives": 1,                     # Enable TCP keepalives
            "keepalives_idle": 30,               # Time before sending keepalives
            "keepalives_interval": 10,           # Time between keepalives
            "keepalives_count": 5,               # Number of keepalive probes
            "application_name": "pixagent_api",  # Identify app in PostgreSQL logs
        },
        isolation_level="READ COMMITTED",
        echo=False,       # Disable SQL echo to reduce overhead
        echo_pool=False,  # Disable pool logging
        future=True,      # Use SQLAlchemy 2.0 features
        execution_options={
            # NOTE(review): a plain dict has no eviction, so this cache can
            # grow without bound; confirm it is preferred over the engine's
            # built-in statement cache.
            "compiled_cache": {},
            "logging_token": "SQL",  # Tag for query logging
        },
    )
    logger.info("PostgreSQL engine initialized with optimized settings")
except Exception as e:
    logger.error(f"Failed to initialize PostgreSQL engine: {e}")
    # Don't raise, to avoid a crash on startup, but make sure the name exists.
    engine = None
# Session factory bound to the module-level engine. Sessions neither flush nor
# commit implicitly; callers control both explicitly.
SessionLocal = sessionmaker(
    bind=engine,
    autoflush=False,
    autocommit=False,
    # Prevent automatic reloading of instances after commit.
    expire_on_commit=False,
)
# Base class for declarative models. ``declarative_base`` is re-imported here
# from ``sqlalchemy.orm`` for SQLAlchemy 2.0 compatibility; this rebinds the
# name that the top of the file imports from ``sqlalchemy.ext.declarative``.
from sqlalchemy.orm import declarative_base
Base = declarative_base()
# Check PostgreSQL connection
def check_db_connection():
    """Probe the database with a trivial query.

    Opens a connection from the module-level engine and runs ``SELECT 1``.

    Returns:
        True when the query succeeds; False when any exception is raised.
        The failure is logged, with a distinct message for
        ``OperationalError`` versus every other exception type.
    """
    try:
        with engine.connect() as conn:
            probe = conn.execute(text("SELECT 1"))
            probe.fetchone()
            logger.info("PostgreSQL connection successful")
            return True
    except Exception as err:
        # Same two outcomes as separate handlers: pick the message by type.
        if isinstance(err, OperationalError):
            logger.error(f"PostgreSQL connection failed: {err}")
        else:
            logger.error(f"Unknown error checking PostgreSQL connection: {err}")
        return False
# Dependency to get DB session with improved error handling
def get_db():
    """Yield a database session and always close it afterwards.

    Generator meant to be used as a dependency: it creates a session from
    ``SessionLocal``, verifies connectivity with ``SELECT 1``, hands the
    session to the consumer, and closes it when the consumer is done.

    Yields:
        The session created by ``SessionLocal``.

    Raises:
        Exception: whatever the connectivity probe raises, re-raised after
            being logged. Exceptions raised by the consumer while it holds
            the session propagate unchanged.
    """
    db = SessionLocal()
    try:
        # Only the probe is wrapped by the logging handler. Previously the
        # handler also wrapped the ``yield``, so unrelated errors raised by
        # the consumer were reported as "DB connection error".
        try:
            db.execute(text("SELECT 1")).fetchone()
        except Exception as e:
            logger.error(f"DB connection error: {e}")
            raise
        yield db
    finally:
        db.close()  # Ensure connection is closed and returned to pool
# Create tables in database if they don't exist
def create_tables():
    """Create every table registered on ``Base.metadata``.

    Calls ``Base.metadata.create_all`` against the module-level engine.

    Returns:
        True when the call completes; False when it raises. The failure is
        logged, with separate messages for ``SQLAlchemyError`` and for any
        other exception.
    """
    try:
        Base.metadata.create_all(bind=engine)
        logger.info("Database tables created or already exist")
    except SQLAlchemyError as err:
        logger.error(f"Failed to create database tables (SQLAlchemy error): {err}")
        return False
    except Exception as err:
        logger.error(f"Failed to create database tables (unexpected error): {err}")
        return False
    return True
# Function to create indexes for better performance
def create_indexes():
    """Create the query-performance indexes if they do not already exist.

    Each index is created with ``CREATE INDEX IF NOT EXISTS`` and committed
    on its own, so an index that already exists is a no-op and a failure on
    one index cannot undo or block the others. Previously all statements
    shared one transaction: on PostgreSQL the first failing statement aborts
    that transaction, so every later statement failed too and the final
    commit discarded indexes created earlier, while every error was logged
    as "already exists".

    Returns:
        True when the pass over all indexes completed (individual failures
        are logged and skipped, keeping the best-effort behaviour); False
        when a ``SQLAlchemyError`` escapes the pass, for example when
        connecting or rolling back fails.
    """
    # (index name, table, indexed column list). Fixed identifiers, not user input.
    index_specs = (
        ("idx_event_featured", "event_item", "featured"),
        ("idx_event_active", "event_item", "is_active"),
        ("idx_event_date_start", "event_item", "date_start"),
        ("idx_event_featured_active", "event_item", "featured, is_active"),
        ("idx_faq_active", "faq_item", "is_active"),
        ("idx_emergency_active", "emergency_item", "is_active"),
        ("idx_emergency_priority", "emergency_item", "priority"),
    )

    try:
        with engine.connect() as conn:
            failed = 0
            for index_name, table_name, columns in index_specs:
                statement = text(
                    f"CREATE INDEX IF NOT EXISTS {index_name} "
                    f"ON {table_name}({columns})"
                )
                try:
                    conn.execute(statement)
                    conn.commit()
                except SQLAlchemyError as e:
                    # Reset the aborted transaction so the remaining indexes
                    # can still be created, and report the real cause.
                    conn.rollback()
                    failed += 1
                    logger.warning(f"Could not create index {index_name}: {e}")

            if failed:
                logger.warning(
                    f"Database index creation finished with {failed} failure(s)"
                )
            else:
                logger.info("Database indexes created or verified")
            return True
    except SQLAlchemyError as e:
        logger.error(f"Failed to create indexes: {e}")
        return False