"""Gradio front end for a Gemini live (audio + text) session.

Architecture, as implemented below:

* A daemon thread runs a dedicated asyncio event loop (``start_asyncio_loop``).
* ``GradioAudioLoop.run_main_loop`` runs on that loop, owns the Gemini session
  and spawns the worker tasks (text sender, response receiver, optional
  camera/screen capture).
* The Gradio handlers are synchronous generators running on Gradio's own
  threads; they talk to the loop only through
  ``run_coro_in_background_loop``, which returns a ``concurrent.futures.Future``.
"""
# File: app.py

import asyncio
import atexit  # For cleanup
import base64
import concurrent.futures
import contextlib
import io
import os
import queue
import tempfile
import threading
import time
import traceback
import wave

import cv2
import google.api_core.exceptions
import google.generativeai as genai
import gradio as gr
import mss
import numpy as np
import PIL.Image
import pyaudio
from google.generativeai import types

# NOTE(review): `genai.Client`, `client.aio.live.connect` and
# `types.LiveConnectConfig` used below look like APIs of the newer
# `google-genai` SDK (`from google import genai`), not `google.generativeai`.
# Confirm which SDK is installed; with the wrong one, client initialization
# fails and is reported as an "Initialization Error".

# --- Constants ---
FORMAT = pyaudio.paInt16
CHANNELS = 1
SEND_SAMPLE_RATE = 16000
RECEIVE_SAMPLE_RATE = 24000  # Gemini outputs at 24kHz
CHUNK_SIZE = 1024
MODEL = "models/gemini-2.0-flash-exp"  # Use the requested experimental model
DEFAULT_VIDEO_MODE = "none"
AVAILABLE_VOICES = ["Puck", "Charon", "Kore", "Fenrir", "Aoede"]
DEFAULT_VOICE = "Puck"
SYSTEM_INSTRUCTION_TEXT = "Answer user ask replay same thing user say no other word explain "

# Status prefixes from which a fresh connection attempt is allowed.
_RECONNECTABLE_STATUS_PREFIXES = ("Disconnected", "Connection Failed", "Initialization Error")

# --- Global State ---
audio_loop_instance = None
background_tasks = set()
background_loop = None  # Event loop for the background thread
pya = None  # Initialize PyAudio globally later
background_thread = None  # Keep track of the thread
stop_background_loop = False  # Flag to signal loop termination


