"""
Parquet backup module: export database tables to Parquet files and import them back.

TODO: handle migrations
TODO: make it work with chunked exports.
TODO: make it work with chunked imports.

Mostly auto-generated by Cursor + GPT-5.
"""

import os

import pandas as pd
from sqlalchemy import inspect, text
from sqlalchemy.engine import Engine


def export_to_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Export each table in the database to a separate Parquet file.
    Loads entire tables into memory and sorts deterministically.

    TODO: make it work with chunked exports.
    TODO: handle migrations
    """
    os.makedirs(backup_dir, exist_ok=True)
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")

        # Load entire table into memory
        query = text(f"SELECT * FROM {table_name}")
        with engine.connect() as conn:
            df = pd.read_sql_query(query, conn)

        # Sort deterministically by all columns
        sort_cols = list(df.columns)
        df_sorted = df.sort_values(by=sort_cols).reset_index(drop=True)

        # Write to Parquet
        df_sorted.to_parquet(file_path, index=False)
        print(f"Exported {table_name} to {file_path}")


def import_from_parquet(engine: Engine, backup_dir: str) -> None:
    """
    Import each Parquet file into the database.
    Replaces table contents: existing rows are deleted, then the backup
    rows are inserted in batches.
    Loads entire files into memory.

    TODO: check the schema strictly (column names + types) before importing.
    TODO: make it work with chunked imports.
    TODO: handle migrations
    """
    inspector = inspect(engine)
    table_names = inspector.get_table_names()

    for table_name in table_names:
        file_path = os.path.join(backup_dir, f"{table_name}.parquet")
        if not os.path.exists(file_path):
            print(f"No backup found for table {table_name}, skipping.")
            continue

        # Load the entire file into memory, then clear the table and insert
        # the backup rows in batches within a single transaction.
        df = pd.read_parquet(file_path)
        with engine.begin() as conn:
            # Clear the table so the backup fully replaces its contents.
            conn.execute(text(f"DELETE FROM {table_name}"))
            if not df.empty:
                columns = df.columns.tolist()
                total_rows = len(df)
                chunk_size = 10000
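                # Insert in batches of `chunk_size` rows so that no single
                # INSERT statement carries an unbounded number of bind
                # parameters for large tables.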
                for start in range(0, total_rows, chunk_size):
                    end = min(start + chunk_size, total_rows)
                    chunk = df.iloc[start:end]
                    values = chunk.to_dict(orient="records")
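                    # Build one multi-row INSERT for this batch, with a named
                    # bind parameter per cell (":{column}_{row_index}").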
                    insert_stmt = text(
                        f"INSERT INTO {table_name} ({', '.join(columns)}) VALUES "
                        + ", ".join(
                            [
                                "("
                                + ", ".join([f":{col}_{i}" for col in columns])
                                + ")"
                                for i in range(len(values))
                            ]
                        )
                    )
                    params = {}
                    for i, row in enumerate(values):
                        for col in columns:
                            params[f"{col}_{i}"] = row[col]
                    conn.execute(insert_stmt, params)

        print(f"Imported {table_name} from {file_path}")