# ============================================================= # Hugging Face Space – Lecture → Podcast Generator (User-selectable Languages) # ============================================================= # • **Text generation** – SmolAgents `HfApiModel` (Qwen/Qwen2.5-Coder-32B-Instruct) # • **Speech synthesis** – `InferenceClient.text_to_speech`, chunk-safe # (MMS-TTS for en/bn/ur/ne, mms-TTS-zho for zh). Long texts are split # into ≤280-char chunks to stay within HF endpoint limits. # ----------------------------------------------------------------- import os import re import tempfile import textwrap from pathlib import Path from typing import List, Dict, Optional import gradio as gr from huggingface_hub import InferenceClient, HubHTTPError from PyPDF2 import PdfReader # For PDF processing from smolagents import HfApiModel # For LLM interaction from pydub import AudioSegment # Added for robust audio concatenation from pydub.exceptions import CouldntDecodeError # Specific pydub error # ------------------------------------------------------------------ # LLM setup – remote Qwen model via SmolAgents # ------------------------------------------------------------------ llm = HfApiModel( model_id="Qwen/Qwen2.5-Coder-32B-Instruct", max_tokens=2048, # Max tokens for the generated output dialogue temperature=0.5, ) # ------------------------------------------------------------------ # Hugging Face Inference API client (uses HF_TOKEN secret if provided) # ------------------------------------------------------------------ client = InferenceClient(token=os.getenv("HF_TOKEN", None)) # ------------------------------------------------------------------ # Language metadata and corresponding open TTS model IDs # ------------------------------------------------------------------ LANG_INFO: Dict[str, Dict[str, str]] = { "en": {"name": "English", "tts_model": "facebook/mms-tts-eng"}, "bn": {"name": "Bangla", "tts_model": "facebook/mms-tts-ben"}, "zh": {"name": "Chinese", "tts_model": "facebook/mms-tts-zho"}, "ur": {"name": "Urdu", "tts_model": "facebook/mms-tts-urd"}, "ne": {"name": "Nepali", "tts_model": "facebook/mms-tts-npi"}, } # For reverse lookup: language name to language code LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()} # ------------------------------------------------------------------ # Prompt template (target ~300 words for LLM output) # ------------------------------------------------------------------ PROMPT_TEMPLATE = textwrap.dedent( """ You are producing a lively two-host educational podcast in {lang_name}. Summarize the following lecture content into a dialogue of **approximately 300 words**. Make it engaging: hosts ask questions, clarify ideas with analogies, and wrap up with a concise recap. Preserve technical accuracy. ### Lecture Content {content} """ ) # PDF helpers ------------------------------------------------------- def extract_pdf_text(pdf_path: str) -> str: try: reader = PdfReader(pdf_path) return "\n".join(page.extract_text() or "" for page in reader.pages) except Exception as e: # Raise a Gradio error to display it in the UI raise gr.Error(f"Failed to process PDF: {e}") # Increased slightly; Qwen models have large context windows. This is input *words*. # Actual limit is in tokens. Qwen2.5-Coder-32B-Instruct context is 65536 tokens. # 8000 words is still conservative. The prompt itself also consumes tokens. TOKEN_LIMIT = 8000 def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str: words = text.split() if len(words) > limit: gr.Warning(f"Input text was truncated from {len(words)} to {limit} words to fit LLM context window.") return " ".join(words[:limit]) return text # ------------------------------------------------------------------ # TTS helper – chunk long text safely (HF endpoint limit ~30s / 200-300 chars) # ------------------------------------------------------------------ CHUNK_CHAR_LIMIT = 280 # Safe margin for MMS-TTS character limit per request def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]: # Split on sentence boundaries (.!?) while respecting the character limit per chunk. sentences_raw = re.split(r"(?<=[.!?])\s+", text.strip()) sentences = [s.strip() for s in sentences_raw if s.strip()] # Clean and filter empty sentences if not sentences: return [] chunks, current_chunk = [], "" for sent in sentences: # If current_chunk is empty, the first sentence always starts a new chunk. # If current_chunk is not empty, check if adding the new sentence (plus a space) exceeds the limit. if current_chunk and (len(current_chunk) + len(sent) + 1 > limit): chunks.append(current_chunk) # Finalize the current chunk current_chunk = sent # Start a new chunk with the current sentence else: # Append sentence to current_chunk (with a space if current_chunk is not empty) current_chunk += (" " + sent) if current_chunk else sent if current_chunk: # Add any remaining part as the last chunk chunks.append(current_chunk) return [chunk for chunk in chunks if chunk.strip()] # Ensure no empty chunks are returned def synthesize_speech(text: str, model_id: str, lang_tmpdir: Path) -> Path: """Splits text into chunks, synthesizes speech for each, and concatenates them using pydub.""" chunks = _split_to_chunks(text) if not chunks: raise ValueError("Text resulted in no speakable chunks after splitting.") audio_segments: List[AudioSegment] = [] for idx, chunk in enumerate(chunks): gr.Info(f"Synthesizing audio for chunk {idx + 1}/{len(chunks)}...") try: audio_bytes = client.text_to_speech(chunk, model=model_id) except HubHTTPError as e: error_message = f"TTS request failed for chunk {idx+1}/{len(chunks)} ('{chunk[:30]}...'): {e}" if "Input validation error: `inputs` must be non-empty" in str(e) and not chunk.strip(): gr.Warning(f"Skipping an apparently empty chunk for TTS that wasn't filtered: Chunk {idx+1}") continue raise RuntimeError(error_message) from e part_path = lang_tmpdir / f"part_{idx}.flac" # Assuming TTS returns FLAC part_path.write_bytes(audio_bytes) try: # Load the audio part using pydub. # MMS TTS via HF Inference API usually returns WAV by default, but filename implies FLAC. # If API returns WAV, use format="wav". If FLAC, format="flac". # The original code implies FLAC, so we'll stick to that. segment = AudioSegment.from_file(part_path, format="flac") audio_segments.append(segment) except CouldntDecodeError as e: # This can happen if the audio data is not valid FLAC or is empty/corrupted. raise RuntimeError( f"Failed to decode audio chunk {idx+1} from {part_path}. " f"Audio data might be corrupted, empty, or not in FLAC format. TTS Error: {e}" ) from e if not audio_segments: raise RuntimeError("No audio segments were successfully synthesized or decoded.") # Concatenate all audio segments combined_audio = sum(audio_segments, AudioSegment.empty()) # Efficient sum for pydub final_path = lang_tmpdir / "podcast.flac" combined_audio.export(final_path, format="flac") return final_path # ------------------------------------------------------------------ # Main pipeline function for Gradio # ------------------------------------------------------------------ def generate_podcast(pdf_file_obj: Optional[gr.File], selected_lang_names: List[str]): if not pdf_file_obj: raise gr.Error("Please upload a PDF file.") if not selected_lang_names: raise gr.Error("Please select at least one language for the podcast.") # Map selected language names back to their codes selected_codes = [LANG_CODE_BY_NAME[name] for name in selected_lang_names] # Initialize results map. Keys are lang codes, values will be audio file paths or None. # This helps in populating results for selected languages only. results_map: Dict[str, Optional[str]] = {code: None for code in LANG_INFO.keys()} try: with tempfile.TemporaryDirectory() as td: tmpdir_base = Path(td) # Base temporary directory gr.Info("Extracting text from PDF...") lecture_raw = extract_pdf_text(pdf_file_obj.name) # .name is path to temp uploaded file lecture_text = truncate_text(lecture_raw) if not lecture_text.strip(): raise gr.Error("Could not extract any text from the PDF, or the PDF content is empty.") for code in selected_codes: # Iterate only through user-selected languages info = LANG_INFO[code] lang_name = info["name"] tts_model = info["tts_model"] gr.Info(f"Processing for {lang_name}...") # Create a language-specific subdirectory within the base temporary directory lang_tmpdir = tmpdir_base / code lang_tmpdir.mkdir(parents=True, exist_ok=True) # 1️⃣ Generate dialogue using LLM gr.Info(f"Generating dialogue for {lang_name}...") prompt = PROMPT_TEMPLATE.format(lang_name=lang_name, content=lecture_text) try: dialogue: str = llm(prompt) if not dialogue or not dialogue.strip(): gr.Warning(f"LLM returned empty dialogue for {lang_name}. Skipping TTS for this language.") results_map[code] = None continue # Move to the next selected language except Exception as e: gr.Error(f"Error generating dialogue for {lang_name}: {e}") results_map[code] = None continue # 2️⃣ Synthesize speech from the dialogue (chunked and concatenated) gr.Info(f"Synthesizing speech for {lang_name}...") try: tts_path = synthesize_speech(dialogue, tts_model, lang_tmpdir) results_map[code] = str(tts_path) # Store the file path for this language except ValueError as e: # From _split_to_chunks or synthesize_speech if no chunks gr.Warning(f"Could not synthesize speech for {lang_name} (ValueError): {e}") results_map[code] = None except RuntimeError as e: # From synthesize_speech (TTS/pydub errors) gr.Error(f"Error synthesizing speech for {lang_name} (RuntimeError): {e}") results_map[code] = None except Exception as e: # Catch any other unexpected errors during synthesis gr.Error(f"Unexpected error during speech synthesis for {lang_name}: {e}") results_map[code] = None # Convert the results_map to an ordered list based on LANG_INFO keys. # This ensures the returned list matches the order of Gradio output components. final_results = [results_map[lang_code] for lang_code in LANG_INFO.keys()] gr.Info("Podcast generation complete!") return final_results except gr.Error as e: # Re-raise Gradio-specific errors to be displayed in UI raise e except Exception as e: # Catch other unexpected errors during the process # Log the full error for debugging purposes (e.g., to server logs) import traceback print("An unexpected error occurred in generate_podcast:") traceback.print_exc() # Show a generic error message in the UI raise gr.Error(f"An unexpected server error occurred. Details: {str(e)[:100]}...") # ------------------------------------------------------------------ # Gradio Interface Setup # ------------------------------------------------------------------ # Ensure choices and outputs maintain consistent order related to LANG_INFO language_names_ordered = [LANG_INFO[code]["name"] for code in LANG_INFO.keys()] inputs = [ gr.File(label="Upload Lecture PDF", file_types=[".pdf"]), gr.CheckboxGroup( choices=language_names_ordered, value=["English"], # Default language selection label="Select podcast language(s) to generate", ), ] # Create an gr.Audio output component for each language, in the defined order outputs = [ gr.Audio(label=f"{LANG_INFO[code]['name']} Podcast", type="filepath") for code in LANG_INFO.keys() ] iface = gr.Interface( fn=generate_podcast, inputs=inputs, outputs=outputs, title="Lecture → Podcast Generator (Multi-Language)", description=( "Upload a lecture PDF, choose language(s), and receive a two-host " "audio podcast for each selected language. Dialogue is generated by Qwen-32B, " "and speech is synthesized using open MMS-TTS models via the HF Inference API. " "Long texts are automatically chunked, and audio parts are robustly combined." ), allow_flagging="never", # Set to "auto" or "manual" if you want to enable flagging # Provide examples if you have sample PDFs accessible to the Gradio app # examples=[ # ["path/to/sample_lecture.pdf", ["English", "Chinese"]], # ] ) if __name__ == "__main__": # For local testing, ensure ffmpeg is installed and in PATH if pydub relies on it # for FLAC conversion or other operations not handled by its built-in capabilities. # The Hugging Face Inference API for MMS-TTS should ideally return FLAC directly # if the model specified (e.g., facebook/mms-tts-eng) outputs that format. iface.launch()