Spaces:
Sleeping
Sleeping
# app_gradio.py | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
# Webcam β geometric detector β static WAV alert (with cooldown) | |
# Live console logs of per-frame latency + status. | |
# ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
import time | |
import os | |
import yaml | |
import logging | |
import numpy as np | |
import gradio as gr | |
import soundfile as sf | |
from dotenv import load_dotenv | |
from src.detection.factory import get_detector # your existing factory | |
# βββββββββββββββββββββββββββββ logging | |
logging.basicConfig( | |
level=logging.INFO, | |
format="%(asctime)s β %(message)s", | |
datefmt="%H:%M:%S", | |
) | |
# βββββββββββββββββββββββββββββ config / detector | |
load_dotenv() | |
with open("config.yaml") as f: | |
CFG = yaml.safe_load(f) | |
detector = get_detector(CFG) | |
# βββββββββββββββββββββββββββββ alert sound (read once) | |
wav_path = CFG["alerting"].get("alert_sound_path") | |
logging.info(f"Processing {wav_path}") | |
try: | |
ALERT_SR, ALERT_DATA = sf.read(wav_path, dtype="float32") | |
logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)") | |
except Exception as e: | |
ALERT_SR, ALERT_DATA = None, None | |
logging.warning(f"Failed to load alert sound: {e}") | |
# βββββββββββββββββββββββββββββ frame processing | |
def process_live_frame(frame, last_alert_ts): | |
if frame is None: | |
return ( | |
np.zeros((480, 640, 3), dtype=np.uint8), | |
"Status: Inactive", | |
None, | |
last_alert_ts | |
) | |
t0 = time.perf_counter() | |
try: | |
processed, indic, _ = detector.process_frame(frame) | |
except Exception as e: | |
logging.error(f"Error processing frame: {e}") | |
processed = np.zeros_like(frame) | |
indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}} | |
level = indic.get("drowsiness_level", "Awake") | |
lighting = indic.get("lighting", "Good") | |
score = indic.get("details", {}).get("Score", 0.0) | |
dt_ms = (time.perf_counter() - t0) * 1000.0 | |
logging.info(f"{dt_ms:6.1f} ms β {lighting:<4} β {level:<14} β score={score:.2f}") | |
status_txt = ( | |
f"Lighting: {lighting}\n" | |
+ ("Detection paused β low light." if lighting == "Low" | |
else f"Status: {level}\nScore: {score:.2f}") | |
) | |
audio_out = None | |
new_last_alert_ts = last_alert_ts | |
ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 5) | |
if ( | |
ALERT_DATA is not None | |
and level != "Awake" | |
and lighting != "Low" | |
and (time.monotonic() - last_alert_ts) > ALERT_COOLDOWN | |
): | |
new_last_alert_ts = time.monotonic() | |
audio_out = (ALERT_SR, ALERT_DATA.copy()) | |
return processed, status_txt, audio_out, new_last_alert_ts | |
# βββββββββββββββββββββββββββββ UI | |
with gr.Blocks(title="Drive Paddy β Drowsiness Detection") as app: | |
gr.Markdown("# π **Drive Paddy** β Static-file Alert Demo") | |
gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed") | |
with gr.Column(scale=1): | |
out_img = gr.Image(label="Processed Feed") | |
out_text = gr.Textbox(label="Live Status", lines=3, interactive=False) | |
out_audio = gr.Audio( | |
label="Alert", | |
autoplay=True, | |
type="numpy", | |
visible=True, | |
) | |
last_alert_state = gr.State(value=0.0) | |
cam.stream( | |
fn=process_live_frame, | |
inputs=[cam, last_alert_state], | |
outputs=[out_img, out_text, out_audio, last_alert_state] | |
) | |
if __name__ == "__main__": | |
logging.info("Launching Gradio app β¦") | |
app.launch(debug=True) |