codys12 committed on
Commit
f4e036a
·
verified ·
1 Parent(s): f302c17

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +45 -30
app.py CHANGED
@@ -1,13 +1,13 @@
1
- """NetCom → WooCommerce transformer (Try 2 schema — cleaned async)
2
- =============================================================
3
  *Accept CSV **or** Excel schedule files and output the WooCommerce CSV.*
4
 
5
- Changes vs Try 1
6
- ----------------
7
- * Use **one** event‑loop via `asyncio.run()` — no manual `new_event_loop()` / `loop.close()` gymnastics.
8
- * **One** shared `openai.AsyncOpenAI` client, properly closed with an `async with` block.
9
- * Fixed pandas future‑warning by adding `include_groups=False`.
10
- * Same Gradio interface, caching, and JSON‑schema hot‑patch as before.
11
  """
12
 
13
  from __future__ import annotations
@@ -25,7 +25,7 @@ import gradio_client.utils
25
  import openai
26
  import pandas as pd
27
 
28
- # -------- Gradio bool-schema hot-patch --------------------------------------
29
  _original = gradio_client.utils._json_schema_to_python_type
30
 
31
  def _fixed_json_schema_to_python_type(schema, defs=None): # type: ignore
@@ -55,24 +55,42 @@ def _set_cache(p: str, r: str) -> None:
55
  pass
56
 
57
  # -------- Async GPT helpers --------------------------------------------------
 
 
 
58
  async def _gpt_async(client: openai.AsyncOpenAI, prompt: str) -> str:
59
- """Single LLM call with on-disk response cache."""
60
  cached = _get_cached(prompt)
61
  if cached is not None:
62
  return cached
63
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64
  try:
65
- msg = await client.chat.completions.create(
66
- model="gpt-4o-mini",
67
- messages=[{"role": "user", "content": prompt}],
68
- temperature=0,
69
- )
70
- text = msg.choices[0].message.content
71
- except Exception as exc: # network or auth failure ‑ return explicit error string
72
- text = f"Error: {exc}"
73
-
74
- _set_cache(prompt, text)
75
- return text
76
 
77
  async def _batch_async(lst: list[str], instruction: str, client: openai.AsyncOpenAI) -> list[str]:
78
  """Vectorised helper — returns an output list matching *lst* length."""
