# Live-Podcast / app.py
# Copied from the Hugging Face Space file viewer (uploader: openfree;
# commit 36f3f92 "Update app.py", verified; file size 54.1 kB).
import spaces
import gradio as gr
import os
import asyncio
import torch
import io
import json
import re
import httpx
import tempfile
import wave
import base64
import numpy as np
import soundfile as sf
import subprocess
import shutil
import requests
import logging
from datetime import datetime, timedelta
from typing import List, Tuple, Dict, Optional
from pathlib import Path
from threading import Thread
from dotenv import load_dotenv
# PDF processing imports
from langchain_community.document_loaders import PyPDFLoader
# Edge TTS imports
import edge_tts
from pydub import AudioSegment
# OpenAI imports
from openai import OpenAI
# Transformers imports (for legacy local mode)
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer,
BitsAndBytesConfig,
)
# Llama CPP imports (for new local mode)
try:
from llama_cpp import Llama
from llama_cpp_agent import LlamaCppAgent, MessagesFormatterType
from llama_cpp_agent.providers import LlamaCppPythonProvider
from llama_cpp_agent.chat_history import BasicChatHistory
from llama_cpp_agent.chat_history.messages import Roles
from huggingface_hub import hf_hub_download
LLAMA_CPP_AVAILABLE = True
except ImportError:
LLAMA_CPP_AVAILABLE = False
# Spark TTS imports
try:
    # Spark-TTS support only needs the Hub client to fetch the model snapshot.
    from huggingface_hub import snapshot_download
    SPARK_AVAILABLE = True
except Exception:
    # Best-effort probe: any import-time failure disables Spark-TTS, but
    # KeyboardInterrupt/SystemExit are no longer swallowed as they were by
    # the previous bare ``except``.
    SPARK_AVAILABLE = False
# MeloTTS imports (for local mode)
try:
    # Run the unidic download only when the package directory is absent.
    # NOTE(review): the path is pinned to a Python 3.10 site-packages layout;
    # whenever that exact directory does not exist the download command is
    # executed again on start-up.
    if not os.path.exists("/usr/local/lib/python3.10/site-packages/unidic"):
        try:
            # os.system reports command failure through its return code,
            # which is deliberately ignored: the download is best-effort.
            os.system("python -m unidic download")
        except Exception:
            logging.warning("unidic download could not be started", exc_info=True)
    from melo.api import TTS as MeloTTS
    MELO_AVAILABLE = True
except Exception:
    # Any failure while importing MeloTTS simply disables that engine;
    # KeyboardInterrupt/SystemExit are no longer swallowed.
    MELO_AVAILABLE = False
# Import config and prompts
from config_prompts import (
ConversationConfig,
PromptBuilder,
DefaultConversations,
EDGE_TTS_ONLY_LANGUAGES,
EDGE_TTS_VOICES
)
load_dotenv()
# Brave Search API configuration
BRAVE_KEY = os.getenv("BSEARCH_API")
BRAVE_ENDPOINT = "https://api.search.brave.com/res/v1/web/search"
def brave_search(query: str, count: int = 8, freshness_days: int | None = None):
    """Query the Brave Web Search API and return normalized result rows.

    Args:
        query: Search terms.
        count: Number of results to request; also the cap on returned rows.
        freshness_days: When truthy, restrict results to the window from
            ``freshness_days`` days ago until today (UTC dates).

    Returns:
        A list of dicts with the string keys ``title``, ``url``, ``snippet``
        and ``host``. An empty list is returned when ``BRAVE_KEY`` is unset
        or when the request or response parsing fails (the error is logged).
    """
    if not BRAVE_KEY:
        return []

    params = {"q": query, "count": str(count)}
    if freshness_days:
        # Brave accepts pd/pw/pm/py or an explicit "YYYY-MM-DDtoYYYY-MM-DD"
        # range for ``freshness``; a lone start date is not a valid value.
        today = datetime.utcnow()
        start = today - timedelta(days=freshness_days)
        params["freshness"] = f"{start:%Y-%m-%d}to{today:%Y-%m-%d}"

    try:
        response = requests.get(
            BRAVE_ENDPOINT,
            headers={"Accept": "application/json", "X-Subscription-Token": BRAVE_KEY},
            params=params,
            timeout=15,
        )
        # Make HTTP failures (bad key, quota, rejected parameter) visible in
        # the log instead of silently yielding an empty result set.
        response.raise_for_status()
        raw = (response.json().get("web") or {}).get("results") or []

        rows = []
        for item in raw[:count]:
            # Some payloads use "link"/"text" instead of "url"/"description";
            # coerce missing or null fields to "" so callers can call len().
            url = item.get("url") or item.get("link") or ""
            rows.append({
                "title": item.get("title") or "",
                "url": url,
                "snippet": item.get("description") or item.get("text") or "",
                "host": re.sub(r"https?://(www\.)?", "", url).split("/")[0],
            })
        return rows
    except Exception as e:
        logging.error(f"Brave search error: {e}")
        return []
def format_search_results(query: str, for_keyword: bool = False) -> str:
    """Run a Brave search and render the hits as prompt-ready text.

    Keyword mode requests 5 results (no freshness filter) and renders up to
    4 detailed entries: bold title, snippet cut at 200 characters, source
    host. The default mode requests 3 results limited to the last 7 days and
    renders up to 2 one-line bullets with the snippet cut at 100 characters.

    Returns:
        The entries joined by blank lines plus a trailing newline, or ``""``
        when the search returned nothing.
    """
    if for_keyword:
        requested, shown, snippet_cap, recency = 5, 4, 200, None
    else:
        requested, shown, snippet_cap, recency = 3, 2, 100, 7

    hits = brave_search(query, requested, freshness_days=recency)
    if not hits:
        return ""

    entries = []
    for hit in hits[:shown]:
        body = hit['snippet']
        if len(body) > snippet_cap:
            # Truncate long snippets and mark the cut with an ellipsis.
            body = body[:snippet_cap] + "..."
        if for_keyword:
            entries.append(f"**{hit['title']}**\n{body}\nSource: {hit['host']}")
        else:
            entries.append(f"- {hit['title']}: {body}")

    return "\n\n".join(entries) + "\n"
def extract_keywords_for_search(text: str, language: str = "English") -> List[str]:
    """Pick at most one search keyword from the beginning of ``text``.

    Only the first 500 characters are inspected to keep the work bounded.

    Args:
        text: Source text to mine for a keyword.
        language: ``"Korean"`` selects the Hangul heuristic; any other value
            uses the capitalised-word heuristic.

    Returns:
        A list holding the single chosen keyword, or an empty list when no
        candidate is found. For Korean the longest run of two or more Hangul
        syllables wins; otherwise the longest capitalised word of more than
        four characters (after stripping surrounding punctuation) wins.
        Ties go to the earliest occurrence.
    """
    text_sample = text[:500]

    if language == "Korean":
        # U+AC00..U+D7A3 is the precomposed Hangul syllable block. The range
        # is spelled with escapes so it cannot be corrupted by re-encoding
        # this file (a corrupted literal range makes ``re`` raise).
        candidates = re.findall(r'[\uac00-\ud7a3]{2,}', text_sample)
        unique_candidates = list(dict.fromkeys(candidates))
        unique_candidates.sort(key=len, reverse=True)
        return unique_candidates[:1]

    # Strip punctuation first so the length/capitalisation test applies to
    # the word itself rather than to "Word," or "Word.".
    stripped_words = (word.strip('.,!?;:') for word in text_sample.split())
    candidates = [w for w in stripped_words if len(w) > 4 and w[0].isupper()]
    if candidates:
        return [max(candidates, key=len)]
    return []
