Spaces:
Runtime error
Runtime error
| # drive_paddy/pages/1_Live_Detection.py | |
| import streamlit as st | |
| from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase | |
| import yaml | |
| import av | |
| import os | |
| from dotenv import load_dotenv | |
| import base64 | |
| import queue | |
| import time | |
| from src.detection.factory import get_detector | |
| from src.alerting.alert_system import get_alerter | |
| if "status_queue" not in st.session_state: | |
| st.session_state.status_queue = queue.Queue() | |
| if "audio_queue" not in st.session_state: | |
| st.session_state.audio_queue = queue.Queue() | |
| if "last_status" not in st.session_state: | |
| st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"} | |
| # --- Load Configuration and Environment Variables --- | |
| def load_app_config(): | |
| """Loads config from yaml and .env files.""" | |
| load_dotenv() | |
| config_path = "/config.yaml" if os.path.exists("/config.yaml") else "config.yaml" | |
| with open(config_path, 'r') as f: | |
| config = yaml.safe_load(f) | |
| # Load secrets from environment | |
| secrets = { | |
| "gemini_api_key": os.getenv("GEMINI_API_KEY"), | |
| "turn_username": os.getenv("TURN_USERNAME"), | |
| "turn_credential": os.getenv("TURN_CREDENTIAL") | |
| } | |
| return config, secrets | |
| config, secrets = load_app_config() | |
| # --- Client-Side Audio Playback Function --- | |
| def autoplay_audio(audio_bytes: bytes): | |
| """Injects HTML to autoplay audio in the user's browser.""" | |
| b64 = base64.b64encode(audio_bytes).decode() | |
| md = f""" | |
| <audio controls autoplay="true" style="display:none;"> | |
| <source src="data:audio/mp3;base64,{b64}" type="audio/mp3"> | |
| </audio> | |
| """ | |
| st.markdown(md, unsafe_allow_html=True) | |
| # --- WebRTC Video Processor --- | |
| class VideoProcessor(VideoProcessorBase): | |
| def __init__(self): | |
| self._detector = get_detector(config) | |
| self._alerter = get_alerter(config, secrets["gemini_api_key"]) | |
| # Thread-safe queues for communication | |
| self.status_queue = queue.Queue() | |
| self.audio_queue = queue.Queue() | |
| def recv(self, frame: av.VideoFrame) -> av.VideoFrame: | |
| img = frame.to_ndarray(format="bgr24") | |
| strategy = config.get('detection_strategy') | |
| # The return signature of process_frame varies by strategy. | |
| # We need to handle each case correctly. | |
| if strategy == 'hybrid': | |
| processed_frame, alert_triggered, active_alerts = self._detector.process_frame(img) | |
| self.status_queue.put(active_alerts if alert_triggered or 'Low Light' in active_alerts else {"status": "Awake"}) | |
| elif strategy == 'geometric': | |
| # The geometric processor returns frame, indicators, and landmarks. | |
| processed_frame, indicators, _ = self._detector.process_frame(img) | |
| alert_triggered = any(v for k, v in indicators.items() if k not in ['low_light', 'details']) | |
| self.status_queue.put(indicators if alert_triggered or indicators.get('low_light') else {"status": "Awake"}) | |
| print(f"Indicators: {indicators}") # Debugging line | |
| print(self.status_queue) # Debugging line | |
| elif strategy == 'cnn_model': | |
| # The cnn_model processor returns frame and indicators. | |
| processed_frame, indicators = self._detector.process_frame(img) | |
| alert_triggered = any(indicators.values()) | |
| self.status_queue.put(indicators if alert_triggered else {"status": "Awake"}) | |
| else: | |
| # Default case if strategy is unknown | |
| processed_frame = img | |
| alert_triggered = False | |
| if alert_triggered: | |
| audio_data = self._alerter.trigger_alert() | |
| if audio_data: | |
| self.audio_queue.put(audio_data) | |
| else: | |
| self._alerter.reset_alert() | |
| return av.VideoFrame.from_ndarray(processed_frame, format="bgr24") | |
| # --- Page UI --- | |
| st.title("πΉ Live Drowsiness Detection") | |
| st.info("Press 'START' to activate your camera and begin monitoring.") | |
| # --- Dynamically Build RTC Configuration --- | |
| ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}] | |
| if secrets["turn_username"] and secrets["turn_credential"]: | |
| turn_servers = [ | |
| {'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}, | |
| {'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]} | |
| ] | |
| ice_servers.extend(turn_servers) | |
| RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers}) | |
| col1, col2 = st.columns([3, 1]) | |
| with col1: | |
| webrtc_ctx = webrtc_streamer( | |
| key="drowsiness-detection", | |
| video_processor_factory=VideoProcessor, | |
| rtc_configuration=RTC_CONFIGURATION, # Use the new robust configuration | |
| media_stream_constraints={"video": True, "audio": False}, | |
| async_processing=True, | |
| ) | |
| with col2: | |
| st.header("System Status") | |
| if not webrtc_ctx.state.playing: | |
| st.warning("System Inactive.") | |
| else: | |
| st.success("β System Active & Monitoring") | |
| st.subheader("Live Status:") | |
| status_placeholder = st.empty() | |
| audio_placeholder = st.empty() | |
| if webrtc_ctx.state.playing: | |
| if "last_status" not in st.session_state: | |
| st.session_state.last_status = {"status": "Awake"} | |
| try: | |
| # This line will now work because st.session_state.status_queue was initialized | |
| status_result = st.session_state.status_queue.get(timeout=0.1) | |
| st.session_state.last_status = status_result | |
| except queue.Empty: | |
| pass | |
| with status_placeholder.container(): | |
| last_status = st.session_state.last_status | |
| drowsiness_level = last_status.get("drowsiness_level", "Awake") | |
| lighting = last_status.get("lighting", "Good") | |
| score = last_status.get("details", {}).get("Score", 0) | |
| st.metric(label="Lighting Condition", value=lighting) | |
| if lighting == "Low": st.warning("Detection paused due to low light.") | |
| if drowsiness_level == "Awake": st.info(f"βοΈ Awake (Score: {score:.2f})") | |
| elif drowsiness_level == "Slightly Drowsy": st.warning(f"β οΈ Slightly Drowsy (Score: {score:.2f})") | |
| elif drowsiness_level == "Very Drowsy": st.error(f"π¨ Very Drowsy! (Score: {score:.2f})") | |
| try: | |
| audio_data = st.session_state.audio_queue.get(timeout=0.1) | |
| with audio_placeholder.container(): | |
| autoplay_audio(audio_data) | |
| except queue.Empty: | |
| pass | |
| time.sleep(0.1) | |
| st.rerun() | |
| else: | |
| with status_placeholder.container(): | |
| st.info("βοΈ Driver is Awake") | |