@@ -82,11 +100,10 @@ async def _batch_async(lst: list[str], instruction: str, client: openai.AsyncOpe
82
  if isinstance(txt, str) and txt.strip():
83
  idx.append(i)
84
  prompts.append(f"{instruction}\n\nText: {txt}")
85
- # Fast‑path: nothing to do
86
- if not prompts:
87
  return out
88
 
89
- # Fire off all prompts concurrently
90
  responses = await asyncio.gather(*[_gpt_async(client, p) for p in prompts])
91
  for j, val in enumerate(responses):
92
  out[idx[j]] = val
@@ -107,7 +124,6 @@ def _read(path: str) -> pd.DataFrame:
107
  async def _enrich_dataframe(df: pd.DataFrame, dcol: str, ocol: str, pcol: str, acol: str) -> tuple[list[str], list[str], list[str], list[str], list[str]]:
108
  """Run all LLM batches concurrently and return the five enrichment columns."""
109
  async with openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as client:
110
- # 1) Descriptions and objectives/agenda batches
111
  sdesc, ldesc, fobj, fout = await asyncio.gather(
112
  _batch_async(df.get(dcol, "").fillna("").tolist(),
113
  "Create a concise 250-character summary of this course description:", client),
@@ -118,7 +134,7 @@ async def _enrich_dataframe(df: pd.DataFrame, dcol: str, ocol: str, pcol: str, a
118
  _batch_async(df.get(acol, "").fillna("").tolist(),
119
  "Format this agenda into a bullet list with clean formatting. Start each bullet with '• ':", client),
120
  )
121
- # 2) Prerequisites batch (some rows may be empty → DEFAULT_PREREQ)
122
  prereq_raw = df.get(pcol, "").fillna("").tolist()
123
  fpre: list[str] = []
124
  for req in prereq_raw:
@@ -150,7 +166,6 @@ def convert(path: str) -> BytesIO:
150
  df = _read(path)
151
  df.columns = df.columns.str.strip()
152
 
153
- # Helper to locate first existing column name from a list of candidates
154
  first_col = lambda *candidates: next((c for c in candidates if c in df.columns), None)
155
 
156
  dcol = first_col("Description", "Decription")
@@ -161,7 +176,7 @@ def convert(path: str) -> BytesIO:
161
  sid = first_col("Course SID", "Course SID")
162
 
163
  if dur not in df.columns:
164
- df[dur] = "" # create empty Duration col if missing
165
 
166
  # ---------- LLM enrichment (async) -------------------------------------
167
  sdesc, ldesc, fobj, fout, fpre = asyncio.run(_enrich_dataframe(df, dcol, ocol, pcol, acol))
@@ -297,8 +312,8 @@ ui = gr.Interface(
297
  fn=process_file,
298
  inputs=gr.File(label="Upload NetCom CSV / Excel", file_types=[".csv", ".xlsx", ".xls"]),
299
  outputs=gr.File(label="Download WooCommerce CSV"),
300
- title="NetCom → WooCommerce CSV Processor (Try 2)",
301
- description="Upload NetCom schedule (.csv/.xlsx) to get the Try 2‑formatted WooCommerce CSV.",
302
  analytics_enabled=False,
303
  )
304
 
 
1
+ """NetCom → WooCommerce transformer (Try 2 schema — cleaned async, 100-parallel + de-dupe)
2
+ =========================================================================================
3
  *Accept CSV **or** Excel schedule files and output the WooCommerce CSV.*
4
 
5
+ Changes vs previous Try 2
6
+ -------------------------
7
+ * Cap OpenAI concurrency to **100** with an `asyncio.Semaphore`.
8
+ * Prevent duplicate prompts from hitting the API in parallel with an
9
+ in-flight `dict[str, asyncio.Future]`.
10
+ * Everything else (cache filenames, interface, outputs) stays the same.
11
  """
12
 
13
  from __future__ import annotations
 
25
  import openai
26
  import pandas as pd
27
 
28
+ # -------- Gradio bool-schema hot-patch --------------------------------------
29
  _original = gradio_client.utils._json_schema_to_python_type
30
 
31
  def _fixed_json_schema_to_python_type(schema, defs=None): # type: ignore
 
55
  pass
56
 
57
  # -------- Async GPT helpers --------------------------------------------------
58
+ _SEM = asyncio.Semaphore(100) # ≤100 concurrent calls
59
+ _inflight: dict[str, asyncio.Future] = {} # prompt → Future
60
+
61
  async def _gpt_async(client: openai.AsyncOpenAI, prompt: str) -> str:
62
+ """Single LLM call with on-disk cache, concurrency cap, and de-duplication."""
63
  cached = _get_cached(prompt)
64
  if cached is not None:
65
  return cached
66
 
67
+ # — de-dup identical prompts already in flight —
68
+ running = _inflight.get(prompt)
69
+ if running is not None:
70
+ return await running # reuse the same Future
71
+
72
+ loop = asyncio.get_running_loop()
73
+
74
+ async def _call_api() -> str:
75
+ async with _SEM: # concurrency limiter
76
+ try:
77
+ msg = await client.chat.completions.create(
78
+ model="gpt-4o-mini",
79
+ messages=[{"role": "user", "content": prompt}],
80
+ temperature=0,
81
+ )
82
+ text = msg.choices[0].message.content
83
+ except Exception as exc: # network/auth errors
84
+ text = f"Error: {exc}"
85
+ _set_cache(prompt, text)
86
+ return text
87
+
88
+ task = loop.create_task(_call_api())
89
+ _inflight[prompt] = task # register
90
  try:
91
+ return await task
92
+ finally:
93
+ _inflight.pop(prompt, None) # clean up even on error
 
 
 
 
 
 
 
 
94
 
95
  async def _batch_async(lst: list[str], instruction: str, client: openai.AsyncOpenAI) -> list[str]:
96
  """Vectorised helper — returns an output list matching *lst* length."""
 
100
  if isinstance(txt, str) and txt.strip():
101
  idx.append(i)
102
  prompts.append(f"{instruction}\n\nText: {txt}")
103
+ if not prompts: # fast-path: all rows empty
 
104
  return out
105
 
106
+ # Gather duplicate prompts handled inside _gpt_async
107
  responses = await asyncio.gather(*[_gpt_async(client, p) for p in prompts])
108
  for j, val in enumerate(responses):
109
  out[idx[j]] = val
 
124
  async def _enrich_dataframe(df: pd.DataFrame, dcol: str, ocol: str, pcol: str, acol: str) -> tuple[list[str], list[str], list[str], list[str], list[str]]:
125
  """Run all LLM batches concurrently and return the five enrichment columns."""
126
  async with openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) as client:
 
127
  sdesc, ldesc, fobj, fout = await asyncio.gather(
128
  _batch_async(df.get(dcol, "").fillna("").tolist(),
129
  "Create a concise 250-character summary of this course description:", client),
 
134
  _batch_async(df.get(acol, "").fillna("").tolist(),
135
  "Format this agenda into a bullet list with clean formatting. Start each bullet with '• ':", client),
136
  )
137
+ # Prerequisites (some rows empty → default)
138
  prereq_raw = df.get(pcol, "").fillna("").tolist()
139
  fpre: list[str] = []
140
  for req in prereq_raw:
 
166
  df = _read(path)
167
  df.columns = df.columns.str.strip()
168
 
 
169
  first_col = lambda *candidates: next((c for c in candidates if c in df.columns), None)
170
 
171
  dcol = first_col("Description", "Decription")
 
176
  sid = first_col("Course SID", "Course SID")
177
 
178
  if dur not in df.columns:
179
+ df[dur] = "" # ensure Duration col exists
180
 
181
  # ---------- LLM enrichment (async) -------------------------------------
182
  sdesc, ldesc, fobj, fout, fpre = asyncio.run(_enrich_dataframe(df, dcol, ocol, pcol, acol))
 
312
  fn=process_file,
313
  inputs=gr.File(label="Upload NetCom CSV / Excel", file_types=[".csv", ".xlsx", ".xls"]),
314
  outputs=gr.File(label="Download WooCommerce CSV"),
315
+ title="NetCom → WooCommerce CSV Processor (Try 2)",
316
+ description="Upload NetCom schedule (.csv/.xlsx) to get the Try 2-formatted WooCommerce CSV.",
317
  analytics_enabled=False,
318
  )
319