HaiderAUT committed
Commit f036ad8 · verified · 1 Parent(s): 2c86eae

Update app.py

Files changed (1)
  1. app.py +89 -179
app.py CHANGED
@@ -1,42 +1,33 @@
- # =============================================================
- # Hugging Face Space – Lecture → Podcast Generator (User-selectable Languages)
- # =============================================================
- # • **Text generation** – SmolAgents `HfApiModel` (Qwen/Qwen2.5-Coder-32B-Instruct)
- # • **Speech synthesis** – `InferenceClient.text_to_speech`, chunk-safe
- #   (MMS-TTS for en/bn/ur/ne, mms-TTS-zho for zh). Long texts are split
- #   into ≤280-char chunks to stay within HF endpoint limits.
- # -----------------------------------------------------------------
-
  import os
  import re
  import tempfile
  import textwrap
  from pathlib import Path
- from typing import List, Dict, Optional, Any  # Added Any

  import gradio as gr
- from huggingface_hub import InferenceClient  # Added HubHTTPError explicitly
- from PyPDF2 import PdfReader  # For PDF processing
- from smolagents import HfApiModel  # For LLM interaction
- from pydub import AudioSegment  # Added for robust audio concatenation
- from pydub.exceptions import CouldntDecodeError  # Specific pydub error

  # ------------------------------------------------------------------
  # LLM setup – remote Qwen model via SmolAgents
  # ------------------------------------------------------------------
  llm = HfApiModel(
      model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
-     max_tokens=2048,  # Max tokens for the generated output dialogue
      temperature=0.5,
  )

  # ------------------------------------------------------------------
- # Hugging Face Inference API client (uses HF_TOKEN secret if provided)
  # ------------------------------------------------------------------
  client = InferenceClient(token=os.getenv("HF_TOKEN", None))

  # ------------------------------------------------------------------
- # Language metadata and corresponding open TTS model IDs
  # ------------------------------------------------------------------
  LANG_INFO: Dict[str, Dict[str, str]] = {
      "en": {"name": "English", "tts_model": "facebook/mms-tts-eng"},
@@ -45,26 +36,26 @@ LANG_INFO: Dict[str, Dict[str, str]] = {
      "ur": {"name": "Urdu", "tts_model": "facebook/mms-tts-urd"},
      "ne": {"name": "Nepali", "tts_model": "facebook/mms-tts-npi"},
  }
- # For reverse lookup: language name to language code
  LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()}

- # ------------------------------------------------------------------
- # Prompt template (target ~300 words for LLM output)
- # ------------------------------------------------------------------
  PROMPT_TEMPLATE = textwrap.dedent(
      """
      You are producing a lively two-host educational podcast in {lang_name}.
-     Summarize the following lecture content into a dialogue of **approximately 300 words**.
      Make it engaging: hosts ask questions, clarify ideas with analogies, and
-     wrap up with a concise recap. Preserve technical accuracy. Use Markdown for host names (e.g., **Host 1:**).

      ### Lecture Content
      {content}
      """
  )

- # PDF helpers -------------------------------------------------------

  def extract_pdf_text(pdf_path: str) -> str:
      try:
          reader = PdfReader(pdf_path)
@@ -72,195 +63,114 @@ def extract_pdf_text(pdf_path: str) -> str:
      except Exception as e:
          raise gr.Error(f"Failed to process PDF: {e}")

- TOKEN_LIMIT = 8000
-
  def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str:
      words = text.split()
      if len(words) > limit:
-         gr.Warning(f"Input text was truncated from {len(words)} to {limit} words to fit LLM context window.")
          return " ".join(words[:limit])
      return text

- # ------------------------------------------------------------------
- # TTS helper – chunk long text safely (HF endpoint limit ~30s / 200-300 chars)
- # ------------------------------------------------------------------
- CHUNK_CHAR_LIMIT = 280

  def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]:
-     sentences_raw = re.split(r"(?<=[.!?])\s+", text.strip())
-     sentences = [s.strip() for s in sentences_raw if s.strip()]
-     if not sentences: return []
-     chunks, current_chunk = [], ""
      for sent in sentences:
-         if current_chunk and (len(current_chunk) + len(sent) + 1 > limit):
-             chunks.append(current_chunk)
-             current_chunk = sent
          else:
-             current_chunk += (" " + sent) if current_chunk else sent
-     if current_chunk: chunks.append(current_chunk)
-     return [chunk for chunk in chunks if chunk.strip()]

- def synthesize_speech(text: str, model_id: str, lang_tmpdir: Path) -> Path:
      chunks = _split_to_chunks(text)
-     if not chunks: raise ValueError("Text resulted in no speakable chunks after splitting.")
-     audio_segments: List[AudioSegment] = []
-     for idx, chunk in enumerate(chunks):
-         gr.Info(f"Synthesizing audio for chunk {idx + 1}/{len(chunks)}...")
          try:
              audio_bytes = client.text_to_speech(chunk, model=model_id)
          except HubHTTPError as e:
-             error_message = f"TTS request failed for chunk {idx+1}/{len(chunks)} ('{chunk[:30]}...'): {e}"
-             if "Input validation error: `inputs` must be non-empty" in str(e) and not chunk.strip():
-                 gr.Warning(f"Skipping an apparently empty chunk for TTS: Chunk {idx+1}")
-                 continue
-             raise RuntimeError(error_message) from e
-         part_path = lang_tmpdir / f"part_{idx}.flac"
-         part_path.write_bytes(audio_bytes)
          try:
-             segment = AudioSegment.from_file(part_path, format="flac")
-             audio_segments.append(segment)
          except CouldntDecodeError as e:
-             raise RuntimeError(f"Failed to decode audio chunk {idx+1} from {part_path}. TTS Error: {e}") from e
-     if not audio_segments: raise RuntimeError("No audio segments were successfully synthesized or decoded.")
-     combined_audio = sum(audio_segments, AudioSegment.empty())
-     final_path = lang_tmpdir / "podcast_audio.flac"  # Renamed for clarity
-     combined_audio.export(final_path, format="flac")
-     return final_path

  # ------------------------------------------------------------------
- # Main pipeline function for Gradio
  # ------------------------------------------------------------------

- def generate_podcast(pdf_file_obj: Optional[gr.File], selected_lang_names: List[str]) -> List[Optional[Any]]:
-     if not pdf_file_obj:
          raise gr.Error("Please upload a PDF file.")
-     if not selected_lang_names:
-         raise gr.Error("Please select at least one language for the podcast.")
-
-     selected_codes = [LANG_CODE_BY_NAME[name] for name in selected_lang_names]
-
-     # Initialize results data structure for all languages
-     # Each language will have a dict for audio, script_text (for display), and script_file (for download)
-     results_data: Dict[str, Dict[str, Optional[str]]] = {
-         code: {"audio": None, "script_text": None, "script_file": None}
-         for code in LANG_INFO.keys()
-     }
-
-     try:
-         with tempfile.TemporaryDirectory() as td:
-             tmpdir_base = Path(td)
-
-             gr.Info("Extracting text from PDF...")
-             lecture_raw = extract_pdf_text(pdf_file_obj.name)
-             lecture_text = truncate_text(lecture_raw)
-
-             if not lecture_text.strip():
-                 raise gr.Error("Could not extract any text from the PDF, or the PDF content is empty.")
-
-             for code in selected_codes:  # Iterate only through user-selected languages
-                 info = LANG_INFO[code]
-                 lang_name = info["name"]
-                 tts_model = info["tts_model"]
-
-                 gr.Info(f"Processing for {lang_name}...")
-                 lang_tmpdir = tmpdir_base / code
-                 lang_tmpdir.mkdir(parents=True, exist_ok=True)
-
-                 dialogue: Optional[str] = None  # Initialize dialogue for the current language scope
-
-                 # 1️⃣ Generate dialogue using LLM
-                 gr.Info(f"Generating dialogue for {lang_name}...")
-                 prompt = PROMPT_TEMPLATE.format(lang_name=lang_name, content=lecture_text)
-                 try:
-                     dialogue_raw: str = llm(prompt)
-                     if not dialogue_raw or not dialogue_raw.strip():
-                         gr.Warning(f"LLM returned empty dialogue for {lang_name}. Skipping this language.")
-                         continue  # Skip to the next selected language; results_data[code] remains all None
-
-                     dialogue = dialogue_raw  # Keep the generated dialogue
-
-                     # Store script text and save script to a file
-                     results_data[code]["script_text"] = dialogue
-                     script_file_path = lang_tmpdir / f"podcast_script_{code}.txt"
-                     script_file_path.write_text(dialogue, encoding="utf-8")
-                     results_data[code]["script_file"] = str(script_file_path)
-
-                 except Exception as e:
-                     gr.Error(f"Error generating dialogue for {lang_name}: {e}")
-                     # If dialogue generation fails, all parts for this lang remain None or partially filled
-                     # The continue ensures we don't try TTS if dialogue failed
-                     continue
-
-                 # 2️⃣ Synthesize speech (only if dialogue was successfully generated)
-                 if dialogue:  # Ensure dialogue is not None here
-                     gr.Info(f"Synthesizing speech for {lang_name}...")
-                     try:
-                         tts_path = synthesize_speech(dialogue, tts_model, lang_tmpdir)
-                         results_data[code]["audio"] = str(tts_path)
-                     except ValueError as e:
-                         gr.Warning(f"Could not synthesize speech for {lang_name} (ValueError): {e}")
-                         # Audio remains None for this language
-                     except RuntimeError as e:
-                         gr.Error(f"Error synthesizing speech for {lang_name} (RuntimeError): {e}")
-                         # Audio remains None
-                     except Exception as e:
-                         gr.Error(f"Unexpected error during speech synthesis for {lang_name}: {e}")
-                         # Audio remains None
-
-             # Convert the results_data (dict of dicts) to an ordered flat list for Gradio outputs
-             final_ordered_results: List[Optional[Any]] = []
-             for code_key in LANG_INFO.keys():  # Iterate in the defined order of LANG_INFO
-                 lang_output_data = results_data[code_key]
-                 final_ordered_results.append(lang_output_data["audio"])
-                 final_ordered_results.append(lang_output_data["script_text"])
-                 final_ordered_results.append(lang_output_data["script_file"])
-
-             gr.Info("Podcast generation complete!")
-             return final_ordered_results
-
-     except gr.Error as e:
-         raise e
-     except Exception as e:
-         import traceback
-         print("An unexpected error occurred in generate_podcast:")
-         traceback.print_exc()
-         raise gr.Error(f"An unexpected server error occurred. Details: {str(e)[:100]}...")

  # ------------------------------------------------------------------
- # Gradio Interface Setup
  # ------------------------------------------------------------------
- language_names_ordered = [LANG_INFO[code]["name"] for code in LANG_INFO.keys()]

  inputs = [
-     gr.File(label="Upload Lecture PDF", file_types=[".pdf"]),
-     gr.CheckboxGroup(
-         choices=language_names_ordered,
-         value=["English"],
-         label="Select podcast language(s) to generate",
-     ),
  ]

- # Create output components: Audio, Script Display (Markdown), Script Download (File) for each language
  outputs = []
- for code in LANG_INFO.keys():  # Iterate in the consistent order of LANG_INFO
-     info = LANG_INFO[code]
-     lang_name = info["name"]
-     outputs.append(gr.Audio(label=f"{lang_name} Podcast", type="filepath"))
-     outputs.append(gr.Markdown(label=f"{lang_name} Script"))  # Display script as Markdown
-     outputs.append(gr.File(label=f"Download {lang_name} Script (.txt)", type="filepath"))  # Download script

  iface = gr.Interface(
      fn=generate_podcast,
      inputs=inputs,
      outputs=outputs,
-     title="Lecture → Podcast & Script Generator (Multi-Language)",
-     description=(
-         "Upload a lecture PDF, choose language(s), and receive an audio podcast "
-         "and its script for each selected language. Dialogue by Qwen-32B, "
-         "speech by MMS-TTS. Scripts are viewable and downloadable."
-     ),
-     allow_flagging="never",
  )

  if __name__ == "__main__":
-     iface.launch()

  import os
  import re
  import tempfile
  import textwrap
  from pathlib import Path
+ from typing import List, Dict, Optional

  import gradio as gr
+ from huggingface_hub import InferenceClient
+ from PyPDF2 import PdfReader  # For PDF processing
+ from smolagents import HfApiModel  # For LLM interaction
+ from pydub import AudioSegment
+ from pydub.exceptions import CouldntDecodeError

  # ------------------------------------------------------------------
  # LLM setup – remote Qwen model via SmolAgents
  # ------------------------------------------------------------------
  llm = HfApiModel(
      model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
+     max_tokens=2048,
      temperature=0.5,
  )

  # ------------------------------------------------------------------
+ # Hugging Face Inference API client
  # ------------------------------------------------------------------
  client = InferenceClient(token=os.getenv("HF_TOKEN", None))

  # ------------------------------------------------------------------
+ # Language metadata and open TTS models
  # ------------------------------------------------------------------
  LANG_INFO: Dict[str, Dict[str, str]] = {
      "en": {"name": "English", "tts_model": "facebook/mms-tts-eng"},

      "ur": {"name": "Urdu", "tts_model": "facebook/mms-tts-urd"},
      "ne": {"name": "Nepali", "tts_model": "facebook/mms-tts-npi"},
  }
  LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()}

  PROMPT_TEMPLATE = textwrap.dedent(
      """
      You are producing a lively two-host educational podcast in {lang_name}.
+     Summarize the following lecture content into a dialogue of ~300 words.
      Make it engaging: hosts ask questions, clarify ideas with analogies, and
+     wrap up with a concise recap. Preserve technical accuracy.

      ### Lecture Content
      {content}
      """
  )

+ TOKEN_LIMIT = 8000
+ CHUNK_CHAR_LIMIT = 280

+ # ------------------------------------------------------------------
+ # PDF text extraction
+ # ------------------------------------------------------------------
  def extract_pdf_text(pdf_path: str) -> str:
      try:
          reader = PdfReader(pdf_path)

      except Exception as e:
          raise gr.Error(f"Failed to process PDF: {e}")

+ # ------------------------------------------------------------------
+ # Helpers
+ # ------------------------------------------------------------------
  def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str:
      words = text.split()
      if len(words) > limit:
          return " ".join(words[:limit])
      return text


  def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]:
+     sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
+     chunks, current = [], ""
      for sent in sentences:
+         if current and len(current) + len(sent) + 1 > limit:
+             chunks.append(current)
+             current = sent
          else:
+             current = f"{current} {sent}".strip()
+     if current:
+         chunks.append(current)
+     return chunks

+
+ def synthesize_speech(text: str, model_id: str, tempdir: Path) -> Path:
      chunks = _split_to_chunks(text)
+     if not chunks:
+         raise ValueError("No text chunks to synthesize.")
+
+     segments = []
+     for i, chunk in enumerate(chunks):
          try:
              audio_bytes = client.text_to_speech(chunk, model=model_id)
          except HubHTTPError as e:
+             raise RuntimeError(f"TTS error on chunk {i}: {e}")
+         part = tempdir / f"seg_{i}.flac"
+         part.write_bytes(audio_bytes)
          try:
+             seg = AudioSegment.from_file(part, format="flac")
          except CouldntDecodeError as e:
+             raise RuntimeError(f"Decode error on chunk {i}: {e}")
+         segments.append(seg)
+
+     combined = sum(segments, AudioSegment.empty())
+     outpath = tempdir / "podcast.flac"
+     combined.export(outpath, format="flac")
+     return outpath

  # ------------------------------------------------------------------
+ # Main pipeline
  # ------------------------------------------------------------------

+ def generate_podcast(pdf_file: Optional[gr.File], languages: List[str]):
+     if not pdf_file:
          raise gr.Error("Please upload a PDF file.")
+     if not languages:
+         raise gr.Error("Select at least one language.")
+
+     # Extract and truncate
+     text = extract_pdf_text(pdf_file.name)
+     if not text.strip():
+         raise gr.Error("No text found in PDF.")
+     lecture = truncate_text(text)
+
+     transcripts, audios = [], []
+     with tempfile.TemporaryDirectory() as td:
+         base = Path(td)
+         for name in languages:
+             code = LANG_CODE_BY_NAME[name]
+             # 1️⃣ Dialogue
+             prompt = PROMPT_TEMPLATE.format(lang_name=name, content=lecture)
+             dialogue = llm(prompt).strip()
+             transcripts.append(dialogue)
+             # 2️⃣ Speech
+             tempdir = base / code
+             tempdir.mkdir(parents=True, exist_ok=True)
+             audio_path = synthesize_speech(dialogue, LANG_INFO[code]["tts_model"], tempdir)
+             audios.append(str(audio_path))
+
+     # Return alternating transcript and audio path
+     results: List = []
+     for t, a in zip(transcripts, audios):
+         results.extend([t, a])
+     return results

  # ------------------------------------------------------------------
+ # Gradio UI
  # ------------------------------------------------------------------
+ languages = [info["name"] for info in LANG_INFO.values()]

  inputs = [
+     gr.File(label="Lecture PDF", file_types=[".pdf"]),
+     gr.CheckboxGroup(languages, value=["English"], label="Languages"),
  ]

+ # Two outputs per language: transcript and audio
  outputs = []
+ for name in languages:
+     outputs.append(gr.Textbox(label=f"{name} Transcript", interactive=False))
+     outputs.append(gr.Audio(label=f"{name} Podcast", type="filepath"))

  iface = gr.Interface(
      fn=generate_podcast,
      inputs=inputs,
      outputs=outputs,
+     title="Lecture → Podcast Generator",
+     description="Upload a lecture PDF, select languages, get dialogue transcript and audio podcast."
  )

  if __name__ == "__main__":
+     iface.launch()
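
One detail worth flagging when reading both versions of `synthesize_speech`: each catches `HubHTTPError`, but neither side of the diff shows that name being imported, so the `except` clause itself would raise a `NameError` the first time a TTS call fails. Below is a minimal sketch, not part of the commit, assuming the class meant is `HfHubHTTPError` from `huggingface_hub.utils`; the `tts_chunk` helper is hypothetical and only mirrors the per-chunk call shown in the diff.

# Minimal sketch (not part of the commit): alias huggingface_hub's HTTP error
# so the `except HubHTTPError` handlers in app.py resolve at runtime.
import os

from huggingface_hub import InferenceClient
from huggingface_hub.utils import HfHubHTTPError as HubHTTPError  # assumption: this is the intended exception

client = InferenceClient(token=os.getenv("HF_TOKEN", None))

def tts_chunk(chunk: str, model_id: str) -> bytes:
    # Hypothetical helper mirroring the per-chunk call in synthesize_speech.
    try:
        return client.text_to_speech(chunk, model=model_id)
    except HubHTTPError as e:
        raise RuntimeError(f"TTS request failed: {e}") from e

With an alias like this in place, the `except HubHTTPError` handlers in both the old and new `synthesize_speech` would behave as written; without it, any Hub-side failure surfaces as a `NameError` rather than the intended `RuntimeError`.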