# db_utils.py import sqlite3 import os DATABASE_NAME = os.getenv("DATABASE_NAME", "data/bot_data.db") def get_connection(): return sqlite3.connect(DATABASE_NAME) # Initialize DB with table for file cache and user table async def initialize_database(): conn = get_connection() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS file_cache ( short_id TEXT PRIMARY KEY, file_id TEXT NOT NULL, filename TEXT, type TEXT, size INTEGER ) """) cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( user_id INTEGER PRIMARY KEY, username TEXT, first_name TEXT ) """) conn.commit() conn.close() # Get file from cache async def get_cached_file(short_id: str) -> dict | None: conn = get_connection() cursor = conn.cursor() cursor.execute("SELECT file_id, filename, type, size FROM file_cache WHERE short_id = ?", (short_id,)) row = cursor.fetchone() conn.close() if row: return { "file_id": row[0], "filename": row[1], "type": row[2], "size": row[3], } return None # Add to cache async def add_to_cache(short_id: str, file_id: str, filename: str, file_type: str, size: int): conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO file_cache (short_id, file_id, filename, type, size) VALUES (?, ?, ?, ?, ?) """, (short_id, file_id, filename, file_type, size)) conn.commit() conn.close() # Add or update user in DB async def add_or_update_user_db(user_id: int, username: str, first_name: str): conn = get_connection() cursor = conn.cursor() cursor.execute(""" INSERT OR REPLACE INTO users (user_id, username, first_name) VALUES (?, ?, ?) """, (user_id, username, first_name)) conn.commit() conn.close()