def search_and_compile_content(keyword: str, language: str = "English") -> str:
    """Build source material for a keyword by running several web searches.

    Without a Brave API key a generic template about ``keyword`` is returned.
    Otherwise six query variations are searched (top 3 hits of each are
    kept); when the collected snippets total fewer than 1000 characters a
    generic "additional insights" paragraph is appended.

    Args:
        keyword: Topic to research.
        language: ``"Korean"`` selects the Korean queries/templates; any
            other value uses the English ones.

    Returns:
        A markdown-style text block starting with a ``###`` heading (search
        mode) or the fallback template (no API key).
    """
    # NOTE(review): the non-ASCII literals below appear mis-encoded (mojibake)
    # in this copy of the file; they are preserved as-is and should be checked
    # against the original UTF-8 source.
    if not BRAVE_KEY:
        # No API key: return canned content so the pipeline still has text.
        if language == "Korean":
            return f"""
'{keyword}'์— ๋Œ€ํ•œ ์ข…ํ•ฉ์ ์ธ ์ •๋ณด:
{keyword}๋Š” ํ˜„๋Œ€ ์‚ฌํšŒ์—์„œ ๋งค์šฐ ์ค‘์š”ํ•œ ์ฃผ์ œ์ž…๋‹ˆ๋‹ค.
์ด ์ฃผ์ œ๋Š” ๋‹ค์–‘ํ•œ ์ธก๋ฉด์—์„œ ์šฐ๋ฆฌ์˜ ์‚ถ์— ์˜ํ–ฅ์„ ๋ฏธ์น˜๊ณ  ์žˆ์œผ๋ฉฐ,
์ตœ๊ทผ ๋“ค์–ด ๋”์šฑ ์ฃผ๋ชฉ๋ฐ›๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
์ฃผ์š” ํŠน์ง•:
1. ๊ธฐ์ˆ ์  ๋ฐœ์ „๊ณผ ํ˜์‹ 
2. ์‚ฌํšŒ์  ์˜ํ–ฅ๊ณผ ๋ณ€ํ™”
3. ๋ฏธ๋ž˜ ์ „๋ง๊ณผ ๊ฐ€๋Šฅ์„ฑ
4. ์‹ค์šฉ์  ํ™œ์šฉ ๋ฐฉ์•ˆ
5. ๊ธ€๋กœ๋ฒŒ ํŠธ๋ Œ๋“œ์™€ ๋™ํ–ฅ
์ „๋ฌธ๊ฐ€๋“ค์€ {keyword}๊ฐ€ ์•ž์œผ๋กœ ๋”์šฑ ์ค‘์š”ํ•ด์งˆ ๊ฒƒ์œผ๋กœ ์˜ˆ์ƒํ•˜๊ณ  ์žˆ์œผ๋ฉฐ,
์ด์— ๋Œ€ํ•œ ๊นŠ์ด ์žˆ๋Š” ์ดํ•ด๊ฐ€ ํ•„์š”ํ•œ ์‹œ์ ์ž…๋‹ˆ๋‹ค.
"""
        else:
            return f"""
Comprehensive information about '{keyword}':
{keyword} is a significant topic in modern society.
This subject impacts our lives in various ways and has been
gaining increasing attention recently.
Key aspects:
1. Technological advancement and innovation
2. Social impact and changes
3. Future prospects and possibilities
4. Practical applications
5. Global trends and developments
Experts predict that {keyword} will become even more important,
and it's crucial to develop a deep understanding of this topic.
"""

    # Use the current year in the "latest news" query; a hard-coded year
    # silently goes stale and biases results toward old articles.
    year = datetime.now().year

    # Query variations, chosen by language.
    if language == "Korean":
        queries = [
            f"{keyword} ์ตœ์‹  ๋‰ด์Šค {year}",
            f"{keyword} ์ •๋ณด ์„ค๋ช…",
            f"{keyword} ํŠธ๋ Œ๋“œ ์ „๋ง",
            f"{keyword} ์žฅ์  ๋‹จ์ ",
            f"{keyword} ํ™œ์šฉ ๋ฐฉ๋ฒ•",
            f"{keyword} ์ „๋ฌธ๊ฐ€ ์˜๊ฒฌ"
        ]
    else:
        queries = [
            f"{keyword} latest news {year}",
            f"{keyword} explained comprehensive",
            f"{keyword} trends forecast",
            f"{keyword} advantages disadvantages",
            f"{keyword} how to use",
            f"{keyword} expert opinions"
        ]

    all_content = []
    total_content_length = 0
    for query in queries:
        results = brave_search(query, count=5)
        for r in results[:3]:  # keep the top 3 hits of each query
            snippet = r['snippet'] or ""  # tolerate a null snippet
            all_content.append(f"**{r['title']}**\n{snippet}\nSource: {r['host']}\n")
            total_content_length += len(snippet)

    # Pad thin search results with a generic paragraph (minimum 1000 chars
    # of snippet text is the threshold).
    if total_content_length < 1000:
        if language == "Korean":
            additional_content = f"""
์ถ”๊ฐ€ ์ •๋ณด:
{keyword}์™€ ๊ด€๋ จ๋œ ์ตœ๊ทผ ๋™ํ–ฅ์„ ์‚ดํŽด๋ณด๋ฉด, ์ด ๋ถ„์•ผ๋Š” ๋น ๋ฅด๊ฒŒ ๋ฐœ์ „ํ•˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
๋งŽ์€ ์ „๋ฌธ๊ฐ€๋“ค์ด ์ด ์ฃผ์ œ์— ๋Œ€ํ•ด ํ™œ๋ฐœํžˆ ์—ฐ๊ตฌํ•˜๊ณ  ์žˆ์œผ๋ฉฐ,
์‹ค์ƒํ™œ์—์„œ์˜ ์‘์šฉ ๊ฐ€๋Šฅ์„ฑ๋„ ๊ณ„์† ํ™•๋Œ€๋˜๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
ํŠนํžˆ ์ฃผ๋ชฉํ•  ์ ์€:
- ๊ธฐ์ˆ  ํ˜์‹ ์˜ ๊ฐ€์†ํ™”
- ์‚ฌ์šฉ์ž ๊ฒฝํ—˜์˜ ๊ฐœ์„ 
- ์ ‘๊ทผ์„ฑ์˜ ํ–ฅ์ƒ
- ๋น„์šฉ ํšจ์œจ์„ฑ ์ฆ๋Œ€
- ๊ธ€๋กœ๋ฒŒ ์‹œ์žฅ์˜ ์„ฑ์žฅ
์ด๋Ÿฌํ•œ ์š”์†Œ๋“ค์ด {keyword}์˜ ๋ฏธ๋ž˜๋ฅผ ๋”์šฑ ๋ฐ๊ฒŒ ๋งŒ๋“ค๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค.
"""
        else:
            additional_content = f"""
Additional insights:
Recent developments in {keyword} show rapid advancement in this field.
Many experts are actively researching this topic, and its practical
applications continue to expand.
Key points to note:
- Accelerating technological innovation
- Improving user experience
- Enhanced accessibility
- Increased cost efficiency
- Growing global market
These factors are making the future of {keyword} increasingly promising.
"""
        all_content.append(additional_content)

    compiled = "\n\n".join(all_content)

    # Heading that introduces the compiled material.
    if language == "Korean":
        intro = f"### '{keyword}'์— ๋Œ€ํ•œ ์ข…ํ•ฉ์ ์ธ ์ •๋ณด์™€ ์ตœ์‹  ๋™ํ–ฅ:\n\n"
    else:
        intro = f"### Comprehensive information and latest trends about '{keyword}':\n\n"
    return intro + compiled
class UnifiedAudioConverter:
def __init__(self, config: ConversationConfig):
    """Keep the configuration and start with every model handle unloaded.

    Args:
        config: Conversation settings read by the other methods.
    """
    self.config = config

    # Prompt construction is delegated to a PromptBuilder instance.
    self.prompt_builder = PromptBuilder()

    # Use CUDA when torch detects it, otherwise fall back to the CPU.
    if torch.cuda.is_available():
        self.device = "cuda"
    else:
        self.device = "cpu"

    # API client, created by initialize_api_mode().
    self.llm_client = None

    # llama.cpp model and the model file name it was loaded from.
    self.local_llm = None
    self.local_llm_model = None

    # transformers model/tokenizer pair used by the legacy local path.
    self.legacy_local_model = None
    self.legacy_tokenizer = None

    # TTS state: MeloTTS model dict and Spark-TTS model directory.
    self.melo_models = None
    self.spark_model_dir = None
def initialize_api_mode(self, api_key: str):
    """Create the OpenAI-compatible client that targets the Together API.

    Args:
        api_key: Credential passed to the client.
    """
    together_endpoint = "https://api.together.xyz/v1"
    self.llm_client = OpenAI(base_url=together_endpoint, api_key=api_key)
