Spaces:
Sleeping
Sleeping
import time | |
import os | |
import yaml | |
import logging | |
import numpy as np | |
import gradio as gr | |
import soundfile as sf | |
from dotenv import load_dotenv | |
from threading import Lock | |
from queue import Queue | |
import threading | |
from src.detection.factory import get_detector | |
# βββββββββββββββββββββββββββββ logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s β %(message)s", | |
datefmt="%H:%M:%S", | |
) | |
# βββββββββββββββββββββββββββββ config / detector | |
load_dotenv() | |
with open("config.yaml") as f: | |
CFG = yaml.safe_load(f) | |
detector = get_detector(CFG) | |
# βββββββββββββββββββββββββββββ alert sound manager | |
class AlertManager: | |
def __init__(self, wav_path, cooldown_seconds=5): | |
self.cooldown_seconds = cooldown_seconds | |
self.last_alert_time = 0 | |
self.lock = Lock() | |
self.alert_queue = Queue(maxsize=1) | |
# Load alert sound | |
try: | |
self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32") | |
# Ensure stereo format for Gradio | |
if self.audio_data.ndim == 1: | |
self.audio_data = np.column_stack([self.audio_data, self.audio_data]) | |
# Normalize to [-1, 1] range | |
max_val = np.abs(self.audio_data).max() | |
if max_val > 0: | |
self.audio_data = self.audio_data / max_val | |
logging.info(f"Loaded alert sound: {wav_path} " | |
f"({len(self.audio_data)/self.sample_rate:.2f}s, " | |
f"shape: {self.audio_data.shape})") | |
self.is_loaded = True | |
except Exception as e: | |
logging.error(f"Failed to load alert sound: {e}") | |
self.is_loaded = False | |
self.sample_rate = 44100 | |
self.audio_data = None | |
def should_alert(self, drowsiness_level, lighting): | |
"""Check if we should trigger an alert""" | |
if not self.is_loaded: | |
return False | |
with self.lock: | |
current_time = time.monotonic() | |
if (drowsiness_level != "Awake" | |
and lighting != "Low" | |
and (current_time - self.last_alert_time) > self.cooldown_seconds): | |
self.last_alert_time = current_time | |
return True | |
return False | |
def get_alert_audio(self): | |
"""Get the alert audio data""" | |
if self.is_loaded: | |
return (int(self.sample_rate), self.audio_data.copy()) | |
return None | |
# Initialize alert manager | |
alert_manager = AlertManager( | |
wav_path=CFG["alerting"].get("alert_sound_path"), | |
cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5) | |
) | |
# βββββββββββββββββββββββββββββ frame processing | |
def process_live_frame(frame): | |
"""Process frame for drowsiness detection""" | |
if frame is None: | |
return ( | |
np.zeros((480, 640, 3), dtype=np.uint8), | |
"Status: Inactive", | |
None | |
) | |
t0 = time.perf_counter() | |
try: | |
# Process frame through detector | |
processed, indic = detector.process_frame(frame) | |
except Exception as e: | |
logging.error(f"Error processing frame: {e}") | |
processed = np.zeros_like(frame) | |
indic = { | |
"drowsiness_level": "Error", | |
"lighting": "Unknown", | |
"details": {"Score": 0.0} | |
} | |
# Extract detection results | |
level = indic.get("drowsiness_level", "Awake") | |
lighting = indic.get("lighting", "Good") | |
score = indic.get("details", {}).get("Score", 0.0) | |
# Log performance | |
dt_ms = (time.perf_counter() - t0) * 1000.0 | |
logging.info(f"{dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}") | |
# Create status text | |
status_txt = f"Lighting: {lighting}\n" | |
if lighting == "Low": | |
status_txt += "Detection paused β low light." | |
else: | |
status_txt += f"Status: {level}\nScore: {score:.2f}" | |
# Check if we should trigger alert | |
audio_out = None | |
if alert_manager.should_alert(level, lighting): | |
audio_out = alert_manager.get_alert_audio() | |
if audio_out: | |
logging.info("π Alert triggered!") | |
return processed, status_txt, audio_out | |
# βββββββββββββββββββββββββββββ UI with error handling | |
def create_ui(): | |
with gr.Blocks(title="Drive Paddy β Drowsiness Detection") as app: | |
gr.Markdown("# π **Drive Paddy** β Real-time Drowsiness Detection") | |
gr.Markdown("Webcam-based drowsiness detection with audio alerts.") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
cam = gr.Image( | |
sources=["webcam"], | |
streaming=True, | |
label="Live Camera Feed" | |
) | |
with gr.Column(scale=1): | |
out_img = gr.Image(label="Processed Feed") | |
out_text = gr.Textbox( | |
label="Live Status", | |
lines=4, | |
interactive=False | |
) | |
out_audio = gr.Audio( | |
label="Alert Sound", | |
autoplay=True, | |
type="numpy", | |
visible=True, | |
) | |
# Add system info | |
with gr.Row(): | |
gr.Markdown(f""" | |
**System Info:** | |
- Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s | |
- Alert sound loaded: {'β' if alert_manager.is_loaded else 'β'} | |
""") | |
# Connect the streaming | |
cam.stream( | |
fn=process_live_frame, | |
inputs=[cam], | |
outputs=[out_img, out_text, out_audio] | |
) | |
return app | |
# βββββββββββββββββββββββββββββ main | |
if __name__ == "__main__": | |
logging.info("Initializing Drive Paddy...") | |
# Create and launch app | |
app = create_ui() | |
logging.info("Launching Gradio app...") | |
app.launch( | |
debug=True, | |
share=False, # Set to True if you want a public link | |
server_name="0.0.0.0", # Allow external connections | |
server_port=7860 | |
) |