import gradio as gr import os import tempfile import logging from podcastfy.client import generate_podcast from dotenv import load_dotenv # Configure logging logging.basicConfig(level=logging.DEBUG) logger = logging.getLogger(__name__) # Load environment variables load_dotenv() def get_api_key(key_name, ui_value): return ui_value if ui_value else os.getenv(key_name) def process_inputs( text_input, urls_input, pdf_files, image_files, gemini_key, openai_key, openai_base_url, # 新增参数 elevenlabs_key, word_count, conversation_style, roles_person1, roles_person2, dialogue_structure, podcast_name, podcast_tagline, output_language, tts_model, creativity_level, user_instructions, api_key_label, llm_model_name, longform, ): try: logger.info("Starting podcast generation process") # API key handling logger.debug("Setting API keys") os.environ["GEMINI_API_KEY"] = get_api_key("GEMINI_API_KEY", gemini_key) if tts_model == "openai": logger.debug("Setting OpenAI API key") if not openai_key and not os.getenv("OPENAI_API_KEY"): raise ValueError("OpenAI API key is required when using OpenAI TTS model") os.environ["OPENAI_API_KEY"] = get_api_key("OPENAI_API_KEY", openai_key) if openai_base_url: os.environ["OPENAI_API_BASE"] = openai_base_url if tts_model == "elevenlabs": logger.debug("Setting ElevenLabs API key") if not elevenlabs_key and not os.getenv("ELEVENLABS_API_KEY"): raise ValueError("ElevenLabs API key is required when using ElevenLabs TTS model") os.environ["ELEVENLABS_API_KEY"] = get_api_key("ELEVENLABS_API_KEY", elevenlabs_key) # Process URLs urls = [url.strip() for url in urls_input.split('\n') if url.strip()] logger.debug(f"Processed URLs: {urls}") temp_files = [] temp_dirs = [] # Handle PDF files if pdf_files is not None and len(pdf_files) > 0: logger.info(f"Processing {len(pdf_files)} PDF files") pdf_temp_dir = tempfile.mkdtemp() temp_dirs.append(pdf_temp_dir) for i, pdf_file in enumerate(pdf_files): pdf_path = os.path.join(pdf_temp_dir, f"input_pdf_{i}.pdf") temp_files.append(pdf_path) with open(pdf_path, 'wb') as f: f.write(pdf_file) urls.append(pdf_path) logger.debug(f"Saved PDF {i} to {pdf_path}") # Handle image files image_paths = [] if image_files is not None and len(image_files) > 0: logger.info(f"Processing {len(image_files)} image files") img_temp_dir = tempfile.mkdtemp() temp_dirs.append(img_temp_dir) for i, img_file in enumerate(image_files): # Get file extension from the original name in the file tuple original_name = img_file.orig_name if hasattr(img_file, 'orig_name') else f"image_{i}.jpg" extension = original_name.split('.')[-1] logger.debug(f"Processing image file {i}: {original_name}") img_path = os.path.join(img_temp_dir, f"input_image_{i}.{extension}") temp_files.append(img_path) try: # Write the bytes directly to the file with open(img_path, 'wb') as f: if isinstance(img_file, (tuple, list)): f.write(img_file[1]) # Write the bytes content else: f.write(img_file) # Write the bytes directly image_paths.append(img_path) logger.debug(f"Saved image {i} to {img_path}") except Exception as e: logger.error(f"Error saving image {i}: {str(e)}") raise # Prepare conversation config logger.debug("Preparing conversation config") conversation_config = { "word_count": word_count, "conversation_style": conversation_style.split(','), "roles_person1": roles_person1, "roles_person2": roles_person2, "dialogue_structure": dialogue_structure.split(','), "podcast_name": podcast_name, "podcast_tagline": podcast_tagline, "output_language": output_language, "creativity": creativity_level, "user_instructions": user_instructions, "api_key_label": api_key_label, "llm_model_name": llm_model_name, "longform": longform, } # Generate podcast logger.info("Calling generate_podcast function") logger.debug(f"URLs: {urls}") logger.debug(f"Image paths: {image_paths}") logger.debug(f"Text input present: {'Yes' if text_input else 'No'}") audio_file = generate_podcast( urls=urls if urls else None, text=text_input if text_input else None, image_paths=image_paths if image_paths else None, tts_model=tts_model, conversation_config=conversation_config ) logger.info("Podcast generation completed") # Cleanup logger.debug("Cleaning up temporary files") for file_path in temp_files: if os.path.exists(file_path): os.unlink(file_path) logger.debug(f"Removed temp file: {file_path}") for dir_path in temp_dirs: if os.path.exists(dir_path): os.rmdir(dir_path) logger.debug(f"Removed temp directory: {dir_path}") return audio_file except Exception as e: logger.error(f"Error in process_inputs: {str(e)}", exc_info=True) # Cleanup on error for file_path in temp_files: if os.path.exists(file_path): os.unlink(file_path) for dir_path in temp_dirs: if os.path.exists(dir_path): os.rmdir(dir_path) return str(e) # Create Gradio interface with updated theme with gr.Blocks( title="AI播客plus", theme=gr.themes.Base( primary_hue="blue", secondary_hue="slate", neutral_hue="slate" ), css=""" /* Move toggle arrow to left side */ .gr-accordion { --accordion-arrow-size: 1.5em; } .gr-accordion > .label-wrap { flex-direction: row !important; justify-content: flex-start !important; gap: 1em; } .gr-accordion > .label-wrap > .icon { order: -1; } """ ) as demo: with gr.Tab("默认环境变量已设置 Gemini、OpenAI API Key "): # API Keys Section with gr.Row(): gr.Markdown( """