"""
Parquet module.
TODO: handle migrations
TODO: make it work with chunked exports.
TODO: make it work with chunked imports.
Mostly auto-generated by Cursor + GPT-5.
"""
import os

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine


def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Export each table in the database to a separate Parquet file.

    Loads entire tables into memory and sorts deterministically.

    TODO: make it work with chunked exports.
    TODO: handle migrations
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()
    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")

        # Load entire table into memory
        query = text(f"SELECT * FROM {table_name}")
        with engine.connect() as conn:
            df = pd.read_sql_query(query, conn)

        # Sort deterministically by all columns so repeated exports of the
        # same data produce identical files
        sort_cols = list(df.columns)
        df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)

        # Write to Parquet
        df_sorted.to_parquet(file_path, index=False)
        print(f"Exported {table_name} to {file_path}")


def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Import each Parquet file into the database.

    Loads entire files into memory and inserts rows in chunks. The Parquet
    schema is assumed to match the table schema.

    TODO: check schema strictly (column names + types) before importing.
    TODO: make it work with chunked imports.
    TODO: handle migrations
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()
    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue
        # Load the entire file into memory, then clear the table and insert
        # rows in chunks, all inside a single transaction so a failed import
        # does not leave the table half-emptied.
        df = pd.read_parquet(file_path)
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                placeholders = ", ".join(f":{col}" for col in columns)
                insert_stmt = text(
                    f"INSERT INTO {table_name} ({', '.join(columns)}) "
                    f"VALUES ({placeholders})"
                )
                chunk_size = 10000
                for start in range(0, len(df), chunk_size):
                    chunk = df.iloc[start : start + chunk_size]
                    # executemany-style insert: one statement executed with a
                    # list of per-row parameter dicts, which keeps each
                    # statement small and within the driver's bound-parameter
                    # limits.
                    conn.execute(insert_stmt, chunk.to_dict(orient="records"))
        print(f"Imported {table_name} from {file_path}")