Testys commited on
Commit
8ae9de4
Β·
verified Β·
1 Parent(s): a7ec466

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +94 -138
app.py CHANGED
@@ -1,3 +1,11 @@
 
 
 
 
 
 
 
 
1
  import time
2
  import os
3
  import yaml
@@ -6,10 +14,9 @@ import numpy as np
6
  import gradio as gr
7
  import soundfile as sf
8
  from dotenv import load_dotenv
9
- from threading import Lock
10
- from queue import Queue
11
- import threading
12
 
 
 
13
  from src.detection.factory import get_detector
14
 
15
  # ───────────────────────────── logging
@@ -26,168 +33,117 @@ with open("config.yaml") as f:
26
 
27
  detector = get_detector(CFG)
28
 
29
- # ───────────────────────────── alert sound manager
 
 
30
  class AlertManager:
31
- def __init__(self, wav_path, cooldown_seconds=5):
32
- self.cooldown_seconds = cooldown_seconds
33
  self.last_alert_time = 0
34
- self.lock = Lock()
35
- self.alert_queue = Queue(maxsize=1)
36
-
37
- # Load alert sound
 
 
 
 
38
  try:
39
- self.sample_rate, self.audio_data = sf.read(wav_path, dtype="float32")
40
-
41
- # Ensure stereo format for Gradio
42
- if self.audio_data.ndim == 1:
43
- self.audio_data = np.column_stack([self.audio_data, self.audio_data])
44
-
45
- # Normalize to [-1, 1] range
46
- max_val = np.abs(self.audio_data).max()
47
- if max_val > 0:
48
- self.audio_data = self.audio_data / max_val
49
-
50
- logging.info(f"Loaded alert sound: {wav_path} "
51
- f"({len(self.audio_data)/self.sample_rate:.2f}s, "
52
- f"shape: {self.audio_data.shape})")
53
- self.is_loaded = True
54
  except Exception as e:
55
  logging.error(f"Failed to load alert sound: {e}")
56
- self.is_loaded = False
57
- self.sample_rate = 44100
58
- self.audio_data = None
59
-
60
- def should_alert(self, drowsiness_level, lighting):
61
- """Check if we should trigger an alert"""
62
- if not self.is_loaded:
63
- return False
 
 
 
 
64
 
65
- with self.lock:
66
- current_time = time.monotonic()
67
- if (drowsiness_level != "Awake"
68
- and lighting != "Low"
69
- and (current_time - self.last_alert_time) > self.cooldown_seconds):
70
- self.last_alert_time = current_time
71
- return True
72
- return False
73
-
74
- def get_alert_audio(self):
75
- """Get the alert audio data"""
76
- if self.is_loaded:
77
- return (int(self.sample_rate), self.audio_data.copy())
78
  return None
79
 
80
- # Initialize alert manager
81
- alert_manager = AlertManager(
82
- wav_path=CFG["alerting"].get("alert_sound_path"),
83
- cooldown_seconds=CFG["alerting"].get("alert_cooldown_seconds", 5)
84
- )
85
 
86
- # ───────────────────────────── frame processing
 
87
  def process_live_frame(frame):
88
- """Process frame for drowsiness detection"""
89
  if frame is None:
90
  return (
91
  np.zeros((480, 640, 3), dtype=np.uint8),
92
  "Status: Inactive",
93
- None
94
  )
95
-
96
  t0 = time.perf_counter()
97
-
98
  try:
99
- # Process frame through detector
100
  processed, indic = detector.process_frame(frame)
101
  except Exception as e:
102
  logging.error(f"Error processing frame: {e}")
103
  processed = np.zeros_like(frame)
104
- indic = {
105
- "drowsiness_level": "Error",
106
- "lighting": "Unknown",
107
- "details": {"Score": 0.0}
108
- }
109
-
110
- # Extract detection results
111
  level = indic.get("drowsiness_level", "Awake")
112
  lighting = indic.get("lighting", "Good")
113
  score = indic.get("details", {}).get("Score", 0.0)
114
-
115
- # Log performance
116
  dt_ms = (time.perf_counter() - t0) * 1000.0
117
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
118
-
119
- # Create status text
120
- status_txt = f"Lighting: {lighting}\n"
121
- if lighting == "Low":
122
- status_txt += "Detection paused – low light."
 
 
 
 
 
 
 
 
 
123
  else:
