Spaces:
Sleeping
Sleeping
File size: 7,164 Bytes
8ae9de4 60b383e a7ec466 8ae9de4 a7ec466 ba6a8ea 935769d b9d2bb7 935769d b9d2bb7 935769d b9d2bb7 935769d ba6a8ea 8ae9de4 a7ec466 8ae9de4 7c9f785 a7ec466 8ae9de4 7c9f785 8ae9de4 a7ec466 7c9f785 e0dcec0 8ae9de4 a7ec466 8ae9de4 7c9f785 8ae9de4 7c9f785 8ae9de4 7c9f785 8ae9de4 7c9f785 8ae9de4 a7ec466 7c9f785 8ae9de4 ba6a8ea 8ae9de4 a7ec466 3a175cd 60b383e 8ae9de4 60b383e 8ae9de4 935769d 8ae9de4 60b383e 8ae9de4 a7ec466 60b383e 8ae9de4 60b383e 935769d 60b383e 8ae9de4 935769d b9d2bb7 8ae9de4 a7ec466 8ae9de4 a7ec466 8ae9de4 5101617 8977de8 8ae9de4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
# app_gradio.py
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
# Webcam β geometric detector β static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
#
# EDITED: This version uses a more robust method for audio playback
# in Gradio by dynamically creating the Audio component.
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
# This is a mock factory and detector for demonstration.
# Replace with your actual import.
from src.detection.factory import get_detector
# βββββββββββββββββββββββββββββ logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s β %(message)s",
datefmt="%H:%M:%S",
)
# βββββββββββββββββββββββββββββ config / detector
load_dotenv()
with open("config.yaml") as f:
CFG = yaml.safe_load(f)
detector = get_detector(CFG)
# βββββββββββββββββββββββββββββ Alert Manager Class <--- CHANGE
# Encapsulating the alert logic makes the code much cleaner.
# It handles its own state (last alert time) internally.
class AlertManager:
def __init__(self, config):
self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
self.last_alert_time = 0
self.alert_data = None
self.sample_rate = None
# --- NEW: State variable to track if an alert is active ---
self.is_alert_active = False
self._load_sound(config.get("alert_sound_path"))
def _load_sound(self, wav_path):
if not wav_path:
logging.warning("No 'alert_sound_path' found in config.")
return
try:
# Load as int16 to avoid the Gradio conversion warning
data, sr = sf.read(wav_path, dtype="int16")
self.alert_data = data
self.sample_rate = sr
logging.info(f"Loaded alert sound: {wav_path} ({len(self.alert_data)/self.sample_rate:.2f}s)")
except Exception as e:
logging.error(f"Failed to load alert sound: {e}")
self.alert_data = None
def trigger_alert(self, level, lighting):
"""
Checks conditions and returns audio payload if a new alert should fire.
This is now stateful.
"""
# --- NEW LOGIC: Part 1 ---
# If an alert is currently active, we do nothing until the user is 'Awake'.
if self.is_alert_active:
if level == "Awake":
logging.info("β
Alert state reset. User is Awake. Re-arming system.")
self.is_alert_active = False
return None # Important: Return None to prevent any sound
# --- ORIGINAL LOGIC (with a small change) ---
# If no alert is active, check for conditions to fire a new one.
is_drowsy = level != "Awake"
is_good_light = lighting != "Low"
# The time-based cooldown is still useful to prevent flickering alerts.
is_ready = (time.monotonic() - self.last_alert_time) > self.cooldown_seconds
if self.alert_data is not None and is_drowsy and is_good_light and is_ready:
self.last_alert_time = time.monotonic()
# --- NEW LOGIC: Part 2 ---
# Set the alert to active so it doesn't fire again immediately.
self.is_alert_active = True
logging.info("π Drowsiness detected! Firing alert and setting state to active.")
return (self.sample_rate, self.alert_data.copy())
return None
# Initialize the alert manager
alert_manager = AlertManager(CFG["alerting"])
# βββββββββββββββββββββββββββββ frame processing <--- MAJOR CHANGE
# Simplified by the AlertManager. No longer needs to pass 'last_alert_ts' back and forth.
def process_live_frame(frame):
if frame is None:
return (
np.zeros((480, 640, 3), dtype=np.uint8),
"Status: Inactive",
None # No audio output
)
t0 = time.perf_counter()
try:
# Assuming your detector returns (processed_image, indicators_dict)
processed, indic = detector.process_frame(frame)
except Exception as e:
logging.error(f"Error processing frame: {e}")
processed = np.zeros_like(frame)
indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}
level = indic.get("drowsiness_level", "Awake")
lighting = indic.get("lighting", "Good")
score = indic.get("details", {}).get("Score", 0.0)
dt_ms = (time.perf_counter() - t0) * 1000.0
logging.info(f"{dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}")
status_txt = (
f"Lighting: {lighting}\n"
+ ("Detection paused β low light." if lighting == "Low"
else f"Status: {level}\nScore: {score:.2f}")
)
# Check for an alert and get the audio payload if ready
audio_payload = alert_manager.trigger_alert(level, lighting)
# This is the key: return a new gr.Audio component when an alert fires.
# Otherwise, return None to clear the component on the frontend.
if audio_payload:
return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True)
else:
return processed, status_txt, None
# βββββββββββββββββββββββββββββ UI <--- CHANGE
with gr.Blocks(title="Drive Paddy β Drowsiness Detection") as app:
gr.Markdown("# π **Drive Paddy** β Robust Alert Demo")
gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.")
with gr.Row():
with gr.Column(scale=2):
cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
with gr.Column(scale=1):
out_img = gr.Image(label="Processed Feed")
out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
# This audio component now acts as a placeholder.
# We make it invisible because we don't need to show the player controls.
# The backend will dynamically send a new, playable component to it.
out_audio = gr.Audio(
label="Alert",
autoplay=True,
visible=False, # Hiding the component for a cleaner UX
)
# The gr.State for managing the timestamp is no longer needed, simplifying the stream call.
cam.stream(
fn=process_live_frame,
inputs=[cam],
outputs=[out_img, out_text, out_audio] # The output now targets the placeholder
)
if __name__ == "__main__":
logging.info("Launching Gradio appβ¦")
app.launch(debug=True) |