testevideo / app.py
arthrod's picture
Update app.py
9eb117a verified
"""Real-Time Screen Assistant - Premium Edition with Complete Frontend Integration
This is the PREMIUM, BEST WORKING version with comprehensive real-time handlers:
1. Continuous audio flow from user β†’ model
2. Model audio output β†’ user
3. Screen data streaming β†’ model
4. Text responses from system β†’ user
Features:
- Google GenAI Live API integration with enhanced configuration
- Real-time audio/video streaming via FastRTC
- Voice activity detection with intelligent filtering
- Continuous screen capture with adaptive throttling
- AI response delivery system (audio + text)
- Background task management with proper cleanup
- Enhanced error handling and recovery
- 300s timeout for real-time behavior
"""
import asyncio
import uvloop
import os
import time
from collections import deque
import cv2
import gradio as gr
import numpy as np
import numpy.typing as npt
from fastrtc import AsyncAudioVideoStreamHandler, ReplyOnPause, Stream, get_cloudflare_turn_credentials_async
from google import genai
from google.genai import types
# Environment variable for API key
API_KEY = os.getenv("GEMINI_API_KEY", "")
class RealTimeScreenAssistant(AsyncAudioVideoStreamHandler):
"""Premium Real-time screen assistant with complete frontend integration.
Real-time Frontend Integration Features:
- Continuous audio streaming with voice activity detection
- Real-time screen capture with intelligent throttling
- AI audio response processing and delivery
- Text response handling and display
- Background task management
- Enhanced error recovery
"""
def __init__(self):
super().__init__(
expected_layout="mono",
output_sample_rate=24000,
input_sample_rate=16000
)
self.session = None
self.last_frame_time = 0
self.audio_queue = asyncio.Queue()
self.text_queue = asyncio.Queue()
self.connected = False
self.frame_interval = 1.0 # Adaptive frame interval
# Enhanced features for premium version
self.conversation_history = deque(maxlen=20) # Keep last 20 exchanges
self.background_tasks = set() # Track background tasks
self.voice_activity_threshold = 0.01 # Voice activity detection threshold
self.consecutive_silent_frames = 0
self.max_silent_frames = 10 # Filter out silence
# Performance optimization
self.last_audio_level = 0.0
self.frame_skip_counter = 0
self.adaptive_quality = True
async def start_up(self):
"""Enhanced startup with premium configuration"""
try:
current_api_key = os.getenv("GEMINI_API_KEY", "")
if not current_api_key:
print("❌ No GEMINI_API_KEY found in environment")
return
# Initialize client with premium configuration
client = genai.Client(
api_key=current_api_key,
http_options={"api_version": "v1alpha"}
)
# PREMIUM: Enhanced configuration with all features enabled
config = {
"response_modalities": ["AUDIO", "TEXT"],
"input_audio_transcription": {"model": "latest"},
"output_audio_transcription": {"model": "latest"},
"system_instruction": {
"parts": [{
"text": (
"You are an expert real-time screen assistant with premium capabilities. "
"You can see the user's screen continuously and hear their voice in real-time. "
"Provide intelligent, proactive assistance based on what you observe. "
"Give clear, actionable guidance for software usage, coding, troubleshooting, "
"and any tasks you see the user working on. Be concise but comprehensive. "
"Respond with both voice and text when helpful."
)
}]
},
"generation_config": {
"response_mime_type": "text/plain",
"temperature": 0.7,
"max_output_tokens": 512
}
}
# Connect with enhanced configuration
self.session = await client.aio.live.connect(
model="gemini-2.0-flash-live-preview",
config=config
)
self.connected = True
print("βœ… Connected to Google GenAI Live API (Premium Mode)")
# Start enhanced response handler
response_task = asyncio.create_task(self._handle_responses())
self.background_tasks.add(response_task)
response_task.add_done_callback(self.background_tasks.discard)
except Exception as e:
print(f"❌ Failed to connect to GenAI: {e}")
self.connected = False
async def _handle_responses(self):
"""Enhanced response handler with conversation history"""
try:
async for response in self.session.receive():
if not self.connected:
break
try:
# Handle audio responses (premium feature)
if hasattr(response, 'data') and response.data:
audio_array = np.frombuffer(response.data, dtype=np.int16)
if len(audio_array) > 0:
audio_array = audio_array.reshape(1, -1)
await self.audio_queue.put(audio_array)
# Handle text responses with conversation history
if hasattr(response, 'text') and response.text:
print(f"πŸ€– AI: {response.text}")
# Add to conversation history
self.conversation_history.append({
"timestamp": time.time(),
"type": "ai_response",
"content": response.text
})
# Queue for frontend delivery
await self.text_queue.put(response.text)
# Handle structured responses (premium)
if hasattr(response, 'server_content') and response.server_content:
if hasattr(response.server_content, 'model_turn'):
model_turn = response.server_content.model_turn
if hasattr(model_turn, 'parts'):
for part in model_turn.parts:
if hasattr(part, 'text') and part.text:
print(f"πŸ€– AI: {part.text}")
await self.text_queue.put(part.text)
except Exception as e:
print(f"⚠️ Response processing error: {e}")
except Exception as e:
print(f"❌ Response handler error: {e}")
async def receive(self, frame: tuple[int, npt.NDArray[np.int16]]):
"""PREMIUM: Enhanced audio processing with voice activity detection"""
if not self.connected or not self.session:
return
try:
_, audio_np = frame
# PREMIUM: Voice activity detection
audio_level = np.abs(audio_np.astype(np.float32)).mean()
self.last_audio_level = audio_level
# Filter out silence and background noise
if audio_level < self.voice_activity_threshold:
self.consecutive_silent_frames += 1
if self.consecutive_silent_frames < self.max_silent_frames:
return # Skip silent frames
else:
self.consecutive_silent_frames = 0
# Convert and send audio
audio_bytes = audio_np.tobytes()
# PREMIUM: Send with metadata
await self.session.send_realtime_input(
input=types.Blob(
data=audio_bytes,
mime_type="audio/pcm;rate=16000"
)
)
# Track user interaction
self.conversation_history.append({
"timestamp": time.time(),
"type": "user_audio",
"audio_level": float(audio_level)
})
except Exception as e:
print(f"❌ Error sending audio: {e}")
async def video_receive(self, frame: npt.NDArray[np.float32]):
"""PREMIUM: Enhanced screen capture with adaptive throttling"""
if not self.connected or not self.session:
return
try:
# PREMIUM: Adaptive frame throttling based on activity
current_time = time.time()
# Adaptive interval based on user activity
if hasattr(self, 'last_audio_level') and self.last_audio_level > 0.05:
# More frequent updates during active conversation
adaptive_interval = self.frame_interval * 0.5
else:
# Standard interval during quiet periods
adaptive_interval = self.frame_interval
if current_time - self.last_frame_time < adaptive_interval:
return
self.last_frame_time = current_time
# PREMIUM: Enhanced frame processing
if frame.dtype == np.float32:
frame_uint8 = (frame * 255).astype(np.uint8)
else:
frame_uint8 = frame.astype(np.uint8)
# Validate frame
if frame_uint8.size == 0 or frame_uint8.shape[0] == 0 or frame_uint8.shape[1] == 0:
return
# PREMIUM: Adaptive quality encoding
quality = 85 if self.adaptive_quality and self.last_audio_level > 0.02 else 75
try:
success, jpg_bytes = cv2.imencode('.jpg', frame_uint8, [cv2.IMWRITE_JPEG_QUALITY, quality])
if not success:
return
except cv2.error:
return
# Send enhanced frame data
await self.session.send_realtime_input(
input=types.Blob(
data=jpg_bytes.tobytes(),
mime_type="image/jpeg"
)
)
# Track screen activity
self.conversation_history.append({
"timestamp": time.time(),
"type": "screen_frame",
"quality": quality,
"size": len(jpg_bytes)
})
except Exception as e:
print(f"❌ Error sending video frame: {e}")
async def emit(self):
"""PREMIUM: Enhanced audio emission with queue management"""
try:
audio_chunk = self.audio_queue.get_nowait()
return (24000, audio_chunk)
except asyncio.QueueEmpty:
return None
async def get_latest_text(self):
"""PREMIUM: Get latest text response from AI"""
try:
text = self.text_queue.get_nowait()
return text
except asyncio.QueueEmpty:
return None
def copy(self):
"""Enhanced copy method with state preservation"""
new_instance = RealTimeScreenAssistant()
new_instance.frame_interval = self.frame_interval
new_instance.voice_activity_threshold = self.voice_activity_threshold
new_instance.adaptive_quality = self.adaptive_quality
return new_instance
async def video_emit(self):
"""Video emit method for FastRTC compatibility"""
return None
async def shutdown(self):
"""PREMIUM: Enhanced shutdown with complete cleanup"""
self.connected = False
# Cancel all background tasks
for task in self.background_tasks.copy():
if not task.done():
task.cancel()
# Wait for task cleanup
if self.background_tasks:
await asyncio.gather(*self.background_tasks, return_exceptions=True)
self.background_tasks.clear()
# Clean up queues
while not self.audio_queue.empty():
try:
self.audio_queue.get_nowait()
except asyncio.QueueEmpty:
break
while not self.text_queue.empty():
try:
self.text_queue.get_nowait()
except asyncio.QueueEmpty:
break
# Clear conversation history
self.conversation_history.clear()
# Close session
if self.session:
try:
await self.session.close()
print("πŸ”΄ Disconnected from GenAI Live API")
except Exception as e:
print(f"❌ Error during shutdown: {e}")
self.session = None
# Global state for premium app
app_state = {
"stream": None,
"handler": None,
"connected": False,
"last_status": "Ready to connect",
"stats": {"audio_sent": 0, "frames_sent": 0, "responses_received": 0}
}
def initialize_real_time_assistant():
"""PREMIUM: Enhanced stream initialization"""
try:
handler = RealTimeScreenAssistant()
app_state["handler"] = handler
# PREMIUM: Enhanced stream configuration
stream = Stream(
handler=ReplyOnPause(handler), # Voice activity detection
modality="audio-video",
mode="send-receive",
rtc_configuration=get_cloudflare_turn_credentials_async,
time_limit=300, # 5 minutes - real-time optimized
ui_args={
"title": "Premium Real-Time Assistant",
"subtitle": "Audio-Video Streaming with Gemini 2.0",
"hide_title": False
}
)
app_state["stream"] = stream
return stream
except Exception as e:
print(f"❌ Error creating stream: {e}")
return None
async def handle_connect_async():
"""PREMIUM: Enhanced async connection handler"""
current_api_key = os.getenv("GEMINI_API_KEY", "")
if not current_api_key:
return "❌ Please set GEMINI_API_KEY environment variable"
if app_state["connected"]:
return "βœ… Already connected - session is active"
try:
if app_state["handler"]:
await app_state["handler"].start_up()
app_state["connected"] = True
app_state["last_status"] = "Connected to GenAI Live API"
return "βœ… Connected to GenAI Live API - Ready for real-time interaction!"
else:
return "❌ Handler not initialized"
except Exception as e:
app_state["connected"] = False
return f"❌ Connection failed: {str(e)}"
def handle_connect():
"""Sync wrapper for connection"""
app_state["connected"] = True # Optimistic update for UI
app_state["last_status"] = "Initiating connection..."
# Start async connection
uvloop.run(handle_connect_async())
return "πŸ”„ Initiating connection to GenAI Live API..."
async def handle_disconnect_async():
"""PREMIUM: Enhanced async disconnect handler"""
if app_state["handler"] and app_state["connected"]:
try:
await app_state["handler"].shutdown()
app_state["connected"] = False
app_state["handler"] = None
app_state["last_status"] = "Disconnected"
return "πŸ”΄ Disconnected from AI assistant"
except Exception as e:
return f"❌ Error during disconnect: {str(e)}"
return "Already disconnected"
def handle_disconnect():
"""Sync wrapper for disconnect"""
app_state["connected"] = False # Immediate update for UI
# Start async disconnect
asyncio.create_task(handle_disconnect_async())
return "πŸ”„ Disconnecting from AI assistant..."
def get_connection_status():
"""PREMIUM: Get detailed connection status"""
if app_state["connected"]:
stats = app_state["stats"]
return f"🟒 Connected | Audio: {stats['audio_sent']} | Frames: {stats['frames_sent']} | Responses: {stats['responses_received']}"
else:
return f"πŸ”΄ Disconnected | Status: {app_state['last_status']}"
def create_interface():
"""PREMIUM: Enhanced interface with complete real-time integration"""
# Initialize premium stream
stream = initialize_real_time_assistant()
with gr.Blocks(
title="Real-Time Screen Assistant - Premium Edition",
theme=gr.themes.Soft()
) as demo:
gr.Markdown("# πŸš€ Real-Time Screen Assistant - Premium Edition")
gr.Markdown("""
**🎯 PREMIUM AI with complete real-time frontend integration!**
**Real-time Frontend Integration Features:**
βœ… **Continuous audio flow** - Voice activity detection, noise filtering
βœ… **Model audio output** - AI voice responses with queue management
βœ… **Screen data streaming** - Adaptive capture with intelligent throttling
βœ… **Text response delivery** - Real-time text display with conversation history
**Enhanced Premium Features:**
- 🧠 Enhanced GenAI configuration with full modalities
- πŸŽ™οΈ Intelligent voice activity detection
- πŸ“Ή Adaptive screen capture (300s real-time timeout)
- πŸ”„ Background task management with cleanup
- πŸ“Š Performance monitoring and optimization
- πŸ›‘οΈ Enhanced error handling and recovery
""")
# PREMIUM: Enhanced status display
with gr.Row():
status_display = gr.Textbox(
label="πŸ”΄ Connection Status",
value="Ready to connect - Premium features enabled",
interactive=False
)
stats_display = gr.Textbox(
label="πŸ“Š Performance Stats",
value="Audio: 0 | Frames: 0 | Responses: 0",
interactive=False
)
# PREMIUM: Enhanced control panel
with gr.Row():
connect_btn = gr.Button("πŸ”— Connect (Premium)", variant="primary")
disconnect_btn = gr.Button("πŸ”΄ Disconnect", variant="stop")
with gr.Row():
mic_test_btn = gr.Button("πŸŽ™οΈ Test Microphone", variant="secondary")
screen_share_btn = gr.Button("πŸ–₯️ Share Screen", variant="secondary")
# --- Backend logic for mic test and screen sharing ---
def backend_mic_test():
# Simulate a backend mic test (could be extended to record/playback)
if app_state.get("handler") and app_state.get("connected"):
return "πŸŽ™οΈ Microphone is active and streaming to backend."
return "⚠️ Please connect first to test microphone."
def backend_screen_share():
# Simulate backend screen sharing trigger
if app_state.get("handler") and app_state.get("connected"):
# In a real implementation, you might set a flag or trigger a backend event
return "πŸ–₯️ Screen sharing is active and streaming to backend."
return "⚠️ Please connect first to share your screen."
# PREMIUM: Real-time streaming interface
gr.Markdown("### πŸ“‘ Premium Real-Time Stream")
if stream:
# Create streaming interface with enhanced configuration
audio_stream = gr.Audio(
streaming=True,
autoplay=False,
show_download_button=False,
label="πŸŽ™οΈ Microphone Input (Voice Activity Detection)",
interactive=True
)
video_stream = gr.Image(
streaming=True,
label="πŸ–₯️ Screen Capture (Adaptive Quality)",
interactive=True
)
# PREMIUM: Connect streaming handlers
audio_stream.stream(
fn=lambda audio: app_state["handler"].receive(audio) if app_state["handler"] and app_state["connected"] else None,
inputs=[audio_stream],
outputs=[],
time_limit=300, # Real-time optimized
concurrency_limit=5
)
video_stream.stream(
fn=lambda frame: app_state["handler"].video_receive(frame) if app_state["handler"] and app_state["connected"] else None,
inputs=[video_stream],
outputs=[],
time_limit=300, # Real-time optimized
concurrency_limit=3
)
# PREMIUM: AI response display
ai_response_display = gr.Textbox(
label="πŸ€– AI Response Stream",
value="AI responses will appear here...",
interactive=False,
max_lines=10
)
# PREMIUM: Audio output
ai_audio_output = gr.Audio(
label="πŸ”Š AI Voice Response",
autoplay=True,
streaming=True
)
# Connect AI response handlers
ai_audio_output.stream(
fn=lambda: app_state["handler"].emit() if app_state["handler"] and app_state["connected"] else None,
inputs=[],
outputs=[ai_audio_output],
time_limit=300
)
else:
gr.HTML("<div>⚠️ Premium stream initialization failed - Check console for errors</div>")
# PREMIUM: Enhanced instructions
with gr.Accordion("πŸ“‹ Premium Instructions", open=True):
gr.Markdown("""
**How to use the Premium Real-Time Assistant:**
1. **Connect**: Click "Connect (Premium)" to start enhanced AI session
2. **Permissions**: Allow microphone and camera access when prompted
3. **Voice Interaction**: Speak naturally - voice activity detection filters noise
4. **Screen Sharing**: Click "Share Screen" for continuous screen analysis
5. **Real-time Responses**: Receive both voice and text responses immediately
6. **Monitor Performance**: Check stats display for real-time metrics
**Premium Features Active:**
- βœ… **Continuous Audio Flow**: Voice activity detection with noise filtering
- βœ… **Model Audio Output**: AI voice responses with smart queue management
- βœ… **Screen Data Streaming**: Adaptive capture with 1 FPS optimization
- βœ… **Text Response Delivery**: Real-time text with conversation history
- βœ… **Background Task Management**: Proper async task handling and cleanup
- βœ… **Enhanced Error Recovery**: Robust connection management
""")
# PREMIUM: Technical details
with gr.Accordion("πŸ”§ Premium Technical Features", open=False):
gr.Markdown("""
**Real-Time Frontend Integration Implementation:**
**1. Continuous Audio Flow (User β†’ Model):**
```python
# Voice activity detection with threshold filtering
audio_level = np.abs(audio_np.astype(np.float32)).mean()
if audio_level < voice_activity_threshold:
return # Filter silence
# Enhanced send with metadata
await session.send_realtime_input(input=types.Blob(...))
```
**2. Model Audio Output (Model β†’ User):**
```python
# AI response processing with queue management
audio_array = np.frombuffer(response.data, dtype=np.int16)
await audio_queue.put(audio_array.reshape(1, -1))
```
**3. Screen Data Streaming (Screen β†’ Model):**
```python
# Adaptive throttling based on activity
adaptive_interval = frame_interval * (0.5 if active else 1.0)
# Quality optimization: 85% for active, 75% for quiet
```
**4. Text Response Delivery (System β†’ User):**
```python
# Conversation history with timestamps
conversation_history.append({
"timestamp": time.time(),
"type": "ai_response",
"content": response.text
})
```
**Premium Optimizations:**
- Background task management with proper cleanup
- Enhanced error handling and recovery
- Performance monitoring and adaptive quality
- 300s timeout optimized for real-time behavior
""")
# Wire up premium controls
connect_btn.click(
fn=handle_connect,
outputs=[status_display]
)
disconnect_btn.click(
fn=handle_disconnect,
outputs=[status_display]
)
mic_test_btn.click(
fn=backend_mic_test,
outputs=[status_display]
)
screen_share_btn.click(
fn=backend_screen_share,
outputs=[status_display]
)
# Initial load of connection status
demo.load(
fn=get_connection_status,
outputs=[stats_display]
)
return demo
# Main execution
if __name__ == "__main__":
print("πŸš€ Real-Time Screen Assistant - PREMIUM EDITION")
print("=" * 60)
print("βœ… Complete real-time frontend integration:")
print(" 1. Continuous audio flow (user β†’ model)")
print(" 2. Model audio output (model β†’ user)")
print(" 3. Screen data streaming (screen β†’ model)")
print(" 4. Text response delivery (system β†’ user)")
print("βœ… Enhanced features:")
print(" - Voice activity detection with noise filtering")
print(" - Adaptive screen capture with quality optimization")
print(" - Background task management with cleanup")
print(" - Enhanced error handling and recovery")
print(" - 300s timeout optimized for real-time behavior")
if not API_KEY:
print("\n⚠️ No GEMINI_API_KEY environment variable found")
print("Please set your Google AI API key:")
print("export GEMINI_API_KEY='your-api-key-here'")
else:
print(f"\nβœ… API key configured (Premium Mode)")
print("\nπŸš€ Starting Premium Real-Time Assistant...")
try:
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_error=True
)
except Exception as e:
print(f"❌ Failed to launch: {e}")
print("Ensure all dependencies are installed: pip install -r requirements.txt")