# --- Original AudioLoop Class Methods (Included for potential future use) ---
class OriginalAudioLoop:
    """Capture-side tasks: camera, screen and microphone feeding ``out_queue``."""

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE):
        self.video_mode = video_mode
        self.out_queue = None  # Queue for data *to* Gemini (mic audio, images)
        self.session = None
        self.audio_stream = None  # Mic input stream

    @staticmethod
    def _encode_jpeg(img):
        """Shrink *img* in place to fit 1024x1024 and return a base64 JPEG payload dict."""
        img.thumbnail([1024, 1024])
        image_io = io.BytesIO()
        img.save(image_io, format="jpeg")
        return {
            "mime_type": "image/jpeg",
            "data": base64.b64encode(image_io.getvalue()).decode(),
        }

    def _get_frame(self, cap):
        """Read one frame from *cap*; return a JPEG payload dict, or None if the read failed."""
        ret, frame = cap.read()
        if not ret:
            return None
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return self._encode_jpeg(PIL.Image.fromarray(frame_rgb))

    async def get_frames(self):
        """Push roughly one camera frame per second onto ``out_queue`` while a session exists."""
        cap = None
        try:
            print("Attempting to open camera...")
            cap = await asyncio.to_thread(cv2.VideoCapture, 0)
            if not cap.isOpened():
                print("Error: Could not open camera.")
                await update_status("Error: Could not open camera.")
                return
            print("Camera opened successfully.")
            while True:
                if not self.session:  # Stop if disconnected
                    print("get_frames: Session closed, stopping camera task.")
                    break
                frame = await asyncio.to_thread(self._get_frame, cap)
                if frame is None:
                    await asyncio.sleep(0.1)  # Avoid busy loop
                    continue  # Skip putting None in queue
                if self.out_queue:
                    await self.out_queue.put(frame)
                await asyncio.sleep(1.0)  # Send frame every second
        except asyncio.CancelledError:
            print("get_frames task cancelled.")
        except Exception as e:
            print(f"Error in get_frames: {e}")
            await update_status(f"Camera Error: {e}")
        finally:
            if cap and cap.isOpened():
                print("Releasing camera.")
                await asyncio.to_thread(cap.release)
            print("Camera task finished.")

    def _grab_monitor(self, index):
        """Grab ``mss`` monitor *index* and return it as a JPEG payload dict."""
        with mss.mss() as sct:
            sct_img = sct.grab(sct.monitors[index])
            # mss delivers BGRA bytes; decode as BGRX to drop the alpha channel.
            img = PIL.Image.frombytes("RGB", sct_img.size, sct_img.bgra, "raw", "BGRX")
        return self._encode_jpeg(img)

    def _get_screen(self):
        """Capture monitor 1, falling back to monitor 0; return None on any failure."""
        try:
            return self._grab_monitor(1)
        except IndexError:
            print("Error capturing screen: Could not find monitor at index 1. Trying index 0.")
            try:
                return self._grab_monitor(0)
            except Exception as e_fallback:
                print(f"Error capturing screen (fallback monitor 0): {e_fallback}")
                return None
        except Exception as e:
            print(f"Error capturing screen: {e}")
            return None

    async def get_screen(self):
        """Push roughly one screen capture per second onto ``out_queue`` while a session exists."""
        while True:
            if not self.session:  # Stop if disconnected
                print("get_screen: Session closed, stopping screen task.")
                break
            frame = await asyncio.to_thread(self._get_screen)
            if frame is None:
                print("Warning: Failed to capture screen.")
                await asyncio.sleep(1.0)  # Wait before retrying if error occurred
                continue  # Skip putting None in queue
            if self.out_queue:
                await self.out_queue.put(frame)
            await asyncio.sleep(1.0)  # Send screen frame every second

    async def send_realtime(self):
        """Sends microphone audio or video frames from the out_queue to Gemini."""
        while True:
            if not self.session or not self.out_queue:
                # Wait if session/queue not ready or if disconnected
                await asyncio.sleep(0.1)
                if not self.session:  # Check again after sleep if disconnected
                    print("send_realtime: Session closed, stopping task.")
                    break
                continue
            try:
                # Timeout so the session check above runs even when the queue is idle.
                msg = await asyncio.wait_for(self.out_queue.get(), timeout=1.0)
                if self.session:  # Check again in case session closed while waiting
                    await self.session.send(input=msg)
                self.out_queue.task_done()
            except asyncio.TimeoutError:
                continue  # No message in queue, loop again
            except asyncio.CancelledError:
                print("send_realtime task cancelled.")
                break
            except Exception as e:
                print(f"Error in send_realtime: {e}")
                await update_status(f"Send Error: {e}")
                # Avoid continuous errors if session is bad
                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in send_realtime, pausing...")
                    await asyncio.sleep(5)

    async def listen_audio(self):
        """Listens to microphone and puts audio chunks onto the out_queue."""
        if not pya:
            print("Error: PyAudio not initialized in listen_audio.")
            await update_status("Error: Audio system not ready.")
            return
        stream = None
        try:
            print("Attempting to open microphone...")
            mic_info = await asyncio.to_thread(pya.get_default_input_device_info)
            stream = await asyncio.to_thread(
                pya.open,
                format=FORMAT,
                channels=CHANNELS,
                rate=SEND_SAMPLE_RATE,
                input=True,
                input_device_index=mic_info["index"],
                frames_per_buffer=CHUNK_SIZE,
            )
            self.audio_stream = stream  # Store reference for cleanup
            print("Microphone stream opened.")
            kwargs = {"exception_on_overflow": False} if __debug__ else {}
            while True:
                if not self.session:  # Stop if disconnected
                    print("listen_audio: Session closed, stopping microphone task.")
                    break
                try:
                    data = await asyncio.to_thread(stream.read, CHUNK_SIZE, **kwargs)
                    if self.out_queue:
                        await self.out_queue.put({"data": data, "mime_type": "audio/pcm"})
                except IOError:
                    # Buffer overflow or stream closed abruptly: pause briefly, then retry.
                    await asyncio.sleep(0.05)
                except asyncio.CancelledError:
                    print("listen_audio task cancelled.")
                    break
        except OSError as e:
            print(f"Error opening microphone: {e}. Is a microphone connected and accessible?")
            await update_status(f"Mic Error: {e}")
        except Exception as e:
            print(f"Error in listen_audio: {e}")
            traceback.print_exc()
            await update_status(f"Mic Error: {e}")
        finally:
            if stream:
                print("Stopping and closing microphone stream.")
                await asyncio.to_thread(stream.stop_stream)
                await asyncio.to_thread(stream.close)
                self.audio_stream = None  # Clear reference
                print("Microphone stream closed.")


