"""AI Podcast Generator (Gradio app).

Turns an article URL, a PDF, or a keyword into a two-speaker podcast script
using a local LLM (llama.cpp, with a Transformers fallback) or the Together
API, then renders the script to audio with Edge-TTS, Spark-TTS, or MeloTTS.
Brave Search is used, when an API key is configured, to add fresh context.
"""
# NOTE(review): `spaces` is kept as the very first import, as in the original
# file; presumably ZeroGPU requires it before torch is imported -- confirm.
import spaces
import gradio as gr
import os
import sys
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import textwrap
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
import requests
import logging
from datetime import datetime, timedelta, timezone
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv

# PDF processing imports
from langchain_community.document_loaders import PyPDFLoader

# Edge TTS imports
import edge_tts
from pydub import AudioSegment

# OpenAI imports
from openai import OpenAI

# Transformers imports (for legacy local mode)
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    TextIteratorStreamer,
    BitsAndBytesConfig,
)

# Llama CPP imports (for new local mode)
try:
    from llama_cpp import Llama
    from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
    from llama_cpp_agent.providers import LlamaCppPythonProvider
    from llama_cpp_agent.chat_history import BasicChatHistory
    from llama_cpp_agent.chat_history.messages import Roles
    from huggingface_hub import hf_hub_download
    LLAMA_CPP_AVAILABLE = True
except ImportError:
    LLAMA_CPP_AVAILABLE = False

# Spark TTS imports
try:
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except ImportError:
    SPARK_AVAILABLE = False

# MeloTTS imports (for local mode)
try:
    # Download the unidic dictionary only when it is not installed yet.
    if not os.path.exists("/usr/local/lib/python3.10/site-packages/unidic"):
        try:
            # Use the running interpreter so the download lands in this environment.
            subprocess.run([sys.executable, "-m", "unidic", "download"], check=False)
        except OSError as e:
            logging.warning("unidic download could not be started: %s", e)
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception as e:
    # MeloTTS is optional; any failure while importing it just disables the engine.
    logging.info("MeloTTS unavailable: %s", e)
    MELO_AVAILABLE = False

# Import config and prompts
from config_prompts import (
    ConversationConfig,
    PromptBuilder,
    DefaultConversations,
    EDGE_TTS_ONLY_LANGUAGES,
    EDGE_TTS_VOICES
)

load_dotenv()

# Brave Search API settings
BRAVE_KEY = os.getenv("BSEARCH_API")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"

# Matches a JSON object with at most one level of nested braces, which is the
# shape of the expected {"conversation": [{...}, ...]} payload.
_JSON_OBJECT_RE = re.compile(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}")

# Language-specific system prompts for the llama.cpp agent.
_LOCAL_SYSTEM_MESSAGES = {
    "Korean": (
        "당신은 한국의 유명 팟캐스트 전문 작가입니다. "
        "청취자들이 깊이 있는 전문 지식을 얻을 수 있는 고품질 대담을 한국어로 만듭니다. "
        "반드시 서로 존댓말을 사용하며, 12회의 대화 교환으로 구성하세요. "
        "모든 대화는 반드시 한국어로 작성하고 JSON 형식으로만 응답하세요."
    ),
    "Japanese": (
        "あなたは日本の有名なポッドキャスト専門作家です。"
        "聴衆が深い専門知識を得られる高品質な対談を日本語で作成します。"
        "必ずお互いに丁寧語を使用し、12回の対話交換で構成してください。"
        "すべての対話は必ず日本語で作成し、JSON形式でのみ回答してください。"
    ),
    "French": (
        "Vous êtes un célèbre scénariste de podcast professionnel français. "
        "Créez des discussions de haute qualité en français qui donnent au public "
        "des connaissances professionnelles approfondies. "
        "Créez exactement 12 échanges de conversation et répondez uniquement en format JSON."
    ),
    "German": (
        "Sie sind ein berühmter professioneller Podcast-Drehbuchautor aus Deutschland. "
        "Erstellen Sie hochwertige Diskussionen auf Deutsch, die dem Publikum "
        "tiefgreifendes Fachwissen vermitteln. "
        "Erstellen Sie genau 12 Gesprächsaustausche und antworten Sie nur im JSON-Format."
    ),
    "Spanish": (
        "Eres un famoso guionista de podcast profesional español. "
        "Crea discusiones de alta calidad en español que brinden al público "
        "conocimientos profesionales profundos. "
        "Crea exactamente 12 intercambios de conversación y responde solo en formato JSON."
    ),
    "Chinese": (
        "您是中国著名的专业播客编剧。"
        "创建高质量的中文讨论,为观众提供深入的专业知识。"
        "创建恰好12次对话交换,仅以JSON格式回答。"
    ),
    "Russian": (
        "Вы известный профессиональный сценарист подкастов из России. "
        "Создавайте высококачественные дискуссии на русском языке, которые дают аудитории "
        "глубокие профессиональные знания. "
        "Создайте ровно 12 обменов разговором и отвечайте только в формате JSON."
    ),
}

# Placeholder article used for keyword input when no Brave API key is configured.
_FALLBACK_CONTENT_KO = textwrap.dedent("""
    '{keyword}'에 대한 종합적인 정보:

    {keyword}는 현대 사회에서 매우 중요한 주제입니다.
    이 주제는 다양한 측면에서 우리의 삶에 영향을 미치고 있으며,
    최근 들어 더욱 주목받고 있습니다.

    주요 특징:
    1. 기술적 발전과 혁신
    2. 사회적 영향과 변화
    3. 미래 전망과 가능성
    4. 실용적 활용 방안
    5. 글로벌 트렌드와 동향

    전문가들은 {keyword}가 앞으로 더욱 중요해질 것으로 예상하고 있으며,
    이에 대한 깊이 있는 이해가 필요한 시점입니다.
    """)