@spaces.GPU(duration=120)
def initialize_local_mode(self):
    """Download (if needed) and load the llama.cpp model named in the config.

    The model is loaded only when none is loaded yet or when the configured
    model file name differs from the one currently loaded.

    Raises:
        RuntimeError: If the llama.cpp dependencies are missing, or if the
            download or model construction fails (the original exception is
            chained as ``__cause__``).
    """
    if not LLAMA_CPP_AVAILABLE:
        raise RuntimeError("Llama CPP dependencies not available. Please install llama-cpp-python and llama-cpp-agent.")

    # Nothing to do when the requested model is already loaded.
    if self.local_llm is not None and self.local_llm_model == self.config.local_model_name:
        return

    try:
        # Fetch the model file into ./models; the return value is not needed
        # because the expected local path is derived and verified below.
        hf_hub_download(
            repo_id=self.config.local_model_repo,
            filename=self.config.local_model_name,
            local_dir="./models"
        )
        model_path_local = os.path.join("./models", self.config.local_model_name)
        if not os.path.exists(model_path_local):
            raise RuntimeError(f"Model file not found at {model_path_local}")

        self.local_llm = Llama(
            model_path=model_path_local,
            flash_attn=True,
            # Offload layers to the GPU only when CUDA is available.
            n_gpu_layers=81 if torch.cuda.is_available() else 0,
            n_batch=1024,
            n_ctx=16384,
        )
        self.local_llm_model = self.config.local_model_name
        print(f"Local LLM initialized: {model_path_local}")
    except Exception as e:
        print(f"Failed to initialize local LLM: {e}")
        # Chain the cause so the underlying traceback is preserved.
        raise RuntimeError(f"Failed to initialize local LLM: {e}") from e
@spaces.GPU(duration=60)
def initialize_legacy_local_mode(self):
    """Load the fallback Hugging Face model and tokenizer once.

    The model is loaded with 4-bit quantization (float16 compute dtype);
    the tokenizer is pinned to a fixed revision. Does nothing when a legacy
    model is already loaded.
    """
    if self.legacy_local_model is not None:
        return

    four_bit = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16
    )
    self.legacy_local_model = AutoModelForCausalLM.from_pretrained(
        self.config.legacy_local_model_name,
        quantization_config=four_bit
    )
    self.legacy_tokenizer = AutoTokenizer.from_pretrained(
        self.config.legacy_local_model_name,
        revision='8ab73a6800796d84448bc936db9bac5ad9f984ae'
    )
def initialize_spark_tts(self):
    """Make sure the Spark-TTS model directory exists, downloading if needed.

    Sets ``self.spark_model_dir`` and prints a warning when the CLI
    inference script is not present in the working directory.

    Raises:
        RuntimeError: If Spark-TTS support is unavailable or the model
            download fails.
    """
    if not SPARK_AVAILABLE:
        raise RuntimeError("Spark TTS dependencies not available")

    model_dir = "pretrained_models/Spark-TTS-0.5B"
    already_present = os.path.exists(model_dir)

    # Download the snapshot only on first use.
    if not already_present:
        print("Downloading Spark-TTS model...")
        try:
            os.makedirs("pretrained_models", exist_ok=True)
            snapshot_download(
                "SparkAudio/Spark-TTS-0.5B",
                local_dir=model_dir
            )
            print("Spark-TTS model downloaded successfully")
        except Exception as e:
            raise RuntimeError(f"Failed to download Spark-TTS model: {e}")

    self.spark_model_dir = model_dir

    # The CLI script ships with the Spark-TTS repository, not the snapshot.
    cli_script = "cli/inference.py"
    if not os.path.exists(cli_script):
        print("Warning: Spark-TTS CLI not found. Please clone the Spark-TTS repository.")
@spaces.GPU(duration=60)
def initialize_melo_tts(self):
    """Create the English MeloTTS model once, if MeloTTS is importable."""
    if not MELO_AVAILABLE or self.melo_models is not None:
        return
    english_model = MeloTTS(language="EN", device=self.device)
    self.melo_models = {"EN": english_model}
def fetch_text(self, url: str) -> str:
    """Fetch the text behind ``url`` through the configured prefix service.

    Args:
        url: Absolute ``http://`` or ``https://`` URL. Surrounding
            whitespace (common in pasted input) is ignored.

    Returns:
        The response body as text.

    Raises:
        ValueError: If the URL is empty or has no http(s) scheme.
        RuntimeError: If the HTTP request fails (cause is chained).
    """
    url = (url or "").strip()
    if not url:
        raise ValueError("URL cannot be empty")
    if not url.startswith(("http://", "https://")):
        raise ValueError("URL must start with 'http://' or 'https://'")

    # The request goes to config.prefix_url with the target URL appended.
    full_url = f"{self.config.prefix_url}{url}"
    try:
        response = httpx.get(full_url, timeout=60.0)
        response.raise_for_status()
        return response.text
    except httpx.HTTPError as e:
        raise RuntimeError(f"Failed to fetch URL: {e}") from e
def extract_text_from_pdf(self, pdf_file) -> str:
    """Extract the text of every page of a PDF.

    Args:
        pdf_file: Either a filesystem path (``str``) or a readable binary
            file-like object, which is spooled to a temporary ``.pdf`` file
            for the loader.

    Returns:
        The page texts joined with newlines.

    Raises:
        RuntimeError: If reading or parsing the PDF fails (cause chained).
    """
    temp_path = None
    try:
        if isinstance(pdf_file, str):
            pdf_path = pdf_file
        else:
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
                # Record the path before writing so cleanup also covers a
                # failed write.
                temp_path = tmp_file.name
                tmp_file.write(pdf_file.read())
            pdf_path = temp_path

        pages = PyPDFLoader(pdf_path).load()
        return "\n".join(page.page_content for page in pages)
    except Exception as e:
        raise RuntimeError(f"Failed to extract text from PDF: {e}") from e
    finally:
        # Remove the spooled copy on success AND on failure; caller-supplied
        # paths are never deleted.
        if temp_path is not None and os.path.exists(temp_path):
            os.unlink(temp_path)
def _get_messages_formatter_type(self, model_name):
    """Choose the chat template for ``model_name``.

    Names containing ``"Mistral"`` or ``"BitSix"`` use ChatML; every other
    model uses the Llama 3 format.
    """
    chatml_markers = ("Mistral", "BitSix")
    if any(marker in model_name for marker in chatml_markers):
        return MessagesFormatterType.CHATML
    return MessagesFormatterType.LLAMA_3
