File size: 4,022 Bytes
5101617
ba6a8ea
935769d
 
ba6a8ea
935769d
 
 
 
ba6a8ea
935769d
b9d2bb7
 
935769d
b9d2bb7
 
 
935769d
 
 
 
b9d2bb7
935769d
ba6a8ea
935769d
 
 
 
 
 
 
 
ba6a8ea
935769d
 
 
ba6a8ea
935769d
3a175cd
935769d
 
3a175cd
ba6a8ea
 
935769d
ba6a8ea
935769d
 
 
 
 
 
b9d2bb7
 
935769d
 
 
 
 
ba6a8ea
935769d
 
 
 
 
 
 
 
 
 
ba6a8ea
 
 
935769d
 
 
 
5101617
8977de8
95b307f
935769d
95b307f
935769d
 
 
 
 
 
 
 
 
 
 
 
5101617
8977de8
b9d2bb7
ba6a8ea
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam → geometric detector → static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
# ──────────────────────────────────────────────────────────
import time, os, yaml, logging, numpy as np, gradio as gr, soundfile as sf
from dotenv import load_dotenv

from src.detection.factory import get_detector    # your existing factory

# ───────────────────────────── logging
# Root logger at INFO level: "HH:MM:SS │ message". The separator was stored as
# mis-decoded bytes; it is restored to the intended box-drawing bar.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s │ %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

# Read the config as UTF-8 explicitly instead of relying on the platform's
# locale default encoding, so non-ASCII values parse the same everywhere.
with open("config.yaml", encoding="utf-8") as f:
    CFG = yaml.safe_load(f)

# Build the detector from the loaded configuration via the project factory.
detector = get_detector(CFG)

# ───────────────────────────── alert sound (read once)
# The WAV is decoded a single time at import; each alert later hands out a copy.
wav_path = CFG["alerting"]["alert_sound_path"]
try:
    # soundfile.read returns (data, samplerate) in that order. Unpacking it as
    # (samplerate, data) made len() fail on an int below, which the broad
    # handler swallowed, so the alert was always disabled.
    ALERT_DATA, ALERT_SR = sf.read(wav_path, dtype="float32")
except Exception as e:
    # Best effort: the app keeps running without an audible alert.
    ALERT_SR, ALERT_DATA = None, None
    logging.warning(f"Failed to load alert sound: {e}")
else:
    logging.info(
        "Loaded alert sound: %s  (%.2fs)", wav_path, len(ALERT_DATA) / ALERT_SR
    )

# ───────────────────────────── alert cooldown state
# Timestamp of the most recent alert; 0.0 means no alert has been raised yet.
_last_alert_ts = 0.0

# Minimum gap in seconds between two alerts. Falls back to 7 when the
# "alert_cooldown_seconds" key is missing from the alerting section.
ALERT_COOLDOWN = CFG["alerting"].get(
    "alert_cooldown_seconds",
    7,
)

# ───────────────────────────── frame callback
def process_live_frame(frame):
    """Run the detector on one webcam frame and build the three UI outputs.

    Args:
        frame: Image delivered by the streaming webcam component, or ``None``
            when no frame is available.

    Returns:
        A ``(image, status_text, audio)`` tuple:

        * ``image``: the frame returned by ``detector.process_frame``, or an
          all-zero 480x640x3 uint8 placeholder when ``frame`` is ``None``.
        * ``status_text``: multi-line summary of lighting, level and score.
        * ``audio``: ``(ALERT_SR, copy of ALERT_DATA)`` when an alert should
          start now, otherwise ``None``.

    Side effects:
        Logs one line per processed frame and updates the module-level
        ``_last_alert_ts`` whenever an alert is emitted.
    """
    global _last_alert_ts

    if frame is None:
        return np.zeros((480, 640, 3), np.uint8), "Status: Inactive", None

    t0 = time.perf_counter()

    processed, indic, _ = detector.process_frame(frame)
    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    # Tolerate a missing *or* null "details" entry instead of crashing on None.
    score = (indic.get("details") or {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    # Lazy %-style arguments: this runs once per streamed frame.
    logging.info(
        "%6.1f ms │ %-4s │ %-14s │ score=%.2f", dt_ms, lighting, level, score
    )

    low_light = lighting == "Low"
    if low_light:
        detail = "Detection paused – low light."
    else:
        detail = f"Status: {level}\nScore: {score:.2f}"
    status_txt = f"Lighting: {lighting}\n{detail}"

    # Decide whether to play the alert. Read the clock once so the cooldown
    # comparison and the stored timestamp agree.
    # NOTE(review): on frames without an alert this returns None for the audio
    # output; if that clears the player mid-playback in the Gradio version in
    # use, a no-op update may be needed instead. Confirm against Gradio docs.
    audio_out = None
    now = time.time()
    if (
        ALERT_DATA is not None
        and level != "Awake"
        and not low_light
        and (now - _last_alert_ts) > ALERT_COOLDOWN
    ):
        _last_alert_ts = now
        audio_out = (ALERT_SR, ALERT_DATA.copy())  # hand a fresh copy to Gradio

    return processed, status_txt, audio_out

# ───────────────────────────── UI
# Layout: webcam input on the left (wider column); processed frame, status text
# and the alert audio player on the right. User-facing strings had mis-decoded
# characters (en dash, car emoji, middle dot), restored here.
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
    gr.Markdown("# 🚗 **Drive Paddy** – Static-file Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection · console shows real-time logs.")

    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            out_audio = gr.Audio(
                label="Alert",
                autoplay=True,       # start playback as soon as a clip arrives
                type="numpy",        # expects (sr, np.ndarray)
                visible=True,
            )

    # Each streamed webcam frame goes through process_live_frame, whose three
    # return values map onto the three output components in order.
    cam.stream(
        fn=process_live_frame,
        inputs=cam,
        outputs=[out_img, out_text, out_audio],
    )

# Script entry point: start the Gradio server only when run directly.
if __name__ == "__main__":
    # The trailing ellipsis was stored as mis-decoded bytes; restored here.
    logging.info("Launching Gradio app …")
    app.launch(debug=True)