import os
import asyncio
import time
import numpy as np
import cv2
import gradio as gr
from fastrtc import Stream, AsyncAudioVideoStreamHandler, get_cloudflare_turn_credentials_async, ReplyOnPause
from google import genai
from google.genai import types

# Environment variable for API key
API_KEY = os.getenv("GEMINI_API_KEY", "")


class EnhancedScreenAssistantHandler(AsyncAudioVideoStreamHandler):
    """Enhanced real-time screen assistant with voice activity detection"""

    def __init__(self):
        super().__init__(input_audio_type="mono", output_sample_rate=24000, input_sample_rate=16000)
        self.session = None
        self.last_frame_time = 0
        self.audio_queue = asyncio.Queue()
        self.text_queue = asyncio.Queue()
        self.connected = False
        self.frame_interval = 1.0  # Send one frame per second
        self.conversation_history = []

    async def start_up(self):
        """Initialize Google GenAI Live session with enhanced configuration"""
        try:
            if not API_KEY:
                print("❌ No GEMINI_API_KEY found in environment")
                return

            # Initialize Google GenAI client with alpha API access
            client = genai.Client(api_key=API_KEY, http_options={"api_version": "v1alpha"})

            # Enhanced configuration for the live session
            config = {
                "response_modalities": ["AUDIO", "TEXT"],
                "input_audio_transcription": {"model": "latest"},
                "output_audio_transcription": {"model": "latest"},
                "system_instruction": (
                    "You are an expert real-time screen assistant. You can see the user's screen "
                    "and hear their voice. Provide clear, actionable guidance based on what you observe. "
                    "Be proactive - if you see the user struggling or notice something important, "
                    "offer helpful suggestions even without being asked. Keep responses concise but thorough. "
                    "When giving instructions, be specific about what to click, where to look, "
                    "and what to expect next."
                ),
                "generation_config": {"response_mime_type": "text/plain", "temperature": 0.7, "max_output_tokens": 512},
            }

            # Connect to the Live API
            self.session = await client.aio.live.connect(model="gemini-2.0-flash-live-preview", config=config)
            self.connected = True
            print("✅ Connected to Google GenAI Live API with enhanced configuration")

            # Start background tasks and keep references so they can be cancelled on shutdown
            self.background_tasks = set()
            response_task = asyncio.create_task(self._handle_responses())
            context_task = asyncio.create_task(self._periodic_context_update())
            self.background_tasks.add(response_task)
            self.background_tasks.add(context_task)
            response_task.add_done_callback(self.background_tasks.discard)
            context_task.add_done_callback(self.background_tasks.discard)

        except Exception as e:
            print(f"❌ Failed to connect to GenAI: {e}")
            self.connected = False

    async def _handle_responses(self):
        """Handle incoming responses from the AI with enhanced processing"""
        try:
            current_text = ""
            async for msg in self.session.receive():
                if msg.data:
                    # Audio response from the AI: convert raw PCM bytes to a numpy array for FastRTC
                    audio_array = np.frombuffer(msg.data, dtype=np.int16)
                    if len(audio_array) > 0:
                        audio_array = audio_array.reshape(1, -1)  # Shape: (1, N)
                        await self.audio_queue.put(audio_array)

                if msg.text:
                    # Text response from the AI
                    current_text += msg.text
                    print(f"🤖 AI: {msg.text}")

                    # Add to conversation history when the response is complete
                    if msg.text.endswith((".", "!", "?", "\n")):
                        self.conversation_history.append(
                            {"role": "assistant", "content": current_text.strip(), "timestamp": time.time()}
                        )
                        current_text = ""

                        # Keep conversation history manageable
                        if len(self.conversation_history) > 20:
                            self.conversation_history = self.conversation_history[-15:]

                    await self.text_queue.put(msg.text)

        except Exception as e:
            print(f"❌ Error handling AI responses: {e}")

    async def _periodic_context_update(self):
        """Periodically send context updates to maintain session state"""
        while self.connected:
            await asyncio.sleep(30)  # Update every 30 seconds
            if self.session and len(self.conversation_history) > 0:
                try:
                    # Send a subtle context maintenance message
                    context_msg = "Continue monitoring and providing assistance as needed."
                    await self.session.send_realtime_input(text=context_msg)
                except Exception as e:
                    print(f"⚠️ Context update failed: {e}")

    async def receive(self, frame: tuple[int, np.ndarray]):
        """Handle incoming audio with voice activity detection"""
        if not self.connected or not self.session:
            return

        try:
            _, audio_np = frame

            # Basic voice activity detection: normalize int16 samples to [-1, 1]
            # so the threshold below works on a consistent scale
            audio_level = np.abs(audio_np.astype(np.float32) / 32768.0).mean()
            if audio_level > 0.01:  # Threshold for voice activity
                audio_bytes = audio_np.tobytes()

                # Send audio to the Google GenAI Live API
                await self.session.send_realtime_input(
                    media=types.Blob(data=audio_bytes, mime_type="audio/pcm;rate=16000")
                )
        except Exception as e:
            print(f"❌ Error processing audio: {e}")

    async def video_receive(self, frame: np.ndarray):
        """Handle incoming video frames with intelligent frame selection"""
        if not self.connected or not self.session:
            return

        try:
            current_time = time.time()

            # Adaptive frame rate: send frames more frequently when there is likely activity
            frame_diff_threshold = 0.1
            if hasattr(self, "last_frame"):
                frame_diff = np.abs(frame.astype(float) - self.last_frame.astype(float)).mean()
                if frame_diff > frame_diff_threshold:
                    # More activity detected, reduce the interval
                    effective_interval = self.frame_interval * 0.5
                else:
                    effective_interval = self.frame_interval
            else:
                effective_interval = self.frame_interval

            if current_time - self.last_frame_time < effective_interval:
                return

            self.last_frame_time = current_time
            self.last_frame = frame.copy()

            # Resize the frame for efficiency while maintaining quality
            height, width = frame.shape[:2]
            if width > 1280:
                scale = 1280 / width
                new_width = 1280
                new_height = int(height * scale)
                frame = cv2.resize(frame, (new_width, new_height), interpolation=cv2.INTER_AREA)

            # Encode the frame as JPEG with balanced quality/size
            success, jpg_bytes = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 75])
            if not success:
                return

            # Send the frame to Google GenAI
            await self.session.send_realtime_input(
                media=types.Blob(data=jpg_bytes.tobytes(), mime_type="image/jpeg")
            )
            print(f"📸 Sent frame ({frame.shape[1]}x{frame.shape[0]}, {len(jpg_bytes)} bytes)")

        except Exception as e:
            print(f"❌ Error processing video frame: {e}")

    async def emit(self):
        """Provide audio output back to the user with queue management"""
        try:
            audio_chunk = self.audio_queue.get_nowait()
            return (24000, audio_chunk)
        except asyncio.QueueEmpty:
            return None

    async def get_latest_text(self):
        """Get the latest text response for UI updates"""
        try:
            return self.text_queue.get_nowait()
        except asyncio.QueueEmpty:
            return None

    async def shutdown(self):
        """Enhanced cleanup with proper resource management"""
        self.connected = False

        if self.session:
            try:
                # Send a goodbye message, then give it a moment to go out
                await self.session.send_realtime_input(text="Session ending. Thank you!")
                await asyncio.sleep(0.5)
                await self.session.close()
                print("🔴 Cleanly disconnected from GenAI Live API")
            except Exception as e:
                print(f"⚠️ Error during shutdown: {e}")

        # Cancel all background tasks properly
        if hasattr(self, "background_tasks"):
            for task in self.background_tasks.copy():
                if not task.done():
                    task.cancel()

            # Wait for all tasks to complete or be cancelled
            if self.background_tasks:
                await asyncio.gather(*self.background_tasks, return_exceptions=True)
            self.background_tasks.clear()

        # Clear queues
        while not self.audio_queue.empty():
            try:
                self.audio_queue.get_nowait()
            except asyncio.QueueEmpty:
                break
        while not self.text_queue.empty():
            try:
                self.text_queue.get_nowait()
            except asyncio.QueueEmpty:
                break

        self.session = None
        self.conversation_history = []
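
# Illustrative only: a minimal standalone launch sketch, not part of the app flow below.
# The handler above can be tried on its own via FastRTC's prebuilt Stream UI (referenced
# later as stream.ui), assuming GEMINI_API_KEY is set in the environment. The name
# `demo_stream` is just for this sketch:
#
#     demo_stream = Stream(
#         handler=EnhancedScreenAssistantHandler(),
#         modality="audio-video",
#         mode="send-receive",
#     )
#     demo_stream.ui.launch()
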
Thank you!") await asyncio.sleep(0.5) # Brief delay for message to send await self.session.close() print("š“ Cleanly disconnected from GenAI Live API") except Exception as e: print(f"ā ļø Error during shutdown: {e}") # Cancel all background tasks properly if hasattr(self, "background_tasks"): for task in self.background_tasks.copy(): if not task.done(): task.cancel() # Wait for all tasks to complete or be cancelled if self.background_tasks: await asyncio.gather(*self.background_tasks, return_exceptions=True) self.background_tasks.clear() # Clear queues while not self.audio_queue.empty(): try: self.audio_queue.get_nowait() except asyncio.QueueEmpty: break while not self.text_queue.empty(): try: self.text_queue.get_nowait() except asyncio.QueueEmpty: break self.session = None self.conversation_history = [] # Global state management app_state = {"stream": None, "handler": None, "connected": False, "screen_sharing": False} def initialize_stream(): """Initialize the FastRTC stream with enhanced configuration""" try: # Create enhanced handler handler = EnhancedScreenAssistantHandler() app_state["handler"] = handler # Create stream with optimized settings for HF Spaces stream = Stream( handler=ReplyOnPause(handler), # Add voice activity detection modality="audio-video", mode="send-receive", rtc_configuration=get_cloudflare_turn_credentials_async, time_limit=600, # 10 minute session limit ui_args={ "audio_controls": True, "video_controls": True, }, ) app_state["stream"] = stream return stream except Exception as e: print(f"ā Error initializing stream: {e}") return None def handle_connect(): """Enhanced connection handler""" if not API_KEY: return "ā Please set GEMINI_API_KEY environment variable" if app_state["connected"]: return "ā Already connected - session is active" if app_state["handler"]: app_state["connected"] = True return "ā Connecting to AI... Please allow microphone and camera permissions" return "ā Stream not initialized - please refresh the page" def handle_screen_share(): """Handle screen sharing toggle""" app_state["screen_sharing"] = not app_state["screen_sharing"] if app_state["screen_sharing"]: return "š„ļø Screen sharing started - AI can now see your screen" else: return "š± Switched back to camera view" async def handle_disconnect_async(): """Async enhanced disconnection handler""" if app_state["handler"] and app_state["connected"]: try: await app_state["handler"].shutdown() app_state["connected"] = False app_state["screen_sharing"] = False app_state["handler"] = None return "š“ Disconnected from AI assistant" except Exception as e: return f"ā ļø Disconnect error: {e}" return "Already disconnected" def handle_disconnect(): """Sync wrapper for enhanced disconnection handler""" # Create task and store reference for proper cleanup if not hasattr(app_state, "disconnect_task") or app_state.get("disconnect_task", {}).done(): import asyncio app_state["disconnect_task"] = asyncio.create_task(handle_disconnect_async()) app_state["connected"] = False # Immediately mark as disconnected app_state["screen_sharing"] = False return "š Disconnecting... Please wait..." 

# Enhanced JavaScript for screen sharing
enhanced_screen_share_js = """
async function toggleScreenShare() {
    try {
        const videoElements = document.querySelectorAll('video');
        const webrtcVideo = Array.from(videoElements).find(video =>
            video.srcObject && video.srcObject.getVideoTracks().length > 0
        );

        if (!webrtcVideo) {
            return "❌ Could not find video element";
        }

        const currentTrack = webrtcVideo.srcObject.getVideoTracks()[0];
        const isScreenShare = currentTrack && currentTrack.label.includes('screen');

        if (isScreenShare) {
            // Switch back to camera
            const cameraStream = await navigator.mediaDevices.getUserMedia({
                video: { width: 640, height: 480 },
                audio: false
            });

            const videoTrack = cameraStream.getVideoTracks()[0];
            webrtcVideo.srcObject.removeTrack(currentTrack);
            webrtcVideo.srcObject.addTrack(videoTrack);
            currentTrack.stop();

            return "📱 Switched to camera view";
        } else {
            // Switch to screen share
            const screenStream = await navigator.mediaDevices.getDisplayMedia({
                video: {
                    mediaSource: 'screen',
                    width: { ideal: 1280, max: 1920 },
                    height: { ideal: 720, max: 1080 },
                    frameRate: { ideal: 2, max: 5 }  // Low frame rate for efficiency
                },
                audio: false
            });

            const videoTrack = screenStream.getVideoTracks()[0];
            webrtcVideo.srcObject.removeTrack(currentTrack);
            webrtcVideo.srcObject.addTrack(videoTrack);

            // Handle when screen sharing ends
            videoTrack.onended = () => {
                console.log('Screen sharing ended by user');
                // Automatically switch back to camera
                navigator.mediaDevices.getUserMedia({video: true, audio: false})
                    .then(cameraStream => {
                        const cameraTrack = cameraStream.getVideoTracks()[0];
                        webrtcVideo.srcObject.addTrack(cameraTrack);
                    });
            };

            currentTrack.stop();
            return "🖥️ Screen sharing active";
        }
    } catch (error) {
        console.error('Screen sharing error:', error);
        if (error.name === 'NotAllowedError') {
            return "❌ Screen sharing permission denied";
        } else if (error.name === 'NotFoundError') {
            return "❌ No screen available to share";
        } else {
            return `❌ Error: ${error.message}`;
        }
    }
}
return toggleScreenShare();
"""


def create_main_interface():
    """Create the enhanced main interface"""
    # Initialize the stream
    stream = initialize_stream()

    with gr.Blocks(
        title="Enhanced Real-Time Screen Assistant",
        theme=gr.themes.Soft(),
        css="""
        .status-connected { background: linear-gradient(90deg, #4CAF50, #45a049); color: white; }
        .status-disconnected { background: linear-gradient(90deg, #f44336, #da190b); color: white; }
        .status-warning { background: linear-gradient(90deg, #ff9800, #f57c00); color: white; }
        .control-row { margin: 10px 0; }
        .stream-container { border: 2px solid #ddd; border-radius: 10px; padding: 20px; margin: 20px 0; }
        """,
    ) as demo:
        gr.Markdown("# 🖥️ Enhanced Real-Time Screen Assistant")
        gr.Markdown("""
        **Advanced AI assistant with live screen sharing, voice interaction, and real-time guidance**

        Powered by Google's Gemini Live API and FastRTC for ultra-low latency communication.
""") # Status display status_display = gr.Textbox( label="š Status", value="Ready to connect - Click Connect to start your AI session", interactive=False, elem_classes=["status-disconnected"], ) # Control buttons with gr.Row(elem_classes=["control-row"]): connect_btn = gr.Button("š Connect to AI", variant="primary", size="lg") screen_btn = gr.Button("š„ļø Toggle Screen Share", variant="secondary", size="lg") disconnect_btn = gr.Button("š“ Disconnect", variant="stop", size="lg") # Stream container if stream and stream.ui: with gr.Group(elem_classes=["stream-container"]): gr.Markdown("### š” Live Stream") stream_interface = stream.ui else: stream_interface = gr.HTML("