import time import os import yaml import logging import numpy as np import gradio as gr import soundfile as sf from dotenv import load_dotenv from threading import Lock from queue import Queue import threading from src.detection.factory import get_detector # ───────────────────────────── logging logging.basicConfig( level=logging.INFO, format="%(asctime)s │ %(message)s", datefmt="%H:%M:%S", ) # ───────────────────────────── config / detector load_dotenv() with open("config.yaml") as f: CFG = yaml.safe_load(f) detector = get_detector(CFG) # ───────────────────────────── alert sound manager class AlertManager: def __init__(self, wav_path, cooldown_seconds=5): self.cooldown_seconds = cooldown_seconds self.last_alert_time = 0 self.lock = Lock() self.alert_queue = Queue(maxsize=1) # Load alert sound try: self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32") # Ensure stereo format for Gradio if self.audio_data.ndim == 1: self.audio_data = np.column_stack([self.audio_data, self.audio_data]) # Normalize to [-1, 1] range max_val = np.abs(self.audio_data).max() if max_val > 0: self.audio_data = self.audio_data / max_val logging.info(f"Loaded alert sound: {wav_path} " f"({len(self.audio_data)/self.sample_rate:.2f}s, " f"shape: {self.audio_data.shape})") self.is_loaded = True except Exception as e: logging.error(f"Failed to load alert sound: {e}") self.is_loaded = False self.sample_rate = 44100 self.audio_data = None def should_alert(self, drowsiness_level, lighting): """Check if we should trigger an alert""" if not self.is_loaded: return False with self.lock: current_time = time.monotonic() if (drowsiness_level != "Awake" and lighting != "Low" and (current_time - self.last_alert_time) > self.cooldown_seconds): self.last_alert_time = current_time return True return False def get_alert_audio(self): """Get the alert audio data""" if self.is_loaded: return (int(self.sample_rate), self.audio_data.copy()) return None # Initialize alert manager alert_manager = AlertManager( wav_path=CFG["alerting"].get("alert_sound_path"), cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5) ) # ───────────────────────────── frame processing def process_live_frame(frame): """Process frame for drowsiness detection""" if frame is None: return ( np.zeros((480, 640, 3), dtype=np.uint8), "Status: Inactive", None ) t0 = time.perf_counter() try: # Process frame through detector processed, indic = detector.process_frame(frame) except Exception as e: logging.error(f"Error processing frame: {e}") processed = np.zeros_like(frame) indic = { "drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0} } # Extract detection results level = indic.get("drowsiness_level", "Awake") lighting = indic.get("lighting", "Good") score = indic.get("details", {}).get("Score", 0.0) # Log performance dt_ms = (time.perf_counter() - t0) * 1000.0 logging.info(f"{dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}") # Create status text status_txt = f"Lighting: {lighting}\n" if lighting == "Low": status_txt += "Detection paused – low light." else: status_txt += f"Status: {level}\nScore: {score:.2f}" # Check if we should trigger alert audio_out = None if alert_manager.should_alert(level, lighting): audio_out = alert_manager.get_alert_audio() if audio_out: logging.info("🔊 Alert triggered!") return processed, status_txt, audio_out # ───────────────────────────── UI with error handling def create_ui(): with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app: gr.Markdown("# 🚗 **Drive Paddy** – Real-time Drowsiness Detection") gr.Markdown("Webcam-based drowsiness detection with audio alerts.") with gr.Row(): with gr.Column(scale=2): cam = gr.Image( sources=["webcam"], streaming=True, label="Live Camera Feed" ) with gr.Column(scale=1): out_img = gr.Image(label="Processed Feed") out_text = gr.Textbox( label="Live Status", lines=4, interactive=False ) out_audio = gr.Audio( label="Alert Sound", autoplay=True, type="numpy", visible=True, ) # Add system info with gr.Row(): gr.Markdown(f""" **System Info:** - Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s - Alert sound loaded: {'✓' if alert_manager.is_loaded else '✗'} """) # Connect the streaming cam.stream( fn=process_live_frame, inputs=[cam], outputs=[out_img, out_text, out_audio] ) return app # ───────────────────────────── main if __name__ == "__main__": logging.info("Initializing Drive Paddy...") # Create and launch app app = create_ui() logging.info("Launching Gradio app...") app.launch( debug=True, share=False, # Set to True if you want a public link server_name="0.0.0.0", # Allow external connections server_port=7860 )