drive-paddy / app.py
Testys's picture
Update app.py
8e7b22f verified
raw
history blame
4.3 kB
# app.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
# ──────────────────────────────────────────────────────────
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf
from dotenv import load_dotenv
from src.detection.factory import get_detector # your existing factory
# ───────────────────────────── logging
# Console logging: every record is rendered as "HH:MM:SS │ message".
logging.basicConfig(
    level=logging.INFO,
    # Separator restored to the intended box-drawing bar; it had been
    # mis-decoded into a three-character mojibake sequence.
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)
# ───────────────────────────── config / detector
# Pull variables from a local .env file into the process environment
# before anything reads configuration.
load_dotenv()

# Read the YAML configuration once at import time. The encoding is pinned
# so the parse does not depend on the platform's default locale encoding.
# The path is relative, so the app must be started from the directory that
# contains config.yaml.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Single detector instance shared by every streamed frame.
detector = get_detector(CFG)
# ───────────────────────────── alert sound (read once)
# Load the alert clip once at start-up. On any failure both globals are
# set to None, which disables audio alerts without stopping the app.
wav_path = CFG["alerting"].get("alert_sound_path")
logging.info(f"Processing {wav_path}")
try:
    # soundfile.read returns (data, samplerate) in that order. Unpacking it
    # the other way round made the duration computation below raise, so the
    # clip was always discarded by the except branch.
    ALERT_DATA, ALERT_SR = sf.read(wav_path, dtype="float32")
except Exception as e:
    # Best effort: a missing or unreadable clip must not prevent start-up.
    ALERT_SR, ALERT_DATA = None, None
    logging.warning(f"Failed to load alert sound: {e}")
else:
    logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
# ───────────────────────────── frame processing
def _coerce_score(value):
    """Return *value* as a float, or 0.0 when it is missing or non-numeric."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0.0


def process_live_frame(frame, last_alert_ts):
    """Run the detector on one streamed frame and decide whether to alert.

    Args:
        frame: Image array delivered by the webcam stream, or ``None`` when
            no frame is available.
        last_alert_ts: ``time.monotonic()`` reading taken when the previous
            alert was emitted. A falsy value (``0.0`` or ``None``) means no
            alert has been emitted yet.

    Returns:
        A 4-tuple ``(image, status_text, audio, last_alert_ts)``:
        the processed frame (a black frame on detector failure or when
        *frame* is ``None``), the text for the status box, either ``None``
        or ``(ALERT_SR, copy of ALERT_DATA)`` when the alert should play,
        and the timestamp to carry over to the next call.
    """
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None,
            last_alert_ts,
        )

    t0 = time.perf_counter()
    try:
        processed, indic, _ = detector.process_frame(frame)
    except Exception as e:
        # Boundary handler: one bad frame must not kill the stream. Kept to
        # a single log line because this can fire on every frame.
        logging.error("Error processing frame: %s", e)
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    # Tolerate a detector that hands back None for the indicators or for
    # the nested details mapping, and a score that is None / non-numeric;
    # otherwise the formatting below would raise inside the callback.
    indic = indic or {}
    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = _coerce_score((indic.get("details") or {}).get("Score", 0.0))

    dt_ms = (time.perf_counter() - t0) * 1000.0
    # Lazy %-formatting: the message is only built if the record is emitted.
    logging.info("%6.1f ms │ %-4s │ %-14s │ score=%.2f", dt_ms, lighting, level, score)

    if lighting == "Low":
        detail = "Detection paused – low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n{detail}"

    cooldown = CFG["alerting"].get("alert_cooldown_seconds", 5)
    # Read the clock once so the cooldown test and the stored timestamp agree.
    now = time.monotonic()
    # The monotonic clock has an undefined origin, so a falsy timestamp is
    # treated as "never alerted" rather than being subtracted from `now`.
    cooled_down = not last_alert_ts or (now - last_alert_ts) > cooldown

    # NOTE(review): the "Error" level produced by the except branch above
    # also satisfies `level != "Awake"` and therefore triggers the alert —
    # confirm that this is intended.
    if ALERT_DATA is not None and level != "Awake" and lighting != "Low" and cooled_down:
        # Hand out a copy so downstream code cannot modify the cached clip.
        return processed, status_txt, (ALERT_SR, ALERT_DATA.copy()), now

    return processed, status_txt, None, last_alert_ts
# ───────────────────────────── UI
# Page layout: webcam input on the left; processed frame, status text and
# the alert player on the right.
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
    # Emoji and middle dot restored from their mis-decoded (mojibake) forms.
    gr.Markdown("# 🚗 **Drive Paddy** – Static-file Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection · console shows real-time logs.")

    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            # autoplay makes the clip sound as soon as the callback returns it.
            out_audio = gr.Audio(
                label="Alert",
                autoplay=True,
                type="numpy",
                visible=True,
            )

    # Timestamp of the last alert, threaded through successive callback
    # invocations; 0.0 stands for "no alert yet".
    last_alert_state = gr.State(value=0.0)

    # Each streamed webcam frame is passed to process_live_frame together
    # with the stored timestamp; its four return values update the outputs.
    cam.stream(
        fn=process_live_frame,
        inputs=[cam, last_alert_state],
        outputs=[out_img, out_text, out_audio, last_alert_state],
    )
def _launch_app():
    """Log the start-up message, then start the Gradio server in debug mode."""
    logging.info("Launching Gradio app …")
    app.launch(debug=True)


# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    _launch_app()