@spaces.GPU(duration=120)
def extract_conversation_local(self, text: str, language: str = "English", progress=None) -> Dict:
    """Generate the podcast conversation with the llama.cpp model.

    Optionally enriches the prompt with Brave search results (skipped for
    keyword-based input, which already contains search material), runs the
    local model and parses the first JSON object out of its reply. Any
    failure falls back to ``extract_conversation_legacy_local``.

    Args:
        text: Source material for the conversation.
        language: Target language of the dialogue.
        progress: Forwarded unchanged to the legacy fallback.

    Returns:
        The parsed conversation dict.
    """
    # NOTE(review): the non-ASCII prompt literals below appear mis-encoded
    # (mojibake) in this copy of the file; they are preserved as-is and
    # should be checked against the original UTF-8 source.
    search_context = ""
    try:
        # Search context is added only for non-keyword input.
        if BRAVE_KEY and not text.startswith("Keyword-based content:"):
            try:
                keywords = extract_keywords_for_search(text, language)
                if keywords:
                    if language == "Korean":
                        search_query = keywords[0]
                    else:
                        search_query = f"{keywords[0]} latest news"
                    search_context = format_search_results(search_query)
                    print(f"Search context added for: {search_query}")
            except Exception as e:
                print(f"Search failed, continuing without context: {e}")

        # Try the new local LLM first.
        self.initialize_local_mode()
        chat_template = self._get_messages_formatter_type(self.config.local_model_name)
        provider = LlamaCppPythonProvider(self.local_llm)

        # Per-language system prompts.
        localized_prompts = {
            "Korean": (
                "๋‹น์‹ ์€ ํ•œ๊ตญ์˜ ์œ ๋ช… ํŒŸ์บ์ŠคํŠธ ์ „๋ฌธ ์ž‘๊ฐ€์ž…๋‹ˆ๋‹ค. "
                "์ฒญ์ทจ์ž๋“ค์ด ๊นŠ์ด ์žˆ๋Š” ์ „๋ฌธ ์ง€์‹์„ ์–ป์„ ์ˆ˜ ์žˆ๋Š” ๊ณ ํ’ˆ์งˆ ๋Œ€๋‹ด์„ ํ•œ๊ตญ์–ด๋กœ ๋งŒ๋“ญ๋‹ˆ๋‹ค. "
                "๋ฐ˜๋“œ์‹œ ์„œ๋กœ ์กด๋Œ“๋ง์„ ์‚ฌ์šฉํ•˜๋ฉฐ, 12ํšŒ์˜ ๋Œ€ํ™” ๊ตํ™˜์œผ๋กœ ๊ตฌ์„ฑํ•˜์„ธ์š”. "
                "๋ชจ๋“  ๋Œ€ํ™”๋Š” ๋ฐ˜๋“œ์‹œ ํ•œ๊ตญ์–ด๋กœ ์ž‘์„ฑํ•˜๊ณ  JSON ํ˜•์‹์œผ๋กœ๋งŒ ์‘๋‹ตํ•˜์„ธ์š”."
            ),
            "Japanese": (
                "ใ‚ใชใŸใฏๆ—ฅๆœฌใฎๆœ‰ๅใชใƒใƒƒใƒ‰ใ‚ญใƒฃใ‚นใƒˆๅฐ‚้–€ไฝœๅฎถใงใ™ใ€‚"
                "่ด่ก†ใŒๆทฑใ„ๅฐ‚้–€็Ÿฅ่ญ˜ใ‚’ๅพ—ใ‚‰ใ‚Œใ‚‹้ซ˜ๅ“่ณชใชๅฏพ่ซ‡ใ‚’ๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใพใ™ใ€‚"
                "ๅฟ…ใšใŠไบ’ใ„ใซไธๅฏง่ชžใ‚’ไฝฟ็”จใ—ใ€12ๅ›žใฎๅฏพ่ฉฑไบคๆ›ใงๆง‹ๆˆใ—ใฆใใ ใ•ใ„ใ€‚"
                "ใ™ในใฆใฎๅฏพ่ฉฑใฏๅฟ…ใšๆ—ฅๆœฌ่ชžใงไฝœๆˆใ—ใ€JSONๅฝขๅผใงใฎใฟๅ›ž็ญ”ใ—ใฆใใ ใ•ใ„ใ€‚"
            ),
            "French": (
                "Vous รชtes un cรฉlรจbre scรฉnariste de podcast professionnel franรงais. "
                "Crรฉez des discussions de haute qualitรฉ en franรงais qui donnent au public "
                "des connaissances professionnelles approfondies. "
                "Crรฉez exactement 12 รฉchanges de conversation et rรฉpondez uniquement en format JSON."
            ),
            "German": (
                "Sie sind ein berรผhmter professioneller Podcast-Drehbuchautor aus Deutschland. "
                "Erstellen Sie hochwertige Diskussionen auf Deutsch, die dem Publikum "
                "tiefgreifendes Fachwissen vermitteln. "
                "Erstellen Sie genau 12 Gesprรคchsaustausche und antworten Sie nur im JSON-Format."
            ),
            "Spanish": (
                "Eres un famoso guionista de podcast profesional espaรฑol. "
                "Crea discusiones de alta calidad en espaรฑol que brinden al pรบblico "
                "conocimientos profesionales profundos. "
                "Crea exactamente 12 intercambios de conversaciรณn y responde solo en formato JSON."
            ),
            "Chinese": (
                "ๆ‚จๆ˜ฏไธญๅ›ฝ่‘—ๅ็š„ไธ“ไธšๆ’ญๅฎข็ผ–ๅ‰งใ€‚"
                "ๅˆ›ๅปบ้ซ˜่ดจ้‡็š„ไธญๆ–‡่ฎจ่ฎบ๏ผŒไธบ่ง‚ไผ—ๆไพ›ๆทฑๅ…ฅ็š„ไธ“ไธš็Ÿฅ่ฏ†ใ€‚"
                "ๅˆ›ๅปบๆฐๅฅฝ12ๆฌกๅฏน่ฏไบคๆข๏ผŒไป…ไปฅJSONๆ ผๅผๅ›ž็ญ”ใ€‚"
            ),
            "Russian": (
                "ะ’ั‹ ะธะทะฒะตัั‚ะฝั‹ะน ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะน ัั†ะตะฝะฐั€ะธัั‚ ะฟะพะดะบะฐัั‚ะพะฒ ะธะท ะ ะพััะธะธ. "
                "ะกะพะทะดะฐะฒะฐะนั‚ะต ะฒั‹ัะพะบะพะบะฐั‡ะตัั‚ะฒะตะฝะฝั‹ะต ะดะธัะบัƒััะธะธ ะฝะฐ ั€ัƒััะบะพะผ ัะทั‹ะบะต, ะบะพั‚ะพั€ั‹ะต ะดะฐัŽั‚ ะฐัƒะดะธั‚ะพั€ะธะธ "
                "ะณะปัƒะฑะพะบะธะต ะฟั€ะพั„ะตััะธะพะฝะฐะปัŒะฝั‹ะต ะทะฝะฐะฝะธั. "
                "ะกะพะทะดะฐะนั‚ะต ั€ะพะฒะฝะพ 12 ะพะฑะผะตะฝะพะฒ ั€ะฐะทะณะพะฒะพั€ะพะผ ะธ ะพั‚ะฒะตั‡ะฐะนั‚ะต ั‚ะพะปัŒะบะพ ะฒ ั„ะพั€ะผะฐั‚ะต JSON."
            )
        }
        if language in localized_prompts:
            system_message = localized_prompts[language]
        else:
            # Generic English instruction for languages without a dedicated prompt.
            system_message = (
                f"You are a professional podcast scriptwriter creating high-quality, "
                f"insightful discussions in {language}. Create exactly 12 conversation exchanges "
                f"with professional expertise. All dialogue must be in {language}. "
                f"Respond only in JSON format."
            )

        agent = LlamaCppAgent(
            provider,
            system_prompt=system_message,
            predefined_messages_formatter_type=chat_template,
            debug_output=False
        )

        # Sampling configuration applied on top of the provider defaults.
        settings = provider.get_provider_default_settings()
        sampling_overrides = (
            ("temperature", 0.75),
            ("top_k", 40),
            ("top_p", 0.95),
            ("max_tokens", self.config.max_tokens),
            ("repeat_penalty", 1.1),
            ("stream", False),
        )
        for option, value in sampling_overrides:
            setattr(settings, option, value)

        messages = BasicChatHistory()
        prompt = self.prompt_builder.build_prompt(text, language, search_context)
        response = agent.get_chat_response(
            prompt,
            llm_sampling_settings=settings,
            chat_history=messages,
            returns_streaming_generator=False,
            print_output=False
        )

        # Pull the first JSON object (one level of nested braces) from the reply.
        json_match = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", response)
        if not json_match:
            raise ValueError("No valid JSON found in local LLM response")
        return json.loads(json_match.group())
    except Exception as e:
        print(f"Local LLM failed: {e}, falling back to legacy local method")
        return self.extract_conversation_legacy_local(text, language, progress, search_context)
