# ammu-order / app.py
import gradio as gr
from openpyxl import load_workbook
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.utils import get_column_letter
from huggingface_hub import hf_hub_download
import tempfile
import os, re
from collections import defaultdict
from datetime import datetime
from typing import Optional
HF_DATASET_REPO = "leadingbridge/ammu"
TEMPLATE_FILENAME = "AMMU-order-form-template.xlsx"
LOCAL_TEMPLATE_FALLBACK = os.path.join(os.path.dirname(__file__), TEMPLATE_FILENAME)
def _normalize_power(val):
    """Normalize a raw power value to a signed two-decimal string (e.g. "-1.25"),
    or return None if no number can be extracted."""
    if val is None:
        return None
    s = str(val).strip()
    if s == "":
        return None
    # "piano" is kept alongside "plano", presumably to catch a common misspelling.
    if s.lower() in {"plano", "piano", "0", "0.0", "0.00", "000"}:
        return "0.00"
    m = re.search(r"(-?\d+(?:\.\d+)?)", s.replace(",", ""))
    if not m:
        return None
    try:
        num = float(m.group(1))
    except ValueError:
        return None
    return f"{num:.2f}"

def _power_to_triplet_digits(power_str: str) -> Optional[str]:
    """Convert a normalized power ("-1.25") into the unsigned digit key used by
    the template headers ("125"); "0.00" becomes "000"."""
    if power_str is None:
        return None
    s = power_str.strip().lstrip("+").replace("-", "")
    if "." in s:
        whole, frac = s.split(".", 1)
        frac = (frac + "00")[:2]
    else:
        whole, frac = s, "00"
    digits = f"{whole}{frac}"
    return digits.zfill(3)

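
# A minimal sketch of how the two power helpers compose (illustrative only; the
# sample values below are assumptions, not taken from real order data). It is
# gated behind a hypothetical AMMU_DEBUG_EXAMPLES environment variable so it
# never runs in the deployed Space.
if os.environ.get("AMMU_DEBUG_EXAMPLES"):
    for raw in ("Plano", "-0.50", "-1.25", "+2.00", "PWR -6.00"):
        p = _normalize_power(raw)        # e.g. "Plano" -> "0.00", "-1.25" -> "-1.25"
        t = _power_to_triplet_digits(p)  # e.g. "0.00" -> "000",  "-1.25" -> "125"
        print(f"{raw!r} -> {p!r} -> {t!r}")
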
def _find_header_row(ws: Worksheet, required_headers):
    """Scan the first 10 rows for a row containing all required headers
    (case-insensitive) and return (row_index, {header_lowercase: column_index})."""
    req = {h.lower() for h in required_headers}
    for r in range(1, 11):
        header_map = {}
        present = set()
        for c in range(1, ws.max_column + 1):
            v = ws.cell(row=r, column=c).value
            if isinstance(v, str) and v.strip():
                key = v.strip().lower()
                header_map[key] = c
                if key in req:
                    present.add(key)
        if req.issubset(present):
            return r, header_map
    raise ValueError(f"Could not locate a header row containing: {required_headers}")

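
# A quick, hedged check of _find_header_row against an in-memory workbook
# (the column names besides the three required ones are made-up examples).
# Gated behind the same hypothetical AMMU_DEBUG_EXAMPLES flag as above.
if os.environ.get("AMMU_DEBUG_EXAMPLES"):
    from openpyxl import Workbook
    _demo_wb = Workbook()
    _demo_ws = _demo_wb.active
    _demo_ws.append(["Order ID", "SKU", "Product Option Value", "Quantity"])
    _row, _cols = _find_header_row(_demo_ws, {"SKU", "Product Option Value", "Quantity"})
    print("header row:", _row, "column map:", _cols)  # expected: row 1, "sku" in column 2
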
def _download_template():
    # Prefer a template bundled next to app.py; otherwise pull it from the HF dataset repo.
    if os.path.exists(LOCAL_TEMPLATE_FALLBACK):
        return LOCAL_TEMPLATE_FALLBACK
    return hf_hub_download(
        repo_id=HF_DATASET_REPO, filename=TEMPLATE_FILENAME, repo_type="dataset"
    )

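
# A hedged check of template resolution: prints which template path will be used
# (the local fallback, or the Hub download, which needs network access).
# Gated behind the same hypothetical AMMU_DEBUG_EXAMPLES flag as above.
if os.environ.get("AMMU_DEBUG_EXAMPLES"):
    print("Template resolved to:", _download_template())
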
def _auto_fit_columns(ws: Worksheet, max_col: int, max_row: int):
    # Width = longest string in column + small padding
    for c in range(1, max_col + 1):
        max_len = 0
        col_letter = get_column_letter(c)
        for r in range(1, max_row + 1):
            val = ws.cell(row=r, column=c).value
            if val is not None:
                max_len = max(max_len, len(str(val)))
        ws.column_dimensions[col_letter].width = max(10, max_len + 2)

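
# A tiny demonstration of the width heuristic in _auto_fit_columns on a
# throwaway sheet (the cell values are made up). Gated behind the same
# hypothetical AMMU_DEBUG_EXAMPLES flag as above.
if os.environ.get("AMMU_DEBUG_EXAMPLES"):
    from openpyxl import Workbook
    _fit_wb = Workbook()
    _fit_ws = _fit_wb.active
    _fit_ws.append(["SKU", "A very long product description cell"])
    _auto_fit_columns(_fit_ws, max_col=2, max_row=1)
    _widths = {get_column_letter(c): _fit_ws.column_dimensions[get_column_letter(c)].width for c in (1, 2)}
    print(_widths)  # minimum width is 10, otherwise longest string length + 2
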
def process(input_file):
    """Read the uploaded order export, fill the AMMU template, and return
    (output_path, log_text); on failure, return (None, error_message)."""
    try:
        if input_file is None:
            return None, "Please upload an Excel file first."

        # Gradio may pass a filepath string or a file-like object; accept both.
        in_path = input_file if isinstance(input_file, str) else input_file.name

        # --- INPUT: detect headers by name ---
        wb_in = load_workbook(in_path, data_only=True)
        ws_in = wb_in.active

        # Copy ALL input (for "raw data" sheet later)
        in_max_row = ws_in.max_row
        in_max_col = ws_in.max_column

        header_row_idx, header_map = _find_header_row(
            ws_in, {"SKU", "Product Option Value", "Quantity"}
        )
        col_sku = header_map["sku"]
        col_pov = header_map["product option value"]
        col_qty = header_map["quantity"]

        header_values = [ws_in.cell(row=header_row_idx, column=c).value for c in range(1, in_max_col + 1)]

        entries = []
        rows_scanned = 0
        for r in range(header_row_idx + 1, ws_in.max_row + 1):
            row_values = [ws_in.cell(row=r, column=c).value for c in range(1, in_max_col + 1)]
            sku = ws_in.cell(row=r, column=col_sku).value
            pov = ws_in.cell(row=r, column=col_pov).value
            qty = ws_in.cell(row=r, column=col_qty).value
            if sku is None and pov is None and qty is None:
                continue
            rows_scanned += 1
            power = _normalize_power(pov)
            try:
                q = int(qty) if qty is not None and str(qty).strip() != "" else 0
            except Exception:
                try:
                    q = int(float(qty))
                except Exception:
                    q = 0
            entries.append({
                "sku": (str(sku).strip() if sku is not None else None),
                "power": power,
                "qty": q,
                "row_values": row_values
            })

        # --- OUTPUT template ---
        template_path = _download_template()
        wb_out = load_workbook(template_path)
        ws_out = wb_out.active

        mysku_header_row = None
        mysku_col_idx = None
        power_label_row = None
        power_col_map = {}
        triplet_row = None
        triplet_col_map = {}

        for r in range(1, 11):
            row_vals = [ws_out.cell(row=r, column=c).value for c in range(1, ws_out.max_column + 1)]

            # "MY SKU" header
            for c, v in enumerate(row_vals, start=1):
                if isinstance(v, str) and v.strip().lower() == "my sku":
                    mysku_header_row = r
                    mysku_col_idx = c

            # textual power labels
            labels = {}
            for c, v in enumerate(row_vals, start=1):
                if isinstance(v, str):
                    nv = _normalize_power(v)
                    if nv is not None and re.match(r"^-?\d+\.\d{2}$", v.strip()):
                        labels[nv] = c
            if len(labels) >= 5 and power_label_row is None:
                power_label_row = r
                power_col_map = labels

            # numeric triplets
            trip = {}
            for c, v in enumerate(row_vals, start=1):
                if isinstance(v, str) and re.fullmatch(r"\d{2,3}", v.strip()):
                    trip[v.strip()] = c
            if len(trip) >= 5 and triplet_row is None:
                triplet_row = r
                triplet_col_map = trip

        if mysku_header_row is None or mysku_col_idx is None:
            raise ValueError("Could not find the 'MY SKU' header in the template (rows 1–10).")
        if not (power_label_row or triplet_row):
            raise ValueError("Could not find power-column headers in the template (rows 1–10).")

        # Build SKU -> row map
        sku_to_row = {}
        for r in range(mysku_header_row + 1, ws_out.max_row + 1):
            val = ws_out.cell(row=r, column=mysku_col_idx).value
            if val is None:
                continue
            sku_to_row[str(val).strip()] = r

        # Classify entries and aggregate matches
        agg = defaultdict(int)
        unmatched_rows = []
        for rec in entries:
            sku, power, qty = rec["sku"], rec["power"], rec["qty"]
            if not sku or qty <= 0 or power is None:
                unmatched_rows.append(rec["row_values"])
                continue
            row_idx = sku_to_row.get(sku)
            if row_idx is None:
                unmatched_rows.append(rec["row_values"])
                continue
            col_idx = power_col_map.get(power) if power_col_map else None
            if col_idx is None and triplet_col_map:
                key = _power_to_triplet_digits(power)
                col_idx = triplet_col_map.get(key)
            if col_idx is None:
                unmatched_rows.append(rec["row_values"])
                continue
            agg[(sku, power)] += qty

        # Write aggregated matches
        written_count = 0
        for (sku, power), qty in agg.items():
            row_idx = sku_to_row.get(sku)
            if row_idx is None:
                continue
            col_idx = power_col_map.get(power) if power_col_map else None
            if col_idx is None and triplet_col_map:
                key = _power_to_triplet_digits(power)
                col_idx = triplet_col_map.get(key)
            if col_idx is None:
                continue
            current = ws_out.cell(row=row_idx, column=col_idx).value
            try:
                current_val = int(current) if current is not None and str(current).strip() != "" else 0
            except Exception:
                try:
                    current_val = int(float(current))
                except Exception:
                    current_val = 0
            ws_out.cell(row=row_idx, column=col_idx).value = current_val + int(qty)
            written_count += 1

        # --- "additional order" tab with unmatched rows ---
        add_name = "additional order"
        if add_name in wb_out.sheetnames:
            wb_out.remove(wb_out[add_name])
        ws_add = wb_out.create_sheet(title=add_name)

        # header + rows
        for c, val in enumerate(header_values, start=1):
            ws_add.cell(row=1, column=c).value = val
        for i, row_vals in enumerate(unmatched_rows, start=2):
            for c, val in enumerate(row_vals, start=1):
                ws_add.cell(row=i, column=c).value = val

        # Auto-fit additional order
        _auto_fit_columns(ws_add, max_col=in_max_col, max_row=len(unmatched_rows) + 1)

        # --- "raw data" tab with ALL input copied verbatim ---
        raw_name = "raw data"
        if raw_name in wb_out.sheetnames:
            wb_out.remove(wb_out[raw_name])
        ws_raw = wb_out.create_sheet(title=raw_name)
        for r in range(1, in_max_row + 1):
            for c in range(1, in_max_col + 1):
                ws_raw.cell(row=r, column=c).value = ws_in.cell(row=r, column=c).value

        # Auto-fit raw data
        _auto_fit_columns(ws_raw, max_col=in_max_col, max_row=in_max_row)

        # --- Save output with date-based filename ---
        yymmdd = datetime.now().strftime("%y%m%d")
        tmpdir = tempfile.mkdtemp()
        out_filename = f"AMMU-Order-Form-Leading-Bridge-{yymmdd}.xlsx"
        out_path = os.path.join(tmpdir, out_filename)
        wb_out.save(out_path)

        log_lines = [
            f"Rows scanned in input: {rows_scanned}",
            f"Unique matched (SKU, power) pairs aggregated: {len(agg)}",
            f"Entries written into template: {written_count}",
            f"Unmatched rows copied to 'additional order': {len(unmatched_rows)}",
            f"Raw data sheet rows x cols: {in_max_row} x {in_max_col}",
            f"Output file: {out_filename}",
        ]
        log = "\n".join(log_lines)
        return out_path, log

    except Exception as e:
        return None, f"Error: {e}"

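
# A sketch of driving process() directly, outside the Gradio UI, assuming a
# local workbook named "orders.xlsx" (a hypothetical path used only for this
# example). Gated behind the same hypothetical AMMU_DEBUG_EXAMPLES flag.
if os.environ.get("AMMU_DEBUG_EXAMPLES") and os.path.exists("orders.xlsx"):
    _out_path, _log = process("orders.xlsx")
    print(_log)
    print("Saved to:", _out_path)
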
with gr.Blocks(title="AMMU Order Form Filler") as demo:
    gr.Markdown(
        "### AMMU Order Form Filler\n"
        "• Uses **MY SKU** column to map rows\n"
        "• Matches power columns (text like `-1.25` or fallback triplets like `125`)\n"
        "• Aggregates quantities for matched lines\n"
        "• Copies **unmatched lines** to **`additional order`** (auto-fit columns)\n"
        "• Copies **entire input** to **`raw data`** (auto-fit columns)\n"
        "• Exports as **AMMU-Order-Form-Leading-Bridge-YYMMDD.xlsx**"
    )
    with gr.Row():
        in_file = gr.File(label="Upload input Excel (.xlsx)", file_types=[".xlsx"])
    with gr.Row():
        run_btn = gr.Button("Process")
    with gr.Row():
        out_file = gr.File(label="Download filled template (.xlsx)")
        log_box = gr.Textbox(label="Log", lines=12)
    run_btn.click(fn=process, inputs=in_file, outputs=[out_file, log_box])

if __name__ == "__main__":
    demo.launch()