_FALLBACK_CONTENT_EN = textwrap.dedent("""
    Comprehensive information about '{keyword}':

    {keyword} is a significant topic in modern society.
    This subject impacts our lives in various ways and has been gaining increasing attention recently.

    Key aspects:
    1. Technological advancement and innovation
    2. Social impact and changes
    3. Future prospects and possibilities
    4. Practical applications
    5. Global trends and developments

    Experts predict that {keyword} will become even more important,
    and it's crucial to develop a deep understanding of this topic.
    """)

# Filler appended when the search results are too short to script a podcast from.
_ADDITIONAL_CONTENT_KO = textwrap.dedent("""
    추가 정보:
    {keyword}와 관련된 최근 동향을 살펴보면, 이 분야는 빠르게 발전하고 있습니다.
    많은 전문가들이 이 주제에 대해 활발히 연구하고 있으며,
    실생활에서의 응용 가능성도 계속 확대되고 있습니다.

    특히 주목할 점은:
    - 기술 혁신의 가속화
    - 사용자 경험의 개선
    - 접근성의 향상
    - 비용 효율성 증대
    - 글로벌 시장의 성장

    이러한 요소들이 {keyword}의 미래를 더욱 밝게 만들고 있습니다.
    """)

_ADDITIONAL_CONTENT_EN = textwrap.dedent("""
    Additional insights:
    Recent developments in {keyword} show rapid advancement in this field.
    Many experts are actively researching this topic, and its practical
    applications continue to expand.

    Key points to note:
    - Accelerating technological innovation
    - Improving user experience
    - Enhanced accessibility
    - Increased cost efficiency
    - Growing global market

    These factors are making the future of {keyword} increasingly promising.
    """)

# Localised notices shown on the TTS selector for Edge-TTS-only languages.
_EDGE_ONLY_NOTICES = {
    "Korean": "한국어는 Edge-TTS만 지원됩니다",
    "Japanese": "日本語はEdge-TTSのみサポートされています",
    "French": "Le français n'est pris en charge que par Edge-TTS",
    "German": "Deutsch wird nur von Edge-TTS unterstützt",
    "Spanish": "El español solo es compatible con Edge-TTS",
    "Italian": "L'italiano è supportato solo da Edge-TTS",
    "Portuguese": "O português é suportado apenas pelo Edge-TTS",
    "Dutch": "Nederlands wordt alleen ondersteund door Edge-TTS",
    "Thai": "ภาษาไทยรองรับเฉพาะ Edge-TTS เท่านั้น",
    "Vietnamese": "Tiếng Việt chỉ được hỗ trợ bởi Edge-TTS",
    "Arabic": "العربية مدعومة فقط من Edge-TTS",
    "Hebrew": "עברית נתמכת רק על ידי Edge-TTS",
    "Indonesian": "Bahasa Indonesia hanya didukung oleh Edge-TTS",
    "Hindi": "हिंदी केवल Edge-TTS द्वारा समर्थित है",
    "Russian": "Русский поддерживается только Edge-TTS",
    "Chinese": "中文仅支持Edge-TTS",
}


def _truncate(text: str, limit: int) -> str:
    """Return *text* cut to *limit* characters, with "..." appended if it was cut."""
    return text[:limit] + "..." if len(text) > limit else text


def _extract_json_object(text: str) -> Optional[Dict]:
    """Parse the first JSON object found in *text*; return None when none matches.

    Raises:
        json.JSONDecodeError: if the matched span is not valid JSON.
    """
    match = _JSON_OBJECT_RE.search(text)
    return json.loads(match.group()) if match else None


def _format_conversation(conversation_json: Dict) -> str:
    """Render a conversation dict as "Speaker: text" lines, one per turn."""
    return "\n".join(
        f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
        for i, turn in enumerate(conversation_json["conversation"])
    )


def _write_silence(path: str, seconds: float = 1.0, sample_rate: int = 22050) -> None:
    """Write a silent audio clip to *path* (placeholder for a failed TTS turn)."""
    sf.write(path, np.zeros(int(sample_rate * seconds)), sample_rate)


def brave_search(query: str, count: int = 8, freshness_days: int | None = None) -> List[Dict[str, str]]:
    """Search the web with the Brave Search API.

    Args:
        query: Search query.
        count: Maximum number of results to request and return.
        freshness_days: When set, restrict results to the last N days.

    Returns:
        A list of dicts with "title", "url", "snippet" and "host" keys; empty
        when no API key is configured or the request fails (best effort).
    """
    if not BRAVE_KEY:
        return []

    params = {"q": query, "count": str(count)}
    if freshness_days:
        # Brave expects a date *range* ("YYYY-MM-DDtoYYYY-MM-DD"), not a single date.
        today = datetime.now(timezone.utc).date()
        since = today - timedelta(days=freshness_days)
        params["freshness"] = f"{since:%Y-%m-%d}to{today:%Y-%m-%d}"

    try:
        response = requests.get(
            BRAVE_ENDPOINT,
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY},
            params=params,
            timeout=15
        )
        response.raise_for_status()
        raw = response.json().get("web", {}).get("results") or []
        return [{
            "title": item.get("title", ""),
            "url": item.get("url", item.get("link", "")),
            "snippet": item.get("description", item.get("text", "")),
            "host": re.sub(r"https?://(www\.)?", "", item.get("url", "")).split("/")[0]
        } for item in raw[:count]]
    except Exception as e:
        # Search is optional context; never let it break the main flow.
        logging.error("Brave search error: %s", e)
        return []


