# drive_paddy/pages/1_Live_Detection.py import streamlit as st from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase import yaml import av import os from dotenv import load_dotenv import base64 import requests import queue import time from typing import List, Dict, Union # Correctly import from the drive_paddy package structure from src.detection.factory import get_detector from src.alerting.alert_system import get_alerter # --- Initialize Session State at the TOP of the script --- # This is the single source of truth for our queues and must run on every page load. if "status_queue" not in st.session_state: st.session_state.status_queue = queue.Queue() if "audio_queue" not in st.session_state: st.session_state.audio_queue = queue.Queue() if "last_status" not in st.session_state: st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"} # --- Load Configuration and Environment Variables --- @st.cache_resource def load_app_config(): """Loads config from yaml and .env files.""" load_dotenv() # Navigate up to the root to find the config file config_path = "./config.yaml" with open(config_path, 'r') as f: config = yaml.safe_load(f) # Load secrets from environment secrets = { "gemini_api_key": os.getenv("GEMINI_API_KEY"), "turn_username": os.getenv("TURN_USERNAME"), "turn_credential": os.getenv("TURN_CREDENTIAL"), "metered_domain": os.getenv("METERED_DOMAIN"), "metered_api_key": os.getenv("METERED_KEY") } return config, secrets config, secrets = load_app_config() # --- Client-Side Audio Playback Function --- def autoplay_audio(audio_bytes: bytes): """Injects HTML to autoplay audio in the user's browser.""" b64 = base64.b64encode(audio_bytes).decode() md = f""" """ st.markdown(md, unsafe_allow_html=True) # --- WebRTC Video Processor --- class VideoProcessor(VideoProcessorBase): # The __init__ method now accepts the queues as arguments def __init__(self): # It uses the queues passed in from session_state, not new ones. self.status_queue = queue.Queue self.audio_queue = queue.Queue self._detector = get_detector(config) self._alerter = get_alerter(config, secrets["gemini_api_key"]) def recv(self, frame: av.VideoFrame) -> av.VideoFrame: img = frame.to_ndarray(format="bgr24") strategy = config.get('detection_strategy') # The return signature of process_frame varies by strategy. processed_frame, indicators, _ = self._detector.process_frame(img) drowsiness_level = indicators.get("drowsiness_level", "Awake") # This now correctly puts data into the shared session_state queue. self.status_queue.put(indicators) if drowsiness_level != "Awake": audio_data = self._alerter.trigger_alert(level=drowsiness_level) if audio_data: # This now correctly puts audio data into the shared queue. self.audio_queue.put(audio_data) else: self._alerter.reset_alert() return av.VideoFrame.from_ndarray(processed_frame, format="bgr24") # --- Page UI --- st.title("📹 Live Drowsiness Detection") st.info("Press 'START' to activate your camera and begin monitoring.") RTC_CONFIGURATION = RTCConfiguration({ "iceServers": [ { "urls": ["stun:stun.relay.metered.ca:80"], }, { "urls": ["turn:global.relay.metered.ca:80"], "username": "086986a440b20fe48229738b", "credential": "mOC7fVSg00zjlsTD", }, { "urls": ["turn:global.relay.metered.ca:80?transport=tcp"], "username": "086986a440b20fe48229738b", "credential": "mOC7fVSg00zjlsTD", }, { "urls": ["turn:global.relay.metered.ca:443"], "username": "086986a440b20fe48229738b", "credential": "mOC7fVSg00zjlsTD", }, { "urls": ["turns:global.relay.metered.ca:443?transport=tcp"], "username": "086986a440b20fe48229738b", "credential": "mOC7fVSg00zjlsTD", }, ] }) col1, col2 = st.columns([3, 1]) with col1: webrtc_ctx = webrtc_streamer( key="drowsiness-detection", # The factory now correctly passes the queues from session_state video_processor_factory=VideoProcessor, # rtc_configuration=RTC_CONFIGURATION, # media_stream_constraints={"video": True, "audio": False}, # async_processing=True, ) with col2: st.header("System Status") audio_placeholder = st.empty() if not webrtc_ctx.state.playing: st.warning("System Inactive.") else: st.success("✅ System Active & Monitoring") st.subheader("Live Status:") status_placeholder = st.empty() if webrtc_ctx.state.playing: try: # This now reads from the correct queue that the processor is writing to. status_result = st.session_state.status_queue.get(timeout=0.1) st.session_state.last_status = status_result except queue.Empty: pass with status_placeholder.container(): last_status = st.session_state.last_status drowsiness_level = last_status.get("drowsiness_level", "Awake") lighting = last_status.get("lighting", "Good") score = last_status.get("details", {}).get("Score", 0) st.metric(label="Lighting Condition", value=lighting) if lighting == "Low": st.warning("Detection paused due to low light.") if drowsiness_level == "Awake": st.info(f"✔️ Awake (Score: {score:.2f})") elif drowsiness_level == "Slightly Drowsy": st.warning(f"⚠️ Slightly Drowsy (Score: {score:.2f})") elif drowsiness_level == "Very Drowsy": st.error(f"🚨 Very Drowsy! (Score: {score:.2f})") try: audio_data = st.session_state.audio_queue.get(timeout=0.1) with audio_placeholder.container(): autoplay_audio(audio_data) except queue.Empty: pass time.sleep(0.1) st.rerun() else: with status_placeholder.container(): st.info("✔️ Driver is Awake")