Testys commited on
Commit
a7ec466
Β·
verified Β·
1 Parent(s): 8e7b22f

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +150 -72
app.py CHANGED
@@ -1,8 +1,3 @@
1
- # app_gradio.py
2
- # ──────────────────────────────────────────────────────────
3
- # Webcam β†’ geometric detector β†’ static WAV alert (with cooldown)
4
- # Live console logs of per-frame latency + status.
5
- # ──────────────────────────────────────────────────────────
6
  import time
7
  import os
8
  import yaml
@@ -10,9 +5,12 @@ import logging
10
  import numpy as np
11
  import gradio as gr
12
  import soundfile as sf
13
-
14
  from dotenv import load_dotenv
15
- from src.detection.factory import get_detector # your existing factory
 
 
 
 
16
 
17
  # ───────────────────────────── logging
18
  logging.basicConfig(
@@ -28,88 +26,168 @@ with open("config.yaml") as f:
28
 
29
  detector = get_detector(CFG)
30
 
31
- # ───────────────────────────── alert sound (read once)
32
- wav_path = CFG["alerting"].get("alert_sound_path")
33
- logging.info(f"Processing {wav_path}")
34
- try:
35
- ALERT_SR, ALERT_DATA = sf.read(wav_path, dtype="float32")
36
- logging.info(f"Loaded alert sound: {wav_path} ({len(ALERT_DATA)/ALERT_SR:.2f}s)")
37
- except Exception as e:
38
- ALERT_SR, ALERT_DATA = None, None
39
- logging.warning(f"Failed to load alert sound: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
40
 
41
  # ───────────────────────────── frame processing
42
- def process_live_frame(frame, last_alert_ts):
 
43
  if frame is None:
44
  return (
45
  np.zeros((480, 640, 3), dtype=np.uint8),
46
  "Status: Inactive",
47
- None,
48
- last_alert_ts
49
  )
50
-
51
  t0 = time.perf_counter()
52
-
53
  try:
54
- processed, indic, _ = detector.process_frame(frame)
 
55
  except Exception as e:
56
  logging.error(f"Error processing frame: {e}")
57
  processed = np.zeros_like(frame)
58
- indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}
59
-
 
 
 
 
 
60
  level = indic.get("drowsiness_level", "Awake")
61
  lighting = indic.get("lighting", "Good")
62
  score = indic.get("details", {}).get("Score", 0.0)
63
-
 
64
  dt_ms = (time.perf_counter() - t0) * 1000.0
65
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
66
-
67
- status_txt = (
68
- f"Lighting: {lighting}\n"
69
- + ("Detection paused – low light." if lighting == "Low"
70
- else f"Status: {level}\nScore: {score:.2f}")
71
- )
72
-
 
 
73
  audio_out = None
74
- new_last_alert_ts = last_alert_ts
75
- ALERT_COOLDOWN = CFG["alerting"].get("alert_cooldown_seconds", 5)
76
-
77
- if (
78
- ALERT_DATA is not None
79
- and level != "Awake"
80
- and lighting != "Low"
81
- and (time.monotonic() - last_alert_ts) > ALERT_COOLDOWN
82
- ):
83
- new_last_alert_ts = time.monotonic()
84
- audio_out = (ALERT_SR, ALERT_DATA.copy())
85
-
86
- return processed, status_txt, audio_out, new_last_alert_ts
87
-
88
- # ───────────────────────────── UI
89
- with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
90
- gr.Markdown("# πŸš— **Drive Paddy** – Static-file Alert Demo")
91
- gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.")
92
-
93
- with gr.Row():
94
- with gr.Column(scale=2):
95
- cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
96
- with gr.Column(scale=1):
97
- out_img = gr.Image(label="Processed Feed")
98
- out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
99
- out_audio = gr.Audio(
100
- label="Alert",
101
- autoplay=True,
102
- type="numpy",
103
- visible=True,
104
- )
105
- last_alert_state = gr.State(value=0.0)
106
-
107
- cam.stream(
108
- fn=process_live_frame,
109
- inputs=[cam, last_alert_state],
110
- outputs=[out_img, out_text, out_audio, last_alert_state]
111
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
112
 
 
113
  if __name__ == "__main__":
114
- logging.info("Launching Gradio app …")
115
- app.launch(debug=True)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import time
2
  import os
3
  import yaml
 
5
  import numpy as np
6
  import gradio as gr
7
  import soundfile as sf
 
8
  from dotenv import load_dotenv
9
+ from threading import Lock
10
+ from queue import Queue
11
+ import threading
12
+
13
+ from src.detection.factory import get_detector
14
 
15
  # ───────────────────────────── logging
16
  logging.basicConfig(
 
26
 
27
  detector = get_detector(CFG)
28
 
29
+ # ───────────────────────────── alert sound manager
30
+ class AlertManager:
31
+ def __init__(self, wav_path, cooldown_seconds=5):
32
+ self.cooldown_seconds = cooldown_seconds
33
+ self.last_alert_time = 0
34
+ self.lock = Lock()
35
+ self.alert_queue = Queue(maxsize=1)
36
+
37
+ # Load alert sound
38
+ try:
39
+ self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32")
40
+
41
+ # Ensure stereo format for Gradio
42
+ if self.audio_data.ndim == 1:
43
+ self.audio_data = np.column_stack([self.audio_data, self.audio_data])
44
+
45
+ # Normalize to [-1, 1] range
46
+ max_val = np.abs(self.audio_data).max()
47
+ if max_val > 0:
48
+ self.audio_data = self.audio_data / max_val
49
+
50
+ logging.info(f"Loaded alert sound: {wav_path} "
51
+ f"({len(self.audio_data)/self.sample_rate:.2f}s, "
52
+ f"shape: {self.audio_data.shape})")
53
+ self.is_loaded = True
54
+ except Exception as e:
55
+ logging.error(f"Failed to load alert sound: {e}")
56
+ self.is_loaded = False
57
+ self.sample_rate = 44100
58
+ self.audio_data = None
59
+
60
+ def should_alert(self, drowsiness_level, lighting):
61
+ """Check if we should trigger an alert"""
62
+ if not self.is_loaded:
63
+ return False
64
+
65
+ with self.lock:
66
+ current_time = time.monotonic()
67
+ if (drowsiness_level != "Awake"
68
+ and lighting != "Low"
69
+ and (current_time - self.last_alert_time) > self.cooldown_seconds):
70
+ self.last_alert_time = current_time
71
+ return True
72
+ return False
73
+
74
+ def get_alert_audio(self):
75
+ """Get the alert audio data"""
76
+ if self.is_loaded:
77
+ return (int(self.sample_rate), self.audio_data.copy())
78
+ return None
79
+
80
+ # Initialize alert manager
81
+ alert_manager = AlertManager(
82
+ wav_path=CFG["alerting"].get("alert_sound_path"),
83
+ cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5)
84
+ )
85
 
86
  # ───────────────────────────── frame processing
87
+ def process_live_frame(frame):
88
+ """Process frame for drowsiness detection"""
89
  if frame is None:
90
  return (
91
  np.zeros((480, 640, 3), dtype=np.uint8),
92
  "Status: Inactive",
93
+ None
 
94
  )
95
+
96
  t0 = time.perf_counter()
97
+
98
  try:
99
+ # Process frame through detector
100
+ processed, indic = detector.process_frame(frame)
101
  except Exception as e:
102
  logging.error(f"Error processing frame: {e}")
103
  processed = np.zeros_like(frame)
104
+ indic = {
105
+ "drowsiness_level": "Error",
106
+ "lighting": "Unknown",
107
+ "details": {"Score": 0.0}
108
+ }
109
+
110
+ # Extract detection results
111
  level = indic.get("drowsiness_level", "Awake")
112
  lighting = indic.get("lighting", "Good")
113
  score = indic.get("details", {}).get("Score", 0.0)
114
+
115
+ # Log performance
116
  dt_ms = (time.perf_counter() - t0) * 1000.0
117
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
118
+
119
+ # Create status text
120
+ status_txt = f"Lighting: {lighting}\n"
121
+ if lighting == "Low":
122
+ status_txt += "Detection paused – low light."
123
+ else:
124
+ status_txt += f"Status: {level}\nScore: {score:.2f}"
125
+
126
+ # Check if we should trigger alert
127
  audio_out = None
128
+ if alert_manager.should_alert(level, lighting):
129
+ audio_out = alert_manager.get_alert_audio()
130
+ if audio_out:
131
+ logging.info("πŸ”Š Alert triggered!")
132
+
133
+ return processed, status_txt, audio_out
134
+
135
+ # ───────────────────────────── UI with error handling
136
+ def create_ui():
137
+ with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
138
+ gr.Markdown("# πŸš— **Drive Paddy** – Real-time Drowsiness Detection")
139
+ gr.Markdown("Webcam-based drowsiness detection with audio alerts.")
140
+
141
+ with gr.Row():
142
+ with gr.Column(scale=2):
143
+ cam = gr.Image(
144
+ sources=["webcam"],
145
+ streaming=True,
146
+ label="Live Camera Feed"
147
+ )
148
+
149
+ with gr.Column(scale=1):
150
+ out_img = gr.Image(label="Processed Feed")
151
+ out_text = gr.Textbox(
152
+ label="Live Status",
153
+ lines=4,
154
+ interactive=False
155
+ )
156
+ out_audio = gr.Audio(
157
+ label="Alert Sound",
158
+ autoplay=True,
159
+ type="numpy",
160
+ visible=True,
161
+ )
162
+
163
+ # Add system info
164
+ with gr.Row():
165
+ gr.Markdown(f"""
166
+ **System Info:**
167
+ - Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s
168
+ - Alert sound loaded: {'βœ“' if alert_manager.is_loaded else 'βœ—'}
169
+ """)
170
+
171
+ # Connect the streaming
172
+ cam.stream(
173
+ fn=process_live_frame,
174
+ inputs=[cam],
175
+ outputs=[out_img, out_text, out_audio]
176
+ )
177
+
178
+ return app
179
 
180
+ # ───────────────────────────── main
181
  if __name__ == "__main__":
182
+ logging.info("Initializing Drive Paddy...")
183
+
184
+ # Create and launch app
185
+ app = create_ui()
186
+
187
+ logging.info("Launching Gradio app...")
188
+ app.launch(
189
+ debug=True,
190
+ share=False, # Set to True if you want a public link
191
+ server_name="0.0.0.0", # Allow external connections
192
+ server_port=7860
193
+ )