drive-paddy / app.py
Testys's picture
Update app.py
57edb95 verified
raw
history blame
4.02 kB
# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
# ──────────────────────────────────────────────────────────
import time, os, yaml, logging, numpy as np, gradio as gr, soundfile as sf
from dotenv import load_dotenv
from src.detection.factory import get_detector # your existing factory
# ───────────────────────────── logging
# Root-logger setup: INFO level, each record prefixed with an HH:MM:SS stamp.
logging.basicConfig(
    level=logging.INFO,
    # "│" (U+2502) separates the timestamp from the message.  The literal
    # used to contain the mis-decoded (mojibake) form of that character.
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)
# ───────────────────────────── config / detector
# Load variables from a .env file (python-dotenv) before anything reads the
# configuration or builds the detector.
load_dotenv()

# Parse the YAML configuration from the current working directory.  The
# encoding is pinned to UTF-8 so the result does not depend on the platform's
# locale default.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Detector instance built once from the parsed configuration.
detector = get_detector(CFG)
# ───────────────────────────── alert sound (read once)
# Path of the WAV file to play as the alert, taken from the configuration.
wav_path = CFG["alerting"]["alert_sound_path"]
try:
    # soundfile.read() returns (data, samplerate) -- in that order.  Unpacking
    # it as (samplerate, data) left an int in ALERT_DATA, so the len() call in
    # the log line raised TypeError, the handler below reset both names to
    # None, and the alert could never play.
    ALERT_DATA, ALERT_SR = sf.read(wav_path, dtype="float32")
except Exception as e:
    # Best effort: the app keeps running without an alert sound.
    ALERT_SR, ALERT_DATA = None, None
    logging.warning(f"Failed to load alert sound: {e}")
else:
    logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
# ───────────────────────────── simple cooldown
# Minimum gap, in seconds, between two alert playbacks; read from the
# "alerting" section of the config, defaulting to 7 when the key is absent.
ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 7)
# time.time() stamp of the most recent alert; 0.0 means none has fired yet.
_last_alert_ts: float = 0.0
# ───────────────────────────── frame callback
def process_live_frame(frame):
    """Run the detector on one streamed frame and build the three UI outputs.

    Args:
        frame: Image handed over by the streaming webcam component, or
            ``None`` when no frame is available.

    Returns:
        A 3-tuple ``(image, status_text, audio)``:

        * ``image`` -- the frame returned by ``detector.process_frame``, or a
          black 480x640 RGB placeholder when ``frame`` is ``None``.
        * ``status_text`` -- multi-line status for the text box.
        * ``audio`` -- ``(sample_rate, samples)`` when an alert should play
          on this frame, otherwise ``None``.

    Side effects:
        Logs one INFO line per processed frame (latency, lighting, level,
        score) and updates the module-level ``_last_alert_ts`` whenever an
        alert is emitted.
    """
    global _last_alert_ts

    if frame is None:
        return np.zeros((480, 640, 3), np.uint8), "Status: Inactive", None

    t0 = time.perf_counter()
    processed, indic, _ = detector.process_frame(frame)
    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)
    dt_ms = (time.perf_counter() - t0) * 1000.0

    # Lazy %-style arguments: this runs once per frame, so the message is
    # only formatted when the record is actually emitted.  The "│" separators
    # replace the mis-decoded (mojibake) characters the literal used to hold.
    logging.info(
        "%6.1f ms │ %-4s │ %-14s │ score=%.2f",
        dt_ms, lighting, level, score,
    )

    if lighting == "Low":
        detail = "Detection paused – low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n" + detail

    # Play the alert only when a sound is loaded, the driver is not "Awake",
    # lighting is usable, and the cooldown since the last alert has elapsed.
    audio_out = None
    now = time.time()
    if (
        ALERT_DATA is not None
        and level != "Awake"
        and lighting != "Low"
        and (now - _last_alert_ts) > ALERT_COOLDOWN
    ):
        _last_alert_ts = now
        audio_out = (ALERT_SR, ALERT_DATA.copy())  # hand a fresh copy to Gradio

    return processed, status_txt, audio_out
# ───────────────────────────── UI
# UI layout: webcam input in the wider column (scale 2); processed frame,
# status text and alert player in the narrower column (scale 1).
# NOTE(review): the nesting below is reconstructed from the component order --
# confirm against the intended layout.
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
    # The car emoji and the "·" separator replace mis-decoded (mojibake)
    # characters that were being shown to the user.
    gr.Markdown("# 🚗 **Drive Paddy** – Static-file Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection · console shows real-time logs.")

    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            out_audio = gr.Audio(
                label="Alert",
                autoplay=True,
                type="numpy",  # expects (sr, np.ndarray)
                visible=True,
            )

    # Every streamed webcam frame goes through process_live_frame; its three
    # return values feed the image, text and audio outputs in that order.
    cam.stream(
        fn=process_live_frame,
        inputs=cam,
        outputs=[out_img, out_text, out_audio],
    )
# Script entry point: announce start-up in the log, then launch the Gradio
# app with debug=True.
if __name__ == "__main__":
    logging.info("Launching Gradio app …")
    app.launch(debug=True)