"""
Real-Time Screen Assistant - Refactored for Google GenAI Live API + FastRTC
This application transforms the original screenshot analyzer into a real-time
screen sharing assistant with voice interaction, following the refactoring
instructions for live streaming capabilities.
"""
import os
import asyncio
import time
import numpy as np
import numpy.typing as npt
import cv2
import gradio as gr
from fastrtc import Stream, AsyncAudioVideoStreamHandler, get_cloudflare_turn_credentials_async
from google import genai
from google.genai import types
# Environment variable for API key
API_KEY = os.getenv("GEMINI_API_KEY", "")
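# Note: handlers below re-read GEMINI_API_KEY from the environment at session
# start (see start_up), so a key exported after import is still honored.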
class RealTimeScreenAssistant(AsyncAudioVideoStreamHandler):
"""
Real-time screen assistant implementing the refactoring instructions.
Features:
- Google GenAI Live API integration
- Real-time audio/video streaming via FastRTC
- Voice activity detection with ReplyOnPause
- Intelligent frame sampling for screen sharing
- Cloudflare TURN server support for HF Spaces
"""
def __init__(self):
super().__init__(
expected_layout="mono",
output_sample_rate=24000,
input_sample_rate=16000
)
self.session = None
self.last_frame_time = 0
self.audio_queue = asyncio.Queue()
self.connected = False
self.frame_interval = 1.0 # 1 FPS as per instructions
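        # Lifecycle (assumed from FastRTC's handler contract): start_up() runs
        # once per connection, receive()/video_receive() consume user media,
        # emit() is polled for output audio, and shutdown() cleans up.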
async def start_up(self):
"""Initialize Google GenAI Live session as per Task 8-10"""
try:
# Re-check environment variable in case it was set after import
current_api_key = os.getenv("GEMINI_API_KEY", "")
if not current_api_key:
print("❌ No GEMINI_API_KEY found in environment")
return
# Initialize client with v1alpha API (Task 8)
client = genai.Client(
api_key=current_api_key,
http_options={"api_version": "v1alpha"}
)
            # Configure the live session (Task 9). Keep the config minimal:
            # extra fields have been observed to break the WebSocket handshake.
            config = types.LiveConnectConfig(
system_instruction=(
"You are a helpful real-time assistant who watches the user's screen and provides "
"guidance on using software. Give clear, step-by-step instructions based on what "
"you see and hear. Be proactive in offering assistance."
)
)
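            # Assumption: audio-only replies could be requested here via
            # response_modalities=["AUDIO"]; omitted to keep the handshake minimal.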
# Connect to Live API (Task 10) - using async context manager
self.session_context = client.aio.live.connect(
model="gemini-2.0-flash-live-001",
config=config
)
self.session = await self.session_context.__aenter__()
self.connected = True
print("βœ… Connected to Google GenAI Live API")
# Start response handler (Task 13)
self.response_task = asyncio.create_task(self._handle_responses())
except Exception as e:
print(f"❌ Failed to connect to GenAI: {e}")
self.connected = False
async def _handle_responses(self):
"""Handle AI responses as per Task 12-13"""
try:
# The Google GenAI Live API provides an async iterator through session.receive()
# We need to handle this as a streaming response
while self.connected and self.session:
try:
# Get the next message from the session
response_stream = self.session.receive()
# Check if this is an async iterator or needs to be awaited
if hasattr(response_stream, '__aiter__'):
# It's an async iterator
async for msg in response_stream:
if not self.connected:
break
if msg.data: # Audio response
# Convert to numpy for FastRTC (Task 13)
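                                # Live API output audio is 16-bit PCM at 24 kHz
                                # mono, matching output_sample_rate above.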
audio_array = np.frombuffer(msg.data, dtype=np.int16)
if len(audio_array) > 0:
audio_array = audio_array.reshape(1, -1)
await self.audio_queue.put(audio_array)
if msg.text: # Text response
print(f"πŸ€– AI: {msg.text}")
else:
# It's a single response that needs to be awaited
msg = await response_stream
if msg:
if msg.data: # Audio response
audio_array = np.frombuffer(msg.data, dtype=np.int16)
if len(audio_array) > 0:
audio_array = audio_array.reshape(1, -1)
await self.audio_queue.put(audio_array)
if msg.text: # Text response
print(f"πŸ€– AI: {msg.text}")
except Exception as inner_e:
if "connection" in str(inner_e).lower() or "closed" in str(inner_e).lower():
print("πŸ”΄ Connection closed, stopping response handler")
break
else:
print(f"⚠️ Response handling error: {inner_e}")
await asyncio.sleep(0.1) # Brief pause before retry
except Exception as e:
print(f"❌ Error handling AI responses: {e}")
async def receive(self, frame: tuple[int, npt.NDArray[np.int16]]):
"""Handle microphone audio (Task 11)"""
if not self.connected or not self.session:
return
try:
_, audio_np = frame
audio_bytes = audio_np.tobytes()
            # Send audio to the Live API; send_realtime_input supersedes the
            # deprecated session.send. The keyword is `audio` (this method has
            # no `input` parameter).
            await self.session.send_realtime_input(
                audio=types.Blob(
                    data=audio_bytes,
                    mime_type="audio/pcm;rate=16000"
                )
            )
except Exception as e:
print(f"❌ Error sending audio: {e}")
async def video_receive(self, frame: npt.NDArray[np.float32]):
"""Handle screen video frames (Task 11-12)"""
if not self.connected or not self.session:
return
try:
# Throttle to 1 FPS as per instructions
current_time = time.time()
if current_time - self.last_frame_time < self.frame_interval:
return
self.last_frame_time = current_time
            # Convert float32 frame to uint8 for JPEG encoding
            if frame.dtype == np.float32:
                # Assume frame is in [0, 1]; clip before scaling so out-of-range
                # values cannot wrap around in uint8
                frame_uint8 = np.clip(frame * 255, 0, 255).astype(np.uint8)
else:
frame_uint8 = frame.astype(np.uint8)
# Check for empty frame before encoding
if frame_uint8.size == 0 or frame_uint8.shape[0] == 0 or frame_uint8.shape[1] == 0:
return
# Encode as JPEG (Task 12)
try:
success, jpg_bytes = cv2.imencode('.jpg', frame_uint8, [cv2.IMWRITE_JPEG_QUALITY, 80])
if not success:
return
except cv2.error:
# Handle OpenCV encoding errors gracefully
return
            # Send the JPEG frame to the Live API. Assumption: image frames go
            # through send_realtime_input's general-purpose `media` parameter.
            await self.session.send_realtime_input(
                media=types.Blob(
                    data=jpg_bytes.tobytes(),
                    mime_type="image/jpeg"
                )
            )
except Exception as e:
print(f"❌ Error sending video frame: {e}")
async def emit(self):
"""Emit audio back to user (Task 13)"""
try:
audio_chunk = self.audio_queue.get_nowait()
return (24000, audio_chunk)
except asyncio.QueueEmpty:
return None
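    # Note (assumed FastRTC behavior): emit() is polled continuously; returning
    # None means "no audio ready this tick" rather than end-of-stream.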
def copy(self):
"""Copy method required by FastRTC AsyncAudioVideoStreamHandler"""
# Return a new instance with same configuration
new_instance = RealTimeScreenAssistant()
new_instance.frame_interval = self.frame_interval
return new_instance
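    # FastRTC calls copy() once per incoming connection, so each client gets an
    # isolated session, audio queue, and frame timer.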
async def video_emit(self):
"""Video emit method required by FastRTC AsyncAudioVideoStreamHandler"""
# For this use case, we don't emit video back to user
return None
async def shutdown(self):
"""Clean shutdown (Task 17)"""
self.connected = False
# Cancel response handler task if it exists
if hasattr(self, 'response_task') and not self.response_task.done():
self.response_task.cancel()
try:
await self.response_task
except asyncio.CancelledError:
pass
if self.session:
try:
# Properly close the session using context manager
if hasattr(self, 'session_context'):
await self.session_context.__aexit__(None, None, None)
else:
await self.session.close()
print("πŸ”΄ Disconnected from GenAI Live API")
except Exception as e:
print(f"❌ Error during shutdown: {e}")
self.session = None
if hasattr(self, 'session_context'):
self.session_context = None
# Global state
app_state = {"stream": None, "handler": None, "connected": False}
def initialize_real_time_assistant():
"""Initialize the real-time assistant (Task 26-29)"""
try:
# Create handler
handler = RealTimeScreenAssistant()
app_state["handler"] = handler
        # Create the stream with Cloudflare TURN credentials (Task 22-23).
        # Note: ReplyOnPause wraps a plain reply function for audio-only use and
        # cannot wrap an AsyncAudioVideoStreamHandler; turn-taking is handled by
        # the Live API itself, so the handler is passed directly (Task 3).
        stream = Stream(
            handler=handler,
            modality="audio-video",
            mode="send-receive",
            rtc_configuration=get_cloudflare_turn_credentials_async,
            time_limit=300  # 5 minute limit for Spaces
        )
app_state["stream"] = stream
return stream
except Exception as e:
print(f"❌ Error creating stream: {e}")
return None
def handle_connect():
"""Connect button handler (Task 16)"""
# Re-check environment variable in case it was set after import
current_api_key = os.getenv("GEMINI_API_KEY", "")
if not current_api_key:
return "❌ Please set GEMINI_API_KEY environment variable"
if app_state["connected"]:
return "βœ… Already connected - session is active"
app_state["connected"] = True
return "βœ… Connecting... Please allow microphone and camera permissions"
def handle_disconnect():
"""Disconnect button handler (Task 17)"""
if app_state["handler"] and app_state["connected"]:
asyncio.run(app_state["handler"].shutdown())
app_state["connected"] = False
return "πŸ”΄ Disconnected from AI assistant"
return "Already disconnected"
# Screen sharing JavaScript - Fixed syntax for HF Spaces
screen_share_js = '''
(async function() {
try {
if (!navigator.mediaDevices || !navigator.mediaDevices.getDisplayMedia) {
return "❌ Screen sharing not supported in this browser";
}
const stream = await navigator.mediaDevices.getDisplayMedia({
video: {
width: { ideal: 1920 },
height: { ideal: 1080 }
},
audio: false
});
// Find the video element from FastRTC
const videos = document.querySelectorAll('video');
let targetVideo = null;
for (let video of videos) {
if (video.srcObject && video.srcObject.getVideoTracks().length > 0) {
targetVideo = video;
break;
}
}
if (targetVideo && targetVideo.srcObject) {
            // Swap the camera track for the screen track on the local stream
            const screenTrack = stream.getVideoTracks()[0];
            const cameraTrack = targetVideo.srcObject.getVideoTracks()[0];
            targetVideo.srcObject.removeTrack(cameraTrack);
            targetVideo.srcObject.addTrack(screenTrack);
screenTrack.onended = () => {
console.log("Screen sharing ended");
};
return "πŸ–₯️ Screen sharing started successfully!";
} else {
return "❌ Could not find video stream to replace";
}
} catch (error) {
console.error("Screen sharing error:", error);
if (error.name === "NotAllowedError") {
return "❌ Screen sharing permission denied - please allow screen access";
} else if (error.name === "NotSupportedError") {
return "❌ Screen sharing not supported in this environment";
} else {
return "❌ Screen sharing failed: " + error.message;
}
}
})()'''
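# Assumption: swapping tracks on the local MediaStream is best-effort; a more
# robust fix would locate the RTCPeerConnection's video RTCRtpSender and call
# sender.replaceTrack(screenTrack) so the change is renegotiated with the peer.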
def create_interface():
"""Create main interface (Task 26-30)"""
# Initialize stream
stream = initialize_real_time_assistant()
with gr.Blocks(
title="Real-Time Screen Assistant",
theme=gr.themes.Soft()
) as demo:
gr.Markdown("# πŸ–₯️ Real-Time Screen Assistant")
gr.Markdown("""
**🎯 LIVE AI that sees your screen and provides real-time guidance!**
**How it works:**
1. **Connect** - Links to Google's GenAI Live API for real-time AI processing
2. **Share Screen** - AI can see exactly what you're doing on your screen
3. **Voice Chat** - Talk naturally, AI responds with voice and sees everything
4. **Get Help** - Real-time assistance with software, coding, troubleshooting
**Tech Stack:**
- 🧠 Google GenAI Live API (multimodal real-time AI)
        - 📹 FastRTC (low-latency screen/audio streaming)
        - 🎙️ Voice activity detection
- 🌐 Cloudflare TURN servers (HF Spaces optimized)
""")
# Status display
status_display = gr.Textbox(
label="Status",
value="Ready to connect - Click Connect to start real-time session",
interactive=False
)
# Control buttons (Task 3, 16-17)
with gr.Row():
            connect_btn = gr.Button("🔗 Connect", variant="primary")
            mic_btn = gr.Button("🎙️ Test Microphone", variant="secondary")
            screen_btn = gr.Button("🖥️ Show Your Screen", variant="secondary")
            disconnect_btn = gr.Button("🔴 Disconnect", variant="stop")
# Stream interface - FastRTC UI for microphone and video
gr.Markdown("### πŸ“‘ Live Audio/Video Stream")
if stream:
# Mount the FastRTC stream UI - this provides microphone access
gr.HTML("""
<div id="fastrtc-container">
                <p>🎙️ Microphone and video streaming handled by FastRTC</p>
<p>Click 'Test Microphone' and 'Show Your Screen' to activate</p>
</div>
""")
else:
gr.HTML("<div>⚠️ Stream initialization failed - Check console for errors</div>")
# Microphone activation JavaScript
microphone_js = '''
(async function() {
try {
// Request microphone permission and start audio
const stream = await navigator.mediaDevices.getUserMedia({
audio: {
sampleRate: 16000,
channelCount: 1,
echoCancellation: true,
noiseSuppression: true
}
});
console.log("Microphone access granted");
return "πŸŽ™οΈ Microphone connected successfully";
} catch (error) {
console.error("Microphone error:", error);
if (error.name === "NotAllowedError") {
return "❌ Microphone permission denied - please allow microphone access";
} else {
return "❌ Microphone failed: " + error.message;
}
}
})()
'''
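        # Note: the 16 kHz mono constraints above are hints; browsers may
        # capture at a different rate, so server-side resampling may still be
        # needed (assumption).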
# Instructions (Task 1-3)
with gr.Accordion("πŸ“‹ Instructions", open=True):
gr.Markdown("""
**How to use the real-time assistant:**
1. **Connect**: Click Connect to start the AI session
2. **Permissions**: Allow microphone and camera access
3. **Show Screen**: Click "Show Your Screen" to share your screen
4. **Voice Interaction**: Simply speak - the AI will respond
5. **Real-time Guidance**: AI sees your screen and provides live help
6. **Disconnect**: Click Disconnect when finished
            **Features implemented from refactoring instructions:**
            - ✅ FastRTC WebRTC streaming (Task 2)
            - ✅ Google GenAI Live API integration (Task 7-15)
            - ✅ Connect/Show Screen/Disconnect controls (Task 3, 16-17)
            - ✅ Voice turn-taking via the Live API's built-in detection (Task 3)
            - ✅ Screen sharing via getDisplayMedia (Task 6)
            - ✅ Real-time advice generation (Task 18-21)
            - ✅ Cloudflare TURN for HF Spaces (Task 22-23)
            """)
# Privacy notice (Task 24-25)
with gr.Accordion("πŸ”’ Privacy & Security", open=False):
gr.Markdown("""
**Privacy Notice:**
- Screen content and voice are processed by Google's AI services
- Data is transmitted securely via encrypted WebRTC connections
- No permanent storage - all processing is real-time
- You control what is shared and can disconnect anytime
**Technical Details:**
- Uses Google Gemini Live API for real-time multimodal processing
- FastRTC provides low-latency WebRTC streaming
- Cloudflare TURN servers ensure reliable connectivity on HF Spaces
- Voice activity detection prevents interruptions
""")
# Wire up controls
connect_btn.click(
fn=handle_connect,
outputs=[status_display]
)
mic_btn.click(
fn=lambda: "πŸŽ™οΈ Testing microphone...",
outputs=[status_display],
js=microphone_js
)
screen_btn.click(
fn=lambda: "πŸ–₯️ Requesting screen share...",
outputs=[status_display],
js=screen_share_js
)
disconnect_btn.click(
fn=handle_disconnect,
outputs=[status_display]
)
return demo
# Main execution
if __name__ == "__main__":
print("πŸ–₯️ Real-Time Screen Assistant")
print("=" * 50)
print("Refactored according to instructions for:")
print("- Google GenAI Live API integration")
print("- FastRTC real-time streaming")
print("- Voice activity detection")
print("- Screen sharing capabilities")
print("- Cloudflare TURN for HF Spaces")
if not API_KEY:
print("\n⚠️ No GEMINI_API_KEY environment variable found")
print("Please set your Google AI API key:")
print("export GEMINI_API_KEY='your-api-key-here'")
else:
print(f"\nβœ… API key configured (length: {len(API_KEY)})")
print("\nπŸš€ Starting real-time assistant...")
try:
demo = create_interface()
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True
)
except Exception as e:
print(f"❌ Failed to launch: {e}")
print("Ensure all dependencies are installed: pip install -r requirements.txt")