|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
import re |
|
import tempfile |
|
import textwrap |
|
from pathlib import Path |
|
from typing import List, Dict, Optional, Any |
|
|
|
import gradio as gr |
|
from PyPDF2 import PdfReader |
|
from pydub import AudioSegment |
|
from pydub.exceptions import CouldntDecodeError |
|
|
|
|
|
# Fail fast with an actionable install hint if the Google SDKs are missing.
try:
    import google.generativeai as genai
    from google.cloud import texttospeech
except ImportError as e:
    # Chain the original ImportError so the traceback shows WHICH of the two
    # libraries actually failed to import (previously `from e` was missing).
    raise ImportError(
        "Please install required Google libraries: "
        "pip install google-generativeai google-cloud-texttospeech"
    ) from e
|
|
|
|
|
|
|
|
|
|
|
|
|
# Supported podcast languages: ISO 639-1-ish code -> UI display name and the
# locale code passed to Google Cloud Text-to-Speech as `language_code`.
LANG_INFO: Dict[str, Dict[str, str]] = {
    "en": {"name": "English", "tts_lang_code": "en-US"},
    "bn": {"name": "Bangla", "tts_lang_code": "bn-IN"},
    "zh": {"name": "Chinese (Mandarin)", "tts_lang_code": "cmn-CN"},
    "ur": {"name": "Urdu", "tts_lang_code": "ur-PK"},
    "ne": {"name": "Nepali", "tts_lang_code": "ne-NP"},
}
# Reverse lookup: display name (as shown in the UI checkbox group) -> code.
LANG_CODE_BY_NAME = {info["name"]: code for code, info in LANG_INFO.items()}
|
|
|
|
|
|
|
|
|
# Gemini prompt template. Filled via str.format() with:
#   {lang_name} - target language display name (from LANG_INFO)
#   {content}   - extracted (and possibly truncated) lecture text
PROMPT_TEMPLATE = textwrap.dedent(
    """
    You are producing a lively two-host educational podcast in {lang_name}.
    Summarize the following lecture content into a dialogue of **approximately 300 words**.
    Make it engaging: hosts ask questions, clarify ideas with analogies, and
    wrap up with a concise recap. Preserve technical accuracy. Use Markdown for host names (e.g., **Host 1:**).

    ### Lecture Content
    {content}
    """
)
|
|
|
|
|
def extract_pdf_text(pdf_path: str) -> str:
    """Return the text of every page in the PDF at *pdf_path*, one per line.

    Pages that yield no extractable text contribute an empty string. Any
    failure while reading or extracting is surfaced to the UI as gr.Error.
    """
    try:
        reader = PdfReader(pdf_path)
        page_texts = (page.extract_text() or "" for page in reader.pages)
        return "\n".join(page_texts)
    except Exception as e:
        raise gr.Error(f"Failed to process PDF: {e}")
|
|
|
# Rough word budget to keep prompts inside the LLM context window.
TOKEN_LIMIT = 8000


def truncate_text(text: str, limit: int = TOKEN_LIMIT) -> str:
    """Cap *text* at *limit* whitespace-separated words.

    Returns the input unchanged when it fits; otherwise emits a Gradio
    warning and returns the first *limit* words joined by single spaces.
    """
    tokens = text.split()
    if len(tokens) <= limit:
        return text
    gr.Warning(f"Input text was truncated from {len(tokens)} to {limit} words to fit LLM context window.")
    return " ".join(tokens[:limit])
