Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| from pydub import AudioSegment | |
| import json | |
| import uuid | |
| import edge_tts | |
| import asyncio | |
| import aiofiles | |
| import os | |
| import time | |
| import mimetypes | |
| import torch | |
| import re | |
| from typing import List, Dict, Optional | |
| from transformers import AutoTokenizer, AutoModelForCausalLM, GenerationConfig | |
| import PyPDF2 | |
| import io | |
| import traceback | |
| #from git import Repo | |
| #Repo.clone_from("https://huggingface.co/unsloth/Llama-3.2-3B-bnb-4bit", "./local_model_dir") | |
# --- Configuration constants ---
# Upload size limit, in megabytes and in bytes.
MAX_FILE_SIZE_MB = 20
MAX_FILE_SIZE_BYTES = MAX_FILE_SIZE_MB * 1024 ** 2
# Hugging Face model id loaded by initialize_model().
# Alternative ids kept for reference: unsloth/Llama-3.2-3B, unsloth/Llama-3.2-1B
MODEL_ID = "unsloth/Qwen2.5-1.5B"
# In-memory log buffer shared by the whole module.
logs = []


def add_log(message):
    """Append a timestamped copy of *message* to ``logs`` and echo it to stdout.

    The stored entry is prefixed with ``[HH:MM:SS]``; the printed text is the
    raw message without the timestamp.
    """
    stamp = time.strftime('%H:%M:%S')
    entry = f"[{stamp}] {message}"
    logs.append(entry)
    print(message)
# Shared model state; all three stay None until initialize_model() assigns them.
model = tokenizer = generation_config = None
def test_llm_generation():
    """Run a short greedy generation as a smoke test and log the outcome.

    Uses the module-level ``model`` and ``tokenizer``. Any exception is caught
    and logged instead of being propagated.
    """
    try:
        encoded = tokenizer("Hello, how are you today?", return_tensors="pt")
        encoded = encoded.to(model.device)
        with torch.no_grad():
            generated = model.generate(
                **encoded,
                max_new_tokens=10,
                do_sample=False,
                pad_token_id=tokenizer.pad_token_id,
                eos_token_id=tokenizer.eos_token_id,
            )
        reply = tokenizer.decode(generated[0], skip_special_tokens=True)
        add_log(f"π§ͺ Test LLM response: {reply[:100]}")
    except Exception as exc:
        add_log(f"β LLM quick test failed: {exc}")
def initialize_model():
    """Load the tokenizer and model for ``MODEL_ID`` into the module globals.

    Also builds the sampling ``generation_config``. Returns True on success;
    on any exception the error and its traceback are logged and False is
    returned.
    """
    global model, tokenizer, generation_config
    try:
        add_log("π Initializing model...")
        tokenizer = AutoTokenizer.from_pretrained(
            MODEL_ID,
            trust_remote_code=True,
            use_fast=False,  # the slow tokenizer is requested explicitly
        )
        # Reuse EOS as the padding token when the tokenizer defines none.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
            add_log("β Set pad_token to eos_token")

        # Half precision and automatic placement on GPU; float32 on CPU.
        cuda_ready = torch.cuda.is_available()
        load_kwargs = {
            "torch_dtype": torch.float16 if cuda_ready else torch.float32,
            "device_map": "auto" if cuda_ready else None,
            "trust_remote_code": True,
            "low_cpu_mem_usage": True,
        }
        model = AutoModelForCausalLM.from_pretrained(MODEL_ID, **load_kwargs)
        if not cuda_ready:
            model = model.to("cpu")
        model.eval()

        # Sampling settings used for script generation.
        sampling = dict(
            max_new_tokens=4095,
            temperature=0.7,
            top_p=0.9,
            do_sample=True,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            repetition_penalty=1.1,
            length_penalty=1.0,
        )
        generation_config = GenerationConfig(**sampling)

        add_log(f"β Model loaded successfully on device: {model.device}")
        return True
    except Exception as exc:
        failure = f"β Model initialization failed: {str(exc)}"
        add_log(failure)
        add_log(f"Traceback: {traceback.format_exc()}")
        return False
