Spaces:
Running
Running
import gradio as gr | |
from openpyxl import load_workbook | |
from openpyxl.worksheet.worksheet import Worksheet | |
from openpyxl.utils import get_column_letter | |
from huggingface_hub import hf_hub_download | |
import tempfile | |
import os, re | |
from collections import defaultdict | |
from datetime import datetime | |
HF_DATASET_REPO = "leadingbridge/ammu" | |
TEMPLATE_FILENAME = "AMMU-order-form-template.xlsx" | |
LOCAL_TEMPLATE_FALLBACK = os.path.join(os.path.dirname(__file__), TEMPLATE_FILENAME) | |
def _normalize_power(val): | |
if val is None: | |
return None | |
s = str(val).strip() | |
if s == "": | |
return None | |
if s.lower() in {"plano", "piano", "0", "0.0", "0.00", "000"}: | |
return "0.00" | |
m = re.search(r"(-?\d+(?:\.\d+)?)", s.replace(",", "")) | |
if not m: | |
return None | |
try: | |
num = float(m.group(1)) | |
except ValueError: | |
return None | |
return f"{num:.2f}" | |
def _power_to_triplet_digits(power_str: str) -> str: | |
if power_str is None: | |
return None | |
s = power_str.strip().lstrip("+").replace("-", "") | |
if "." in s: | |
whole, frac = s.split(".", 1) | |
frac = (frac + "00")[:2] | |
else: | |
whole, frac = s, "00" | |
digits = f"{whole}{frac}" | |
return digits.zfill(3) | |
def _find_header_row(ws: Worksheet, required_headers): | |
req = {h.lower() for h in required_headers} | |
for r in range(1, 11): | |
header_map = {} | |
present = set() | |
for c in range(1, ws.max_column + 1): | |
v = ws.cell(row=r, column=c).value | |
if isinstance(v, str) and v.strip(): | |
key = v.strip().lower() | |
header_map[key] = c | |
if key in req: | |
present.add(key) | |
if req.issubset(present): | |
return r, header_map | |
raise ValueError(f"Could not locate a header row containing: {required_headers}") | |
def _download_template(): | |
if os.path.exists(LOCAL_TEMPLATE_FALLBACK): | |
return LOCAL_TEMPLATE_FALLBACK | |
return hf_hub_download( | |
repo_id=HF_DATASET_REPO, filename=TEMPLATE_FILENAME, repo_type="dataset" | |
) | |
def _auto_fit_columns(ws: Worksheet, max_col: int, max_row: int): | |
# Width = longest string in column + small padding | |
for c in range(1, max_col + 1): | |
max_len = 0 | |
col_letter = get_column_letter(c) | |
for r in range(1, max_row + 1): | |
val = ws.cell(row=r, column=c).value | |
if val is not None: | |
max_len = max(max_len, len(str(val))) | |
ws.column_dimensions[col_letter].width = max(10, max_len + 2) | |
def process(input_file): | |
try: | |
if input_file is None: | |
return None, "Please upload an Excel file first." | |
# --- INPUT: detect headers by name --- | |
wb_in = load_workbook(input_file.name, data_only=True) | |
ws_in = wb_in.active | |
# Copy ALL input (for "raw data" sheet later) | |
in_max_row = ws_in.max_row | |
in_max_col = ws_in.max_column | |
header_row_idx, header_map = _find_header_row( | |
ws_in, {"SKU", "Product Option Value", "Quantity"} | |
) | |
col_sku = header_map["sku"] | |
col_pov = header_map["product option value"] | |
col_qty = header_map["quantity"] | |
header_values = [ws_in.cell(row=header_row_idx, column=c).value for c in range(1, in_max_col + 1)] | |
entries = [] | |
rows_scanned = 0 | |
for r in range(header_row_idx + 1, ws_in.max_row + 1): | |
row_values = [ws_in.cell(row=r, column=c).value for c in range(1, in_max_col + 1)] | |
sku = ws_in.cell(row=r, column=col_sku).value | |
pov = ws_in.cell(row=r, column=col_pov).value | |
qty = ws_in.cell(row=r, column=col_qty).value | |
if sku is None and pov is None and qty is None: | |
continue | |
rows_scanned += 1 | |
power = _normalize_power(pov) | |
try: | |
q = int(qty) if qty is not None and str(qty).strip() != "" else 0 | |
except Exception: | |
try: | |
q = int(float(qty)) | |
except Exception: | |
q = 0 | |
entries.append({ | |
"sku": (str(sku).strip() if sku is not None else None), | |
"power": power, | |
"qty": q, | |
"row_values": row_values | |
}) | |
# --- OUTPUT template --- | |
template_path = _download_template() | |
wb_out = load_workbook(template_path) | |
ws_out = wb_out.active | |
mysku_header_row = None | |
mysku_col_idx = None | |
power_label_row = None | |
power_col_map = {} | |
triplet_row = None | |
triplet_col_map = {} | |
for r in range(1, 11): | |
row_vals = [ws_out.cell(row=r, column=c).value for c in range(1, ws_out.max_column + 1)] | |
# "MY SKU" header | |
for c, v in enumerate(row_vals, start=1): | |
if isinstance(v, str) and v.strip().lower() == "my sku": | |
mysku_header_row = r | |
mysku_col_idx = c | |
# textual power labels | |
labels = {} | |
for c, v in enumerate(row_vals, start=1): | |
if isinstance(v, str): | |
nv = _normalize_power(v) | |
if nv is not None and re.match(r"^-?\d+\.\d{2}$", v.strip()): | |
labels[nv] = c | |
if len(labels) >= 5 and power_label_row is None: | |
power_label_row = r | |
power_col_map = labels | |
# numeric triplets | |
trip = {} | |
for c, v in enumerate(row_vals, start=1): | |
if isinstance(v, str) and re.fullmatch(r"\d{2,3}", v.strip()): | |
trip[v.strip()] = c | |
if len(trip) >= 5 and triplet_row is None: | |
triplet_row = r | |
triplet_col_map = trip | |
if mysku_header_row is None or mysku_col_idx is None: | |
raise ValueError("Could not find the 'MY SKU' header in the template (rows 1–10).") | |
if not (power_label_row or triplet_row): | |
raise ValueError("Could not find power-column headers in the template (rows 1–10).") | |
# Build SKU -> row map | |
sku_to_row = {} | |
for r in range(mysku_header_row + 1, ws_out.max_row + 1): | |
val = ws_out.cell(row=r, column=mysku_col_idx).value | |
if val is None: | |
continue | |
sku_to_row[str(val).strip()] = r | |
# Classify entries and aggregate matches | |
agg = defaultdict(int) | |
unmatched_rows = [] | |
for rec in entries: | |
sku, power, qty = rec["sku"], rec["power"], rec["qty"] | |
if not sku or qty <= 0 or power is None: | |
unmatched_rows.append(rec["row_values"]) | |
continue | |
row_idx = sku_to_row.get(sku) | |
if row_idx is None: | |
unmatched_rows.append(rec["row_values"]) | |
continue | |
col_idx = power_col_map.get(power) if power_col_map else None | |
if col_idx is None and triplet_col_map: | |
key = _power_to_triplet_digits(power) | |
col_idx = triplet_col_map.get(key) | |
if col_idx is None: | |
unmatched_rows.append(rec["row_values"]) | |
continue | |
agg[(sku, power)] += qty | |
# Write aggregated matches | |
written_count = 0 | |
for (sku, power), qty in agg.items(): | |
row_idx = sku_to_row.get(sku) | |
if row_idx is None: | |
continue | |
col_idx = power_col_map.get(power) if power_col_map else None | |
if col_idx is None and triplet_col_map: | |
key = _power_to_triplet_digits(power) | |
col_idx = triplet_col_map.get(key) | |
if col_idx is None: | |
continue | |
current = ws_out.cell(row=row_idx, column=col_idx).value | |
try: | |
current_val = int(current) if current is not None and str(current).strip() != "" else 0 | |
except Exception: | |
try: | |
current_val = int(float(current)) | |
except Exception: | |
current_val = 0 | |
ws_out.cell(row=row_idx, column=col_idx).value = current_val + int(qty) | |
written_count += 1 | |
# --- "additional order" tab with unmatched rows --- | |
add_name = "additional order" | |
if add_name in wb_out.sheetnames: | |
wb_out.remove(wb_out[add_name]) | |
ws_add = wb_out.create_sheet(title=add_name) | |
# header + rows | |
for c, val in enumerate(header_values, start=1): | |
ws_add.cell(row=1, column=c).value = val | |
for i, row_vals in enumerate(unmatched_rows, start=2): | |
for c, val in enumerate(row_vals, start=1): | |
ws_add.cell(row=i, column=c).value = val | |
# Auto-fit additional order | |
_auto_fit_columns(ws_add, max_col=in_max_col, max_row=len(unmatched_rows) + 1) | |
# --- "raw data" tab with ALL input copied verbatim --- | |
raw_name = "raw data" | |
if raw_name in wb_out.sheetnames: | |
wb_out.remove(wb_out[raw_name]) | |
ws_raw = wb_out.create_sheet(title=raw_name) | |
for r in range(1, in_max_row + 1): | |
for c in range(1, in_max_col + 1): | |
ws_raw.cell(row=r, column=c).value = ws_in.cell(row=r, column=c).value | |
# Auto-fit raw data | |
_auto_fit_columns(ws_raw, max_col=in_max_col, max_row=in_max_row) | |
# --- Save output with date-based filename --- | |
yymmdd = datetime.now().strftime("%y%m%d") | |
tmpdir = tempfile.mkdtemp() | |
out_filename = f"AMMU-Order-Form-Leading-Bridge-{yymmdd}.xlsx" | |
out_path = os.path.join(tmpdir, out_filename) | |
wb_out.save(out_path) | |
log_lines = [ | |
f"Rows scanned in input: {rows_scanned}", | |
f"Unique matched (SKU, power) pairs aggregated: {len(agg)}", | |
f"Entries written into template: {written_count}", | |
f"Unmatched rows copied to 'additional order': {len(unmatched_rows)}", | |
f"Raw data sheet rows x cols: {in_max_row} x {in_max_col}", | |
f"Output file: {out_filename}", | |
] | |
log = "\n".join(log_lines) | |
return out_path, log | |
except Exception as e: | |
return None, f"Error: {e}" | |
with gr.Blocks(title="AMMU Order Form Filler") as demo: | |
gr.Markdown( | |
"### AMMU Order Form Filler\n" | |
"• Uses **MY SKU** column to map rows\n" | |
"• Matches power columns (text like `-1.25` or fallback triplets like `125`)\n" | |
"• Aggregates quantities for matched lines\n" | |
"• Copies **unmatched lines** to **`additional order`** (auto-fit columns)\n" | |
"• Copies **entire input** to **`raw data`** (auto-fit columns)\n" | |
"• Exports as **AMMU-Order-Form-Leading-Bridge-YYMMDD.xlsx**" | |
) | |
with gr.Row(): | |
in_file = gr.File(label="Upload input Excel (.xlsx)", file_types=[".xlsx"]) | |
with gr.Row(): | |
run_btn = gr.Button("Process") | |
with gr.Row(): | |
out_file = gr.File(label="Download filled template (.xlsx)") | |
log_box = gr.Textbox(label="Log", lines=12) | |
run_btn.click(fn=process, inputs=in_file, outputs=[out_file, log_box]) | |
if __name__ == "__main__": | |
demo.launch() | |