# =============================================================
# Hugging Face Space – Lecture → Podcast Generator (User-selectable Languages)
# =============================================================
# • **Text generation** – SmolAgents `HfApiModel` (Qwen/Qwen2.5-Coder-32B-Instruct)
# • **Speech synthesis** – `InferenceClient.text_to_speech`, chunk-safe
#   (facebook/mms-tts-* checkpoints for en/bn/zh/ur/ne). Long texts are split
#   into ≤280-char chunks to stay within HF endpoint limits.
# -----------------------------------------------------------------
import os
import re
import tempfile
import textwrap
from pathlib import Path
from typing import List, Dict, Optional, Any
import gradio as gr
from huggingface_hub import InferenceClient
from huggingface_hub.utils import HfHubHTTPError  # raised when an Inference API call fails
from PyPDF2 import PdfReader # For PDF processing
from smolagents import HfApiModel # For LLM interaction
from pydub import AudioSegment # Added for robust audio concatenation
from pydub.exceptions import CouldntDecodeError # Specific pydub error
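# NOTE: pydub delegates FLAC encode/decode to ffmpeg (or libav), which must be
# available on the system PATH (e.g. installed via packages.txt in a HF Space).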
# ------------------------------------------------------------------
# LLM setup – remote Qwen model via SmolAgents
# ------------------------------------------------------------------
llm = HfApiModel(
model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
max_tokens=2048, # Max tokens for the generated output dialogue
temperature=0.5,
)
# ------------------------------------------------------------------
# Hugging Face Inference API client (uses HF_TOKEN secret if provided)
# ------------------------------------------------------------------
client = InferenceClient(token=os.getenv("HF_TOKEN", None))
# ------------------------------------------------------------------
# Language metadata and corresponding open TTS model IDs
# ------------------------------------------------------------------
LANG_INFO: Dict[str, Dict[str, str]] = {
"en": {"name": "English", "tts_model": "facebook/mms-tts-eng"},
"bn": {"name": "Bangla", "tts_model": "facebook/mms-tts-ben"},
"zh": {"name": "Chinese", "tts_model": "facebook/mms-tts-zho"},
"ur": {"name": "Urdu", "tts_model": "facebook/mms-tts-urd"},
"ne": {"name": "Nepali", "tts_model": "facebook/mms-tts-npi"},
}
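# Adding another language only requires a new entry here; for example (untested,
# assuming the corresponding MMS-TTS checkpoint exists on the Hub):
#   "hi": {"name": "Hindi", "tts_model": "facebook/mms-tts-hin"},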
# For reverse lookup: language name to language code
LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()}
# ------------------------------------------------------------------
# Prompt template (target ~300 words for LLM output)
# ------------------------------------------------------------------
PROMPT_TEMPLATE = textwrap.dedent(
"""
You are producing a lively two-host educational podcast in {lang_name}.
Summarize the following lecture content into a dialogue of **approximately 300 words**.
Make it engaging: hosts ask questions, clarify ideas with analogies, and
wrap up with a concise recap. Preserve technical accuracy. Use Markdown for host names (e.g., **Host 1:**).
### Lecture Content
{content}
"""
)
# PDF helpers -------------------------------------------------------
def extract_pdf_text(pdf_path: str) -> str:
try:
reader = PdfReader(pdf_path)
return "\n".join(page.extract_text() or "" for page in reader.pages)
except Exception as e:
raise gr.Error(f"Failed to process PDF: {e}")
TOKEN_LIMIT = 8000
def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str:
words = text.split()
if len(words) > limit:
gr.Warning(f"Input text was truncated from {len(words)} to {limit} words to fit LLM context window.")
return " ".join(words[:limit])
return text
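# The limit counts whitespace-separated words as a cheap proxy for tokens, e.g.:
#   truncate_text("one two three", limit=2)  ->  "one two"  (plus a UI warning)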
# ------------------------------------------------------------------
# TTS helper – chunk long text safely (HF endpoint limit ~30s / 200-300 chars)
# ------------------------------------------------------------------
CHUNK_CHAR_LIMIT = 280
def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]:
sentences_raw = re.split(r"(?<=[.!?])\s+", text.strip())
sentences = [s.strip() for s in sentences_raw if s.strip()]
if not sentences: return []
chunks, current_chunk = [], ""
for sent in sentences:
if current_chunk and (len(current_chunk) + len(sent) + 1 > limit):
chunks.append(current_chunk)
current_chunk = sent
else:
current_chunk += (" " + sent) if current_chunk else sent
if current_chunk: chunks.append(current_chunk)
return [chunk for chunk in chunks if chunk.strip()]
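# Example: with limit=12, "First sentence. Second one. Third." splits into
# ["First sentence.", "Second one.", "Third."]; whole sentences are kept
# together, and a sentence longer than the limit becomes its own chunk.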
def synthesize_speech(text: str, model_id: str, lang_tmpdir: Path) -> Path:
chunks = _split_to_chunks(text)
if not chunks: raise ValueError("Text resulted in no speakable chunks after splitting.")
audio_segments: List[AudioSegment] = []
for idx, chunk in enumerate(chunks):
gr.Info(f"Synthesizing audio for chunk {idx + 1}/{len(chunks)}...")
try:
audio_bytes = client.text_to_speech(chunk, model=model_id)
        except HfHubHTTPError as e:
error_message = f"TTS request failed for chunk {idx+1}/{len(chunks)} ('{chunk[:30]}...'): {e}"
if "Input validation error: `inputs` must be non-empty" in str(e) and not chunk.strip():
gr.Warning(f"Skipping an apparently empty chunk for TTS: Chunk {idx+1}")
continue
raise RuntimeError(error_message) from e
part_path = lang_tmpdir / f"part_{idx}.flac"
part_path.write_bytes(audio_bytes)
try:
segment = AudioSegment.from_file(part_path, format="flac")
audio_segments.append(segment)
except CouldntDecodeError as e:
raise RuntimeError(f"Failed to decode audio chunk {idx+1} from {part_path}. TTS Error: {e}") from e
if not audio_segments: raise RuntimeError("No audio segments were successfully synthesized or decoded.")
combined_audio = sum(audio_segments, AudioSegment.empty())
    final_path = lang_tmpdir / "podcast_audio.flac"
combined_audio.export(final_path, format="flac")
return final_path
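# Minimal usage sketch (hypothetical values; requires a valid HF token with
# access to the MMS-TTS Inference API):
#   with tempfile.TemporaryDirectory() as td:
#       flac = synthesize_speech("Hello there. How are you today?",
#                                "facebook/mms-tts-eng", Path(td))
#       # -> <td>/podcast_audio.flac containing the concatenated chunks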
# ------------------------------------------------------------------
# Main pipeline function for Gradio
# ------------------------------------------------------------------
def generate_podcast(pdf_file_obj: Optional[gr.File], selected_lang_names: List[str]) -> List[Optional[Any]]:
if not pdf_file_obj:
raise gr.Error("Please upload a PDF file.")
if not selected_lang_names:
raise gr.Error("Please select at least one language for the podcast.")
selected_codes = [LANG_CODE_BY_NAME[name] for name in selected_lang_names]
# Initialize results data structure for all languages
# Each language will have a dict for audio, script_text (for display), and script_file (for download)
results_data: Dict[str, Dict[str, Optional[str]]] = {
code: {"audio": None, "script_text": None, "script_file": None}
for code in LANG_INFO.keys()
}
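    # Gradio maps return values to output components positionally, so the final
    # list is flattened in LANG_INFO order, three slots per language:
    # [en_audio, en_script_text, en_script_file, bn_audio, bn_script_text, ...]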
try:
with tempfile.TemporaryDirectory() as td:
tmpdir_base = Path(td)
gr.Info("Extracting text from PDF...")
lecture_raw = extract_pdf_text(pdf_file_obj.name)
lecture_text = truncate_text(lecture_raw)
if not lecture_text.strip():
raise gr.Error("Could not extract any text from the PDF, or the PDF content is empty.")
for code in selected_codes: # Iterate only through user-selected languages
info = LANG_INFO[code]
lang_name = info["name"]
tts_model = info["tts_model"]
gr.Info(f"Processing for {lang_name}...")
lang_tmpdir = tmpdir_base / code
lang_tmpdir.mkdir(parents=True, exist_ok=True)
dialogue: Optional[str] = None # Initialize dialogue for the current language scope
# 1️⃣ Generate dialogue using LLM
gr.Info(f"Generating dialogue for {lang_name}...")
prompt = PROMPT_TEMPLATE.format(lang_name=lang_name, content=lecture_text)
try:
dialogue_raw: str = llm(prompt)
if not dialogue_raw or not dialogue_raw.strip():
gr.Warning(f"LLM returned empty dialogue for {lang_name}. Skipping this language.")
continue # Skip to the next selected language; results_data[code] remains all None
dialogue = dialogue_raw # Keep the generated dialogue
# Store script text and save script to a file
results_data[code]["script_text"] = dialogue
script_file_path = lang_tmpdir / f"podcast_script_{code}.txt"
script_file_path.write_text(dialogue, encoding="utf-8")
results_data[code]["script_file"] = str(script_file_path)
except Exception as e:
gr.Error(f"Error generating dialogue for {lang_name}: {e}")
# If dialogue generation fails, all parts for this lang remain None or partially filled
# The continue ensures we don't try TTS if dialogue failed
continue
# 2️⃣ Synthesize speech (only if dialogue was successfully generated)
if dialogue: # Ensure dialogue is not None here
gr.Info(f"Synthesizing speech for {lang_name}...")
try:
tts_path = synthesize_speech(dialogue, tts_model, lang_tmpdir)
results_data[code]["audio"] = str(tts_path)
except ValueError as e:
gr.Warning(f"Could not synthesize speech for {lang_name} (ValueError): {e}")
# Audio remains None for this language
except RuntimeError as e:
gr.Error(f"Error synthesizing speech for {lang_name} (RuntimeError): {e}")
# Audio remains None
except Exception as e:
gr.Error(f"Unexpected error during speech synthesis for {lang_name}: {e}")
# Audio remains None
# Convert the results_data (dict of dicts) to an ordered flat list for Gradio outputs
final_ordered_results: List[Optional[Any]] = []
for code_key in LANG_INFO.keys(): # Iterate in the defined order of LANG_INFO
lang_output_data = results_data[code_key]
final_ordered_results.append(lang_output_data["audio"])
final_ordered_results.append(lang_output_data["script_text"])
final_ordered_results.append(lang_output_data["script_file"])
gr.Info("Podcast generation complete!")
return final_ordered_results
except gr.Error as e:
raise e
except Exception as e:
import traceback
print("An unexpected error occurred in generate_podcast:")
traceback.print_exc()
raise gr.Error(f"An unexpected server error occurred. Details: {str(e)[:100]}...")
# ------------------------------------------------------------------
# Gradio Interface Setup
# ------------------------------------------------------------------
language_names_ordered = [LANG_INFO[code]["name"] for code in LANG_INFO.keys()]
inputs = [
gr.File(label="Upload Lecture PDF", file_types=[".pdf"]),
gr.CheckboxGroup(
choices=language_names_ordered,
value=["English"],
label="Select podcast language(s) to generate",
),
]
# Create output components: Audio, Script Display (Markdown), Script Download (File) for each language
outputs = []
for code in LANG_INFO.keys(): # Iterate in the consistent order of LANG_INFO
info = LANG_INFO[code]
lang_name = info["name"]
outputs.append(gr.Audio(label=f"{lang_name} Podcast", type="filepath"))
outputs.append(gr.Markdown(label=f"{lang_name} Script")) # Display script as Markdown
outputs.append(gr.File(label=f"Download {lang_name} Script (.txt)", type="filepath")) # Download script
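# 5 languages x 3 components = 15 output slots, matching the flat list
# returned by generate_podcast.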
iface = gr.Interface(
fn=generate_podcast,
inputs=inputs,
outputs=outputs,
    title="Lecture → Podcast & Script Generator (Multi-Language)",
description=(
"Upload a lecture PDF, choose language(s), and receive an audio podcast "
"and its script for each selected language. Dialogue by Qwen-32B, "
"speech by MMS-TTS. Scripts are viewable and downloadable."
),
allow_flagging="never",
)
if __name__ == "__main__":
iface.launch()