Testimony Adekoya committed on
Commit f7db860 · 1 Parent(s): 9e88f11

Try something different

.env.example DELETED
@@ -1,2 +0,0 @@
- GEMINI_API_KEY=your_gemini_api_key_here
- HUGGINGFACE_API_KEY=your_huggingface_api_key_here
app.py CHANGED
@@ -1,140 +1,457 @@
- # app_webrtc.py
- import gradio as gr
- import numpy as np
- import os
- import yaml
- from dotenv import load_dotenv
- import io
- from scipy.io.wavfile import read as read_wav
- from pydub import AudioSegment
  import cv2
  import time
- from gradio_webrtc import WebRTC
-
- # Correctly import from the drive_paddy package structure
- from src.detection.factory import get_detector
- from src.alerting.alert_system import get_alerter
-
- # --- Load Configuration and Environment Variables ---
- load_dotenv()
- config_path = 'config.yaml'
- with open(config_path, 'r') as f:
-     config = yaml.safe_load(f)
- secrets = {
-     "gemini_api_key": os.getenv("GEMINI_API_KEY"),
- }
-
- # --- Initialize Backend Components ---
- detector = get_detector(config)
- alerter = get_alerter(config, secrets["gemini_api_key"])
- geo_settings = config.get('geometric_settings', {})
- drowsiness_levels = geo_settings.get('drowsiness_levels', {})
- SLIGHTLY_DROWSY_DEFAULT = drowsiness_levels.get('slightly_drowsy_threshold', 0.3)
- VERY_DROWSY_DEFAULT = drowsiness_levels.get('very_drowsy_threshold', 0.8)
-
- # --- Global state for audio (simpler than queues for this component) ---
- # We use a global variable to hold the audio data, which the UI will poll.
- # This is a common pattern in simple Gradio streaming apps.
- latest_audio_alert = None
-
- # --- Main Processing Function ---
- def process_stream(frame: np.ndarray, sensitivity_threshold: float) -> np.ndarray:
-     """
-     This is the core function. It takes a frame and returns the processed frame.
-     All logic, including status drawing and alert triggering, happens here.
-     """
-     global latest_audio_alert
-
-     if frame is None:
-         return np.zeros((480, 640, 3), dtype=np.uint8)
-
-     # Process the frame using our existing detector.
-     # The detector already draws landmarks and status overlays.
-     processed_frame, indicators, _ = detector.process_frame(frame)
-     drowsiness_level = indicators.get("drowsiness_level", "Awake")
-
-     # Handle audio alerts
-     if drowsiness_level != "Awake":
-         audio_data = alerter.trigger_alert(level=drowsiness_level)
-         if audio_data:
-             # Convert audio for Gradio and store it in the global variable
-             try:
-                 byte_io = io.BytesIO(audio_data)
-                 audio = AudioSegment.from_mp3(byte_io)
-                 wav_byte_io = io.BytesIO()
-                 audio.export(wav_byte_io, format="wav")
-                 wav_byte_io.seek(0)
-                 sample_rate, data = read_wav(wav_byte_io)
-                 latest_audio_alert = (sample_rate, data)
-             except Exception as e:
-                 print(f"Audio processing error: {e}")
-                 latest_audio_alert = None
-     else:
-         alerter.reset_alert()
-
-     return processed_frame
-
- # --- Function to check for and return audio alerts ---
- def get_audio_update():
-     """
-     This function is polled by the UI to check for new audio alerts.
-     """
-     global latest_audio_alert
-     if latest_audio_alert:
-         audio_to_play = latest_audio_alert
-         latest_audio_alert = None  # Clear the alert after sending it
-         return audio_to_play
-     return None
-
- # --- Gradio UI Definition ---
- with gr.Blocks(theme=gr.themes.Default(primary_hue="blue", secondary_hue="blue")) as app:
-     gr.HTML(
-         """
-         <div align="center">
-             <img src="https://em-content.zobj.net/source/samsung/380/automobile_1f697.png" alt="Car Emoji" width="100"/>
-             <h1>Drive Paddyn</h1>
-         </div>
-         """
-     )
-
-     with gr.Row():
-         # The WebRTC component now directly shows the processed output
-         webrtc_output = WebRTC(
-             label="Live Detection Feed",
-             video_source="webcam",
          )
-
-     with gr.Row():
-         sensitivity_slider = gr.Slider(
-             minimum=0.1,
-             maximum=1.0,
-             value=SLIGHTLY_DROWSY_DEFAULT,
-             step=0.05,
-             label="Alert Sensitivity Threshold",
-             info="Lower value = more sensitive to drowsiness signs."
          )
-
-     # Hidden audio component for playing alerts
-     audio_player = gr.Audio(autoplay=True, visible=False)
-
-     # Connect the WebRTC stream to the processing function
-     webrtc_output.stream(
-         fn=process_stream,
-         inputs=[webrtc_output, sensitivity_slider],
-         outputs=[webrtc_output],
-         # The 'every' parameter is not needed for this component; it streams as fast as possible.
-     )

-     # Use a separate loop to poll for audio updates.
-     # This is more stable than returning multiple values in a high-frequency stream.
-     app.load(
-         fn=get_audio_update,
-         inputs=None,
-         outputs=[audio_player],
-         every=1  # Check for a new audio alert every 1 second
-     )

-
- # --- Launch the App ---
  if __name__ == "__main__":
-     app.launch(debug=True)
  import cv2
+ import numpy as np
+ import dlib
+ import gradio as gr
+ import threading
  import time
+ import queue
+ import pygame
+ import io
+ import google.generativeai as genai
+ from scipy.spatial import distance as dist
+ from collections import deque
+ import tempfile
+ import os
+ from datetime import datetime
+ class DrowsinessDetector:
+     def __init__(self, gemini_api_key=None):
+         # Initialize face detector and landmark predictor
+         self.detector = dlib.get_frontal_face_detector()
+         self.predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')
+
+         # EAR (Eye Aspect Ratio) parameters
+         self.EAR_THRESHOLD = 0.25
+         self.EAR_CONSECUTIVE_FRAMES = 20
+         self.ear_counter = 0
+         self.ear_history = deque(maxlen=30)
+
+         # Yawn detection parameters
+         self.YAWN_THRESHOLD = 0.6
+         self.YAWN_CONSECUTIVE_FRAMES = 15
+         self.yawn_counter = 0
+         self.yawn_history = deque(maxlen=30)
+
+         # Head pose estimation parameters
+         self.NOD_THRESHOLD = 15
+         self.nod_counter = 0
+         self.head_pose_history = deque(maxlen=30)
+
+         # Alert system
+         self.drowsy_alert = False
+         self.last_alert_time = 0
+         self.alert_cooldown = 10  # seconds
+
+         # Initialize pygame for audio. A mono mixer is used here so that the
+         # 1-D beep array built in play_default_alert matches the channel count.
+         pygame.mixer.init(frequency=22050, size=-16, channels=1)
+
+         # Initialize Gemini AI
+         if gemini_api_key:
+             genai.configure(api_key=gemini_api_key)
+             self.model = genai.GenerativeModel('gemini-pro')
+         else:
+             self.model = None
+
+         # Eye and mouth landmark indices
+         self.LEFT_EYE = list(range(36, 42))
+         self.RIGHT_EYE = list(range(42, 48))
+         self.MOUTH = list(range(48, 68))
+
+         # 3D model points for head pose estimation
+         self.model_points = np.array([
+             (0.0, 0.0, 0.0),           # Nose tip
+             (0.0, -330.0, -65.0),      # Chin
+             (-225.0, 170.0, -135.0),   # Left eye left corner
+             (225.0, 170.0, -135.0),    # Right eye right corner
+             (-150.0, -150.0, -125.0),  # Left mouth corner
+             (150.0, -150.0, -125.0)    # Right mouth corner
+         ])
+
+         self.status_log = deque(maxlen=100)
+
+     def calculate_ear(self, eye_landmarks):
+         """Calculate Eye Aspect Ratio"""
+         # Vertical eye landmarks
+         A = dist.euclidean(eye_landmarks[1], eye_landmarks[5])
+         B = dist.euclidean(eye_landmarks[2], eye_landmarks[4])
+         # Horizontal eye landmark
+         C = dist.euclidean(eye_landmarks[0], eye_landmarks[3])
+         # EAR calculation
+         ear = (A + B) / (2.0 * C)
+         return ear
+
+     def calculate_mar(self, mouth_landmarks):
+         """Calculate Mouth Aspect Ratio for yawn detection"""
+         # Vertical mouth landmarks
+         A = dist.euclidean(mouth_landmarks[2], mouth_landmarks[10])  # 50, 58
+         B = dist.euclidean(mouth_landmarks[4], mouth_landmarks[8])   # 52, 56
+         # Horizontal mouth landmark
+         C = dist.euclidean(mouth_landmarks[0], mouth_landmarks[6])   # 48, 54
+         # MAR calculation
+         mar = (A + B) / (2.0 * C)
+         return mar
+
+     def get_head_pose(self, landmarks, img_size):
+         """Estimate head pose using facial landmarks"""
+         image_points = np.array([
+             (landmarks[30][0], landmarks[30][1]),  # Nose tip
+             (landmarks[8][0], landmarks[8][1]),    # Chin
+             (landmarks[36][0], landmarks[36][1]),  # Left eye left corner
+             (landmarks[45][0], landmarks[45][1]),  # Right eye right corner
+             (landmarks[48][0], landmarks[48][1]),  # Left mouth corner
+             (landmarks[54][0], landmarks[54][1])   # Right mouth corner
+         ], dtype="double")
+
+         # Camera internals
+         focal_length = img_size[1]
+         center = (img_size[1] / 2, img_size[0] / 2)
+         camera_matrix = np.array([
+             [focal_length, 0, center[0]],
+             [0, focal_length, center[1]],
+             [0, 0, 1]], dtype="double")
+
+         dist_coeffs = np.zeros((4, 1))  # Assuming no lens distortion
+
+         # Solve PnP
+         (success, rotation_vector, translation_vector) = cv2.solvePnP(
+             self.model_points, image_points, camera_matrix, dist_coeffs,
+             flags=cv2.SOLVEPNP_ITERATIVE)
+
+         # Convert rotation vector to rotation matrix
+         (rotation_matrix, jacobian) = cv2.Rodrigues(rotation_vector)
+
+         # Calculate Euler angles
+         sy = np.sqrt(rotation_matrix[0, 0] * rotation_matrix[0, 0] + rotation_matrix[1, 0] * rotation_matrix[1, 0])
+         singular = sy < 1e-6
+         if not singular:
+             x = np.arctan2(rotation_matrix[2, 1], rotation_matrix[2, 2])
+             y = np.arctan2(-rotation_matrix[2, 0], sy)
+             z = np.arctan2(rotation_matrix[1, 0], rotation_matrix[0, 0])
+         else:
+             x = np.arctan2(-rotation_matrix[1, 2], rotation_matrix[1, 1])
+             y = np.arctan2(-rotation_matrix[2, 0], sy)
+             z = 0
+
+         # Convert to degrees
+         angles = np.array([x, y, z]) * 180.0 / np.pi
+         return angles
+
+     def generate_voice_alert(self, alert_type, severity="medium"):
+         """Generate voice alert using Gemini AI"""
+         if not self.model:
+             return self.play_default_alert()
+
+         try:
+             prompts = {
+                 "drowsy": f"Generate a brief, urgent but caring voice alert (max 15 words) to wake up a drowsy driver. Severity: {severity}. Make it sound natural and concerned.",
+                 "yawn": f"Generate a brief, gentle voice alert (max 12 words) for a driver who is yawning frequently. Severity: {severity}. Sound caring but alert.",
+                 "nod": f"Generate a brief, firm voice alert (max 12 words) for a driver whose head is nodding. Severity: {severity}. Sound urgent but supportive."
+             }
+
+             response = self.model.generate_content(prompts.get(alert_type, prompts["drowsy"]))
+             alert_text = response.text.strip().replace('"', '').replace("'", "")
+
+             # Use text-to-speech (you would need to install pyttsx3 or use cloud TTS)
+             # For this example, we'll use a placeholder
+             self.log_status(f"🔊 ALERT: {alert_text}")
+             return alert_text
+
+         except Exception as e:
+             self.log_status(f"Error generating alert: {str(e)}")
+             return self.play_default_alert()
+
+     def play_default_alert(self):
+         """Play default beep alert"""
+         try:
+             # Generate a simple beep sound
+             duration = 0.5  # seconds
+             freq = 800  # Hz
+             sample_rate = 22050
+             frames = int(duration * sample_rate)
+             arr = np.zeros(frames)
+
+             for i in range(frames):
+                 arr[i] = np.sin(2 * np.pi * freq * i / sample_rate)
+
+             arr = (arr * 32767).astype(np.int16)
+             sound = pygame.sndarray.make_sound(arr)
+             sound.play()
+
+             alert_text = "⚠️ WAKE UP! Please stay alert while driving!"
+             self.log_status(f"🔊 {alert_text}")
+             return alert_text
+
+         except Exception as e:
+             self.log_status(f"Error playing alert: {str(e)}")
+             return "Alert system activated"
+
+     def log_status(self, message):
+         """Log status messages with timestamp"""
+         timestamp = datetime.now().strftime("%H:%M:%S")
+         self.status_log.append(f"[{timestamp}] {message}")
+
+     def detect_drowsiness(self, frame):
+         """Main drowsiness detection function"""
+         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
+         faces = self.detector(gray)
+
+         status_text = []
+         alert_message = ""
+
+         if len(faces) == 0:
+             status_text.append("👀 No face detected")
+             self.log_status("No face detected in frame")
+             return frame, status_text, alert_message
+
+         for face in faces:
+             landmarks = self.predictor(gray, face)
+             landmarks = np.array([[p.x, p.y] for p in landmarks.parts()])
+
+             # Draw face rectangle
+             x, y, w, h = face.left(), face.top(), face.width(), face.height()
+             cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
+
+             # Eye Aspect Ratio calculation
+             left_eye = landmarks[self.LEFT_EYE]
+             right_eye = landmarks[self.RIGHT_EYE]
+
+             left_ear = self.calculate_ear(left_eye)
+             right_ear = self.calculate_ear(right_eye)
+             ear = (left_ear + right_ear) / 2.0
+             self.ear_history.append(ear)
+
+             # Draw eye contours
+             cv2.drawContours(frame, [cv2.convexHull(left_eye)], -1, (0, 255, 0), 1)
+             cv2.drawContours(frame, [cv2.convexHull(right_eye)], -1, (0, 255, 0), 1)
+
+             # Yawn detection
+             mouth = landmarks[self.MOUTH]
+             mar = self.calculate_mar(mouth)
+             self.yawn_history.append(mar)
+
+             # Draw mouth contour
+             cv2.drawContours(frame, [cv2.convexHull(mouth)], -1, (0, 255, 255), 1)
+
+             # Head pose estimation
+             head_angles = self.get_head_pose(landmarks, frame.shape)
+             self.head_pose_history.append(head_angles[0])  # Pitch angle
+
+             # Drowsiness detection logic
+             drowsy_indicators = []
+
+             # Check EAR
+             if ear < self.EAR_THRESHOLD:
+                 self.ear_counter += 1
+                 if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES:
+                     drowsy_indicators.append("EYES_CLOSED")
+                     status_text.append(f"👁️ Eyes closed! EAR: {ear:.3f}")
+             else:
+                 self.ear_counter = 0
+                 status_text.append(f"👁️ Eyes open - EAR: {ear:.3f}")
+
+             # Check for yawning
+             if mar > self.YAWN_THRESHOLD:
+                 self.yawn_counter += 1
+                 if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES:
+                     drowsy_indicators.append("YAWNING")
+                     status_text.append(f"🥱 Yawning detected! MAR: {mar:.3f}")
+             else:
+                 self.yawn_counter = 0
+                 status_text.append(f"👄 Normal mouth - MAR: {mar:.3f}")
+
+             # Check head nodding
+             if abs(head_angles[0]) > self.NOD_THRESHOLD:
+                 self.nod_counter += 1
+                 if self.nod_counter >= 10:
+                     drowsy_indicators.append("HEAD_NOD")
+                     status_text.append(f"📉 Head nodding! Angle: {head_angles[0]:.1f}°")
+             else:
+                 self.nod_counter = 0
+                 status_text.append(f"📐 Head pose - Pitch: {head_angles[0]:.1f}°")
+
+             # Generate alerts
+             current_time = time.time()
+             if drowsy_indicators and (current_time - self.last_alert_time) > self.alert_cooldown:
+                 self.drowsy_alert = True
+                 self.last_alert_time = current_time
+
+                 # Determine alert type and severity
+                 if "EYES_CLOSED" in drowsy_indicators:
+                     severity = "high" if len(drowsy_indicators) > 1 else "medium"
+                     alert_message = self.generate_voice_alert("drowsy", severity)
+                 elif "YAWNING" in drowsy_indicators:
+                     alert_message = self.generate_voice_alert("yawn", "medium")
+                 elif "HEAD_NOD" in drowsy_indicators:
+                     alert_message = self.generate_voice_alert("nod", "medium")
+
+                 # Visual alert on frame
+                 cv2.putText(frame, "⚠️ DROWSINESS ALERT! ⚠️", (50, 50),
+                             cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
+
+             # Draw landmark points
+             for (x, y) in landmarks:
+                 cv2.circle(frame, (x, y), 1, (255, 255, 255), -1)
+
+             # Add metrics overlay
+             cv2.putText(frame, f"EAR: {ear:.3f}", (10, frame.shape[0] - 80),
+                         cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
+             cv2.putText(frame, f"MAR: {mar:.3f}", (10, frame.shape[0] - 60),
+                         cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
+             cv2.putText(frame, f"Head: {head_angles[0]:.1f}°", (10, frame.shape[0] - 40),
+                         cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
+
+         return frame, status_text, alert_message
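`get_head_pose()` above chains three steps: a PnP pose fit, a Rodrigues conversion, and a rotation-matrix-to-Euler decomposition. Here is a standalone sketch of the same pipeline on synthetic inputs, reusing the six model points from `__init__`; the 2D image points are invented for illustration, not real detector output:

```python
import cv2
import numpy as np

# Generic 3D face model points (same six as DrowsinessDetector.model_points).
model_points = np.array([
    (0.0, 0.0, 0.0),           # Nose tip
    (0.0, -330.0, -65.0),      # Chin
    (-225.0, 170.0, -135.0),   # Left eye left corner
    (225.0, 170.0, -135.0),    # Right eye right corner
    (-150.0, -150.0, -125.0),  # Left mouth corner
    (150.0, -150.0, -125.0),   # Right mouth corner
])

# Hypothetical 2D detections in a 640x480 frame (illustrative values only).
image_points = np.array([
    (320.0, 240.0), (315.0, 345.0), (250.0, 200.0),
    (390.0, 200.0), (275.0, 290.0), (365.0, 290.0),
], dtype="double")

h, w = 480, 640
# Same pinhole approximation as get_head_pose(): focal length ~ frame width.
camera_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype="double")

ok, rvec, tvec = cv2.solvePnP(model_points, image_points, camera_matrix,
                              np.zeros((4, 1)), flags=cv2.SOLVEPNP_ITERATIVE)
R, _ = cv2.Rodrigues(rvec)  # rotation vector -> 3x3 rotation matrix

# Euler angles (x = pitch, y = yaw, z = roll), matching the method above.
sy = np.hypot(R[0, 0], R[1, 0])
pitch = np.degrees(np.arctan2(R[2, 1], R[2, 2]))
yaw = np.degrees(np.arctan2(-R[2, 0], sy))
roll = np.degrees(np.arctan2(R[1, 0], R[0, 0]))
print(f"pitch={pitch:.1f} yaw={yaw:.1f} roll={roll:.1f}")
```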
+ def create_gradio_interface():
+     """Create Gradio interface for the drowsiness detection system"""
+
+     # Global detector instance
+     detector = None
+
+     def initialize_system(gemini_key):
+         """Initialize the drowsiness detection system"""
+         global detector
+         try:
+             detector = DrowsinessDetector(gemini_key if gemini_key.strip() else None)
+             return "✅ System initialized successfully!", "System ready for detection."
+         except Exception as e:
+             return f"❌ Error initializing system: {str(e)}", "System initialization failed."
+
+     def process_video_frame(frame, gemini_key):
+         """Process a single video frame"""
+         global detector
+
+         if detector is None:
+             detector = DrowsinessDetector(gemini_key if gemini_key.strip() else None)
+
+         try:
+             processed_frame, status_list, alert_msg = detector.detect_drowsiness(frame)
+
+             # Format status text
+             status_text = "\n".join(status_list) if status_list else "Processing..."
+
+             # Get recent logs
+             log_text = "\n".join(list(detector.status_log)[-10:]) if detector.status_log else "No logs yet."
+
+             return processed_frame, status_text, alert_msg, log_text
+
+         except Exception as e:
+             error_msg = f"Error processing frame: {str(e)}"
+             return frame, error_msg, "", error_msg
+
+     # Create the Gradio interface
+     with gr.Blocks(title="Driver Drowsiness Detection System", theme=gr.themes.Soft()) as demo:
+         gr.Markdown("""
+         # 🚗 Real-time Driver Drowsiness Detection System
+
+         This system uses computer vision and AI to detect driver drowsiness through:
+         - **Eye Aspect Ratio (EAR)** - Detects closed/droopy eyes
+         - **Mouth Aspect Ratio (MAR)** - Detects yawning
+         - **Head Pose Estimation** - Detects head nodding
+         - **AI Voice Alerts** - Uses Gemini AI for personalized wake-up messages
+
+         ### 📋 Setup Instructions:
+         1. Download `shape_predictor_68_face_landmarks.dat` from [dlib models](http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2)
+         2. Place it in the same directory as this script
+         3. (Optional) Enter your Gemini API key for AI-powered voice alerts
+         4. Start your webcam and begin monitoring!
+         """)
+
+         with gr.Row():
+             with gr.Column(scale=2):
+                 # Gemini API key input, pre-filled from the environment when
+                 # available. (A Gradio component is required here; a bare
+                 # os.getenv() string cannot be wired into event handlers.)
+                 gemini_key_input = gr.Textbox(
+                     label="Gemini API Key (optional)",
+                     value=os.getenv('GEMINI_API_KEY') or "",
+                     type="password"
+                 )
+
+                 # Initialize button
+                 init_btn = gr.Button("🚀 Initialize System", variant="primary")
+                 init_status = gr.Textbox(label="Initialization Status", interactive=False)
+
+             with gr.Column(scale=1):
+                 # System info
+                 gr.Markdown("""
+                 ### 📊 Detection Thresholds:
+                 - **EAR Threshold**: 0.25
+                 - **Yawn Threshold**: 0.6
+                 - **Head Nod**: 15° deviation
+                 - **Alert Cooldown**: 10 seconds
+                 """)
+
+         with gr.Row():
+             with gr.Column(scale=2):
+                 # Video input/output
+                 video_input = gr.Video(
+                     sources=["webcam"],
+                     label="📹 Camera Feed",
+                     streaming=True
+                 )
+
+             with gr.Column(scale=1):
+                 # Status displays
+                 current_status = gr.Textbox(
+                     label="📈 Current Status",
+                     lines=6,
+                     interactive=False
+                 )
+
+                 alert_display = gr.Textbox(
+                     label="🔊 Latest Alert",
+                     interactive=False,
+                     placeholder="No alerts yet..."
+                 )
+
+                 system_logs = gr.Textbox(
+                     label="📝 System Logs",
+                     lines=8,
+                     interactive=False,
+                     placeholder="System logs will appear here..."
+                 )
+
+         # Event handlers
+         init_btn.click(
+             fn=initialize_system,
+             inputs=[gemini_key_input],
+             outputs=[init_status, alert_display]
+         )
+
+         video_input.stream(
+             fn=process_video_frame,
+             inputs=[video_input, gemini_key_input],
+             outputs=[video_input, current_status, alert_display, system_logs],
+             stream_every=0.1,  # Process every 100ms
+             show_progress=False
+         )
+
+         # Instructions
+         gr.Markdown("""
+         ### 🔧 Troubleshooting:
+         - **No face detected**: Ensure good lighting and face is visible to camera
+         - **Poor detection**: Adjust camera angle and distance (arm's length recommended)
+         - **No alerts**: Check if Gemini API key is valid (optional feature)
+         - **High CPU usage**: Reduce video resolution or increase stream interval
+
+         ### ⚠️ Safety Notice:
+         This system is for demonstration purposes. Always prioritize real-world driving safety measures.
+         Pull over safely if you feel drowsy while driving.
+         """)
+
+     return demo
+
+ # Main execution
  if __name__ == "__main__":
+     # Check for required files
+     if not os.path.exists('shape_predictor_68_face_landmarks.dat'):
+         print("❌ Missing required file: shape_predictor_68_face_landmarks.dat")
+         print("📥 Please download from: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2")
+         print("📁 Extract and place it in the same directory as this script")
+     else:
+         print("✅ All required files found!")
+
+     # Create and launch the interface
+     demo = create_gradio_interface()
+     demo.launch(
+         share=True,
+         server_name="0.0.0.0",
+         server_port=7860,
+         show_error=True
+     )
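Both aspect ratios in the new app.py reduce to the same distance ratio over six ordered points: EAR = (‖p2−p6‖ + ‖p3−p5‖) / (2 · ‖p1−p4‖), with the analogous construction for MAR. A quick numeric check with made-up eye coordinates (not detector output):

```python
from scipy.spatial import distance as dist

# Hypothetical eye points [p1..p6]: p1/p4 are horizontal corners,
# (p2, p6) and (p3, p5) are the vertical pairs.
open_eye   = [(0, 0), (3, -2), (7, -2), (10, 0), (7, 2), (3, 2)]
closed_eye = [(0, 0), (3, -0.3), (7, -0.3), (10, 0), (7, 0.3), (3, 0.3)]

def ear(p):
    A = dist.euclidean(p[1], p[5])  # vertical pair p2-p6
    B = dist.euclidean(p[2], p[4])  # vertical pair p3-p5
    C = dist.euclidean(p[0], p[3])  # horizontal span p1-p4
    return (A + B) / (2.0 * C)

print(f"open:   {ear(open_eye):.3f}")    # 0.400, well above the 0.25 threshold
print(f"closed: {ear(closed_eye):.3f}")  # 0.060, would count toward EYES_CLOSED
```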
assets/alert.wav DELETED
File without changes
assets/sleep.jpeg DELETED
Binary file (7.85 kB)
 
config.yaml DELETED
@@ -1,64 +0,0 @@
- # config.yaml
- # -----------------------------------------------------------------------------
- # Main configuration file for the Drive Paddy application.
- # -----------------------------------------------------------------------------
-
- # -- Detection Strategy --
- # Sets the active drowsiness detection method.
- # Options: "geometric", "cnn_model", "hybrid"
- detection_strategy: "geometric"
-
- # -- Geometric Strategy Settings --
- # Parameters for the facial landmark-based detection methods.
- geometric_settings:
-   # Eye Aspect Ratio (EAR) for blink/closure detection
-   eye_ar_thresh: 0.23
-   eye_ar_consec_frames: 15
-
-   # Mouth Aspect Ratio (MAR) for yawn detection
-   yawn_mar_thresh: 0.70
-   yawn_consec_frames: 20
-
-   # Head Pose Estimation for look-away/nod-off detection
-   head_nod_thresh: 15.0
-   head_look_away_thresh: 20.0
-   head_pose_consec_frames: 20
-
-   # Low Light Detection
-   low_light_thresh: 70  # Average frame brightness below which a warning is shown (0-255).
-
-   # Drowsiness Level Scoring
-   # The system will sum the weights of active indicators (eyes, mouth, head).
-   drowsiness_levels:
-     very_drowsy_threshold: 0.8      # e.g., Eyes + Head Nod (0.45 + 0.55 = 1.0)
-     slightly_drowsy_threshold: 0.4  # e.g., Just Yawning (0.30) or Eyes Closed (0.45)
-   indicator_weights:
-     eye_closure: 0.2
-     yawning: 0.20
-     head_nod: 0.45
-     looking_away: 0.15
-
- # -- CNN Model Settings (Not used in 'geometric' mode) --
- cnn_model_settings:
-   model_path: "models/best_model_efficientnet_b7.pth"
-   confidence_thresh: 0.8
-
- # -- Hybrid Strategy Settings (Not used in 'geometric' mode) --
- hybrid_settings:
-   alert_threshold: 1.0
-   # Each weight is on its own line.
-   weights:
-     eye_closure: 0.45
-     yawning: 0.30
-     head_nod: 0.55
-     looking_away: 0.25
-     cnn_prediction: 0.60
-
- # -- Alerting System --
- alerting:
-   alert_sound_path: "assets/alert.wav"
-   alert_cooldown_seconds: 7  # Increased cooldown to prevent alert fatigue
-
- # -- Gemini API (Optional) --
- gemini_api:
-   enabled: true
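The `drowsiness_levels` and `indicator_weights` blocks above drive a plain weighted sum checked against the two level thresholds. A minimal sketch of that scoring logic using the configured values (the indicator sets passed in are invented examples):

```python
weights = {"eye_closure": 0.2, "yawning": 0.20, "head_nod": 0.45, "looking_away": 0.15}
levels = {"very_drowsy_threshold": 0.8, "slightly_drowsy_threshold": 0.4}

def drowsiness_level(active):
    """active: set of indicator names whose frame counters have tripped."""
    score = sum(weights[name] for name in active)
    if score >= levels["very_drowsy_threshold"]:
        return "Very Drowsy", score
    if score >= levels["slightly_drowsy_threshold"]:
        return "Slightly Drowsy", score
    return "Awake", score

print(drowsiness_level({"yawning"}))                          # ('Awake', 0.2)
print(drowsiness_level({"eye_closure", "yawning"}))           # ('Slightly Drowsy', 0.4)
print(drowsiness_level({"head_nod", "eye_closure", "yawning"}))  # ('Very Drowsy', ~0.85)
```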
download_model.py DELETED
@@ -1,42 +0,0 @@
- # download_model.py
- import os
- from huggingface_hub import hf_hub_download
-
- # --- Configuration ---
- # Details from your Hugging Face repository screenshot.
- REPO_ID = "Testys/drowsiness-detection-model"
- FILENAME = "best_model_efficientnet_b7.pth"
- LOCAL_DIR = "models"
-
- def download_model():
-     """
-     Downloads the specified model file from Hugging Face Hub
-     and saves it to the local models/ directory.
-     """
-     print(f"Downloading model '{FILENAME}' from repository '{REPO_ID}'...")
-
-     # Ensure the local directory exists.
-     if not os.path.exists(LOCAL_DIR):
-         os.makedirs(LOCAL_DIR)
-         print(f"Created directory: {LOCAL_DIR}")
-
-     try:
-         # Download the file.
-         # local_dir_use_symlinks=False ensures the file is copied to your directory
-         # instead of just pointing to the cache.
-         model_path = hf_hub_download(
-             repo_id=REPO_ID,
-             filename=FILENAME,
-             local_dir=LOCAL_DIR,
-             local_dir_use_symlinks=False,
-             # token=True  # Use token for private repos, can be omitted for public ones
-         )
-         print("\nModel downloaded successfully!")
-         print(f"Saved to: {model_path}")
-
-     except Exception as e:
-         print(f"\nAn error occurred during download: {e}")
-         print("Please check the repository ID, filename, and your network connection.")
-
- if __name__ == "__main__":
-     download_model()
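The commented-out `token=True` line above is the hook for private repositories. A sketch of that variant, under the assumption that a read token is exposed through a hypothetical `HF_TOKEN` environment variable:

```python
import os
from huggingface_hub import hf_hub_download

# Assumption: HF_TOKEN holds a read token; None (public repo) also works.
model_path = hf_hub_download(
    repo_id="Testys/drowsiness-detection-model",
    filename="best_model_efficientnet_b7.pth",
    local_dir="models",
    token=os.getenv("HF_TOKEN"),
)
print(model_path)
```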
main.py DELETED
@@ -1,80 +0,0 @@
- # drive_paddy/main.py
- import streamlit as st
- import yaml
- import os
- from dotenv import load_dotenv
- import queue
-
- # --- Main Application UI ---
- st.set_page_config(
-     page_title="Drive Paddy | Home",
-     page_icon="🚗",
-     layout="wide"
- )
-
- # Load config to display current settings on the home page
- @st.cache_resource
- def load_app_config():
-     load_dotenv()
-     gemini_api_key = os.getenv("GEMINI_API_KEY")
-     with open('config.yaml', 'r') as f:
-         config = yaml.safe_load(f)
-     return config, gemini_api_key
-
- config, gemini_api_key = load_app_config()
-
- # --- Initialize Session State ---
- # This ensures they are set when the app first loads.
- if "play_audio" not in st.session_state:
-     st.session_state.play_audio = None
- if "active_alerts" not in st.session_state:
-     st.session_state.active_alerts = {"status": "Awake"}
- if "status_queue" not in st.session_state:
-     st.session_state.status_queue = queue.Queue()
- if "audio_queue" not in st.session_state:
-     st.session_state.audio_queue = queue.Queue()
- if "last_status" not in st.session_state:
-     st.session_state.last_status = {"status": "Awake"}
-
- # --- Page Content ---
- st.title("🚗 Welcome to Drive Paddy!")
- st.subheader("Your AI-Powered Drowsiness Detection Assistant")
-
- st.markdown("""
- Drive Paddy is a real-time system designed to enhance driver safety by detecting signs of drowsiness.
- It uses your computer's webcam to analyze facial features and head movements, providing timely alerts
- to help prevent fatigue-related accidents.
- """)
-
- st.info("Navigate to the **Live Detection** page from the sidebar on the left to start the system.")
-
- st.markdown("---")
-
- col1, col2 = st.columns(2)
-
- with col1:
-     st.header("How It Works")
-     st.markdown("""
-     The system employs a sophisticated hybrid strategy to monitor for signs of fatigue:
-     - **👀 Eye Closure Detection**: Measures Eye Aspect Ratio (EAR) to detect prolonged blinks or closed eyes.
-     - **🥱 Yawn Detection**: Measures Mouth Aspect Ratio (MAR) to identify yawns.
-     - **😴 Head Pose Analysis**: Tracks head pitch and yaw to detect nodding off or looking away from the road.
-     - **🧠 CNN Model Inference**: A deep learning model provides an additional layer of analysis.
-
-     These signals are combined into a single drowsiness score to trigger alerts accurately.
-     """)
-
- with col2:
-     st.header("Current Configuration")
-     alert_method = "Gemini API" if config.get('gemini_api', {}).get('enabled') and gemini_api_key else "Static Audio File"
-     st.markdown(f"""
-     - **Detection Strategy**: `{config['detection_strategy']}`
-     - **Alert Method**: `{alert_method}`
-     """)
-     st.warning("Ensure good lighting and that your face is clearly visible for best results.")
-
- st.markdown("---")
- st.markdown("Created with ❤️ using Streamlit, OpenCV, and MediaPipe.")
models/best_model_efficientnet_b7.pth DELETED
@@ -1,3 +0,0 @@
- version https://git-lfs.github.com/spec/v1
- oid sha256:b13c1e5e4f1a03e0e559ad8f7988c14b63d2b028c55f380814f241dd788a99df
- size 256870774
pages/1_Live_Detection.py DELETED
@@ -1,164 +0,0 @@
- # drive_paddy/pages/1_Live_Detection.py
- import streamlit as st
- from streamlit_webrtc import webrtc_streamer, RTCConfiguration, VideoProcessorBase
- import yaml
- import av
- import os
- from dotenv import load_dotenv
- import base64
- import queue
- import time
- from typing import List, Dict, Union
-
- # Correctly import from the drive_paddy package structure
- from src.detection.factory import get_detector
- from src.alerting.alert_system import get_alerter
-
- # --- Initialize Session State at the TOP of the script ---
- # This is the single source of truth for our queues and must run on every page load.
- if "status_queue" not in st.session_state:
-     st.session_state.status_queue = queue.Queue()
- if "audio_queue" not in st.session_state:
-     st.session_state.audio_queue = queue.Queue()
- if "last_status" not in st.session_state:
-     st.session_state.last_status = {"drowsiness_level": "Awake", "lighting": "Good"}
-
- # --- Load Configuration and Environment Variables ---
- @st.cache_resource
- def load_app_config():
-     """Loads config from yaml and .env files."""
-     load_dotenv()
-     # Navigate up to the root to find the config file
-     config_path = "./config.yaml"
-     with open(config_path, 'r') as f:
-         config = yaml.safe_load(f)
-     # Load secrets from environment
-     secrets = {
-         "gemini_api_key": os.getenv("GEMINI_API_KEY"),
-         "turn_username": os.getenv("TURN_USERNAME"),
-         "turn_credential": os.getenv("TURN_CREDENTIAL")
-     }
-     return config, secrets
-
- config, secrets = load_app_config()
-
- # --- Client-Side Audio Playback Function ---
- def autoplay_audio(audio_bytes: bytes):
-     """Injects HTML to autoplay audio in the user's browser."""
-     b64 = base64.b64encode(audio_bytes).decode()
-     md = f"""
-     <audio controls autoplay="true" style="display:none;">
-         <source src="data:audio/mp3;base64,{b64}" type="audio/mp3">
-     </audio>
-     """
-     st.markdown(md, unsafe_allow_html=True)
-
- # --- WebRTC Video Processor ---
- class VideoProcessor(VideoProcessorBase):
-     def __init__(self):
-         # NOTE (as committed): this binds the queue.Queue *class* itself, not
-         # the session_state queue instances the comments below describe, so
-         # the UI loop and this processor never actually share state.
-         self.status_queue = queue.Queue
-         self.audio_queue = queue.Queue
-         self._detector = get_detector(config)
-         self._alerter = get_alerter(config, secrets["gemini_api_key"])
-
-     def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
-         img = frame.to_ndarray(format="bgr24")
-
-         strategy = config.get('detection_strategy')
-
-         # The return signature of process_frame varies by strategy.
-         processed_frame, indicators, _ = self._detector.process_frame(img)
-         drowsiness_level = indicators.get("drowsiness_level", "Awake")
-
-         # Intended to push indicators into the shared session_state queue (see note above).
-         self.status_queue.put(indicators)
-
-         if drowsiness_level != "Awake":
-             audio_data = self._alerter.trigger_alert(level=drowsiness_level)
-             if audio_data:
-                 # Intended to push audio data into the shared audio queue.
-                 self.audio_queue.put(audio_data)
-         else:
-             self._alerter.reset_alert()
-
-         return av.VideoFrame.from_ndarray(processed_frame, format="bgr24")
-
- # --- Page UI ---
- st.title("📹 Live Drowsiness Detection")
- st.info("Press 'START' to activate your camera and begin monitoring.")
-
- # --- Dynamically Build RTC Configuration ---
- ice_servers = [{'urls': 'stun:global.stun.twilio.com:3478'}]
- if secrets["turn_username"] and secrets["turn_credential"]:
-     turn_servers = [
-         {'urls': 'turn:global.turn.twilio.com:3478?transport=udp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]},
-         {'urls': 'turn:global.turn.twilio.com:3478?transport=tcp', 'username': secrets["turn_username"], 'credential': secrets["turn_credential"]}
-     ]
-     ice_servers.extend(turn_servers)
-
- RTC_CONFIGURATION = RTCConfiguration({"iceServers": ice_servers})
-
- col1, col2 = st.columns([3, 1])
-
- with col1:
-     webrtc_ctx = webrtc_streamer(
-         key="drowsiness-detection",
-         # The factory constructs the processor; note it receives no queues here.
-         video_processor_factory=VideoProcessor,
-         rtc_configuration=RTC_CONFIGURATION,
-         media_stream_constraints={"video": True, "audio": False},
-         async_processing=True,
-     )
-
- with col2:
-     st.header("System Status")
-     audio_placeholder = st.empty()
-     if not webrtc_ctx.state.playing:
-         st.warning("System Inactive.")
-     else:
-         st.success("✅ System Active & Monitoring")
-
-     st.subheader("Live Status:")
-     status_placeholder = st.empty()
-
- if webrtc_ctx.state.playing:
-     try:
-         # Reads from the session_state queue the processor is expected to write to.
-         status_result = st.session_state.status_queue.get(timeout=0.1)
-         st.session_state.last_status = status_result
-     except queue.Empty:
-         pass
-
-     with status_placeholder.container():
-         last_status = st.session_state.last_status
-         drowsiness_level = last_status.get("drowsiness_level", "Awake")
-         lighting = last_status.get("lighting", "Good")
-         score = last_status.get("details", {}).get("Score", 0)
-
-         st.metric(label="Lighting Condition", value=lighting)
-         if lighting == "Low":
-             st.warning("Detection paused due to low light.")
-
-         if drowsiness_level == "Awake":
-             st.info(f"✔️ Awake (Score: {score:.2f})")
-         elif drowsiness_level == "Slightly Drowsy":
-             st.warning(f"⚠️ Slightly Drowsy (Score: {score:.2f})")
-         elif drowsiness_level == "Very Drowsy":
-             st.error(f"🚨 Very Drowsy! (Score: {score:.2f})")
-
-     try:
-         audio_data = st.session_state.audio_queue.get(timeout=0.1)
-         with audio_placeholder.container():
-             autoplay_audio(audio_data)
-     except queue.Empty:
-         pass
-
-     time.sleep(0.1)
-     st.rerun()
- else:
-     with status_placeholder.container():
-         st.info("✔️ Driver is Awake")
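As flagged in the notes above, the deleted page binds `queue.Queue` (the class) instead of the session-state queue instances, so the polling loop reads queues the processor never writes to. One way to realize what the comments intend, sketched under the assumption that the shared queues live in `st.session_state` and are closed over in the factory:

```python
# Sketch: share st.session_state queues with the WebRTC processor via the factory.
import queue
import streamlit as st
from streamlit_webrtc import webrtc_streamer, VideoProcessorBase

class VideoProcessor(VideoProcessorBase):
    def __init__(self, status_queue: queue.Queue, audio_queue: queue.Queue):
        self.status_queue = status_queue  # instances, not the Queue class
        self.audio_queue = audio_queue

    def recv(self, frame):
        # Detection logic would go here; pass the frame through unchanged.
        return frame

if "status_queue" not in st.session_state:
    st.session_state.status_queue = queue.Queue()
if "audio_queue" not in st.session_state:
    st.session_state.audio_queue = queue.Queue()
status_q = st.session_state.status_queue
audio_q = st.session_state.audio_queue

webrtc_ctx = webrtc_streamer(
    key="drowsiness-detection",
    # Close over the shared queue instances instead of passing the bare class.
    video_processor_factory=lambda: VideoProcessor(status_q, audio_q),
)
```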
src/__init__.py DELETED
File without changes
src/__pycache__/__init__.cpython-312.pyc DELETED
Binary file (136 Bytes)
 
src/alerting/__init__.py DELETED
File without changes
src/alerting/__pycache__/__init__.cpython-312.pyc DELETED
Binary file (145 Bytes)
 
src/alerting/__pycache__/alert_system.cpython-312.pyc DELETED
Binary file (6.55 kB)
 
src/alerting/alert_system.py DELETED
@@ -1,57 +0,0 @@
- # drive_paddy/alerting/alert_system.py
- import time, os, io, google.generativeai as genai
- from gtts import gTTS
-
- class BaseAlerter:
-     def __init__(self, config):
-         self.config = config['alerting']
-         self.cooldown = self.config['alert_cooldown_seconds']
-         self.last_alert_time = 0
-         self.alert_on = False
-
-     def trigger_alert(self, level="Very Drowsy"):
-         raise NotImplementedError
-
-     def reset_alert(self):
-         if self.alert_on:
-             print("Resetting Alert.")
-             self.alert_on = False
-
- class FileAlertSystem(BaseAlerter):
-     def __init__(self, config):
-         super().__init__(config)
-         self.audio_bytes = None
-         try:
-             if os.path.exists(config['alerting']['alert_sound_path']):
-                 with open(config['alerting']['alert_sound_path'], "rb") as f:
-                     self.audio_bytes = f.read()
-         except Exception as e:
-             print(f"Warning: Could not load audio file. Error: {e}.")
-
-     def trigger_alert(self, level="Very Drowsy"):
-         current_time = time.time()
-         if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.audio_bytes:
-             self.last_alert_time = current_time
-             self.alert_on = True
-             print("Triggering Static Alert!")
-             return self.audio_bytes
-         return None
-
- class GeminiAlertSystem(BaseAlerter):
-     def __init__(self, config, api_key):
-         super().__init__(config)
-         try:
-             genai.configure(api_key=api_key)
-             self.model = genai.GenerativeModel('gemini-pro')
-         except Exception as e:
-             print(f"Error initializing Gemini: {e}.")
-             self.model = None
-
-     def _generate_audio_data(self, level):
-         if not self.model:
-             return None
-         if level == "Slightly Drowsy":
-             prompt = "You are an AI driving assistant. Generate a short, gentle reminder (under 10 words) for a driver showing minor signs of fatigue."
-         else:  # Very Drowsy
-             prompt = "You are an AI driving assistant. Generate a short, firm, and urgent alert (under 10 words) for a driver who is very drowsy."
-         try:
-             response = self.model.generate_content(prompt)
-             alert_text = response.text.strip().replace('*', '')
-             print(f"Generated Alert Text ({level}): '{alert_text}'")
-             mp3_fp = io.BytesIO()
-             tts = gTTS(text=alert_text, lang='en')
-             tts.write_to_fp(mp3_fp)
-             mp3_fp.seek(0)
-             return mp3_fp.getvalue()
-         except Exception as e:
-             print(f"Error generating TTS audio: {e}")
-             return None
-
-     def trigger_alert(self, level="Very Drowsy"):
-         current_time = time.time()
-         if (current_time - self.last_alert_time) > self.cooldown and not self.alert_on and self.model:
-             self.last_alert_time = current_time
-             self.alert_on = True
-             return self._generate_audio_data(level)
-         return None
-
- def get_alerter(config, api_key=None):
-     if config.get('gemini_api', {}).get('enabled', False) and api_key:
-         return GeminiAlertSystem(config, api_key)
-     return FileAlertSystem(config)
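For context, a usage sketch for the deleted alerter (the config fragment is illustrative; `trigger_alert` returns audio bytes, or `None` while the cooldown or an already-active alert suppresses it):

```python
from src.alerting.alert_system import get_alerter

# Illustrative config fragment matching what get_alerter expects.
config = {
    "alerting": {"alert_sound_path": "assets/alert.wav", "alert_cooldown_seconds": 7},
    "gemini_api": {"enabled": False},  # no API key -> FileAlertSystem fallback
}

alerter = get_alerter(config)

# Inside a frame loop:
audio_bytes = alerter.trigger_alert(level="Very Drowsy")
if audio_bytes:
    pass  # hand the bytes to the UI layer for playback

# Once the driver is classified as awake again:
alerter.reset_alert()
```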
src/detection/__init__.py DELETED
File without changes
src/detection/__pycache__/__init__.cpython-312.pyc DELETED
Binary file (146 Bytes)
 
src/detection/__pycache__/base_processor.cpython-312.pyc DELETED
Binary file (1.12 kB)
 
src/detection/__pycache__/factory.cpython-312.pyc DELETED
Binary file (1.18 kB)
 
src/detection/base_processor.py DELETED
@@ -1,26 +0,0 @@
- # drive_paddy/detection/base_processor.py
- from abc import ABC, abstractmethod
-
- class BaseProcessor(ABC):
-     """
-     Abstract Base Class for a drowsiness detection processor.
-
-     This defines the common interface that all detection strategies
-     (e.g., Geometric, CNN Model) must follow.
-     """
-
-     @abstractmethod
-     def process_frame(self, frame):
-         """
-         Processes a single video frame to detect drowsiness.
-
-         Args:
-             frame: The video frame (as a NumPy array) to process.
-
-         Returns:
-             A tuple containing:
-             - The processed frame (NumPy array) with visualizations.
-             - A boolean indicating if an alert should be triggered.
-         """
-         pass
src/detection/factory.py DELETED
@@ -1,22 +0,0 @@
- # drive_paddy/detection/factory.py
- from src.detection.strategies.geometric import GeometricProcessor
- from src.detection.strategies.cnn_model import CnnProcessor
- from src.detection.strategies.hybrid import HybridProcessor
-
- def get_detector(config):
-     """
-     Factory function to get the appropriate drowsiness detector.
-     """
-     strategy = config.get('detection_strategy', 'geometric')
-
-     if strategy == 'geometric':
-         print("Initializing Geometric drowsiness detector...")
-         return GeometricProcessor(config)
-     elif strategy == 'cnn_model':
-         print("Initializing CNN Model drowsiness detector...")
-         return CnnProcessor(config)
-     elif strategy == 'hybrid':
-         print("Initializing Hybrid (Geometric + CNN) drowsiness detector...")
-         return HybridProcessor(config)
-     else:
-         raise ValueError(f"Unknown detection strategy: {strategy}")
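A typical call site for this factory, sketched against the repo's `config.yaml` layout before this commit:

```python
import yaml
from src.detection.factory import get_detector

with open("config.yaml", "r") as f:
    config = yaml.safe_load(f)

detector = get_detector(config)  # strategy chosen by config['detection_strategy']
# For the geometric strategy, each frame yields:
# (annotated_frame, indicators_dict, face_landmarks)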
src/detection/strategies/__init__.py DELETED
File without changes
src/detection/strategies/__pycache__/__init__.cpython-312.pyc DELETED
Binary file (157 Bytes)
 
src/detection/strategies/__pycache__/cnn_model.cpython-312.pyc DELETED
Binary file (5.3 kB)
 
src/detection/strategies/__pycache__/geometric.cpython-312.pyc DELETED
Binary file (7.89 kB)
 
src/detection/strategies/__pycache__/hybrid.cpython-312.pyc DELETED
Binary file (4.97 kB)
 
src/detection/strategies/cnn_model.py DELETED
@@ -1,93 +0,0 @@
- # drive_paddy/detection/strategies/cnn_model.py
- from src.detection.base_processor import BaseProcessor
- import numpy as np
- import torch
- import torchvision.transforms as transforms
- from torchvision.models import efficientnet_b7
- import cv2
- from PIL import Image
- import os
-
- class CnnProcessor(BaseProcessor):
-     """
-     Drowsiness detection using a pre-trained EfficientNet-B7 model.
-     This version receives face landmarks from another processor instead of using dlib.
-     """
-     def __init__(self, config):
-         self.settings = config['cnn_model_settings']
-         self.model_path = self.settings['model_path']
-         self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-
-         # dlib is no longer needed.
-         # self.face_detector = dlib.get_frontal_face_detector()
-
-         self.model = self._load_model()
-
-         self.transform = transforms.Compose([
-             transforms.Resize((224, 224)),
-             transforms.ToTensor(),
-             transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
-         ])
-
-     def _load_model(self):
-         """Loads the EfficientNet-B7 model and custom weights."""
-         if not os.path.exists(self.model_path):
-             print(f"Error: Model file not found at {self.model_path}")
-             return None
-
-         try:
-             model = efficientnet_b7()
-             num_ftrs = model.classifier[1].in_features
-             model.classifier[1] = torch.nn.Linear(num_ftrs, 2)
-             model.load_state_dict(torch.load(self.model_path, map_location=self.device))
-             model.to(self.device)
-             model.eval()
-             print(f"CNN Model '{self.model_path}' loaded successfully on {self.device}.")
-             return model
-         except Exception as e:
-             print(f"Error loading CNN model: {e}")
-             return None
-
-     def process_frame(self, frame, face_landmarks=None):
-         """
-         Processes a frame using the CNN model with pre-supplied landmarks.
-         """
-         if self.model is None or face_landmarks is None:
-             return frame, {"cnn_prediction": False}
-
-         is_drowsy_prediction = False
-         h, w, _ = frame.shape
-
-         landmarks = face_landmarks[0].landmark
-
-         # Calculate bounding box from landmarks
-         x_coords = [lm.x * w for lm in landmarks]
-         y_coords = [lm.y * h for lm in landmarks]
-         x1, y1 = int(min(x_coords)), int(min(y_coords))
-         x2, y2 = int(max(x_coords)), int(max(y_coords))
-
-         # Add some padding to the bounding box
-         padding = 10
-         x1 = max(0, x1 - padding)
-         y1 = max(0, y1 - padding)
-         x2 = min(w, x2 + padding)
-         y2 = min(h, y2 + padding)
-
-         # Crop the face
-         face_crop = frame[y1:y2, x1:x2]
-
-         if face_crop.size > 0:
-             pil_image = Image.fromarray(cv2.cvtColor(face_crop, cv2.COLOR_BGR2RGB))
-             image_tensor = self.transform(pil_image).unsqueeze(0).to(self.device)
-
-             with torch.no_grad():
-                 outputs = self.model(image_tensor)
-                 _, preds = torch.max(outputs, 1)
-                 if preds.item() == 1:  # Assuming class 1 is 'drowsy'
-                     is_drowsy_prediction = True
-
-             cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 255, 0), 2)
-             label = "Drowsy" if is_drowsy_prediction else "Awake"
-             cv2.putText(frame, f"CNN: {label}", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
-
-         return frame, {"cnn_prediction": is_drowsy_prediction}
src/detection/strategies/geometric.py DELETED
@@ -1,111 +0,0 @@
- # drive_paddy/detection/strategies/geometric.py
- import cv2
- import mediapipe as mp
- import numpy as np
- import math
- from ..base_processor import BaseProcessor
-
- # --- Helper Functions ---
- def calculate_ear(eye_landmarks, frame_shape):
-     coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
-     v1 = np.linalg.norm(coords[1] - coords[5])
-     v2 = np.linalg.norm(coords[2] - coords[4])
-     h1 = np.linalg.norm(coords[0] - coords[3])
-     return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
-
- def calculate_mar(mouth_landmarks, frame_shape):
-     coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
-     v1 = np.linalg.norm(coords[1] - coords[7])
-     v2 = np.linalg.norm(coords[2] - coords[6])
-     v3 = np.linalg.norm(coords[3] - coords[5])
-     h1 = np.linalg.norm(coords[0] - coords[4])
-     return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
-
- class GeometricProcessor(BaseProcessor):
-     def __init__(self, config):
-         self.settings = config['geometric_settings']
-         self.face_mesh = mp.solutions.face_mesh.FaceMesh(max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5)
-         self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}
-         self.L_EYE = [362, 385, 387, 263, 373, 380]
-         self.R_EYE = [33, 160, 158, 133, 153, 144]
-         self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
-
-     def process_frame(self, frame):
-         # Create a writable copy to prevent read-only errors from Gradio/OpenCV
-         frame = frame.copy()
-
-         h, w, _ = frame.shape
-         gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
-         brightness = np.mean(gray)
-         is_low_light = brightness < self.settings['low_light_thresh']
-
-         drowsiness_indicators = {
-             "drowsiness_level": "Awake", "lighting": "Good", "details": {}
-         }
-         face_landmarks = None
-
-         if not is_low_light:
-             img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
-             results = self.face_mesh.process(img_rgb)
-             face_landmarks = results.multi_face_landmarks
-
-             if face_landmarks:
-                 landmarks = face_landmarks[0].landmark
-                 score = 0
-                 weights = self.settings['indicator_weights']
-
-                 # --- Draw Facial Landmarks ---
-                 # Draw the green dots for eyes and mouth to show what is being tracked.
-                 eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
-                 for idx in eye_mouth_landmarks_indices:
-                     lm = landmarks[idx]
-                     x, y = int(lm.x * w), int(lm.y * h)
-                     cv2.circle(frame, (x, y), 1, (0, 255, 0), -1)
-
-                 # --- Drowsiness Calculations ---
-                 ear = (calculate_ear([landmarks[i] for i in self.L_EYE], (h, w)) + calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))) / 2.0
-                 if ear < self.settings['eye_ar_thresh']:
-                     self.counters['eye_closure'] += 1
-                 else:
-                     self.counters['eye_closure'] = 0
-                 if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']:
-                     score += weights['eye_closure']
-
-                 mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
-                 if mar > self.settings['yawn_mar_thresh']:
-                     self.counters['yawning'] += 1
-                 else:
-                     self.counters['yawning'] = 0
-                 if self.counters['yawning'] >= self.settings['yawn_consec_frames']:
-                     score += weights['yawning']
-
-                 face_3d = np.array([[0.0, 0.0, 0.0], [0.0, -330.0, -65.0], [-225.0, 170.0, -135.0], [225.0, 170.0, -135.0], [-150.0, -150.0, -125.0], [150.0, -150.0, -125.0]], dtype=np.float64)
-                 face_2d = np.array([(landmarks[1].x * w, landmarks[1].y * h), (landmarks[152].x * w, landmarks[152].y * h), (landmarks[263].x * w, landmarks[263].y * h), (landmarks[33].x * w, landmarks[33].y * h), (landmarks[287].x * w, landmarks[287].y * h), (landmarks[57].x * w, landmarks[57].y * h)], dtype=np.float64)
-                 cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float64)
-                 _, rot_vec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, np.zeros((4, 1), dtype=np.float64))
-                 rmat, _ = cv2.Rodrigues(rot_vec)
-                 angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
-                 pitch, yaw = angles[0], angles[1]
-
-                 if pitch > self.settings['head_nod_thresh']:
-                     self.counters['head_nod'] += 1
-                 else:
-                     self.counters['head_nod'] = 0
-                 if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']:
-                     score += weights['head_nod']
-
-                 if abs(yaw) > self.settings['head_look_away_thresh']:
-                     self.counters['looking_away'] += 1
-                 else:
-                     self.counters['looking_away'] = 0
-                 if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']:
-                     score += weights['looking_away']
-
-                 levels = self.settings['drowsiness_levels']
-                 if score >= levels['very_drowsy_threshold']:
-                     drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
-                 elif score >= levels['slightly_drowsy_threshold']:
-                     drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
-
-                 drowsiness_indicators['details']['Score'] = score
-         else:  # is_low_light is True
-             drowsiness_indicators["lighting"] = "Low"
-
-         # --- Visualization on Video Frame ---
-         level = drowsiness_indicators['drowsiness_level']
-         score_val = drowsiness_indicators.get("details", {}).get("Score", 0)
-         color = (0, 255, 0)  # Green for Awake
-
-         if drowsiness_indicators['lighting'] == "Low":
-             color = (0, 165, 255)  # Orange for low light
-             cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
-         elif level == "Slightly Drowsy":
-             color = (0, 255, 255)  # Yellow
-         elif level == "Very Drowsy":
-             color = (0, 0, 255)  # Red
-
-         cv2.rectangle(frame, (0, 0), (w, h), color, 10)
-         status_text = f"Status: {level} (Score: {score_val:.2f})"
-         cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
-
-         return frame, drowsiness_indicators, face_landmarks
src/detection/strategies/hybrid.py DELETED
@@ -1,84 +0,0 @@
- # drive_paddy/detection/strategies/hybrid.py
- from src.detection.base_processor import BaseProcessor
- from src.detection.strategies.geometric import GeometricProcessor
- from src.detection.strategies.cnn_model import CnnProcessor
- import cv2
- import concurrent.futures
-
- class HybridProcessor(BaseProcessor):
-     """
-     Combines outputs from multiple detection strategies (Geometric and CNN)
-     concurrently to make a more robust and efficient drowsiness decision.
-     This version includes frame skipping for the CNN model to improve performance.
-     """
-     def __init__(self, config):
-         self.geometric_processor = GeometricProcessor(config)
-         self.cnn_processor = CnnProcessor(config)
-         self.weights = config['hybrid_settings']['weights']
-         self.alert_threshold = config['hybrid_settings']['alert_threshold']
-         self.active_alerts = {}
-
-         # --- Performance Optimization ---
-         self.frame_counter = 0
-         self.cnn_process_interval = 10  # Run CNN every 10 frames
-         self.last_cnn_indicators = {"cnn_prediction": False}  # Cache the last CNN result
-
-         self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
-
-     def process_frame(self, frame):
-         self.frame_counter += 1
-
-         # --- Concurrent Execution ---
-         # The geometric processor runs on every frame.
-         geo_future = self.executor.submit(self.geometric_processor.process_frame, frame.copy())
-
-         # The CNN processor only runs on specified intervals.
-         if self.frame_counter % self.cnn_process_interval == 0:
-             cnn_future = self.executor.submit(self.cnn_processor.process_frame, frame.copy())
-
-         # Get the result from the geometric processor.
-         # (NOTE, as committed: this expects a 2-tuple, while GeometricProcessor
-         # in this same commit returns a 3-tuple; this strategy was not updated.)
-         geo_frame, geo_indicators = geo_future.result()
-
-         # Get the CNN result if it was run, otherwise use the cached result.
-         if self.frame_counter % self.cnn_process_interval == 0:
-             _, self.last_cnn_indicators = cnn_future.result()
-
-         cnn_indicators = self.last_cnn_indicators
-
-         # Calculate weighted drowsiness score from the combined results.
-         score = 0
-         self.active_alerts.clear()
-
-         if geo_indicators.get("eye_closure"):
-             score += self.weights['eye_closure']
-             self.active_alerts['Eyes Closed'] = geo_indicators['details'].get('EAR', 0)
-         if geo_indicators.get("yawning"):
-             score += self.weights['yawning']
-             self.active_alerts['Yawning'] = geo_indicators['details'].get('MAR', 0)
-         if geo_indicators.get("head_nod"):
-             score += self.weights['head_nod']
-             self.active_alerts['Head Nod'] = geo_indicators['details'].get('Pitch', 0)
-         if geo_indicators.get("looking_away"):
-             score += self.weights['looking_away']
-             self.active_alerts['Looking Away'] = geo_indicators['details'].get('Yaw', 0)
-         if cnn_indicators.get("cnn_prediction"):
-             score += self.weights['cnn_prediction']
-             self.active_alerts['CNN Alert'] = 'Active'
-
-         # --- Visualization ---
-         output_frame = geo_frame
-         y_pos = 30
-         for alert, value in self.active_alerts.items():
-             text = f"{alert}: {value:.2f}" if isinstance(value, float) else alert
-             cv2.putText(output_frame, text, (10, y_pos), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 255), 2)
-             y_pos += 25
-
-         cv2.putText(output_frame, f"Score: {score:.2f}", (output_frame.shape[1] - 150, 30),
-                     cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
-
-         alert_triggered = score >= self.alert_threshold
-         if alert_triggered:
-             cv2.rectangle(output_frame, (0, 0), (output_frame.shape[1], output_frame.shape[0]), (0, 0, 255), 5)
-
-         # Return the processed frame, the alert trigger, and the active alert details
-         return output_frame, alert_triggered, self.active_alerts
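The run-every-Nth-frame caching idea in `HybridProcessor` is a generic pattern worth isolating. A self-contained sketch of just that pattern, with hypothetical `cheap_check` / `expensive_check` stand-ins for the geometric and CNN passes:

```python
import concurrent.futures

class SkippingRunner:
    """Run a cheap check every frame and an expensive one every Nth frame,
    reusing the cached expensive result in between."""
    def __init__(self, interval=10):
        self.interval = interval
        self.frame_counter = 0
        self.cached = None  # last expensive result
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)

    def process(self, frame):
        self.frame_counter += 1
        cheap_future = self.executor.submit(self.cheap_check, frame)
        if self.frame_counter % self.interval == 0:
            # Refresh the cache; runs concurrently with the cheap check.
            self.cached = self.executor.submit(self.expensive_check, frame).result()
        return cheap_future.result(), self.cached

    def cheap_check(self, frame):      # stand-in for the geometric pass
        return {"geometric": True}

    def expensive_check(self, frame):  # stand-in for the CNN pass
        return {"cnn_prediction": False}

runner = SkippingRunner(interval=10)
for i in range(30):
    geo, cnn = runner.process(frame=i)  # 'frame' is just a token here
```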
utils.py DELETED
@@ -1,76 +0,0 @@
- # utils.py
-
- import numpy as np
- import cv2
-
- # Function to calculate Eye Aspect Ratio (EAR)
- def calculate_ear(eye_landmarks, frame_shape):
-     """
-     Calculates the Eye Aspect Ratio (EAR) for a given eye.
-
-     Args:
-         eye_landmarks: A list of 6 MediaPipe landmark objects for the eye.
-                        Expected order: [p1, p2, p3, p4, p5, p6],
-                        where p1, p4 are horizontal extremes and p2, p3, p5, p6
-                        are vertical extremes.
-         frame_shape: Tuple (height, width) of the frame.
-
-     Returns:
-         The calculated EAR value.
-     """
-     if len(eye_landmarks) != 6:
-         # print("Warning: Expected 6 eye landmarks, but received", len(eye_landmarks))  # Optional warning
-         return 0.0  # Return 0 or handle the error appropriately
-
-     # Convert MediaPipe landmarks to a numpy array (pixel coordinates)
-     coords = np.array([(landmark.x * frame_shape[1], landmark.y * frame_shape[0])
-                        for landmark in eye_landmarks])
-
-     # Euclidean distances between the two sets of vertical eye landmarks
-     # (p2-p6 and p3-p5)
-     vertical_dist1 = np.linalg.norm(coords[1] - coords[5])
-     vertical_dist2 = np.linalg.norm(coords[2] - coords[4])
-
-     # Euclidean distance between the horizontal eye landmarks (p1-p4)
-     horizontal_dist = np.linalg.norm(coords[0] - coords[3])
-
-     # Avoid division by zero
-     if horizontal_dist == 0:
-         return 0.0
-
-     ear = (vertical_dist1 + vertical_dist2) / (2.0 * horizontal_dist)
-     return ear
-
- def draw_landmarks(image, landmarks, connections=None, point_color=(0, 255, 0), connection_color=(255, 255, 255)):
-     """
-     Draws landmarks and connections on the image.
-
-     Args:
-         image: The image (numpy array) to draw on.
-         landmarks: A list of MediaPipe landmark objects.
-         connections: A list of tuples representing landmark connections (e.g., [(0, 1), (1, 2)]).
-         point_color: Color for the landmarks (BGR tuple).
-         connection_color: Color for the connections (BGR tuple).
-     """
-     if not landmarks:
-         return image
-
-     img_h, img_w, _ = image.shape
-     landmark_points = [(int(l.x * img_w), int(l.y * img_h)) for l in landmarks]
-
-     # Draw connections
-     if connections:
-         for connection in connections:
-             p1 = landmark_points[connection[0]]
-             p2 = landmark_points[connection[1]]
-             cv2.line(image, p1, p2, connection_color, 1)
-
-     # Draw points
-     for point in landmark_points:
-         cv2.circle(image, point, 2, point_color, -1)
-
-     return image
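A usage sketch for these deleted helpers, faking MediaPipe's normalized landmarks with `SimpleNamespace` (coordinates are invented; this particular eye works out to EAR ≈ 0.300):

```python
from types import SimpleNamespace
import numpy as np

from utils import calculate_ear, draw_landmarks  # repo layout before this commit

# Fake normalized landmarks (MediaPipe uses 0-1 coordinates): an open eye.
eye = [SimpleNamespace(x=x, y=y) for x, y in
       [(0.40, 0.50), (0.43, 0.48), (0.47, 0.48),
        (0.50, 0.50), (0.47, 0.52), (0.43, 0.52)]]

frame = np.zeros((480, 640, 3), dtype=np.uint8)
print(f"EAR: {calculate_ear(eye, frame.shape[:2]):.3f}")
annotated = draw_landmarks(frame, eye, connections=[(0, 1), (1, 2), (2, 3)])
```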