"""SocioCare AI voice preconsultation app.

A Gradio + WebRTC front end that forwards microphone audio to the Gemini
bidirectional WebSocket endpoint and plays the audio replies back to the
browser.
"""

import asyncio
import base64
import json
import os
import socket
from datetime import datetime
from threading import Event

import gradio as gr
import numpy as np
import websockets.sync.client
from dotenv import load_dotenv
from gradio_webrtc import StreamHandler, WebRTC

load_dotenv()

# The API key is read from the environment (or a .env file picked up by
# load_dotenv() above) instead of being committed to source control.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")

# Text sent as the model's system instruction. Built from adjacent literals
# so the runtime value stays exactly the same while the source stays readable.
# NOTE(review): the line break inside step 6 is kept verbatim from the
# original literal -- confirm whether the prompt was meant to be multi-line.
_MEDICAL_SYSTEM_PROMPT = (
    "You are SocioCare AI, a compassionate and knowledgeable medical "
    "preconsultation assistant. You are engaging in a real-time voice "
    "conversation with a patient for their preliminary health assessment. "
    "IMPORTANT GUIDELINES: "
    "- Speak naturally and conversationally, as if you're a caring "
    "healthcare professional "
    "- Be empathetic, warm, and reassuring while maintaining professionalism "
    "- Ask relevant follow-up questions to understand symptoms and concerns "
    "better "
    "- Provide general health guidance and preliminary assessments "
    "- ALWAYS emphasize that this is a preconsultation and not a substitute "
    "for professional medical care "
    "- If symptoms seem serious or urgent, encourage immediate medical "
    "attention "
    "- Maintain patient confidentiality and professionalism "
    "- Use simple, clear language that patients can understand "
    "- Be patient and allow time for the patient to explain their concerns "
    "thoroughly "
    "PRECONSULTATION FLOW: "
    "1. Greet the patient warmly and introduce yourself as SocioCare AI "
    "2. Ask about their main health concern or symptoms "
    "3. Listen actively and ask clarifying questions about symptoms, "
    "duration, severity "
    "4. Provide general health information and preliminary guidance "
    "5. Recommend appropriate next steps (rest, hydration, seeing a doctor, "
    "specialist referral, etc.) "
    "6. \n"
    "Offer to answer any additional questions about their health concerns "
    "7. Provide a summary of key points discussed "
    "Remember: You are providing preliminary health assessment and "
    "information only. For diagnosis, treatment, and comprehensive care, "
    "patients should consult with licensed healthcare professionals."
)


class MedicalGeminiConfig:
    """Connection settings for the Gemini bidirectional streaming endpoint."""

    def __init__(self, api_key):
        """Store the API key and derive the host, model and WebSocket URL."""
        self.api_key = api_key
        self.host = "generativelanguage.googleapis.com"
        self.model = "models/gemini-2.0-flash-exp"
        self.ws_url = f"wss://{self.host}/ws/google.ai.generativelanguage.v1alpha.GenerativeService.BidiGenerateContent?key={self.api_key}"

    def get_medical_system_prompt(self):
        """Return the system instruction text sent in the setup request."""
        return _MEDICAL_SYSTEM_PROMPT


class AudioProcessor:
    """Converts between numpy audio buffers and the JSON/base64 wire format."""

    @staticmethod
    def encode_audio(data, sample_rate):
        """Wrap raw samples in a ``realtimeInput`` message.

        Args:
            data: numpy array whose raw bytes are sent as PCM audio.
            sample_rate: rate advertised in the chunk's MIME type.

        Returns:
            A JSON-serialisable dict carrying one base64-encoded media chunk.
        """
        encoded = base64.b64encode(data.tobytes()).decode("UTF-8")
        return {
            "realtimeInput": {
                "mediaChunks": [
                    {
                        "mimeType": f"audio/pcm;rate={sample_rate}",
                        "data": encoded,
                    }
                ],
            },
        }

    @staticmethod
    def process_audio_response(data):
        """Decode a base64 payload into a 1-D ``int16`` numpy array."""
        audio_data = base64.b64decode(data)
        return np.frombuffer(audio_data, dtype=np.int16)


