Spaces:
Sleeping
Sleeping
File size: 6,519 Bytes
60b383e a7ec466 ba6a8ea 935769d b9d2bb7 935769d b9d2bb7 935769d b9d2bb7 935769d ba6a8ea a7ec466 ba6a8ea 60b383e a7ec466 3a175cd 60b383e a7ec466 60b383e a7ec466 935769d a7ec466 60b383e a7ec466 60b383e a7ec466 60b383e 935769d 60b383e a7ec466 935769d b9d2bb7 a7ec466 935769d a7ec466 5101617 a7ec466 8977de8 a7ec466 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 |
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
from threading import Lock
from queue import Queue
import threading
from src.detection.factory import get_detector
# ───────────────────────────── logging
# Configure the root logger: INFO and above, each record prefixed with an
# HH:MM:SS timestamp.
logging.basicConfig(
    format="%(asctime)s β %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
# ───────────────────────────── config / detector
# Load variables from a .env file (if present) into the process environment.
load_dotenv()

# Read the YAML as UTF-8 explicitly so parsing does not depend on the
# platform's default locale encoding.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Detector used by process_live_frame; built by the project factory from CFG.
detector = get_detector(CFG)
# ───────────────────────────── alert sound manager
class AlertManager:
    """Load an alert sound once and rate-limit how often it may be played.

    Attributes set by ``__init__``:
        cooldown_seconds: Minimum gap, in seconds, between two alerts.
        last_alert_time: ``time.monotonic()`` reading taken when the latest
            alert was granted (``-inf`` until the first one).
        lock: Guards the cooldown check-and-update in ``should_alert``.
        is_loaded: ``True`` when the sound file was read successfully.
        sample_rate: Sample rate of the loaded sound; 44100 as a placeholder
            when loading failed.
        audio_data: Peak-normalised float32 samples (mono input is duplicated
            into two columns), or ``None`` when loading failed.
    """

    def __init__(self, wav_path, cooldown_seconds=5):
        """Read the alert sound and initialise the cooldown state.

        Loading is best-effort: any failure is logged and leaves the manager
        in a "not loaded" state in which ``should_alert`` always returns
        ``False`` and ``get_alert_audio`` returns ``None``.

        Args:
            wav_path: Path of the sound file, passed to ``soundfile.read``.
            cooldown_seconds: Minimum number of seconds between alerts.
        """
        self.cooldown_seconds = cooldown_seconds
        # time.monotonic() has an undefined reference point, so only
        # differences between readings are meaningful. Starting at -inf
        # guarantees the very first alert is never blocked by the cooldown.
        self.last_alert_time = float("-inf")
        self.lock = Lock()
        # Not read anywhere in this class; kept so existing attribute access
        # keeps working.
        self.alert_queue = Queue(maxsize=1)

        # Load alert sound
        try:
            # soundfile.read returns (data, samplerate) -- in that order.
            audio, sample_rate = sf.read(wav_path, dtype="float32")
            if audio.size == 0:
                raise ValueError("alert sound contains no samples")

            # Duplicate a mono track into two identical channels.
            if audio.ndim == 1:
                audio = np.column_stack([audio, audio])

            # Peak-normalise to the [-1, 1] range; pure silence is left as is
            # to avoid dividing by zero.
            peak = np.abs(audio).max()
            if peak > 0:
                audio = audio / peak

            self.sample_rate = sample_rate
            self.audio_data = audio
            logging.info(f"Loaded alert sound: {wav_path} "
                         f"({len(self.audio_data)/self.sample_rate:.2f}s, "
                         f"shape: {self.audio_data.shape})")
            self.is_loaded = True
        except Exception as e:
            # Deliberate best-effort boundary: the app keeps running without
            # an alert sound instead of failing at start-up.
            logging.error(f"Failed to load alert sound: {e}")
            self.is_loaded = False
            self.sample_rate = 44100
            self.audio_data = None

    def should_alert(self, drowsiness_level, lighting):
        """Decide whether an alert may fire now, and start the cooldown if so.

        Args:
            drowsiness_level: Detector state; any value other than ``"Awake"``
                is eligible for an alert.
            lighting: Lighting label; ``"Low"`` suppresses alerts.

        Returns:
            ``True`` when the sound is loaded, the state is eligible and more
            than ``cooldown_seconds`` have passed since the previous alert;
            ``False`` otherwise. A ``True`` result records the current time
            as the start of the next cooldown window.
        """
        if not self.is_loaded:
            return False

        # Check and update under the lock so concurrent callers cannot both
        # pass the cooldown test for the same window.
        with self.lock:
            current_time = time.monotonic()
            if (drowsiness_level != "Awake"
                    and lighting != "Low"
                    and (current_time - self.last_alert_time) > self.cooldown_seconds):
                self.last_alert_time = current_time
                return True
        return False

    def get_alert_audio(self):
        """Return the alert sound as ``(sample_rate, samples)``.

        Returns:
            A tuple of the integer sample rate and a copy of the sample array
            (so callers cannot mutate the cached data), or ``None`` when no
            sound is loaded.
        """
        if self.is_loaded:
            return (int(self.sample_rate), self.audio_data.copy())
        return None
# Module-wide alert manager, configured from the "alerting" section of CFG.
_alerting_cfg = CFG["alerting"]
alert_manager = AlertManager(
    _alerting_cfg.get("alert_sound_path"),
    _alerting_cfg.get("alert_cooldown_seconds", 5),
)
# ───────────────────────────── frame processing
def process_live_frame(frame):
    """Run drowsiness detection on one frame and build the UI outputs.

    Args:
        frame: The current image, handed unchanged to
            ``detector.process_frame``; ``None`` when no frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``:

        * ``image`` -- the detector's processed frame; an all-zero array
          shaped like ``frame`` if detection raised; a black 480x640 RGB
          image when ``frame`` is ``None``.
        * ``status_text`` -- multi-line status string for the UI.
        * ``audio`` -- ``(sample_rate, samples)`` when an alert fires on this
          frame, otherwise ``None``.
    """
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None,
        )

    t0 = time.perf_counter()
    detection_failed = False
    try:
        # Process frame through detector
        processed, indic = detector.process_frame(frame)
    except Exception as e:
        # Boundary handler: one bad frame must not stop the stream. Log with
        # the traceback and fall back to a blank frame plus an error status.
        logging.exception("Error processing frame: %s", e)
        detection_failed = True
        processed = np.zeros_like(frame)
        indic = {
            "drowsiness_level": "Error",
            "lighting": "Unknown",
            "details": {"Score": 0.0},
        }

    # Extract detection results
    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    # `or {}` also covers an explicit "details": None.
    score = (indic.get("details") or {}).get("Score", 0.0)

    # Log performance
    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    # Create status text
    status_txt = f"Lighting: {lighting}\n"
    if lighting == "Low":
        status_txt += "Detection paused β low light."
    else:
        status_txt += f"Status: {level}\nScore: {score:.2f}"

    # A detector failure is not a drowsiness event: skip the alert check so
    # the alarm does not sound (and no cooldown is consumed) on error frames.
    audio_out = None
    if not detection_failed and alert_manager.should_alert(level, lighting):
        audio_out = alert_manager.get_alert_audio()
        if audio_out is not None:
            logging.info("π Alert triggered!")

    return processed, status_txt, audio_out
