|
import os |
|
import shutil |
|
import tempfile |
|
|
|
import pandas as pd |
|
import streamlit as st |
|
import xlwings as xw |
|
from pyxlsb import open_workbook |
|
|
|
|
|
|
|
|
|
def find_header_row(df, keyword="Dist_Name", max_rows=20):
    """Return the position of the first row that contains *keyword*.

    Only the leading ``max_rows`` rows are inspected. Every cell of a row is
    converted to text, stripped and lower-cased; the row matches when the
    lower-cased keyword occurs as a substring of any of its cells.

    Args:
        df: DataFrame holding the raw sheet contents (header row included).
        keyword: Text that identifies the header row; matched
            case-insensitively.
        max_rows: Maximum number of leading rows to scan.

    Returns:
        Zero-based positional index of the first matching row.

    Raises:
        ValueError: If none of the scanned rows contains the keyword.
    """
    # Lower-case the keyword once instead of once per cell.
    needle = keyword.lower()
    for position in range(min(max_rows, len(df))):
        cells = df.iloc[position].astype(str).str.strip().str.lower()
        # str() keeps the substring test safe if a missing value survives
        # the string conversion.
        if any(needle in str(cell) for cell in cells):
            return position
    raise ValueError(f"No row with '{keyword}' found.")
|
|
|
|
|
def read_xlsb_with_pyxlsb(file, sheet):
    """Read one worksheet of a workbook through pyxlsb into a DataFrame.

    Args:
        file: Workbook handed to ``open_workbook``.
        sheet: Sheet identifier handed to ``get_sheet``.

    Returns:
        A DataFrame with one row per worksheet row, each holding the ``v``
        attribute of that row's cells. No header is assigned, so the columns
        carry pandas' default labels.
    """
    with open_workbook(file) as workbook, workbook.get_sheet(sheet) as worksheet:
        records = [[cell.v for cell in cells] for cells in worksheet.rows()]
    return pd.DataFrame(records)
|
|
|
|
|
def read_sheet_fallback(file, sheet):
    """Read a worksheet with pyxlsb, falling back to xlwings on failure.

    The sheet is first read via ``read_xlsb_with_pyxlsb``. If that raises,
    the workbook is opened in a hidden xlwings ``App`` and the sheet's used
    range is converted to a header-less, index-less DataFrame.

    Args:
        file: Path of the workbook to read.
        sheet: Name (or index) of the sheet to read.

    Returns:
        DataFrame with the raw sheet contents.

    Raises:
        RuntimeError: If the xlwings fallback fails as well. The xlwings
            error is attached as ``__cause__``; the earlier pyxlsb error
            stays reachable through the exception context.
    """
    try:
        return read_xlsb_with_pyxlsb(file, sheet)
    except Exception:
        try:
            app = xw.App(visible=False)
            try:
                book = app.books.open(file)
                try:
                    sht = book.sheets[sheet]
                    return sht.used_range.options(
                        pd.DataFrame, header=False, index=False
                    ).value
                finally:
                    # Close the workbook even when reading the sheet failed.
                    book.close()
            finally:
                # Always shut the hidden application down; otherwise a
                # failed open/read leaves an invisible process behind.
                app.quit()
        except Exception as e2:
            raise RuntimeError(f"xlwings failed: {e2}") from e2
|
|
|
|
|
def load_clean_df(file, sheet):
    """Load a sheet and return its data rows as a cleaned text DataFrame.

    The header row is located with ``find_header_row``; everything up to and
    including that row is discarded. Column labels are converted to text,
    stripped, and have non-breaking spaces replaced by regular spaces. Cell
    values are converted to stripped text, except that missing cells stay
    missing (NaN) instead of becoming the literal strings "nan"/"None", so
    callers can still use ``isna()``/``notna()`` on the result.

    Args:
        file: Path of the workbook to read.
        sheet: Name of the sheet to load.

    Returns:
        DataFrame of the rows below the header row, keeping their original
        row labels.

    Raises:
        ValueError: Propagated from ``find_header_row`` when no header row
            is found.
        RuntimeError: Propagated from ``read_sheet_fallback`` when the sheet
            cannot be read.
    """
    raw = read_sheet_fallback(file, sheet)
    header_row = find_header_row(raw)

    header = [
        str(label).strip().replace("\xa0", " ") for label in raw.iloc[header_row]
    ]

    # Slice by position so the result does not depend on the row labels
    # happening to be 0..n-1.
    body = raw.iloc[header_row + 1:].copy()
    body.columns = header

    def _clean(col):
        # Stringify and strip, then blank out the cells that were missing in
        # the source so they are not turned into the text "nan"/"None".
        return col.astype(str).str.strip().where(col.notna())

    return body.apply(_clean)
