drive-paddy / app.py
Testys's picture
Update app.py
7c9f785 verified
raw
history blame
7.16 kB
# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
#
# EDITED: This version uses a more robust method for audio playback
# in Gradio by dynamically creating the Audio component.
# ──────────────────────────────────────────────────────────
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
# This is a mock factory and detector for demonstration.
# Replace with your actual import.
from src.detection.factory import get_detector
# ───────────────────────────── logging
# Root logger setup: INFO level, each record prefixed with a HH:MM:SS clock.
# The separator is the box-drawing bar "│" (it had been stored mis-encoded).
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)
# ───────────────────────────── config / detector
# Load variables from a .env file into the process environment before
# anything else reads them.
load_dotenv()

# Open the YAML explicitly as UTF-8 so parsing does not depend on the
# platform's default text encoding. safe_load() never constructs arbitrary
# Python objects from the file.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Detector instance built by the project's factory from the loaded config.
detector = get_detector(CFG)
# ───────────────────────────── Alert Manager Class <--- CHANGE
# Encapsulating the alert logic makes the code much cleaner.
# It handles its own state (last alert time) internally.
class AlertManager:
    """Decide when the drowsiness alert sound should be played.

    The manager owns all alert state: the loaded sound, the time of the last
    alert, and a latch (``is_alert_active``) that is set when an alert fires
    and cleared only when :meth:`trigger_alert` is called with the level
    ``"Awake"``.
    """

    def __init__(self, config):
        """Read alert settings from *config* and load the alert sound.

        Args:
            config: Mapping with the optional keys ``alert_cooldown_seconds``
                (default 5) and ``alert_sound_path``. ``None`` is treated as
                an empty mapping.
        """
        config = config or {}
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        # time.monotonic() has an undefined reference point, so starting from
        # a literal 0 could make the very first alert look like it is still
        # inside the cooldown window. -inf means "no alert has fired yet".
        self.last_alert_time = float("-inf")
        self.alert_data = None
        self.sample_rate = None
        # Latch: True from the moment an alert fires until "Awake" is seen.
        self.is_alert_active = False
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        """Load the alert sound into ``alert_data`` and ``sample_rate``.

        A missing path or any failure while loading is logged and leaves
        ``alert_data`` as ``None``, which disables alerts.

        Args:
            wav_path: Path of the sound file, or a falsy value if none is
                configured.
        """
        if not wav_path:
            logging.warning("No 'alert_sound_path' found in config.")
            return
        try:
            # Load as int16 to avoid the Gradio conversion warning
            data, sr = sf.read(wav_path, dtype="int16")
            self.alert_data = data
            self.sample_rate = sr
            logging.info(f"Loaded alert sound: {wav_path} ({len(self.alert_data)/self.sample_rate:.2f}s)")
        except Exception as e:
            # Best effort: the app keeps running without sound.
            logging.error(f"Failed to load alert sound: {e}")
            self.alert_data = None
            self.sample_rate = None

    def trigger_alert(self, level, lighting):
        """Return the audio payload if a new alert should fire, else ``None``.

        Args:
            level: Drowsiness level for the current frame. Any value other
                than ``"Awake"`` counts as drowsy.
            lighting: Lighting label for the current frame. ``"Low"``
                suppresses alerts.

        Returns:
            ``(sample_rate, samples)`` with a copy of the loaded sound when
            an alert fires; ``None`` otherwise.
        """
        # While an alert is latched, nothing fires. Seeing "Awake" re-arms
        # the system but still returns None for this call.
        if self.is_alert_active:
            if level == "Awake":
                logging.info("✅ Alert state reset. User is Awake. Re-arming system.")
                self.is_alert_active = False
            return None

        is_drowsy = level != "Awake"
        is_good_light = lighting != "Low"
        # The time-based cooldown still prevents rapid re-firing right after
        # the latch has been released.
        now = time.monotonic()
        is_ready = (now - self.last_alert_time) > self.cooldown_seconds

        if self.alert_data is not None and is_drowsy and is_good_light and is_ready:
            self.last_alert_time = now
            self.is_alert_active = True
            logging.info("🔊 Drowsiness detected! Firing alert and setting state to active.")
            # Hand out a copy so callers cannot modify the cached sound.
            return (self.sample_rate, self.alert_data.copy())
        return None
# Initialize the alert manager
# Tolerate an empty config file or a missing/empty "alerting" section by
# passing an empty mapping instead of raising at import time.
alert_manager = AlertManager((CFG or {}).get("alerting") or {})
# ───────────────────────────── frame processing <--- MAJOR CHANGE
# Simplified by the AlertManager. No longer needs to pass 'last_alert_ts' back and forth.
def process_live_frame(frame):
    """Run the detector on one webcam frame and build the UI outputs.

    Args:
        frame: Image delivered by the webcam stream, or ``None`` when no
            frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``. ``audio`` is a new
        ``gr.Audio`` component with autoplay enabled when an alert fires and
        ``None`` otherwise.
    """
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None,  # No audio output
        )

    t0 = time.perf_counter()
    detection_ok = True
    try:
        # The detector is expected to return (processed_image, indicators_dict).
        processed, indic = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}
        detection_ok = False

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms │ {lighting:<4} │ {level:<14} │ score={score:.2f}")

    status_txt = (
        f"Lighting: {lighting}\n"
        + ("Detection paused – low light." if lighting == "Low"
           else f"Status: {level}\nScore: {score:.2f}")
    )

    # A detector failure is not evidence of drowsiness: the placeholder
    # "Error"/"Unknown" values would otherwise pass the alert manager's
    # "not Awake" and "not Low" checks and sound a false alarm.
    audio_payload = (
        alert_manager.trigger_alert(level, lighting) if detection_ok else None
    )

    # Return a new gr.Audio component when an alert fires; otherwise return
    # None to clear the component on the frontend.
    if audio_payload is not None:
        return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True)
    return processed, status_txt, None
# ───────────────────────────── UI <--- CHANGE
# Page layout: webcam input on the left, processed feed and status on the
# right, plus a hidden audio output that receives the alert sound.
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
    gr.Markdown("# 🚗 **Drive Paddy** – Robust Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection · console shows real-time logs.")

    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            # Placeholder for the alert sound; process_live_frame returns a
            # new gr.Audio (or None) for this output on every frame.
            # NOTE(review): some Gradio versions may not render components
            # created with visible=False in the browser, which could prevent
            # autoplay — confirm the alert is audible on the deployed version.
            out_audio = gr.Audio(
                label="Alert",
                autoplay=True,
                visible=False,  # Hiding the component for a cleaner UX
            )

    # Every streamed webcam frame goes through process_live_frame; its three
    # return values map, in order, onto the three output components.
    cam.stream(
        fn=process_live_frame,
        inputs=[cam],
        outputs=[out_img, out_text, out_audio],
    )
# Script entry point: start the Gradio server only when this file is run
# directly, not when it is imported.
if __name__ == "__main__":
    logging.info("Launching Gradio app…")
    # debug=True is passed straight through to Gradio's launch().
    app.launch(debug=True)