import asyncio
import base64
import io
import json
import os
import re
import wave
from pathlib import Path
from typing import Optional, List, Dict

import gradio as gr
import httpx
import numpy as np
import openai
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastrtc import (
    AdditionalOutputs,
    AsyncStreamHandler,
    Stream,
    get_twilio_turn_credentials,
    wait_for_item,
)
from gradio.utils import get_space
from openai.types.beta.realtime import ResponseAudioTranscriptDoneEvent
from scipy import signal

load_dotenv()
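# 24 kHz mono PCM16 is the native rate of both the OpenAI Realtime API audio
# and tts-1 "pcm" output, so a single constant serves capture and playback
# without any resampling step.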
SAMPLE_RATE = 24000

# Supported languages for the OpenAI Realtime API
SUPPORTED_LANGUAGES = {
    "ko": "한국어 (Korean)",
    "en": "English",
    "es": "Español (Spanish)",
    "fr": "Français (French)",
    "de": "Deutsch (German)",
    "it": "Italiano (Italian)",
    "pt": "Português (Portuguese)",
    "ru": "Русский (Russian)",
    "ja": "日本語 (Japanese)",
    "zh": "中文 (Chinese)",
    "ar": "العربية (Arabic)",
    "hi": "हिन्दी (Hindi)",
    "nl": "Nederlands (Dutch)",
    "pl": "Polski (Polish)",
    "tr": "Türkçe (Turkish)",
    "vi": "Tiếng Việt (Vietnamese)",
    "th": "ไทย (Thai)",
    "id": "Bahasa Indonesia",
    "sv": "Svenska (Swedish)",
    "da": "Dansk (Danish)",
    "no": "Norsk (Norwegian)",
    "fi": "Suomi (Finnish)",
    "he": "עברית (Hebrew)",
    "uk": "Українська (Ukrainian)",
    "cs": "Čeština (Czech)",
    "el": "Ελληνικά (Greek)",
    "ro": "Română (Romanian)",
    "hu": "Magyar (Hungarian)",
    "ms": "Bahasa Melayu (Malay)"
}
# HTML content embedded as a string
HTML_CONTENT = """<!DOCTYPE html>
<html lang="ko">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Mouth of 'MOUSE'</title>
    <style>
        :root {
            --primary-color: #6f42c1;
            --secondary-color: #563d7c;
            --dark-bg: #121212;
            --card-bg: #1e1e1e;
            --text-color: #f8f9fa;
            --border-color: #333;
            --hover-color: #8a5cf6;
        }
        body {
            font-family: "SF Pro Display", -apple-system, BlinkMacSystemFont, sans-serif;
            background-color: var(--dark-bg);
            color: var(--text-color);
            margin: 0;
            padding: 0;
            height: 100vh;
            display: flex;
            flex-direction: column;
            overflow: hidden;
        }
        .container {
            max-width: 1400px;
            margin: 0 auto;
            padding: 20px;
            flex-grow: 1;
            display: flex;
            flex-direction: column;
            width: 100%;
            height: 100vh;
            box-sizing: border-box;
            overflow: hidden;
        }
        .header {
            text-align: center;
            padding: 15px 0;
            border-bottom: 1px solid var(--border-color);
            margin-bottom: 20px;
            flex-shrink: 0;
            background-color: var(--card-bg);
        }
        .main-content {
            display: flex;
            gap: 20px;
            flex-grow: 1;
            min-height: 0;
            overflow: hidden;
        }
        .sidebar {
            width: 350px;
            flex-shrink: 0;
            display: flex;
            flex-direction: column;
            gap: 20px;
            overflow-y: auto;
            max-height: calc(100vh - 120px);
        }
        .chat-section {
            flex-grow: 1;
            display: flex;
            flex-direction: column;
            min-width: 0;
        }
        .logo {
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 10px;
        }
        .logo h1 {
            margin: 0;
            background: linear-gradient(135deg, var(--primary-color), #a78bfa);
            -webkit-background-clip: text;
            background-clip: text;
            color: transparent;
            font-size: 32px;
            letter-spacing: 1px;
        }
        /* Settings section */
        .settings-section {
            background-color: var(--card-bg);
            border-radius: 12px;
            padding: 20px;
            border: 1px solid var(--border-color);
            overflow-y: auto;
            flex-grow: 1;
        }
        .settings-grid {
            display: flex;
            flex-direction: column;
            gap: 15px;
            margin-bottom: 15px;
        }
        .interpretation-section {
            display: flex;
            flex-direction: column;
            gap: 15px;
            padding: 15px;
            background-color: var(--dark-bg);
            border-radius: 8px;
            margin-top: 15px;
        }
        .interpretation-info {
            font-size: 13px;
            color: #999;
            margin-top: 5px;
        }
        .setting-item {
            display: flex;
            align-items: center;
            justify-content: space-between;
            gap: 10px;
        }
        .setting-label {
            font-size: 14px;
            color: #aaa;
            min-width: 60px;
        }
        /* Toggle switch */
        .toggle-switch {
            position: relative;
            width: 50px;
            height: 26px;
            background-color: #ccc;
            border-radius: 13px;
            cursor: pointer;
            transition: background-color 0.3s;
        }
        .toggle-switch.active {
            background-color: var(--primary-color);
        }
        .toggle-slider {
            position: absolute;
            top: 3px;
            left: 3px;
            width: 20px;
            height: 20px;
            background-color: white;
            border-radius: 50%;
            transition: transform 0.3s;
        }
        .toggle-switch.active .toggle-slider {
            transform: translateX(24px);
        }
        /* Select dropdown */
        select {
            background-color: var(--card-bg);
            color: var(--text-color);
            border: 1px solid var(--border-color);
            padding: 8px 12px;
            border-radius: 6px;
            font-size: 14px;
            cursor: pointer;
            min-width: 120px;
            max-width: 200px;
        }
        select:focus {
            outline: none;
            border-color: var(--primary-color);
        }
        /* Text inputs */
        .text-input-section {
            margin-top: 15px;
        }
        input[type="text"], textarea {
            width: 100%;
            background-color: var(--dark-bg);
            color: var(--text-color);
            border: 1px solid var(--border-color);
            padding: 10px;
            border-radius: 6px;
            font-size: 14px;
            box-sizing: border-box;
            margin-top: 5px;
        }
        input[type="text"]:focus, textarea:focus {
            outline: none;
            border-color: var(--primary-color);
        }
        textarea {
            resize: vertical;
            min-height: 80px;
        }
        .chat-container {
            border-radius: 12px;
            background-color: var(--card-bg);
            box-shadow: 0 8px 32px rgba(0, 0, 0, 0.2);
            padding: 20px;
            flex-grow: 1;
            display: flex;
            flex-direction: column;
            border: 1px solid var(--border-color);
            overflow: hidden;
            min-height: 0;
            height: 100%;
        }
        .chat-messages {
            flex-grow: 1;
            overflow-y: auto;
            padding: 15px;
            scrollbar-width: thin;
            scrollbar-color: var(--primary-color) var(--card-bg);
            min-height: 0;
            max-height: calc(100vh - 250px);
        }
        .chat-messages::-webkit-scrollbar {
            width: 6px;
        }
        .chat-messages::-webkit-scrollbar-thumb {
            background-color: var(--primary-color);
            border-radius: 6px;
        }
        .message {
            margin-bottom: 15px;
            padding: 12px 16px;
            border-radius: 8px;
            font-size: 15px;
            line-height: 1.5;
            position: relative;
            max-width: 85%;
            animation: fade-in 0.3s ease-out;
            word-wrap: break-word;
        }
        @keyframes fade-in {
            from {
                opacity: 0;
                transform: translateY(10px);
            }
            to {
                opacity: 1;
                transform: translateY(0);
            }
        }
        .message.user {
            background: linear-gradient(135deg, #2c3e50, #34495e);
            margin-left: auto;
            border-bottom-right-radius: 2px;
        }
        .message.assistant {
            background: linear-gradient(135deg, var(--secondary-color), var(--primary-color));
            margin-right: auto;
            border-bottom-left-radius: 2px;
        }
        .message.search-result {
            background: linear-gradient(135deg, #1a5a3e, #2e7d32);
            font-size: 14px;
            padding: 10px;
            margin-bottom: 10px;
        }
        .message.assistant.interpretation {
            background: linear-gradient(135deg, #1a5a3e, #2e7d32);
            font-style: italic;
        }
        .interpretation-arrow {
            color: #4caf50;
            font-weight: bold;
            margin: 0 10px;
        }
        .controls {
            text-align: center;
            margin-top: auto;
            display: flex;
            justify-content: center;
            gap: 10px;
            flex-shrink: 0;
            padding-top: 20px;
        }
        /* Responsive design */
        @media (max-width: 1024px) {
            .sidebar {
                width: 300px;
            }
        }
        @media (max-width: 768px) {
            .main-content {
                flex-direction: column;
            }
            .sidebar {
                width: 100%;
                margin-bottom: 20px;
            }
            .chat-section {
                height: 400px;
            }
        }
        button {
            background: linear-gradient(135deg, var(--primary-color), var(--secondary-color));
            color: white;
            border: none;
            padding: 14px 28px;
            font-family: inherit;
            font-size: 16px;
            cursor: pointer;
            transition: all 0.3s;
            text-transform: uppercase;
            letter-spacing: 1px;
            border-radius: 50px;
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 10px;
            box-shadow: 0 4px 10px rgba(111, 66, 193, 0.3);
        }
        button:hover {
            transform: translateY(-2px);
            box-shadow: 0 6px 15px rgba(111, 66, 193, 0.5);
            background: linear-gradient(135deg, var(--hover-color), var(--primary-color));
        }
        button:active {
            transform: translateY(1px);
        }
        #send-button {
            background: linear-gradient(135deg, #2ecc71, #27ae60);
            padding: 10px 20px;
            font-size: 14px;
            flex-shrink: 0;
        }
        #send-button:hover {
            background: linear-gradient(135deg, #27ae60, #229954);
        }
        #audio-output {
            display: none;
        }
        .icon-with-spinner {
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 12px;
            min-width: 180px;
        }
        .spinner {
            width: 20px;
            height: 20px;
            border: 2px solid #ffffff;
            border-top-color: transparent;
            border-radius: 50%;
            animation: spin 1s linear infinite;
            flex-shrink: 0;
        }
        @keyframes spin {
            to {
                transform: rotate(360deg);
            }
        }
        .audio-visualizer {
            display: flex;
            align-items: center;
            justify-content: center;
            gap: 5px;
            min-width: 80px;
            height: 25px;
        }
        .visualizer-bar {
            width: 4px;
            height: 100%;
            background-color: rgba(255, 255, 255, 0.7);
            border-radius: 2px;
            transform-origin: bottom;
            transform: scaleY(0.1);
            transition: transform 0.1s ease;
        }
        .toast {
            position: fixed;
            top: 20px;
            left: 50%;
            transform: translateX(-50%);
            padding: 16px 24px;
            border-radius: 8px;
            font-size: 14px;
            z-index: 1000;
            display: none;
            box-shadow: 0 4px 12px rgba(0, 0, 0, 0.3);
        }
        .toast.error {
            background-color: #f44336;
            color: white;
        }
        .toast.warning {
            background-color: #ff9800;
            color: white;
        }
        .status-indicator {
            display: inline-flex;
            align-items: center;
            margin-top: 10px;
            font-size: 14px;
            color: #aaa;
        }
        .status-dot {
            width: 8px;
            height: 8px;
            border-radius: 50%;
            margin-right: 8px;
        }
        .status-dot.connected {
            background-color: #4caf50;
        }
        .status-dot.disconnected {
            background-color: #f44336;
        }
        .status-dot.connecting {
            background-color: #ff9800;
            animation: pulse 1.5s infinite;
        }
        @keyframes pulse {
            0% {
                opacity: 0.6;
            }
            50% {
                opacity: 1;
            }
            100% {
                opacity: 0.6;
            }
        }
        .mouse-logo {
            position: relative;
            width: 40px;
            height: 40px;
        }
        .mouse-ears {
            position: absolute;
            width: 15px;
            height: 15px;
            background-color: var(--primary-color);
            border-radius: 50%;
        }
        .mouse-ear-left {
            top: 0;
            left: 5px;
        }
        .mouse-ear-right {
            top: 0;
            right: 5px;
        }
        .mouse-face {
            position: absolute;
            top: 10px;
            left: 5px;
            width: 30px;
            height: 30px;
            background-color: var(--secondary-color);
            border-radius: 50%;
        }
        .language-info {
            font-size: 12px;
            color: #888;
            margin-left: 5px;
        }
    </style>
</head>
<body>
    <div id="error-toast" class="toast"></div>
    <div class="container">
        <div class="header">
            <div class="logo">
                <div class="mouse-logo">
                    <div class="mouse-ears mouse-ear-left"></div>
                    <div class="mouse-ears mouse-ear-right"></div>
                    <div class="mouse-face"></div>
                </div>
                <h1>MOUSE 음성 챗</h1>
            </div>
            <div class="status-indicator">
                <div id="status-dot" class="status-dot disconnected"></div>
                <span id="status-text">연결 대기 중</span>
            </div>
        </div>
        <div class="main-content">
            <div class="sidebar">
                <div class="settings-section">
                    <h3 style="margin: 0 0 15px 0; color: var(--primary-color);">설정</h3>
                    <div class="settings-grid">
                        <div class="setting-item">
                            <span class="setting-label">웹 검색</span>
                            <div id="search-toggle" class="toggle-switch">
                                <div class="toggle-slider"></div>
                            </div>
                        </div>
                        <div class="setting-item">
                            <span class="setting-label">자동 번역</span>
                            <select id="language-select">
                                <option value="">비활성화</option>
                                <option value="ko">한국어 (Korean)</option>
                                <option value="en">English</option>
                                <option value="es">Español (Spanish)</option>
                                <option value="fr">Français (French)</option>
                                <option value="de">Deutsch (German)</option>
                                <option value="it">Italiano (Italian)</option>
                                <option value="pt">Português (Portuguese)</option>
                                <option value="ru">Русский (Russian)</option>
                                <option value="ja">日本語 (Japanese)</option>
                                <option value="zh">中文 (Chinese)</option>
                                <option value="ar">العربية (Arabic)</option>
                                <option value="hi">हिन्दी (Hindi)</option>
                                <option value="nl">Nederlands (Dutch)</option>
                                <option value="pl">Polski (Polish)</option>
                                <option value="tr">Türkçe (Turkish)</option>
                                <option value="vi">Tiếng Việt (Vietnamese)</option>
                                <option value="th">ไทย (Thai)</option>
                                <option value="id">Bahasa Indonesia</option>
                                <option value="sv">Svenska (Swedish)</option>
                                <option value="da">Dansk (Danish)</option>
                                <option value="no">Norsk (Norwegian)</option>
                                <option value="fi">Suomi (Finnish)</option>
                                <option value="he">עברית (Hebrew)</option>
                                <option value="uk">Українська (Ukrainian)</option>
                                <option value="cs">Čeština (Czech)</option>
                                <option value="el">Ελληνικά (Greek)</option>
                                <option value="ro">Română (Romanian)</option>
                                <option value="hu">Magyar (Hungarian)</option>
                                <option value="ms">Bahasa Melayu (Malay)</option>
                            </select>
                        </div>
                    </div>
                    <div class="interpretation-section">
                        <div class="setting-item">
                            <span class="setting-label">자동 통역</span>
                            <div id="interpretation-toggle" class="toggle-switch">
                                <div class="toggle-slider"></div>
                            </div>
                        </div>
                        <div class="setting-item" id="interpretation-language-container" style="display: none;">
                            <span class="setting-label">통역 언어</span>
                            <select id="interpretation-language-select">
                                <option value="">언어 선택</option>
                                <option value="ko">한국어 (Korean)</option>
                                <option value="en">English</option>
                                <option value="es">Español (Spanish)</option>
                                <option value="fr">Français (French)</option>
                                <option value="de">Deutsch (German)</option>
                                <option value="it">Italiano (Italian)</option>
                                <option value="pt">Português (Portuguese)</option>
                                <option value="ru">Русский (Russian)</option>
                                <option value="ja">日本語 (Japanese)</option>
                                <option value="zh">中文 (Chinese)</option>
                                <option value="ar">العربية (Arabic)</option>
                                <option value="hi">हिन्दी (Hindi)</option>
                                <option value="nl">Nederlands (Dutch)</option>
                                <option value="pl">Polski (Polish)</option>
                                <option value="tr">Türkçe (Turkish)</option>
                                <option value="vi">Tiếng Việt (Vietnamese)</option>
                                <option value="th">ไทย (Thai)</option>
                                <option value="id">Bahasa Indonesia</option>
                                <option value="sv">Svenska (Swedish)</option>
                                <option value="da">Dansk (Danish)</option>
                                <option value="no">Norsk (Norwegian)</option>
                                <option value="fi">Suomi (Finnish)</option>
                                <option value="he">עברית (Hebrew)</option>
                                <option value="uk">Українська (Ukrainian)</option>
                                <option value="cs">Čeština (Czech)</option>
                                <option value="el">Ελληνικά (Greek)</option>
                                <option value="ro">Română (Romanian)</option>
                                <option value="hu">Magyar (Hungarian)</option>
                                <option value="ms">Bahasa Melayu (Malay)</option>
                            </select>
                        </div>
                    </div>
                    <div class="interpretation-info" id="interpretation-info" style="display: none;">
                        <strong>통역 모드 안내:</strong><br>
                        • 음성으로 말하면 선택한 언어로 자동 통역됩니다<br>
                        • Whisper + GPT-4o-mini + TTS를 사용합니다<br>
                        • 말을 마치고 잠시 기다리면 통역이 시작됩니다
                    </div>
                    <div class="text-input-section">
                        <label for="system-prompt" class="setting-label">시스템 프롬프트:</label>
                        <textarea id="system-prompt" placeholder="AI 어시스턴트의 성격, 역할, 행동 방식을 정의하세요...">You are a helpful assistant. Respond in a friendly and professional manner.</textarea>
                    </div>
                </div>
                <div class="controls">
                    <button id="start-button">대화 시작</button>
                </div>
            </div>
            <div class="chat-section">
                <div class="chat-container">
                    <h3 style="margin: 0 0 15px 0; color: var(--primary-color);">대화</h3>
                    <div class="chat-messages" id="chat-messages"></div>
                    <div class="text-input-section" style="margin-top: 10px;">
                        <div style="display: flex; gap: 10px;">
                            <input type="text" id="text-input" placeholder="텍스트 메시지를 입력하세요..." style="flex-grow: 1;" />
                            <button id="send-button" style="display: none;">전송</button>
                        </div>
                    </div>
                </div>
            </div>
        </div>
    </div>
    <audio id="audio-output"></audio>
    <script>
        let peerConnection;
        let webrtc_id;
        let webSearchEnabled = false;
        let selectedLanguage = "";
        let interpretationMode = false;
        let interpretationLanguage = "";
        let systemPrompt = "You are a helpful assistant. Respond in a friendly and professional manner.";
        const audioOutput = document.getElementById('audio-output');
        const startButton = document.getElementById('start-button');
        const sendButton = document.getElementById('send-button');
        const chatMessages = document.getElementById('chat-messages');
        const statusDot = document.getElementById('status-dot');
        const statusText = document.getElementById('status-text');
        const searchToggle = document.getElementById('search-toggle');
        const languageSelect = document.getElementById('language-select');
        const interpretationToggle = document.getElementById('interpretation-toggle');
        const interpretationLanguageSelect = document.getElementById('interpretation-language-select');
        const interpretationLanguageContainer = document.getElementById('interpretation-language-container');
        const interpretationInfo = document.getElementById('interpretation-info');
        const systemPromptInput = document.getElementById('system-prompt');
        const textInput = document.getElementById('text-input');
        let audioLevel = 0;
        let animationFrame;
        let audioContext, analyser, audioSource;
        let dataChannel = null;
        let isVoiceActive = false;
        // Web search toggle functionality
        searchToggle.addEventListener('click', () => {
            webSearchEnabled = !webSearchEnabled;
            searchToggle.classList.toggle('active', webSearchEnabled);
            console.log('Web search enabled:', webSearchEnabled);
        });
        // Language selection
        languageSelect.addEventListener('change', () => {
            selectedLanguage = languageSelect.value;
            console.log('Selected language:', selectedLanguage);
        });
        // Interpretation mode toggle
        interpretationToggle.addEventListener('click', () => {
            if (!interpretationMode) {
                // Turning ON interpretation mode
                interpretationLanguageContainer.style.display = 'flex';
                interpretationInfo.style.display = 'block';
                // Show the language selector first
                showError('통역 언어를 선택해주세요.');
                interpretationToggle.classList.remove('active');
                // Don't actually enable interpretation mode until a language is selected
                return;
            } else {
                // Turning OFF interpretation mode
                interpretationMode = false;
                interpretationToggle.classList.remove('active');
                interpretationLanguageContainer.style.display = 'none';
                interpretationInfo.style.display = 'none';
                interpretationLanguage = '';
                interpretationLanguageSelect.value = '';
                // Re-enable other features
                languageSelect.disabled = false;
                searchToggle.style.opacity = '1';
                searchToggle.style.pointerEvents = 'auto';
                textInput.disabled = false;
                textInput.placeholder = '텍스트 메시지를 입력하세요...';
                sendButton.style.display = 'block';
                console.log('Interpretation mode disabled');
                // If connected, restart to apply normal mode
                if (peerConnection && peerConnection.connectionState === 'connected') {
                    showError('일반 모드로 전환하기 위해 연결을 다시 시작합니다.');
                    stop();
                    setTimeout(() => {
                        setupWebRTC();
                    }, 500);
                }
            }
            console.log('Interpretation mode:', interpretationMode);
        });
        // Interpretation language selection
        interpretationLanguageSelect.addEventListener('change', () => {
            interpretationLanguage = interpretationLanguageSelect.value;
            console.log('Interpretation language:', interpretationLanguage);
            if (interpretationLanguage && !interpretationMode) {
                // Now actually enable interpretation mode
                interpretationMode = true;
                interpretationToggle.classList.add('active');
                // Disable other features
                languageSelect.value = '';
                selectedLanguage = '';
                languageSelect.disabled = true;
                searchToggle.classList.remove('active');
                webSearchEnabled = false;
                searchToggle.style.opacity = '0.5';
                searchToggle.style.pointerEvents = 'none';
                textInput.disabled = true;
                textInput.placeholder = '통역 모드에서는 텍스트 입력이 지원되지 않습니다';
                sendButton.style.display = 'none';
                console.log('Interpretation mode enabled with language:', interpretationLanguage);
                // If already connected, restart the connection with the new settings
                if (peerConnection && peerConnection.connectionState === 'connected') {
                    showError('통역 모드 설정을 적용하기 위해 연결을 다시 시작합니다.');
                    stop();
                    setTimeout(() => {
                        setupWebRTC();
                    }, 500);
                }
            }
        });
        // System prompt update
        systemPromptInput.addEventListener('input', () => {
            systemPrompt = systemPromptInput.value || "You are a helpful assistant. Respond in a friendly and professional manner.";
        });
        // Text input handling
        textInput.addEventListener('keypress', (e) => {
            if (e.key === 'Enter' && !e.shiftKey) {
                e.preventDefault();
                sendTextMessage();
            }
        });
        sendButton.addEventListener('click', sendTextMessage);
        async function sendTextMessage() {
            const message = textInput.value.trim();
            if (!message) return;
            // Don't allow text messages in interpretation mode
            if (interpretationMode) {
                showError('통역 모드에서는 텍스트 입력이 지원되지 않습니다.');
                return;
            }
            // Add the user message to the chat
            addMessage('user', message);
            textInput.value = '';
            // Show a sending indicator
            const typingIndicator = document.createElement('div');
            typingIndicator.classList.add('message', 'assistant');
            typingIndicator.textContent = '입력 중...';
            typingIndicator.id = 'typing-indicator';
            chatMessages.appendChild(typingIndicator);
            chatMessages.scrollTop = chatMessages.scrollHeight;
            try {
                // Send to the text chat endpoint
                const response = await fetch('/chat/text', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({
                        message: message,
                        web_search_enabled: webSearchEnabled,
                        target_language: selectedLanguage,
                        system_prompt: systemPrompt
                    })
                });
                const data = await response.json();
                // Remove the typing indicator
                const indicator = document.getElementById('typing-indicator');
                if (indicator) indicator.remove();
                if (data.error) {
                    showError(data.error);
                } else {
                    // Add the assistant response
                    let content = data.response;
                    if (selectedLanguage && data.language) {
                        content += ` <span class="language-info">[${data.language}]</span>`;
                    }
                    addMessage('assistant', content);
                }
            } catch (error) {
                console.error('Error sending text message:', error);
                const indicator = document.getElementById('typing-indicator');
                if (indicator) indicator.remove();
                showError('메시지 전송 중 오류가 발생했습니다.');
            }
        }
        function updateStatus(state) {
            statusDot.className = 'status-dot ' + state;
            if (state === 'connected') {
                statusText.textContent = '연결됨';
                if (!interpretationMode) {
                    sendButton.style.display = 'block';
                }
                isVoiceActive = true;
            } else if (state === 'connecting') {
                statusText.textContent = '연결 중...';
                sendButton.style.display = 'none';
            } else {
                statusText.textContent = '연결 대기 중';
                if (!interpretationMode) {
                    sendButton.style.display = 'block'; // Show the send button even when disconnected, for text chat
                }
                isVoiceActive = false;
            }
        }
        function updateButtonState() {
            const button = document.getElementById('start-button');
            if (peerConnection && (peerConnection.connectionState === 'connecting' || peerConnection.connectionState === 'new')) {
                button.innerHTML = `
                    <div class="icon-with-spinner">
                        <div class="spinner"></div>
                        <span>연결 중...</span>
                    </div>
                `;
                updateStatus('connecting');
            } else if (peerConnection && peerConnection.connectionState === 'connected') {
                button.innerHTML = `
                    <div class="icon-with-spinner">
                        <div class="audio-visualizer" id="audio-visualizer">
                            <div class="visualizer-bar"></div>
                            <div class="visualizer-bar"></div>
                            <div class="visualizer-bar"></div>
                            <div class="visualizer-bar"></div>
                            <div class="visualizer-bar"></div>
                        </div>
                        <span>대화 종료</span>
                    </div>
                `;
                updateStatus('connected');
            } else {
                button.innerHTML = '대화 시작';
                updateStatus('disconnected');
            }
        }
        function setupAudioVisualization(stream) {
            audioContext = new (window.AudioContext || window.webkitAudioContext)();
            analyser = audioContext.createAnalyser();
            audioSource = audioContext.createMediaStreamSource(stream);
            audioSource.connect(analyser);
            analyser.fftSize = 256;
            const bufferLength = analyser.frequencyBinCount;
            const dataArray = new Uint8Array(bufferLength);
            const visualizerBars = document.querySelectorAll('.visualizer-bar');
            const barCount = visualizerBars.length;
            function updateAudioLevel() {
                analyser.getByteFrequencyData(dataArray);
                for (let i = 0; i < barCount; i++) {
                    const start = Math.floor(i * (bufferLength / barCount));
                    const end = Math.floor((i + 1) * (bufferLength / barCount));
                    let sum = 0;
                    for (let j = start; j < end; j++) {
                        sum += dataArray[j];
                    }
                    const average = sum / (end - start) / 255;
                    const scaleY = 0.1 + average * 0.9;
                    visualizerBars[i].style.transform = `scaleY(${scaleY})`;
                }
                animationFrame = requestAnimationFrame(updateAudioLevel);
            }
            updateAudioLevel();
        }
        function showError(message) {
            const toast = document.getElementById('error-toast');
            toast.textContent = message;
            toast.className = 'toast error';
            toast.style.display = 'block';
            setTimeout(() => {
                toast.style.display = 'none';
            }, 5000);
        }
        async function setupWebRTC() {
            const config = __RTC_CONFIGURATION__;
            peerConnection = new RTCPeerConnection(config);
            const timeoutId = setTimeout(() => {
                const toast = document.getElementById('error-toast');
                toast.textContent = "연결이 평소보다 오래 걸리고 있습니다. VPN을 사용 중이신가요?";
                toast.className = 'toast warning';
                toast.style.display = 'block';
                setTimeout(() => {
                    toast.style.display = 'none';
                }, 5000);
            }, 5000);
            try {
                const stream = await navigator.mediaDevices.getUserMedia({
                    audio: true
                });
                setupAudioVisualization(stream);
                stream.getTracks().forEach(track => {
                    peerConnection.addTrack(track, stream);
                });
                peerConnection.addEventListener('track', (evt) => {
                    if (audioOutput.srcObject !== evt.streams[0]) {
                        audioOutput.srcObject = evt.streams[0];
                        audioOutput.play();
                    }
                });
                // Create a data channel for text messages
                dataChannel = peerConnection.createDataChannel('text');
                dataChannel.onopen = () => {
                    console.log('Data channel opened');
                };
                dataChannel.onmessage = (event) => {
                    const eventJson = JSON.parse(event.data);
                    if (eventJson.type === "error") {
                        showError(eventJson.message);
                    }
                };
                const offer = await peerConnection.createOffer();
                await peerConnection.setLocalDescription(offer);
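                // Wait for ICE gathering to finish so the offer POSTed below
                // carries the complete candidate set; the /webrtc/offer endpoint
                // takes one full SDP rather than trickled candidates.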
                await new Promise((resolve) => {
                    if (peerConnection.iceGatheringState === "complete") {
                        resolve();
                    } else {
                        const checkState = () => {
                            if (peerConnection.iceGatheringState === "complete") {
                                peerConnection.removeEventListener("icegatheringstatechange", checkState);
                                resolve();
                            }
                        };
                        peerConnection.addEventListener("icegatheringstatechange", checkState);
                    }
                });
                peerConnection.addEventListener('connectionstatechange', () => {
                    console.log('connectionstatechange', peerConnection.connectionState);
                    if (peerConnection.connectionState === 'connected') {
                        clearTimeout(timeoutId);
                        const toast = document.getElementById('error-toast');
                        toast.style.display = 'none';
                    }
                    updateButtonState();
                });
                webrtc_id = Math.random().toString(36).substring(7);
                // Log the current settings before sending
                console.log('Sending offer with settings:', {
                    webrtc_id: webrtc_id,
                    web_search_enabled: webSearchEnabled,
                    target_language: selectedLanguage,
                    system_prompt: systemPrompt,
                    interpretation_mode: interpretationMode,
                    interpretation_language: interpretationLanguage
                });
                const response = await fetch('/webrtc/offer', {
                    method: 'POST',
                    headers: { 'Content-Type': 'application/json' },
                    body: JSON.stringify({
                        sdp: peerConnection.localDescription.sdp,
                        type: peerConnection.localDescription.type,
                        webrtc_id: webrtc_id,
                        web_search_enabled: webSearchEnabled,
                        target_language: selectedLanguage,
                        system_prompt: systemPrompt,
                        interpretation_mode: interpretationMode,
                        interpretation_language: interpretationLanguage
                    })
                });
                const serverResponse = await response.json();
                if (serverResponse.status === 'failed') {
                    showError(serverResponse.meta.error === 'concurrency_limit_reached'
                        ? `너무 많은 연결입니다. 최대 한도는 ${serverResponse.meta.limit} 입니다.`
                        : serverResponse.meta.error);
                    stop();
                    return;
                }
                await peerConnection.setRemoteDescription(serverResponse);
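                // Transcripts and search notices arrive out-of-band over SSE,
                // matched to this peer connection by webrtc_id.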
                const eventSource = new EventSource('/outputs?webrtc_id=' + webrtc_id);
                eventSource.addEventListener("output", (event) => {
                    const eventJson = JSON.parse(event.data);
                    let content = eventJson.content;
                    // Debug logging for interpretation mode
                    if (interpretationMode) {
                        console.log('[INTERPRETATION OUTPUT]', {
                            content: content,
                            language: eventJson.language,
                            mode: eventJson.mode,
                            expectedLanguage: interpretationLanguage
                        });
                    }
                    if (selectedLanguage && eventJson.language) {
                        content += ` <span class="language-info">[${eventJson.language}]</span>`;
                    } else if (interpretationMode && eventJson.language) {
                        // In interpretation mode, show the translation process
                        if (content.includes('→')) {
                            // Format: "Korean text → English text"
                            const parts = content.split('→');
                            if (parts.length === 2) {
                                content = `<span style="color: #999;">${parts[0].trim()}</span>` +
                                    `<span class="interpretation-arrow">→</span>` +
                                    `<strong>${parts[1].trim()}</strong>`;
                            }
                        }
                        content += ` <span class="language-info">[통역: ${eventJson.language}]</span>`;
                    }
                    addMessage("assistant", content);
                });
                eventSource.addEventListener("search", (event) => {
                    const eventJson = JSON.parse(event.data);
                    if (eventJson.query) {
                        addMessage("search-result", `웹 검색 중: "${eventJson.query}"`);
                    }
                });
            } catch (err) {
                clearTimeout(timeoutId);
                console.error('Error setting up WebRTC:', err);
                showError('연결을 설정하지 못했습니다. 다시 시도해 주세요.');
                stop();
            }
        }
        function addMessage(role, content) {
            const messageDiv = document.createElement('div');
            messageDiv.classList.add('message', role);
            // Check whether it's an interpretation message
            if (interpretationMode && role === 'assistant' && content.includes('→')) {
                messageDiv.classList.add('interpretation');
            }
            if (content.includes('<span')) {
                messageDiv.innerHTML = content;
            } else {
                messageDiv.textContent = content;
            }
            chatMessages.appendChild(messageDiv);
            chatMessages.scrollTop = chatMessages.scrollHeight;
        }
        function stop() {
            if (animationFrame) {
                cancelAnimationFrame(animationFrame);
            }
            if (audioContext) {
                audioContext.close();
                audioContext = null;
                analyser = null;
                audioSource = null;
            }
            if (peerConnection) {
                if (peerConnection.getTransceivers) {
                    peerConnection.getTransceivers().forEach(transceiver => {
                        if (transceiver.stop) {
                            transceiver.stop();
                        }
                    });
                }
                if (peerConnection.getSenders) {
                    peerConnection.getSenders().forEach(sender => {
                        if (sender.track && sender.track.stop) sender.track.stop();
                    });
                }
                console.log('closing');
                peerConnection.close();
            }
            dataChannel = null;
            updateButtonState();
            audioLevel = 0;
        }
        startButton.addEventListener('click', () => {
            console.log('clicked');
            console.log(peerConnection, peerConnection?.connectionState);
            if (!peerConnection || peerConnection.connectionState !== 'connected') {
                setupWebRTC();
            } else {
                console.log('stopping');
                stop();
            }
        });
        // Initialize send button visibility on page load
        window.addEventListener('DOMContentLoaded', () => {
            sendButton.style.display = 'block';
        });
    </script>
</body>
</html>"""
class BraveSearchClient:
    """Brave Search API client"""
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.search.brave.com/res/v1/web/search"

    async def search(self, query: str, count: int = 10) -> List[Dict]:
        """Perform a web search using the Brave Search API"""
        if not self.api_key:
            return []
        headers = {
            "Accept": "application/json",
            "X-Subscription-Token": self.api_key
        }
        params = {
            "q": query,
            "count": count,
            "lang": "ko"
        }
        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(self.base_url, headers=headers, params=params)
                response.raise_for_status()
                data = response.json()
                results = []
                if "web" in data and "results" in data["web"]:
                    for result in data["web"]["results"][:count]:
                        results.append({
                            "title": result.get("title", ""),
                            "url": result.get("url", ""),
                            "description": result.get("description", "")
                        })
                return results
            except Exception as e:
                print(f"Brave Search error: {e}")
                return []
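
# Usage sketch (illustrative): from any coroutine,
#     results = await BraveSearchClient(api_key).search("서울 날씨", count=3)
# yields up to `count` {"title", "url", "description"} dicts, or [] on failure.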
# Initialize the search client globally
brave_api_key = os.getenv("BSEARCH_API")
search_client = BraveSearchClient(brave_api_key) if brave_api_key else None
print(f"Search client initialized: {search_client is not None}, API key present: {bool(brave_api_key)}")

# Store per-connection settings
connection_settings = {}

# Initialize the OpenAI client for text chat
client = openai.AsyncOpenAI()
def get_translation_instructions(target_language: str) -> str:
    """Get instructions for translation based on the target language"""
    if not target_language:
        return ""
    language_name = SUPPORTED_LANGUAGES.get(target_language, target_language)
    return (
        f"\n\nIMPORTANT: You must respond in {language_name} ({target_language}). "
        f"Translate all your responses to {language_name}."
    )

def update_chatbot(chatbot: list[dict], response: ResponseAudioTranscriptDoneEvent):
    chatbot.append({"role": "assistant", "content": response.transcript})
    return chatbot
async def process_text_chat(message: str, web_search_enabled: bool, target_language: str,
                            system_prompt: str) -> Dict[str, str]:
    """Process text chat using the GPT-4o-mini model"""
    try:
        # If a target language is set, override the system prompt completely
        if target_language:
            language_name = SUPPORTED_LANGUAGES.get(target_language, target_language)
            # Create the system prompt in the target language
            if target_language == "en":
                base_instructions = f"You are a helpful assistant. You speak ONLY English. Never use Korean or any other language. {system_prompt}"
                user_prefix = "Please respond in English: "
            elif target_language == "ja":
                base_instructions = f"あなたは親切なアシスタントです。日本語のみを話します。韓国語や他の言語は絶対に使用しません。{system_prompt}"
                user_prefix = "日本語で答えてください: "
            elif target_language == "zh":
                base_instructions = f"你是一个乐于助人的助手。你只说中文。绝不使用韩语或其他语言。{system_prompt}"
                user_prefix = "请用中文回答: "
            elif target_language == "es":
                base_instructions = f"Eres un asistente útil. Solo hablas español. Nunca uses coreano u otros idiomas. {system_prompt}"
                user_prefix = "Por favor responde en español: "
            else:
                base_instructions = f"You are a helpful assistant that speaks ONLY {language_name}. {system_prompt}"
                user_prefix = f"Please respond in {language_name}: "
        else:
            base_instructions = system_prompt or "You are a helpful assistant."
            user_prefix = ""
        messages = [
            {"role": "system", "content": base_instructions}
        ]
        # Handle web search if enabled
        if web_search_enabled and search_client:
            # Check whether the message calls for a web search
            search_keywords = ["날씨", "기온", "비", "눈", "뉴스", "소식", "현재", "최근",
                               "오늘", "지금", "가격", "환율", "주가", "weather", "news",
                               "current", "today", "price", "2024", "2025"]
            should_search = any(keyword in message.lower() for keyword in search_keywords)
            if should_search:
                # Perform the web search
                search_results = await search_client.search(message)
                if search_results:
                    search_context = "웹 검색 결과:\n\n"
                    for i, result in enumerate(search_results[:5], 1):
                        search_context += f"{i}. {result['title']}\n{result['description']}\n\n"
                    # Add the search context in the target language if one is set
                    if target_language:
                        search_instruction = f"Use this search information but respond in {SUPPORTED_LANGUAGES.get(target_language, target_language)} only: "
                    else:
                        search_instruction = "다음 웹 검색 결과를 참고하여 답변하세요: "
                    messages.append({
                        "role": "system",
                        "content": search_instruction + "\n\n" + search_context
                    })
        # Add the user message with the language prefix
        messages.append({"role": "user", "content": user_prefix + message})
        # Call GPT-4o-mini
        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.7,
            max_tokens=2000
        )
        response_text = response.choices[0].message.content
        # Final check - retry if Korean appears although the target language is not Korean
        if target_language and target_language != "ko":
            if re.search(r'[가-힣]', response_text):
                print(f"[TEXT CHAT] WARNING: Korean detected in response for {target_language}")
                # Try again with a stronger prompt
                messages[-1] = {"role": "user", "content": f"ONLY {SUPPORTED_LANGUAGES.get(target_language, target_language)}, NO KOREAN: {message}"}
                retry_response = await client.chat.completions.create(
                    model="gpt-4o-mini",
                    messages=messages,
                    temperature=0.3,
                    max_tokens=2000
                )
                response_text = retry_response.choices[0].message.content
        print(f"[TEXT CHAT] Target language: {target_language}")
        print(f"[TEXT CHAT] Response preview: {response_text[:100]}...")
        return {
            "response": response_text,
            "language": SUPPORTED_LANGUAGES.get(target_language, "") if target_language else ""
        }
    except Exception as e:
        print(f"Error in text chat: {e}")
        return {"error": str(e)}
class OpenAIHandler(AsyncStreamHandler):
    def __init__(self, web_search_enabled: bool = False, target_language: str = "",
                 system_prompt: str = "", webrtc_id: Optional[str] = None,
                 interpretation_mode: bool = False, interpretation_language: str = "") -> None:
        super().__init__(
            expected_layout="mono",
            output_sample_rate=SAMPLE_RATE,
            output_frame_size=480,
            input_sample_rate=SAMPLE_RATE,
        )
        self.connection = None
        self.output_queue = asyncio.Queue()
        self.search_client = search_client
        self.function_call_in_progress = False
        self.current_function_args = ""
        self.current_call_id = None
        self.webrtc_id = webrtc_id
        self.web_search_enabled = web_search_enabled
        self.target_language = target_language
        self.system_prompt = system_prompt
        self.interpretation_mode = interpretation_mode
        self.interpretation_language = interpretation_language
        # For interpretation mode
        self.audio_buffer = []
        self.is_recording = False
        self.silence_frames = 0
        self.silence_threshold = 20  # Reduced for faster response (20 frames = ~0.4 seconds)
        self.min_audio_length = 10  # Minimum frames to consider as speech
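        # Frame math (assuming ~480-sample frames, matching output_frame_size):
        # 480 samples at 24 kHz is 20 ms, so 20 silent frames is roughly 0.4 s
        # of silence and 10 speech frames is roughly 0.2 s of speech.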
print(f"Handler created with web_search_enabled={web_search_enabled}, " | |
f"target_language={target_language}, webrtc_id={webrtc_id}, " | |
f"interpretation_mode={interpretation_mode}, interpretation_language={interpretation_language}") | |
def copy(self): | |
# Get the most recent settings | |
if connection_settings: | |
# Get the most recent webrtc_id | |
recent_ids = sorted(connection_settings.keys(), | |
key=lambda k: connection_settings[k].get('timestamp', 0), | |
reverse=True) | |
if recent_ids: | |
recent_id = recent_ids[0] | |
settings = connection_settings[recent_id] | |
return OpenAIHandler( | |
web_search_enabled=settings.get('web_search_enabled', False), | |
target_language=settings.get('target_language', ''), | |
system_prompt=settings.get('system_prompt', ''), | |
webrtc_id=recent_id, | |
interpretation_mode=settings.get('interpretation_mode', False), | |
interpretation_language=settings.get('interpretation_language', '') | |
) | |
print(f"Handler.copy() called - creating new handler with default settings") | |
return OpenAIHandler(web_search_enabled=False, interpretation_mode=False) | |
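
    # NOTE (assumed fastrtc behavior): the framework clones this handler per
    # connection via copy(), which receives no arguments; per-connection
    # options therefore travel through the module-level connection_settings
    # dict, keyed by webrtc_id and ordered by timestamp.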
    async def search_web(self, query: str) -> str:
        """Perform a web search and return formatted results"""
        if not self.search_client or not self.web_search_enabled:
            return "웹 검색이 비활성화되어 있습니다."
        print(f"Searching web for: {query}")
        results = await self.search_client.search(query)
        if not results:
            return f"'{query}'에 대한 검색 결과를 찾을 수 없습니다."
        # Format the search results
        formatted_results = []
        for i, result in enumerate(results, 1):
            formatted_results.append(
                f"{i}. {result['title']}\n"
                f"   URL: {result['url']}\n"
                f"   {result['description']}\n"
            )
        return f"웹 검색 결과 '{query}':\n\n" + "\n".join(formatted_results)

    async def process_text_message(self, message: str):
        """Process a text message from the user"""
        if self.connection:
            await self.connection.conversation.item.create(
                item={
                    "type": "message",
                    "role": "user",
                    "content": [{"type": "input_text", "text": message}]
                }
            )
            await self.connection.response.create()
    async def process_interpretation(self):
        """Process the audio buffer for interpretation"""
        if not self.audio_buffer or not self.interpretation_language:
            return
        try:
            print(f"[INTERPRETATION] Processing audio buffer with {len(self.audio_buffer)} frames")
            # Convert the audio buffer to WAV format
            audio_data = np.concatenate(self.audio_buffer)
            # Create a WAV file in memory
            wav_buffer = io.BytesIO()
            with wave.open(wav_buffer, 'wb') as wav_file:
                wav_file.setnchannels(1)  # Mono
                wav_file.setsampwidth(2)  # 16-bit
                wav_file.setframerate(SAMPLE_RATE)
                wav_file.writeframes(audio_data.tobytes())
            wav_buffer.seek(0)
            wav_buffer.name = "audio.wav"
            # 1. Transcribe with Whisper
            print("[INTERPRETATION] Transcribing with Whisper...")
            transcript = await self.client.audio.transcriptions.create(
                model="whisper-1",
                file=wav_buffer,
                language="ko"  # Assuming Korean input
            )
            user_text = transcript.text.strip()
            print(f"[INTERPRETATION] Transcribed: {user_text}")
            if not user_text:
                return
            # 2. Translate with GPT-4o-mini
            target_lang_name = SUPPORTED_LANGUAGES.get(self.interpretation_language, self.interpretation_language)
            # Create very explicit translation examples
            translation_examples = {
                "en": {
                    "안녕하세요": "Hello",
                    "감사합니다": "Thank you",
                    "오늘 날씨가 좋네요": "The weather is nice today"
                },
                "ja": {
                    "안녕하세요": "こんにちは",
                    "감사합니다": "ありがとうございます",
                    "오늘 날씨가 좋네요": "今日はいい天気ですね"
                },
                "zh": {
                    "안녕하세요": "你好",
                    "감사합니다": "谢谢",
                    "오늘 날씨가 좋네요": "今天天气很好"
                },
                "es": {
                    "안녕하세요": "Hola",
                    "감사합니다": "Gracias",
                    "오늘 날씨가 좋네요": "El clima está agradable hoy"
                }
            }
            examples = translation_examples.get(self.interpretation_language, translation_examples["en"])
            examples_text = "\n".join([f'"{k}" → "{v}"' for k, v in examples.items()])
            # Ultra-specific prompt
            system_prompt = f"""You are a Korean to {target_lang_name} translator.
STRICT RULES:
1. Output ONLY the {target_lang_name} translation
2. Do NOT output Korean
3. Do NOT add explanations
4. Do NOT answer questions
5. Just translate
Examples:
{examples_text}
Now translate the Korean text to {target_lang_name}. Output ONLY the translation in {target_lang_name}:"""
            print(f"[INTERPRETATION] Translating to {target_lang_name}...")
            print(f"[INTERPRETATION] System prompt: {system_prompt}")
            translation_response = await self.client.chat.completions.create(
                model="gpt-4o-mini",
                messages=[
                    {
                        "role": "system",
                        "content": system_prompt
                    },
                    {
                        "role": "user",
                        "content": f"Translate this Korean to {target_lang_name}: {user_text}"
                    }
                ],
                temperature=0.1,  # Very low temperature
                max_tokens=200
            )
            translated_text = translation_response.choices[0].message.content.strip()
            # Remove any Korean characters if they accidentally appear
            if re.search(r'[가-힣]', translated_text):
                print(f"[INTERPRETATION] WARNING: Korean characters detected in translation: {translated_text}")
                # Try to keep only the non-Korean parts
                translated_text = re.sub(r'[가-힣\s]+', ' ', translated_text).strip()
            print(f"[INTERPRETATION] Translated: {translated_text}")
            # 3. Generate speech with TTS
            print(f"[INTERPRETATION] Generating speech for text: {translated_text}")
            # Select an appropriate voice that speaks the target language
            voice_map = {
                "en": "alloy",    # Alloy is a native English speaker
                "es": "nova",     # Nova handles Spanish well
                "fr": "shimmer",  # Shimmer handles French well
                "de": "echo",     # Echo handles German well
                "ja": "nova",     # Nova can handle Japanese
                "zh": "nova",     # Nova can handle Chinese
                "ko": "nova",     # Nova can handle Korean
            }
            selected_voice = voice_map.get(self.interpretation_language, "nova")
            print(f"[INTERPRETATION] Using voice: {selected_voice} for language: {self.interpretation_language}")
            # For some languages we might need pronunciation hints
            if self.interpretation_language == "en" and re.search(r'[가-힣]', translated_text):
                print("[INTERPRETATION] ERROR: Korean characters in English translation!")
                translated_text = "Translation error occurred"
            try:
                tts_response = await self.client.audio.speech.create(
                    model="tts-1",
                    voice=selected_voice,
                    input=translated_text,
                    response_format="pcm",  # PCM format for direct playback
                    speed=1.0
                )
            except Exception as tts_error:
                print(f"[INTERPRETATION] TTS Error: {tts_error}")
                # If TTS fails, try a different voice
                tts_response = await self.client.audio.speech.create(
                    model="tts-1",
                    voice="alloy",  # Fall back to alloy
                    input=translated_text,
                    response_format="pcm",
                    speed=1.0
                )
            # Collect the response bytes
            audio_bytes = b""
            async for chunk in tts_response.iter_bytes(1024):
                audio_bytes += chunk
            # Convert PCM to a numpy array (TTS outputs at 24 kHz)
            audio_array = np.frombuffer(audio_bytes, dtype=np.int16)
            # Send the audio in chunks
            if len(audio_array) > 0:
                # Split the audio into chunks and send
                chunk_size = 480  # Match our frame size
                for i in range(0, len(audio_array), chunk_size):
                    chunk = audio_array[i:i + chunk_size]
                    if len(chunk) < chunk_size:
                        # Pad the last chunk if necessary
                        chunk = np.pad(chunk, (0, chunk_size - len(chunk)), 'constant')
                    await self.output_queue.put((SAMPLE_RATE, chunk.reshape(1, -1)))
            # Send a transcript event
            output_data = {
                "event": type('Event', (), {
                    'transcript': f"{user_text} → {translated_text}"
                })(),
                "language": target_lang_name,
                "mode": "interpretation"
            }
            await self.output_queue.put(AdditionalOutputs(output_data))
        except Exception as e:
            print(f"[INTERPRETATION] Error: {e}")
            import traceback
            traceback.print_exc()
            # Send an error message to the client
            error_data = {
                "event": type('Event', (), {
                    'transcript': f"통역 오류: {str(e)}"
                })(),
                "language": "",
                "mode": "error"
            }
            await self.output_queue.put(AdditionalOutputs(error_data))
        finally:
            # Clear the audio buffer
            self.audio_buffer = []
            self.is_recording = False
            self.silence_frames = 0
    def get_translation_instructions(self):
        """Get instructions for translation based on the target language"""
        if not self.target_language or self.interpretation_mode:
            return ""
        language_name = SUPPORTED_LANGUAGES.get(self.target_language, self.target_language)
        return (
            f"\n\nIMPORTANT: You must respond in {language_name} ({self.target_language}). "
            f"Translate all your responses to {language_name}. "
            f"This includes both spoken and written responses."
        )
    async def start_up(self):
        """Connect to the Realtime API or set up interpretation mode"""
        # First check whether we have more recent settings in storage
        if connection_settings:
            recent_ids = sorted(connection_settings.keys(),
                                key=lambda k: connection_settings[k].get('timestamp', 0),
                                reverse=True)
            if recent_ids:
                recent_id = recent_ids[0]
                settings = connection_settings[recent_id]
                self.web_search_enabled = settings.get('web_search_enabled', False)
                self.target_language = settings.get('target_language', '')
                self.system_prompt = settings.get('system_prompt', '')
                self.interpretation_mode = settings.get('interpretation_mode', False)
                self.interpretation_language = settings.get('interpretation_language', '')
                self.webrtc_id = recent_id
                print(f"start_up: Updated settings from storage - webrtc_id={self.webrtc_id}, "
                      f"web_search_enabled={self.web_search_enabled}, target_language={self.target_language}, "
                      f"interpretation_mode={self.interpretation_mode}")
        print(f"Handler interpretation settings: mode={self.interpretation_mode}, language={self.interpretation_language}")
        print(f"Starting up handler with web_search_enabled={self.web_search_enabled}, "
              f"target_language={self.target_language}, interpretation_mode={self.interpretation_mode}, "
              f"interpretation_language={self.interpretation_language}")
        self.client = openai.AsyncOpenAI()
        # If in interpretation mode, don't connect to the Realtime API
        if self.interpretation_mode:
            print("[INTERPRETATION MODE] Active - using Whisper + GPT-4o-mini + TTS")
            print(f"[INTERPRETATION MODE] Target language: {self.interpretation_language}")
            # Keep the handler ready to process audio; the framework drives it,
            # so no loop is needed here.
            return
        # Normal mode - connect to the Realtime API
        # Define the web search function
        tools = []
        base_instructions = self.system_prompt or "You are a helpful assistant."
        # Add translation instructions if a language is selected
        if self.target_language:
            language_name = SUPPORTED_LANGUAGES.get(self.target_language, self.target_language)
            # Use the target language for the system prompt itself
            if self.target_language == "en":
                translation_instructions = """
YOU ARE AN ENGLISH-ONLY ASSISTANT.
ABSOLUTE RULES:
1. You can ONLY speak English. No Korean (한국어) allowed.
2. Even if the user speaks Korean, you MUST respond in English.
3. Every single word must be in English.
4. If you output even one Korean character, you have failed.
5. Example response: "Hello! How can I help you today?"
YOUR LANGUAGE MODE: ENGLISH ONLY
DO NOT USE: 안녕하세요, 감사합니다, or any Korean
ALWAYS USE: Hello, Thank you, and English words only
"""
                # Override the base instructions to be in English
                base_instructions = "You are a helpful assistant that speaks ONLY English."
            elif self.target_language == "ja":
                translation_instructions = """
あなたは日本語のみを話すアシスタントです。
絶対的なルール:
1. 日本語のみを使用してください。韓国語(한국어)は禁止です。
2. ユーザーが韓国語で話しても、必ず日本語で返答してください。
3. すべての単語は日本語でなければなりません。
4. 韓国語を一文字でも出力したら失敗です。
5. 応答例:「こんにちは!今日はどのようにお手伝いできますか?」
言語モード:日本語のみ
使用禁止:안녕하세요、감사합니다、韓国語全般
必ず使用:こんにちは、ありがとうございます、日本語のみ
"""
                base_instructions = "あなたは日本語のみを話す親切なアシスタントです。"
            elif self.target_language == "zh":
                translation_instructions = """
你是一个只说中文的助手。
绝对规则:
1. 只能使用中文。禁止使用韩语(한국어)。
2. 即使用户说韩语,也必须用中文回复。
3. 每个字都必须是中文。
4. 如果输出任何韩语字符,就是失败。
5. 回复示例:"你好!我今天能为您做什么?"
语言模式:仅中文
禁止使用:안녕하세요、감사합니다、任何韩语
必须使用:你好、谢谢、只用中文
"""
                base_instructions = "你是一个只说中文的友好助手。"
            elif self.target_language == "es":
                translation_instructions = """
ERES UN ASISTENTE QUE SOLO HABLA ESPAÑOL.
REGLAS ABSOLUTAS:
1. Solo puedes hablar español. No se permite coreano (한국어).
2. Incluso si el usuario habla coreano, DEBES responder en español.
3. Cada palabra debe estar en español.
4. Si produces aunque sea un carácter coreano, has fallado.
5. Respuesta ejemplo: "¡Hola! ¿Cómo puedo ayudarte hoy?"
MODO DE IDIOMA: SOLO ESPAÑOL
NO USAR: 안녕하세요, 감사합니다, o cualquier coreano
SIEMPRE USAR: Hola, Gracias, y solo palabras en español
"""
                base_instructions = "Eres un asistente útil que habla SOLO español."
            else:
                translation_instructions = f"""
YOU MUST ONLY SPEAK {language_name.upper()}.
RULES:
1. Output only in {language_name}
2. Never use Korean
3. Always respond in {language_name}
"""
                base_instructions = f"You are a helpful assistant that speaks ONLY {language_name}."
        else:
            translation_instructions = ""
        if self.web_search_enabled and self.search_client:
            tools = [{
                "type": "function",
                "function": {
                    "name": "web_search",
                    "description": "Search the web for current information. Use this for weather, news, prices, current events, or any time-sensitive topics.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The search query"
                            }
                        },
                        "required": ["query"]
                    }
                }
            }]
            print("Web search function added to tools")
            search_instructions = (
                "\n\nYou have web search capabilities. "
                "IMPORTANT: You MUST use the web_search function for ANY of these topics:\n"
                "- Weather (날씨, 기온, 비, 눈)\n"
                "- News (뉴스, 소식)\n"
                "- Current events (현재, 최근, 오늘, 지금)\n"
                "- Prices (가격, 환율, 주가)\n"
                "- Sports scores or results\n"
                "- Any question about 2024 or 2025\n"
                "- Any time-sensitive information\n\n"
                "When in doubt, USE web_search. It's better to search and provide accurate information "
                "than to guess or use outdated information."
            )
            # Combine all instructions
            if translation_instructions:
                # Translation instructions already include base_instructions
                instructions = translation_instructions + search_instructions
            else:
                instructions = base_instructions + search_instructions
        else:
            # No web search
            if translation_instructions:
                instructions = translation_instructions
            else:
                instructions = base_instructions
        print(f"[NORMAL MODE] Base instructions: {base_instructions[:100]}...")
        print(f"[NORMAL MODE] Translation instructions: {translation_instructions[:200] if translation_instructions else 'None'}...")
        print(f"[NORMAL MODE] Combined instructions length: {len(instructions)}")
        print(f"[NORMAL MODE] Target language: {self.target_language}")
        async with self.client.beta.realtime.connect(
            model="gpt-4o-mini-realtime-preview-2024-12-17"
        ) as conn:
            # Configure the session: instructions, tools, and audio settings
            session_update = {
                "turn_detection": {"type": "server_vad"},
                "instructions": instructions,
                "tools": tools,
                "tool_choice": "auto" if tools else "none",
                "temperature": 0.7,
                "max_response_output_tokens": 4096,
                "modalities": ["text", "audio"],
                "voice": "alloy"  # Default voice
            }

            # Pick an appropriate voice for the language
            if self.target_language:
                # Force the language through several mechanisms:
                # 1. Use a voice that is known to work well with the language
                voice_map = {
                    "en": "nova",     # Nova has clearer English
                    "es": "nova",     # Nova works for Spanish
                    "fr": "shimmer",  # Shimmer for French
                    "de": "echo",     # Echo for German
                    "ja": "alloy",    # Alloy can do Japanese
                    "zh": "alloy",    # Alloy can do Chinese
                    "ko": "nova",     # Nova for Korean
                }
                session_update["voice"] = voice_map.get(self.target_language, "nova")
                # 2. Keep both text and audio modalities
                session_update["modalities"] = ["text", "audio"]
                # 3. Set the output audio format
                session_update["output_audio_format"] = "pcm16"
                # 4. Language hint; this field is not part of the documented
                #    session schema, so the server may ignore or reject it
                if self.target_language in ["en", "es", "fr", "de", "ja", "zh"]:
                    session_update["language"] = self.target_language
                print(f"[TRANSLATION MODE] Session update: {json.dumps(session_update, indent=2)}")

            await conn.session.update(session=session_update)
            self.connection = conn
            print(f"Connected with tools: {len(tools)} functions, voice: {session_update.get('voice', 'default')}")
            async for event in self.connection:
                # Debug logging for function-call events
                if event.type.startswith("response.function_call"):
                    print(f"Function event: {event.type}")

                if event.type == "response.audio_transcript.done":
                    print(f"[RESPONSE] Transcript: {event.transcript[:100]}...")
                    print(f"[RESPONSE] Expected language: {self.target_language}")

                    output_data = {
                        "event": event,
                        "language": SUPPORTED_LANGUAGES.get(self.target_language, "") if self.target_language else ""
                    }
                    await self.output_queue.put(AdditionalOutputs(output_data))

                elif event.type == "response.audio.delta":
                    # fastrtc consumes (sample_rate, ndarray) tuples; the
                    # (1, -1) reshape marks the PCM16 buffer as one mono channel
                    await self.output_queue.put(
                        (
                            self.output_sample_rate,
                            np.frombuffer(
                                base64.b64decode(event.delta), dtype=np.int16
                            ).reshape(1, -1),
                        ),
                    )
                # Handle function calls (only in non-interpretation mode)
                elif event.type == "response.function_call_arguments.start" and not self.interpretation_mode:
                    print("Function call started")
                    self.function_call_in_progress = True
                    self.current_function_args = ""
                    self.current_call_id = getattr(event, 'call_id', None)

                elif event.type == "response.function_call_arguments.delta" and not self.interpretation_mode:
                    # The documented Realtime API only emits .delta/.done for
                    # function-call arguments, so start accumulating on the
                    # first delta if no ".start" event was seen
                    if not self.function_call_in_progress:
                        self.function_call_in_progress = True
                        self.current_function_args = ""
                        self.current_call_id = getattr(event, 'call_id', None)
                    self.current_function_args += event.delta

                elif event.type == "response.function_call_arguments.done" and not self.interpretation_mode:
                    if self.function_call_in_progress:
                        print(f"Function call done, args: {self.current_function_args}")
                        try:
                            args = json.loads(self.current_function_args)
                            query = args.get("query", "")

                            # Emit a search event to the client
                            await self.output_queue.put(AdditionalOutputs({
                                "type": "search",
                                "query": query
                            }))

                            # Perform the search
                            search_results = await self.search_web(query)
                            print(f"Search results length: {len(search_results)}")

                            # Send the result back to the model and ask it to
                            # continue the response
                            if self.connection and self.current_call_id:
                                await self.connection.conversation.item.create(
                                    item={
                                        "type": "function_call_output",
                                        "call_id": self.current_call_id,
                                        "output": search_results
                                    }
                                )
                                await self.connection.response.create()
                        except Exception as e:
                            print(f"Function call error: {e}")
                        finally:
                            self.function_call_in_progress = False
                            self.current_function_args = ""
                            self.current_call_id = None
    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        if self.interpretation_mode:
            # In interpretation mode, buffer audio and process it with Whisper
            _, array = frame
            array = array.squeeze()

            # Simple energy-based voice activity detection
            audio_level = np.abs(array).mean()

            if audio_level > 200:  # Lower threshold for better detection
                if not self.is_recording:
                    print(f"[INTERPRETATION] Started recording, level: {audio_level:.1f}")
                    self.is_recording = True
                self.silence_frames = 0
                self.audio_buffer.append(array)
            elif self.is_recording:
                self.silence_frames += 1
                self.audio_buffer.append(array)

                # If we've had enough silence, process the buffered audio
                if self.silence_frames > self.silence_threshold and len(self.audio_buffer) > self.min_audio_length:
                    print(f"[INTERPRETATION] Silence detected after {len(self.audio_buffer)} frames")
                    # Process in the background to avoid blocking the audio callback
                    asyncio.create_task(self.process_interpretation())
        else:
            # Normal mode - stream the audio straight to the Realtime API
            if not self.connection:
                return
            try:
                _, array = frame
                array = array.squeeze()
                audio_message = base64.b64encode(array.tobytes()).decode("utf-8")
                await self.connection.input_audio_buffer.append(audio=audio_message)
            except Exception as e:
                # The connection might already be closed; ignore the error
                print(f"Error in receive: {e}")
    async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
        if self.interpretation_mode:
            # In interpretation mode, poll with a timeout so we never block forever
            try:
                return await asyncio.wait_for(wait_for_item(self.output_queue), timeout=0.1)
            except asyncio.TimeoutError:
                return None
        else:
            # Normal mode
            item = await wait_for_item(self.output_queue)

            # Text messages queued by the text-message endpoint are processed
            # here instead of being forwarded to the client
            if isinstance(item, dict) and item.get('type') == 'text_message':
                await self.process_text_message(item['content'])
                return None
            return item

    async def shutdown(self) -> None:
        if self.interpretation_mode:
            # Clean up interpretation state
            self.audio_buffer = []
            self.is_recording = False
            print("[INTERPRETATION MODE] Shutdown complete")
        else:
            # Normal mode - close the Realtime API connection
            if self.connection:
                await self.connection.close()
                self.connection = None
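

# NOTE: fastrtc creates a per-connection copy of the handler (via its copy()
# method), so the instance below acts as a template; the per-connection
# settings captured by custom_offer live in connection_settings, keyed by
# webrtc_id.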
# Create the initial handler instance
handler = OpenAIHandler(web_search_enabled=False, interpretation_mode=False)

# Create components
chatbot = gr.Chatbot(type="messages")

# Create the stream with the handler instance
stream = Stream(
    handler,  # Pass an instance, not a factory
    mode="send-receive",
    modality="audio",
    additional_inputs=[chatbot],
    additional_outputs=[chatbot],
    additional_outputs_handler=update_chatbot,
    rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
    concurrency_limit=5 if get_space() else None,
    time_limit=300 if get_space() else None,
)
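
# On Hugging Face Spaces, browsers typically need a TURN relay to establish
# the WebRTC connection, which get_twilio_turn_credentials() provides; the
# concurrency and time limits keep a shared Space responsive.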

app = FastAPI()

# Mount the stream (this registers the default /webrtc/offer route)
stream.mount(app)


async def custom_offer(request: Request):
    """Intercept the WebRTC offer to capture per-connection settings"""
    body = await request.json()

    webrtc_id = body.get("webrtc_id")
    web_search_enabled = body.get("web_search_enabled", False)
    target_language = body.get("target_language", "")
    system_prompt = body.get("system_prompt", "")
    interpretation_mode = body.get("interpretation_mode", False)
    interpretation_language = body.get("interpretation_language", "")

    print(f"Custom offer - webrtc_id: {webrtc_id}, web_search_enabled: {web_search_enabled}, "
          f"target_language: {target_language}, interpretation_mode: {interpretation_mode}, "
          f"interpretation_language: {interpretation_language}")

    # Store the settings with a timestamp
    if webrtc_id:
        connection_settings[webrtc_id] = {
            'web_search_enabled': web_search_enabled,
            'target_language': target_language,
            'system_prompt': system_prompt,
            'interpretation_mode': interpretation_mode,
            'interpretation_language': interpretation_language,
            'timestamp': asyncio.get_event_loop().time()
        }

    # Temporarily remove our custom route so the forwarded call below reaches
    # the stream's own offer handler instead of recursing into this one
    custom_route = None
    for i, route in enumerate(app.routes):
        if hasattr(route, 'path') and route.path == "/webrtc/offer" and route.endpoint == custom_offer:
            custom_route = app.routes.pop(i)
            break

    # Forward to the stream's offer handler
    response = await stream.offer(body)

    # Re-add our custom route at the front
    if custom_route:
        app.routes.insert(0, custom_route)

    return response
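
# Registration (assumed wiring): the custom route must sit ahead of the one
# added by stream.mount(app) so it is matched first; the pop/re-insert logic
# above depends on this ordering. A minimal sketch:
app.add_api_route("/webrtc/offer", custom_offer, methods=["POST"])
app.routes.insert(0, app.routes.pop(-1))  # move our route ahead of the mounted one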


async def chat_text(request: Request):
    """Handle text chat messages using GPT-4o-mini"""
    try:
        body = await request.json()
        message = body.get("message", "")
        web_search_enabled = body.get("web_search_enabled", False)
        target_language = body.get("target_language", "")
        system_prompt = body.get("system_prompt", "")

        if not message:
            return {"error": "메시지가 비어있습니다."}  # "The message is empty."

        # Process the text chat
        result = await process_text_chat(message, web_search_enabled, target_language, system_prompt)
        return result
    except Exception as e:
        print(f"Error in chat_text endpoint: {e}")
        return {"error": "채팅 처리 중 오류가 발생했습니다."}  # "An error occurred while processing the chat."


async def receive_text_message(webrtc_id: str, request: Request):
    """Receive a text message from the client"""
    body = await request.json()
    message = body.get("content", "")

    # Find the handler for this connection
    if webrtc_id in stream.handlers:
        handler = stream.handlers[webrtc_id]
        # Queue the message; it is picked up and processed in emit()
        await handler.output_queue.put({
            'type': 'text_message',
            'content': message
        })
    return {"status": "ok"}


async def outputs(webrtc_id: str):
    """Stream outputs (including search events) as Server-Sent Events"""
    async def output_stream():
        async for output in stream.output_stream(webrtc_id):
            if hasattr(output, 'args') and output.args:
                # Search event
                if isinstance(output.args[0], dict) and output.args[0].get('type') == 'search':
                    yield f"event: search\ndata: {json.dumps(output.args[0])}\n\n"
                # Regular transcript event with language info
                elif isinstance(output.args[0], dict) and 'event' in output.args[0]:
                    event = output.args[0]['event']
                    if hasattr(event, 'transcript'):
                        data = {
                            "role": "assistant",
                            "content": event.transcript,
                            "language": output.args[0].get('language', ''),
                            "mode": output.args[0].get('mode', 'normal')
                        }
                        yield f"event: output\ndata: {json.dumps(data)}\n\n"

    return StreamingResponse(output_stream(), media_type="text/event-stream")


async def index():
    """Serve the HTML page"""
    rtc_config = get_twilio_turn_credentials() if get_space() else None
    html_content = HTML_CONTENT.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
    return HTMLResponse(content=html_content)
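
# Route registration (assumed wiring): the paths below must match the
# fetch/EventSource URLs used by the JavaScript embedded in HTML_CONTENT.
app.post("/chat/text")(chat_text)
app.post("/text_message/{webrtc_id}")(receive_text_message)
app.get("/outputs")(outputs)
app.get("/")(index)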


if __name__ == "__main__":
    import uvicorn

    mode = os.getenv("MODE")
    if mode == "UI":
        stream.ui.launch(server_port=7860)
    elif mode == "PHONE":
        stream.fastphone(host="0.0.0.0", port=7860)
    else:
        uvicorn.run(app, host="0.0.0.0", port=7860)