@spaces.GPU(duration=120)
def extract_conversation_legacy_local(self, text: str, language: str = "English", progress=None, search_context: str = "") -> Dict:
    """Generate the conversation with the legacy transformers model.

    Generation runs on a background thread while the streamed text is
    collected here; the first JSON object in the output is parsed. On any
    failure the canned default conversation for ``language`` is returned.

    Args:
        text: Source material for the conversation.
        language: Target language of the dialogue.
        progress: Accepted for signature compatibility; not used here.
        search_context: Optional search results to include in the prompt.

    Returns:
        The parsed conversation dict, or the default conversation.
    """
    try:
        self.initialize_legacy_local_mode()
        tokenizer = self.legacy_tokenizer

        # System/user messages come from the prompt builder.
        messages = self.prompt_builder.build_messages_for_local(text, language, search_context)

        # Stop on the regular EOS token or the "<|eot_id|>" token.
        terminators = [
            tokenizer.eos_token_id,
            tokenizer.convert_tokens_to_ids("<|eot_id|>")
        ]
        chat_messages = tokenizer.apply_chat_template(
            messages, tokenize=False, add_generation_prompt=True
        )
        model_inputs = tokenizer([chat_messages], return_tensors="pt").to(self.device)
        streamer = TextIteratorStreamer(
            tokenizer, timeout=10.0, skip_prompt=True, skip_special_tokens=True
        )
        generate_kwargs = dict(
            model_inputs,
            streamer=streamer,
            max_new_tokens=self.config.max_new_tokens,
            do_sample=True,
            temperature=0.75,
            eos_token_id=terminators,
        )

        # generate() blocks, so it runs on a worker thread while the
        # streamer is drained here.
        worker = Thread(target=self.legacy_local_model.generate, kwargs=generate_kwargs)
        worker.start()
        generated = "".join(streamer)

        found = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", generated)
        if not found:
            raise ValueError("No valid JSON found in legacy local response")
        return json.loads(found.group())
    except Exception as e:
        print(f"Legacy local model also failed: {e}")
        return DefaultConversations.get_conversation(language)
def extract_conversation_api(self, text: str, language: str = "English") -> Dict:
    """Generate the conversation through the remote chat-completions API.

    Args:
        text: Source material for the conversation.
        language: Target language of the dialogue.

    Returns:
        The conversation dict parsed from the first JSON object in the reply.

    Raises:
        RuntimeError: If API mode was not initialized, or if the request or
            JSON extraction fails.
    """
    if not self.llm_client:
        raise RuntimeError("API mode not initialized")

    try:
        # Optional search context; skipped for keyword-based input.
        search_context = ""
        wants_search = BRAVE_KEY and not text.startswith("Keyword-based content:")
        if wants_search:
            try:
                keywords = extract_keywords_for_search(text, language)
                if keywords:
                    if language == "Korean":
                        search_query = keywords[0]
                    else:
                        search_query = f"{keywords[0]} latest news"
                    search_context = format_search_results(search_query)
                    print(f"Search context added for: {search_query}")
            except Exception as e:
                print(f"Search failed, continuing without context: {e}")

        messages = self.prompt_builder.build_messages_for_local(text, language, search_context)
        chat_completion = self.llm_client.chat.completions.create(
            messages=messages,
            model=self.config.api_model_name,
            temperature=0.75,
        )

        reply = chat_completion.choices[0].message.content
        json_match = re.search(r"\{(?:[^{}]|(?:\{[^{}]*\}))*\}", reply)
        if json_match:
            return json.loads(json_match.group())
        raise ValueError("No valid JSON found in response")
    except Exception as e:
        raise RuntimeError(f"Failed to extract conversation: {e}")
def parse_conversation_text(self, conversation_text: str) -> Dict:
    """Parse ``Speaker: text`` lines back into the conversation dict.

    Each line is split at its first colon. Lines without a colon are
    ignored, and so are lines whose text part is empty, because an empty
    turn cannot be synthesized (the Edge-TTS path rejects empty text).

    Args:
        conversation_text: Newline-separated ``Speaker: text`` lines.

    Returns:
        ``{"conversation": [{"speaker": ..., "text": ...}, ...]}`` in the
        original line order.
    """
    turns = []
    for line in conversation_text.strip().split('\n'):
        speaker, separator, spoken = line.partition(':')
        if not separator:
            continue  # no "speaker:" prefix on this line
        spoken = spoken.strip()
        if not spoken:
            continue  # nothing to say, nothing to synthesize
        turns.append({"speaker": speaker.strip(), "text": spoken})
    return {"conversation": turns}
async def text_to_speech_edge(self, conversation_json: Dict, language: str = "English") -> Tuple[str, str]:
    """Synthesize the conversation with Edge TTS and merge it into one file.

    Voices for ``language`` (falling back to the English set) are assigned
    to turns in rotation by turn index. Turns with blank text are skipped,
    matching the Spark-TTS path.

    Args:
        conversation_json: Dict with a ``"conversation"`` list of turns,
            each holding ``"text"`` and optionally ``"speaker"``.
        language: Key into ``EDGE_TTS_VOICES``.

    Returns:
        ``(path_to_combined_wav, conversation_text)``.

    Raises:
        RuntimeError: If synthesis or merging fails.
    """
    output_dir = Path(self._create_output_directory())
    filenames = []
    try:
        voices = EDGE_TTS_VOICES.get(language, EDGE_TTS_VOICES["English"])
        for i, turn in enumerate(conversation_json["conversation"]):
            if not turn["text"].strip():
                continue  # an empty turn would abort the whole run
            filename = output_dir / f"output_{i}.wav"
            voice = voices[i % len(voices)]
            tmp_path = await self._generate_audio_edge(turn["text"], voice)
            # shutil.move also works when the temp directory lives on a
            # different filesystem than the output directory, where
            # os.rename raises OSError.
            shutil.move(tmp_path, str(filename))
            filenames.append(str(filename))

        # Merge the per-turn clips into a single file.
        final_output = os.path.join(output_dir, "combined_output.wav")
        self._combine_audio_files(filenames, final_output)

        # Transcript in "Speaker: text" form.
        conversation_text = "\n".join(
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        )
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech: {e}") from e
async def _generate_audio_edge(self, text: str, voice: str) -> str:
"""Generate audio using Edge TTS"""
if not text.strip():
raise ValueError("Text cannot be empty")
voice_short_name = voice.split(" - ")[0] if " - " in voice else voice
communicate = edge_tts.Communicate(text, voice_short_name)
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
tmp_path = tmp_file.name
await communicate.save(tmp_path)
return tmp_path
@spaces.GPU(duration=60)
def text_to_speech_spark(self, conversation_json: Dict, language: str = "English", progress=None) -> Tuple[str, str]:
    """Synthesize the conversation by invoking the Spark-TTS CLI per turn.

    Each non-blank turn runs ``python -m cli.inference`` with a 60 second
    timeout. When a turn fails, times out or cannot be launched, one second
    of silence is written in its place so the episode stays complete.

    Args:
        conversation_json: Dict with a ``"conversation"`` list of turns.
        language: ``"Korean"`` selects Korean voice prompts; other values
            use the English prompts.
        progress: Accepted for signature compatibility; not used here.

    Returns:
        ``(path_to_combined_wav, conversation_text)``.

    Raises:
        RuntimeError: If Spark-TTS is unavailable or synthesis fails.
    """
    if not SPARK_AVAILABLE or not self.spark_model_dir:
        raise RuntimeError("Spark TTS not available")

    def _write_silence(path):
        # One second of silence at 22050 Hz as a placeholder clip.
        sf.write(path, np.zeros(int(22050 * 1.0)), 22050)

    try:
        output_dir = self._create_output_directory()
        audio_files = []

        # Voice prompts alternate between the two speaker names.
        # NOTE(review): the Korean prompt literals appear mis-encoded
        # (mojibake) in this copy of the file; preserved as-is.
        speaker1, speaker2 = self.prompt_builder.get_speaker_names(language)
        if language == "Korean":
            voice_configs = [
                {"prompt_text": f"์•ˆ๋…•ํ•˜์„ธ์š”, ์˜ค๋Š˜ ํŒŸ์บ์ŠคํŠธ ์ง„ํ–‰์„ ๋งก์€ {speaker1}์ž…๋‹ˆ๋‹ค.", "gender": "male"},
                {"prompt_text": f"์•ˆ๋…•ํ•˜์„ธ์š”, ์ €๋Š” ์˜ค๋Š˜ ์ด ์ฃผ์ œ์— ๋Œ€ํ•ด ์„ค๋ช…๋“œ๋ฆด {speaker2}์ž…๋‹ˆ๋‹ค.", "gender": "male"}
            ]
        else:
            voice_configs = [
                {"prompt_text": f"Hello everyone, I'm {speaker1}, your host for today's podcast.", "gender": "male"},
                {"prompt_text": f"Hi, I'm {speaker2}. I'm excited to share my insights with you.", "gender": "male"}
            ]

        device_arg = "0" if torch.cuda.is_available() else "cpu"
        for i, turn in enumerate(conversation_json["conversation"]):
            text = turn["text"]
            if not text.strip():
                continue

            voice_config = voice_configs[i % len(voice_configs)]
            clip_name = f"spark_output_{i}.wav"
            output_file = os.path.join(output_dir, clip_name)
            cmd = [
                "python", "-m", "cli.inference",
                "--text", text,
                "--device", device_arg,
                "--save_dir", output_dir,
                "--model_dir", self.spark_model_dir,
                "--prompt_text", voice_config["prompt_text"],
                "--output_name", clip_name
            ]

            try:
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    timeout=60,
                    cwd="."
                )
                if result.returncode != 0:
                    print(f"Spark TTS error for turn {i}: {result.stderr}")
                    _write_silence(output_file)
            except subprocess.TimeoutExpired:
                print(f"Spark TTS timeout for turn {i}")
                _write_silence(output_file)
            except Exception as e:
                print(f"Error running Spark TTS for turn {i}: {e}")
                _write_silence(output_file)
            # Either the CLI output or the silence placeholder is queued.
            audio_files.append(output_file)

        if not audio_files:
            raise RuntimeError("No audio files generated")
        final_output = os.path.join(output_dir, "spark_combined.wav")
        self._combine_audio_files(audio_files, final_output)

        transcript_lines = [
            f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
            for i, turn in enumerate(conversation_json["conversation"])
        ]
        conversation_text = "\n".join(transcript_lines)
        return final_output, conversation_text
    except Exception as e:
        raise RuntimeError(f"Failed to convert text to speech with Spark TTS: {e}")