124
- status_txt += f"Status: {level}\nScore: {score:.2f}"
125
-
126
- # Check if we should trigger alert
127
- audio_out = None
128
- if alert_manager.should_alert(level, lighting):
129
- audio_out = alert_manager.get_alert_audio()
130
- if audio_out:
131
- logging.info("πŸ”Š Alert triggered!")
132
-
133
- return processed, status_txt, audio_out
134
-
135
- # ───────────────────────────── UI with error handling
136
- def create_ui():
137
- with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
138
- gr.Markdown("# πŸš— **Drive Paddy** – Real-time Drowsiness Detection")
139
- gr.Markdown("Webcam-based drowsiness detection with audio alerts.")
140
-
141
- with gr.Row():
142
- with gr.Column(scale=2):
143
- cam = gr.Image(
144
- sources=["webcam"],
145
- streaming=True,
146
- label="Live Camera Feed"
147
- )
148
 
149
- with gr.Column(scale=1):
150
- out_img = gr.Image(label="Processed Feed")
151
- out_text = gr.Textbox(
152
- label="Live Status",
153
- lines=4,
154
- interactive=False
155
- )
156
- out_audio = gr.Audio(
157
- label="Alert Sound",
158
- autoplay=True,
159
- type="numpy",
160
- visible=True,
161
- )
162
-
163
- # Add system info
164
- with gr.Row():
165
- gr.Markdown(f"""
166
- **System Info:**
167
- - Alert cooldown: {CFG['alerting'].get('alert_cooldown_seconds', 5)}s
168
- - Alert sound loaded: {'βœ“' if alert_manager.is_loaded else 'βœ—'}
169
- """)
170
-
171
- # Connect the streaming
172
- cam.stream(
173
- fn=process_live_frame,
174
- inputs=[cam],
175
- outputs=[out_img, out_text, out_audio]
176
- )
177
-
178
- return app
179
 
180
- # ───────────────────────────── main
181
  if __name__ == "__main__":
182
- logging.info("Initializing Drive Paddy...")
183
-
184
- # Create and launch app
185
- app = create_ui()
186
-
187
- logging.info("Launching Gradio app...")
188
- app.launch(
189
- debug=True,
190
- share=False, # Set to True if you want a public link
191
- server_name="0.0.0.0", # Allow external connections
192
- server_port=7860
193
- )
 
1
+ # app_gradio.py
2
+ # ──────────────────────────────────────────────────────────
3
+ # Webcam β†’ geometric detector β†’ static WAV alert (with cooldown)
4
+ # Live console logs of per-frame latency + status.
5
+ #
6
+ # EDITED: This version uses a more robust method for audio playback
7
+ # in Gradio by dynamically creating the Audio component.
8
+ # ──────────────────────────────────────────────────────────
9
  import time
10
  import os
11
  import yaml
 
14
  import gradio as gr
15
  import soundfile as sf
16
  from dotenv import load_dotenv
 
 
 
17
 
18
+ # This is a mock factory and detector for demonstration.
19
+ # Replace with your actual import.
20
  from src.detection.factory import get_detector
21
 
22
  # ───────────────────────────── logging
 
33
 
34
  detector = get_detector(CFG)
35
 
36
+ # ───────────────────────────── Alert Manager Class <--- CHANGE
37
+ # Encapsulating the alert logic makes the code much cleaner.
38
+ # It handles its own state (last alert time) internally.
39
  class AlertManager:
40
+ def __init__(self, config):
41
+ self.cooldown_seconds = config.get("alert_cooldown_seconds", 5)
42
  self.last_alert_time = 0
43
+ self.alert_data = None
44
+ self.sample_rate = None
45
+ self._load_sound(config.get("alert_sound_path"))
46
+
47
+ def _load_sound(self, wav_path):
48
+ if not wav_path:
49
+ logging.warning("No 'alert_sound_path' found in config.")
50
+ return
51
  try:
52
+ data, sr = sf.read(wav_path, dtype="float32")
53
+ self.alert_data = data
54
+ self.sample_rate = sr
55
+ logging.info(f"Loaded alert sound: {wav_path} ({len(self.alert_data)/self.sample_rate:.2f}s)")
 
 
 
 
 
 
 
 
 
 
 
56
  except Exception as e:
57
  logging.error(f"Failed to load alert sound: {e}")
