import os from typing import Type, List, TypeVar, Optional from sqlalchemy.ext.asyncio import create_async_engine from sqlmodel import SQLModel, Field, Session, select, delete from sqlmodel.ext.asyncio.session import AsyncSession from tools import LanguageSingleton T = TypeVar("T") class ChapterFile(SQLModel, table=True): url: str = Field(primary_key=True) file_id: Optional[str] file_unique_id: Optional[str] cbz_id: Optional[str] cbz_unique_id: Optional[str] telegraph_url: Optional[str] class MangaOutput(SQLModel, table=True): user_id: str = Field(primary_key=True, regex=r'\d+') output: int = Field class Subscription(SQLModel, table=True): url: str = Field(primary_key=True) user_id: str = Field(primary_key=True, regex=r'\d+') class LastChapter(SQLModel, table=True): url: str = Field(primary_key=True) chapter_url: str = Field class MangaName(SQLModel, table=True): url: str = Field(primary_key=True) name: str = Field class DB(metaclass=LanguageSingleton): def __init__(self, dbname: str = 'sqlite+aiosqlite:///test.db'): if dbname.startswith('postgres://'): dbname = dbname.replace('postgres://', 'postgresql+asyncpg://', 1) elif dbname.startswith('postgresql://'): dbname = dbname.replace('postgresql://', 'postgresql+asyncpg://', 1) elif dbname.startswith('sqlite'): dbname = dbname.replace('sqlite', 'sqlite+aiosqlite', 1) self.engine = create_async_engine(dbname) async def connect(self): async with self.engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all, checkfirst=True) async def add(self, other: SQLModel): async with AsyncSession(self.engine) as session: # type: AsyncSession async with session.begin(): session.add(other) async def get(self, table: Type[T], id) -> T: async with AsyncSession(self.engine) as session: # type: AsyncSession return await session.get(table, id) async def get_all(self, table: Type[T]) -> List[T]: async with AsyncSession(self.engine) as session: # type: AsyncSession statement = select(table) return await session.exec(statement=statement) async def erase(self, other: SQLModel): async with AsyncSession(self.engine) as session: # type: AsyncSession async with session.begin(): await session.delete(other) async def get_chapter_file_by_id(self, id: str): async with AsyncSession(self.engine) as session: # type: AsyncSession statement = select(ChapterFile).where((ChapterFile.file_unique_id == id) | (ChapterFile.cbz_unique_id == id) | (ChapterFile.telegraph_url == id)) return (await session.exec(statement=statement)).first() async def get_subs(self, user_id: str, filters=None) -> List[MangaName]: async with AsyncSession(self.engine) as session: statement = ( select(MangaName) .join(Subscription, Subscription.url == MangaName.url) .where(Subscription.user_id == user_id) ) for filter_ in filters or []: statement = statement.where(MangaName.name.ilike(f'%{filter_}%') | MangaName.url.ilike(f'%{filter_}%')) return (await session.exec(statement=statement)).all() async def erase_subs(self, user_id: str): async with AsyncSession(self.engine) as session: async with session.begin(): statement = delete(Subscription).where(Subscription.user_id == user_id) await session.exec(statement=statement)