# --- Gradio Specific Audio Loop ---
class GradioAudioLoop(OriginalAudioLoop):
    """Session owner that bridges Gradio's request/response handlers and Gemini.

    All coroutine methods must run on the background event loop.
    """

    def __init__(self, video_mode=DEFAULT_VIDEO_MODE, api_key=None, voice_name=DEFAULT_VOICE):
        super().__init__(video_mode)
        self.api_key = api_key
        self.voice_name = voice_name
        self.client = None
        self.config = None
        self.connection_status = "Disconnected"  # Internal status
        # Loop-bound primitives are created in run_main_loop(): this constructor
        # runs on a Gradio worker thread, not on the background event loop.
        self.text_input_queue = None
        self.response_text_queue = None
        self.response_audio_queue = None
        self.response_event = None  # Signal when response is ready
        # Buffers for accumulating response data within a turn
        self.current_audio_buffer = io.BytesIO()
        self.current_text_response = ""

    def _create_loop_primitives(self):
        """Create the queues/event used between handlers and tasks (call on the loop)."""
        self.text_input_queue = asyncio.Queue()
        self.response_text_queue = asyncio.Queue()
        self.response_audio_queue = asyncio.Queue()
        self.response_event = asyncio.Event()

    def _initialize_client_and_config(self):
        """Initialize Gemini client and configuration.

        Returns True on success; on any failure records the error in
        ``connection_status`` and returns False.
        """
        try:
            if not self.api_key:
                raise ValueError("API key is not set.")
            # GEMINI_API_KEY, when set, takes precedence over the key typed in the UI.
            api_key_to_use = os.getenv("GEMINI_API_KEY", self.api_key)
            if not api_key_to_use:
                raise ValueError("No API key provided or found in GEMINI_API_KEY environment variable.")
            print("Initializing Gemini Client...")
            self.client = genai.Client(api_key=api_key_to_use)
            print(f"Setting up LiveConnectConfig with voice: {self.voice_name}")
            self.config = types.LiveConnectConfig(
                response_modalities=["audio", "text"],  # Get both audio and text
                speech_config=types.SpeechConfig(
                    voice_config=types.VoiceConfig(
                        prebuilt_voice_config=types.PrebuiltVoiceConfig(voice_name=self.voice_name)
                    )
                ),
                system_instruction=types.Content(
                    parts=[types.Part.from_text(text=SYSTEM_INSTRUCTION_TEXT)],
                    role="user",
                ),
            )
            print("Gemini client and config initialized successfully.")
            self.connection_status = "Initialized"
            return True
        except Exception as e:
            print(f"Error initializing Gemini client: {e}")
            self.client = None
            self.config = None
            self.connection_status = f"Initialization Error: {e}"
            return False

    async def _publish_response(self, text, audio=b""):
        """Queue one (text, audio) result and wake the waiting handler."""
        await self.response_audio_queue.put(audio)
        await self.response_text_queue.put(text)
        self.response_event.set()

    async def process_text_inputs(self):
        """Task to wait for text input from Gradio and send it to Gemini."""
        while True:
            try:
                text_to_send = await self.text_input_queue.get()
                if text_to_send is None:  # None is the stop signal
                    print("Stopping text input processing.")
                    break
                if self.session and self.connection_status == "Connected":
                    print(f"Sending text to Gemini: {text_to_send[:50]}...")
                    # Reset response holders before sending new message
                    self.current_audio_buffer = io.BytesIO()
                    self.current_text_response = ""
                    self.response_event.clear()
                    await self.session.send(input=text_to_send or ".", end_of_turn=True)
                    print("Text sent, waiting for response...")
                else:
                    print(f"Warning: Cannot send text. Session not active or status is {self.connection_status}.")
                    # Signal an error back so the waiting Gradio handler is unblocked.
                    await self._publish_response(
                        f"Error: Not connected or connection issue ({self.connection_status}). Cannot send message."
                    )
                self.text_input_queue.task_done()
            except asyncio.CancelledError:
                print("process_text_inputs task cancelled.")
                break
            except Exception as e:
                print(f"Error in process_text_inputs: {e}")
                await self._publish_response(f"Error sending message: {e}")
                await asyncio.sleep(1)  # Back off so a persistent error cannot spin

    async def receive_responses(self):
        """Task to receive responses (audio/text) from Gemini."""
        while True:
            if not self.session or self.connection_status != "Connected":
                await asyncio.sleep(0.2)
                if not self.session:  # Check if disconnected while waiting
                    print("receive_responses: Session closed, stopping task.")
                    break
                continue
            try:
                turn = self.session.receive()
                async for response in turn:
                    if data := response.data:
                        self.current_audio_buffer.write(data)
                    if text := response.text:
                        self.current_text_response += text
                # Turn complete: hand the accumulated result to the Gradio handler.
                await self._publish_response(
                    self.current_text_response, self.current_audio_buffer.getvalue()
                )
            except asyncio.CancelledError:
                print("receive_responses task cancelled.")
                break
            except google.api_core.exceptions.Cancelled:
                print("Gemini receive cancelled (likely due to interruption or end)")
                # Signal completion even if cancelled externally
                await self._publish_response(
                    self.current_text_response + " [Receive Cancelled]",
                    self.current_audio_buffer.getvalue(),
                )
            except Exception as e:
                print(f"Error receiving responses: {e}")
                traceback.print_exc()
                # Signal completion with error to unblock handler
                await self._publish_response(f"Error receiving response: {e}")
                if isinstance(e, (google.api_core.exceptions.GoogleAPICallError, ConnectionError)):
                    print("Connection error in receive_responses, pausing...")
                    # Leaving "Connected" makes run_main_loop exit and clean up.
                    self.connection_status = f"Receive Error: {e}"
                    await asyncio.sleep(5)

    def _discard_stale_responses(self):
        """Drop results left over from an earlier request that timed out."""
        self.response_event.clear()
        for stale_queue in (self.response_audio_queue, self.response_text_queue):
            while not stale_queue.empty():
                stale_queue.get_nowait()
                stale_queue.task_done()

    async def send_message_and_wait_for_response(self, text):
        """Put *text* on the input queue and wait for the matching response.

        Returns ``(text_response, audio_bytes)``; on any failure the first
        element is an error message and the second is None.
        """
        if not self.session or self.connection_status != "Connected":
            return f"Error: Not connected ({self.connection_status}).", None
        # Without this a late reply to a timed-out request would be returned
        # as the answer to the next request.
        self._discard_stale_responses()
        await self.text_input_queue.put(text)
        print("Waiting for response event...")
        try:
            await asyncio.wait_for(self.response_event.wait(), timeout=60.0)
            print("Response event received.")
        except asyncio.TimeoutError:
            print("Timeout waiting for Gemini response.")
            return "Error: Timeout waiting for response.", None
        except Exception as e:
            print(f"Error waiting for response event: {e}")
            return f"Error waiting for response: {e}", None
        # The event is only set after both queues were filled, so get_nowait is safe.
        try:
            audio_data = self.response_audio_queue.get_nowait()
            text_response = self.response_text_queue.get_nowait()
            self.response_audio_queue.task_done()
            self.response_text_queue.task_done()
        except asyncio.QueueEmpty:
            print("Error: Response queues were empty after event was set.")
            return "Internal Error: Response queues empty.", None
        except Exception as e:
            print(f"Error retrieving from response queues: {e}")
            return f"Internal Error: {e}", None
        return text_response, audio_data

    def _spawn_session_tasks(self):
        """Create the worker tasks for the current session and return them as a set."""
        tasks = {
            asyncio.create_task(self.process_text_inputs(), name="process_text_inputs"),
            asyncio.create_task(self.receive_responses(), name="receive_responses"),
        }
        if self.video_mode != "none":
            tasks.add(asyncio.create_task(self.send_realtime(), name="send_realtime"))
        if self.video_mode == "camera":
            print("Starting camera input task...")
            tasks.add(asyncio.create_task(self.get_frames(), name="get_frames"))
        elif self.video_mode == "screen":
            print("Starting screen capture task...")
            tasks.add(asyncio.create_task(self.get_screen(), name="get_screen"))
        # Microphone input (listen_audio) is intentionally not started here.
        return tasks

    async def run_main_loop(self):
        """Establish the Gemini session, run the worker tasks, and clean up on exit."""
        global background_tasks
        self._create_loop_primitives()
        if not self._initialize_client_and_config():
            print("Initialization failed, cannot connect.")
            self.connection_status = "Connection Failed: Initialization error."
            await update_status(self.connection_status)
            return  # Stop if client setup failed
        try:
            print(f"Attempting to connect to Gemini model: {MODEL}...")
            self.connection_status = "Connecting..."
            await update_status(self.connection_status)
            try:
                async with contextlib.AsyncExitStack() as stack:
                    # connect() yields an async context manager; the timeout is
                    # applied to entering it, and the stack closes it on exit.
                    session = await asyncio.wait_for(
                        stack.enter_async_context(
                            self.client.aio.live.connect(model=MODEL, config=self.config)
                        ),
                        timeout=30.0,
                    )
                    self.session = session
                    self.connection_status = "Connected"
                    print("Session established successfully.")
                    await update_status(self.connection_status)
                    # Queue for mic/video data TO Gemini
                    self.out_queue = asyncio.Queue(maxsize=20)
                    background_tasks.update(self._spawn_session_tasks())
                    # disconnect() clears self.session and receive_responses changes
                    # the status on connection errors; either ends this wait.
                    while self.connection_status == "Connected" and self.session:
                        await asyncio.sleep(0.5)
                    print("Exiting main run loop (disconnected or error).")
            except asyncio.TimeoutError:
                print("CONNECTION FAILED: Timeout while trying to connect.")
                self.connection_status = "Connection Failed: Timeout"
                await update_status(self.connection_status)
            except google.api_core.exceptions.PermissionDenied as e:
                print(f"CONNECTION FAILED: Permission Denied. Check API key and permissions. {e}")
                self.connection_status = "Connection Failed: Permission Denied"
                await update_status(f"{self.connection_status}. Check API Key.")
            except google.api_core.exceptions.InvalidArgument as e:
                print(f"CONNECTION FAILED: Invalid Argument. Check model name ('{MODEL}') and config. {e}")
                self.connection_status = "Connection Failed: Invalid Argument (Model/Config?)"
                await update_status(f"{self.connection_status} Details: {e}")
            except Exception as e:  # Catch other potential connection errors
                print(f"CONNECTION FAILED: An unexpected error occurred during connection. {e}")
                traceback.print_exc()
                self.connection_status = f"Connection Failed: {e}"
                await update_status(self.connection_status)
        except asyncio.CancelledError:
            print("run_main_loop task cancelled.")
            self.connection_status = "Disconnected (Cancelled)"
        except Exception as e:
            print(f"Error in AudioLoop run_main_loop: {e}")
            traceback.print_exc()
            self.connection_status = f"Runtime Error: {e}"
            await update_status(self.connection_status)
        finally:
            print("Cleaning up audio loop resources...")
            final_status = self.connection_status  # Capture status before changing
            if final_status in ("Connected", "Disconnecting"):
                final_status = "Disconnected"
            self.connection_status = "Disconnected"
            # Reset session first so every worker's `if not self.session` check trips.
            self.session = None
            self.client = None
            self.out_queue = None
            tasks_to_cancel = list(background_tasks)  # Iterate over a copy
            background_tasks.clear()
            for task in tasks_to_cancel:
                if task and not task.done():
                    task.cancel()
                    print(f"Cancelled task: {task.get_name()}")
            if tasks_to_cancel:
                # Let cancelled tasks run their own cleanup (camera/mic release).
                await asyncio.gather(*tasks_to_cancel, return_exceptions=True)
            print("AudioLoop run finished.")
            await update_status(final_status)

    async def disconnect(self):
        """Initiates the disconnection process."""
        print("Disconnect requested.")
        if self.session:
            # run_main_loop notices the cleared session, leaves its `async with`
            # (which closes the connection) and performs the cleanup.
            print("Setting status to Disconnecting...")
            self.connection_status = "Disconnecting"
            self.session = None
            # Put None on input queue to stop text processor if waiting
            await self.text_input_queue.put(None)
        else:
            self.connection_status = "Disconnected"
        await update_status("Disconnected")


