"""NetCom β WooCommerce transformer (Try 2 schema β 100-parallel + de-dupe, pandas fix) | |
====================================================================================== | |
*Accept CSV **or** Excel schedule files and output the WooCommerce CSV.* | |
New since the last paste | |
------------------------ | |
* Fix for older pandas: move `include_groups=False` from `.groupby()` to `.apply()`. | |
* Everything else (cache names, concurrency cap, in-flight de-duplication) is unchanged. | |
""" | |
from __future__ import annotations

import asyncio
import hashlib
import json
import os
import tempfile
from io import BytesIO
from pathlib import Path

import gradio as gr
import gradio_client.utils
import openai
import pandas as pd
# -------- Gradio bool-schema hot-patch --------------------------------------
_original = gradio_client.utils._json_schema_to_python_type


def _fixed_json_schema_to_python_type(schema, defs=None):  # type: ignore
    """Return 'any' for bare-boolean schemas instead of crashing on them."""
    if isinstance(schema, bool):
        return "any"
    return _original(schema, defs)


gradio_client.utils._json_schema_to_python_type = _fixed_json_schema_to_python_type  # type: ignore
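# Why the patch: bare booleans are valid JSON Schemas (True accepts anything,
# False accepts nothing), but some gradio_client builds assume a dict and raise.
# Sanity check (illustrative, run in a REPL):
#
#   >>> gradio_client.utils._json_schema_to_python_type(True)
#   'any'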
# -------- Tiny disk cache ----------------------------------------------------
CACHE_DIR = Path("ai_response_cache")
CACHE_DIR.mkdir(exist_ok=True)


def _cache_path(p: str) -> Path:
    return CACHE_DIR / f"{hashlib.md5(p.encode()).hexdigest()}.json"


def _get_cached(p: str) -> str | None:
    try:
        return json.loads(_cache_path(p).read_text("utf-8"))["response"]
    except Exception:
        return None


def _set_cache(p: str, r: str) -> None:
    try:
        _cache_path(p).write_text(json.dumps({"prompt": p, "response": r}), "utf-8")
    except Exception:
        pass
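# Cache sketch (illustrative): entries survive restarts, and the key is
# md5(prompt), so any edit to the prompt text is a deliberate cache miss.
#
#   _set_cache("some prompt", "some response")
#   assert _get_cached("some prompt") == "some response"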
# -------- Async GPT helpers --------------------------------------------------
_SEM = asyncio.Semaphore(100)              # ≤ 100 concurrent OpenAI calls
_inflight: dict[str, asyncio.Future] = {}  # prompt → Future


async def _gpt_async(client: openai.AsyncOpenAI, prompt: str) -> str:
    """Single LLM call with disk cache, concurrency cap, and de-duplication."""
    cached = _get_cached(prompt)
    if cached is not None:
        return cached

    # De-duplicate identical prompts already in flight
    running = _inflight.get(prompt)
    if running is not None:
        return await running

    loop = asyncio.get_running_loop()

    async def _call_api() -> str:
        async with _SEM:  # concurrency limiter
            try:
                msg = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0,
                )
                text = msg.choices[0].message.content
            except Exception as exc:
                text = f"Error: {exc}"
            _set_cache(prompt, text)
            return text

    task = loop.create_task(_call_api())
    _inflight[prompt] = task
    try:
        return await task
    finally:
        _inflight.pop(prompt, None)
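# De-dupe sketch (illustrative): concurrent awaits on an identical prompt share
# one API call; the second caller simply awaits the first caller's task.
#
#   async def _demo(client: openai.AsyncOpenAI) -> None:
#       a, b = await asyncio.gather(
#           _gpt_async(client, "same prompt"),
#           _gpt_async(client, "same prompt"),
#       )
#       assert a == b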
async def _batch_async(lst: list[str], instruction: str, client: openai.AsyncOpenAI) -> list[str]:
    """Vectorised helper: returns an output list matching *lst* in length."""
    out: list[str] = ["" for _ in lst]
    idx, prompts = [], []
    for i, txt in enumerate(lst):
        if isinstance(txt, str) and txt.strip():
            idx.append(i)
            prompts.append(f"{instruction}\n\nText: {txt}")
    if not prompts:
        return out
    responses = await asyncio.gather(*[_gpt_async(client, p) for p in prompts])
    for j, val in enumerate(responses):
        out[idx[j]] = val
    return out
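# Batch sketch (illustrative): blank or non-string rows are skipped client-side
# and come back as "", so output indices always line up with the input list.
#
#   out = await _batch_async(["Summarise me", "", None], "Summarise:", client)
#   # -> ["<summary>", "", ""]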
# -------- Core converter -----------------------------------------------------
DEFAULT_PREREQ = (
    "No specific prerequisites are required for this course. Basic computer literacy and "
    "familiarity with fundamental concepts in the subject area are recommended for the best "
    "learning experience."
)


def _read(path: str) -> pd.DataFrame:
    if path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(path)
    return pd.read_csv(path, encoding="latin1")
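# Note: latin1 maps every byte to a character, so malformed CSV exports still
# load without a UnicodeDecodeError; Excel files go through read_excel instead.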
def _col(df: pd.DataFrame, name: str | None) -> list[str]:
    """Column as a list of strings; tolerates a missing or unmatched header."""
    if name and name in df.columns:
        return df[name].fillna("").astype(str).tolist()
    return ["" for _ in range(len(df))]


async def _enrich_dataframe(
    df: pd.DataFrame, dcol: str | None, ocol: str | None, pcol: str | None, acol: str | None
) -> tuple[list[str], list[str], list[str], list[str], list[str]]:
    """Run all LLM batches concurrently and return the five enrichment columns."""
    async with openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as client:
        sdesc, ldesc, fobj, fout = await asyncio.gather(
            _batch_async(
                _col(df, dcol),
                "Create a concise 250-character summary of this course description:",
                client,
            ),
            _batch_async(
                _col(df, dcol),
                "Condense this description to a maximum of 750 characters in paragraph format, with clean formatting:",
                client,
            ),
            _batch_async(
                _col(df, ocol),
                "Format these objectives into a bullet list with clean formatting. Start each bullet with '• ':",
                client,
            ),
            _batch_async(
                _col(df, acol),
                "Format this agenda into a bullet list with clean formatting. Start each bullet with '• ':",
                client,
            ),
        )

        # Prerequisites: format the non-empty rows in one parallel batch
        # (instead of one awaited call per row), then fall back to the
        # default text for empty rows.
        prereq_raw = _col(df, pcol)
        formatted = await _batch_async(
            prereq_raw,
            "Format these prerequisites into a bullet list with clean formatting. Start each bullet with '• ':",
            client,
        )
        fpre = [
            formatted[i] if raw.strip() else DEFAULT_PREREQ
            for i, raw in enumerate(prereq_raw)
        ]
    return sdesc, ldesc, fobj, fout, fpre
def convert(path: str) -> BytesIO:
    logos = {
        "Amazon Web Services": "/wp-content/uploads/2025/04/aws.png",
        "Cisco": "/wp-content/uploads/2025/04/cisco-e1738593292198-1.webp",
        "Microsoft": "/wp-content/uploads/2025/04/Microsoft-e1737494120985-1.png",
        "Google Cloud": "/wp-content/uploads/2025/04/Google_Cloud.png",
        "EC Council": "/wp-content/uploads/2025/04/Ec_Council.png",
        "ITIL": "/wp-content/uploads/2025/04/ITIL.webp",
        "PMI": "/wp-content/uploads/2025/04/PMI.png",
        "Comptia": "/wp-content/uploads/2025/04/Comptia.png",
        "Autodesk": "/wp-content/uploads/2025/04/autodesk.png",
        "ISC2": "/wp-content/uploads/2025/04/ISC2.png",
        "AICerts": "/wp-content/uploads/2025/04/aicerts-logo-1.png",
    }

    df = _read(path)
    df.columns = df.columns.str.strip()

    def first_col(*candidates: str) -> str | None:
        """First candidate header present in the sheet (covers known typos)."""
        return next((c for c in candidates if c in df.columns), None)

    dcol = first_col("Description", "Decription")  # "Decription" is a known source typo
    ocol = first_col("Objectives", "objectives")
    pcol = first_col("RequiredPrerequisite", "Required Pre-requisite")
    acol = first_col("Outline")
    dur = first_col("Duration") or "Duration"
    sid = first_col("Course SID")
    if dur not in df.columns:
        df[dur] = ""  # ensure Duration column exists

    # ---------- LLM enrichment (async) -------------------------------------
    sdesc, ldesc, fobj, fout, fpre = asyncio.run(
        _enrich_dataframe(df, dcol, ocol, pcol, acol)
    )
    df["Short_Description"] = sdesc
    df["Condensed_Description"] = ldesc
    df["Formatted_Objectives"] = fobj
    df["Formatted_Agenda"] = fout
    df["Formatted_Prerequisites"] = fpre
    # ---------- Schedule aggregation ---------------------------------------
    df["Course Start Date"] = pd.to_datetime(df["Course Start Date"], errors="coerce")
    df["Date_fmt"] = df["Course Start Date"].dt.strftime("%-m/%-d/%Y")  # %-m/%-d: Linux/macOS strftime only

    dsorted = df.sort_values(["Course ID", "Course Start Date"])
    d_agg = (
        dsorted.groupby("Course ID")["Date_fmt"]
        .apply(lambda s: ",".join(s.dropna().unique()))
        .reset_index(name="Dates")
    )
    t_agg = (
        dsorted.groupby("Course ID", group_keys=False)
        .apply(
            lambda g: ",".join(
                f"{st}-{et} {tz}"
                for st, et, tz in zip(
                    g["Course Start Time"], g["Course End Time"], g["Time Zone"]
                )
            ),
            include_groups=False,  # keyword of .apply() (pandas >= 2.2), not of .groupby()
        )
        .reset_index(name="Times")
    )
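    # Aggregated shape (illustrative values): one row per course, e.g.
    #   Course ID  Dates              Times
    #   NC-101     5/1/2025,5/8/2025  9:00 AM-5:00 PM EST,9:00 AM-5:00 PM EST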
    parents = dsorted.drop_duplicates("Course ID").merge(d_agg).merge(t_agg)
    # ---------- Parent / child product rows --------------------------------
    parent = pd.DataFrame(
        {
            "Type": "variable",
            "SKU": parents["Course ID"],
            "Name": parents["Course Name"],
            "Published": 1,
            "Visibility in catalog": "visible",
            "Short description": parents["Short_Description"],
            "Description": parents["Condensed_Description"],
            "Tax status": "taxable",
            "In stock?": 1,
            "Stock": 1,
            "Sold individually?": 1,
            "Regular price": parents["SRP Pricing"].replace(r"[\$,]", "", regex=True),
            "Categories": "courses",
            "Images": parents["Vendor"].map(logos).fillna(""),
            "Parent": "",
            "Brands": parents["Vendor"],
            "Attribute 1 name": "Date",
            "Attribute 1 value(s)": parents["Dates"],
            "Attribute 1 visible": "visible",
            "Attribute 1 global": 1,
            "Attribute 2 name": "Location",
            "Attribute 2 value(s)": "Virtual",
            "Attribute 2 visible": "visible",
            "Attribute 2 global": 1,
            "Attribute 3 name": "Time",
            "Attribute 3 value(s)": parents["Times"],
            "Attribute 3 visible": "visible",
            "Attribute 3 global": 1,
            "Meta: outline": parents["Formatted_Agenda"],
            "Meta: days": parents[dur],
            "Meta: location": "Virtual",
            "Meta: overview": parents["Target Audience"],
            "Meta: objectives": parents["Formatted_Objectives"],
            "Meta: prerequisites": parents["Formatted_Prerequisites"],
            "Meta: agenda": parents["Formatted_Agenda"],
        }
    )
    child = pd.DataFrame(
        {
            "Type": "variation, virtual",
            "SKU": dsorted[sid].astype(str).str.strip(),
            "Name": dsorted["Course Name"],
            "Published": 1,
            "Visibility in catalog": "visible",
            "Short description": dsorted["Short_Description"],
            "Description": dsorted["Condensed_Description"],
            "Tax status": "taxable",
            "In stock?": 1,
            "Stock": 1,
            "Sold individually?": 1,
            "Regular price": dsorted["SRP Pricing"].replace(r"[\$,]", "", regex=True),
            "Categories": "courses",
            "Images": dsorted["Vendor"].map(logos).fillna(""),
            "Parent": dsorted["Course ID"],
            "Brands": dsorted["Vendor"],
            "Attribute 1 name": "Date",
            "Attribute 1 value(s)": dsorted["Date_fmt"],
            "Attribute 1 visible": "visible",
            "Attribute 1 global": 1,
            "Attribute 2 name": "Location",
            "Attribute 2 value(s)": "Virtual",
            "Attribute 2 visible": "visible",
            "Attribute 2 global": 1,
            "Attribute 3 name": "Time",
            "Attribute 3 value(s)": dsorted.apply(
                lambda r: f"{r['Course Start Time']}-{r['Course End Time']} {r['Time Zone']}",
                axis=1,
            ),
            "Attribute 3 visible": "visible",
            "Attribute 3 global": 1,
            "Meta: outline": dsorted["Formatted_Agenda"],
            "Meta: days": dsorted[dur],
            "Meta: location": "Virtual",
            "Meta: overview": dsorted["Target Audience"],
            "Meta: objectives": dsorted["Formatted_Objectives"],
            "Meta: prerequisites": dsorted["Formatted_Prerequisites"],
            "Meta: agenda": dsorted["Formatted_Agenda"],
        }
    )
    all_rows = pd.concat([parent, child], ignore_index=True)
    order = [
        "Type",
        "SKU",
        "Name",
        "Published",
        "Visibility in catalog",
        "Short description",
        "Description",
        "Tax status",
        "In stock?",
        "Stock",
        "Sold individually?",
        "Regular price",
        "Categories",
        "Images",
        "Parent",
        "Brands",
        "Attribute 1 name",
        "Attribute 1 value(s)",
        "Attribute 1 visible",
        "Attribute 1 global",
        "Attribute 2 name",
        "Attribute 2 value(s)",
        "Attribute 2 visible",
        "Attribute 2 global",
        "Attribute 3 name",
        "Attribute 3 value(s)",
        "Attribute 3 visible",
        "Attribute 3 global",
        "Meta: outline",
        "Meta: days",
        "Meta: location",
        "Meta: overview",
        "Meta: objectives",
        "Meta: prerequisites",
        "Meta: agenda",
    ]
    out = BytesIO()
    all_rows[order].to_csv(out, index=False, encoding="utf-8-sig")
    out.seek(0)
    return out
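# Usage sketch outside Gradio (hypothetical paths):
#
#   buf = convert("netcom_schedule.xlsx")
#   Path("woocommerce_import.csv").write_bytes(buf.getvalue())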
# -------- Gradio wrappers ----------------------------------------------------
def process_file(upload: gr.File) -> str:
    csv_bytes = convert(upload.name)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        tmp.write(csv_bytes.getvalue())
        path = tmp.name
    return path
ui = gr.Interface(
    fn=process_file,
    inputs=gr.File(
        label="Upload NetCom CSV / Excel", file_types=[".csv", ".xlsx", ".xls"]
    ),
    outputs=gr.File(label="Download WooCommerce CSV"),
    title="NetCom → WooCommerce CSV Processor (Try 2)",
    description="Upload a NetCom schedule (.csv/.xlsx) to get the Try 2-formatted WooCommerce CSV.",
    analytics_enabled=False,
)
if __name__ == "__main__": | |
if not os.getenv("OPENAI_API_KEY"): | |
print("β οΈ OPENAI_API_KEY not set β AI features will error") | |
ui.launch() | |