# Load the model at import time; the flag records whether loading succeeded.
model_loaded: bool = initialize_model()
class PodcastGenerator:
    """Turn text (or an uploaded PDF/TXT file) into a two-speaker podcast file.

    The pipeline is: build a prompt, generate a JSON script with the shared
    language model, synthesize every line with edge-tts, then concatenate the
    clips with pydub. All progress is reported through ``add_log``.
    """

    # Maximum number of characters of file content forwarded to the model.
    MAX_SOURCE_CHARS = 2000

    # Regex fallbacks used when no decodable JSON object validates.
    _JSON_PATTERNS = (
        # Object containing "topic" and "podcast" followed by an array.
        r'\{[^{}]*"topic"[^{}]*"podcast"[^{}]*\[[^\]]*\][^{}]*\}',
        # Looser, non-greedy variant of the same shape.
        r'\{.*?"topic".*?"podcast".*?\[.*?\].*?\}',
        # Everything between the first "{" and the last "}".
        r'\{.*\}',
    )

    # Schema example shown to the model; the validator expects this shape.
    _EXAMPLE_SCRIPT = {
        "topic": "AI Technology",
        "podcast": [
            {"speaker": 1, "line": "Welcome to our discussion about AI technology."},
            {"speaker": 2, "line": "Thanks for having me. This is such an exciting field."},
            {"speaker": 1, "line": "What aspects of AI do you find most interesting?"},
            {"speaker": 2, "line": "I'm particularly fascinated by machine learning applications."},
        ],
    }

    # Stock dialogue used when no usable sentence can be taken from the text.
    _STOCK_SENTENCES = (
        "Welcome to our podcast discussion",
        "Today we're exploring an interesting topic",
        "Let's dive into the key points",
        "That's a fascinating perspective",
        "What are your thoughts on this matter",
        "I think there are multiple angles to consider",
        "This is definitely worth exploring further",
        "Thank you for this engaging conversation",
    )

    def __init__(self):
        # Snapshot the module-level model state at construction time.
        self.model = model
        self.tokenizer = tokenizer
        self.generation_config = generation_config

    @staticmethod
    def _remove_file(path: str) -> bool:
        """Delete *path* on a best-effort basis; return True if it was removed."""
        try:
            os.remove(path)
        except FileNotFoundError:
            return False
        except OSError as e:
            add_log(f"Could not remove temp file {path}: {e}")
            return False
        return True

    def extract_text_from_pdf(self, file_path: str) -> str:
        """Return the text of every page of the PDF at *file_path*.

        Pages that fail to extract are logged and skipped.

        Raises:
            Exception: if the file cannot be read or no text was extracted.
        """
        try:
            add_log(f"π Extracting text from PDF: {file_path}")
            page_texts = []
            with open(file_path, 'rb') as file:
                pdf_reader = PyPDF2.PdfReader(file)
                for page_num, page in enumerate(pdf_reader.pages, start=1):
                    try:
                        # Guard against a None result so the join below is safe.
                        page_texts.append(page.extract_text() or "")
                        add_log(f"β Extracted page {page_num}")
                    except Exception as e:
                        add_log(f"β οΈ Failed to extract page {page_num}: {e}")
            text = "\n".join(page_texts).strip()
            if not text:
                raise Exception("No text could be extracted from PDF")
            add_log(f"β PDF extraction complete. Text length: {len(text)} characters")
            return text
        except Exception as e:
            error_msg = f"β PDF extraction failed: {str(e)}"
            add_log(error_msg)
            raise Exception(error_msg) from e

    @staticmethod
    def _iter_json_objects(text: str):
        """Yield each JSON object that decodes starting at some ``{`` in *text*.

        Unlike a regex, the decoder understands string quoting, so brackets
        or braces inside dialogue text do not cut the object short.
        """
        decoder = json.JSONDecoder()
        position = text.find('{')
        while position != -1:
            try:
                parsed, _ = decoder.raw_decode(text, position)
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(parsed, dict):
                    yield parsed
            # Keep scanning from the next brace so nested objects are tried too.
            position = text.find('{', position + 1)

    def clean_and_validate_json(self, text: str) -> Dict:
        """Extract a valid podcast script from model output *text*.

        First tries every decodable JSON object, then the regex patterns with
        trailing-comma repair. Falls back to ``create_fallback_podcast`` when
        nothing validates.
        """
        add_log("π Attempting to extract JSON from generated text")

        for parsed in self._iter_json_objects(text):
            if self.validate_podcast_structure(parsed):
                add_log("β Valid JSON structure found")
                return parsed

        for number, pattern in enumerate(self._JSON_PATTERNS, start=1):
            add_log(f"π― Trying extraction strategy {number}")
            for match in re.findall(pattern, text, re.DOTALL | re.IGNORECASE):
                cleaned = match.strip()
                # Drop trailing commas before a closing brace or bracket.
                cleaned = re.sub(r',\s*}', '}', cleaned)
                cleaned = re.sub(r',\s*]', ']', cleaned)
                try:
                    parsed = json.loads(cleaned)
                except json.JSONDecodeError as e:
                    add_log(f"β οΈ JSON parse error in strategy {number}: {e}")
                    continue
                if self.validate_podcast_structure(parsed):
                    add_log("β Valid JSON structure found")
                    return parsed

        add_log("β οΈ No valid JSON found, creating fallback")
        return self.create_fallback_podcast(text)

    def validate_podcast_structure(self, data: Dict) -> bool:
        """Return True if *data* is a well-formed podcast script.

        Required shape: a dict with a ``topic`` key and a non-empty
        ``podcast`` list whose items are dicts with an integer ``speaker`` of
        1 or 2 and a non-blank string ``line``.
        """
        if not isinstance(data, dict) or 'topic' not in data:
            return False
        entries = data.get('podcast')
        if not isinstance(entries, list) or not entries:
            return False
        for item in entries:
            if not isinstance(item, dict):
                return False
            speaker = item.get('speaker')
            # bool is a subclass of int, so True would otherwise pass as 1.
            if isinstance(speaker, bool) or not isinstance(speaker, int):
                return False
            if speaker not in (1, 2):
                return False
            line = item.get('line')
            if not isinstance(line, str) or not line.strip():
                return False
        return True

    def create_fallback_podcast(self, text: str) -> Dict:
        """Build a script by alternating speakers over sentences of *text*.

        Sentences are the period-separated pieces longer than 20 characters;
        at most 12 are used. When there are none, a stock dialogue is used.
        """
        add_log("π§ Creating fallback podcast structure")
        sentences = [s.strip() for s in text.split('.') if len(s.strip()) > 20]
        if not sentences:
            add_log("π§ failed sentences creating, fallback standard text")
            sentences = list(self._STOCK_SENTENCES)

        podcast_lines = [
            {
                "speaker": (i % 2) + 1,
                "line": sentence if sentence.endswith('.') else sentence + ".",
            }
            for i, sentence in enumerate(sentences[:12])
        ]
        result = {
            "topic": "Generated Discussion",
            "podcast": podcast_lines,
        }
        add_log(f"β Fallback podcast created with {len(podcast_lines)} lines")
        return result

    def _read_source_file(self, file_obj, prompt):
        """Return the truncated content of *file_obj*, or *prompt* if unusable.

        Only ``.pdf`` and ``.txt`` (any letter case) are read. Errors are
        logged and the original prompt is kept.
        """
        try:
            add_log(f"π Processing uploaded file: {file_obj}")
            suffix = os.path.splitext(str(file_obj))[1].lower()
            if suffix == '.pdf':
                content = self.extract_text_from_pdf(file_obj)
            elif suffix == '.txt':
                with open(file_obj, 'r', encoding='utf-8') as f:
                    content = f.read()
            else:
                return prompt
            if not content.strip():
                return prompt
            if len(content) > self.MAX_SOURCE_CHARS:
                content = content[:self.MAX_SOURCE_CHARS] + "..."
                add_log(f"βοΈ Text truncated to {self.MAX_SOURCE_CHARS} characters")
            return content
        except Exception as e:
            add_log(f"β οΈ File processing error: {e}")
            return prompt

    def _build_prompt(self, prompt: str, language: str) -> str:
        """Assemble the instruction block, schema example and input text."""
        language_instruction = (
            f"Generate in {language}" if language != "Auto Detect" else "Use appropriate language"
        )
        requirements = "\n".join((
            "Create a podcast script in valid JSON format.",
            "Requirements:",
            "- Exactly 2 speakers (speaker 1 and 2)",
            "- The podcast should be long, focusing on the input text",
            "- Do not use names for the speakers.",
            "- DO NOT copy the example below , only use it as conversation reference",
            "- The podcast should be professional, in-depth, lively, witty and engaging, and hook the listener from the start.",
            "- The input text might be disorganized or unformatted. Ignore any formatting inconsistencies or irrelevant details; your task is to distill the essential points, identify key definitions, and highlight intriguing facts that would be suitable for discussion in a podcast.",
            "- The script must be in JSON format.",
            f"- {language_instruction}",
        ))
        # The requirements refer to "the example below", so it must be present;
        # it is also the only place the model sees the validated schema.
        example = json.dumps(self._EXAMPLE_SCRIPT, indent=2)
        system_prompt = f"{requirements}\n\nExample JSON structure:\n{example}\n"
        user_prompt = f"\nInput Text:\n{prompt}\n\nJSON:"
        return system_prompt + user_prompt

    async def generate_script(self, prompt: str, language: str, file_obj=None, progress=None) -> Dict:
        """Generate the podcast script for *prompt* or the uploaded file.

        Args:
            prompt: user text; replaced by the file content when a readable
                ``.pdf``/``.txt`` file is given.
            language: target language name, or "Auto Detect".
            file_obj: optional file path.
            progress: optional callable ``progress(fraction, message)``.

        Returns:
            A script dict. On any generation error the stock fallback script
            is returned instead of raising.

        Raises:
            Exception: if the model or tokenizer is not loaded.
        """
        if not model_loaded or self.model is None or self.tokenizer is None:
            raise Exception("β Model not properly initialized. Please restart the application.")
        add_log("π¬ Starting script generation")

        if file_obj is not None:
            prompt = self._read_source_file(file_obj, prompt)
        # A missing prompt must not be rendered as the literal text "None".
        full_prompt = self._build_prompt(prompt or "", language)
        add_log("π Prompt Preview:\n" + full_prompt[:2000])

        try:
            if progress:
                progress(0.3, "π€ Generating script...")
            add_log("π€ Tokenizing input...")
            inputs = self.tokenizer(
                full_prompt,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=1200,
                return_attention_mask=True,
            )
            inputs = {k: v.to(self.model.device) for k, v in inputs.items()}
            add_log(f"β Inputs moved to device: {self.model.device}")
            add_log("π§ Generating with model...")

            if torch.cuda.is_available():
                torch.cuda.empty_cache()
            with torch.no_grad():
                outputs = self.model.generate(
                    **inputs,
                    generation_config=self.generation_config,
                    pad_token_id=self.tokenizer.pad_token_id,
                    use_cache=True,
                )
            add_log("β Model generation complete")

            # Slice off the prompt tokens so only the continuation is decoded.
            prompt_length = inputs['input_ids'].shape[1]
            generated_text = self.tokenizer.decode(
                outputs[0][prompt_length:],
                skip_special_tokens=True,
                clean_up_tokenization_spaces=True,
            )
            add_log(f"π Generated text length: {len(generated_text)} characters")
            add_log(f"π Generated text preview: {generated_text[:2000]}...")
            if progress:
                progress(0.4, "π Processing generated script...")

            result = self.clean_and_validate_json(generated_text)
            if progress:
                progress(0.5, "β Script generated successfully!")
            add_log(f"π Full generated text:\n{generated_text}")
            add_log(f"β Final script has {len(result.get('podcast', []))} lines")
            return result
        except Exception as e:
            add_log(f"β Script generation error: {str(e)}")
            add_log("π failed script creation")
            add_log(f"π Traceback: {traceback.format_exc()}")
            # Empty text selects the stock dialogue; a single short sentence
            # could never reach the minimum length enforced when combining.
            return self.create_fallback_podcast("")

    async def tts_generate(self, text: str, speaker: int, speaker1: str, speaker2: str) -> str:
        """Synthesize *text* with the voice of *speaker* and return the file path.

        ``speaker1`` is used when *speaker* is 1, otherwise ``speaker2``. Up
        to three attempts are made, each limited to 30 seconds.

        Raises:
            Exception: if the text is empty after cleaning, or every attempt
                fails or times out.
        """
        voice = speaker1 if speaker == 1 else speaker2
        add_log(f"ποΈ Generating TTS for speaker {speaker} with voice {voice}")

        # Keep word characters, whitespace and basic punctuation only, then
        # check for emptiness: cleaning itself can remove everything.
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text).strip()
        if not text:
            raise Exception("Empty text for TTS")

        # NOTE(review): edge-tts is understood to emit MP3 data, hence .mp3.
        temp_filename = f"temp_audio_{uuid.uuid4().hex[:8]}.mp3"
        max_retries = 3
        for attempt in range(1, max_retries + 1):
            try:
                add_log(f"π΅ TTS attempt {attempt} for: {text[:50]}...")
                communicate = edge_tts.Communicate(text, voice)
                await asyncio.wait_for(communicate.save(temp_filename), timeout=30.0)
                size = os.path.getsize(temp_filename) if os.path.exists(temp_filename) else 0
                if size > 1000:
                    add_log(f"β TTS successful: {size} bytes")
                    return temp_filename
                raise Exception("Generated audio file is too small or empty")
            except asyncio.TimeoutError:
                add_log(f"β° TTS timeout on attempt {attempt}")
                self._remove_file(temp_filename)
                if attempt == max_retries:
                    raise Exception("TTS generation timed out after multiple attempts") from None
            except Exception as e:
                add_log(f"β TTS error on attempt {attempt}: {str(e)}")
                self._remove_file(temp_filename)
                if attempt == max_retries:
                    raise Exception(
                        f"TTS generation failed after {max_retries} attempts: {str(e)}"
                    ) from e
            # Short pause before the next attempt.
            await asyncio.sleep(2)
        raise Exception(f"TTS generation failed after {max_retries} attempts")

    def _load_segment(self, audio_file: str, index: int):
        """Return the decoded clip, or None if missing, too small or too short."""
        add_log(f"π Processing audio file {index}: {audio_file}")
        if not os.path.exists(audio_file):
            add_log(f"β οΈ Audio file not found: {audio_file}")
            return None
        file_size = os.path.getsize(audio_file)
        add_log(f"π File size: {file_size} bytes")
        if file_size < 2000:
            add_log(f"β οΈ 1 Audio file too small, skipping: {audio_file}")
            return None
        audio_segment = AudioSegment.from_file(audio_file)
        if len(audio_segment) < 500:  # clip shorter than 500 ms
            add_log("β οΈ 2 Audio segment too short, skipping")
            return None
        return audio_segment

    async def combine_audio_files(self, audio_files: List[str], progress=None) -> str:
        """Concatenate the clips into one WAV file and return its path.

        Usable clips are joined with 800 ms of silence between them. Every
        input file is deleted after it has been processed.

        Raises:
            Exception: if no clip is usable or the result is under 5 seconds.
        """
        if progress:
            progress(0.9, "π΅ Combining audio files...")
        add_log(f"π Combining {len(audio_files)} audio files")
        try:
            combined_audio = AudioSegment.empty()
            silence_padding = AudioSegment.silent(duration=800)
            segments_added = 0
            for index, audio_file in enumerate(audio_files, start=1):
                try:
                    audio_segment = self._load_segment(audio_file, index)
                    if audio_segment is None:
                        continue
                    # Pad before every clip but the first, so a skipped last
                    # clip never leaves trailing silence.
                    if segments_added:
                        combined_audio += silence_padding
                    combined_audio += audio_segment
                    segments_added += 1
                    add_log(f"β Added audio segment {index}, total duration: {len(combined_audio)}ms")
                except Exception as e:
                    add_log(f"β οΈ Could not process audio file {audio_file}: {e}")
                finally:
                    if self._remove_file(audio_file):
                        add_log(f"ποΈ Cleaned up temp file: {audio_file}")

            if len(combined_audio) == 0:
                raise Exception("No valid audio content was generated")
            if len(combined_audio) < 5000:  # less than 5 seconds
                raise Exception("3 Combined audio is too short")

            output_filename = f"podcast_output_{uuid.uuid4().hex[:8]}.wav"
            combined_audio.export(output_filename, format="wav")
            file_size = os.path.getsize(output_filename)
            duration = len(combined_audio) / 1000  # milliseconds to seconds
            add_log(f"β Final podcast: {output_filename} ({file_size} bytes, {duration:.1f}s)")
            if progress:
                progress(1.0, "π Podcast generated successfully!")
            return output_filename
        except Exception as e:
            error_msg = f"β Audio combination failed: {str(e)}"
            add_log(error_msg)
            # Remove whatever inputs are still on disk.
            for audio_file in audio_files:
                self._remove_file(audio_file)
            raise Exception(error_msg) from e

    async def generate_podcast(self, input_text: str, language: str, speaker1: str, speaker2: str, file_obj=None, progress=None) -> str:
        """Run the full pipeline and return the path of the combined audio.

        Lines whose speech synthesis fails are skipped; the run only fails
        when the script is empty, no line could be synthesized, or the clips
        cannot be combined.

        Raises:
            Exception: wrapping the underlying failure and the elapsed time.
        """
        start_time = time.time()
        add_log("π¬ Starting podcast generation pipeline")
        try:
            if progress:
                progress(0.1, "π Starting podcast generation...")
            add_log("π Generating podcast script...")
            podcast_json = await self.generate_script(input_text, language, file_obj, progress)
            script_lines = podcast_json.get('podcast')
            if not script_lines:
                raise Exception("No podcast content was generated")
            total_lines = len(script_lines)
            add_log(f"β Script generated with {total_lines} dialogue lines")
            if progress:
                progress(0.5, "ποΈ Converting text to speech...")

            audio_files = []
            for i, item in enumerate(script_lines, start=1):
                try:
                    add_log(f"π΅ Processing line {i}/{total_lines}: Speaker {item['speaker']}")
                    audio_file = await self.tts_generate(
                        item['line'], item['speaker'], speaker1, speaker2
                    )
                    audio_files.append(audio_file)
                    if progress:
                        # Speech synthesis spans the 50%-90% progress range.
                        current_progress = 0.5 + (0.4 * i / total_lines)
                        progress(current_progress, f"ποΈ Generated speech {len(audio_files)}/{total_lines}")
                except Exception as e:
                    # Keep going; one bad line should not fail the whole run.
                    add_log(f"β TTS failed for line {i}: {e}")

            if not audio_files:
                raise Exception("No audio files were generated successfully")
            if len(audio_files) < total_lines / 2:
                add_log(f"β οΈ Warning: Only {len(audio_files)}/{total_lines} lines processed successfully")
            add_log(f"β TTS generation complete: {len(audio_files)} audio files")

            combined_audio = await self.combine_audio_files(audio_files, progress)
            elapsed_time = time.time() - start_time
            add_log(f"π Podcast generation completed in {elapsed_time:.1f} seconds")
            return combined_audio
        except Exception as e:
            elapsed_time = time.time() - start_time
            error_msg = f"β Podcast generation failed after {elapsed_time:.1f}s: {str(e)}"
            add_log(error_msg)
            add_log(f"π Full traceback: {traceback.format_exc()}")
            raise Exception(error_msg) from e
