# app.py import time import os import yaml import logging import numpy as np import gradio as gr import soundfile as sf from dotenv import load_dotenv import cv2 # Retained for detector compatibility # Assuming the factory and processor are in the src directory from src.detection.factory import get_detector # ───────────────────────────── logging logging.basicConfig( level=logging.INFO, format="%(asctime)s │ %(message)s", datefmt="%H:%M:%S", ) # ───────────────────────────── config / detector load_dotenv() try: with open("config.yaml") as f: CFG = yaml.safe_load(f) except FileNotFoundError: logging.error("FATAL: config.yaml not found. Please ensure the file exists.") # Create a dummy CFG to prevent crashing the app on load CFG = {"alerting": {}, "geometric_settings": {}} detector = get_detector(CFG) # ───────────────────────────── Alert Manager Class (Unchanged) class AlertManager: def __init__(self, config): self.cooldown_seconds = config.get("alert_cooldown_seconds", 5) self.last_alert_time = 0 # --- FIX: The is_alert_active flag is no longer needed --- self._load_sound(config.get("alert_sound_path")) def _load_sound(self, wav_path): if not wav_path or not os.path.exists(wav_path): logging.warning(f"Alert sound not found at '{wav_path}'. Alerts will be silent.") self.alert_data = None self.sample_rate = None return try: data, sr = sf.read(wav_path, dtype="int16") self.alert_data = data self.sample_rate = sr logging.info(f"Loaded alert sound: {wav_path}") except Exception as e: logging.error(f"Failed to load alert sound: {e}") self.alert_data = None def trigger_alert(self, level, lighting): """ Triggers a repeating alert if drowsiness persists, governed by a cooldown. """ is_drowsy = level != "Awake" is_good_light = lighting != "Low" # --- FIX: New simplified logic for repeating alerts --- # Check for drowsiness conditions first. if is_drowsy and is_good_light and self.alert_data is not None: # Then, check if the cooldown period has passed since the last alert. cooldown_passed = (time.monotonic() - self.last_alert_time) > self.cooldown_seconds if cooldown_passed: # If conditions are met, fire the alert and reset the timer. self.last_alert_time = time.monotonic() logging.info(f"🔊 Drowsiness alert! Repeating every {self.cooldown_seconds}s until 'Awake'.") return (self.sample_rate, self.alert_data.copy()) # If conditions are not met (e.g., user is Awake or cooldown hasn't passed), do nothing. return None alert_manager = AlertManager(CFG.get("alerting", {})) # ───────────────────────────── Frame Processing for Tab 1 (Image Stream) - UPDATED def process_live_frame(frame): if frame is None: return (np.zeros((480, 640, 3), dtype=np.uint8), "Status: Inactive", None) t0 = time.perf_counter() try: # Call with draw_visuals=True processed, indic = detector.process_frame(frame, draw_visuals=True) except Exception as e: logging.error(f"Error processing frame: {e}") processed = np.zeros_like(frame) if frame is not None else np.zeros((480, 640, 3), dtype=np.uint8) indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}} level = indic.get("drowsiness_level", "Awake") lighting = indic.get("lighting", "Good") score = indic.get("details", {}).get("Score", 0.0) dt_ms = (time.perf_counter() - t0) * 1000.0 logging.info(f"IMAGE STREAM │ {dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}") status_txt = f"Lighting: {lighting}\n" + \ ("Detection paused – low light." if lighting == "Low" else f"Status: {level}\nScore: {score:.2f}") audio_payload = alert_manager.trigger_alert(level, lighting) return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True) if audio_payload else None def process_for_stats_only(frame): """ Processes a frame but does not return any video/image output. This is the fastest method, focused only on status and alerts. """ if frame is None: return "Status: Inactive", None t0 = time.perf_counter() try: # Call with draw_visuals=False. The first returned value will be None. _, indic = detector.process_frame(frame, draw_visuals=False) except Exception as e: logging.error(f"Error processing frame: {e}") indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}} level = indic.get("drowsiness_level", "Awake") lighting = indic.get("lighting", "Good") score = indic.get("details", {}).get("Score", 0.0) dt_ms = (time.perf_counter() - t0) * 1000.0 logging.info(f"ANALYSIS ONLY │ {dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}") status_txt = ( f"Status: {level} (Score: {score:.2f})\n" f"Lighting: {lighting}\n" f"Processing Time: {dt_ms:.1f} ms" ) audio_payload = alert_manager.trigger_alert(level, lighting) audio_out = gr.Audio(value=audio_payload, autoplay=True) if audio_payload else None return status_txt, audio_out # ───────────────────────────── UI Definition (Unchanged) def create_readme_tab(): """Creates the content for the 'About' tab.""" gr.Markdown( """
Your AI-Powered Drowsiness Detection Assistant