# --- Helper Functions ---
def start_asyncio_loop():
    """Run the background asyncio event loop until stop_asyncio_loop() is called.

    Intended as the target of the background thread.
    """
    global background_loop, stop_background_loop
    stop_background_loop = False
    loop = asyncio.new_event_loop()
    background_loop = loop
    asyncio.set_event_loop(loop)
    print("Background asyncio loop starting.")
    try:
        loop.run_forever()
        print("Stop signal received, exiting run_forever loop.")
        print("Running pending tasks before closing loop...")
        pending = asyncio.all_tasks(loop=loop)
        # Cancel first: several workers wait indefinitely and would never finish.
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
    except Exception as e:
        print(f"Error in background loop: {e}")
        traceback.print_exc()
    finally:
        print("Closing background loop...")
        loop.close()
        print("Background asyncio loop stopped.")
        background_loop = None  # Clear global ref


def stop_asyncio_loop():
    """Signals the background asyncio loop to stop."""
    global stop_background_loop
    print("Signalling background loop to stop...")
    stop_background_loop = True
    loop = background_loop
    if loop is not None and not loop.is_closed():
        try:
            # Also effective if the loop has not entered run_forever() yet:
            # the queued stop runs on its first iteration.
            loop.call_soon_threadsafe(loop.stop)
        except RuntimeError as e:  # Loop closed between the check and the call
            print(f"Background loop already closed: {e}")


