"""Lecture → Podcast Generator.

Upload a lecture PDF, summarize it into a two-host dialogue with an LLM,
then synthesize the dialogue to speech in one or more languages.
"""

import os
import re
import tempfile
import textwrap
from pathlib import Path
from typing import Dict, List, Optional

import gradio as gr
from huggingface_hub import InferenceClient
from huggingface_hub.utils import HfHubHTTPError
from PyPDF2 import PdfReader
from pydub import AudioSegment
from pydub.exceptions import CouldntDecodeError
from smolagents import HfApiModel

# LLM that rewrites the lecture into a two-host dialogue.
llm = HfApiModel(
    model_id="Qwen/Qwen2.5-Coder-32B-Instruct",
    max_tokens=2048,
    temperature=0.5,
)

client = InferenceClient(token=os.getenv("HF_TOKEN", None))
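
# Quick sanity check of the call used in synthesize_speech (hypothetical snippet):
#   audio = client.text_to_speech("Hello!", model="facebook/mms-tts-eng")
# `text_to_speech` returns the raw encoded audio as bytes.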

# Supported languages and their MMS text-to-speech checkpoints.
LANG_INFO: Dict[str, Dict[str, str]] = {
    "en": {"name": "English", "tts_model": "facebook/mms-tts-eng"},
    "bn": {"name": "Bangla", "tts_model": "facebook/mms-tts-ben"},
    "zh": {"name": "Chinese", "tts_model": "facebook/mms-tts-zho"},
    "ur": {"name": "Urdu", "tts_model": "facebook/mms-tts-urd"},
    "ne": {"name": "Nepali", "tts_model": "facebook/mms-tts-npi"},
}
# Reverse lookup: display name -> language code, e.g. "Bangla" -> "bn".
LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()}

PROMPT_TEMPLATE = textwrap.dedent(
    """
    You are producing a lively two-host educational podcast in {lang_name}.
    Summarize the following lecture content into a dialogue of ~300 words.
    Make it engaging: hosts ask questions, clarify ideas with analogies, and
    wrap up with a concise recap. Preserve technical accuracy.

    ### Lecture Content
    {content}
    """
)

# Word-count cap on the lecture text (a rough proxy for the LLM's token
# limit) and a per-request character cap for each TTS call.
TOKEN_LIMIT = 8000
CHUNK_CHAR_LIMIT = 280


def extract_pdf_text(pdf_path: str) -> str:
    """Concatenate the text of every page; image-only pages yield empty strings."""
    try:
        reader = PdfReader(pdf_path)
        return "\n".join(page.extract_text() or "" for page in reader.pages)
    except Exception as e:
        raise gr.Error(f"Failed to process PDF: {e}")


def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str:
    """Cap the text at `limit` whitespace-separated words."""
    words = text.split()
    if len(words) > limit:
        return " ".join(words[:limit])
    return text
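
# For example, truncate_text("alpha beta gamma", limit=2) returns "alpha beta".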


def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]:
    """Split on sentence boundaries, then greedily pack sentences into chunks
    of at most `limit` characters (a lone long sentence may exceed it)."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", text) if s.strip()]
    chunks, current = [], ""
    for sent in sentences:
        if current and len(current) + len(sent) + 1 > limit:
            chunks.append(current)
            current = sent
        else:
            current = f"{current} {sent}".strip()
    if current:
        chunks.append(current)
    return chunks
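
# Illustration of the greedy packing (hypothetical input):
#   _split_to_chunks("One. Two. Three.", limit=12) -> ["One. Two.", "Three."]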


def synthesize_speech(text: str, model_id: str, tempdir: Path) -> Path:
    """Synthesize `text` chunk by chunk with the given TTS model, then
    concatenate the segments into a single FLAC file."""
    chunks = _split_to_chunks(text)
    if not chunks:
        raise ValueError("No text chunks to synthesize.")

    segments = []
    for i, chunk in enumerate(chunks):
        try:
            audio_bytes = client.text_to_speech(chunk, model=model_id)
        except HfHubHTTPError as e:
            raise RuntimeError(f"TTS error on chunk {i}: {e}")
        part = tempdir / f"seg_{i}.flac"
        part.write_bytes(audio_bytes)
        try:
            seg = AudioSegment.from_file(part, format="flac")
        except CouldntDecodeError as e:
            raise RuntimeError(f"Decode error on chunk {i}: {e}")
        segments.append(seg)

    combined = sum(segments, AudioSegment.empty())
    outpath = tempdir / "podcast.flac"
    combined.export(outpath, format="flac")
    return outpath
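
# The .flac suffixes and format="flac" assume the endpoint returns
# FLAC-encoded bytes; if a model serves WAV instead, adjust both
# `format` arguments to match.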


def generate_podcast(pdf_file, languages: List[str]) -> List:
    """Build a transcript and an audio file for each selected language.

    Returns one (transcript, audio path) pair per *supported* language, in
    LANG_INFO order, padding unselected languages with None so the return
    value always lines up with the interface's output components.
    """
    if not pdf_file:
        raise gr.Error("Please upload a PDF file.")
    if not languages:
        raise gr.Error("Select at least one language.")

    # gr.File yields a filepath string in recent Gradio versions and a
    # tempfile-like object (with .name) in older ones; accept both.
    pdf_path = pdf_file if isinstance(pdf_file, str) else pdf_file.name
    text = extract_pdf_text(pdf_path)
    if not text.strip():
        raise gr.Error("No text found in PDF.")
    lecture = truncate_text(text)

    # mkdtemp (not TemporaryDirectory) so the audio files outlive this
    # function; Gradio still needs to read them after we return.
    base = Path(tempfile.mkdtemp())
    generated: Dict[str, tuple] = {}
    for name in languages:
        code = LANG_CODE_BY_NAME[name]

        prompt = PROMPT_TEMPLATE.format(lang_name=name, content=lecture)
        # smolagents models take a list of chat messages and return a
        # message object whose text is in .content.
        dialogue = llm([{"role": "user", "content": prompt}]).content.strip()

        tempdir = base / code
        tempdir.mkdir(parents=True, exist_ok=True)
        audio_path = synthesize_speech(dialogue, LANG_INFO[code]["tts_model"], tempdir)
        generated[code] = (dialogue, str(audio_path))

    results: List = []
    for code in LANG_INFO:
        transcript, audio = generated.get(code, (None, None))
        results.extend([transcript, audio])
    return results
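
# Hypothetical programmatic use, bypassing the UI (path is illustrative):
#   generate_podcast("lecture.pdf", ["English", "Urdu"])
#   -> [en_transcript, en_audio, None, None, None, None,
#       ur_transcript, ur_audio, None, None]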


languages = [info["name"] for info in LANG_INFO.values()]

inputs = [
    gr.File(label="Lecture PDF", file_types=[".pdf"]),
    gr.CheckboxGroup(languages, value=["English"], label="Languages"),
]

# One transcript box and one audio player per supported language, in the
# same order generate_podcast emits its results.
outputs = []
for name in languages:
    outputs.append(gr.Textbox(label=f"{name} Transcript", interactive=False))
    outputs.append(gr.Audio(label=f"{name} Podcast", type="filepath"))

iface = gr.Interface(
    fn=generate_podcast,
    inputs=inputs,
    outputs=outputs,
    title="Lecture → Podcast Generator",
    description="Upload a lecture PDF, select languages, and get a dialogue transcript plus an audio podcast for each.",
)

if __name__ == "__main__":
    iface.launch()