import asyncio
import base64
import json
from pathlib import Path
import os
import numpy as np
import openai
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, StreamingResponse
from fastrtc import (
AdditionalOutputs,
AsyncStreamHandler,
Stream,
get_twilio_turn_credentials,
wait_for_item,
)
from gradio.utils import get_space
from openai.types.beta.realtime import ResponseAudioTranscriptDoneEvent
import httpx
from typing import Optional, List, Dict
import gradio as gr
import io
from scipy import signal
import wave
import aiosqlite
from langdetect import detect, LangDetectException
from datetime import datetime
import uuid
load_dotenv()
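# Environment variables expected here (typically supplied via .env and load_dotenv()):
#   OPENAI_API_KEY - read implicitly by openai.AsyncOpenAI() and the Realtime client
#   BSEARCH_API    - Brave Search API key; web search stays disabled when unset
#   MODE           - optional; "UI" or "PHONE" select alternative entry points in __main__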
SAMPLE_RATE = 24000
# Use Persistent Storage path for Hugging Face Space
# In HF Spaces, persistent storage is at /data
if os.path.exists("/data"):
PERSISTENT_DIR = "/data"
else:
PERSISTENT_DIR = "./data"
os.makedirs(PERSISTENT_DIR, exist_ok=True)
DB_PATH = os.path.join(PERSISTENT_DIR, "personal_assistant.db")
print(f"Using persistent directory: {PERSISTENT_DIR}")
print(f"Database path: {DB_PATH}")
# HTML content embedded as a string
HTML_CONTENT = """
Personal AI Assistant
"""
class BraveSearchClient:
"""Brave Search API client"""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.search.brave.com/res/v1/web/search"
async def search(self, query: str, count: int = 10) -> List[Dict]:
"""Perform a web search using Brave Search API"""
if not self.api_key:
return []
headers = {
"Accept": "application/json",
"X-Subscription-Token": self.api_key
}
params = {
"q": query,
"count": count,
"lang": "ko"
}
async with httpx.AsyncClient() as client:
try:
response = await client.get(self.base_url, headers=headers, params=params)
response.raise_for_status()
data = response.json()
results = []
if "web" in data and "results" in data["web"]:
for result in data["web"]["results"][:count]:
results.append({
"title": result.get("title", ""),
"url": result.get("url", ""),
"description": result.get("description", "")
})
return results
except Exception as e:
print(f"Brave Search error: {e}")
return []
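# Usage sketch (mirrors the calls made in process_text_chat() and OpenAIHandler.search_web()):
#   results = await search_client.search(query, count=5)
#   # -> [{"title": ..., "url": ..., "description": ...}, ...], or [] on error / missing key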
# Database helper class
class PersonalAssistantDB:
"""Database manager for personal assistant"""
@staticmethod
async def init():
"""Initialize database tables"""
async with aiosqlite.connect(DB_PATH) as db:
# Conversations table
await db.execute("""
CREATE TABLE IF NOT EXISTS conversations (
id TEXT PRIMARY KEY,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
summary TEXT
)
""")
# Messages table
await db.execute("""
CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
detected_language TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (session_id) REFERENCES conversations(id)
)
""")
# User memories table - stores personal information
await db.execute("""
CREATE TABLE IF NOT EXISTS user_memories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
category TEXT NOT NULL,
content TEXT NOT NULL,
confidence REAL DEFAULT 1.0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
source_session_id TEXT,
FOREIGN KEY (source_session_id) REFERENCES conversations(id)
)
""")
# Create indexes for better performance
await db.execute("CREATE INDEX IF NOT EXISTS idx_memories_category ON user_memories(category)")
await db.execute("CREATE INDEX IF NOT EXISTS idx_messages_session ON messages(session_id)")
await db.commit()
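    # Schema overview: one `conversations` row per session, `messages` rows per utterance
    # (with an optional detected_language), and `user_memories` rows holding extracted
    # facts grouped by category with a confidence score and the source session id.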
@staticmethod
async def create_session(session_id: str):
"""Create a new conversation session"""
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"INSERT INTO conversations (id) VALUES (?)",
(session_id,)
)
await db.commit()
@staticmethod
async def save_message(session_id: str, role: str, content: str):
"""Save a message to the database"""
# Check for None or empty content
if not content:
print(f"[SAVE_MESSAGE] Empty content for {role} message, skipping")
return
# Detect language
detected_language = None
try:
if content and len(content) > 10:
detected_language = detect(content)
        except Exception as e:  # includes LangDetectException
print(f"Language detection error: {e}")
async with aiosqlite.connect(DB_PATH) as db:
await db.execute(
"""INSERT INTO messages (session_id, role, content, detected_language)
VALUES (?, ?, ?, ?)""",
(session_id, role, content, detected_language)
)
# Update conversation's updated_at timestamp
await db.execute(
"UPDATE conversations SET updated_at = CURRENT_TIMESTAMP WHERE id = ?",
(session_id,)
)
# Update conversation summary (use first user message as summary)
if role == "user":
cursor = await db.execute(
"SELECT summary FROM conversations WHERE id = ?",
(session_id,)
)
row = await cursor.fetchone()
if row and not row[0]:
summary = content[:100] + "..." if len(content) > 100 else content
await db.execute(
"UPDATE conversations SET summary = ? WHERE id = ?",
(summary, session_id)
)
await db.commit()
@staticmethod
async def get_recent_conversations(limit: int = 10):
"""Get recent conversations"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"""SELECT id, created_at, summary
FROM conversations
ORDER BY updated_at DESC
LIMIT ?""",
(limit,)
)
rows = await cursor.fetchall()
return [
{
"id": row[0],
"created_at": row[1],
"summary": row[2] or "์ ๋ํ"
}
for row in rows
]
@staticmethod
async def get_conversation_messages(session_id: str):
"""Get all messages for a conversation"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"""SELECT role, content, detected_language, timestamp
FROM messages
WHERE session_id = ?
ORDER BY timestamp ASC""",
(session_id,)
)
rows = await cursor.fetchall()
return [
{
"role": row[0],
"content": row[1],
"detected_language": row[2],
"timestamp": row[3]
}
for row in rows
]
@staticmethod
async def save_memory(category: str, content: str, session_id: str = None, confidence: float = 1.0):
"""Save or update a user memory"""
async with aiosqlite.connect(DB_PATH) as db:
# Check if similar memory exists
cursor = await db.execute(
"""SELECT id, content FROM user_memories
WHERE category = ? AND content LIKE ?
LIMIT 1""",
(category, f"%{content[:20]}%")
)
existing = await cursor.fetchone()
if existing:
# Update existing memory
await db.execute(
"""UPDATE user_memories
SET content = ?, confidence = ?, updated_at = CURRENT_TIMESTAMP,
source_session_id = ?
WHERE id = ?""",
(content, confidence, session_id, existing[0])
)
else:
# Insert new memory
await db.execute(
"""INSERT INTO user_memories (category, content, confidence, source_session_id)
VALUES (?, ?, ?, ?)""",
(category, content, confidence, session_id)
)
await db.commit()
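    # Note: the duplicate check above is a heuristic LIKE match on the first 20 characters
    # of the new content within the same category; on a hit the existing row is updated
    # instead of inserting a near-duplicate memory.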
@staticmethod
async def get_all_memories():
"""Get all user memories"""
async with aiosqlite.connect(DB_PATH) as db:
cursor = await db.execute(
"""SELECT category, content, confidence, updated_at
FROM user_memories
ORDER BY category, updated_at DESC"""
)
rows = await cursor.fetchall()
return [
{
"category": row[0],
"content": row[1],
"confidence": row[2],
"updated_at": row[3]
}
for row in rows
]
@staticmethod
async def extract_and_save_memories(session_id: str):
"""Extract memories from conversation and save them"""
# Get all messages from the session
messages = await PersonalAssistantDB.get_conversation_messages(session_id)
if not messages:
return
# Prepare conversation text for analysis
conversation_text = "\n".join([
f"{msg['role']}: {msg['content']}"
for msg in messages if msg.get('content')
])
# Use GPT to extract memories
client = openai.AsyncOpenAI()
try:
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=[
{
"role": "system",
"content": """You are a memory extraction system. Extract personal information from conversations.
Categories to extract:
- personal_info: ์ด๋ฆ, ๋์ด, ์ฑ๋ณ, ์ง์
, ๊ฑฐ์ฃผ์ง
- preferences: ์ข์ํ๋ ๊ฒ, ์ซ์ดํ๋ ๊ฒ, ์ทจํฅ
- important_dates: ์์ผ, ๊ธฐ๋
์ผ, ์ค์ํ ๋ ์ง
- relationships: ๊ฐ์กฑ, ์น๊ตฌ, ๋๋ฃ ๊ด๊ณ
- hobbies: ์ทจ๋ฏธ, ๊ด์ฌ์ฌ
- health: ๊ฑด๊ฐ ์ํ, ์๋ ๋ฅด๊ธฐ, ์๋ฃ ์ ๋ณด
- goals: ๋ชฉํ, ๊ณํ, ๊ฟ
- routines: ์ผ์, ์ต๊ด, ๋ฃจํด
- work: ์ง์ฅ, ์
๋ฌด, ํ๋ก์ ํธ
- education: ํ๋ ฅ, ์ ๊ณต, ํ์ต
Return as JSON array with format:
[
{
"category": "category_name",
"content": "extracted information in Korean",
"confidence": 0.0-1.0
}
]
Only extract clear, factual information. Do not make assumptions."""
},
{
"role": "user",
"content": f"Extract memories from this conversation:\n\n{conversation_text}"
}
],
temperature=0.3,
max_tokens=2000
)
# Parse and save memories
memories_text = response.choices[0].message.content
# Extract JSON from response
import re
json_match = re.search(r'\[.*\]', memories_text, re.DOTALL)
if json_match:
memories = json.loads(json_match.group())
for memory in memories:
if memory.get('content') and len(memory['content']) > 5:
await PersonalAssistantDB.save_memory(
category=memory.get('category', 'general'),
content=memory['content'],
session_id=session_id,
confidence=memory.get('confidence', 0.8)
)
print(f"Extracted and saved {len(memories)} memories from session {session_id}")
except Exception as e:
print(f"Error extracting memories: {e}")
# Initialize search client globally
brave_api_key = os.getenv("BSEARCH_API")
search_client = BraveSearchClient(brave_api_key) if brave_api_key else None
print(f"Search client initialized: {search_client is not None}, API key present: {bool(brave_api_key)}")
# Store connection settings
connection_settings = {}
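# connection_settings maps webrtc_id -> {"web_search_enabled", "session_id", "user_name",
# "memories", "timestamp"}; it is populated by the /webrtc/offer handler below and read by
# OpenAIHandler.copy()/start_up() to configure the handler instances created by fastrtc.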
# Initialize OpenAI client for text chat
client = openai.AsyncOpenAI()
def update_chatbot(chatbot: list[dict], response: ResponseAudioTranscriptDoneEvent):
chatbot.append({"role": "assistant", "content": response.transcript})
return chatbot
def format_memories_for_prompt(memories: Dict[str, List[str]]) -> str:
"""Format memories for inclusion in system prompt"""
if not memories:
return ""
memory_text = "\n\n=== ๊ธฐ์ต๋ ์ ๋ณด ===\n"
memory_count = 0
for category, items in memories.items():
if items and isinstance(items, list):
            valid_items = [item for item in items if item]  # drop None or empty strings
if valid_items:
memory_text += f"\n[{category}]\n"
for item in valid_items:
memory_text += f"- {item}\n"
memory_count += 1
print(f"[FORMAT_MEMORIES] Formatted {memory_count} memory items")
return memory_text if memory_count > 0 else ""
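# `memories` is a mapping of category -> list of content strings, the same shape that
# start_up() and custom_offer() build from PersonalAssistantDB.get_all_memories(), e.g.
# {"personal_info": ["..."], "preferences": ["..."]}.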
async def process_text_chat(message: str, web_search_enabled: bool, session_id: str,
user_name: str = "", memories: Dict = None) -> Dict[str, str]:
"""Process text chat using GPT-4o-mini model"""
try:
# Check for empty or None message
if not message:
return {"error": "๋ฉ์์ง๊ฐ ๋น์ด์์ต๋๋ค."}
# Check for stop words
stop_words = ["์ค๋จ", "๊ทธ๋ง", "์คํฑ", "stop", "๋ฅ์ณ", "๋ฉ์ถฐ", "์ค์ง"]
if any(word in message.lower() for word in stop_words):
return {
"response": "๋ํ๋ฅผ ์ค๋จํฉ๋๋ค.",
"detected_language": "ko"
}
# Build system prompt with memories
base_prompt = f"""You are a personal AI assistant for {user_name if user_name else 'the user'}.
You remember all previous conversations and personal information about the user.
Be friendly, helpful, and personalized in your responses.
Always use the information you remember to make conversations more personal and relevant.
IMPORTANT: Give only ONE response. Do not repeat or give multiple answers."""
# Add memories to prompt
if memories:
memory_text = format_memories_for_prompt(memories)
base_prompt += memory_text
messages = [{"role": "system", "content": base_prompt}]
# Handle web search if enabled
if web_search_enabled and search_client and message:
search_keywords = ["๋ ์จ", "๊ธฐ์จ", "๋น", "๋", "๋ด์ค", "์์", "ํ์ฌ", "์ต๊ทผ",
"์ค๋", "์ง๊ธ", "๊ฐ๊ฒฉ", "ํ์จ", "์ฃผ๊ฐ", "weather", "news",
"current", "today", "price", "2024", "2025"]
should_search = any(keyword in message.lower() for keyword in search_keywords)
if should_search:
search_results = await search_client.search(message)
if search_results:
search_context = "์น ๊ฒ์ ๊ฒฐ๊ณผ:\n\n"
for i, result in enumerate(search_results[:5], 1):
search_context += f"{i}. {result['title']}\n{result['description']}\n\n"
messages.append({
"role": "system",
"content": "๋ค์ ์น ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ์ฐธ๊ณ ํ์ฌ ๋ต๋ณํ์ธ์:\n\n" + search_context
})
messages.append({"role": "user", "content": message})
# Call GPT-4o-mini
response = await client.chat.completions.create(
model="gpt-4.1-mini",
messages=messages,
temperature=0.7,
max_tokens=2000
)
response_text = response.choices[0].message.content
# Detect language
detected_language = None
try:
if response_text and len(response_text) > 10:
detected_language = detect(response_text)
        except Exception:
pass
# Save messages to database
if session_id:
await PersonalAssistantDB.save_message(session_id, "user", message)
await PersonalAssistantDB.save_message(session_id, "assistant", response_text)
return {
"response": response_text,
"detected_language": detected_language
}
except Exception as e:
print(f"Error in text chat: {e}")
return {"error": str(e)}
class OpenAIHandler(AsyncStreamHandler):
def __init__(self, web_search_enabled: bool = False, webrtc_id: str = None,
session_id: str = None, user_name: str = "", memories: Dict = None) -> None:
super().__init__(
expected_layout="mono",
output_sample_rate=SAMPLE_RATE,
output_frame_size=480,
input_sample_rate=SAMPLE_RATE,
)
self.connection = None
self.output_queue = asyncio.Queue()
self.search_client = search_client
self.function_call_in_progress = False
self.current_function_args = ""
self.current_call_id = None
self.webrtc_id = webrtc_id
self.web_search_enabled = web_search_enabled
self.session_id = session_id
self.user_name = user_name
self.memories = memories or {}
self.is_responding = False
self.should_stop = False
        # Log memory info
memory_count = sum(len(items) for items in self.memories.values() if isinstance(items, list))
print(f"[INIT] Handler created with:")
print(f" - web_search={web_search_enabled}")
print(f" - session_id={session_id}")
print(f" - user={user_name}")
print(f" - memory categories={list(self.memories.keys())}")
print(f" - total memory items={memory_count}")
def copy(self):
        # Fetch the most recent connection settings
if connection_settings:
recent_ids = sorted(connection_settings.keys(),
key=lambda k: connection_settings[k].get('timestamp', 0),
reverse=True)
if recent_ids:
recent_id = recent_ids[0]
settings = connection_settings[recent_id]
print(f"[COPY] Copying settings from {recent_id}:")
print(f"[COPY] - web_search: {settings.get('web_search_enabled', False)}")
print(f"[COPY] - session_id: {settings.get('session_id')}")
print(f"[COPY] - user_name: {settings.get('user_name', '')}")
memories = settings.get('memories', {})
                # If no memories were stored, load them directly from the DB (synchronously)
if not memories:
print(f"[COPY] No memories in settings, loading from DB...")
                    try:
                        # Check whether an event loop is already running
loop = asyncio.get_event_loop()
if loop.is_running():
                            # A loop is already running: load in a separate worker thread
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(self._load_memories_sync)
memories_list = future.result()
else:
                            # No loop running: drive the existing loop directly
memories_list = loop.run_until_complete(PersonalAssistantDB.get_all_memories())
except:
                        # No usable event loop: create a new one
new_loop = asyncio.new_event_loop()
asyncio.set_event_loop(new_loop)
memories_list = new_loop.run_until_complete(PersonalAssistantDB.get_all_memories())
new_loop.close()
                    # Group memories by category
for memory in memories_list:
category = memory['category']
if category not in memories:
memories[category] = []
memories[category].append(memory['content'])
print(f"[COPY] Loaded {len(memories_list)} memories from DB")
print(f"[COPY] - memories count: {sum(len(items) for items in memories.values() if isinstance(items, list))}")
return OpenAIHandler(
web_search_enabled=settings.get('web_search_enabled', False),
webrtc_id=recent_id,
session_id=settings.get('session_id'),
user_name=settings.get('user_name', ''),
memories=memories
)
print(f"[COPY] No settings found, creating default handler")
return OpenAIHandler(web_search_enabled=False)
def _load_memories_sync(self):
"""๋๊ธฐ์ ์ผ๋ก ๋ฉ๋ชจ๋ฆฌ ๋ก๋ (Thread์์ ์คํ์ฉ)"""
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(PersonalAssistantDB.get_all_memories())
loop.close()
return result
async def search_web(self, query: str) -> str:
"""Perform web search and return formatted results"""
if not self.search_client or not self.web_search_enabled:
return "์น ๊ฒ์์ด ๋นํ์ฑํ๋์ด ์์ต๋๋ค."
print(f"Searching web for: {query}")
results = await self.search_client.search(query)
if not results:
return f"'{query}'์ ๋ํ ๊ฒ์ ๊ฒฐ๊ณผ๋ฅผ ์ฐพ์ ์ ์์ต๋๋ค."
formatted_results = []
for i, result in enumerate(results, 1):
formatted_results.append(
f"{i}. {result['title']}\n"
f" URL: {result['url']}\n"
f" {result['description']}\n"
)
return f"์น ๊ฒ์ ๊ฒฐ๊ณผ '{query}':\n\n" + "\n".join(formatted_results)
async def process_text_message(self, message: str):
"""Process text message from user"""
if self.connection:
await self.connection.conversation.item.create(
item={
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": message}]
}
)
await self.connection.response.create()
async def start_up(self):
"""Connect to realtime API"""
if connection_settings and self.webrtc_id:
if self.webrtc_id in connection_settings:
settings = connection_settings[self.webrtc_id]
self.web_search_enabled = settings.get('web_search_enabled', False)
self.session_id = settings.get('session_id')
self.user_name = settings.get('user_name', '')
self.memories = settings.get('memories', {})
print(f"[START_UP] Updated settings from storage for {self.webrtc_id}")
        # If no memories were provided, load them from the DB
if not self.memories:
print(f"[START_UP] No memories found, loading from DB...")
memories_list = await PersonalAssistantDB.get_all_memories()
            # Group memories by category
self.memories = {}
for memory in memories_list:
category = memory['category']
if category not in self.memories:
self.memories[category] = []
self.memories[category].append(memory['content'])
print(f"[START_UP] Loaded {len(memories_list)} memories from DB")
print(f"[START_UP] Final memory count: {sum(len(items) for items in self.memories.values() if isinstance(items, list))}")
self.client = openai.AsyncOpenAI()
print(f"[REALTIME API] Connecting...")
# Build system prompt with memories
base_instructions = f"""You are a personal AI assistant for {self.user_name if self.user_name else 'the user'}.
You remember all previous conversations and personal information about the user.
Be friendly, helpful, and personalized in your responses.
Always use the information you remember to make conversations more personal and relevant.
IMPORTANT: Give only ONE response per user input. Do not repeat yourself or give multiple answers."""
# Add memories to prompt
if self.memories:
memory_text = format_memories_for_prompt(self.memories)
base_instructions += memory_text
print(f"[START_UP] Added memories to system prompt: {len(memory_text)} characters")
else:
print(f"[START_UP] No memories to add to system prompt")
# Define the web search function
tools = []
if self.web_search_enabled and self.search_client:
            # Note: the Realtime API expects flat tool definitions (name/description/
            # parameters at the top level), unlike the nested {"function": {...}} shape
            # used by the Chat Completions API.
            tools = [{
                "type": "function",
                "name": "web_search",
                "description": "Search the web for current information. Use this for weather, news, prices, current events, or any time-sensitive topics.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {
                            "type": "string",
                            "description": "The search query"
                        }
                    },
                    "required": ["query"]
                }
            }]
search_instructions = (
"\n\nYou have web search capabilities. "
"Use web_search for current information like weather, news, prices, etc."
)
instructions = base_instructions + search_instructions
else:
instructions = base_instructions
async with self.client.beta.realtime.connect(
model="gpt-4o-mini-realtime-preview-2024-12-17"
) as conn:
session_update = {
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
"prefix_padding_ms": 300,
"silence_duration_ms": 200
},
"instructions": instructions,
"tools": tools,
"tool_choice": "auto" if tools else "none",
"temperature": 0.7,
"max_response_output_tokens": 4096,
"modalities": ["text", "audio"],
"voice": "alloy"
}
try:
await conn.session.update(session=session_update)
self.connection = conn
print(f"Connected with tools: {len(tools)} functions")
print(f"Session update successful")
except Exception as e:
print(f"Error updating session: {e}")
raise
async for event in self.connection:
# Debug log for all events
if hasattr(event, 'type'):
if event.type not in ["response.audio.delta", "response.audio.done"]:
print(f"[EVENT] Type: {event.type}")
# Handle user input audio transcription
if event.type == "conversation.item.input_audio_transcription.completed":
if hasattr(event, 'transcript') and event.transcript:
user_text = event.transcript.lower()
stop_words = ["์ค๋จ", "๊ทธ๋ง", "์คํฑ", "stop", "๋ฅ์ณ", "๋ฉ์ถฐ", "์ค์ง"]
if any(word in user_text for word in stop_words):
print(f"[STOP DETECTED] User said: {event.transcript}")
self.should_stop = True
if self.connection:
try:
await self.connection.response.cancel()
except:
pass
continue
# Save user message to database
if self.session_id:
await PersonalAssistantDB.save_message(self.session_id, "user", event.transcript)
# Handle user transcription for stop detection (alternative event)
elif event.type == "conversation.item.created":
if hasattr(event, 'item') and hasattr(event.item, 'role') and event.item.role == "user":
if hasattr(event.item, 'content') and event.item.content:
for content_item in event.item.content:
if hasattr(content_item, 'transcript') and content_item.transcript:
user_text = content_item.transcript.lower()
stop_words = ["์ค๋จ", "๊ทธ๋ง", "์คํฑ", "stop", "๋ฅ์ณ", "๋ฉ์ถฐ", "์ค์ง"]
if any(word in user_text for word in stop_words):
print(f"[STOP DETECTED] User said: {content_item.transcript}")
self.should_stop = True
if self.connection:
try:
await self.connection.response.cancel()
except:
pass
continue
# Save user message to database
if self.session_id:
await PersonalAssistantDB.save_message(self.session_id, "user", content_item.transcript)
elif event.type == "response.audio_transcript.done":
# Prevent multiple responses
if self.is_responding:
print("[DUPLICATE RESPONSE] Skipping duplicate response")
continue
self.is_responding = True
print(f"[RESPONSE] Transcript: {event.transcript[:100] if event.transcript else 'None'}...")
# Detect language
detected_language = None
try:
if event.transcript and len(event.transcript) > 10:
detected_language = detect(event.transcript)
except Exception as e:
print(f"Language detection error: {e}")
# Save to database
if self.session_id and event.transcript:
await PersonalAssistantDB.save_message(self.session_id, "assistant", event.transcript)
output_data = {
"event": event,
"detected_language": detected_language
}
await self.output_queue.put(AdditionalOutputs(output_data))
elif event.type == "response.done":
# Reset responding flag when response is complete
self.is_responding = False
self.should_stop = False
print("[RESPONSE DONE] Response completed")
elif event.type == "response.audio.delta":
# Check if we should stop
if self.should_stop:
continue
if hasattr(event, 'delta'):
await self.output_queue.put(
(
self.output_sample_rate,
np.frombuffer(
base64.b64decode(event.delta), dtype=np.int16
).reshape(1, -1),
),
)
# Handle errors
elif event.type == "error":
print(f"[ERROR] {event}")
self.is_responding = False
# Handle function calls
elif event.type == "response.function_call_arguments.start":
print(f"Function call started")
self.function_call_in_progress = True
self.current_function_args = ""
self.current_call_id = getattr(event, 'call_id', None)
elif event.type == "response.function_call_arguments.delta":
if self.function_call_in_progress:
self.current_function_args += event.delta
elif event.type == "response.function_call_arguments.done":
if self.function_call_in_progress:
print(f"Function call done, args: {self.current_function_args}")
try:
args = json.loads(self.current_function_args)
query = args.get("query", "")
# Emit search event to client
await self.output_queue.put(AdditionalOutputs({
"type": "search",
"query": query
}))
# Perform the search
search_results = await self.search_web(query)
print(f"Search results length: {len(search_results)}")
# Send function result back to the model
if self.connection and self.current_call_id:
await self.connection.conversation.item.create(
item={
"type": "function_call_output",
"call_id": self.current_call_id,
"output": search_results
}
)
await self.connection.response.create()
except Exception as e:
print(f"Function call error: {e}")
finally:
self.function_call_in_progress = False
self.current_function_args = ""
self.current_call_id = None
async def receive(self, frame: tuple[int, np.ndarray]) -> None:
if not self.connection:
print(f"[RECEIVE] No connection, skipping")
return
try:
if frame is None or len(frame) < 2:
print(f"[RECEIVE] Invalid frame")
return
_, array = frame
if array is None:
print(f"[RECEIVE] Null array")
return
array = array.squeeze()
audio_message = base64.b64encode(array.tobytes()).decode("utf-8")
await self.connection.input_audio_buffer.append(audio=audio_message)
except Exception as e:
print(f"Error in receive: {e}")
async def emit(self) -> tuple[int, np.ndarray] | AdditionalOutputs | None:
item = await wait_for_item(self.output_queue)
if isinstance(item, dict) and item.get('type') == 'text_message':
await self.process_text_message(item['content'])
return None
return item
async def shutdown(self) -> None:
print(f"[SHUTDOWN] Called")
if self.connection:
await self.connection.close()
self.connection = None
print("[REALTIME API] Connection closed")
# Create initial handler instance
handler = OpenAIHandler(web_search_enabled=False)
# Create components
chatbot = gr.Chatbot(type="messages")
# Create stream with handler instance
stream = Stream(
handler,
mode="send-receive",
modality="audio",
additional_inputs=[chatbot],
additional_outputs=[chatbot],
additional_outputs_handler=update_chatbot,
rtc_configuration=get_twilio_turn_credentials() if get_space() else None,
concurrency_limit=5 if get_space() else None,
time_limit=300 if get_space() else None,
)
app = FastAPI()
# Mount stream
stream.mount(app)
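# stream.mount(app) registers fastrtc's WebRTC routes on the FastAPI app; the custom
# /webrtc/offer endpoint below assumes one of those routes handles the SDP offer and
# temporarily removes itself so it can forward the captured body to stream.offer().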
# Initialize database on startup
@app.on_event("startup")
async def startup_event():
try:
await PersonalAssistantDB.init()
print(f"Database initialized at: {DB_PATH}")
print(f"Persistent directory: {PERSISTENT_DIR}")
print(f"DB file exists: {os.path.exists(DB_PATH)}")
# Check if we're in Hugging Face Space
if os.path.exists("/data"):
print("Running in Hugging Face Space with persistent storage")
# List files in persistent directory
try:
files = os.listdir(PERSISTENT_DIR)
print(f"Files in persistent directory: {files}")
except Exception as e:
print(f"Error listing files: {e}")
except Exception as e:
print(f"Error during startup: {e}")
# Try to create directory if it doesn't exist
os.makedirs(PERSISTENT_DIR, exist_ok=True)
await PersonalAssistantDB.init()
# Intercept offer to capture settings
@app.post("/webrtc/offer", include_in_schema=False)
async def custom_offer(request: Request):
"""Intercept offer to capture settings"""
body = await request.json()
webrtc_id = body.get("webrtc_id")
web_search_enabled = body.get("web_search_enabled", False)
session_id = body.get("session_id")
user_name = body.get("user_name", "")
memories = body.get("memories", {})
print(f"[OFFER] Received offer with webrtc_id: {webrtc_id}")
print(f"[OFFER] web_search_enabled: {web_search_enabled}")
print(f"[OFFER] session_id: {session_id}")
print(f"[OFFER] user_name: {user_name}")
print(f"[OFFER] memories categories: {list(memories.keys())}")
print(f"[OFFER] memories total items: {sum(len(items) for items in memories.values() if isinstance(items, list))}")
    # If no memories were sent by the client, load them from the DB
if not memories and session_id:
print(f"[OFFER] No memories received, loading from DB...")
memories_list = await PersonalAssistantDB.get_all_memories()
        # Group memories by category
memories = {}
for memory in memories_list:
category = memory['category']
if category not in memories:
memories[category] = []
memories[category].append(memory['content'])
print(f"[OFFER] Loaded {len(memories_list)} memories from DB")
# Store settings with timestamp
if webrtc_id:
connection_settings[webrtc_id] = {
'web_search_enabled': web_search_enabled,
'session_id': session_id,
'user_name': user_name,
            'memories': memories,  # memories loaded from the DB (or sent by the client)
'timestamp': asyncio.get_event_loop().time()
}
print(f"[OFFER] Stored settings for {webrtc_id} with {sum(len(items) for items in memories.values() if isinstance(items, list))} memory items")
# Remove our custom route temporarily
custom_route = None
for i, route in enumerate(app.routes):
if hasattr(route, 'path') and route.path == "/webrtc/offer" and route.endpoint == custom_offer:
custom_route = app.routes.pop(i)
break
# Forward to stream's offer handler
print(f"[OFFER] Forwarding to stream.offer()")
response = await stream.offer(body)
# Re-add our custom route
if custom_route:
app.routes.insert(0, custom_route)
print(f"[OFFER] Response status: {response.get('status', 'unknown') if isinstance(response, dict) else 'OK'}")
return response
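# The offer body combines the SDP offer produced by the fastrtc client with the custom
# fields read above, roughly (the SDP keys are whatever the frontend sends; only the
# custom keys are handled here):
#   {"sdp": "...", "type": "offer", "webrtc_id": "...", "web_search_enabled": true,
#    "session_id": "...", "user_name": "...", "memories": {...}}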
@app.post("/session/new")
async def create_new_session():
"""Create a new chat session"""
session_id = str(uuid.uuid4())
await PersonalAssistantDB.create_session(session_id)
return {"session_id": session_id}
@app.post("/session/end")
async def end_session(request: Request):
"""End session and extract memories"""
body = await request.json()
session_id = body.get("session_id")
if not session_id:
return {"error": "session_id required"}
# Extract and save memories from the conversation
await PersonalAssistantDB.extract_and_save_memories(session_id)
return {"status": "ok"}
@app.post("/message/save")
async def save_message(request: Request):
"""Save a message to the database"""
body = await request.json()
session_id = body.get("session_id")
role = body.get("role")
content = body.get("content")
if not all([session_id, role, content]):
return {"error": "Missing required fields"}
await PersonalAssistantDB.save_message(session_id, role, content)
return {"status": "ok"}
@app.get("/history/recent")
async def get_recent_history():
"""Get recent conversation history"""
conversations = await PersonalAssistantDB.get_recent_conversations()
return conversations
@app.get("/history/{session_id}")
async def get_conversation(session_id: str):
"""Get messages for a specific conversation"""
messages = await PersonalAssistantDB.get_conversation_messages(session_id)
return messages
@app.get("/memory/all")
async def get_all_memories():
"""Get all user memories"""
memories = await PersonalAssistantDB.get_all_memories()
return memories
@app.post("/chat/text")
async def chat_text(request: Request):
"""Handle text chat messages using GPT-4o-mini"""
try:
body = await request.json()
message = body.get("message", "")
web_search_enabled = body.get("web_search_enabled", False)
session_id = body.get("session_id")
user_name = body.get("user_name", "")
memories = body.get("memories", {})
if not message:
return {"error": "๋ฉ์์ง๊ฐ ๋น์ด์์ต๋๋ค."}
# Process text chat
result = await process_text_chat(message, web_search_enabled, session_id, user_name, memories)
return result
except Exception as e:
print(f"Error in chat_text endpoint: {e}")
return {"error": "์ฑํ
์ฒ๋ฆฌ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ต๋๋ค."}
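# Example /chat/text request body (fields read above; "memories" uses the same
# category -> list-of-strings shape as elsewhere in this file):
#   {"message": "...", "web_search_enabled": false, "session_id": "...",
#    "user_name": "", "memories": {}}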
@app.post("/text_message/{webrtc_id}")
async def receive_text_message(webrtc_id: str, request: Request):
"""Receive text message from client"""
body = await request.json()
message = body.get("content", "")
# Find the handler for this connection
if webrtc_id in stream.handlers:
handler = stream.handlers[webrtc_id]
# Queue the text message for processing
await handler.output_queue.put({
'type': 'text_message',
'content': message
})
return {"status": "ok"}
@app.get("/outputs")
async def outputs(webrtc_id: str):
"""Stream outputs including search events"""
async def output_stream():
async for output in stream.output_stream(webrtc_id):
if hasattr(output, 'args') and output.args:
# Check if it's a search event
if isinstance(output.args[0], dict) and output.args[0].get('type') == 'search':
yield f"event: search\ndata: {json.dumps(output.args[0])}\n\n"
# Regular transcript event with language info
elif isinstance(output.args[0], dict) and 'event' in output.args[0]:
event_data = output.args[0]
if 'event' in event_data and hasattr(event_data['event'], 'transcript'):
data = {
"role": "assistant",
"content": event_data['event'].transcript,
"detected_language": event_data.get('detected_language')
}
yield f"event: output\ndata: {json.dumps(data)}\n\n"
return StreamingResponse(output_stream(), media_type="text/event-stream")
@app.get("/")
async def index():
"""Serve the HTML page"""
rtc_config = get_twilio_turn_credentials() if get_space() else None
html_content = HTML_CONTENT.replace("__RTC_CONFIGURATION__", json.dumps(rtc_config))
return HTMLResponse(content=html_content)
if __name__ == "__main__":
import uvicorn
mode = os.getenv("MODE")
if mode == "UI":
stream.ui.launch(server_port=7860)
elif mode == "PHONE":
stream.fastphone(host="0.0.0.0", port=7860)
else:
uvicorn.run(app, host="0.0.0.0", port=7860)
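# Run modes (requires OPENAI_API_KEY in the environment or .env):
#   default      -> uvicorn serves the FastAPI app on 0.0.0.0:7860
#   MODE=UI      -> launches the built-in Gradio UI via stream.ui
#   MODE=PHONE   -> launches fastrtc's fastphone entry point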