def run_coro_in_background_loop(coro):
    """Submit *coro* to the background event loop from another thread.

    Returns the ``concurrent.futures.Future`` for the coroutine, or None when
    it could not be scheduled (the coroutine is closed in that case). Code
    already running on the background loop should ``await`` directly instead.
    """
    loop = background_loop
    if loop is None or loop.is_closed() or stop_background_loop:
        print("Error: Background asyncio loop not running or stopping.")
    else:
        try:
            return asyncio.run_coroutine_threadsafe(coro, loop)
        except RuntimeError as e:
            print(f"Error submitting coroutine (loop shutting down?): {e}")
        except Exception as e:
            print(f"Unexpected error submitting coroutine: {e}")
    if asyncio.iscoroutine(coro):
        coro.close()  # Avoid a "coroutine was never awaited" warning
    return None


def format_audio_for_gradio(pcm_data):
    """Convert raw 16-bit PCM bytes to the ``(sample_rate, ndarray)`` tuple Gradio expects.

    Returns None for empty input or when conversion fails.
    """
    if not pcm_data:
        return None
    try:
        sample_width = np.dtype(np.int16).itemsize
        # Drop a trailing partial sample instead of failing on odd-length data.
        usable = len(pcm_data) - (len(pcm_data) % sample_width)
        samples = np.frombuffer(pcm_data[:usable], dtype=np.int16)
        return (RECEIVE_SAMPLE_RATE, samples)
    except Exception as e:
        print(f"Error formatting audio: {e}")
        traceback.print_exc()
        return None


