"""NetCom β†’ WooCommerce transformer (Try 2 schema β€” 100-parallel + de-dupe, pandas fix)
======================================================================================
*Accepts CSV **or** Excel schedule files and outputs the WooCommerce CSV.*
New since the last paste
------------------------
* Fix for older pandas: move `include_groups=False` from `.groupby()` to `.apply()`.
* Everything else (cache names, concurrency cap, in-flight de-duplication) is unchanged.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import os
import tempfile
from io import BytesIO
from pathlib import Path
import gradio as gr
import gradio_client.utils
import openai
import pandas as pd
# -------- Gradio bool-schema hot-patch --------------------------------------
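# `True` / `False` are valid JSON Schemas (meaning "anything" / "nothing"),
# but some gradio_client versions crash when asked to map a bare bool to a
# Python type, so we coerce booleans to "any" before delegating.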
_original = gradio_client.utils._json_schema_to_python_type
def _fixed_json_schema_to_python_type(schema, defs=None): # type: ignore
if isinstance(schema, bool):
return "any"
return _original(schema, defs)
gradio_client.utils._json_schema_to_python_type = _fixed_json_schema_to_python_type # type: ignore
# -------- Tiny disk cache ----------------------------------------------------
CACHE_DIR = Path("ai_response_cache")
CACHE_DIR.mkdir(exist_ok=True)
def _cache_path(p: str) -> Path:
return CACHE_DIR / f"{hashlib.md5(p.encode()).hexdigest()}.json"
def _get_cached(p: str) -> str | None:
try:
return json.loads(_cache_path(p).read_text("utf-8"))["response"]
except Exception:
return None
def _set_cache(p: str, r: str) -> None:
try:
_cache_path(p).write_text(json.dumps({"prompt": p, "response": r}), "utf-8")
except Exception:
pass
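# Minimal smoke test for the disk cache; a sketch, assuming a CACHE_SELFTEST
# env flag (not part of the app) so normal imports stay side-effect free.
if os.getenv("CACHE_SELFTEST"):
    _set_cache("demo prompt", "demo response")
    assert _get_cached("demo prompt") == "demo response"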
# -------- Async GPT helpers --------------------------------------------------
_SEM = asyncio.Semaphore(100)          # ≤100 concurrent OpenAI calls
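# NOTE: creating the Semaphore at import time is fine on Python 3.10+, where
# asyncio primitives only bind to an event loop when first awaited; on 3.9 and
# earlier it could attach to the wrong loop, because convert() starts a fresh
# loop per conversion via asyncio.run().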
_inflight: dict[str, asyncio.Future] = {} # prompt β†’ Future
async def _gpt_async(client: openai.AsyncOpenAI, prompt: str) -> str:
"""Single LLM call with disk cache, concurrency cap, and de-duplication."""
cached = _get_cached(prompt)
if cached is not None:
return cached
# De-duplicate identical prompts already in flight
running = _inflight.get(prompt)
if running is not None:
return await running
loop = asyncio.get_running_loop()
async def _call_api() -> str:
        async with _SEM:  # concurrency limiter
            try:
                msg = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=[{"role": "user", "content": prompt}],
                    temperature=0,
                )
                text = msg.choices[0].message.content or ""
            except Exception as exc:
                # Surface the failure to the caller but keep it out of the
                # disk cache, so a transient error is retried on the next run.
                return f"Error: {exc}"
            _set_cache(prompt, text)
            return text
task = loop.create_task(_call_api())
_inflight[prompt] = task
try:
return await task
finally:
_inflight.pop(prompt, None)
async def _batch_async(lst: list[str], instruction: str, client: openai.AsyncOpenAI) -> list[str]:
"""Vectorised helper β€” returns an output list matching *lst* length."""
out: list[str] = ["" for _ in lst]
idx, prompts = [], []
for i, txt in enumerate(lst):
if isinstance(txt, str) and txt.strip():
idx.append(i)
prompts.append(f"{instruction}\n\nText: {txt}")
if not prompts:
return out
responses = await asyncio.gather(*[_gpt_async(client, p) for p in prompts])
for j, val in enumerate(responses):
out[idx[j]] = val
return out
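# Hedged usage sketch for _batch_async (the BATCH_DEMO flag is an assumption,
# not part of the app): blank inputs are skipped and come back as empty
# strings; everything else fans out through _gpt_async in parallel.
if os.getenv("BATCH_DEMO") and os.getenv("OPENAI_API_KEY"):
    async def _batch_demo() -> None:
        async with openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as c:
            print(await _batch_async(["Hello world", ""], "Echo this text:", c))
    asyncio.run(_batch_demo())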
# -------- Core converter -----------------------------------------------------
DEFAULT_PREREQ = (
"No specific prerequisites are required for this course. Basic computer literacy and "
"familiarity with fundamental concepts in the subject area are recommended for the best "
"learning experience."
)
def _read(path: str) -> pd.DataFrame:
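    # Excel is detected by extension; anything else is parsed as CSV with
    # latin-1, which tolerates Windows-exported bytes that break strict UTF-8.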
if path.lower().endswith((".xlsx", ".xls")):
return pd.read_excel(path)
return pd.read_csv(path, encoding="latin1")
async def _enrich_dataframe(
    df: pd.DataFrame,
    dcol: str | None,
    ocol: str | None,
    pcol: str | None,
    acol: str | None,
) -> tuple[list[str], list[str], list[str], list[str], list[str]]:
    """Run all LLM batches concurrently and return the five enrichment columns."""

    def _texts(col: str | None) -> list[str]:
        # A missing source column degrades to empty strings instead of raising
        # (df.get(None, "") would return a bare str and break .fillna()).
        return df[col].fillna("").tolist() if col in df.columns else [""] * len(df)

    async with openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as client:
        sdesc, ldesc, fobj, fout = await asyncio.gather(
            _batch_async(
                _texts(dcol),
                "Create a concise 250-character summary of this course description:",
                client,
            ),
            _batch_async(
                _texts(dcol),
                "Condense this description to a maximum of 750 characters in paragraph format, with clean formatting:",
                client,
            ),
            _batch_async(
                _texts(ocol),
                "Format these objectives into a bullet list with clean formatting. Start each bullet with '• ':",
                client,
            ),
            _batch_async(
                _texts(acol),
                "Format this agenda into a bullet list with clean formatting. Start each bullet with '• ':",
                client,
            ),
        )
        # Prerequisites: format non-blank rows in one parallel batch (instead
        # of one sequential call per row); blank rows fall back to the default.
        prereq_raw = _texts(pcol)
        formatted = await _batch_async(
            prereq_raw,
            "Format these prerequisites into a bullet list with clean formatting. Start each bullet with '• ':",
            client,
        )
    fpre = [
        fmt if str(raw).strip() else DEFAULT_PREREQ
        for raw, fmt in zip(prereq_raw, formatted)
    ]
    return sdesc, ldesc, fobj, fout, fpre
def convert(path: str) -> BytesIO:
logos = {
"Amazon Web Services": "/wp-content/uploads/2025/04/aws.png",
"Cisco": "/wp-content/uploads/2025/04/cisco-e1738593292198-1.webp",
"Microsoft": "/wp-content/uploads/2025/04/Microsoft-e1737494120985-1.png",
"Google Cloud": "/wp-content/uploads/2025/04/Google_Cloud.png",
"EC Council": "/wp-content/uploads/2025/04/Ec_Council.png",
"ITIL": "/wp-content/uploads/2025/04/ITIL.webp",
"PMI": "/wp-content/uploads/2025/04/PMI.png",
"Comptia": "/wp-content/uploads/2025/04/Comptia.png",
"Autodesk": "/wp-content/uploads/2025/04/autodesk.png",
"ISC2": "/wp-content/uploads/2025/04/ISC2.png",
"AICerts": "/wp-content/uploads/2025/04/aicerts-logo-1.png",
}
df = _read(path)
df.columns = df.columns.str.strip()
    def first_col(*candidates: str) -> str | None:
        """Return the first candidate header present in the sheet, else None."""
        return next((c for c in candidates if c in df.columns), None)

    dcol = first_col("Description", "Decription")  # source sheets sometimes misspell this header
    ocol = first_col("Objectives", "objectives")
    pcol = first_col("RequiredPrerequisite", "Required Pre-requisite")
    acol = first_col("Outline")
    dur = first_col("Duration") or "Duration"
    sid = first_col("Course SID")
    if dur not in df.columns:
        df[dur] = ""  # ensure the Duration column exists
# ---------- LLM enrichment (async) -------------------------------------
sdesc, ldesc, fobj, fout, fpre = asyncio.run(
_enrich_dataframe(df, dcol, ocol, pcol, acol)
)
df["Short_Description"] = sdesc
df["Condensed_Description"] = ldesc
df["Formatted_Objectives"] = fobj
df["Formatted_Agenda"] = fout
df["Formatted_Prerequisites"] = fpre
# ---------- Schedule aggregation --------------------------------------
df["Course Start Date"] = pd.to_datetime(df["Course Start Date"], errors="coerce")
df["Date_fmt"] = df["Course Start Date"].dt.strftime("%-m/%-d/%Y")
dsorted = df.sort_values(["Course ID", "Course Start Date"])
d_agg = (
dsorted.groupby("Course ID")["Date_fmt"]
.apply(lambda s: ",".join(s.dropna().unique()))
.reset_index(name="Dates")
)
t_agg = (
dsorted.groupby("Course ID", group_keys=False)
.apply(
lambda g: ",".join(
f"{st}-{et} {tz}"
for st, et, tz in zip(
g["Course Start Time"], g["Course End Time"], g["Time Zone"]
)
),
            include_groups=False,  # pandas ≥2.2: keep the group key column out of the applied frame
)
.reset_index(name="Times")
)
parents = dsorted.drop_duplicates("Course ID").merge(d_agg).merge(t_agg)
# ---------- Parent / child product rows --------------------------------
parent = pd.DataFrame(
{
"Type": "variable",
"SKU": parents["Course ID"],
"Name": parents["Course Name"],
"Published": 1,
"Visibility in catalog": "visible",
"Short description": parents["Short_Description"],
"Description": parents["Condensed_Description"],
"Tax status": "taxable",
"In stock?": 1,
"Stock": 1,
"Sold individually?": 1,
"Regular price": parents["SRP Pricing"].replace("[\\$,]", "", regex=True),
"Categories": "courses",
"Images": parents["Vendor"].map(logos).fillna(""),
"Parent": "",
"Brands": parents["Vendor"],
"Attribute 1 name": "Date",
"Attribute 1 value(s)": parents["Dates"],
"Attribute 1 visible": "visible",
"Attribute 1 global": 1,
"Attribute 2 name": "Location",
"Attribute 2 value(s)": "Virtual",
"Attribute 2 visible": "visible",
"Attribute 2 global": 1,
"Attribute 3 name": "Time",
"Attribute 3 value(s)": parents["Times"],
"Attribute 3 visible": "visible",
"Attribute 3 global": 1,
"Meta: outline": parents["Formatted_Agenda"],
"Meta: days": parents[dur],
"Meta: location": "Virtual",
"Meta: overview": parents["Target Audience"],
"Meta: objectives": parents["Formatted_Objectives"],
"Meta: prerequisites": parents["Formatted_Prerequisites"],
"Meta: agenda": parents["Formatted_Agenda"],
}
)
child = pd.DataFrame(
{
"Type": "variation, virtual",
"SKU": dsorted[sid].astype(str).str.strip(),
"Name": dsorted["Course Name"],
"Published": 1,
"Visibility in catalog": "visible",
"Short description": dsorted["Short_Description"],
"Description": dsorted["Condensed_Description"],
"Tax status": "taxable",
"In stock?": 1,
"Stock": 1,
"Sold individually?": 1,
"Regular price": dsorted["SRP Pricing"].replace("[\\$,]", "", regex=True),
"Categories": "courses",
"Images": dsorted["Vendor"].map(logos).fillna(""),
"Parent": dsorted["Course ID"],
"Brands": dsorted["Vendor"],
"Attribute 1 name": "Date",
"Attribute 1 value(s)": dsorted["Date_fmt"],
"Attribute 1 visible": "visible",
"Attribute 1 global": 1,
"Attribute 2 name": "Location",
"Attribute 2 value(s)": "Virtual",
"Attribute 2 visible": "visible",
"Attribute 2 global": 1,
"Attribute 3 name": "Time",
"Attribute 3 value(s)": dsorted.apply(
lambda r: f"{r['Course Start Time']}-{r['Course End Time']} {r['Time Zone']}",
axis=1,
),
"Attribute 3 visible": "visible",
"Attribute 3 global": 1,
"Meta: outline": dsorted["Formatted_Agenda"],
"Meta: days": dsorted[dur],
"Meta: location": "Virtual",
"Meta: overview": dsorted["Target Audience"],
"Meta: objectives": dsorted["Formatted_Objectives"],
"Meta: prerequisites": dsorted["Formatted_Prerequisites"],
"Meta: agenda": dsorted["Formatted_Agenda"],
}
)
all_rows = pd.concat([parent, child], ignore_index=True)
order = [
"Type",
"SKU",
"Name",
"Published",
"Visibility in catalog",
"Short description",
"Description",
"Tax status",
"In stock?",
"Stock",
"Sold individually?",
"Regular price",
"Categories",
"Images",
"Parent",
"Brands",
"Attribute 1 name",
"Attribute 1 value(s)",
"Attribute 1 visible",
"Attribute 1 global",
"Attribute 2 name",
"Attribute 2 value(s)",
"Attribute 2 visible",
"Attribute 2 global",
"Attribute 3 name",
"Attribute 3 value(s)",
"Attribute 3 visible",
"Attribute 3 global",
"Meta: outline",
"Meta: days",
"Meta: location",
"Meta: overview",
"Meta: objectives",
"Meta: prerequisites",
"Meta: agenda",
]
out = BytesIO()
all_rows[order].to_csv(out, index=False, encoding="utf-8-sig")
out.seek(0)
return out
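# Hedged local usage sketch (the file names are assumptions; the CONVERT_DEMO
# guard keeps module import free of side effects):
if os.getenv("CONVERT_DEMO"):
    with open("woocommerce_import.csv", "wb") as fh:
        fh.write(convert("netcom_schedule.xlsx").getvalue())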
# -------- Gradio wrappers ----------------------------------------------------
def process_file(upload) -> str:
    """Gradio callback: convert the uploaded schedule and return a CSV path."""
    # Depending on the Gradio version, `upload` is either a plain path string
    # or a tempfile-like object exposing `.name`; accept both.
    src = upload if isinstance(upload, str) else upload.name
    csv_bytes = convert(src)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as tmp:
        tmp.write(csv_bytes.getvalue())
    return tmp.name
ui = gr.Interface(
fn=process_file,
inputs=gr.File(
label="Upload NetCom CSV / Excel", file_types=[".csv", ".xlsx", ".xls"]
),
outputs=gr.File(label="Download WooCommerce CSV"),
title="NetCom β†’ WooCommerce CSV Processor (Try 2)",
description="Upload NetCom schedule (.csv/.xlsx) to get the Try 2-formatted WooCommerce CSV.",
analytics_enabled=False,
)
if __name__ == "__main__":
if not os.getenv("OPENAI_API_KEY"):
print("⚠️ OPENAI_API_KEY not set – AI features will error")
ui.launch()