|
|
|
|
|
def detect_dist_col(columns):
    """Return the first column label that looks like the Dist_Name column.

    A label matches when its lower-cased text contains both "dist" and
    "name". Labels are converted with ``str()`` for the comparison only, so
    non-string labels (numbers, ``None``) are skipped instead of raising
    ``AttributeError``; the matching label is returned unchanged.

    Args:
        columns: Iterable of column labels.

    Returns:
        The first matching label, exactly as it appears in *columns*.

    Raises:
        ValueError: If no label matches.
    """
    for col in columns:
        label = str(col).lower()
        if "dist" in label and "name" in label:
            return col
    raise ValueError("Dist_Name column not found.")
|
|
|
|
|
def compare_dumps(
    old_file,
    new_file,
    mo_list,
    output_dir,
):
    """Compare the given sheets of two dumps and write the differences.

    For every sheet name in *mo_list* both workbooks are loaded with
    ``load_clean_df``, indexed by their Dist_Name column and compared cell by
    cell for the Dist_Names present in both files. When a sheet has
    differences they are written to ``<sheet>_differences.xlsx`` inside
    *output_dir*, one row per changed cell with the columns ``Dist_Name``,
    ``Parameter`` and one value column per file.

    Args:
        old_file: Path of the old dump.
        new_file: Path of the new dump.
        mo_list: Iterable of sheet names to compare.
        output_dir: Directory for the result files; created if missing.

    Returns:
        Tuple ``(total_changes, logs)``: the number of changed cells over all
        sheets and one human-readable message per sheet.
    """
    os.makedirs(output_dir, exist_ok=True)

    old_label = os.path.basename(old_file)
    new_label = os.path.basename(new_file)
    if old_label == new_label:
        # Identical basenames would collapse the two value columns into a
        # single dictionary key, silently dropping the old values.
        old_label = f"{old_label} (old)"
        new_label = f"{new_label} (new)"

    total_changes = 0
    logs = []

    for sheet_name in mo_list:
        try:
            df_old = _index_by_dist_name(load_clean_df(old_file, sheet_name))
            df_new = _index_by_dist_name(load_clean_df(new_file, sheet_name))

            df_changes = pd.DataFrame(
                _diff_frames(df_old, df_new, old_label, new_label)
            )
            if df_changes.empty:
                logs.append(f"No changes in {sheet_name}")
                continue

            output_path = os.path.join(output_dir, f"{sheet_name}_differences.xlsx")
            df_changes.to_excel(output_path, index=False)
            logs.append(f"{len(df_changes)} changes in {sheet_name}")
            total_changes += len(df_changes)
        except Exception as e:
            # Best effort: a sheet that cannot be compared is reported in the
            # log and must not abort the remaining sheets.
            logs.append(f"Error in {sheet_name}: {e}")

    return total_changes, logs


def _index_by_dist_name(df):
    """Drop rows without a usable Dist_Name and index the rest by it."""
    dist_col = detect_dist_col(df.columns)
    keys = df[dist_col]
    # Cells may arrive already stringified, in which case a missing value
    # shows up as the text "nan"/"None"; treat those like real missing keys.
    key_text = keys.astype(str).str.strip().str.lower()
    usable = keys.notna() & ~key_text.isin(["", "nan", "none"])
    indexed = df[usable].set_index(dist_col)
    # File_Name differences are never reported, so leave those columns out
    # of the comparison altogether.
    keep = [str(col).strip().lower() != "file_name" for col in indexed.columns]
    return indexed.loc[:, keep]


