|
from __future__ import annotations |
|
from typing import TYPE_CHECKING |
|
from uuid import uuid4 |
|
|
|
from asyncpg import Connection |
|
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine |
|
|
|
from bot.core.config import settings |
|
|
|
if TYPE_CHECKING: |
|
from sqlalchemy.engine.url import URL |
|
|
|
|
|
class CConnection(Connection): |
|
def _get_unique_id(self, prefix: str) -> str: |
|
return f"__asyncpg_{prefix}_{uuid4()}__" |
|
|
|
|
|
def get_engine(url: URL | str = settings.database_url) -> AsyncEngine: |
|
return create_async_engine( |
|
url=url, |
|
echo=settings.DEBUG, |
|
pool_size=0, |
|
connect_args={ |
|
"connection_class": CConnection, |
|
}, |
|
) |
|
|
|
|
|
def get_sessionmaker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: |
|
return async_sessionmaker(bind=engine, autoflush=False, expire_on_commit=False) |
|
|
|
|
|
db_url = settings.database_url |
|
engine = get_engine(url=db_url) |
|
sessionmaker = get_sessionmaker(engine) |
|
|