diff --git "a/app.py" "b/app.py"
deleted file mode 100644
--- "a/app.py"
+++ /dev/null
@@ -1,2875 +0,0 @@
-import asyncio
-import base64
-import json
-from pathlib import Path
-import os
-import numpy as np
-import openai
-from dotenv import load_dotenv
-from fastapi import FastAPI, Request, UploadFile, File, Form
-from fastapi.responses import HTMLResponse, StreamingResponse, JSONResponse
-from fastrtc import (
- AdditionalOutputs,
- AsyncStreamHandler,
- Stream,
- get_twilio_turn_credentials,
- wait_for_item,
-)
-from gradio.utils import get_space
-from openai.types.beta.realtime import ResponseAudioTranscriptDoneEvent
-import httpx
-from typing import Optional, List, Dict
-import gradio as gr
-import io
-from scipy import signal
-import wave
-import torch
-from transformers import pipeline
-import tempfile
-import subprocess
-import pdfplumber
-import scipy.signal as sps
-from datetime import datetime
-from zoneinfo import ZoneInfo
-import concurrent.futures
-
-load_dotenv()
-
-SAMPLE_RATE = 24000
-WHISPER_SAMPLE_RATE = 16000
-SEOUL_TZ = ZoneInfo("Asia/Seoul")
-
-# Whisper model settings
-WHISPER_MODEL_NAME = "openai/whisper-large-v3-turbo"
-WHISPER_BATCH_SIZE = 8
-
-# Real-time segmentation parameters
-MIN_SEG_SEC = 10
-MAX_SEG_SEC = 15
-SILENCE_SEC = 0.6
-SILENCE_THRESH = 1e-4
-
-# Thread pool that runs blocking Whisper inference off the asyncio event loop
-whisper_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
-whisper_futures_queue: list[concurrent.futures.Future] = []
-
-# Supported languages for OpenAI Realtime API
-SUPPORTED_LANGUAGES = {
- "ko": "한국어 (Korean)",
- "en": "English",
- "es": "Español (Spanish)",
- "fr": "Français (French)",
- "de": "Deutsch (German)",
- "it": "Italiano (Italian)",
- "pt": "Português (Portuguese)",
- "ru": "Русский (Russian)",
- "ja": "日本語 (Japanese)",
- "zh": "中文 (Chinese)",
- "ar": "العربية (Arabic)",
- "hi": "हिन्दी (Hindi)",
- "nl": "Nederlands (Dutch)",
- "pl": "Polski (Polish)",
- "tr": "Türkçe (Turkish)",
- "vi": "Tiếng Việt (Vietnamese)",
- "th": "ไทย (Thai)",
- "id": "Bahasa Indonesia",
- "sv": "Svenska (Swedish)",
- "da": "Dansk (Danish)",
- "no": "Norsk (Norwegian)",
- "fi": "Suomi (Finnish)",
- "he": "עברית (Hebrew)",
- "uk": "Українська (Ukrainian)",
- "cs": "Čeština (Czech)",
- "el": "Ελληνικά (Greek)",
- "ro": "Română (Romanian)",
- "hu": "Magyar (Hungarian)",
- "ms": "Bahasa Melayu (Malay)"
-}
-
-# HTML content embedded as a string (extended with new tabs)
-HTML_CONTENT = """
-<!DOCTYPE html>
-<html lang="ko">
-<head>
-    <meta charset="UTF-8">
-    <title>Mouth of 'MOUSE' - Extended</title>
-</head>
-<body>
-    <!-- Microphone tab -->
-    <p>마이크 녹음 → 전사 및 4개 언어 번역</p>
-    <!-- Audio file tab -->
-    <p>오디오 파일 → 전사 및 4개 언어 번역</p>
-    <!-- Video file tab -->
-    <p>비디오 파일 → 오디오 추출 → 전사 및 4개 언어 번역</p>
-    <!-- PDF tab -->
-    <p>PDF 파일 → 텍스트 추출 → 4개 언어 번역</p>
-    <!-- Realtime interpretation tab -->
-    <p>실시간 통역 (Korean → EN/ZH/TH/RU)</p>
-    <p>10-15초 문장 단위로 자동 전환 — 최신 내용이 위에 표시됩니다.</p>
-</body>
-</html>
-"""
-
-# Whisper model loader
-def _get_whisper_pipe():
- """Lazy load Whisper pipeline"""
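-    # Cache the pipeline on the function object so the model loads only once per process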
- if not hasattr(_get_whisper_pipe, "pipe"):
- device = 0 if torch.cuda.is_available() else "cpu"
- _get_whisper_pipe.pipe = pipeline(
- task="automatic-speech-recognition",
- model=WHISPER_MODEL_NAME,
- chunk_length_s=30,
- device=device,
- )
- return _get_whisper_pipe.pipe
-
-# Audio helpers for Whisper
-def _ensure_16k_whisper(y: np.ndarray, sr: int) -> tuple[np.ndarray, int]:
- """Resample audio to 16kHz for Whisper"""
- if sr == WHISPER_SAMPLE_RATE:
- return y.astype(np.float32), WHISPER_SAMPLE_RATE
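-    # Polyphase resampling; dividing by the gcd keeps the up/down factors small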
- g = np.gcd(sr, WHISPER_SAMPLE_RATE)
- y = sps.resample_poly(y, WHISPER_SAMPLE_RATE // g, sr // g).astype(np.float32)
- return y, WHISPER_SAMPLE_RATE
-
-def _should_flush_whisper(buffer: np.ndarray, sr: int) -> bool:
- """Check if audio buffer should be flushed for processing"""
- dur = len(buffer) / sr
- if dur < MIN_SEG_SEC:
- return False
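-    # Flush when the trailing window is near-silent (a likely sentence break) or the segment reaches MAX_SEG_SEC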
- tail_len = int(SILENCE_SEC * sr)
- tail = buffer[-tail_len:]
- rms = np.sqrt(np.mean(tail ** 2)) if len(tail) else 1.0
- end_of_sentence = rms < SILENCE_THRESH
- return end_of_sentence or dur >= MAX_SEG_SEC
-
-# Translation helper
-def _translate_text_4langs(text: str) -> str:
- """Translate text to 4 languages using OpenAI"""
- try:
- client = openai.OpenAI()
- prompt = (
- "Translate the following text into English (EN), Chinese (ZH), Thai (TH) and Russian (RU).\n"
- "Return ONLY the translations in this format (one per line):\n"
- "EN: \nZH: \nTH: \nRU: \n\n"
- f"Text: {text}"
- )
-
- response = client.chat.completions.create(
- model="gpt-4o-mini",
- messages=[
- {"role": "system", "content": "You are a professional translator."},
- {"role": "user", "content": prompt}
- ],
- temperature=0.7,
- max_tokens=512
- )
-
- return response.choices[0].message.content.strip()
- except Exception as e:
- print(f"Translation error: {e}")
- return f"Translation error: {str(e)}"
-
-# ffmpeg check
-def _check_ffmpeg() -> bool:
- try:
- subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
- return True
- except Exception:
- return False
-
-_HAS_FFMPEG = _check_ffmpeg()
-
-def extract_audio_from_video(video_path: str) -> str:
- """Extract audio from video file"""
- tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".wav")
- tmp.close()
-
- if _HAS_FFMPEG:
- cmd = [
- "ffmpeg", "-i", video_path, "-vn",
- "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", "-y", tmp.name
- ]
- result = subprocess.run(cmd, capture_output=True)
- if result.returncode != 0:
- os.unlink(tmp.name)
- raise RuntimeError("ffmpeg error extracting audio")
- return tmp.name
- else:
- raise RuntimeError("ffmpeg is required for video processing")
-
-# GPU workers for Whisper
-def gpu_transcribe_whisper(audio_path: str) -> str:
- """Transcribe audio using Whisper on GPU"""
- pipe = _get_whisper_pipe()
- result = pipe(audio_path, batch_size=WHISPER_BATCH_SIZE, generate_kwargs={"task": "transcribe"}, return_timestamps=True)
- return result["text"].strip()
-
-def gpu_asr_translate_whisper(audio: np.ndarray, sr: int) -> str:
- """Transcribe and translate audio for realtime"""
- pipe = _get_whisper_pipe()
- ko = pipe({"array": audio, "sampling_rate": sr}, batch_size=WHISPER_BATCH_SIZE)["text"].strip()
-    trans = _translate_text_4langs(ko).replace("\n", "<br>")
-    ts = datetime.now(SEOUL_TZ).strftime("%Y-%m-%d %H:%M:%S")
-    return f"[{ts}]<br>[KO] {ko}<br>{trans}<br>{'-'*40}<br>"
-
-class BraveSearchClient:
- """Brave Search API client"""
- def __init__(self, api_key: str):
- self.api_key = api_key
- self.base_url = "https://api.search.brave.com/res/v1/web/search"
-
- async def search(self, query: str, count: int = 10) -> List[Dict]:
- """Perform a web search using Brave Search API"""
- if not self.api_key:
- return []
-
- headers = {
- "Accept": "application/json",
- "X-Subscription-Token": self.api_key
- }
- params = {
- "q": query,
- "count": count,
- "lang": "ko"
- }
-
- async with httpx.AsyncClient() as client:
- try:
- response = await client.get(self.base_url, headers=headers, params=params)
- response.raise_for_status()
- data = response.json()
-
- results = []
- if "web" in data and "results" in data["web"]:
- for result in data["web"]["results"][:count]:
- results.append({
- "title": result.get("title", ""),
- "url": result.get("url", ""),
- "description": result.get("description", "")
- })
- return results
- except Exception as e:
- print(f"Brave Search error: {e}")
- return []
-
-
-# Initialize search client globally
-brave_api_key = os.getenv("BSEARCH_API")
-search_client = BraveSearchClient(brave_api_key) if brave_api_key else None
-print(f"Search client initialized: {search_client is not None}, API key present: {bool(brave_api_key)}")
-
-# Store connection settings
-connection_settings = {}
-
-# Store realtime sessions
-realtime_sessions = {}
-
-# Initialize OpenAI client for text chat
-client = openai.AsyncOpenAI()
-
-def get_translation_instructions(target_language: str) -> str:
- """Get instructions for translation based on target language"""
- if not target_language:
- return ""
-
- language_name = SUPPORTED_LANGUAGES.get(target_language, target_language)
- return (
- f"\n\nIMPORTANT: You must respond in {language_name} ({target_language}). "
- f"Translate all your responses to {language_name}."
- )
-
-def update_chatbot(chatbot: list[dict], response: ResponseAudioTranscriptDoneEvent):
- chatbot.append({"role": "assistant", "content": response.transcript})
- return chatbot
-
-
-async def process_text_chat(message: str, web_search_enabled: bool, target_language: str,
- system_prompt: str) -> Dict[str, str]:
- """Process text chat using GPT-4o-mini model"""
- try:
- # If target language is set, override system prompt completely
- if target_language:
- language_name = SUPPORTED_LANGUAGES.get(target_language, target_language)
-
- # Create system prompt in target language
- if target_language == "en":
- base_instructions = f"You are a helpful assistant. You speak ONLY English. Never use Korean or any other language. {system_prompt}"
- user_prefix = "Please respond in English: "
- elif target_language == "ja":
- base_instructions = f"あなたは親切なアシスタントです。日本語のみを話します。韓国語や他の言語は絶対に使用しません。{system_prompt}"
- user_prefix = "日本語で答えてください: "
- elif target_language == "zh":
- base_instructions = f"你是一个乐于助人的助手。你只说中文。绝不使用韩语或其他语言。{system_prompt}"
- user_prefix = "请用中文回答: "
- elif target_language == "es":
- base_instructions = f"Eres un asistente útil. Solo hablas español. Nunca uses coreano u otros idiomas. {system_prompt}"
- user_prefix = "Por favor responde en español: "
- else:
- base_instructions = f"You are a helpful assistant that speaks ONLY {language_name}. {system_prompt}"
- user_prefix = f"Please respond in {language_name}: "
- else:
- base_instructions = system_prompt or "You are a helpful assistant."
- user_prefix = ""
-
- messages = [
- {"role": "system", "content": base_instructions}
- ]
-
- # Handle web search if enabled
- if web_search_enabled and search_client:
- # Check if the message requires web search
- search_keywords = ["날씨", "기온", "비", "눈", "뉴스", "소식", "현재", "최근",
- "오늘", "지금", "가격", "환율", "주가", "weather", "news",
- "current", "today", "price", "2024", "2025"]
-
- should_search = any(keyword in message.lower() for keyword in search_keywords)
-
- if should_search:
- # Perform web search
- search_results = await search_client.search(message)
- if search_results:
- search_context = "웹 검색 결과:\n\n"
- for i, result in enumerate(search_results[:5], 1):
- search_context += f"{i}. {result['title']}\n{result['description']}\n\n"
-
- # Add search context in target language if set
- if target_language:
- search_instruction = f"Use this search information but respond in {SUPPORTED_LANGUAGES.get(target_language, target_language)} only: "
- else:
- search_instruction = "다음 웹 검색 결과를 참고하여 답변하세요: "
-
- messages.append({
- "role": "system",
- "content": search_instruction + "\n\n" + search_context
- })
-
- # Add user message with language prefix
- messages.append({"role": "user", "content": user_prefix + message})
-
- # Call GPT-4o-mini
- response = await client.chat.completions.create(
- model="gpt-4o-mini",
- messages=messages,
- temperature=0.7,
- max_tokens=2000
- )
-
- response_text = response.choices[0].message.content
-
- # Final check - remove any Korean if target language is not Korean
- if target_language and target_language != "ko":
- import re
- if re.search(r'[가-힣]', response_text):
- print(f"[TEXT CHAT] WARNING: Korean detected in response for {target_language}")
- # Try again with stronger prompt
- messages[-1] = {"role": "user", "content": f"ONLY {SUPPORTED_LANGUAGES.get(target_language, target_language)}, NO KOREAN: {message}"}
- retry_response = await client.chat.completions.create(
- model="gpt-4o-mini",
- messages=messages,
- temperature=0.3,
- max_tokens=2000
- )
- response_text = retry_response.choices[0].message.content
-
- print(f"[TEXT CHAT] Target language: {target_language}")
- print(f"[TEXT CHAT] Response preview: {response_text[:100]}...")
-
- return {
- "response": response_text,
- "language": SUPPORTED_LANGUAGES.get(target_language, "") if target_language else ""
- }
-
- except Exception as e:
- print(f"Error in text chat: {e}")
- return {"error": str(e)}
-
-
-class OpenAIHandler(AsyncStreamHandler):
- def __init__(self, web_search_enabled: bool = False, target_language: str = "",
- system_prompt: str = "", webrtc_id: str = None) -> None:
- super().__init__(
- expected_layout="mono",
- output_sample_rate=SAMPLE_RATE,
- output_frame_size=480,
- input_sample_rate=SAMPLE_RATE,
- )
- self.connection = None
- self.output_queue = asyncio.Queue()
- self.search_client = search_client
- self.function_call_in_progress = False
- self.current_function_args = ""
- self.current_call_id = None
- self.webrtc_id = webrtc_id
- self.web_search_enabled = web_search_enabled
- self.target_language = target_language
- self.system_prompt = system_prompt
-
- print(f"[INIT] Handler created with web_search={web_search_enabled}, "
- f"target_language={target_language}")
-
- def copy(self):
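-        # fastrtc calls copy() to clone the handler for each new connection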
- # Get the most recent settings
- if connection_settings:
- # Get the most recent webrtc_id
- recent_ids = sorted(connection_settings.keys(),
- key=lambda k: connection_settings[k].get('timestamp', 0),
- reverse=True)
- if recent_ids:
- recent_id = recent_ids[0]
- settings = connection_settings[recent_id]
-
- # Log the settings being copied
- print(f"[COPY] Copying settings from {recent_id}:")
-
- return OpenAIHandler(
- web_search_enabled=settings.get('web_search_enabled', False),
- target_language=settings.get('target_language', ''),
- system_prompt=settings.get('system_prompt', ''),
- webrtc_id=recent_id
- )
-
- print(f"[COPY] No settings found, creating default handler")
- return OpenAIHandler(web_search_enabled=False)
-
- async def search_web(self, query: str) -> str:
- """Perform web search and return formatted results"""
- if not self.search_client or not self.web_search_enabled:
- return "웹 검색이 비활성화되어 있습니다."
-
- print(f"Searching web for: {query}")
- results = await self.search_client.search(query)
- if not results:
- return f"'{query}'에 대한 검색 결과를 찾을 수 없습니다."
-
- # Format search results
- formatted_results = []
- for i, result in enumerate(results, 1):
- formatted_results.append(
- f"{i}. {result['title']}\n"
- f" URL: {result['url']}\n"
- f" {result['description']}\n"
- )
-
- return f"웹 검색 결과 '{query}':\n\n" + "\n".join(formatted_results)
-
- async def process_text_message(self, message: str):
- """Process text message from user"""
- if self.connection:
- await self.connection.conversation.item.create(
- item={
- "type": "message",
- "role": "user",
- "content": [{"type": "input_text", "text": message}]
- }
- )
- await self.connection.response.create()
-
- def get_translation_instructions(self):
- """Get instructions for translation based on target language"""
- if not self.target_language:
- return ""
-
- language_name = SUPPORTED_LANGUAGES.get(self.target_language, self.target_language)
- return (
- f"\n\nIMPORTANT: You must respond in {language_name} ({self.target_language}). "
- f"Translate all your responses to {language_name}. "
- f"This includes both spoken and written responses."
- )
-
- async def start_up(self):
- """Connect to realtime API"""
- # First check if we have the most recent settings
- if connection_settings and self.webrtc_id:
- if self.webrtc_id in connection_settings:
- settings = connection_settings[self.webrtc_id]
- self.web_search_enabled = settings.get('web_search_enabled', False)
- self.target_language = settings.get('target_language', '')
- self.system_prompt = settings.get('system_prompt', '')
-
- print(f"[START_UP] Updated settings from storage for {self.webrtc_id}")
-
- print(f"[START_UP] Starting normal mode")
-
- self.client = openai.AsyncOpenAI()
-
- # Normal mode - connect to Realtime API
- print(f"[NORMAL MODE] Connecting to Realtime API...")
-
- # Define the web search function
- tools = []
- base_instructions = self.system_prompt or "You are a helpful assistant."
-
- # Add translation instructions if language is selected
- if self.target_language:
- language_name = SUPPORTED_LANGUAGES.get(self.target_language, self.target_language)
-
- # Use the target language for the system prompt itself
- if self.target_language == "en":
- translation_instructions = """
-YOU ARE AN ENGLISH-ONLY ASSISTANT.
-
-ABSOLUTE RULES:
-1. You can ONLY speak English. No Korean (한국어) allowed.
-2. Even if the user speaks Korean, you MUST respond in English.
-3. Every single word must be in English.
-4. If you output even one Korean character, you have failed.
-5. Example response: "Hello! How can I help you today?"
-
-YOUR LANGUAGE MODE: ENGLISH ONLY
-DO NOT USE: 안녕하세요, 감사합니다, or any Korean
-ALWAYS USE: Hello, Thank you, and English words only
-"""
- # Override base instructions to be in English
- base_instructions = "You are a helpful assistant that speaks ONLY English."
-
- elif self.target_language == "ja":
- translation_instructions = """
-あなたは日本語のみを話すアシスタントです。
-
-絶対的なルール:
-1. 日本語のみを使用してください。韓国語(한국어)は禁止です。
-2. ユーザーが韓国語で話しても、必ず日本語で返答してください。
-3. すべての単語は日本語でなければなりません。
-4. 韓国語を一文字でも出力したら失敗です。
-5. 応答例:「こんにちは!今日はどのようにお手伝いできますか?」
-
-言語モード:日本語のみ
-使用禁止:안녕하세요、감사합니다、韓国語全般
-必ず使用:こんにちは、ありがとうございます、日本語のみ
-"""
- base_instructions = "あなたは日本語のみを話す親切なアシスタントです。"
-
- elif self.target_language == "zh":
- translation_instructions = """
-你是一个只说中文的助手。
-
-绝对规则:
-1. 只能使用中文。禁止使用韩语(한국어)。
-2. 即使用户说韩语,也必须用中文回复。
-3. 每个字都必须是中文。
-4. 如果输出任何韩语字符,就是失败。
-5. 回复示例:"你好!我今天能为您做什么?"
-
-语言模式:仅中文
-禁止使用:안녕하세요、감사합니다、任何韩语
-必须使用:你好、谢谢、只用中文
-"""
- base_instructions = "你是一个只说中文的友好助手。"
-
- elif self.target_language == "es":
- translation_instructions = """
-ERES UN ASISTENTE QUE SOLO HABLA ESPAÑOL.
-
-REGLAS ABSOLUTAS:
-1. Solo puedes hablar español. No se permite coreano (한국어).
-2. Incluso si el usuario habla coreano, DEBES responder en español.
-3. Cada palabra debe estar en español.
-4. Si produces aunque sea un carácter coreano, has fallado.
-5. Respuesta ejemplo: "¡Hola! ¿Cómo puedo ayudarte hoy?"
-
-MODO DE IDIOMA: SOLO ESPAÑOL
-NO USAR: 안녕하세요, 감사합니다, o cualquier coreano
-SIEMPRE USAR: Hola, Gracias, y solo palabras en español
-"""
- base_instructions = "Eres un asistente útil que habla SOLO español."
- else:
- translation_instructions = f"""
-YOU MUST ONLY SPEAK {language_name.upper()}.
-
-RULES:
-1. Output only in {language_name}
-2. Never use Korean
-3. Always respond in {language_name}
-"""
- base_instructions = f"You are a helpful assistant that speaks ONLY {language_name}."
- else:
- translation_instructions = ""
-
- if self.web_search_enabled and self.search_client:
-            # Realtime API tools use a flat schema (name/description/parameters at the top level)
-            tools = [{
-                "type": "function",
-                "name": "web_search",
-                "description": "Search the web for current information. Use this for weather, news, prices, current events, or any time-sensitive topics.",
-                "parameters": {
-                    "type": "object",
-                    "properties": {
-                        "query": {
-                            "type": "string",
-                            "description": "The search query"
-                        }
-                    },
-                    "required": ["query"]
-                }
-            }]
- print("Web search function added to tools")
-
- search_instructions = (
- "\n\nYou have web search capabilities. "
- "IMPORTANT: You MUST use the web_search function for ANY of these topics:\n"
- "- Weather (날씨, 기온, 비, 눈)\n"
- "- News (뉴스, 소식)\n"
- "- Current events (현재, 최근, 오늘, 지금)\n"
- "- Prices (가격, 환율, 주가)\n"
- "- Sports scores or results\n"
- "- Any question about 2024 or 2025\n"
- "- Any time-sensitive information\n\n"
- "When in doubt, USE web_search. It's better to search and provide accurate information "
- "than to guess or use outdated information."
- )
-
- # Combine all instructions
- if translation_instructions:
- # Translation instructions already include base_instructions
- instructions = translation_instructions + search_instructions
- else:
- instructions = base_instructions + search_instructions
- else:
- # No web search
- if translation_instructions:
- instructions = translation_instructions
- else:
- instructions = base_instructions
-
- print(f"[NORMAL MODE] Base instructions: {base_instructions[:100]}...")
- print(f"[NORMAL MODE] Translation instructions: {translation_instructions[:200] if translation_instructions else 'None'}...")
- print(f"[NORMAL MODE] Combined instructions length: {len(instructions)}")
- print(f"[NORMAL MODE] Target language: {self.target_language}")
-
- async with self.client.beta.realtime.connect(
- model="gpt-4o-mini-realtime-preview-2024-12-17"
- ) as conn:
- # Update session with tools
- session_update = {
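-                # server_vad lets the API detect speech turn boundaries automatically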
- "turn_detection": {"type": "server_vad"},
- "instructions": instructions,
- "tools": tools,
- "tool_choice": "auto" if tools else "none",
- "temperature": 0.7,
- "max_response_output_tokens": 4096,
- "modalities": ["text", "audio"],
- "voice": "alloy" # Default voice
- }
-
- # Use appropriate voice for the language
- if self.target_language:
- # Force language through multiple mechanisms
- # 1. Use voice that's known to work well with the language
- voice_map = {
- "en": "nova", # Nova has clearer English
- "es": "nova", # Nova works for Spanish
- "fr": "shimmer", # Shimmer for French
- "de": "echo", # Echo for German
- "ja": "alloy", # Alloy can do Japanese
- "zh": "alloy", # Alloy can do Chinese
- "ko": "nova", # Nova for Korean
- }
- session_update["voice"] = voice_map.get(self.target_language, "nova")
-
- # 2. Add language to modalities (experimental)
- session_update["modalities"] = ["text", "audio"]
-
- # 3. Set output format
- session_update["output_audio_format"] = "pcm16"
-
- # 4. Add language hint to the system (if supported by API)
- if self.target_language in ["en", "es", "fr", "de", "ja", "zh"]:
- session_update["language"] = self.target_language # Try setting language directly
-
- print(f"[TRANSLATION MODE] Session update: {json.dumps(session_update, indent=2)}")
-
- await conn.session.update(session=session_update)
- self.connection = conn
- print(f"Connected with tools: {len(tools)} functions, voice: {session_update.get('voice', 'default')}")
-
- async for event in self.connection:
- # Debug logging for function calls
- if event.type.startswith("response.function_call"):
- print(f"Function event: {event.type}")
-
- if event.type == "response.audio_transcript.done":
- print(f"[RESPONSE] Transcript: {event.transcript[:100]}...")
- print(f"[RESPONSE] Expected language: {self.target_language}")
-
- output_data = {
- "event": event,
- "language": SUPPORTED_LANGUAGES.get(self.target_language, "") if self.target_language else ""
- }
- await self.output_queue.put(AdditionalOutputs(output_data))
-
- elif event.type == "response.audio.delta":
- await self.output_queue.put(
- (
- self.output_sample_rate,
- np.frombuffer(
- base64.b64decode(event.delta), dtype=np.int16
- ).reshape(1, -1),
- ),
- )
-
- # Handle function calls
- elif event.type == "response.function_call_arguments.start":
- print(f"Function call started")
- self.function_call_in_progress = True
- self.current_function_args = ""
- self.current_call_id = getattr(event, 'call_id', None)
-
- elif event.type == "response.function_call_arguments.delta":
- if self.function_call_in_progress:
- self.current_function_args += event.delta
-
- elif event.type == "response.function_call_arguments.done":
- if self.function_call_in_progress:
- print(f"Function call done, args: {self.current_function_args}")
- try:
- args = json.loads(self.current_function_args)
- query = args.get("query", "")
-
- # Emit search event to client
- await self.output_queue.put(AdditionalOutputs({
- "type": "search",
- "query": query
- }))
-
- # Perform the search
- search_results = await self.search_web(query)
- print(f"Search results length: {len(search_results)}")
-
- # Send function result back to the model
- if self.connection and self.current_call_id:
- await self.connection.conversation.item.create(
- item={
- "type": "function_call_output",
- "call_id": self.current_call_id,
- "output": search_results
- }
- )
- await self.connection.response.create()
-
- except Exception as e:
- print(f"Function call error: {e}")
- finally:
- self.function_call_in_progress = False
- self.current_function_args = ""
- self.current_call_id = None
-
- async def receive(self, frame: tuple[int, np.ndarray]) -> None:
- # Normal mode - use Realtime API
- if not self.connection:
- print(f"[RECEIVE] No connection in normal mode, skipping")
- return
- try:
- _, array = frame
- array = array.squeeze()
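-            # input_audio_buffer.append expects base64-encoded 16-bit PCM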
- audio_message = base64.b64encode(array.tobytes()).decode("utf-8")
- await self.connection.input_audio_buffer.append(audio=audio_message)
- except Exception as e:
- print(f"Error in receive: {e}")
-
- async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
- # Normal mode
- item = await wait_for_item(self.output_queue)
-
- # Check if it's a dict with text message
- if isinstance(item, dict) and item.get('type') == 'text_message':
- await self.process_text_message(item['content'])
- return None
-
- return item
-
- async def shutdown(self) -> None:
- print(f"[SHUTDOWN] Called")
-
- # Normal mode - close Realtime API connection
- if self.connection:
- await self.connection.close()
- self.connection = None
- print("[NORMAL MODE] Connection closed")
-
-
-# Create initial handler instance
-handler = OpenAIHandler(web_search_enabled=False)
-
-# Create components
-chatbot = gr.Chatbot(type="messages")
-
-# Create stream with handler instance
-stream = Stream(
- handler, # Pass instance, not factory
- mode="send-receive",
- modality="audio",
- additional_inputs=[chatbot],
- additional_outputs=[chatbot],
- additional_outputs_handler=update_chatbot,
- rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
- concurrency_limit=5 if get_space() else None,
- time_limit=300 if get_space() else None,
-)
-
-app = FastAPI()
-
-# Mount stream
-stream.mount(app)
-
-# Intercept offer to capture settings
-@app.post("/webrtc/offer", include_in_schema=False)
-async def custom_offer(request: Request):
- """Intercept offer to capture settings"""
- body = await request.json()
-
- webrtc_id = body.get("webrtc_id")
- web_search_enabled = body.get("web_search_enabled", False)
- target_language = body.get("target_language", "")
- system_prompt = body.get("system_prompt", "")
-
- print(f"[OFFER] Received offer with webrtc_id: {webrtc_id}")
- print(f"[OFFER] web_search_enabled: {web_search_enabled}")
- print(f"[OFFER] target_language: {target_language}")
-
- # Store settings with timestamp
- if webrtc_id:
- connection_settings[webrtc_id] = {
- 'web_search_enabled': web_search_enabled,
- 'target_language': target_language,
- 'system_prompt': system_prompt,
- 'timestamp': asyncio.get_event_loop().time()
- }
-
- print(f"[OFFER] Stored settings for {webrtc_id}:")
- print(f"[OFFER] {connection_settings[webrtc_id]}")
-
- # Remove our custom route temporarily
- custom_route = None
- for i, route in enumerate(app.routes):
- if hasattr(route, 'path') and route.path == "/webrtc/offer" and route.endpoint == custom_offer:
- custom_route = app.routes.pop(i)
- break
-
- # Forward to stream's offer handler
- print(f"[OFFER] Forwarding to stream.offer()")
- response = await stream.offer(body)
-
- # Re-add our custom route
- if custom_route:
- app.routes.insert(0, custom_route)
-
- print(f"[OFFER] Response status: {response.get('status', 'unknown') if isinstance(response, dict) else 'OK'}")
-
- return response
-
-
-@app.post("/chat/text")
-async def chat_text(request: Request):
- """Handle text chat messages using GPT-4o-mini"""
- try:
- body = await request.json()
- message = body.get("message", "")
- web_search_enabled = body.get("web_search_enabled", False)
- target_language = body.get("target_language", "")
- system_prompt = body.get("system_prompt", "")
-
- if not message:
- return {"error": "메시지가 비어있습니다."}
-
- # Process text chat
- result = await process_text_chat(message, web_search_enabled, target_language, system_prompt)
-
- return result
-
- except Exception as e:
- print(f"Error in chat_text endpoint: {e}")
- return {"error": "채팅 처리 중 오류가 발생했습니다."}
-
-
-@app.post("/text_message/{webrtc_id}")
-async def receive_text_message(webrtc_id: str, request: Request):
- """Receive text message from client"""
- body = await request.json()
- message = body.get("content", "")
-
- # Find the handler for this connection
- if webrtc_id in stream.handlers:
- handler = stream.handlers[webrtc_id]
- # Queue the text message for processing
- await handler.output_queue.put({
- 'type': 'text_message',
- 'content': message
- })
-
- return {"status": "ok"}
-
-
-@app.get("/outputs")
-async def outputs(webrtc_id: str):
- """Stream outputs including search events"""
- async def output_stream():
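-        # SSE framing: each message is "event: <name>\ndata: <json>\n\n"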
- async for output in stream.output_stream(webrtc_id):
- if hasattr(output, 'args') and output.args:
- # Check if it's a search event
- if isinstance(output.args[0], dict) and output.args[0].get('type') == 'search':
- yield f"event: search\ndata: {json.dumps(output.args[0])}\n\n"
- # Regular transcript event with language info
- elif isinstance(output.args[0], dict) and 'event' in output.args[0]:
- event_data = output.args[0]
- if 'event' in event_data and hasattr(event_data['event'], 'transcript'):
- data = {
- "role": "assistant",
- "content": event_data['event'].transcript,
- "language": event_data.get('language', '')
- }
- yield f"event: output\ndata: {json.dumps(data)}\n\n"
-
- return StreamingResponse(output_stream(), media_type="text/event-stream")
-
-
-# Whisper endpoints
-@app.post("/whisper/transcribe")
-async def whisper_transcribe(audio: UploadFile = File(...)):
- """Transcribe audio using Whisper"""
- try:
- # Save uploaded file temporarily
- with tempfile.NamedTemporaryFile(delete=False, suffix=".webm") as tmp:
- content = await audio.read()
- tmp.write(content)
- tmp_path = tmp.name
-
- # Transcribe
- text = await asyncio.get_event_loop().run_in_executor(
- whisper_executor, gpu_transcribe_whisper, tmp_path
- )
-
- # Translate
- translation = _translate_text_4langs(text)
-
- # Clean up
- os.unlink(tmp_path)
-
- return {"text": text, "translation": translation}
-
- except Exception as e:
- print(f"Whisper transcribe error: {e}")
- return {"error": str(e)}
-
-
-@app.post("/whisper/audio")
-async def whisper_audio(audio: UploadFile = File(...)):
- """Process audio file"""
- try:
- # Save uploaded file temporarily
- with tempfile.NamedTemporaryFile(delete=False, suffix=Path(audio.filename).suffix) as tmp:
- content = await audio.read()
- tmp.write(content)
- tmp_path = tmp.name
-
- # Transcribe
- text = await asyncio.get_event_loop().run_in_executor(
- whisper_executor, gpu_transcribe_whisper, tmp_path
- )
-
- # Translate
- translation = _translate_text_4langs(text)
-
- # Clean up
- os.unlink(tmp_path)
-
- return {"text": text, "translation": translation}
-
- except Exception as e:
- print(f"Whisper audio error: {e}")
- return {"error": str(e)}
-
-
-@app.post("/whisper/video")
-async def whisper_video(video: UploadFile = File(...)):
- """Process video file"""
- try:
- # Save uploaded file temporarily
- with tempfile.NamedTemporaryFile(delete=False, suffix=Path(video.filename).suffix) as tmp:
- content = await video.read()
- tmp.write(content)
- tmp_path = tmp.name
-
- # Extract audio
- audio_path = await asyncio.get_event_loop().run_in_executor(
- None, extract_audio_from_video, tmp_path
- )
-
- # Transcribe
- text = await asyncio.get_event_loop().run_in_executor(
- whisper_executor, gpu_transcribe_whisper, audio_path
- )
-
- # Translate
- translation = _translate_text_4langs(text)
-
- # Clean up
- os.unlink(tmp_path)
- os.unlink(audio_path)
-
- return {"text": text, "translation": translation}
-
- except Exception as e:
- print(f"Whisper video error: {e}")
- return {"error": str(e)}
-
-
-@app.post("/whisper/pdf")
-async def whisper_pdf(pdf: UploadFile = File(...), max_pages: int = Form(10)):
- """Process PDF file"""
- try:
- # Save uploaded file temporarily
- with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
- content = await pdf.read()
- tmp.write(content)
- tmp_path = tmp.name
-
- # Extract text
- extracted = []
- with pdfplumber.open(tmp_path) as pdf_doc:
- pages = pdf_doc.pages[:max_pages]
- for idx, pg in enumerate(pages, start=1):
- txt = pg.extract_text() or ""
- if txt.strip():
- extracted.append(f"[Page {idx}]\n{txt}")
-
- full_text = "\n\n".join(extracted)
-
- # Translate each page
- translated = []
- for page_text in extracted:
- trans = _translate_text_4langs(page_text.split('\n', 1)[1]) # Skip page header
- translated.append(page_text.split('\n')[0] + "\n" + trans)
-
- # Clean up
- os.unlink(tmp_path)
-
- return {"text": full_text, "translation": "\n\n".join(translated)}
-
- except Exception as e:
- print(f"Whisper PDF error: {e}")
- return {"error": str(e)}
-
-
-@app.post("/whisper/realtime/start")
-async def whisper_realtime_start():
- """Start realtime transcription session"""
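-    # 32-character hex session id; session state lives only in process memory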
- session_id = os.urandom(16).hex()
- realtime_sessions[session_id] = {
- "buffer": [],
- "queue": asyncio.Queue(),
- "active": True
- }
- return {"session_id": session_id}
-
-
-@app.post("/whisper/realtime/process")
-async def whisper_realtime_process(
- audio: UploadFile = File(...),
- session_id: str = Form(...)
-):
- """Process realtime audio chunk"""
- if session_id not in realtime_sessions:
- return {"error": "Invalid session"}
-
- try:
- # Read audio data
- content = await audio.read()
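-        # Treat the upload as raw little-endian 16-bit PCM and normalize to [-1.0, 1.0]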
- audio_array = np.frombuffer(content, dtype=np.int16).astype(np.float32) / 32768.0
-
- # Process in executor
- result = await asyncio.get_event_loop().run_in_executor(
- whisper_executor, gpu_asr_translate_whisper, audio_array, WHISPER_SAMPLE_RATE
- )
-
- # Parse result
-        lines = result.split('<br>')
-        timestamp = lines[0].strip('[]') if lines else ""
-        text = lines[1].replace('[KO]', '').strip() if len(lines) > 1 else ""
-        translation = '<br>'.join(lines[2:-2]) if len(lines) > 3 else ""
-
- # Queue result
- await realtime_sessions[session_id]["queue"].put({
- "timestamp": timestamp,
- "text": text,
- "translation": translation
- })
-
- return {"status": "ok"}
-
- except Exception as e:
- print(f"Realtime process error: {e}")
- return {"error": str(e)}
-
-
-@app.get("/whisper/realtime/stream")
-async def whisper_realtime_stream(session_id: str):
- """Stream realtime results"""
- if session_id not in realtime_sessions:
- return JSONResponse({"error": "Invalid session"}, status_code=404)
-
- async def stream_results():
- session = realtime_sessions[session_id]
- try:
- while session["active"]:
- try:
- result = await asyncio.wait_for(session["queue"].get(), timeout=1.0)
- yield f"data: {json.dumps(result)}\n\n"
- except asyncio.TimeoutError:
- yield f"data: {json.dumps({'keepalive': True})}\n\n"
- except Exception as e:
- print(f"Stream error: {e}")
- finally:
- # Cleanup session
- if session_id in realtime_sessions:
- del realtime_sessions[session_id]
-
- return StreamingResponse(stream_results(), media_type="text/event-stream")
-
-
-@app.get("/")
-async def index():
- """Serve the HTML page"""
- rtc_config = get_twilio_turn_credentials() if get_space() else None
-
-    # Step 1: start from the basic skeleton of the original HTML
-    test_html = """
-<!DOCTYPE html>
-<html lang="ko">
-<head>
-    <meta charset="UTF-8">
-    <title>MOUSE Extended</title>
-</head>
-<body>
-    <h1>MOUSE Extended - Step 1</h1>
-    <!-- Voice chat tab -->
-    <p>Voice Chat Tab</p>
-    <!-- Microphone transcription tab -->
-    <p>마이크 전사</p>
-    <!-- Audio file tab -->
-    <p>오디오 파일</p>
-    <p>클릭하여 업로드</p>
-</body>
-</html>
-"""
-
- return HTMLResponse(content=test_html)
-
-
-if __name__ == "__main__":
- import uvicorn
-
- mode = os.getenv("MODE")
- if mode == "UI":
- stream.ui.launch(server_port=7860)
- elif mode == "PHONE":
- stream.fastphone(host="0.0.0.0", port=7860)
- else:
- uvicorn.run(app, host="0.0.0.0", port=7860)
\ No newline at end of file