def _diff_frames(df_old, df_new, old_label, new_label):
    """Return one record per cell that differs between two indexed frames."""
    if not df_old.columns.equals(df_new.columns):
        # `!=` only accepts identically labelled frames, so restrict both
        # sides to the uniquely named columns they have in common.
        old_unique = df_old.loc[:, ~df_old.columns.duplicated()]
        new_unique = df_new.loc[:, ~df_new.columns.duplicated()]
        shared = old_unique.columns.intersection(new_unique.columns, sort=False)
        df_old = old_unique[shared]
        df_new = new_unique[shared]

    common = df_old.index.intersection(df_new.index)
    old_common = df_old.loc[common]
    new_common = df_new.loc[common]

    # NaN != NaN is True, so cells missing on both sides are masked out.
    mask = (old_common != new_common) & ~(old_common.isna() & new_common.isna())

    # Read the changed cells by position: this visits only the differences
    # (row-major order) and is unaffected by duplicate row/column labels.
    changed = mask.to_numpy(dtype=bool)
    old_values = old_common.to_numpy()
    new_values = new_common.to_numpy()
    row_pos, col_pos = changed.nonzero()

    return [
        {
            "Dist_Name": mask.index[r],
            "Parameter": mask.columns[c],
            old_label: old_values[r, c],
            new_label: new_values[r, c],
        }
        for r, c in zip(row_pos.tolist(), col_pos.tolist())
    ]
|
|
|
|
|
|
|
|
|
# --- Streamlit UI: page title and the two dump uploads ---------------------
# NOTE(review): the title's leading emoji was mis-encoded in the source
# (it read "π"); "📊" is a best guess at the intended glyph — confirm.
st.title("📊 Dump Compare Tool")

old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")

# Defaults used until both files are uploaded and a selection is made.
common_sheets: list[str] = []
selected_sheets: list[str] = []
|
|
|
if old_file and new_file:

    def _get_sheet_names(uploaded_file) -> list[str]:
        """List the sheet names of an uploaded workbook.

        The upload's bytes are written to a temporary ``.xlsb`` file, the
        file is opened with pyxlsb to read ``sheets``, and the temporary
        file is removed afterwards, whether or not opening succeeded.
        """
        with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsb") as handle:
            handle.write(uploaded_file.getvalue())
        spill_path = handle.name

        try:
            with open_workbook(spill_path) as workbook:
                names = list(workbook.sheets)
        finally:
            os.remove(spill_path)
        return names

    # Only sheets present in both workbooks can be compared.
    old_names = set(_get_sheet_names(old_file))
    new_names = set(_get_sheet_names(new_file))
    common_sheets = sorted(old_names & new_names)

    if not common_sheets:
        st.warning("No common sheet names found between the two files.")
    else:
        selected_sheets = st.multiselect(
            "MO Sheet Names (choose one or more)",
            common_sheets,
            default=common_sheets[:1],
        )
|
# Name of the results folder (and of the zip built from it) inside the
# temporary working directory.
output_dir = "comparison_output"

if st.button("Run Comparison", type="primary", use_container_width=True):
    if not all([old_file, new_file]) or not selected_sheets:
        st.warning("Please upload both files and select at least one common sheet.")
    else:
        mo_list = selected_sheets

        with st.spinner("Comparing dumps..."):
            # Everything lives in a throw-away directory: the two uploads are
            # written to disk because compare_dumps works on file paths.
            with tempfile.TemporaryDirectory() as tmpdir:
                output_path = os.path.join(tmpdir, output_dir)
                old_path = os.path.join(tmpdir, "old.xlsb")
                new_path = os.path.join(tmpdir, "new.xlsb")

                # getvalue() returns the whole upload regardless of the
                # current read position, so no seek(0) is needed.
                with open(old_path, "wb") as f:
                    f.write(old_file.getvalue())
                with open(new_path, "wb") as f:
                    f.write(new_file.getvalue())

                total, logs = compare_dumps(old_path, new_path, mo_list, output_path)

                st.success(f"✅ Comparison completed. Total changes: {total}")

                # Show the per-sheet outcome; without this a sheet that
                # failed to compare would go completely unnoticed.
                for entry in logs:
                    if entry.startswith("Error in "):
                        st.error(entry)
                    else:
                        st.write(entry)

                if total:
                    # The archive must be built and read while the temporary
                    # directory still exists.
                    shutil.make_archive(output_path, "zip", output_path)
                    with open(f"{output_path}.zip", "rb") as f:
                        st.download_button(
                            "Download Results (.zip)",
                            f,
                            file_name="differences.zip",
                            mime="application/zip",
                            type="primary",
                            on_click="ignore",
                        )
                else:
                    # No result files were written, so the zip would be empty.
                    st.info("No differences were written, so there is nothing to download.")
|
|