# UI display label -> voice id handed to the speech synthesizer.
VOICE_MAPPING = dict((
    ("Andrew - English (United States)", "en-US-AndrewMultilingualNeural"),
    ("Ava - English (United States)", "en-US-AvaMultilingualNeural"),
    ("Brian - English (United States)", "en-US-BrianMultilingualNeural"),
    ("Emma - English (United States)", "en-US-EmmaMultilingualNeural"),
    ("Florian - German (Germany)", "de-DE-FlorianMultilingualNeural"),
    ("Seraphina - German (Germany)", "de-DE-SeraphinaMultilingualNeural"),
    ("Remy - French (France)", "fr-FR-RemyMultilingualNeural"),
    ("Vivienne - French (France)", "fr-FR-VivienneMultilingualNeural"),
))
async def process_input(input_text: str, input_file, language: str, speaker1: str, speaker2: str, progress=None) -> str:
    """Validate one request and run the podcast pipeline for it.

    Speaker labels are translated to voice ids through ``VOICE_MAPPING``
    (unknown labels fall back to the Andrew / Ava voices). Returns whatever
    ``PodcastGenerator.generate_podcast`` returns.

    Raises:
        Exception: when neither text nor file is given, the model is not
            loaded, or generation fails; the message is prefixed with
            "CRITICAL ERROR".
    """
    banner = "=" * 50
    add_log(banner)
    add_log("π¬ NEW PODCAST GENERATION REQUEST")
    add_log(banner)
    try:
        if progress:
            progress(0.05, "π Processing input...")

        # Resolve the UI labels to voice ids.
        speaker1_voice = VOICE_MAPPING.get(speaker1, "en-US-AndrewMultilingualNeural")
        speaker2_voice = VOICE_MAPPING.get(speaker2, "en-US-AvaMultilingualNeural")
        add_log(f"π Speaker 1: {speaker1} -> {speaker1_voice}")
        add_log(f"π Speaker 2: {speaker2} -> {speaker2_voice}")

        # At least one of text / file must be present.
        has_text = bool(input_text) and input_text.strip() != ""
        if has_text:
            add_log(f"π Text input provided: {len(input_text)} characters")
        else:
            if input_file is None:
                raise Exception("β Please provide either text input or upload a file")
            add_log("π No text input provided, will process uploaded file")
        if input_file:
            add_log(f"π File uploaded: {input_file}")

        if not model_loaded:
            raise Exception("β Model not loaded. Please restart the application.")

        generator = PodcastGenerator()
        audio_path = await generator.generate_podcast(
            input_text, language, speaker1_voice, speaker2_voice, input_file, progress
        )
        add_log("π PODCAST GENERATION COMPLETED SUCCESSFULLY")
        return audio_path
    except Exception as exc:
        failure = f"β CRITICAL ERROR: {str(exc)}"
        add_log(failure)
        add_log(f"π Traceback: {traceback.format_exc()}")
        raise Exception(failure)