def format_search_results(query: str, for_keyword: bool = False) -> str:
    """Run a search and format the top results as text for the prompt.

    Keyword searches use more results, longer snippets, and no freshness
    filter; regular searches are limited to the last 7 days.

    Returns:
        The formatted results followed by a newline, or "" when nothing was found.
    """
    count = 5 if for_keyword else 3
    rows = brave_search(query, count, freshness_days=None if for_keyword else 7)
    if not rows:
        return ""

    max_results = 4 if for_keyword else 2
    results = []
    for row in rows[:max_results]:
        if for_keyword:
            snippet = _truncate(row['snippet'], 200)
            results.append(f"**{row['title']}**\n{snippet}\nSource: {row['host']}")
        else:
            snippet = _truncate(row['snippet'], 100)
            results.append(f"- {row['title']}: {snippet}")

    return "\n\n".join(results) + "\n"


def extract_keywords_for_search(text: str, language: str = "English") -> List[str]:
    """Pick at most one search keyword from the beginning of *text*.

    Korean: the longest run of two or more Hangul syllables (first wins on ties).
    Other languages: the longest capitalised word longer than four characters.

    Returns:
        A list with zero or one keyword.
    """
    # Only look at the start of the text to keep this cheap.
    text_sample = text[:500]

    if language == "Korean":
        keywords = re.findall(r'[가-힣]{2,}', text_sample)
        # De-duplicate preserving order, then prefer the longest candidate.
        unique_keywords = list(dict.fromkeys(keywords))
        unique_keywords.sort(key=len, reverse=True)
        return unique_keywords[:1]

    # Strip punctuation *before* filtering so "AI..." is not mistaken for a long word.
    candidates = [word.strip('.,!?;:') for word in text_sample.split()]
    keywords = [word for word in candidates if len(word) > 4 and word[0].isupper()]
    if keywords:
        return [max(keywords, key=len)]
    return []


def search_and_compile_content(keyword: str, language: str = "English") -> str:
    """Compile enough source material about *keyword* to script a podcast.

    Without a Brave API key a generic placeholder article is returned.
    Otherwise several queries are run and their top results concatenated;
    generic filler is appended when the snippets total under 1000 characters.
    """
    is_korean = language == "Korean"

    if not BRAVE_KEY:
        template = _FALLBACK_CONTENT_KO if is_korean else _FALLBACK_CONTENT_EN
        return template.format(keyword=keyword)

    # Use the current year instead of a hard-coded one so "latest" stays latest.
    year = datetime.now().year
    if is_korean:
        queries = [
            f"{keyword} 최신 뉴스 {year}",
            f"{keyword} 정보 설명",
            f"{keyword} 트렌드 전망",
            f"{keyword} 장점 단점",
            f"{keyword} 활용 방법",
            f"{keyword} 전문가 의견"
        ]
    else:
        queries = [
            f"{keyword} latest news {year}",
            f"{keyword} explained comprehensive",
            f"{keyword} trends forecast",
            f"{keyword} advantages disadvantages",
            f"{keyword} how to use",
            f"{keyword} expert opinions"
        ]

    all_content = []
    total_content_length = 0

    for query in queries:
        # Fetch five results but keep only the top three per query.
        for row in brave_search(query, count=5)[:3]:
            all_content.append(f"**{row['title']}**\n{row['snippet']}\nSource: {row['host']}\n")
            total_content_length += len(row['snippet'])

    # Pad with generic text when the search yielded too little material.
    if total_content_length < 1000:
        template = _ADDITIONAL_CONTENT_KO if is_korean else _ADDITIONAL_CONTENT_EN
        all_content.append(template.format(keyword=keyword))

    compiled = "\n\n".join(all_content)

    if is_korean:
        intro = f"### '{keyword}'에 대한 종합적인 정보와 최신 동향:\n\n"
    else:
        intro = f"### Comprehensive information and latest trends about '{keyword}':\n\n"

    return intro + compiled