def initialize_py_audio():
    """Create the global PyAudio instance if needed; return True when it is available."""
    global pya
    if pya is not None:
        return True  # Already initialized
    try:
        print("Initializing PyAudio...")
        pya = pyaudio.PyAudio()
        print("PyAudio initialized.")
        return True
    except Exception as e:
        print(f"Failed to initialize PyAudio: {e}")
        pya = None
        return False


def terminate_py_audio():
    """Terminate and forget the global PyAudio instance, if any."""
    global pya
    if pya:
        print("Terminating PyAudio...")
        try:
            pya.terminate()
        except Exception as e:
            print(f"Error terminating PyAudio: {e}")
        finally:
            pya = None
            print("PyAudio terminated.")


# --- Gradio Interface and Handlers ---
async def update_status(new_status: str):
    """Log a status change.

    This does not update the Gradio UI itself; the status textbox only changes
    with the values yielded by the handlers below.
    """
    print(f"Status Update (async): {new_status}")


def handle_connect(api_key, voice_name, video_mode):
    """Handles the 'Connect' button click; yields (status, text, audio) updates."""
    global audio_loop_instance, background_thread
    print("\n--- Connect Button Clicked ---")
    yield "Connecting...", None, None  # Initial status update
    if not api_key:
        yield "Error: Please enter a Gemini API key.", None, None
        return
    if audio_loop_instance and not audio_loop_instance.connection_status.startswith(
        _RECONNECTABLE_STATUS_PREFIXES
    ):
        yield (
            f"Already connected or connecting ({audio_loop_instance.connection_status}). Disconnect first.",
            None,
            None,
        )
        return
    # Start background loop thread if not running
    if not background_thread or not background_thread.is_alive():
        print("Starting background thread...")
        background_thread = threading.Thread(target=start_asyncio_loop, daemon=True)
        background_thread.start()
        time.sleep(0.5)  # Give the loop a moment to start
    if not initialize_py_audio():
        yield "Error: Failed to initialize audio system.", None, None
        return
    print(f"Attempting to connect with voice: {voice_name}, video: {video_mode}")
    audio_loop_instance = GradioAudioLoop(video_mode=video_mode, api_key=api_key, voice_name=voice_name)
    connect_future = run_coro_in_background_loop(audio_loop_instance.run_main_loop())
    if not connect_future:
        audio_loop_instance = None  # Cleanup if scheduling failed
        yield "Error: Failed to schedule connection task.", None, None
        return
    # Do not block on connect_future (it completes only at disconnect); wait
    # briefly, then report whatever status the loop has reached so far.
    time.sleep(1.5)
    if audio_loop_instance:
        yield audio_loop_instance.connection_status, None, None
    else:
        yield "Error: Connection process failed unexpectedly.", None, None