def generate_podcast_gradio(input_text, input_file, language, speaker1, speaker2):
    """Synchronous Gradio click handler: run the pipeline and return results.

    Args:
        input_text: text from the textbox (may be empty or None).
        input_file: uploaded file path, or None.
        language: selected language label.
        speaker1: UI label of the first voice.
        speaker2: UI label of the second voice.

    Returns:
        ``(audio_path, log_text)``; ``audio_path`` is None when there is no
        input or generation fails. Exceptions are logged, never propagated.
    """
    global logs
    logs = []  # start each request with an empty log
    try:
        add_log("π¬ Gradio function called")
        add_log(f"π Parameters: text={bool(input_text)}, file={bool(input_file)}, lang={language}")

        # Normalise blank text first so whitespace-only input counts as none.
        if input_text is not None and not input_text.strip():
            input_text = None
        if input_text is None and input_file is None:
            add_log("β No input provided")
            return None, "\n".join(logs)

        def progress_callback(value, text):
            add_log(f"π Progress: {value:.1%} - {text}")

        def run_pipeline():
            # asyncio.run creates a fresh loop and closes it afterwards, so
            # no event loop is left open per request.
            return asyncio.run(
                process_input(input_text, input_file, language, speaker1, speaker2, progress_callback)
            )

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread: run the pipeline directly.
            result = run_pipeline()
        else:
            # A loop is already running here; asyncio.run cannot nest, so the
            # pipeline gets its own thread and loop.
            import concurrent.futures
            executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
            try:
                result = executor.submit(run_pipeline).result(timeout=300)  # 5 minutes
            finally:
                # Do not wait for the worker, otherwise the timeout above
                # would have no effect.
                executor.shutdown(wait=False)

        add_log("β Gradio function completed successfully")
        return result, "\n".join(logs)
    except Exception as e:
        error_msg = f"β Gradio function error: {str(e)}"
        add_log(error_msg)
        add_log(f"π Traceback: {traceback.format_exc()}")
        return None, "\n".join(logs)