class UnifiedAudioConverter:
    """Generates podcast conversations with an LLM and renders them to audio."""

    def __init__(self, config: ConversationConfig):
        self.config = config
        self.llm_client = None
        self.legacy_local_model = None
        self.legacy_tokenizer = None
        # llama.cpp based local LLM and the model file it was loaded from
        self.local_llm = None
        self.local_llm_model = None
        self.melo_models = None
        self.spark_model_dir = None
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        self.prompt_builder = PromptBuilder()

    def initialize_api_mode(self, api_key: str):
        """Initialize API mode with Together API"""
        self.llm_client = OpenAI(api_key=api_key, base_url="https://api.together.xyz/v1")

    @spaces.GPU(duration=120)
    def initialize_local_mode(self):
        """Load the llama.cpp model, downloading it if needed.

        A no-op when the configured model is already loaded.

        Raises:
            RuntimeError: if llama.cpp is unavailable or the model cannot be loaded.
        """
        if not LLAMA_CPP_AVAILABLE:
            raise RuntimeError("Llama CPP dependencies not available. Please install llama-cpp-python and llama-cpp-agent.")

        if self.local_llm is None or self.local_llm_model != self.config.local_model_name:
            try:
                # Download the model file (a cache hit if already present).
                hf_hub_download(
                    repo_id=self.config.local_model_repo,
                    filename=self.config.local_model_name,
                    local_dir="./models"
                )

                model_path_local = os.path.join("./models", self.config.local_model_name)

                if not os.path.exists(model_path_local):
                    raise RuntimeError(f"Model file not found at {model_path_local}")

                self.local_llm = Llama(
                    model_path=model_path_local,
                    flash_attn=True,
                    n_gpu_layers=81 if torch.cuda.is_available() else 0,
                    n_batch=1024,
                    n_ctx=16384,
                )
                self.local_llm_model = self.config.local_model_name
                print(f"Local LLM initialized: {model_path_local}")

            except Exception as e:
                print(f"Failed to initialize local LLM: {e}")
                raise RuntimeError(f"Failed to initialize local LLM: {e}") from e

    @spaces.GPU(duration=60)
    def initialize_legacy_local_mode(self):
        """Initialize legacy local mode with Hugging Face model (fallback)"""
        if self.legacy_local_model is None:
            quantization_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_compute_dtype=torch.float16
            )
            self.legacy_local_model = AutoModelForCausalLM.from_pretrained(
                self.config.legacy_local_model_name,
                quantization_config=quantization_config
            )
            self.legacy_tokenizer = AutoTokenizer.from_pretrained(
                self.config.legacy_local_model_name,
                revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
            )

    def initialize_spark_tts(self):
        """Ensure the Spark-TTS model is on disk and record its directory.

        Raises:
            RuntimeError: if dependencies are missing or the download fails.
        """
        if not SPARK_AVAILABLE:
            raise RuntimeError("Spark TTS dependencies not available")

        model_dir = "pretrained_models/Spark-TTS-0.5B"

        if not os.path.exists(model_dir):
            print("Downloading Spark-TTS model...")
            try:
                os.makedirs("pretrained_models", exist_ok=True)
                snapshot_download(
                    "SparkAudio/Spark-TTS-0.5B",
                    local_dir=model_dir
                )
                print("Spark-TTS model downloaded successfully")
            except Exception as e:
                raise RuntimeError(f"Failed to download Spark-TTS model: {e}") from e

        self.spark_model_dir = model_dir

        # Inference is run through the Spark-TTS repository's CLI module.
        if not os.path.exists("cli/inference.py"):
            print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")

    @spaces.GPU(duration=60)
    def initialize_melo_tts(self):
        """Initialize MeloTTS models"""
        if MELO_AVAILABLE and self.melo_models is None:
            self.melo_models = {"EN": MeloTTS(language="EN", device=self.device)}

    def fetch_text(self, url: str) -> str:
        """Fetch text content from *url* through the configured prefix service.

        Raises:
            ValueError: if the URL is empty or not http(s).
            RuntimeError: if the HTTP request fails.
        """
        if not url:
            raise ValueError("URL cannot be empty")

        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with 'http://' or 'https://'")

        full_url = f"{self.config.prefix_url}{url}"
        try:
            response = httpx.get(full_url, timeout=60.0)
            response.raise_for_status()
            return response.text
        except httpx.HTTPError as e:
            raise RuntimeError(f"Failed to fetch URL: {e}") from e

    def extract_text_from_pdf(self, pdf_file) -> str:
        """Extract the text of every page of a PDF.

        Args:
            pdf_file: A file path (what Gradio passes) or a readable file object.

        Raises:
            RuntimeError: if the PDF cannot be read.
        """
        tmp_path = None
        try:
            if isinstance(pdf_file, str):
                pdf_path = pdf_file
            else:
                # File object: spool it to disk because the loader needs a path.
                with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                    tmp_file.write(pdf_file.read())
                    tmp_path = pdf_path = tmp_file.name

            pages = PyPDFLoader(pdf_path).load()
            return "\n".join(page.page_content for page in pages)
        except Exception as e:
            raise RuntimeError(f"Failed to extract text from PDF: {e}") from e
        finally:
            # Remove the spooled copy even when loading failed.
            if tmp_path and os.path.exists(tmp_path):
                os.unlink(tmp_path)

    def _get_messages_formatter_type(self, model_name):
        """Get appropriate message formatter for the model"""
        if "Mistral" in model_name or "BitSix" in model_name:
            return MessagesFormatterType.CHATML
        return MessagesFormatterType.LLAMA_3

    def _build_search_context(self, text: str, language: str) -> str:
        """Return web-search context for *text*, or "" when search is skipped or fails.

        Search is skipped without an API key and for keyword-based content,
        which was itself compiled from search results.
        """
        if not BRAVE_KEY or text.startswith("Keyword-based content:"):
            return ""
        try:
            keywords = extract_keywords_for_search(text, language)
            if not keywords:
                return ""
            search_query = keywords[0] if language == "Korean" else f"{keywords[0]} latest news"
            search_context = format_search_results(search_query)
            print(f"Search context added for: {search_query}")
            return search_context
        except Exception as e:
            print(f"Search failed, continuing without context: {e}")
            return ""

    @spaces.GPU(duration=120)
    def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
        """Generate the conversation with the llama.cpp model.

        Falls back to the legacy Transformers model when the local LLM fails
        or returns no JSON.
        """
        search_context = self._build_search_context(text, language)
        try:
            self.initialize_local_mode()

            chat_template = self._get_messages_formatter_type(self.config.local_model_name)
            provider = LlamaCppPythonProvider(self.local_llm)

            system_message = _LOCAL_SYSTEM_MESSAGES.get(
                language,
                f"You are a professional podcast scriptwriter creating high-quality, "
                f"insightful discussions in {language}. Create exactly 12 conversation exchanges "
                f"with professional expertise. All dialogue must be in {language}. "
                f"Respond only in JSON format."
            )

            agent = LlamaCppAgent(
                provider,
                system_prompt=system_message,
                predefined_messages_formatter_type=chat_template,
                debug_output=False
            )

            settings = provider.get_provider_default_settings()
            settings.temperature = 0.75
            settings.top_k = 40
            settings.top_p = 0.95
            settings.max_tokens = self.config.max_tokens
            settings.repeat_penalty = 1.1
            settings.stream = False

            messages = BasicChatHistory()

            prompt = self.prompt_builder.build_prompt(text, language, search_context)
            response = agent.get_chat_response(
                prompt,
                llm_sampling_settings=settings,
                chat_history=messages,
                returns_streaming_generator=False,
                print_output=False
            )

            conversation_data = _extract_json_object(response)
            if conversation_data is None:
                raise ValueError("No valid JSON found in local LLM response")
            return conversation_data

        except Exception as e:
            print(f"Local LLM failed: {e}, falling back to legacy local method")
            return self.extract_conversation_legacy_local(text, language, progress, search_context)

    @spaces.GPU(duration=120)
    def extract_conversation_legacy_local(self, text: str, language: str = "English", progress=None, search_context: str = "") -> Dict:
        """Generate the conversation with the legacy Transformers model.

        Returns the built-in default conversation for *language* on any failure.
        """
        try:
            self.initialize_legacy_local_mode()

            # Language-specific system messages come from config_prompts.
            messages = self.prompt_builder.build_messages_for_local(text, language, search_context)

            terminators = [
                self.legacy_tokenizer.eos_token_id,
                self.legacy_tokenizer.convert_tokens_to_ids("<|eot_id|>")
            ]

            chat_messages = self.legacy_tokenizer.apply_chat_template(
                messages, tokenize=False, add_generation_prompt=True
            )
            model_inputs = self.legacy_tokenizer([chat_messages], return_tensors="pt").to(self.device)

            streamer = TextIteratorStreamer(
                self.legacy_tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
            )

            generate_kwargs = dict(
                model_inputs,
                streamer=streamer,
                max_new_tokens=self.config.max_new_tokens,
                do_sample=True,
                temperature=0.75,
                eos_token_id=terminators,
            )

            # generate() blocks, so it runs in a worker thread while we drain the streamer.
            worker = Thread(target=self.legacy_local_model.generate, kwargs=generate_kwargs)
            worker.start()

            partial_text = "".join(streamer)
            # The streamer is exhausted, so generation has finished; reap the thread.
            worker.join()

            conversation_data = _extract_json_object(partial_text)
            if conversation_data is None:
                raise ValueError("No valid JSON found in legacy local response")
            return conversation_data

        except Exception as e:
            print(f"Legacy local model also failed: {e}")
            return DefaultConversations.get_conversation(language)

    def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
        """Generate the conversation with the Together API.

        Raises:
            RuntimeError: if API mode is not initialized or the request/parsing fails.
        """
        if not self.llm_client:
            raise RuntimeError("API mode not initialized")

        try:
            search_context = self._build_search_context(text, language)
            messages = self.prompt_builder.build_messages_for_local(text, language, search_context)

            chat_completion = self.llm_client.chat.completions.create(
                messages=messages,
                model=self.config.api_model_name,
                temperature=0.75,
            )

            conversation_data = _extract_json_object(chat_completion.choices[0].message.content)
            if conversation_data is None:
                raise ValueError("No valid JSON found in response")
            return conversation_data
        except Exception as e:
            raise RuntimeError(f"Failed to extract conversation: {e}") from e

    def parse_conversation_text(self, conversation_text: str) -> Dict:
        """Parse "Speaker: text" lines back into the conversation dict.

        Blank lines are skipped. A line without a colon is treated as a
        continuation of the previous turn (it is dropped only when there is
        no previous turn), so hand-edited multi-line turns are not lost.
        """
        turns = []
        for raw_line in conversation_text.strip().split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            speaker, separator, text = line.partition(':')
            if separator:
                turns.append({"speaker": speaker.strip(), "text": text.strip()})
            elif turns:
                turns[-1]["text"] = f"{turns[-1]['text']} {line}".strip()
        return {"conversation": turns}

    async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
        """Convert the conversation to speech with Edge TTS.

        Returns:
            (path to the combined audio file, conversation transcript).

        Raises:
            RuntimeError: if synthesis or combining fails.
        """
        output_dir = Path(self._create_output_directory())
        filenames = []

        try:
            # Alternate between the voices configured for the language.
            voices = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["English"])

            for i, turn in enumerate(conversation_json["conversation"]):
                filename = output_dir / f"output_{i}.wav"
                voice = voices[i % len(voices)]

                tmp_path = await self._generate_audio_edge(turn["text"], voice)
                # shutil.move works across filesystems (temp dir vs. cwd); os.rename does not.
                shutil.move(tmp_path, str(filename))
                filenames.append(str(filename))

            final_output = os.path.join(output_dir, "combined_output.wav")
            self._combine_audio_files(filenames, final_output)

            return final_output, _format_conversation(conversation_json)
        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech: {e}") from e

    async def _generate_audio_edge(self, text: str, voice: str) -> str:
        """Synthesize *text* with Edge TTS into a temp file and return its path.

        Raises:
            ValueError: if *text* is blank.
        """
        if not text.strip():
            raise ValueError("Text cannot be empty")

        # Voice entries may look like "ShortName - description"; keep the short name.
        voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
        communicate = edge_tts.Communicate(text, voice_short_name)

        # NOTE(review): edge-tts presumably writes MP3 data despite the .wav suffix;
        # pydub/ffmpeg sniffs the real format when combining -- confirm.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            tmp_path = tmp_file.name
            await communicate.save(tmp_path)

        return tmp_path

    @spaces.GPU(duration=60)
    def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
        """Convert the conversation to speech with the Spark-TTS CLI.

        A turn that fails or times out is replaced by one second of silence.

        Returns:
            (path to the combined audio file, conversation transcript).

        Raises:
            RuntimeError: if Spark TTS is unavailable or no audio was produced.
        """
        if not SPARK_AVAILABLE or not self.spark_model_dir:
            raise RuntimeError("Spark TTS not available")

        try:
            output_dir = self._create_output_directory()
            audio_files = []

            # Each speaker gets a different prompt text to vary the voice.
            # NOTE: "gender" is recorded but not currently passed to the CLI.
            speaker1, speaker2 = self.prompt_builder.get_speaker_names(language)

            if language == "Korean":
                voice_configs = [
                    {"prompt_text": f"안녕하세요, 오늘 팟캐스트 진행을 맡은 {speaker1}입니다.", "gender": "male"},
                    {"prompt_text": f"안녕하세요, 저는 오늘 이 주제에 대해 설명드릴 {speaker2}입니다.", "gender": "male"}
                ]
            else:
                voice_configs = [
                    {"prompt_text": f"Hello everyone, I'm {speaker1}, your host for today's podcast.", "gender": "male"},
                    {"prompt_text": f"Hi, I'm {speaker2}. I'm excited to share my insights with you.", "gender": "male"}
                ]

            for i, turn in enumerate(conversation_json["conversation"]):
                text = turn["text"]
                if not text.strip():
                    continue

                voice_config = voice_configs[i % len(voice_configs)]
                output_file = os.path.join(output_dir, f"spark_output_{i}.wav")

                cmd = [
                    sys.executable, "-m", "cli.inference",
                    "--text", text,
                    "--device", "0" if torch.cuda.is_available() else "cpu",
                    "--save_dir", output_dir,
                    "--model_dir", self.spark_model_dir,
                    "--prompt_text", voice_config["prompt_text"],
                    "--output_name", f"spark_output_{i}.wav"
                ]

                try:
                    result = subprocess.run(
                        cmd, capture_output=True, text=True, timeout=60, cwd="."
                    )
                    if result.returncode != 0:
                        print(f"Spark TTS error for turn {i}: {result.stderr}")
                        _write_silence(output_file)
                except subprocess.TimeoutExpired:
                    print(f"Spark TTS timeout for turn {i}")
                    _write_silence(output_file)
                except Exception as e:
                    print(f"Error running Spark TTS for turn {i}: {e}")
                    _write_silence(output_file)
                audio_files.append(output_file)

            if not audio_files:
                raise RuntimeError("No audio files generated")

            final_output = os.path.join(output_dir, "spark_combined.wav")
            self._combine_audio_files(audio_files, final_output)

            return final_output, _format_conversation(conversation_json)

        except Exception as e:
            raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}") from e

    @spaces.GPU(duration=60)
    def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
        """Convert the conversation to speech with MeloTTS (English voices only).

        Returns:
            (path to the MP3 file, conversation transcript).

        Raises:
            RuntimeError: if MeloTTS is unavailable or not initialized.
        """
        if not MELO_AVAILABLE or not self.melo_models:
            raise RuntimeError("MeloTTS not available")

        speakers = ["EN-Default", "EN-US"]
        model = self.melo_models["EN"]
        combined_audio = AudioSegment.empty()

        for i, turn in enumerate(conversation_json["conversation"]):
            bio = io.BytesIO()
            speaker_id = model.hps.data.spk2id[speakers[i % 2]]
            model.tts_to_file(
                turn["text"], speaker_id, bio, speed=1.0,
                pbar=progress.tqdm if progress else None,
                format="wav"
            )
            bio.seek(0)
            combined_audio += AudioSegment.from_file(bio, format="wav")

        # Write into a per-request directory so concurrent requests cannot clobber each other.
        final_audio_path = os.path.join(self._create_output_directory(), "melo_podcast.mp3")
        combined_audio.export(final_audio_path, format="mp3")

        return final_audio_path, _format_conversation(conversation_json)

    def _create_output_directory(self) -> str:
        """Create a uniquely named output directory in the cwd and return its name."""
        # Hex keeps the name free of a leading "-" and of "=" padding, either of
        # which can confuse command-line tools that receive this path.
        folder_name = f"podcast_{os.urandom(8).hex()}"
        os.makedirs(folder_name, exist_ok=True)
        return folder_name

    def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
        """Concatenate audio files into *output_file* (WAV) and delete the inputs.

        Raises:
            ValueError: if *filenames* is empty.
            RuntimeError: if no input could be loaded or the export fails.
        """
        if not filenames:
            raise ValueError("No input files provided")

        try:
            segments = [
                AudioSegment.from_file(filename)
                for filename in filenames
                if os.path.exists(filename)
            ]
            if not segments:
                # Previously this case silently produced no output file at all.
                raise FileNotFoundError("none of the input audio files exist")

            combined = segments[0]
            for segment in segments[1:]:
                combined += segment
            combined.export(output_file, format="wav")

            # Inputs are removed only after the combined file has been written.
            for filename in filenames:
                if os.path.exists(filename):
                    os.remove(filename)

        except Exception as e:
            raise RuntimeError(f"Failed to combine audio files: {e}") from e


