import gradio as gr import requests import io from pydub import AudioSegment import traceback # 用于打印更详细的错误信息 import tempfile import os import uuid import atexit import shutil import threading import time from typing import Dict, Set # 尝试相对导入,这在通过 `python -m src.podcast_transcribe.webui.app` 运行时有效 try: from podcast_transcribe.rss.podcast_rss_parser import parse_podcast_rss from podcast_transcribe.schemas import PodcastChannel, PodcastEpisode, CombinedTranscriptionResult, EnhancedSegment from podcast_transcribe.transcriber import transcribe_podcast_audio except ImportError: # 如果直接运行此脚本,并且项目根目录不在PYTHONPATH中, # 则需要将项目根目录添加到 sys.path import sys import os # 获取当前脚本文件所在的目录 (src/podcast_transcribe/webui) current_dir = os.path.dirname(os.path.abspath(__file__)) # 获取项目根目录 (向上三级: webui -> podcast_transcribe -> src -> project_root) # 修正:应该是 src 的父目录是项目根 project_root = os.path.dirname(os.path.dirname(os.path.dirname(current_dir))) # 将 src 目录添加到 sys.path,因为模块是 podcast_transcribe.xxx src_path = os.path.join(project_root, "src") if src_path not in sys.path: sys.path.insert(0, src_path) from podcast_transcribe.rss.podcast_rss_parser import parse_podcast_rss from podcast_transcribe.schemas import PodcastChannel, PodcastEpisode, CombinedTranscriptionResult, EnhancedSegment from podcast_transcribe.transcriber import transcribe_podcast_audio # 会话级别的临时文件管理 class SessionFileManager: def __init__(self): self.session_files: Dict[str, Set[str]] = {} self.lock = threading.Lock() self.cleanup_thread = None self.start_cleanup_thread() def start_cleanup_thread(self): """启动后台清理线程""" if self.cleanup_thread is None or not self.cleanup_thread.is_alive(): self.cleanup_thread = threading.Thread(target=self._periodic_cleanup, daemon=True) self.cleanup_thread.start() def _periodic_cleanup(self): """定期清理过期的临时文件""" while True: try: time.sleep(300) # 每5分钟清理一次 self._cleanup_old_files() except Exception as e: print(f"清理线程错误: {e}") def _cleanup_old_files(self): """清理超过30分钟的临时文件""" current_time = time.time() with self.lock: for session_id, files in list(self.session_files.items()): files_to_remove = [] for filepath in list(files): try: if os.path.exists(filepath): # 检查文件创建时间 file_age = current_time - os.path.getctime(filepath) if file_age > 1800: # 30分钟 os.remove(filepath) files_to_remove.append(filepath) print(f"自动清理过期临时文件: {filepath}") else: files_to_remove.append(filepath) except Exception as e: print(f"清理文件 {filepath} 时出错: {e}") files_to_remove.append(filepath) # 从集合中移除已清理的文件 for filepath in files_to_remove: files.discard(filepath) # 如果会话没有文件了,移除会话记录 if not files: del self.session_files[session_id] def add_file(self, session_id: str, filepath: str): """添加文件到会话管理""" with self.lock: if session_id not in self.session_files: self.session_files[session_id] = set() self.session_files[session_id].add(filepath) def cleanup_session(self, session_id: str): """清理特定会话的所有文件""" with self.lock: if session_id in self.session_files: files = self.session_files[session_id] for filepath in list(files): try: if os.path.exists(filepath): os.remove(filepath) print(f"清理会话文件: {filepath}") except Exception as e: print(f"无法删除文件 {filepath}: {e}") del self.session_files[session_id] def cleanup_all(self): """清理所有临时文件""" with self.lock: total_files = 0 for session_id in list(self.session_files.keys()): total_files += len(self.session_files[session_id]) self.cleanup_session(session_id) print(f"应用程序退出,清理了 {total_files} 个临时文件") # 全局文件管理器 file_manager = SessionFileManager() def cleanup_temp_files(): """清理应用程序使用的临时文件""" file_manager.cleanup_all() # 注册应用程序退出时的清理函数 atexit.register(cleanup_temp_files) def get_session_id(request: gr.Request = None) -> str: """获取会话ID,用于文件管理""" if request and hasattr(request, 'session_hash'): return request.session_hash else: # 如果无法获取会话ID,使用UUID return str(uuid.uuid4()) # 添加资源限制检查 def check_system_resources(): """检查系统资源是否足够""" try: import psutil # 检查可用内存 memory = psutil.virtual_memory() if memory.available < 500 * 1024 * 1024: # 少于500MB return False, "系统内存不足,请稍后再试" # 检查磁盘空间 disk = psutil.disk_usage(tempfile.gettempdir()) if disk.free < 1024 * 1024 * 1024: # 少于1GB return False, "磁盘空间不足,请稍后再试" return True, "资源充足" except ImportError: # 如果没有psutil,跳过检查 return True, "无法检查资源状态" except Exception as e: print(f"资源检查错误: {e}") return True, "资源检查失败,继续执行" def parse_rss_feed(rss_url: str): """回调函数:解析 RSS Feed""" print(f"开始解析RSS: {rss_url}") if not rss_url: print("RSS地址为空") return { status_message_area: gr.update(value="Error: Please enter an RSS URL."), podcast_title_display: gr.update(value="", visible=False), episode_dropdown: gr.update(choices=[], value=None, interactive=False), podcast_data_state: None, audio_player: gr.update(value=None), current_audio_url_state: None, episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None, headers=["Speaker", "Text", "Time"]), transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } try: print(f"正在解析RSS: {rss_url}") # 先更新状态消息,但由于不再使用生成器,我们直接在解析后更新UI podcast_data: PodcastChannel = parse_podcast_rss(rss_url) print(f"RSS解析结果: 频道名称={podcast_data.title if podcast_data else 'None'}, 剧集数量={len(podcast_data.episodes) if podcast_data and podcast_data.episodes else 0}") if podcast_data and podcast_data.episodes: choices = [] for i, episode in enumerate(podcast_data.episodes): # 使用 (标题 (时长), guid 或索引) 作为选项 # 如果 guid 不可靠或缺失,可以使用索引 label = f"{episode.title or 'Untitled'} (Duration: {episode.duration or 'Unknown'})" # 将 episode 对象直接作为值传递,或仅传递一个唯一标识符 # 为了简单起见,我们使用索引作为唯一ID,因为我们需要从 podcast_data_state 中检索完整的 episode choices.append((label, i)) # 显示播客标题 podcast_title = f"## 🎙️ {podcast_data.title or 'Unknown Podcast'}" if podcast_data.author: podcast_title += f"\n**Host/Producer:** {podcast_data.author}" if podcast_data.description: # 限制描述长度,避免界面过长 description = podcast_data.description[:300] if len(podcast_data.description) > 300: description += "..." podcast_title += f"\n\n**Podcast Description:** {description}" return { status_message_area: gr.update(value=f"Successfully parsed {len(podcast_data.episodes)} episodes. Please select an episode."), podcast_title_display: gr.update(value=podcast_title, visible=True), episode_dropdown: gr.update(choices=choices, value=None, interactive=True), podcast_data_state: podcast_data, audio_player: gr.update(value=None), current_audio_url_state: None, episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } elif podcast_data: # 有 channel 信息但没有 episodes print("解析成功但未找到剧集") podcast_title = f"## 🎙️ {podcast_data.title or 'Unknown Podcast'}" if podcast_data.author: podcast_title += f"\n**Host/Producer:** {podcast_data.author}" return { status_message_area: gr.update(value="Parsing successful, but no episodes found."), podcast_title_display: gr.update(value=podcast_title, visible=True), episode_dropdown: gr.update(choices=[], value=None, interactive=False), podcast_data_state: podcast_data, # 仍然存储,以防万一 audio_player: gr.update(value=None), current_audio_url_state: None, episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } else: print(f"解析RSS失败: {rss_url}") return { status_message_area: gr.update(value=f"Failed to parse RSS: {rss_url}. Please check the URL or network connection."), podcast_title_display: gr.update(value="", visible=False), episode_dropdown: gr.update(choices=[], value=None, interactive=False), podcast_data_state: None, audio_player: gr.update(value=None), current_audio_url_state: None, episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } except Exception as e: print(f"解析 RSS 时发生错误: {e}") traceback.print_exc() return { status_message_area: gr.update(value=f"Serious error occurred while parsing RSS: {e}"), podcast_title_display: gr.update(value="", visible=False), episode_dropdown: gr.update(choices=[], value=None, interactive=False), podcast_data_state: None, audio_player: gr.update(value=None), current_audio_url_state: None, episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } def load_episode_audio(selected_episode_index: int, podcast_data: PodcastChannel, request: gr.Request = None): """回调函数:当用户从下拉菜单选择一个剧集时加载音频""" print(f"开始加载剧集音频,选择的索引: {selected_episode_index}") # 获取会话ID session_id = get_session_id(request) # 检查系统资源 resource_ok, resource_msg = check_system_resources() if not resource_ok: return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value=f"⚠️ {resource_msg}"), episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } if selected_episode_index is None or podcast_data is None or not podcast_data.episodes: print("未选择剧集或无播客数据") return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value="Please parse RSS first and select an episode."), episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } try: episode = podcast_data.episodes[selected_episode_index] audio_url = episode.audio_url print(f"获取到剧集信息,标题: {episode.title}, 音频URL: {audio_url}") # 准备剧集信息显示 episode_shownotes_content = "" # 准备shownotes内容 if episode.shownotes: # 清理HTML标签并格式化shownotes import re # 简单的HTML标签清理 clean_shownotes = re.sub(r'<[^>]+>', '', episode.shownotes) # 替换HTML实体 clean_shownotes = clean_shownotes.replace(' ', ' ').replace('&', '&').replace('<', '<').replace('>', '>') # 清理多余空白 clean_shownotes = re.sub(r'\s+', ' ', clean_shownotes).strip() episode_shownotes_content = f"### 📝 Episode Details\n\n**Title:** {episode.title or 'Untitled'}\n\n" if episode.published_date: episode_shownotes_content += f"**Published Date:** {episode.published_date.strftime('%Y-%m-%d')}\n\n" if episode.duration: episode_shownotes_content += f"**Duration:** {episode.duration}\n\n" episode_shownotes_content += f"**Episode Description:**\n\n{clean_shownotes}" elif episode.summary: # 如果没有shownotes,使用summary episode_shownotes_content = f"### 📝 Episode Details\n\n**Title:** {episode.title or 'Untitled'}\n\n" if episode.published_date: episode_shownotes_content += f"**Published Date:** {episode.published_date.strftime('%Y-%m-%d')}\n\n" if episode.duration: episode_shownotes_content += f"**Duration:** {episode.duration}\n\n" episode_shownotes_content += f"**Episode Summary:**\n\n{episode.summary}" else: # 最基本的信息 episode_shownotes_content = f"### 📝 Episode Details\n\n**Title:** {episode.title or 'Untitled'}\n\n" if episode.published_date: episode_shownotes_content += f"**Published Date:** {episode.published_date.strftime('%Y-%m-%d')}\n\n" if episode.duration: episode_shownotes_content += f"**Duration:** {episode.duration}\n\n" if audio_url: # 更新状态消息 print(f"正在下载音频: {audio_url}") # 下载音频文件 try: headers = { 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36' } # 创建临时文件 temp_dir = tempfile.gettempdir() # 使用会话ID和UUID生成唯一文件名,避免冲突 unique_filename = f"podcast_audio_{session_id[:8]}_{uuid.uuid4().hex[:8]}" # 先发送一个HEAD请求获取内容类型和文件大小 head_response = requests.head(audio_url, timeout=30, headers=headers) # 检查文件大小限制(例如限制为200MB) content_length = head_response.headers.get('Content-Length') if content_length: file_size = int(content_length) max_size = 200 * 1024 * 1024 # 200MB if file_size > max_size: return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value=f"⚠️ 音频文件过大 ({file_size/1024/1024:.1f}MB),超过限制 ({max_size/1024/1024}MB)"), episode_shownotes: gr.update(value=episode_shownotes_content, visible=True), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } # 根据内容类型确定文件扩展名 content_type = head_response.headers.get('Content-Type', '').lower() if 'mp3' in content_type: file_ext = '.mp3' elif 'mpeg' in content_type: file_ext = '.mp3' elif 'mp4' in content_type or 'm4a' in content_type: file_ext = '.mp4' elif 'wav' in content_type: file_ext = '.wav' elif 'ogg' in content_type: file_ext = '.ogg' else: # 默认扩展名 file_ext = '.mp3' temp_filepath = os.path.join(temp_dir, unique_filename + file_ext) # 将文件路径添加到会话文件管理器 file_manager.add_file(session_id, temp_filepath) # 保存到临时文件 # 使用流式下载,避免一次性加载整个文件到内存 with open(temp_filepath, 'wb') as f: # 使用流式响应并设置较大的块大小提高效率 response = requests.get(audio_url, timeout=60, headers=headers, stream=True) response.raise_for_status() # 从响应中获取文件大小(如果服务器提供) total_size = int(response.headers.get('content-length', 0)) downloaded = 0 chunk_size = 8192 # 8KB 的块大小 # 分块下载并写入文件 for chunk in response.iter_content(chunk_size=chunk_size): if chunk: # 过滤掉保持连接活跃的空块 f.write(chunk) downloaded += len(chunk) # 可以在这里添加下载进度更新 if total_size > 0: download_percentage = downloaded / total_size print(f"下载进度: {download_percentage:.1%}") print(f"音频已下载到临时文件: {temp_filepath}") return { audio_player: gr.update(value=temp_filepath, label=f"Now Playing: {episode.title or 'Untitled'}"), current_audio_url_state: audio_url, status_message_area: gr.update(value=f"✅ Episode loaded: {episode.title or 'Untitled'}."), episode_shownotes: gr.update(value=episode_shownotes_content, visible=True), transcription_output_df: gr.update(value=None), local_audio_file_path: temp_filepath, transcribe_button: gr.update(interactive=True), selected_episode_index_state: selected_episode_index } except requests.exceptions.RequestException as e: print(f"下载音频失败: {e}") traceback.print_exc() return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value=f"❌ Error: Failed to download audio: {e}"), episode_shownotes: gr.update(value=episode_shownotes_content, visible=True), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } else: print(f"剧集 '{episode.title}' 缺少有效的音频URL") return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value=f"❌ Error: Selected episode '{episode.title}' does not provide a valid audio URL."), episode_shownotes: gr.update(value=episode_shownotes_content, visible=True), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } except IndexError: print(f"无效的剧集索引: {selected_episode_index}") return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value="❌ Error: Invalid episode index selected."), episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } except Exception as e: print(f"加载音频时发生错误: {e}") traceback.print_exc() return { audio_player: gr.update(value=None), current_audio_url_state: None, status_message_area: gr.update(value=f"❌ Serious error occurred while loading audio: {e}"), episode_shownotes: gr.update(value="", visible=False), transcription_output_df: gr.update(value=None), local_audio_file_path: None, transcribe_button: gr.update(interactive=False), selected_episode_index_state: None } def disable_buttons_before_transcription(local_audio_file_path: str): """在开始转录前禁用按钮""" print("禁用界面按钮以防止转录期间的交互") return { parse_button: gr.update(interactive=False), episode_dropdown: gr.update(interactive=False), transcribe_button: gr.update(interactive=False), status_message_area: gr.update(value="Starting transcription process, please wait...") } def start_transcription(local_audio_file_path: str, podcast_data: PodcastChannel, selected_episode_index: int, progress=gr.Progress(track_tqdm=True)): """回调函数:开始转录当前加载的音频""" print(f"开始转录本地音频文件: {local_audio_file_path}, 选中剧集索引: {selected_episode_index}") # 检查系统资源 resource_ok, resource_msg = check_system_resources() if not resource_ok: return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value=f"⚠️ {resource_msg}"), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } if not local_audio_file_path or not os.path.exists(local_audio_file_path): print("没有可用的本地音频文件") return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value="❌ Error: No valid audio file for transcription. Please select an episode first."), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } try: # 先更新状态消息并禁用按钮 progress(0, desc="Initializing transcription process...") # 使用progress回调来更新进度 progress(0.2, desc="Loading audio file...") # 从文件加载音频 audio_segment = AudioSegment.from_file(local_audio_file_path) audio_duration = len(audio_segment) / 1000 # 转换为秒 print(f"音频加载完成,时长: {audio_duration}秒") # 检查音频时长限制(例如限制为60分钟) max_duration = 60 * 60 # 60分钟 if audio_duration > max_duration: return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value=f"⚠️ 音频时长过长 ({audio_duration/60:.1f}分钟),超过限制 ({max_duration/60}分钟)"), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } progress(0.4, desc="Audio loaded, starting transcription (this may take a while)...") # 获取当前选中的剧集信息 episode_info = None if podcast_data and podcast_data.episodes and selected_episode_index is not None: if 0 <= selected_episode_index < len(podcast_data.episodes): episode_info = podcast_data.episodes[selected_episode_index] print(f"获取到当前选中剧集信息: {episode_info.title if episode_info else '无'}") # 调用转录函数 print("开始转录音频...") result: CombinedTranscriptionResult = transcribe_podcast_audio(audio_segment, podcast_info=podcast_data, episode_info=episode_info, segmentation_batch_size=32, # 减少批次大小以节省内存 parallel=True) print(f"转录完成,结果: {result is not None}, 段落数: {len(result.segments) if result and result.segments else 0}") progress(0.9, desc="Transcription completed, formatting results...") if result and result.segments: formatted_segments = [] for seg in result.segments: time_str = f"{seg.start:.2f}s - {seg.end:.2f}s" formatted_segments.append([seg.speaker, seg.speaker_name, seg.text, time_str]) progress(1.0, desc="Transcription results generated!") return { transcription_output_df: gr.update(value=formatted_segments), status_message_area: gr.update(value=f"✅ Transcription completed! {len(result.segments)} segments generated. {result.num_speakers} speakers detected."), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } elif result: # 有 result 但没有 segments progress(1.0, desc="Transcription completed, but no text segments") return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value="⚠️ Transcription completed, but no text segments were generated."), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } else: # result 为 None progress(1.0, desc="Transcription failed") return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value="❌ Transcription failed, no results obtained."), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } except Exception as e: print(f"转录过程中发生错误: {e}") traceback.print_exc() progress(1.0, desc="Transcription failed: processing error") return { transcription_output_df: gr.update(value=None), status_message_area: gr.update(value=f"❌ Serious error occurred during transcription: {e}"), parse_button: gr.update(interactive=True), episode_dropdown: gr.update(interactive=True), transcribe_button: gr.update(interactive=True) } # --- Gradio 界面定义 --- with gr.Blocks(title="Podcast Transcriber v2", css=""" .status-message-container { min-height: 50px; height: auto; max-height: none; overflow-y: visible; white-space: normal; word-wrap: break-word; margin-top: 10px; margin-bottom: 10px; border-radius: 6px; background-color: rgba(32, 36, 45, 0.03); border: 1px solid rgba(32, 36, 45, 0.1); color: #303030; } .episode-cover { max-width: 300px; max-height: 300px; border-radius: 8px; box-shadow: 0 4px 8px rgba(0,0,0,0.1); } .resource-warning { background-color: #fff3cd; border: 1px solid #ffeaa7; border-radius: 6px; padding: 10px; margin: 10px 0; color: #856404; } """) as demo: gr.Markdown("# 🎙️ Podcast Transcriber") # 状态管理 podcast_data_state = gr.State(None) # 存储解析后的 PodcastChannel 对象 current_audio_url_state = gr.State(None) # 存储当前选中剧集的音频URL local_audio_file_path = gr.State(None) # 存储下载到本地的音频文件路径 selected_episode_index_state = gr.State(None) # 存储当前选中的剧集索引 with gr.Row(): rss_url_input = gr.Textbox( label="Podcast RSS URL (Find RSS URL on https://castos.com/tools/find-podcast-rss-feed/)", placeholder="e.g., https://your-podcast-feed.com/rss.xml", elem_id="rss-url-input" ) parse_button = gr.Button("🔗 Parse RSS", elem_id="parse-rss-button") # 添加示例 RSS URL gr.Examples( examples=[["https://feeds.buzzsprout.com/2460059.rss"]], inputs=[rss_url_input], label="Example RSS URLs" ) status_message_area = gr.Markdown( "", elem_id="status-message", elem_classes="status-message-container", # 添加自定义CSS类 show_label=False ) # 播客标题显示区域 podcast_title_display = gr.Markdown( "", visible=False, elem_id="podcast-title-display" ) episode_dropdown = gr.Dropdown( label="Select Episode", choices=[], interactive=False, # 初始时不可交互,解析成功后设为 True elem_id="episode-dropdown" ) # 剧集信息显示区域 with gr.Row(): with gr.Column(scale=2): episode_shownotes = gr.Markdown( "", visible=False, elem_id="episode-shownotes" ) audio_player = gr.Audio( label="Podcast Audio Player", interactive=False, # 音频源由程序控制,用户不能直接修改 elem_id="audio-player" ) transcribe_button = gr.Button("🔊 Start Transcription", elem_id="transcribe-button", interactive=False) gr.Markdown("## 📝 Transcription Results") transcription_output_df = gr.DataFrame( headers=["Speaker ID", "Speaker Name", "Transcription Text", "Time Range"], interactive=False, wrap=True, # 允许文本换行 row_count=(10, "dynamic"), # 显示10行,可滚动 col_count=(4, "fixed"), elem_id="transcription-output" ) # --- 事件处理 --- parse_button.click( fn=parse_rss_feed, inputs=[rss_url_input], outputs=[ status_message_area, podcast_title_display, episode_dropdown, podcast_data_state, audio_player, current_audio_url_state, episode_shownotes, transcription_output_df, transcribe_button, selected_episode_index_state ] ) episode_dropdown.change( fn=load_episode_audio, inputs=[episode_dropdown, podcast_data_state], outputs=[ audio_player, current_audio_url_state, status_message_area, episode_shownotes, transcription_output_df, local_audio_file_path, transcribe_button, selected_episode_index_state ] ) # 首先禁用按钮,然后执行转录 transcribe_button.click( fn=disable_buttons_before_transcription, inputs=[local_audio_file_path], outputs=[parse_button, episode_dropdown, transcribe_button, status_message_area] ).then( fn=start_transcription, inputs=[local_audio_file_path, podcast_data_state, selected_episode_index_state], outputs=[transcription_output_df, status_message_area, parse_button, episode_dropdown, transcribe_button] ) if __name__ == "__main__": try: # demo.launch(debug=True, share=True) # share=True 会生成一个公开链接 demo.launch(debug=True) finally: # 确保在应用程序退出时清理临时文件 cleanup_temp_files()