# ───────────────────────────── UI with error handling
def create_ui():
    """Build the Gradio Blocks app for live drowsiness detection.

    The layout has a streaming webcam input on the left, the processed image,
    status text and autoplaying alert audio on the right, and a short system
    summary underneath. Every streamed webcam frame is routed through
    ``process_live_frame``.

    Returns:
        The assembled ``gr.Blocks`` app; it is not launched here.
    """
    with gr.Blocks(title="Drive Paddy β Drowsiness Detection") as app:
        gr.Markdown("# π **Drive Paddy** β Real-time Drowsiness Detection")
        gr.Markdown("Webcam-based drowsiness detection with audio alerts.")

        with gr.Row():
            with gr.Column(scale=2):
                cam = gr.Image(
                    sources=["webcam"],
                    streaming=True,
                    label="Live Camera Feed",
                )
            with gr.Column(scale=1):
                out_img = gr.Image(label="Processed Feed")
                out_text = gr.Textbox(
                    label="Live Status",
                    lines=4,
                    interactive=False,
                )
                out_audio = gr.Audio(
                    label="Alert Sound",
                    autoplay=True,
                    type="numpy",
                    visible=True,
                )

        # Add system info
        cooldown = CFG['alerting'].get('alert_cooldown_seconds', 5)
        # The two states must render differently so the user can tell whether
        # alerts will actually produce sound.
        sound_loaded = "yes" if alert_manager.is_loaded else "no"
        with gr.Row():
            gr.Markdown(
                "\n**System Info:**\n"
                f"- Alert cooldown: {cooldown}s\n"
                f"- Alert sound loaded: {sound_loaded}\n"
            )

        # Connect the streaming
        cam.stream(
            fn=process_live_frame,
            inputs=[cam],
            outputs=[out_img, out_text, out_audio],
        )
    return app
# ───────────────────────────── main
# Script entry point: build the UI, then serve it.
if __name__ == "__main__":
    logging.info("Initializing Drive Paddy...")

    # Create and launch app
    app = create_ui()

    logging.info("Launching Gradio app...")
    app.launch(
        debug=True,
        share=False,  # Set to True if you want a public link
        server_name="0.0.0.0",  # Allow external connections
        server_port=7860,
    )