def handle_disconnect():
    """Handles the 'Disconnect' button click; yields (status, text, audio) updates."""
    global audio_loop_instance
    print("\n--- Disconnect Button Clicked ---")
    yield "Disconnecting...", None, None  # Initial status update
    if not audio_loop_instance or audio_loop_instance.connection_status == "Disconnected":
        yield "Already disconnected.", None, None
        return
    disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
    if not disconnect_future:
        yield "Error: Failed to schedule disconnection task.", None, None
        return
    try:
        disconnect_future.result(timeout=5.0)
        status = "Disconnected"
    except concurrent.futures.TimeoutError:
        status = "Disconnect timeout. Check logs."
        print("Timeout waiting for disconnect confirmation.")
    except Exception as e:
        status = f"Error during disconnect: {e}"
        print(f"Error during disconnect future result: {e}")
    audio_loop_instance = None
    # PyAudio is left running here; cleanup() terminates it at exit.
    yield status, None, None  # Final status update


def handle_send_message(message):
    """Handles sending a text message; yields (response_text, audio) updates."""
    print(f"\n--- Sending Message: {message[:30]}... ---")
    if not audio_loop_instance or audio_loop_instance.connection_status != "Connected":
        yield "Error: Not connected. Cannot send message.", None
        return
    if not message or message.strip() == "":
        yield "Cannot send empty message.", None
        return
    yield "Sending message...", None  # Update status, clear audio
    response_future = run_coro_in_background_loop(
        audio_loop_instance.send_message_and_wait_for_response(message)
    )
    if not response_future:
        yield "Error: Failed to schedule message task.", None
        return
    text_response = "Error: No response received."
    audio_output = None
    try:
        # Slightly longer than the coroutine's own 60 s wait so that its
        # timeout message is what normally reaches the user.
        result_text, result_audio_data = response_future.result(timeout=65.0)
        text_response = result_text
        if result_audio_data:
            print(f"Received audio data ({len(result_audio_data)} bytes), formatting...")
            audio_output = format_audio_for_gradio(result_audio_data)
            if audio_output is None:
                print("Failed to format audio for Gradio.")
                text_response += " [Audio Formatting Error]"
        else:
            print("No audio data received in response.")
            text_response += " [No Audio Received]"
    except concurrent.futures.TimeoutError:
        print("Timeout waiting for response future.")
        text_response = "Error: Timeout waiting for Gemini response."
    except Exception as e:
        print(f"Error getting result from response future: {e}")
        traceback.print_exc()
        text_response = f"Error processing response: {e}"
    print(f"Final Text Response: {text_response}")
    print(f"Final Audio Output: {'Present' if audio_output else 'None'}")
    yield text_response, audio_output