def create_interface():
    """Create the Gradio interface and return the ``gr.Blocks`` app.

    The model is loaded here only if the import-time load did not succeed,
    and the result is stored in the module-level ``model_loaded`` flag so the
    status shown in the UI matches what the request handlers check. A short
    smoke-test generation runs when the model is available.
    """
    global model_loaded
    if not model_loaded:
        model_loaded = initialize_model()
    if model_loaded:
        test_llm_generation()

    language_options = [
        "Auto Detect", "English", "German", "French", "Spanish", "Italian",
        "Portuguese", "Dutch", "Russian", "Chinese", "Japanese", "Korean"
    ]
    voice_options = list(VOICE_MAPPING.keys())

    with gr.Blocks(
        title="PodcastGen 2ποΈ",
        theme=gr.themes.Soft(),
        css=".gradio-container {max-width: 1200px; margin: auto;}"
    ) as demo:
        gr.Markdown("# ποΈ PodcastGen 2")
        gr.Markdown("Generate professional 2-speaker podcasts from text input!")

        # Model status indicator
        if model_loaded:
            gr.Markdown("β **Model Status: Ready**")
        else:
            gr.Markdown("β **Model Status: Failed to Load**")

        with gr.Row():
            with gr.Column(scale=2):
                input_text = gr.Textbox(
                    label="Input Text",
                    lines=8,
                    placeholder="Enter your topic or text for podcast generation...",
                    info="Describe what you want the podcast to discuss"
                )
            with gr.Column(scale=1):
                input_file = gr.File(
                    label="Upload File (Optional)",
                    file_types=[".pdf", ".txt"],
                    type="filepath",
                )

        with gr.Row():
            language = gr.Dropdown(
                label="Language",
                choices=language_options,
                value="Auto Detect",
                info="Select output language"
            )
            speaker1 = gr.Dropdown(
                label="Speaker 1 Voice",
                choices=voice_options,
                value="Andrew - English (United States)"
            )
            speaker2 = gr.Dropdown(
                label="Speaker 2 Voice",
                choices=voice_options,
                value="Ava - English (United States)"
            )

        # The button is disabled when the model could not be loaded.
        generate_btn = gr.Button(
            "ποΈ Generate Podcast",
            variant="primary",
            size="lg",
            interactive=model_loaded
        )
        log_output = gr.Textbox(
            label="πͺ΅ Debug & Transcript Log",
            lines=15,
            interactive=False,
            info="Real-time generation logs and debugging information"
        )
        output_audio = gr.Audio(
            label="Generated Podcast",
            type="filepath",
            format="wav",
            show_download_button=True
        )

        # Connect the interface
        generate_btn.click(
            fn=generate_podcast_gradio,
            inputs=[input_text, input_file, language, speaker1, speaker2],
            outputs=[output_audio, log_output],
            show_progress=True
        )
    return demo
if __name__ == "__main__":
    # Listen on all interfaces at port 7860, surface errors in the UI, and do
    # not create a public share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "share": False,
    }
    demo = create_interface()
    demo.launch(**launch_options)