# Global converter instance
converter = UnifiedAudioConverter(ConversationConfig())


async def synthesize(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Generate the conversation script from a URL, PDF, or keyword.

    Args:
        article_input: URL string, PDF path/file, or keyword, depending on *input_type*.
        input_type: "URL", "PDF", or anything else for keyword mode.
        mode: "Local" (with API fallback) or API (with local fallback).
        tts_engine: Accepted for interface symmetry; not used at this stage.
        language: Target conversation language.

    Returns:
        (conversation text or an error/validation message, None).
    """
    try:
        if input_type == "URL":
            if not article_input or not isinstance(article_input, str):
                return "Please provide a valid URL.", None
            text = converter.fetch_text(article_input)
        elif input_type == "PDF":
            if not article_input:
                return "Please upload a PDF file.", None
            text = converter.extract_text_from_pdf(article_input)
        else:  # Keyword
            if not article_input or not isinstance(article_input, str):
                return "Please provide a keyword or topic.", None
            text = search_and_compile_content(article_input, language)
            # This prefix tells the extractors to skip the extra web search.
            text = f"Keyword-based content:\n{text}"

        # Cap the input at the configured number of words.
        words = text.split()
        if len(words) > converter.config.max_words:
            text = " ".join(words[:converter.config.max_words])

        if mode == "Local":
            try:
                conversation_json = converter.extract_conversation_local(text, language)
            except Exception as e:
                print(f"Local mode failed: {e}, trying API fallback")
                api_key = os.environ.get("TOGETHER_API_KEY")
                if not api_key:
                    raise RuntimeError("Local mode failed and no API key available for fallback")
                converter.initialize_api_mode(api_key)
                conversation_json = converter.extract_conversation_api(text, language)
        else:  # API mode
            api_key = os.environ.get("TOGETHER_API_KEY")
            if not api_key:
                print("API key not found, falling back to local mode")
                conversation_json = converter.extract_conversation_local(text, language)
            else:
                try:
                    converter.initialize_api_mode(api_key)
                    conversation_json = converter.extract_conversation_api(text, language)
                except Exception as e:
                    print(f"API mode failed: {e}, falling back to local mode")
                    conversation_json = converter.extract_conversation_local(text, language)

        return _format_conversation(conversation_json), None

    except Exception as e:
        return f"Error: {str(e)}", None


async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Render the (possibly edited) conversation text to audio.

    Returns:
        (status message, path to the audio file or None on failure).
    """
    if not conversation_text.strip():
        return "Please provide conversation text.", None

    try:
        conversation_json = converter.parse_conversation_text(conversation_text)

        if not conversation_json["conversation"]:
            return "No valid conversation found in the text.", None

        # Languages that only Edge-TTS supports always use Edge-TTS.
        if language in EDGE_TTS_ONLY_LANGUAGES and tts_engine != "Edge-TTS":
            tts_engine = "Edge-TTS"

        if tts_engine == "Edge-TTS":
            output_file, _ = await converter.text_to_speech_edge(conversation_json, language)
        elif tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None
            converter.initialize_spark_tts()
            output_file, _ = converter.text_to_speech_spark(conversation_json, language)
        else:  # MeloTTS
            if not MELO_AVAILABLE:
                return "MeloTTS not available. Please install required dependencies.", None
            if language in EDGE_TTS_ONLY_LANGUAGES:
                return f"MeloTTS does not support {language}. Please use Edge-TTS for this language.", None
            converter.initialize_melo_tts()
            output_file, _ = converter.text_to_speech_melo(conversation_json)

        return "Audio generated successfully!", output_file

    except Exception as e:
        return f"Error generating audio: {str(e)}", None


def synthesize_sync(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async synthesis"""
    return asyncio.run(synthesize(article_input, input_type, mode, tts_engine, language))


def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synchronous wrapper for async audio regeneration"""
    return asyncio.run(regenerate_audio(conversation_text, tts_engine, language))


def update_tts_engine_for_language(language):
    """Return the TTS engine selector appropriate for *language*.

    Edge-TTS-only languages get a locked, single-choice selector with a
    localised notice; all others get the full, interactive selector.
    """
    if language in EDGE_TTS_ONLY_LANGUAGES:
        info_text = _EDGE_ONLY_NOTICES.get(language, f"{language} is only supported by Edge-TTS")
        return gr.Radio(
            choices=["Edge-TTS"],
            value="Edge-TTS",
            label="TTS Engine",
            info=info_text,
            interactive=False
        )
    return gr.Radio(
        choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
        value="Edge-TTS",
        label="TTS Engine",
        info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU",
        interactive=True
    )


def toggle_input_visibility(input_type):
    """Show only the input widget (URL, PDF, keyword) matching *input_type*.

    Returns:
        Visibility updates for (url_input, pdf_input, keyword_input);
        any type other than "URL"/"PDF" selects the keyword input.
    """
    show_url = input_type == "URL"
    show_pdf = input_type == "PDF"
    show_keyword = not (show_url or show_pdf)
    return (
        gr.update(visible=show_url),
        gr.update(visible=show_pdf),
        gr.update(visible=show_keyword),
    )


# Pre-download the local model at app start so the first request is faster.
if LLAMA_CPP_AVAILABLE:
    try:
        model_path = hf_hub_download(
            repo_id=converter.config.local_model_repo,
            filename=converter.config.local_model_name,
            local_dir="./models"
        )
        print(f"Model downloaded to: {model_path}")
    except Exception as e:
        print(f"Failed to download model at startup: {e}")


# Gradio interface - multi-language layout
with gr.Blocks(theme='soft', title="AI Podcast Generator", css="""
    .container {max-width: 1200px; margin: auto; padding: 20px;}
    .header-text {text-align: center; margin-bottom: 30px;}
    .input-group {background: #f7f7f7; padding: 20px; border-radius: 10px; margin-bottom: 20px;}
    .output-group {background: #f0f0f0; padding: 20px; border-radius: 10px;}
    .status-box {background: #e8f4f8; padding: 15px; border-radius: 8px; margin-top: 10px;}
""") as demo:
    with gr.Column(elem_classes="container"):
        # Header
        with gr.Row(elem_classes="header-text"):
            gr.Markdown("""
            # 🎙️ AI Podcast Generator - Professional Multi-Language Edition
            ### Convert any article, blog, PDF document, or topic into an engaging professional podcast conversation in 24+ languages!
            """)

        # NOTE(review): the badge markup appears to have been stripped from this
        # literal; only its text residue remains -- restore the original HTML.
        with gr.Row(elem_classes="discord-badge"):
            gr.HTML("""

badge badge

""")

        # Status section
        with gr.Row():
            with gr.Column(scale=1):
                gr.Markdown(f"""
                #### 🤖 System Status
                - **LLM**: {converter.config.local_model_name.split('.')[0]}
                - **Fallback**: {converter.config.api_model_name.split('/')[-1]}
                - **Llama CPP**: {"✅ Ready" if LLAMA_CPP_AVAILABLE else "❌ Not Available"}
                - **Search**: {"✅ Brave API" if BRAVE_KEY else "❌ No API"}
                """)
            with gr.Column(scale=1):
                gr.Markdown("""
                #### 🌍 Multi-Language Support
                - **24+ Languages**: Korean, Japanese, French, German, Spanish, Italian, etc.
                - **Native Voices**: Optimized for each language
                - **Professional Style**: Expert discussions with data & insights
                - **Auto-TTS Selection**: Best engine per language
                """)

        # Main input section
        with gr.Group(elem_classes="input-group"):
            with gr.Row():
                # Left: content inputs
                with gr.Column(scale=2):
                    input_type_selector = gr.Radio(
                        choices=["URL", "PDF", "Keyword"],
                        value="URL",
                        label="📥 Input Type",
                        info="Choose your content source"
                    )

                    url_input = gr.Textbox(
                        label="🔗 Article URL",
                        placeholder="Enter the article URL here...",
                        value="",
                        visible=True,
                        lines=2
                    )

                    pdf_input = gr.File(
                        label="📄 Upload PDF",
                        file_types=[".pdf"],
                        visible=False
                    )

                    keyword_input = gr.Textbox(
                        label="🔍 Topic/Keyword",
                        placeholder="Enter a topic (e.g., 'AI trends 2024', '인공지능', 'IA tendances', 'KI Trends')",
                        value="",
                        visible=False,
                        info="System will search and compile latest information",
                        lines=2
                    )

                # Right: settings
                with gr.Column(scale=1):
                    language_selector = gr.Radio(
                        choices=[
                            "English", "Korean", "Japanese", "French", "German",
                            "Spanish", "Italian", "Portuguese", "Dutch", "Thai",
                            "Vietnamese", "Arabic", "Hebrew", "Indonesian", "Hindi",
                            "Russian", "Chinese", "Norwegian", "Swedish", "Finnish",
                            "Danish", "Polish", "Turkish", "Greek", "Czech"
                        ],
                        value="English",
                        label="🌐 Language / 언어 / 语言",
                        info="Select podcast language"
                    )

                    mode_selector = gr.Radio(
                        choices=["Local", "API"],
                        value="Local",
                        label="⚙️ Processing Mode",
                        info="Local: On-device | API: Cloud"
                    )

                    tts_selector = gr.Radio(
                        choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
                        value="Edge-TTS",
                        label="🔊 TTS Engine",
                        info="Voice synthesis engine"
                    )

            # Generate button
            with gr.Row():
                convert_btn = gr.Button(
                    "🎯 Generate Professional Conversation",
                    variant="primary",
                    size="lg",
                    scale=1
                )

        # Output section
        with gr.Group(elem_classes="output-group"):
            with gr.Row():
                # Left: conversation text
                with gr.Column(scale=3):
                    conversation_output = gr.Textbox(
                        label="💬 Generated Professional Conversation (Editable)",
                        lines=25,
                        max_lines=50,
                        interactive=True,
                        placeholder="Professional podcast conversation will appear here...\n전문 팟캐스트 대화가 여기에 표시됩니다...\nLa conversation professionnelle du podcast apparaîtra ici...",
                        info="Edit the conversation as needed. Format: 'Speaker Name: Text'"
                    )

                    with gr.Row():
                        generate_audio_btn = gr.Button(
                            "🎙️ Generate Audio from Text",
                            variant="secondary",
                            size="lg"
                        )

                # Right: audio output and status
                with gr.Column(scale=2):
                    audio_output = gr.Audio(
                        label="🎧 Professional Podcast Audio",
                        type="filepath",
                        interactive=False
                    )

                    status_output = gr.Textbox(
                        label="📊 Status",
                        interactive=False,
                        lines=3,
                        elem_classes="status-box"
                    )

                    # Help
                    gr.Markdown("""
                    #### 💡 Quick Tips:
                    - **URL**: Paste any article link
                    - **PDF**: Upload documents directly
                    - **Keyword**: Enter topics for AI research
                    - **24+ Languages** fully supported
                    - Edit conversation before audio generation
                    - Auto TTS engine selection per language
                    """)

        # Examples section
        with gr.Accordion("📚 Multi-Language Examples", open=False):
            gr.Examples(
                examples=[
                    ["https://huggingface.co/blog/openfreeai/cycle-navigator", "URL", "Local", "Edge-TTS", "English"],
                    ["quantum computing breakthroughs", "Keyword", "Local", "Edge-TTS", "English"],
                    ["인공지능 윤리와 규제", "Keyword", "Local", "Edge-TTS", "Korean"],
                    ["https://huggingface.co/papers/2505.14810", "URL", "Local", "Edge-TTS", "Japanese"],
                    ["intelligence artificielle tendances", "Keyword", "Local", "Edge-TTS", "French"],
                    ["künstliche intelligenz entwicklung", "Keyword", "Local", "Edge-TTS", "German"],
                    ["inteligencia artificial avances", "Keyword", "Local", "Edge-TTS", "Spanish"],
                ],
                inputs=[url_input, input_type_selector, mode_selector, tts_selector, language_selector],
                outputs=[conversation_output, status_output],
                fn=synthesize_sync,
                cache_examples=False,
            )

    # Input type change handler
    input_type_selector.change(
        fn=toggle_input_visibility,
        inputs=[input_type_selector],
        outputs=[url_input, pdf_input, keyword_input]
    )

    # Update the TTS engine options when the language changes
    language_selector.change(
        fn=update_tts_engine_for_language,
        inputs=[language_selector],
        outputs=[tts_selector]
    )

    # Event wiring
    def get_article_input(input_type, url_input, pdf_input, keyword_input):
        """Get the appropriate input based on input type"""
        if input_type == "URL":
            return url_input
        elif input_type == "PDF":
            return pdf_input
        else:  # Keyword
            return keyword_input

    # Kept as a lambda: the auto-generated API endpoint name derives from the handler.
    convert_btn.click(
        fn=lambda input_type, url_input, pdf_input, keyword_input, mode, tts, lang: synthesize_sync(
            get_article_input(input_type, url_input, pdf_input, keyword_input), input_type, mode, tts, lang
        ),
        inputs=[input_type_selector, url_input, pdf_input, keyword_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output]
    )

    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector, language_selector],
        outputs=[status_output, audio_output]
    )


# Launch the app
if __name__ == "__main__":
    demo.queue(api_open=True, default_concurrency_limit=10).launch(
        show_api=True,
        share=False,
        server_name="0.0.0.0",
        server_port=7860
    )