# --- Gradio Interface Definition ---
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# Gemini LiveConnect TTS Interface")
    gr.Markdown(f"Using Model: `{MODEL}`")
    with gr.Row():
        api_key_input = gr.Textbox(label="Gemini API Key", type="password", placeholder="Enter your API key")
        voice_select = gr.Dropdown(label="Select Voice", choices=AVAILABLE_VOICES, value=DEFAULT_VOICE)
        # Hidden for now, focus on TTS
        video_mode_select = gr.Radio(
            label="Video Input (Optional)",
            choices=["none", "camera", "screen"],
            value=DEFAULT_VIDEO_MODE,
            visible=False,
        )
    with gr.Row():
        connect_button = gr.Button("Connect")
        disconnect_button = gr.Button("Disconnect")
    status_output = gr.Textbox(label="Status", value="Disconnected", interactive=False)
    with gr.Column():
        message_input = gr.Textbox(label="Your Message", placeholder="Type your message here...")
        send_button = gr.Button("Send Message")
    with gr.Column():
        gr.Markdown("## Response")
        response_text_output = gr.Textbox(label="Gemini Text", interactive=False)
        # Use numpy for (rate, data) tuple
        audio_output = gr.Audio(label="Gemini Audio", type="numpy", interactive=False)

    # --- Event Handlers ---
    connect_button.click(
        fn=handle_connect,
        inputs=[api_key_input, voice_select, video_mode_select],
        outputs=[status_output, response_text_output, audio_output],  # Clear outputs on connect
    )
    disconnect_button.click(
        fn=handle_disconnect,
        inputs=[],
        outputs=[status_output, response_text_output, audio_output],  # Clear outputs on disconnect
    )
    send_button.click(
        fn=handle_send_message,
        inputs=[message_input],
        outputs=[response_text_output, audio_output],
    )
    # Allow sending message by pressing Enter in the textbox
    message_input.submit(
        fn=handle_send_message,
        inputs=[message_input],
        outputs=[response_text_output, audio_output],
    )


# --- Cleanup Function ---
def cleanup():
    """Disconnect, stop the background loop/thread and release PyAudio (runs at exit)."""
    print("Running cleanup...")
    global audio_loop_instance
    if audio_loop_instance and audio_loop_instance.connection_status != "Disconnected":
        print("Disconnecting during cleanup...")
        disconnect_future = run_coro_in_background_loop(audio_loop_instance.disconnect())
        if disconnect_future:
            try:
                disconnect_future.result(timeout=5.0)
                print("Disconnect successful during cleanup.")
            except Exception as e:
                print(f"Error during cleanup disconnect: {e}")
        audio_loop_instance = None
    stop_asyncio_loop()
    if background_thread and background_thread.is_alive():
        print("Waiting for background thread to join...")
        background_thread.join(timeout=5.0)
        if background_thread.is_alive():
            print("Warning: Background thread did not exit cleanly.")
    terminate_py_audio()
    print("Cleanup finished.")


# Register cleanup function to run on exit
atexit.register(cleanup)

# --- Main Execution ---
if __name__ == "__main__":
    # The background thread is started lazily by handle_connect.
    print("Launching Gradio Interface...")
    demo.queue().launch(share=False)  # launch() blocks while the interface runs
    print("Gradio Interface closed.")
    # Cleanup is handled by atexit