drive-paddy / app.py
Testys's picture
Update app.py
a7ec466 verified
raw
history blame
6.52 kB
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
from threading import Lock
from queue import Queue
import threading
from src.detection.factory import get_detector
# ───────────────────────────── logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s β”‚ %(message)s",
datefmt="%H:%M:%S",
)
# ───────────────────────────── config / detector
load_dotenv()
with open("config.yaml") as f:
CFG = yaml.safe_load(f)
detector = get_detector(CFG)
# ───────────────────────────── alert sound manager
class AlertManager:
def __init__(self, wav_path, cooldown_seconds=5):
self.cooldown_seconds = cooldown_seconds
self.last_alert_time = 0
self.lock = Lock()
self.alert_queue = Queue(maxsize=1)
# Load alert sound
try:
self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32")
# Ensure stereo format for Gradio
if self.audio_data.ndim == 1:
self.audio_data = np.column_stack([self.audio_data, self.audio_data])
# Normalize to [-1, 1] range
max_val = np.abs(self.audio_data).max()
if max_val > 0:
self.audio_data = self.audio_data / max_val
logging.info(f"Loaded alert sound: {wav_path} "
f"({len(self.audio_data)/self.sample_rate:.2f}s, "
f"shape: {self.audio_data.shape})")
self.is_loaded = True
except Exception as e:
logging.error(f"Failed to load alert sound: {e}")
self.is_loaded = False
self.sample_rate = 44100
self.audio_data = None
def should_alert(self, drowsiness_level, lighting):
"""Check if we should trigger an alert"""
if not self.is_loaded:
return False
with self.lock:
current_time = time.monotonic()
if (drowsiness_level != "Awake"
and lighting != "Low"
and (current_time - self.last_alert_time) > self.cooldown_seconds):
self.last_alert_time = current_time
return True
return False
def get_alert_audio(self):
"""Get the alert audio data"""
if self.is_loaded:
return (int(self.sample_rate), self.audio_data.copy())
return None
# Initialize alert manager
alert_manager = AlertManager(
wav_path=CFG["alerting"].get("alert_sound_path"),
cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5)
)
# ───────────────────────────── frame processing
def process_live_frame(frame):
"""Process frame for drowsiness detection"""
if frame is None:
return (
np.zeros((480, 640, 3), dtype=np.uint8),
"Status: Inactive",
None
)
t0 = time.perf_counter()
try:
# Process frame through detector
processed, indic = detector.process_frame(frame)
except Exception as e:
logging.error(f"Error processing frame: {e}")
processed = np.zeros_like(frame)
indic = {
"drowsiness_level": "Error",
"lighting": "Unknown",
"details": {"Score": 0.0}
}
# Extract detection results
level = indic.get("drowsiness_level", "Awake")
lighting = indic.get("lighting", "Good")
score = indic.get("details", {}).get("Score", 0.0)
# Log performance
dt_ms = (time.perf_counter() - t0) * 1000.0
logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
# Create status text
status_txt = f"Lighting: {lighting}\n"
if lighting == "Low":
status_txt += "Detection paused – low light."
else:
status_txt += f"Status: {level}\nScore: {score:.2f}"
# Check if we should trigger alert
audio_out = None
if alert_manager.should_alert(level, lighting):
audio_out = alert_manager.get_alert_audio()
if audio_out:
logging.info("πŸ”Š Alert triggered!")
return processed, status_txt, audio_out
# ───────────────────────────── UI with error handling
def create_ui():
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
gr.Markdown("# πŸš— **Drive Paddy** – Real-time Drowsiness Detection")
gr.Markdown("Webcam-based drowsiness detection with audio alerts.")
with gr.Row():
with gr.Column(scale=2):
cam = gr.Image(
sources=["webcam"],
streaming=True,
label="Live Camera Feed"
)
with gr.Column(scale=1):
out_img = gr.Image(label="Processed Feed")
out_text = gr.Textbox(
label="Live Status",
lines=4,
interactive=False
)
out_audio = gr.Audio(
label="Alert Sound",
autoplay=True,
type="numpy",
visible=True,
)
# Add system info
with gr.Row():
gr.Markdown(f"""
**System Info:**
- Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s
- Alert sound loaded: {'βœ“' if alert_manager.is_loaded else 'βœ—'}
""")
# Connect the streaming
cam.stream(
fn=process_live_frame,
inputs=[cam],
outputs=[out_img, out_text, out_audio]
)
return app
# ───────────────────────────── main
if __name__ == "__main__":
logging.info("Initializing Drive Paddy...")
# Create and launch app
app = create_ui()
logging.info("Launching Gradio app...")
app.launch(
debug=True,
share=False, # Set to True if you want a public link
server_name="0.0.0.0", # Allow external connections
server_port=7860
)