Testys commited on
Commit
60b383e
Β·
verified Β·
1 Parent(s): 57edb95

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +44 -28
app.py CHANGED
@@ -3,10 +3,16 @@
3
  # Webcam β†’ geometric detector β†’ static WAV alert (with cooldown)
4
  # Live console logs of per-frame latency + status.
5
  # ──────────────────────────────────────────────────────────
6
- import time, os, yaml, logging, numpy as np, gradio as gr, soundfile as sf
7
- from dotenv import load_dotenv
 
 
 
 
 
8
 
9
- from src.detection.factory import get_detector # your existing factory
 
10
 
11
  # ───────────────────────────── logging
12
  logging.basicConfig(
@@ -23,31 +29,36 @@ with open("config.yaml") as f:
23
  detector = get_detector(CFG)
24
 
25
  # ───────────────────────────── alert sound (read once)
26
- wav_path = CFG["alerting"]["alert_sound_path"]
27
  try:
28
- ALERT_SR, ALERT_DATA = sf.read(wav_path, dtype="float32") # (sr, np.ndarray)
29
  logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
30
  except Exception as e:
31
  ALERT_SR, ALERT_DATA = None, None
32
  logging.warning(f"Failed to load alert sound: {e}")
33
 
34
- # ───────────────────────────── simple cooldown
35
- ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 7)
36
- _last_alert_ts = 0.0
37
-
38
- # ───────────────────────────── frame callback
39
- def process_live_frame(frame):
40
- global _last_alert_ts
41
-
42
  if frame is None:
43
- return np.zeros((480, 640, 3), np.uint8), "Status: Inactive", None
 
 
 
 
 
44
 
45
  t0 = time.perf_counter()
46
 
47
- processed, indic, _ = detector.process_frame(frame)
48
- level = indic.get("drowsiness_level", "Awake")
 
 
 
 
 
 
49
  lighting = indic.get("lighting", "Good")
50
- score = indic.get("details", {}).get("Score", 0.0)
51
 
52
  dt_ms = (time.perf_counter() - t0) * 1000.0
53
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
@@ -58,18 +69,20 @@ def process_live_frame(frame):
58
  else f"Status: {level}\nScore: {score:.2f}")
59
  )
60
 
61
- # decide whether to play the alert
62
  audio_out = None
 
 
 
63
  if (
64
  ALERT_DATA is not None
65
  and level != "Awake"
66
  and lighting != "Low"
67
- and (time.time() - _last_alert_ts) > ALERT_COOLDOWN
68
  ):
69
- _last_alert_ts = time.time()
70
- audio_out = (ALERT_SR, ALERT_DATA.copy()) # hand a fresh copy to Gradio
71
 
72
- return processed, status_txt, audio_out
73
 
74
  # ───────────────────────────── UI
75
  with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
@@ -80,19 +93,22 @@ with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
80
  with gr.Column(scale=2):
81
  cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
82
  with gr.Column(scale=1):
83
- out_img = gr.Image(label="Processed Feed")
84
  out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
85
  out_audio = gr.Audio(
86
  label="Alert",
87
  autoplay=True,
88
- type="numpy", # expects (sr, np.ndarray)
89
  visible=True,
90
  )
 
91
 
92
- cam.stream(fn=process_live_frame,
93
- inputs=cam,
94
- outputs=[out_img, out_text, out_audio])
 
 
95
 
96
  if __name__ == "__main__":
97
  logging.info("Launching Gradio app …")
98
- app.launch(debug=True)
 
3
  # Webcam β†’ geometric detector β†’ static WAV alert (with cooldown)
4
  # Live console logs of per-frame latency + status.
5
  # ──────────────────────────────────────────────────────────
6
+ import time
7
+ import os
8
+ import yaml
9
+ import logging
10
+ import numpy as np
11
+ import gradio as gr
12
+ import soundfile as sf
13
 
14
+ from dotenv import load_dotenv
15
+ from src.detection.factory import get_detector # your existing factory
16
 
17
  # ───────────────────────────── logging
18
  logging.basicConfig(
 
29
  detector = get_detector(CFG)
30
 
31
  # ───────────────────────────── alert sound (read once)
32
+ wav_path = CFG["alerting"].get("alert_sound_path", "alert.wav")
33
  try:
34
+ ALERT_SR, ALERT_DATA = sf.read(wav_path, dtype="float32")
35
  logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
36
  except Exception as e:
37
  ALERT_SR, ALERT_DATA = None, None
38
  logging.warning(f"Failed to load alert sound: {e}")
39
 
40
+ # ───────────────────────────── frame processing
41
+ def process_live_frame(frame, last_alert_ts):
 
 
 
 
 
 
42
  if frame is None:
43
+ return (
44
+ np.zeros((480, 640, 3), dtype=np.uint8),
45
+ "Status: Inactive",
46
+ None,
47
+ last_alert_ts
48
+ )
49
 
50
  t0 = time.perf_counter()
51
 
52
+ try:
53
+ processed, indic, _ = detector.process_frame(frame)
54
+ except Exception as e:
55
+ logging.error(f"Error processing frame: {e}")
56
+ processed = np.zeros_like(frame)
57
+ indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}
58
+
59
+ level = indic.get("drowsiness_level", "Awake")
60
  lighting = indic.get("lighting", "Good")
61
+ score = indic.get("details", {}).get("Score", 0.0)
62
 
63
  dt_ms = (time.perf_counter() - t0) * 1000.0
64
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
 
69
  else f"Status: {level}\nScore: {score:.2f}")
70
  )
71
 
 
72
  audio_out = None
73
+ new_last_alert_ts = last_alert_ts
74
+ ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 7)
75
+
76
  if (
77
  ALERT_DATA is not None
78
  and level != "Awake"
79
  and lighting != "Low"
80
+ and (time.monotonic() - last_alert_ts) > ALERT_COOLDOWN
81
  ):
82
+ new_last_alert_ts = time.monotonic()
83
+ audio_out = (ALERT_SR, ALERT_DATA.copy())
84
 
85
+ return processed, status_txt, audio_out, new_last_alert_ts
86
 
87
  # ───────────────────────────── UI
88
  with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
 
93
  with gr.Column(scale=2):
94
  cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
95
  with gr.Column(scale=1):
96
+ out_img = gr.Image(label="Processed Feed")
97
  out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
98
  out_audio = gr.Audio(
99
  label="Alert",
100
  autoplay=True,
101
+ type="numpy",
102
  visible=True,
103
  )
104
+ last_alert_state = gr.State(value=0.0)
105
 
106
+ cam.stream(
107
+ fn=process_live_frame,
108
+ inputs=[cam, last_alert_state],
109
+ outputs=[out_img, out_text, out_audio, last_alert_state]
110
+ )
111
 
112
  if __name__ == "__main__":
113
  logging.info("Launching Gradio app …")
114
+ app.launch(debug=True)