"""Parquet backup module: export database tables to Parquet files and load
them back.

TODO: handle migrations.
TODO: make it work with chunked exports.
TODO: make it work with chunked imports.

Mostly auto-generated by Cursor + GPT-5.
"""

import os

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine


def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """Export each table in the database to a separate Parquet file.

    Loads each table fully into memory and sorts rows by every column so the
    output is deterministic across runs.

    TODO: make it work with chunked exports.
    TODO: handle migrations.
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")

        # Table names come from the database inspector, not user input, so
        # interpolating them into the query is acceptable here.
        query = text(f"SELECT * FROM {table_name}")
        with engine.connect() as conn:
            df = pd.read_sql_query(query, conn)

        # Sort by every column so repeated exports of identical data produce
        # identical files.
        sort_cols = list(df.columns)
        df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)

        df_sorted.to_parquet(file_path, index=False)
        print(f"Exported {table_name} to {file_path}")


def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """Import each Parquet file in the backup directory into the database.

    Checks the schema strictly by column name, then replaces each table's
    contents inside a single transaction. Loads entire files into memory.

    TODO: make it work with chunked imports.
    TODO: handle migrations.
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue

        df = pd.read_parquet(file_path)

        # Strict schema check: the file's columns must match the live
        # table's columns by name.
        expected_columns = {col["name"] for col in inspector.get_columns(table_name)}
        if set(df.columns) != expected_columns:
            raise ValueError(
                f"Schema mismatch for {table_name}: file has "
                f"{sorted(df.columns)}, table has {sorted(expected_columns)}"
            )

        with engine.begin() as conn:
            # Clear and reload inside one transaction so a failed import
            # cannot leave the table half-written.
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                placeholders = ", ".join(f":{col}" for col in columns)
                insert_stmt = text(
                    f"INSERT INTO {table_name} ({', '.join(columns)}) "
                    f"VALUES ({placeholders})"
                )
                # Insert in batches via executemany rather than building one
                # giant multi-row VALUES statement, which can exceed driver
                # parameter limits.
                chunk_size = 10000
                for start in range(0, len(df), chunk_size):
                    chunk = df.iloc[start : start + chunk_size]
                    conn.execute(insert_stmt, chunk.to_dict(orient="records"))

        print(f"Imported {table_name} from {file_path}")