Spaces:
Sleeping
Sleeping
# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
#
# EDITED: This version uses a more robust method for audio playback
# in Gradio by dynamically creating the Audio component.
# ──────────────────────────────────────────────────────────
import time | |
import os | |
import yaml | |
import logging | |
import numpy as np | |
import gradio as gr | |
import soundfile as sf | |
from dotenv import load_dotenv | |
# This is a mock factory and detector for demonstration. | |
# Replace with your actual import. | |
from src.detection.factory import get_detector | |
# ───────────────────────────── logging
# Console logging: one timestamped line per record, INFO and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
# Pull environment variables from a local .env file before anything reads them.
load_dotenv()

# Read the YAML configuration once at import time. The encoding is explicit so
# parsing does not depend on the platform's locale default.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Build the frame detector from the loaded configuration.
detector = get_detector(CFG)
# ───────────────────────────── Alert Manager Class <--- CHANGE
# Encapsulating the alert logic makes the code much cleaner.
# The manager handles its own state (last alert time, active flag) internally.
class AlertManager:
    """Decide when the audible drowsiness alert should be played.

    The manager loads the alert sound once at construction and keeps two
    pieces of state: a latch (``is_alert_active``) that stays set from the
    moment an alert fires until an "Awake" frame is seen, and the monotonic
    timestamp of the last alert, used to enforce a minimum gap between alerts.
    """

    def __init__(self, config):
        """Read alert settings from *config* and load the alert sound.

        Args:
            config: Mapping that may contain ``alert_cooldown_seconds``
                (minimum seconds between alerts, default 5) and
                ``alert_sound_path`` (audio file passed to ``sf.read``).
        """
        self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
        # time.monotonic() has an undefined reference point, so 0 is not a
        # safe "never fired" marker: if the clock currently reads less than
        # the cooldown, the very first alert would be suppressed. -inf makes
        # the first cooldown check pass regardless of the clock's origin.
        self.last_alert_time = float("-inf")
        self.alert_data = None
        self.sample_rate = None
        # True from the moment an alert fires until the user is seen "Awake".
        self.is_alert_active = False
        self._load_sound(config.get("alert_sound_path"))

    def _load_sound(self, wav_path):
        """Load the alert sound into memory.

        A missing path or a read failure is logged and leaves
        ``alert_data`` as None, which disables alerts instead of crashing
        the app.
        """
        if not wav_path:
            logging.warning("No 'alert_sound_path' found in config.")
            return
        try:
            # Load as int16 to avoid the Gradio conversion warning.
            data, sr = sf.read(wav_path, dtype="int16")
            self.alert_data = data
            self.sample_rate = sr
            logging.info(f"Loaded alert sound: {wav_path} ({len(self.alert_data)/self.sample_rate:.2f}s)")
        except Exception as e:
            # Best-effort: the app keeps running without sound.
            logging.error(f"Failed to load alert sound: {e}")
            self.alert_data = None
            self.sample_rate = None

    def trigger_alert(self, level, lighting):
        """Return an audio payload if a new alert should fire, else None.

        Args:
            level: Drowsiness level for the current frame; any value other
                than "Awake" counts as drowsy.
            lighting: Lighting label for the current frame; "Low" suppresses
                alerts.

        Returns:
            ``(sample_rate, samples)`` with a copy of the alert sound when an
            alert fires, otherwise None.
        """
        # While an alert is latched, stay silent; only an "Awake" frame
        # re-arms the system, and that frame itself never produces sound.
        if self.is_alert_active:
            if level == "Awake":
                logging.info("β Alert state reset. User is Awake. Re-arming system.")
                self.is_alert_active = False
            return None

        # No sound loaded, user awake, or lighting too poor: nothing to fire.
        if self.alert_data is None or level == "Awake" or lighting == "Low":
            return None

        # The time-based cooldown prevents flickering alerts.
        now = time.monotonic()
        if now - self.last_alert_time <= self.cooldown_seconds:
            return None

        self.last_alert_time = now
        self.is_alert_active = True
        logging.info("π Drowsiness detected! Firing alert and setting state to active.")
        # Hand out a copy so callers cannot mutate the cached samples.
        return (self.sample_rate, self.alert_data.copy())
# Single module-level manager, built from the "alerting" section of the
# config; the frame handler calls it for every streamed frame.
alert_manager = AlertManager(config=CFG["alerting"])
# ───────────────────────────── frame processing <--- MAJOR CHANGE
# Simplified by the AlertManager: 'last_alert_ts' no longer needs to be passed back and forth.
def process_live_frame(frame):
    """Run the detector on one webcam frame and build the three UI outputs.

    Args:
        frame: Frame streamed from the webcam component (presumably a NumPy
            image array; confirm against the Gradio Image type), or None
            when no frame is available.

    Returns:
        A ``(image, status_text, audio)`` tuple. ``audio`` is a ``gr.Audio``
        with autoplay enabled when an alert fires, and None otherwise.
    """
    if frame is None:
        # No frame: black 480x640 placeholder and no audio.
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None,
        )

    t0 = time.perf_counter()
    detection_ok = True
    try:
        # The detector is expected to return (processed_image, indicators_dict).
        processed, indic = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        detection_ok = False
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")

    if lighting == "Low":
        detail = "Detection paused β low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n" + detail

    # A failed detection says nothing about the driver. Its "Error" level is
    # not "Awake", so passing it on would sound a false drowsiness alarm.
    # Skip the alert check for such frames.
    audio_payload = None
    if detection_ok:
        audio_payload = alert_manager.trigger_alert(level, lighting)

    if audio_payload is None:
        # None clears the audio component on the frontend.
        return processed, status_txt, None
    # Return a fresh Audio component so the frontend plays the alert.
    return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True)
# ───────────────────────────── UI <--- CHANGE
_PAGE_TITLE = "Drive Paddy β Drowsiness Detection"

with gr.Blocks(title=_PAGE_TITLE) as app:
    gr.Markdown("# π **Drive Paddy** β Robust Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.")

    with gr.Row():
        # Left, wider column: the raw webcam stream.
        with gr.Column(scale=2):
            cam = gr.Image(
                label="Live Camera Feed",
                sources=["webcam"],
                streaming=True,
            )
        # Right column: annotated frame, status text and the alert audio.
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", interactive=False, lines=3)
            # Hidden placeholder: the player controls are not shown; the
            # frame handler sends a playable Audio component to this output
            # whenever an alert fires.
            out_audio = gr.Audio(
                visible=False,
                autoplay=True,
                label="Alert",
            )

    # Each streamed frame goes through process_live_frame; no gr.State is
    # needed because the alert manager keeps its own state.
    _stream_outputs = [out_img, out_text, out_audio]
    cam.stream(
        fn=process_live_frame,
        inputs=[cam],
        outputs=_stream_outputs,
    )

if __name__ == "__main__":
    logging.info("Launching Gradio appβ¦")
    app.launch(debug=True)