File size: 4,301 Bytes
5101617
ba6a8ea
935769d
 
ba6a8ea
60b383e
 
 
 
 
 
 
935769d
60b383e
 
ba6a8ea
935769d
b9d2bb7
 
935769d
b9d2bb7
 
 
935769d
 
 
 
b9d2bb7
935769d
ba6a8ea
935769d
8e7b22f
 
935769d
60b383e
935769d
 
 
 
ba6a8ea
60b383e
 
3a175cd
60b383e
 
 
 
 
 
ba6a8ea
935769d
ba6a8ea
60b383e
 
 
 
 
 
 
 
935769d
60b383e
935769d
 
b9d2bb7
 
935769d
 
 
 
 
ba6a8ea
935769d
60b383e
8e7b22f
60b383e
935769d
 
 
 
60b383e
935769d
60b383e
 
ba6a8ea
60b383e
ba6a8ea
935769d
 
 
 
5101617
8977de8
95b307f
935769d
95b307f
60b383e
935769d
 
 
 
60b383e
935769d
 
60b383e
935769d
60b383e
 
 
 
 
5101617
8977de8
b9d2bb7
60b383e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# app_gradio.py
# ──────────────────────────────────────────────────────────
# Webcam β†’ geometric detector β†’ static WAV alert (with cooldown)
# Live console logs of per-frame latency + status.
# ──────────────────────────────────────────────────────────
import time
import os
import yaml
import logging
import numpy as np
import gradio as gr
import soundfile as sf

from dotenv import load_dotenv
from src.detection.factory import get_detector  # your existing factory

# ───────────────────────────── logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s β”‚ %(message)s",
    datefmt="%H:%M:%S",
)

# ───────────────────────────── config / detector
load_dotenv()
with open("config.yaml") as f:
    CFG = yaml.safe_load(f)

detector = get_detector(CFG)

# ───────────────────────────── alert sound (read once)
wav_path = CFG["alerting"].get("alert_sound_path")
logging.info(f"Processing {wav_path}")
try:
    ALERT_SR, ALERT_DATA = sf.read(wav_path, dtype="float32")
    logging.info(f"Loaded alert sound: {wav_path}  ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
except Exception as e:
    ALERT_SR, ALERT_DATA = None, None
    logging.warning(f"Failed to load alert sound: {e}")

# ───────────────────────────── frame processing
def process_live_frame(frame, last_alert_ts):
    if frame is None:
        return (
            np.zeros((480, 640, 3), dtype=np.uint8),
            "Status: Inactive",
            None,
            last_alert_ts
        )

    t0 = time.perf_counter()

    try:
        processed, indic, _ = detector.process_frame(frame)
    except Exception as e:
        logging.error(f"Error processing frame: {e}")
        processed = np.zeros_like(frame)
        indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}

    level = indic.get("drowsiness_level", "Awake")
    lighting = indic.get("lighting", "Good")
    score = indic.get("details", {}).get("Score", 0.0)

    dt_ms = (time.perf_counter() - t0) * 1000.0
    logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")

    status_txt = (
        f"Lighting: {lighting}\n"
        + ("Detection paused – low light." if lighting == "Low"
           else f"Status: {level}\nScore: {score:.2f}")
    )

    audio_out = None
    new_last_alert_ts = last_alert_ts
    ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 5)

    if (
        ALERT_DATA is not None
        and level != "Awake"
        and lighting != "Low"
        and (time.monotonic() - last_alert_ts) > ALERT_COOLDOWN
    ):
        new_last_alert_ts = time.monotonic()
        audio_out = (ALERT_SR, ALERT_DATA.copy())

    return processed, status_txt, audio_out, new_last_alert_ts

# ───────────────────────────── UI
with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
    gr.Markdown("# πŸš— **Drive Paddy** – Static-file Alert Demo")
    gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.")

    with gr.Row():
        with gr.Column(scale=2):
            cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
        with gr.Column(scale=1):
            out_img = gr.Image(label="Processed Feed")
            out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
            out_audio = gr.Audio(
                label="Alert",
                autoplay=True,
                type="numpy",
                visible=True,
            )
            last_alert_state = gr.State(value=0.0)

    cam.stream(
        fn=process_live_frame,
        inputs=[cam, last_alert_state],
        outputs=[out_img, out_text, out_audio, last_alert_state]
    )

if __name__ == "__main__":
    logging.info("Launching Gradio app …")
    app.launch(debug=True)