class MedicalGeminiHandler(StreamHandler):
    """Stream handler that relays WebRTC audio to and from Gemini."""

    def __init__(
        self, expected_layout="mono", output_sample_rate=24000, output_frame_size=480
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=24000,
        )
        self.config = None
        self.ws = None
        # Decoded reply samples that have not yet filled a whole output frame.
        self.all_output_data = None
        self.audio_processor = AudioProcessor()
        self.args_set = Event()
        self.session_started = False
        self.conversation_log = []
        # Lazily created by emit(); cleared again by reset().
        self._generator = None

    def copy(self):
        """Return a fresh handler configured with the same audio parameters."""
        return MedicalGeminiHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    def _build_setup_request(self):
        """Build the initial ``setup`` message (model, voice, system prompt)."""
        return {
            "setup": {
                "model": self.config.model,
                "generationConfig": {
                    "responseModalities": ["AUDIO"],
                    "speechConfig": {
                        "voiceConfig": {
                            "prebuiltVoiceConfig": {
                                "voiceName": "Aoede"  # Warm, professional voice
                            }
                        }
                    },
                },
                "systemInstruction": {
                    "parts": [{"text": self.config.get_medical_system_prompt()}]
                },
            }
        }

    def _drop_connection(self, connection=None):
        """Close ``connection`` (default: ``self.ws``) and clear ``self.ws``.

        Close errors are reported rather than raised so that cleanup never
        masks the error that triggered it.
        """
        target = connection if connection is not None else self.ws
        self.ws = None
        if target is None:
            return
        try:
            target.close()
        except Exception as e:
            print(f"Error closing WebSocket: {str(e)}")

    def _initialize_websocket(self):
        """Open the WebSocket, run the setup handshake and send the greeting.

        On any failure the error is printed, the half-open connection is
        closed, and ``self.ws`` is left as ``None``.

        Raises:
            RuntimeError: if ``self.config`` has not been set.
        """
        # An explicit raise instead of ``assert`` so the check survives -O.
        if not self.config:
            raise RuntimeError("Config not set")

        connection = None
        try:
            connection = websockets.sync.client.connect(self.config.ws_url, timeout=30)
            connection.send(json.dumps(self._build_setup_request()))
            setup_response = json.loads(connection.recv())
            print(f"SocioCare AI preconsultation setup: {setup_response}")

            # Publish the connection only after the handshake, so nothing
            # else reads from it while the setup response is still pending.
            self.ws = connection

            # Send initial greeting
            if not self.session_started:
                self._send_initial_greeting()
                self.session_started = True
        except websockets.exceptions.WebSocketException as e:
            print(f"WebSocket connection failed: {str(e)}")
            self._drop_connection(connection)
        except Exception as e:
            print(f"Setup failed: {str(e)}")
            self._drop_connection(connection)

    def _send_initial_greeting(self):
        """Send initial greeting to start the medical preconsultation"""
        try:
            greeting_message = {
                "clientContent": {
                    "turns": [
                        {
                            "role": "user",
                            "parts": [
                                {
                                    "text": "Please start the preconsultation by greeting me as a patient and introducing yourself as SocioCare AI."
                                }
                            ],
                        }
                    ],
                    "turnComplete": True,
                }
            }
            self.ws.send(json.dumps(greeting_message))
        except Exception as e:
            print(f"Error sending initial greeting: {str(e)}")

    async def fetch_args(self):
        """Send a ``"tick"`` over the data channel when one is available."""
        if self.channel:
            self.channel.send("tick")

    def set_args(self, args):
        """Forward ``args`` to the base class and signal ``args_set``."""
        super().set_args(args)
        self.args_set.set()

    def receive(self, frame: tuple[int, np.ndarray]) -> None:
        """Forward one incoming audio frame to Gemini.

        Connects on first use. Frames are dropped while no data channel or
        no WebSocket connection is available. Any error closes the current
        connection so the next frame triggers a reconnect.
        """
        if not self.channel:
            return
        if not self.config:
            # Use the module-level API key instead of fetching it from args
            self.config = MedicalGeminiConfig(GEMINI_API_KEY)
        try:
            if not self.ws:
                self._initialize_websocket()
            if not self.ws:
                # Connecting failed and was already reported; drop the frame
                # instead of failing on ``None.send``.
                return
            _, array = frame
            array = array.squeeze()
            # NOTE(review): the advertised rate is the *output* rate; it only
            # matches the input while both stay at 24000 -- confirm.
            audio_message = self.audio_processor.encode_audio(
                array, self.output_sample_rate
            )
            self.ws.send(json.dumps(audio_message))
        except Exception as e:
            print(f"Error in receive: {str(e)}")
            self._drop_connection()

    def _process_server_content(self, content):
        """Yield fixed-size output frames decoded from a model turn.

        Each part's ``inlineData.data`` payload is decoded and appended to the
        pending buffer; frames of ``output_frame_size`` samples are yielded as
        ``(output_sample_rate, array)`` with the array shaped ``(1, N)``.
        Samples that do not fill a whole frame stay buffered for later.
        """
        for part in content.get("parts", []):
            data = part.get("inlineData", {}).get("data", "")
            if not data:
                continue
            audio_array = self.audio_processor.process_audio_response(data)
            if self.all_output_data is None:
                self.all_output_data = audio_array
            else:
                self.all_output_data = np.concatenate(
                    (self.all_output_data, audio_array)
                )
            while self.all_output_data.shape[-1] >= self.output_frame_size:
                yield (
                    self.output_sample_rate,
                    self.all_output_data[: self.output_frame_size].reshape(1, -1),
                )
                self.all_output_data = self.all_output_data[self.output_frame_size :]

    def generator(self):
        """Endlessly yield output frames, or ``None`` when nothing is ready."""
        while True:
            if not self.ws or not self.config:
                print("WebSocket not connected")
                yield None
                continue
            try:
                message = self.ws.recv(timeout=5)
                msg = json.loads(message)
                if "serverContent" in msg:
                    content = msg["serverContent"].get("modelTurn", {})
                    yield from self._process_server_content(content)
            except TimeoutError:
                print("Timeout waiting for server response")
                yield None
            except Exception as e:
                print(f"Error in generator: {str(e)}")
                yield None

    def emit(self) -> tuple[int, np.ndarray] | None:
        """Return the next output frame, or ``None`` if none is available."""
        if not self.ws:
            return None
        if self._generator is None:
            self._generator = self.generator()
        try:
            return next(self._generator)
        except StopIteration:
            self.reset()
            return None

    def reset(self) -> None:
        """Discard the frame generator and any buffered output samples."""
        self._generator = None
        self.all_output_data = None

    def shutdown(self) -> None:
        """Close the WebSocket, if any, and forget it."""
        self._drop_connection()

    @staticmethod
    def _is_closed(connection):
        """Best-effort check whether ``connection`` is no longer open.

        NOTE(review): it is unclear whether the sync websockets connection
        exposes ``closed``; fall back to ``protocol.state`` and otherwise
        treat the connection as open -- confirm against the installed
        websockets version.
        """
        closed = getattr(connection, "closed", None)
        if closed is not None:
            return bool(closed)
        state = getattr(getattr(connection, "protocol", None), "state", None)
        return state is not None and getattr(state, "name", "OPEN") != "OPEN"

    def check_connection(self):
        """Reconnect if needed and report whether a connection is available."""
        try:
            if not self.ws or self._is_closed(self.ws):
                self._initialize_websocket()
            # _initialize_websocket reports failures by leaving ws as None.
            return self.ws is not None
        except Exception as e:
            print(f"Connection check failed: {str(e)}")
            return False


