""" Parquet module. TODO: handle migrations TODO: make it work with chunked exports. TODO: make it work with chunked imports. Mostly auto-generated by Cursor + GPT-5. """ import os import pandas as pd from sqlalchemy import inspect, text from sqlalchemy.engine import Engine from sqlmodel import Session def export_to_parquet(engine: Engine, backup_dir: str) -> None: """ Export each table in the database to a separate Parquet file. Loads entire tables into memory and sorts deterministically. TODO: make it work with chunked exports. TODO: handle migrations """ os.makedirs(backup_dir, exist_ok=True) inspector = inspect(engine) table_names = inspector.get_table_names() for table_name in table_names: file_path = os.path.join(backup_dir, f"{table_name}.parquet") # Load entire table into memory query = text(f"SELECT * FROM {table_name}") with engine.connect() as conn: df = pd.read_sql_query(query, conn) # Sort deterministically by all columns sort_cols = list(df.columns) df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True) # Write to Parquet df_sorted.to_parquet(file_path, index=False) print(f"Exported {table_name} to {file_path}") def import_from_parquet(engine: Engine, backup_dir: str) -> None: """ Import each Parquet file into the database. Checks schema strictly (column names + types). Loads entire files into memory. TODO: make it work with chunked imports. TODO: handle migrations """ inspector = inspect(engine) table_names = inspector.get_table_names() for table_name in table_names: file_path = os.path.join(backup_dir, f"{table_name}.parquet") if not os.path.exists(file_path): print(f"No backup found for table {table_name}, skipping.") continue # Clear table before import with Session(engine) as session: session.exec(text(f"DELETE FROM {table_name}")) # Load entire file and insert at once df = pd.read_parquet(file_path) with engine.begin() as conn: conn.execute(text(f"DELETE FROM {table_name}")) if not df.empty: columns = df.columns.tolist() total_rows = len(df) chunk_size = 10000 for start in range(0, total_rows, chunk_size): end = min(start + chunk_size, total_rows) chunk = df.iloc[start:end] values = chunk.to_dict(orient="records") insert_stmt = text( f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES " + ", ".join( [ "(" + ", ".join([f":{col}_{i}" for col in columns]) + ")" for i in range(len(values)) ] ) ) params = {} for i, row in enumerate(values): for col in columns: params[f"{col}_{i}"] = row[col] conn.execute(insert_stmt, params) print(f"Imported {table_name} from {file_path}")