|
|
|
|
|
|
|
|
|
CHUNK_CHAR_LIMIT = 1500 |
|
|
|
|
|
def _split_to_chunks(text: str, limit: int = CHUNK_CHAR_LIMIT) -> List[str]: |
|
sentences_raw = re.split(r"(?<=[.!?])\s+", text.strip()) |
|
sentences = [s.strip() for s in sentences_raw if s.strip()] |
|
if not sentences: return [] |
|
chunks, current_chunk = [], "" |
|
for sent in sentences: |
|
if current_chunk and (len(current_chunk) + len(sent) + 1 > limit): |
|
chunks.append(current_chunk) |
|
current_chunk = sent |
|
else: |
|
current_chunk += (" " + sent) if current_chunk else sent |
|
if current_chunk: chunks.append(current_chunk) |
|
return [chunk for chunk in chunks if chunk.strip()] |
|
|
|
|
|
def synthesize_speech_google(
    text: str,
    google_lang_code: str,
    lang_tmpdir: Path,
    tts_client: texttospeech.TextToSpeechClient
) -> Path:
    """Synthesize *text* into a single MP3 with Google Cloud TTS.

    The text is split into sentence-based chunks, each chunk is synthesized
    as its own request, decoded with pydub, and all segments are concatenated
    into lang_tmpdir/"podcast_audio.mp3", whose path is returned.

    Raises:
        ValueError: if splitting yields no speakable chunks.
        RuntimeError: if a TTS request fails or a chunk's MP3 can't be decoded.
    """
    chunks = _split_to_chunks(text)
    if not chunks:
        raise ValueError("Text resulted in no speakable chunks after splitting.")

    # Voice and encoding are identical for every request, so build them once.
    voice_params = texttospeech.VoiceSelectionParams(
        language_code=google_lang_code,
    )
    mp3_config = texttospeech.AudioConfig(
        audio_encoding=texttospeech.AudioEncoding.MP3
    )

    total = len(chunks)
    segments: List[AudioSegment] = []
    for i, piece in enumerate(chunks):
        gr.Info(f"Synthesizing audio for chunk {i + 1}/{total} with Google TTS...")

        try:
            response = tts_client.synthesize_speech(
                input=texttospeech.SynthesisInput(text=piece),
                voice=voice_params,
                audio_config=mp3_config,
            )
        except Exception as e:
            raise RuntimeError(f"Google TTS request failed for chunk {i+1}: {e}") from e

        # Persist the raw MP3 bytes, then decode for concatenation.
        part_path = lang_tmpdir / f"part_{i}.mp3"
        part_path.write_bytes(response.audio_content)

        try:
            segments.append(AudioSegment.from_mp3(part_path))
        except CouldntDecodeError as e:
            raise RuntimeError(f"Failed to decode MP3 audio chunk {i+1} from {part_path}. Error: {e}") from e

    if not segments:
        raise RuntimeError("No audio segments were successfully synthesized or decoded.")

    combined_audio = AudioSegment.empty()
    for segment in segments:
        combined_audio += segment
    final_path = lang_tmpdir / "podcast_audio.mp3"
    combined_audio.export(final_path, format="mp3")
    return final_path