@spaces.GPU(duration=60)
def text_to_speech_melo(self, conversation_json: Dict, progress=None) -> Tuple[str, str]:
    """Synthesize the conversation with the English MeloTTS model.

    Turns alternate between the ``EN-Default`` and ``EN-US`` speakers and
    are concatenated in memory. The MP3 is written into a fresh per-call
    output directory, like the other engines, so that simultaneous requests
    do not overwrite each other's file.

    Args:
        conversation_json: Dict with a ``"conversation"`` list of turns.
        progress: Optional object exposing ``tqdm``; passed to MeloTTS as
            its progress bar factory.

    Returns:
        ``(path_to_mp3, conversation_text)``.

    Raises:
        RuntimeError: If MeloTTS is unavailable or not initialized.
    """
    if not MELO_AVAILABLE or not self.melo_models:
        raise RuntimeError("MeloTTS not available")

    model = self.melo_models["EN"]
    speakers = ["EN-Default", "EN-US"]
    combined_audio = AudioSegment.empty()

    for i, turn in enumerate(conversation_json["conversation"]):
        # MeloTTS writes the WAV into an in-memory buffer.
        bio = io.BytesIO()
        speaker_id = model.hps.data.spk2id[speakers[i % 2]]
        model.tts_to_file(
            turn["text"], speaker_id, bio, speed=1.0,
            pbar=progress.tqdm if progress else None,
            format="wav"
        )
        bio.seek(0)
        combined_audio += AudioSegment.from_file(bio, format="wav")

    # A unique directory per call replaces the former fixed path in the
    # working directory, which was shared by every request.
    final_audio_path = os.path.join(self._create_output_directory(), "melo_podcast.mp3")
    combined_audio.export(final_audio_path, format="mp3")

    conversation_text = "\n".join(
        f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}"
        for i, turn in enumerate(conversation_json["conversation"])
    )
    return final_audio_path, conversation_text
