"""Streamlit app that reports parameter differences between two .xlsb dumps."""

import os
import zipfile
from io import BytesIO

import pandas as pd
import streamlit as st


# === Functions ===
def find_header_row(df, keyword="Dist_Name"):
    """Return the position of the first row that contains ``keyword``.

    Only the first 20 rows are inspected. The match is a case-insensitive
    substring test against the stripped text of every cell in the row.

    Raises:
        ValueError: if none of the inspected rows contains ``keyword``.
    """
    needle = keyword.lower()
    for i in range(min(20, len(df))):
        row = df.iloc[i].astype(str).str.strip().str.lower()
        if any(needle in cell for cell in row):
            return i
    raise ValueError(f"No row with '{keyword}' found.")


def read_sheet_fallback(file_bytes, sheet):
    """Read ``sheet`` from an in-memory workbook without header inference.

    The buffer is rewound first so the same buffer can be read repeatedly.
    """
    file_bytes.seek(0)
    return pd.read_excel(file_bytes, sheet_name=sheet, header=None, engine="calamine")


def load_clean_df(file_bytes, sheet):
    """Load ``sheet`` and return its data rows as stripped text.

    The header row is located with ``find_header_row``; rows up to and
    including it are discarded and its cells become the column labels
    (stripped, with non-breaking spaces replaced by regular spaces).

    Cells that were empty in the workbook stay missing (NaN) instead of
    becoming the literal text "nan", so ``isna``/``notna`` checks made by
    callers keep working.
    """
    df_raw = read_sheet_fallback(file_bytes, sheet)
    header_row = find_header_row(df_raw)
    df = df_raw.iloc[header_row + 1 :].copy()
    df.columns = [
        str(c).strip().replace("\xa0", " ") for c in df_raw.iloc[header_row]
    ]
    # Remember which cells are empty before the text conversion: on pandas 2
    # ``astype(str)`` turns NaN into the string "nan".
    missing = df.isna().to_numpy()
    text = df.astype(str).apply(lambda col: col.str.strip())
    # A positional (ndarray) mask avoids label alignment, which matters when
    # the header contains repeated labels.
    return text.mask(missing)


def detect_dist_col(columns):
    """Return the first column label containing both "dist" and "name".

    The test is case-insensitive.

    Raises:
        ValueError: if no label matches.
    """
    for col in columns:
        lowered = str(col).lower()
        if "dist" in lowered and "name" in lowered:
            return col
    raise ValueError("Dist_Name column not found.")


def _index_by_dist_name(df):
    """Return ``df`` keyed by its Dist_Name column, without keyless rows."""
    # Blank header cells all end up with the same label ("nan"); keep only the
    # first of any repeated label so column lookups stay unambiguous.
    df = df.loc[:, ~df.columns.duplicated()]
    dist_col = detect_dist_col(df.columns)
    return df[df[dist_col].notna()].set_index(dist_col)


def _result_labels(old_name, new_name):
    """Return distinct column labels for the old and new value columns."""
    old_label = os.path.basename(old_name)
    new_label = os.path.basename(new_name)
    if old_label == new_label:
        # Identical file names would collapse into a single result column.
        return f"{old_label} (old)", f"{new_label} (new)"
    return old_label, new_label


def _diff_frames(df_old, df_new, old_label, new_label):
    """Return one row per changed cell between two Dist_Name-keyed frames.

    Only Dist_Name keys and columns present in both frames are compared, and
    the "file_name" column is ignored. Two missing cells count as equal.
    Rows are emitted in row-major order (key order first, then column order
    of ``df_old``).

    Raises:
        ValueError: if a shared Dist_Name key occurs more than once in either
            frame, because rows could not be matched one-to-one.
    """
    new_columns = set(df_new.columns)
    shared_cols = [
        col
        for col in df_old.columns
        if col in new_columns and col.strip().lower() != "file_name"
    ]
    common = df_old.index.intersection(df_new.index)
    old_common = df_old.loc[common, shared_cols]
    new_common = df_new.loc[common, shared_cols]
    if old_common.index.has_duplicates or new_common.index.has_duplicates:
        raise ValueError(
            "Duplicate Dist_Name values; rows cannot be matched one-to-one."
        )

    mask = (old_common != new_common) & ~(old_common.isna() & new_common.isna())
    rows, cols = mask.to_numpy(dtype=bool).nonzero()
    return pd.DataFrame(
        {
            "Dist_Name": old_common.index.to_numpy()[rows],
            "Parameter": old_common.columns.to_numpy()[cols],
            old_label: old_common.to_numpy(dtype=object)[rows, cols],
            new_label: new_common.to_numpy(dtype=object)[rows, cols],
        }
    )


# === Streamlit interface ===
st.title("📊 Dump Compare Tool")
st.markdown(
    ":blue[**Upload the old and new dumps, then input the object class (comma-separated) to compare**]"
)

old_file = st.file_uploader("Upload Old Dump (.xlsb)", type=["xlsb"], key="old")
new_file = st.file_uploader("Upload New Dump (.xlsb)", type=["xlsb"], key="new")
sheet_list_input = st.text_input(
    "Enter object class (comma-separated)", placeholder="e.g. BCF, BTS, CELL"
)

if st.button("Run Comparison", type="primary", use_container_width=True):
    # Parse first so input made only of commas/whitespace is rejected too.
    sheet_names = [s.strip() for s in sheet_list_input.split(",") if s.strip()]
    if not (old_file and new_file and sheet_names):
        st.warning("Please upload both files and provide at least one sheet name.")
    else:
        # getvalue() returns the full upload regardless of the read position.
        old_bytes = BytesIO(old_file.getvalue())
        new_bytes = BytesIO(new_file.getvalue())
        old_label, new_label = _result_labels(old_file.name, new_file.name)

        logs = []
        total = 0
        all_results = {}

        for sheet in sheet_names:
            # One failing sheet must not abort the others: report and go on.
            try:
                df_old = _index_by_dist_name(load_clean_df(old_bytes, sheet))
                df_new = _index_by_dist_name(load_clean_df(new_bytes, sheet))
                df_changes = _diff_frames(df_old, df_new, old_label, new_label)

                if not df_changes.empty:
                    all_results[sheet] = df_changes
                    logs.append(f"{len(df_changes)} changes in '{sheet}'")
                    total += len(df_changes)
                else:
                    logs.append(f"No changes in '{sheet}'")
            except Exception as e:
                logs.append(f"❌ Error in '{sheet}': {e}")

        st.success(f"✅ Comparison completed. Total changes: {total}")
        for log in logs:
            st.write(log)

        if all_results:
            output_buffer = BytesIO()
            with zipfile.ZipFile(output_buffer, mode="w") as zf:
                for sheet, df in all_results.items():
                    file_buffer = BytesIO()
                    df.to_excel(file_buffer, index=False)
                    zf.writestr(f"{sheet}_differences.xlsx", file_buffer.getvalue())

            # The archive is only complete once the ZipFile has been closed.
            st.download_button(
                "Download Results (.zip)",
                data=output_buffer.getvalue(),
                file_name="differences.zip",
                mime="application/zip",
                type="primary",
                on_click="ignore",
            )