|
|
|
|
|
|
|
|
|
|
|
def generate_podcast(
    gemini_api_key: Optional[str],
    pdf_file_obj: Optional[gr.File],
    selected_lang_names: List[str]
) -> List[Optional[Any]]:
    """Generate podcast audio and scripts from an uploaded lecture PDF.

    For each selected language, Gemini writes a two-host dialogue from the
    extracted PDF text, and Google Cloud TTS renders it to MP3. Returns a
    flat list of (audio path, script markdown, script file path) triples in
    fixed LANG_INFO order — None entries for languages not selected or that
    failed — matching the Gradio output components.
    """
    # --- Input validation --------------------------------------------------
    if not gemini_api_key:
        raise gr.Error("Please enter your Google AI Studio API Key for Gemini.")
    if not pdf_file_obj:
        raise gr.Error("Please upload a PDF file.")
    if not selected_lang_names:
        raise gr.Error("Please select at least one language for the podcast.")

    try:
        genai.configure(api_key=gemini_api_key)
    except Exception as e:
        raise gr.Error(f"Failed to configure Gemini API. Check your API key. Error: {e}")

    # --- Google Cloud TTS authentication -----------------------------------
    # On hosted Spaces the service-account JSON arrives as a secret; write it
    # to a temp file and point GOOGLE_APPLICATION_CREDENTIALS at it.
    google_creds_json_content = os.getenv("GOOGLE_CREDS_JSON_CONTENT")
    temp_creds_file = None  # Path of the temp key file, removed in `finally`
    if google_creds_json_content:
        try:
            fd, temp_creds_path = tempfile.mkstemp(suffix=".json")
            with os.fdopen(fd, "w") as tmp:
                tmp.write(google_creds_json_content)
            os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = temp_creds_path
            temp_creds_file = Path(temp_creds_path)
            gr.Info("Using GOOGLE_CREDS_JSON_CONTENT secret for Text-to-Speech API authentication.")
        except Exception as e:
            gr.Warning(f"Could not process GOOGLE_CREDS_JSON_CONTENT secret: {e}. TTS might fail.")
    elif not os.getenv("GOOGLE_APPLICATION_CREDENTIALS"):
        gr.Warning(
            "GOOGLE_APPLICATION_CREDENTIALS environment variable not set, and no "
            "GOOGLE_CREDS_JSON_CONTENT secret found. "
            "Google Text-to-Speech API calls may fail. "
            "Please set up authentication for Google Cloud Text-to-Speech."
        )

    try:
        tts_client = texttospeech.TextToSpeechClient()
    except Exception as e:
        raise gr.Error(f"Failed to initialize Google Text-to-Speech client. Ensure authentication is set up. Error: {e}")

    selected_codes = [LANG_CODE_BY_NAME[name] for name in selected_lang_names]
    # Pre-fill results for every language so the flat output list always lines
    # up with the fixed set of Gradio output components.
    results_data: Dict[str, Dict[str, Optional[str]]] = {
        code: {"audio": None, "script_text": None, "script_file": None}
        for code in LANG_INFO.keys()
    }

    try:
        # BUG FIX: the original used `with tempfile.TemporaryDirectory()`,
        # which deleted the directory — and every returned audio/script file —
        # the moment the function returned, before Gradio could serve them.
        # mkdtemp leaves the files in place for Gradio to read.
        tmpdir_base = Path(tempfile.mkdtemp(prefix="lecture_podcast_"))

        gr.Info("Extracting text from PDF...")
        lecture_raw = extract_pdf_text(pdf_file_obj.name)
        lecture_text = truncate_text(lecture_raw)

        if not lecture_text.strip():
            raise gr.Error("Could not extract any text from the PDF, or the PDF content is empty.")

        gemini_model = genai.GenerativeModel('gemini-1.5-flash-latest')

        for code in selected_codes:
            info = LANG_INFO[code]
            lang_name = info["name"]
            google_tts_lang = info["tts_lang_code"]

            gr.Info(f"Processing for {lang_name}...")
            lang_tmpdir = tmpdir_base / code
            lang_tmpdir.mkdir(parents=True, exist_ok=True)

            dialogue: Optional[str] = None

            gr.Info(f"Generating dialogue for {lang_name} with Gemini...")
            prompt_for_gemini = PROMPT_TEMPLATE.format(lang_name=lang_name, content=lecture_text)
            try:
                response = gemini_model.generate_content(prompt_for_gemini)
                dialogue_raw = response.text

                if not dialogue_raw or not dialogue_raw.strip():
                    gr.Warning(f"Gemini returned empty dialogue for {lang_name}. Skipping.")
                    continue

                dialogue = dialogue_raw
                results_data[code]["script_text"] = dialogue
                script_file_path = lang_tmpdir / f"podcast_script_{code}.txt"
                script_file_path.write_text(dialogue, encoding="utf-8")
                results_data[code]["script_file"] = str(script_file_path)

            except Exception as e:
                # BUG FIX: gr.Error was constructed but never raised here, so
                # the object was silently discarded and no message reached the
                # UI. gr.Warning shows it and lets other languages proceed.
                gr.Warning(f"Error generating dialogue with Gemini for {lang_name}: {e}")
                continue

            if dialogue:
                gr.Info(f"Synthesizing speech for {lang_name} with Google TTS...")
                try:
                    tts_path = synthesize_speech_google(dialogue, google_tts_lang, lang_tmpdir, tts_client)
                    results_data[code]["audio"] = str(tts_path)
                except ValueError as e:
                    gr.Warning(f"Could not synthesize speech for {lang_name} (ValueError): {e}")
                except RuntimeError as e:
                    # Same fix as above: warn instead of discarding an
                    # unraised gr.Error instance.
                    gr.Warning(f"Error synthesizing speech for {lang_name} (RuntimeError): {e}")
                except Exception as e:
                    gr.Warning(f"Unexpected error during speech synthesis for {lang_name}: {e}")

        # Flatten results in fixed LANG_INFO order: audio, script text, file.
        final_ordered_results: List[Optional[Any]] = []
        for code_key in LANG_INFO.keys():
            lang_output_data = results_data[code_key]
            final_ordered_results.append(lang_output_data["audio"])
            final_ordered_results.append(lang_output_data["script_text"])
            final_ordered_results.append(lang_output_data["script_file"])

        gr.Info("Podcast generation complete!")
        return final_ordered_results

    except gr.Error as e:
        raise e
    except Exception as e:
        import traceback
        print("An unexpected error occurred in generate_podcast:")
        traceback.print_exc()
        raise gr.Error(f"An unexpected server error occurred. Details: {str(e)[:100]}...")
    finally:
        # Remove the temporary service-account key file, if one was written.
        if temp_creds_file and temp_creds_file.exists():
            try:
                temp_creds_file.unlink()
            except Exception as e_clean:
                print(f"Warning: Could not clean up temporary credentials file {temp_creds_file}: {e_clean}")
