""" | |
Real-Time Screen Assistant - Refactored for Google GenAI Live API + FastRTC | |
This application transforms the original screenshot analyzer into a real-time | |
screen sharing assistant with voice interaction, following the refactoring | |
instructions for live streaming capabilities. | |
""" | |

import os
import asyncio
import time

import numpy as np
import numpy.typing as npt
import cv2
import gradio as gr
from fastrtc import Stream, AsyncAudioVideoStreamHandler, get_cloudflare_turn_credentials_async
from google import genai
from google.genai import types

# Environment variable for API key
API_KEY = os.getenv("GEMINI_API_KEY", "")

class RealTimeScreenAssistant(AsyncAudioVideoStreamHandler):
    """
    Real-time screen assistant implementing the refactoring instructions.

    Features:
    - Google GenAI Live API integration
    - Real-time audio/video streaming via FastRTC
    - Voice activity detection
    - Intelligent frame sampling for screen sharing
    - Cloudflare TURN server support for HF Spaces
    """

    def __init__(self):
        super().__init__(
            expected_layout="mono",
            output_sample_rate=24000,
            input_sample_rate=16000
        )
        self.session = None
        self.session_context = None
        self.response_task = None
        self.last_frame_time = 0
        self.audio_queue = asyncio.Queue()
        self.connected = False
        self.frame_interval = 1.0  # Send at most 1 frame per second, per the instructions
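        # Rough bandwidth sanity check (editor's estimate, not from the original):
        # at 1 FPS, a 1080p frame JPEG-encoded at quality 80 is typically on the
        # order of 100-400 KB, so screen sharing stays well under ~0.5 MB/s upstream.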

    async def start_up(self):
        """Initialize the Google GenAI Live session as per Tasks 8-10"""
        try:
            # Re-check the environment variable in case it was set after import
            current_api_key = os.getenv("GEMINI_API_KEY", "")
            if not current_api_key:
                print("❌ No GEMINI_API_KEY found in environment")
                return

            # Initialize the client with the v1alpha API (Task 8)
            client = genai.Client(
                api_key=current_api_key,
                http_options={"api_version": "v1alpha"}
            )

            # Configure the live session (Task 9) - start with a minimal config
            # to avoid WebSocket handshake errors
            config = types.LiveConnectConfig(
                system_instruction=(
                    "You are a helpful real-time assistant who watches the user's screen and provides "
                    "guidance on using software. Give clear, step-by-step instructions based on what "
                    "you see and hear. Be proactive in offering assistance."
                )
            )

            # Connect to the Live API (Task 10) via its async context manager
            self.session_context = client.aio.live.connect(
                model="gemini-2.0-flash-live-001",
                config=config
            )
            self.session = await self.session_context.__aenter__()
            self.connected = True
            print("✅ Connected to Google GenAI Live API")

            # Start the response handler (Task 13)
            self.response_task = asyncio.create_task(self._handle_responses())
        except Exception as e:
            print(f"❌ Failed to connect to GenAI: {e}")
            self.connected = False
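
    # Note (an assumption, not in the original code): with the minimal config
    # above the model may reply with text only. To request spoken replies, the
    # LiveConnectConfig would typically also set the response modality, e.g.:
    #
    #     config = types.LiveConnectConfig(
    #         response_modalities=["AUDIO"],
    #         system_instruction="...",
    #     )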

    async def _handle_responses(self):
        """Handle AI responses as per Tasks 12-13"""
        try:
            while self.connected and self.session:
                try:
                    # session.receive() is an async iterator over streamed
                    # server messages from the Live API
                    async for msg in self.session.receive():
                        if not self.connected:
                            break
                        if msg.data:  # Audio response (16-bit PCM)
                            # Convert to numpy for FastRTC (Task 13)
                            audio_array = np.frombuffer(msg.data, dtype=np.int16)
                            if len(audio_array) > 0:
                                # Reshape to (channels, samples) as FastRTC expects
                                await self.audio_queue.put(audio_array.reshape(1, -1))
                        if msg.text:  # Text response
                            print(f"🤖 AI: {msg.text}")
                except Exception as inner_e:
                    if "connection" in str(inner_e).lower() or "closed" in str(inner_e).lower():
                        print("🔴 Connection closed, stopping response handler")
                        break
                    print(f"⚠️ Response handling error: {inner_e}")
                    await asyncio.sleep(0.1)  # Brief pause before retrying
        except Exception as e:
            print(f"❌ Error handling AI responses: {e}")

    async def receive(self, frame: tuple[int, npt.NDArray[np.int16]]):
        """Handle microphone audio (Task 11)"""
        if not self.connected or not self.session:
            return
        try:
            _, audio_np = frame
            audio_bytes = audio_np.tobytes()
            # Send audio via the non-deprecated send_realtime_input method
            # (the `audio=` keyword follows the google-genai SDK; adjust if
            # your SDK version differs)
            await self.session.send_realtime_input(
                audio=types.Blob(
                    data=audio_bytes,
                    mime_type="audio/pcm;rate=16000"
                )
            )
        except Exception as e:
            print(f"❌ Error sending audio: {e}")

    async def video_receive(self, frame: npt.NDArray[np.float32]):
        """Handle screen video frames (Tasks 11-12)"""
        if not self.connected or not self.session:
            return
        try:
            # Throttle to 1 FPS per the instructions
            current_time = time.time()
            if current_time - self.last_frame_time < self.frame_interval:
                return
            self.last_frame_time = current_time

            # Convert float32 frames to uint8 for JPEG encoding
            if frame.dtype == np.float32:
                # Assumes the frame is in [0, 1]; scale to [0, 255]
                frame_uint8 = (frame * 255).astype(np.uint8)
            else:
                frame_uint8 = frame.astype(np.uint8)

            # Skip empty frames before encoding
            if frame_uint8.size == 0 or frame_uint8.shape[0] == 0 or frame_uint8.shape[1] == 0:
                return

            # Encode as JPEG (Task 12)
            try:
                success, jpg_bytes = cv2.imencode('.jpg', frame_uint8, [cv2.IMWRITE_JPEG_QUALITY, 80])
                if not success:
                    return
            except cv2.error:
                # Handle OpenCV encoding errors gracefully
                return

            # Send to GenAI (the `media=` keyword follows the google-genai SDK;
            # adjust if your SDK version differs)
            await self.session.send_realtime_input(
                media=types.Blob(
                    data=jpg_bytes.tobytes(),
                    mime_type="image/jpeg"
                )
            )
        except Exception as e:
            print(f"❌ Error sending video frame: {e}")

    async def emit(self):
        """Emit queued AI audio back to the user (Task 13)"""
        try:
            audio_chunk = self.audio_queue.get_nowait()
            return (24000, audio_chunk)
        except asyncio.QueueEmpty:
            return None
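
    # Shape/dtype sketch of the audio plumbing above (illustrative, not from
    # the original): FastRTC consumes (sample_rate, ndarray) tuples, so e.g.
    #
    #     chunk = np.zeros((1, 480), dtype=np.int16)       # 20 ms of 24 kHz mono
    #     b = chunk.tobytes()                               # -> 960 bytes of PCM
    #     np.frombuffer(b, dtype=np.int16).reshape(1, -1)   # back to (1, N)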

    def copy(self):
        """Copy method required by FastRTC's AsyncAudioVideoStreamHandler"""
        # Return a new instance with the same configuration
        new_instance = RealTimeScreenAssistant()
        new_instance.frame_interval = self.frame_interval
        return new_instance

    async def video_emit(self):
        """Video emit method required by FastRTC's AsyncAudioVideoStreamHandler"""
        # This app does not send video back to the user
        return None

    async def shutdown(self):
        """Clean shutdown (Task 17)"""
        self.connected = False

        # Cancel the response handler task if it is still running
        if self.response_task and not self.response_task.done():
            self.response_task.cancel()
            try:
                await self.response_task
            except asyncio.CancelledError:
                pass

        if self.session:
            try:
                # Close the session through the same context manager that opened it
                if self.session_context:
                    await self.session_context.__aexit__(None, None, None)
                else:
                    await self.session.close()
                print("🔴 Disconnected from GenAI Live API")
            except Exception as e:
                print(f"❌ Error during shutdown: {e}")
            self.session = None
            self.session_context = None

# Global state
app_state = {"stream": None, "handler": None, "connected": False}

def initialize_real_time_assistant():
    """Initialize the real-time assistant (Tasks 26-29)"""
    try:
        # Create the handler
        handler = RealTimeScreenAssistant()
        app_state["handler"] = handler

        # Create the stream with Cloudflare TURN (Tasks 22-23). The handler is
        # passed directly: FastRTC's ReplyOnPause wraps audio-only reply
        # callbacks, not AsyncAudioVideoStreamHandler instances, and turn
        # detection for interruptions is handled server-side by the Live API.
        stream = Stream(
            handler=handler,
            modality="audio-video",
            mode="send-receive",
            rtc_configuration=get_cloudflare_turn_credentials_async,
            time_limit=300  # 5-minute limit for Spaces
        )
        app_state["stream"] = stream
        return stream
    except Exception as e:
        print(f"❌ Error creating stream: {e}")
        return None
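
# How a FastRTC Stream is typically surfaced (a sketch under assumed FastRTC
# APIs, not verified against this app's pinned version): the Stream exposes a
# prebuilt Gradio UI and can also be mounted on an existing FastAPI app:
#
#     stream.ui.launch()      # standalone Gradio app with built-in WebRTC widgets
#
#     from fastapi import FastAPI
#     app = FastAPI()
#     stream.mount(app)       # serve the WebRTC endpoints from FastAPI instead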

def handle_connect():
    """Connect button handler (Task 16)"""
    # Re-check the environment variable in case it was set after import
    current_api_key = os.getenv("GEMINI_API_KEY", "")
    if not current_api_key:
        return "❌ Please set the GEMINI_API_KEY environment variable"
    if app_state["connected"]:
        return "✅ Already connected - session is active"
    app_state["connected"] = True
    return "✅ Connecting... Please allow microphone and camera permissions"

def handle_disconnect():
    """Disconnect button handler (Task 17)"""
    if app_state["handler"] and app_state["connected"]:
        # asyncio.run() is acceptable here because Gradio executes sync
        # callbacks off the event loop thread
        asyncio.run(app_state["handler"].shutdown())
        app_state["connected"] = False
        return "🔴 Disconnected from AI assistant"
    return "Already disconnected"

# Screen sharing JavaScript - fixed syntax for HF Spaces
screen_share_js = '''
(async function() {
    try {
        if (!navigator.mediaDevices || !navigator.mediaDevices.getDisplayMedia) {
            return "❌ Screen sharing not supported in this browser";
        }
        const stream = await navigator.mediaDevices.getDisplayMedia({
            video: {
                width: { ideal: 1920 },
                height: { ideal: 1080 }
            },
            audio: false
        });
        // Find the video element used by FastRTC
        const videos = document.querySelectorAll('video');
        let targetVideo = null;
        for (let video of videos) {
            if (video.srcObject && video.srcObject.getVideoTracks().length > 0) {
                targetVideo = video;
                break;
            }
        }
        if (targetVideo && targetVideo.srcObject) {
            // Replace the camera track with the screen track
            const screenTrack = stream.getVideoTracks()[0];
            const oldTrack = targetVideo.srcObject.getVideoTracks()[0];
            // Remove the old track and add the screen track
            targetVideo.srcObject.removeTrack(oldTrack);
            targetVideo.srcObject.addTrack(screenTrack);
            screenTrack.onended = () => {
                console.log("Screen sharing ended");
            };
            return "🖥️ Screen sharing started successfully!";
        } else {
            return "❌ Could not find video stream to replace";
        }
    } catch (error) {
        console.error("Screen sharing error:", error);
        if (error.name === "NotAllowedError") {
            return "❌ Screen sharing permission denied - please allow screen access";
        } else if (error.name === "NotSupportedError") {
            return "❌ Screen sharing not supported in this environment";
        } else {
            return "❌ Screen sharing failed: " + error.message;
        }
    }
})()'''
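
# Caveat (editor's note, an assumption about this setup rather than observed
# behavior): swapping tracks on the local MediaStream does not by itself update
# an already-negotiated RTCPeerConnection. The standard WebRTC fix is to call
# replaceTrack on the connection's video sender, e.g.:
#
#     const sender = pc.getSenders().find(s => s.track && s.track.kind === "video");
#     await sender.replaceTrack(screenTrack);
#
# where `pc` is the RTCPeerConnection created by FastRTC (not exposed to this script).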

def create_interface():
    """Create the main interface (Tasks 26-30)"""
    # Initialize the stream
    stream = initialize_real_time_assistant()

    with gr.Blocks(
        title="Real-Time Screen Assistant",
        theme=gr.themes.Soft()
    ) as demo:
        gr.Markdown("# 🖥️ Real-Time Screen Assistant")
        gr.Markdown("""
        **🎯 LIVE AI that sees your screen and provides real-time guidance!**

        **How it works:**
        1. **Connect** - Links to Google's GenAI Live API for real-time AI processing
        2. **Share Screen** - The AI can see exactly what you're doing on your screen
        3. **Voice Chat** - Talk naturally; the AI responds with voice and sees everything
        4. **Get Help** - Real-time assistance with software, coding, and troubleshooting

        **Tech Stack:**
        - 🧠 Google GenAI Live API (multimodal real-time AI)
        - 📹 FastRTC (low-latency screen/audio streaming)
        - 🎙️ Voice activity detection
        - 🌐 Cloudflare TURN servers (HF Spaces optimized)
        """)

        # Status display
        status_display = gr.Textbox(
            label="Status",
            value="Ready to connect - Click Connect to start a real-time session",
            interactive=False
        )

        # Control buttons (Tasks 3, 16-17)
        with gr.Row():
            connect_btn = gr.Button("🔌 Connect", variant="primary")
            mic_btn = gr.Button("🎙️ Test Microphone", variant="secondary")
            screen_btn = gr.Button("🖥️ Show Your Screen", variant="secondary")
            disconnect_btn = gr.Button("🔴 Disconnect", variant="stop")

        # Stream interface - FastRTC UI for microphone and video
        gr.Markdown("### 📡 Live Audio/Video Stream")
        if stream:
            # Placeholder for the FastRTC stream UI, which provides microphone access
            gr.HTML("""
            <div id="fastrtc-container">
                <p>🎙️ Microphone and video streaming handled by FastRTC</p>
                <p>Click 'Test Microphone' and 'Show Your Screen' to activate</p>
            </div>
            """)
        else:
            gr.HTML("<div>⚠️ Stream initialization failed - check the console for errors</div>")

        # Microphone activation JavaScript
        microphone_js = '''
        (async function() {
            try {
                // Request microphone permission and start audio
                const stream = await navigator.mediaDevices.getUserMedia({
                    audio: {
                        sampleRate: 16000,
                        channelCount: 1,
                        echoCancellation: true,
                        noiseSuppression: true
                    }
                });
                console.log("Microphone access granted");
                return "🎙️ Microphone connected successfully";
            } catch (error) {
                console.error("Microphone error:", error);
                if (error.name === "NotAllowedError") {
                    return "❌ Microphone permission denied - please allow microphone access";
                } else {
                    return "❌ Microphone failed: " + error.message;
                }
            }
        })()
        '''

        # Instructions (Tasks 1-3)
        with gr.Accordion("📋 Instructions", open=True):
            gr.Markdown("""
            **How to use the real-time assistant:**
            1. **Connect**: Click Connect to start the AI session
            2. **Permissions**: Allow microphone and camera access
            3. **Show Screen**: Click "Show Your Screen" to share your screen
            4. **Voice Interaction**: Simply speak - the AI will respond
            5. **Real-time Guidance**: The AI sees your screen and provides live help
            6. **Disconnect**: Click Disconnect when finished

            **Features implemented from the refactoring instructions:**
            - ✅ FastRTC WebRTC streaming (Task 2)
            - ✅ Google GenAI Live API integration (Tasks 7-15)
            - ✅ Connect/Show Screen/Disconnect controls (Tasks 3, 16-17)
            - ✅ Voice activity detection (Task 3)
            - ✅ Screen sharing via getDisplayMedia (Task 6)
            - ✅ Real-time advice generation (Tasks 18-21)
            - ✅ Cloudflare TURN for HF Spaces (Tasks 22-23)
            """)

        # Privacy notice (Tasks 24-25)
        with gr.Accordion("🔒 Privacy & Security", open=False):
            gr.Markdown("""
            **Privacy Notice:**
            - Screen content and voice are processed by Google's AI services
            - Data is transmitted securely via encrypted WebRTC connections
            - No permanent storage - all processing is real-time
            - You control what is shared and can disconnect at any time

            **Technical Details:**
            - Uses the Google Gemini Live API for real-time multimodal processing
            - FastRTC provides low-latency WebRTC streaming
            - Cloudflare TURN servers ensure reliable connectivity on HF Spaces
            - Voice activity detection prevents interruptions
            """)

        # Wire up the controls
        connect_btn.click(
            fn=handle_connect,
            outputs=[status_display]
        )
        mic_btn.click(
            fn=lambda: "🎙️ Testing microphone...",
            outputs=[status_display],
            js=microphone_js
        )
        screen_btn.click(
            fn=lambda: "🖥️ Requesting screen share...",
            outputs=[status_display],
            js=screen_share_js
        )
        disconnect_btn.click(
            fn=handle_disconnect,
            outputs=[status_display]
        )

    return demo

# Main execution
if __name__ == "__main__":
    print("🖥️ Real-Time Screen Assistant")
    print("=" * 50)
    print("Refactored according to instructions for:")
    print("- Google GenAI Live API integration")
    print("- FastRTC real-time streaming")
    print("- Voice activity detection")
    print("- Screen sharing capabilities")
    print("- Cloudflare TURN for HF Spaces")

    if not API_KEY:
        print("\n⚠️ No GEMINI_API_KEY environment variable found")
        print("Please set your Google AI API key:")
        print("export GEMINI_API_KEY='your-api-key-here'")
    else:
        print(f"\n✅ API key configured (length: {len(API_KEY)})")

    print("\n🚀 Starting real-time assistant...")
    try:
        demo = create_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False,
            show_error=True
        )
    except Exception as e:
        print(f"❌ Failed to launch: {e}")
        print("Ensure all dependencies are installed: pip install -r requirements.txt")