def _create_output_directory(self) -> str:
"""Create a unique output directory"""
random_bytes = os.urandom(8)
folder_name = base64.urlsafe_b64encode(random_bytes).decode("utf-8")
os.makedirs(folder_name, exist_ok=True)
return folder_name
def _combine_audio_files(self, filenames: List[str], output_file: str) -> None:
"""Combine multiple audio files into one"""
if not filenames:
raise ValueError("No input files provided")
try:
audio_segments = []
for filename in filenames:
if os.path.exists(filename):
audio_segment = AudioSegment.from_file(filename)
audio_segments.append(audio_segment)
if audio_segments:
combined = sum(audio_segments)
combined.export(output_file, format="wav")
# Clean up temporary files
for filename in filenames:
if os.path.exists(filename):
os.remove(filename)
except Exception as e:
raise RuntimeError(f"Failed to combine audio files: {e}")
# Global converter instance: built once at import time with a default
# ConversationConfig and used by the synthesize/regenerate_audio handlers below.
converter = UnifiedAudioConverter(ConversationConfig())
async def synthesize(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Turn a URL, PDF or keyword into a podcast conversation transcript.

    Args:
        article_input: URL string, PDF path/file, or keyword string,
            depending on ``input_type``.
        input_type: ``"URL"``, ``"PDF"``; any other value means keyword.
        mode: ``"Local"`` tries the local model first (API as fallback);
            any other value tries the API first (local as fallback).
        tts_engine: Accepted for signature compatibility; not used here.
        language: Target language of the dialogue.

    Returns:
        ``(conversation_text, None)`` on success, or ``(message, None)``
        where ``message`` is a validation hint or ``"Error: ..."``.
    """
    def _script_via_api(key):
        # (Re)configure the API client, then request the conversation.
        converter.initialize_api_mode(key)
        return converter.extract_conversation_api(text, language)

    try:
        # Step 1: obtain the source text for the selected input type.
        if input_type == "URL":
            if not (article_input and isinstance(article_input, str)):
                return "Please provide a valid URL.", None
            text = converter.fetch_text(article_input)
        elif input_type == "PDF":
            if not article_input:
                return "Please upload a PDF file.", None
            text = converter.extract_text_from_pdf(article_input)
        else:
            if not (article_input and isinstance(article_input, str)):
                return "Please provide a keyword or topic.", None
            text = search_and_compile_content(article_input, language)
            # The prefix tells the extractors to skip their own web search.
            text = f"Keyword-based content:\n{text}"

        # Step 2: cap the text at the configured number of words.
        words = text.split()
        word_limit = converter.config.max_words
        if len(words) > word_limit:
            text = " ".join(words[:word_limit])

        # Step 3: generate the conversation, with cross-mode fallback.
        if mode == "Local":
            try:
                conversation_json = converter.extract_conversation_local(text, language)
            except Exception as e:
                print(f"Local mode failed: {e}, trying API fallback")
                api_key = os.environ.get("TOGETHER_API_KEY")
                if not api_key:
                    raise RuntimeError("Local mode failed and no API key available for fallback")
                conversation_json = _script_via_api(api_key)
        else:
            api_key = os.environ.get("TOGETHER_API_KEY")
            if api_key:
                try:
                    conversation_json = _script_via_api(api_key)
                except Exception as e:
                    print(f"API mode failed: {e}, falling back to local mode")
                    conversation_json = converter.extract_conversation_local(text, language)
            else:
                print("API key not found, falling back to local mode")
                conversation_json = converter.extract_conversation_local(text, language)

        # Step 4: render the transcript as "Speaker: text" lines.
        transcript_lines = []
        for i, turn in enumerate(conversation_json["conversation"]):
            transcript_lines.append(f"{turn.get('speaker', f'Speaker {i+1}')}: {turn['text']}")
        return "\n".join(transcript_lines), None
    except Exception as e:
        return f"Error: {str(e)}", None
async def regenerate_audio(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Synthesize audio from an (edited) conversation transcript.

    Args:
        conversation_text: ``Speaker: text`` lines.
        tts_engine: ``"Edge-TTS"``, ``"Spark-TTS"``; any other value selects
            MeloTTS. Overridden to Edge-TTS for Edge-only languages.
        language: Language of the transcript.

    Returns:
        ``(status_message, audio_path)``; ``audio_path`` is ``None`` when
        validation or synthesis fails.
    """
    if not conversation_text.strip():
        return "Please provide conversation text.", None

    try:
        conversation_json = converter.parse_conversation_text(conversation_text)
        if not conversation_json["conversation"]:
            return "No valid conversation found in the text.", None

        # Languages that only Edge-TTS supports always use Edge-TTS.
        edge_only = language in EDGE_TTS_ONLY_LANGUAGES
        if edge_only:
            tts_engine = "Edge-TTS"

        if tts_engine == "Edge-TTS":
            output_file, _ = await converter.text_to_speech_edge(conversation_json, language)
        elif tts_engine == "Spark-TTS":
            if not SPARK_AVAILABLE:
                return "Spark TTS not available. Please install required dependencies and clone the Spark-TTS repository.", None
            converter.initialize_spark_tts()
            output_file, _ = converter.text_to_speech_spark(conversation_json, language)
        else:
            # MeloTTS
            if not MELO_AVAILABLE:
                return "MeloTTS not available. Please install required dependencies.", None
            # Defensive check: the override above already routes Edge-only
            # languages away from this branch.
            if edge_only:
                return f"MeloTTS does not support {language}. Please use Edge-TTS for this language.", None
            converter.initialize_melo_tts()
            output_file, _ = converter.text_to_speech_melo(conversation_json)

        return "Audio generated successfully!", output_file
    except Exception as e:
        return f"Error generating audio: {str(e)}", None
def synthesize_sync(article_input, input_type: str = "URL", mode: str = "Local", tts_engine: str = "Edge-TTS", language: str = "English"):
    """Blocking entry point: run ``synthesize`` to completion on a new event loop."""
    pending = synthesize(article_input, input_type, mode, tts_engine, language)
    return asyncio.run(pending)
def regenerate_audio_sync(conversation_text: str, tts_engine: str = "Edge-TTS", language: str = "English"):
    """Blocking entry point: run ``regenerate_audio`` to completion on a new event loop."""
    pending = regenerate_audio(conversation_text, tts_engine, language)
    return asyncio.run(pending)
def update_tts_engine_for_language(language):
"""์–ธ์–ด๋ณ„ TTS ์—”์ง„ ์˜ต์…˜ ์—…๋ฐ์ดํŠธ"""
if language in EDGE_TTS_ONLY_LANGUAGES:
language_info = {
"Korean": "ํ•œ๊ตญ์–ด๋Š” Edge-TTS๋งŒ ์ง€์›๋ฉ๋‹ˆ๋‹ค",
"Japanese": "ๆ—ฅๆœฌ่ชžใฏEdge-TTSใฎใฟใ‚ตใƒใƒผใƒˆใ•ใ‚Œใฆใ„ใพใ™",
"French": "Le franรงais n'est pris en charge que par Edge-TTS",
"German": "Deutsch wird nur von Edge-TTS unterstรผtzt",
"Spanish": "El espaรฑol solo es compatible con Edge-TTS",
"Italian": "L'italiano รจ supportato solo da Edge-TTS",
"Portuguese": "O portuguรชs รฉ suportado apenas pelo Edge-TTS",
"Dutch": "Nederlands wordt alleen ondersteund door Edge-TTS",
"Thai": "เธ เธฒเธฉเธฒเน„เธ—เธขเธฃเธญเธ‡เธฃเธฑเธšเน€เธ‰เธžเธฒเธฐ Edge-TTS เน€เธ—เนˆเธฒเธ™เธฑเน‰เธ™",
"Vietnamese": "Tiแบฟng Viแป‡t chแป‰ ฤ‘ฦฐแปฃc hแป— trแปฃ bแปŸi Edge-TTS",
"Arabic": "ุงู„ุนุฑุจูŠุฉ ู…ุฏุนูˆู…ุฉ ูู‚ุท ู…ู† Edge-TTS",
"Hebrew": "ืขื‘ืจื™ืช ื ืชืžื›ืช ืจืง ืขืœ ื™ื“ื™ Edge-TTS",
"Indonesian": "Bahasa Indonesia hanya didukung oleh Edge-TTS",
"Hindi": "เคนเคฟเค‚เคฆเฅ€ เค•เฅ‡เคตเคฒ Edge-TTS เคฆเฅเคตเคพเคฐเคพ เคธเคฎเคฐเฅเคฅเคฟเคค เคนเฅˆ",
"Russian": "ะ ัƒััะบะธะน ะฟะพะดะดะตั€ะถะธะฒะฐะตั‚ัั ั‚ะพะปัŒะบะพ Edge-TTS",
"Chinese": "ไธญๆ–‡ไป…ๆ”ฏๆŒEdge-TTS"
}
info_text = language_info.get(language, f"{language} is only supported by Edge-TTS")
return gr.Radio(
choices=["Edge-TTS"],
value="Edge-TTS",
label="TTS Engine",
info=info_text,
interactive=False
)
else:
return gr.Radio(
choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
value="Edge-TTS",
label="TTS Engine",
info="Edge-TTS: Cloud-based, natural voices | Spark-TTS: Local AI model | MeloTTS: Local, requires GPU",
interactive=True
)
def toggle_input_visibility(input_type):
    """Show exactly one of the URL, PDF and keyword inputs.

    Returns three ``gr.update`` objects, in the order (URL input, file
    upload, keyword input). "URL" and "PDF" reveal their own widget; every
    other value reveals the keyword input.
    """
    show_url = input_type == "URL"
    show_pdf = (not show_url) and input_type == "PDF"
    # Anything that is neither URL nor PDF is treated as Keyword.
    show_keyword = not (show_url or show_pdf)
    return (
        gr.update(visible=show_url),
        gr.update(visible=show_pdf),
        gr.update(visible=show_keyword),
    )
# Model initialization (at app startup).
# Only attempted when the llama-cpp imports succeeded. The repo id and file
# name come from converter.config and the target directory is ./models.
# A failure is printed and otherwise ignored, so import of this module
# continues without the model.
if LLAMA_CPP_AVAILABLE:
    try:
        model_path = hf_hub_download(
            repo_id=converter.config.local_model_repo,
            filename=converter.config.local_model_name,
            local_dir="./models"
        )
        print(f"Model downloaded to: {model_path}")
    except Exception as e:
        print(f"Failed to download model at startup: {e}")
# Gradio interface - improved multilingual layout.
# Builds the `demo` Blocks app: header, status panel, input controls, output
# panel, examples, and the event wiring between them.
with gr.Blocks(theme='soft', title="AI Podcast Generator", css="""
.container {max-width: 1200px; margin: auto; padding: 20px;}
.header-text {text-align: center; margin-bottom: 30px;}
.input-group {background: #f7f7f7; padding: 20px; border-radius: 10px; margin-bottom: 20px;}
.output-group {background: #f0f0f0; padding: 20px; border-radius: 10px;}
.status-box {background: #e8f4f8; padding: 15px; border-radius: 8px; margin-top: 10px;}
""") as demo:
    with gr.Column(elem_classes="container"):
        # Header
        with gr.Row(elem_classes="header-text"):
            gr.Markdown("""
# ๐ŸŽ™๏ธ AI Podcast Generator - Professional Multi-Language Edition
### Convert any article, blog, PDF document, or topic into an engaging professional podcast conversation in 24+ languages!
""")
        with gr.Row(elem_classes="discord-badge"):
            gr.HTML("""
<p style="text-align: center;">
<a href="https://discord.gg/openfreeai" target="_blank" style="display: inline-block; margin-right: 10px;">
<img src="https://img.shields.io/static/v1?label=Discord&message=Openfree%20AI&color=%230000ff&labelColor=%23800080&logo=discord&logoColor=white&style=for-the-badge" alt="badge">
</a>
<a href="https://open.spotify.com/show/36GtIP7iqJxCwp7FfXmTYK?si=KsIsUJq7SJiiudPTaMsXAA" target="_blank" style="display: inline-block;">
<img src="https://img.shields.io/static/v1?label=Spotify&message=Podcast&color=%230000ff&labelColor=%23000080&logo=Spotify&logoColor=white&style=for-the-badge" alt="badge">
</a>
</p>
""")
        # Status display section
        with gr.Row():
            with gr.Column(scale=1):
                # Rendered once at build time from converter.config and the
                # LLAMA_CPP_AVAILABLE / BRAVE_KEY module flags.
                gr.Markdown(f"""
#### ๐Ÿค– System Status
- **LLM**: {converter.config.local_model_name.split('.')[0]}
- **Fallback**: {converter.config.api_model_name.split('/')[-1]}
- **Llama CPP**: {"โœ… Ready" if LLAMA_CPP_AVAILABLE else "โŒ Not Available"}
- **Search**: {"โœ… Brave API" if BRAVE_KEY else "โŒ No API"}
""")
            with gr.Column(scale=1):
                gr.Markdown("""
#### ๐ŸŒ Multi-Language Support
- **24+ Languages**: Korean, Japanese, French, German, Spanish, Italian, etc.
- **Native Voices**: Optimized for each language
- **Professional Style**: Expert discussions with data & insights
- **Auto-TTS Selection**: Best engine per language
""")
        # Main input section
        with gr.Group(elem_classes="input-group"):
            with gr.Row():
                # Left: input options
                with gr.Column(scale=2):
                    # Input type selection
                    input_type_selector = gr.Radio(
                        choices=["URL", "PDF", "Keyword"],
                        value="URL",
                        label="๐Ÿ“ฅ Input Type",
                        info="Choose your content source"
                    )
                    # URL input (the only input visible initially)
                    url_input = gr.Textbox(
                        label="๐Ÿ”— Article URL",
                        placeholder="Enter the article URL here...",
                        value="",
                        visible=True,
                        lines=2
                    )
                    # PDF upload
                    pdf_input = gr.File(
                        label="๐Ÿ“„ Upload PDF",
                        file_types=[".pdf"],
                        visible=False
                    )
                    # Keyword input
                    keyword_input = gr.Textbox(
                        label="๐Ÿ” Topic/Keyword",
                        placeholder="Enter a topic (e.g., 'AI trends 2024', '์ธ๊ณต์ง€๋Šฅ', 'IA tendances', 'KI Trends')",
                        value="",
                        visible=False,
                        info="System will search and compile latest information",
                        lines=2
                    )
                # Right: settings
                with gr.Column(scale=1):
                    # Language selection
                    language_selector = gr.Radio(
                        choices=[
                            "English", "Korean", "Japanese", "French", "German",
                            "Spanish", "Italian", "Portuguese", "Dutch", "Thai",
                            "Vietnamese", "Arabic", "Hebrew", "Indonesian", "Hindi",
                            "Russian", "Chinese", "Norwegian", "Swedish", "Finnish",
                            "Danish", "Polish", "Turkish", "Greek", "Czech"
                        ],
                        value="English",
                        label="๐ŸŒ Language / ์–ธ์–ด / ่ฏญ่จ€",
                        info="Select podcast language"
                    )
                    # Processing mode
                    mode_selector = gr.Radio(
                        choices=["Local", "API"],
                        value="Local",
                        label="โš™๏ธ Processing Mode",
                        info="Local: On-device | API: Cloud"
                    )
                    # TTS engine
                    tts_selector = gr.Radio(
                        choices=["Edge-TTS", "Spark-TTS", "MeloTTS"],
                        value="Edge-TTS",
                        label="๐Ÿ”Š TTS Engine",
                        info="Voice synthesis engine"
                    )
            # Generate button
            with gr.Row():
                convert_btn = gr.Button(
                    "๐ŸŽฏ Generate Professional Conversation",
                    variant="primary",
                    size="lg",
                    scale=1
                )
        # Output section
        with gr.Group(elem_classes="output-group"):
            with gr.Row():
                # Left: conversation text
                with gr.Column(scale=3):
                    conversation_output = gr.Textbox(
                        label="๐Ÿ’ฌ Generated Professional Conversation (Editable)",
                        lines=25,
                        max_lines=50,
                        interactive=True,
                        placeholder="Professional podcast conversation will appear here...\n์ „๋ฌธ ํŒŸ์บ์ŠคํŠธ ๋Œ€ํ™”๊ฐ€ ์—ฌ๊ธฐ์— ํ‘œ์‹œ๋ฉ๋‹ˆ๋‹ค...\nLa conversation professionnelle du podcast apparaรฎtra ici...",
                        info="Edit the conversation as needed. Format: 'Speaker Name: Text'"
                    )
                    # Audio generation button
                    with gr.Row():
                        generate_audio_btn = gr.Button(
                            "๐ŸŽ™๏ธ Generate Audio from Text",
                            variant="secondary",
                            size="lg"
                        )
                # Right: audio output and status
                with gr.Column(scale=2):
                    audio_output = gr.Audio(
                        label="๐ŸŽง Professional Podcast Audio",
                        type="filepath",
                        interactive=False
                    )
                    status_output = gr.Textbox(
                        label="๐Ÿ“Š Status",
                        interactive=False,
                        lines=3,
                        elem_classes="status-box"
                    )
                    # Help
                    gr.Markdown("""
#### ๐Ÿ’ก Quick Tips:
- **URL**: Paste any article link
- **PDF**: Upload documents directly
- **Keyword**: Enter topics for AI research
- **24+ Languages** fully supported
- Edit conversation before audio generation
- Auto TTS engine selection per language
""")
        # Examples section
        # NOTE(review): the first column of every row is bound to url_input,
        # including the rows whose input type is "Keyword" - confirm that
        # keyword examples are meant to land in the URL box rather than in
        # keyword_input, which is what the Generate button reads for "Keyword".
        with gr.Accordion("๐Ÿ“š Multi-Language Examples", open=False):
            gr.Examples(
                examples=[
                    ["https://huggingface.co/blog/openfreeai/cycle-navigator", "URL", "Local", "Edge-TTS", "English"],
                    ["quantum computing breakthroughs", "Keyword", "Local", "Edge-TTS", "English"],
                    ["์ธ๊ณต์ง€๋Šฅ ์œค๋ฆฌ์™€ ๊ทœ์ œ", "Keyword", "Local", "Edge-TTS", "Korean"],
                    ["https://huggingface.co/papers/2505.14810", "URL", "Local", "Edge-TTS", "Japanese"],
                    ["intelligence artificielle tendances", "Keyword", "Local", "Edge-TTS", "French"],
                    ["kรผnstliche intelligenz entwicklung", "Keyword", "Local", "Edge-TTS", "German"],
                    ["inteligencia artificial avances", "Keyword", "Local", "Edge-TTS", "Spanish"],
                ],
                inputs=[url_input, input_type_selector, mode_selector, tts_selector, language_selector],
                outputs=[conversation_output, status_output],
                fn=synthesize_sync,
                cache_examples=False,
            )
    # Input type change handler: shows the input widget that matches the
    # selected type and hides the other two.
    input_type_selector.change(
        fn=toggle_input_visibility,
        inputs=[input_type_selector],
        outputs=[url_input, pdf_input, keyword_input]
    )
    # Update the TTS engine options when the language changes
    language_selector.change(
        fn=update_tts_engine_for_language,
        inputs=[language_selector],
        outputs=[tts_selector]
    )
    # Event wiring
    def get_article_input(input_type, url_input, pdf_input, keyword_input):
        """Return the value of the input widget that matches ``input_type``.

        "URL" selects ``url_input``, "PDF" selects ``pdf_input``, and any
        other value selects ``keyword_input``.
        """
        if input_type == "URL":
            return url_input
        elif input_type == "PDF":
            return pdf_input
        else: # Keyword
            return keyword_input
    # Generate: pick the active input, then run the synchronous synthesis
    # wrapper; its two results go to the conversation box and the status box.
    convert_btn.click(
        fn=lambda input_type, url_input, pdf_input, keyword_input, mode, tts, lang: synthesize_sync(
            get_article_input(input_type, url_input, pdf_input, keyword_input), input_type, mode, tts, lang
        ),
        inputs=[input_type_selector, url_input, pdf_input, keyword_input, mode_selector, tts_selector, language_selector],
        outputs=[conversation_output, status_output]
    )
    # Generate audio from the (possibly edited) conversation text; results go
    # to the status box and the audio player, in that order.
    generate_audio_btn.click(
        fn=regenerate_audio_sync,
        inputs=[conversation_output, tts_selector, language_selector],
        outputs=[status_output, audio_output]
    )
# Launch the app
if __name__ == "__main__":
    # Options for the server itself; kept separate from the queue settings.
    launch_options = {
        "show_api": True,
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    # Enable the request queue first, then start serving.
    queued_demo = demo.queue(api_open=True, default_concurrency_limit=10)
    queued_demo.launch(**launch_options)