taslim19
drag
4051191
import os
from typing import Type, List, TypeVar, Optional
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel import SQLModel, Field, Session, select, delete
from sqlmodel.ext.asyncio.session import AsyncSession
from tools import LanguageSingleton
# Generic type variable: lets DB.get / DB.get_all be annotated as returning
# instances of the same model class that is passed in as ``table``.
T = TypeVar("T")
class ChapterFile(SQLModel, table=True):
    """Table row holding the stored identifiers for one chapter URL.

    ``url`` is the primary key. Every other column is optional and defaults
    to ``None``, so a row can be created before any identifier is known.
    """

    url: str = Field(primary_key=True)
    # Explicit ``None`` defaults: without them the fields are implicitly
    # optional under pydantic v1 but *required* under pydantic v2. Spelling
    # the default out keeps the model behaving the same on both.
    # NOTE(review): presumably messenger-side file identifiers for the
    # uploaded chapter and its CBZ archive -- confirm against the callers.
    file_id: Optional[str] = None
    file_unique_id: Optional[str] = None
    cbz_id: Optional[str] = None
    cbz_unique_id: Optional[str] = None
    telegraph_url: Optional[str] = None
class MangaOutput(SQLModel, table=True):
    """Table row storing the integer ``output`` value for one user.

    ``user_id`` is the primary key and is declared with the digit pattern
    ``\\d+``.
    """

    user_id: str = Field(primary_key=True, regex=r'\d+')
    # ``Field`` must be called: assigning the bare function made the function
    # object itself the default value of this column.
    # NOTE(review): the meaning of the integer is not visible in this file.
    output: int = Field()
class Subscription(SQLModel, table=True):
    """Table row linking a user to a URL the user is subscribed to.

    Both columns are flagged as primary key, so together they form the
    table's composite key. ``user_id`` is declared with the digit pattern
    ``\\d+``.
    """

    url: str = Field(primary_key=True)
    user_id: str = Field(regex=r'\d+', primary_key=True)
class LastChapter(SQLModel, table=True):
    """Table row mapping a URL (primary key) to a chapter URL.

    NOTE(review): going by the class name, ``chapter_url`` is presumably the
    most recent chapter seen for ``url`` -- confirm against the callers.
    """

    url: str = Field(primary_key=True)
    # ``Field`` must be called: assigning the bare function made the function
    # object itself the default value of this column.
    chapter_url: str = Field()
class MangaName(SQLModel, table=True):
    """Table row mapping a URL (primary key) to a display name.

    ``DB.get_subs`` joins this table with ``Subscription`` on ``url``.
    """

    url: str = Field(primary_key=True)
    # ``Field`` must be called: assigning the bare function made the function
    # object itself the default value of this column.
    name: str = Field()
class DB(metaclass=LanguageSingleton):
    """Asynchronous data-access helper around a SQLAlchemy async engine.

    Each method opens its own short-lived ``AsyncSession`` on the shared
    engine. Writes run inside ``session.begin()`` so they are committed when
    the block exits normally and rolled back if it raises.
    """

    def __init__(self, dbname: str = 'sqlite+aiosqlite:///test.db'):
        """Create the async engine for ``dbname``.

        Driver-less URLs are rewritten to use an async driver:

        * ``postgres://`` and ``postgresql://`` -> ``postgresql+asyncpg://``
        * ``sqlite://`` -> ``sqlite+aiosqlite://``

        Any other URL, including one that already names a driver (such as
        the default), is passed to the engine unchanged.

        Args:
            dbname: SQLAlchemy database URL.
        """
        # Match on the full ``scheme://`` prefix. The previous check,
        # ``startswith('sqlite')``, also matched ``sqlite+aiosqlite://`` and
        # turned the default URL into ``sqlite+aiosqlite+aiosqlite://``.
        async_schemes = (
            ('postgres://', 'postgresql+asyncpg://'),
            ('postgresql://', 'postgresql+asyncpg://'),
            ('sqlite://', 'sqlite+aiosqlite://'),
        )
        for plain, with_driver in async_schemes:
            if dbname.startswith(plain):
                dbname = with_driver + dbname[len(plain):]
                break

        self.engine = create_async_engine(dbname)

    async def connect(self):
        """Create every table registered on ``SQLModel.metadata``.

        ``checkfirst=True`` makes the call skip tables that already exist.
        """
        async with self.engine.begin() as conn:
            await conn.run_sync(SQLModel.metadata.create_all, checkfirst=True)

    async def add(self, other: SQLModel):
        """Add ``other`` to a session and commit it in one transaction."""
        async with AsyncSession(self.engine) as session:
            async with session.begin():
                session.add(other)

    async def get(self, table: Type[T], id) -> Optional[T]:
        """Return the ``table`` row whose primary key is ``id``.

        Returns ``None`` when no such row exists.
        """
        async with AsyncSession(self.engine) as session:
            return await session.get(table, id)

    async def get_all(self, table: Type[T]) -> List[T]:
        """Select every row of ``table``.

        NOTE(review): despite the ``List[T]`` annotation this returns the
        result object produced by ``session.exec`` and not a materialized
        list. It is left unchanged because callers are not visible here and
        may depend on either form; consider returning ``.all()``.
        """
        async with AsyncSession(self.engine) as session:
            statement = select(table)
            return await session.exec(statement=statement)

    async def erase(self, other: SQLModel):
        """Delete ``other`` from the database in one transaction."""
        async with AsyncSession(self.engine) as session:
            async with session.begin():
                await session.delete(other)

    async def get_chapter_file_by_id(self, id: str):
        """Find a ``ChapterFile`` by any of its lookup identifiers.

        Args:
            id: Value compared against ``file_unique_id``,
                ``cbz_unique_id`` and ``telegraph_url``.

        Returns:
            The first matching ``ChapterFile``, or ``None`` if none matches.
        """
        async with AsyncSession(self.engine) as session:
            statement = select(ChapterFile).where(
                (ChapterFile.file_unique_id == id)
                | (ChapterFile.cbz_unique_id == id)
                | (ChapterFile.telegraph_url == id)
            )
            return (await session.exec(statement=statement)).first()

    async def get_subs(self, user_id: str, filters=None) -> List[MangaName]:
        """List the ``MangaName`` rows that ``user_id`` is subscribed to.

        Args:
            user_id: Value matched against ``Subscription.user_id``.
            filters: Optional iterable of substrings. Each one adds a
                case-insensitive ``LIKE '%...%'`` condition that must match
                the name or the URL; the conditions are combined with AND.

        Returns:
            All matching ``MangaName`` rows.
        """
        async with AsyncSession(self.engine) as session:
            statement = (
                select(MangaName)
                .join(Subscription, Subscription.url == MangaName.url)
                .where(Subscription.user_id == user_id)
            )
            for filter_ in filters or []:
                pattern = f'%{filter_}%'
                statement = statement.where(
                    MangaName.name.ilike(pattern) | MangaName.url.ilike(pattern)
                )
            return (await session.exec(statement=statement)).all()

    async def erase_subs(self, user_id: str):
        """Delete every ``Subscription`` row belonging to ``user_id``."""
        async with AsyncSession(self.engine) as session:
            async with session.begin():
                statement = delete(Subscription).where(Subscription.user_id == user_id)
                await session.exec(statement=statement)