|
|
|
|
|
|
|
|
|
|
|
# --- Gradio UI wiring -------------------------------------------------------
# Inputs: API key, PDF upload, language checkboxes. For every language in
# LANG_INFO (fixed order) there are three outputs — audio, script markdown,
# script download — matching the flat list returned by generate_podcast.
language_names_ordered = [LANG_INFO[code]["name"] for code in LANG_INFO.keys()]

inputs = [
    gr.Textbox(
        label="Enter your Google AI Studio API Key (for Gemini)",
        type="password",
        placeholder="Paste your API key here",
    ),
    gr.File(label="Upload Lecture PDF", file_types=[".pdf"]),
    gr.CheckboxGroup(
        choices=language_names_ordered,
        value=["English"],
        label="Select podcast language(s) to generate",
    ),
]

outputs = []
for code in LANG_INFO.keys():
    info = LANG_INFO[code]
    lang_name = info["name"]
    outputs.append(gr.Audio(label=f"{lang_name} Podcast (.mp3)", type="filepath"))
    outputs.append(gr.Markdown(label=f"{lang_name} Script"))
    outputs.append(gr.File(label=f"Download {lang_name} Script (.txt)", type="filepath"))

iface = gr.Interface(
    fn=generate_podcast,
    inputs=inputs,
    outputs=outputs,
    # BUG FIX: the title previously read "Lecture β Podcast" — a mojibake
    # arrow from a bad encoding round-trip; restored the intended "→".
    title="Lecture → Podcast & Script (Google Gemini & TTS)",
    description=(
        "**IMPORTANT SETUP:**\n"
        "1. Enter your Google AI Studio API Key for Gemini text generation.\n"
        "2. For Text-to-Speech: Enable the 'Cloud Text-to-Speech API' in your Google Cloud Project. "
        "Create a service account with 'Cloud Text-to-Speech API User' role, download its JSON key. "
        "In this Hugging Face Space, go to 'Settings' -> 'Secrets' and add a new secret named `GOOGLE_CREDS_JSON_CONTENT`. "
        "Paste the *entire content* of your service account JSON key file as the value for this secret.\n\n"
        "Upload a lecture PDF, choose language(s), and receive an audio podcast "
        "and its script. Dialogue by Google Gemini, speech by Google Cloud TTS."
    ),
    allow_flagging="never",
)
|
|
|
if __name__ == "__main__":
    # Start the Gradio server when run as a script.
    iface.launch()