# drive_paddy/pages/1_Live_Detection.py import streamlit as st from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase import yaml import av import os from dotenv import load_dotenv import base64 import queue import time from src.detection.factory import get_detector from src.alerting.alert_system import get_alerter # --- Load Configuration and Environment Variables --- @st.cache_resource def load_app_config(): """Loads config from yaml and .env files.""" load_dotenv() config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml" with open(config_path, 'r') as f: config = yaml.safe_load(f) # Load secrets from environment secrets = { "gemini_api_key": os.getenv("GEMINI_API_KEY"), "turn_username": os.getenv("TURN_USERNAME"), "turn_credential": os.getenv("TURN_CREDENTIAL") } return config, secrets config, secrets = load_app_config() # --- Initialize Session State for Queues at the TOP of the script --- if "status_queue" not in st.session_state: st.session_state.status_queue = queue.Queue() if "audio_queue" not in st.session_state: st.session_state.audio_queue = queue.Queue() # --- Client-Side Audio Playback Function --- def autoplay_audio(audio_bytes: bytes): """Injects HTML to autoplay audio in the user's browser.""" b64 = base64.b64encode(audio_bytes).decode() md = f""" """ st.markdown(md, unsafe_allow_html=True) # --- WebRTC Video Processor --- class VideoProcessor(VideoProcessorBase): # The __init__ method now accepts the queues as arguments def __init__(self, status_queue: queue.Queue, audio_queue: queue.Queue): self.status_queue = status_queue self.audio_queue = audio_queue self._detector = get_detector(config) self._alerter = get_alerter(config, secrets["gemini_api_key"]) def recv(self, frame: av.VideoFrame) -> av.VideoFrame: img = frame.to_ndarray(format="bgr24") strategy = config.get('detection_strategy') if strategy == 'hybrid': processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img) # Use self.status_queue.put() instead of st.session_state self.status_queue.put(active_alerts if alert_triggered or 'Low Light' in active_alerts else {"status": "Awake"}) else: processed_frame, indicators = self._detector.process_frame(img) alert_triggered = any(v for k, v in indicators.items() if k != 'low_light') self.status_queue.put(indicators) if alert_triggered: audio_data = self._alerter.trigger_alert() if audio_data: # Use self.audio_queue.put() self.audio_queue.put(audio_data) else: self._alerter.reset_alert() return av.VideoFrame.from_ndarray(processed_frame, format="bgr24") # --- Page UI --- st.title("📹 Live Drowsiness Detection") st.info("Press 'START' to activate your camera and begin monitoring.") # --- Dynamically Build RTC Configuration --- ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}] if secrets["turn_username"] and secrets["turn_credential"]: print("TURN credentials found, adding TURN servers to config.") turn_servers = [ {'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}, {'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]} ] ice_servers.extend(turn_servers) RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers}) col1, col2 = st.columns([3, 1]) with col1: webrtc_ctx = webrtc_streamer( key="drowsiness-detection", # The factory now correctly passes the queues from session_state video_processor_factory=VideoProcessor, rtc_configuration=RTC_CONFIGURATION, media_stream_constraints={"video": True, "audio": False}, async_processing=True, ) with col2: st.header("System Status") if not webrtc_ctx.state.playing: st.warning("System Inactive.") else: st.success("✅ System Active & Monitoring") st.subheader("Live Status:") status_placeholder = st.empty() audio_placeholder = st.empty() if webrtc_ctx.state.playing: if "last_status" not in st.session_state: st.session_state.last_status = {"status": "Awake"} try: # This line will now work because st.session_state.status_queue was initialized status_result = st.session_state.status_queue.get(timeout=0.1) st.session_state.last_status = status_result except queue.Empty: pass with status_placeholder.container(): last_status = st.session_state.last_status if last_status.get("Low Light"): st.warning("⚠️ Low Light Detected! Accuracy may be affected.") elif last_status.get("status") == "Awake": st.info("✔️ Driver is Awake") else: st.error("🚨 DROWSINESS DETECTED!") for key, value in last_status.items(): if key != "Low Light" and key != "status": st.warning(f"-> {key}: {value:.2f}" if isinstance(value, float) else f"-> {key}") try: audio_data = st.session_state.audio_queue.get(timeout=0.1) with audio_placeholder.container(): autoplay_audio(audio_data) except queue.Empty: pass time.sleep(0.1) st.rerun() else: with status_placeholder.container(): st.info("✔️ Driver is Awake")