def get_rtc_configuration():
    """
    Get RTC configuration using only public STUN servers
    """
    return {
        "iceServers": [
            {"urls": "stun:stun.l.google.com:19302"},
            {"urls": "stun:stun1.l.google.com:19302"},
            {"urls": "stun:stun2.l.google.com:19302"},
            {"urls": "stun:stun3.l.google.com:19302"},
            {"urls": "stun:stun4.l.google.com:19302"},
        ]
    }


def _find_free_port(start_port=7860):
    """Return the first bindable port in ``[start_port, start_port + 100)``.

    Returns ``None`` when every port in the range is taken.
    """
    for port in range(start_port, start_port + 100):
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                s.bind(("", port))
                return port
        except OSError:
            continue
    return None


class SocioCareAIPreconsultation:
    """Builds and launches the Gradio interface for the voice agent."""

    def __init__(self):
        self.demo = self._create_interface()

    def _create_interface(self):
        """Assemble the Blocks layout and wire the WebRTC stream handler."""
        # NOTE(review): the stylesheet and the HTML snippets below contain
        # only whitespace/plain text, although the layout references CSS
        # classes -- confirm whether markup and styles were lost.
        custom_css = " "

        with gr.Blocks(theme=gr.themes.Glass(), css=custom_css) as demo:
            with gr.Column(elem_classes=["main-container"]):
                # Audio waves visualization
                gr.HTML("\n")

                # AI Icon
                gr.HTML("\nAI✨\n")

                # Title and Subtitle
                gr.HTML("\n\nAI Voice Agent\n\nBy SocioCare\n\n")

                # WebRTC Component
                with gr.Column(elem_classes=["webrtc-container"]):
                    webrtc = WebRTC(
                        label="",
                        modality="audio",
                        mode="send-receive",
                        rtc_configuration=get_rtc_configuration(),
                    )
                    webrtc.stream(
                        MedicalGeminiHandler(),
                        inputs=[webrtc],
                        outputs=[webrtc],
                        time_limit=600,  # 10 minutes consultation
                        concurrency_limit=3,
                    )

                # Status indicator
                gr.HTML("\nReady to assist with your health consultation\n")

        return demo

    def launch(self):
        """Start the Gradio server on ``$PORT`` or the first free port.

        Prints a message and returns without launching when the API key is
        missing or no port can be found.
        """
        if not GEMINI_API_KEY:
            print(
                "GEMINI_API_KEY is not set. Please set it in the environment "
                "or in a .env file."
            )
            return

        # Get port from environment or find a free one starting from 7860
        env_port = os.environ.get("PORT")
        port = int(env_port) if env_port else _find_free_port()

        if port is None:
            print("Could not find an available port. Please set the PORT environment variable.")
            return

        print(f"Starting AI Voice Agent server on port {port}")
        self.demo.launch(
            server_name="0.0.0.0",
            server_port=port,
            ssl_verify=False,
            ssl_keyfile=None,
            ssl_certfile=None,
            show_api=False,
            quiet=False,
            inbrowser=True,  # Automatically open in browser
        )


if __name__ == "__main__":
    app = SocioCareAIPreconsultation()
    app.launch()