58
+ self.alert_data = None
59
+
60
+ def trigger_alert(self, level, lighting):
61
+ """Checks conditions and returns audio payload if an alert should fire."""
62
+ is_drowsy = level != "Awake"
63
+ is_good_light = lighting != "Low"
64
+ is_ready = (time.monotonic() - self.last_alert_time) > self.cooldown_seconds
65
+
66
+ if self.alert_data is not None and is_drowsy and is_good_light and is_ready:
67
+ self.last_alert_time = time.monotonic()
68
+ logging.info("πŸ”Š Alert conditions met! Triggering sound.")
69
+ return (self.sample_rate, self.alert_data.copy())
70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
  return None
72
 
73
+ # Initialize the alert manager
74
+ alert_manager = AlertManager(CFG["alerting"])
 
 
 
75
 
76
+ # ───────────────────────────── frame processing <--- MAJOR CHANGE
77
+ # Simplified by the AlertManager. No longer needs to pass 'last_alert_ts' back and forth.
78
  def process_live_frame(frame):
 
79
  if frame is None:
80
  return (
81
  np.zeros((480, 640, 3), dtype=np.uint8),
82
  "Status: Inactive",
83
+ None # No audio output
84
  )
85
+
86
  t0 = time.perf_counter()
87
+
88
  try:
89
+ # Assuming your detector returns (processed_image, indicators_dict)
90
  processed, indic = detector.process_frame(frame)
91
  except Exception as e:
92
  logging.error(f"Error processing frame: {e}")
93
  processed = np.zeros_like(frame)
94
+ indic = {"drowsiness_level": "Error", "lighting": "Unknown", "details": {"Score": 0.0}}
95
+
 
 
 
 
 
96
  level = indic.get("drowsiness_level", "Awake")
97
  lighting = indic.get("lighting", "Good")
98
  score = indic.get("details", {}).get("Score", 0.0)
99
+
 
100
  dt_ms = (time.perf_counter() - t0) * 1000.0
101
  logging.info(f"{dt_ms:6.1f} ms β”‚ {lighting:<4} β”‚ {level:<14} β”‚ score={score:.2f}")
102
+
103
+ status_txt = (
104
+ f"Lighting: {lighting}\n"
105
+ + ("Detection paused – low light." if lighting == "Low"
106
+ else f"Status: {level}\nScore: {score:.2f}")
107
+ )
108
+
109
+ # Check for an alert and get the audio payload if ready
110
+ audio_payload = alert_manager.trigger_alert(level, lighting)
111
+
112
+ # This is the key: return a new gr.Audio component when an alert fires.
113
+ # Otherwise, return None to clear the component on the frontend.
114
+ if audio_payload:
115
+ return processed, status_txt, gr.Audio(value=audio_payload, autoplay=True)
116
  else:
117
+ return processed, status_txt, None
118
+
119
+ # ───────────────────────────── UI <--- CHANGE
120
+ with gr.Blocks(title="Drive Paddy – Drowsiness Detection") as app:
121
+ gr.Markdown("# πŸš— **Drive Paddy** – Robust Alert Demo")
122
+ gr.Markdown("Webcam-based drowsiness detection Β· console shows real-time logs.")
123
+
124
+ with gr.Row():
125
+ with gr.Column(scale=2):
126
+ cam = gr.Image(sources=["webcam"], streaming=True, label="Live Camera Feed")
127
+ with gr.Column(scale=1):
128
+ out_img = gr.Image(label="Processed Feed")
129
+ out_text = gr.Textbox(label="Live Status", lines=3, interactive=False)
 
 
 
 
 
 
 
 
 
 
 
130
 
131
+ # This audio component now acts as a placeholder.
132
+ # We make it invisible because we don't need to show the player controls.
133
+ # The backend will dynamically send a new, playable component to it.
134
+ out_audio = gr.Audio(
135
+ label="Alert",
136
+ autoplay=True,
137
+ visible=False, # Hiding the component for a cleaner UX
138
+ )
139
+
140
+ # The gr.State for managing the timestamp is no longer needed, simplifying the stream call.
141
+ cam.stream(
142
+ fn=process_live_frame,
143
+ inputs=[cam],
144
+ outputs=[out_img, out_text, out_audio] # The output now targets the placeholder
145
+ )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
146
 
 
147
  if __name__ == "__main__":
148
+ logging.info("Launching Gradio app…")
149
+ app.launch(debug=True)