Spaces:
Sleeping
Sleeping
Update src/detection/strategies/geometric.py
Browse files
src/detection/strategies/geometric.py
CHANGED
@@ -6,8 +6,6 @@ from src.detection.base_processor import BaseProcessor
|
|
6 |
|
7 |
# --- Helper Functions (Unchanged) ---
|
8 |
def calculate_ear(eye_landmarks, frame_shape):
|
9 |
-
"""Calculates the Eye Aspect Ratio for a single eye."""
|
10 |
-
# Note: frame_shape is (height, width)
|
11 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
|
12 |
v1 = np.linalg.norm(coords[1] - coords[5])
|
13 |
v2 = np.linalg.norm(coords[2] - coords[4])
|
@@ -15,7 +13,6 @@ def calculate_ear(eye_landmarks, frame_shape):
|
|
15 |
return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
|
16 |
|
17 |
def calculate_mar(mouth_landmarks, frame_shape):
|
18 |
-
"""Calculates the Mouth Aspect Ratio."""
|
19 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
|
20 |
v1 = np.linalg.norm(coords[1] - coords[7])
|
21 |
v2 = np.linalg.norm(coords[2] - coords[6])
|
@@ -24,75 +21,68 @@ def calculate_mar(mouth_landmarks, frame_shape):
|
|
24 |
return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
|
25 |
|
26 |
class GeometricProcessor(BaseProcessor):
|
27 |
-
# Landmark indices
|
28 |
L_EYE = [362, 385, 387, 263, 373, 380]
|
29 |
R_EYE = [33, 160, 158, 133, 153, 144]
|
30 |
MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
|
31 |
-
|
32 |
-
# Landmark indices for Head Pose Estimation
|
33 |
-
HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57] # Nose tip, Chin, Left eye left corner, Right eye right corner, Left mouth corner, Right mouth corner
|
34 |
|
35 |
def __init__(self, config):
|
36 |
self.settings = config['geometric_settings']
|
37 |
self.face_mesh = mp.solutions.face_mesh.FaceMesh(
|
38 |
max_num_faces=1,
|
39 |
-
refine_landmarks=False,
|
40 |
min_detection_confidence=0.5,
|
41 |
min_tracking_confidence=0.5)
|
42 |
|
43 |
self.downscale_factor = self.settings.get('downscale_factor', 0.35)
|
44 |
self.default_skip = max(1, self.settings.get("skip_frames", 2))
|
45 |
-
|
46 |
-
#
|
47 |
self.frame_counter = 0
|
48 |
-
# Initialize with safe defaults
|
49 |
self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
|
50 |
self.last_landmarks = None
|
51 |
-
self.last_drawn_frame = None
|
52 |
-
|
53 |
-
# Drowsiness event counters
|
54 |
self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
|
55 |
-
|
56 |
-
# Pre-allocated buffer for solvePnP
|
57 |
self.zeros_4x1 = np.zeros((4, 1), np.float32)
|
58 |
|
59 |
-
|
|
|
60 |
self.frame_counter += 1
|
61 |
|
62 |
-
#
|
63 |
-
# Adaptive skipping: process more frequently if drowsiness is detected.
|
64 |
last_level = self.last_indicators.get("drowsiness_level", "Awake")
|
65 |
skip_n = 1 if last_level != "Awake" else self.default_skip
|
66 |
|
67 |
if self.frame_counter % skip_n != 0:
|
68 |
-
# If we have a cached frame, return it
|
69 |
-
if self.last_drawn_frame is not None:
|
70 |
return self.last_drawn_frame, self.last_indicators
|
71 |
-
#
|
|
|
|
|
|
|
72 |
else:
|
73 |
return frame.copy(), self.last_indicators
|
74 |
|
75 |
-
# ---
|
76 |
original_frame = frame.copy()
|
77 |
h_orig, w_orig, _ = original_frame.shape
|
78 |
|
79 |
-
# Optimization: Downscale frame for faster processing
|
80 |
small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
|
81 |
h, w, _ = small_frame.shape
|
82 |
|
83 |
-
# All processing is done on the `small_frame` for speed.
|
84 |
gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
|
85 |
brightness = np.mean(gray)
|
86 |
-
|
87 |
drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
|
88 |
face_landmarks_data = None
|
89 |
|
90 |
if brightness < self.settings['low_light_thresh']:
|
91 |
drowsiness_indicators["lighting"] = "Low"
|
92 |
else:
|
93 |
-
# Convert the SMALL frame to RGB for MediaPipe
|
94 |
img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
|
95 |
-
img_rgb.flags.writeable = False
|
96 |
results = self.face_mesh.process(img_rgb)
|
97 |
img_rgb.flags.writeable = True
|
98 |
|
@@ -102,11 +92,8 @@ class GeometricProcessor(BaseProcessor):
|
|
102 |
score = 0
|
103 |
weights = self.settings['indicator_weights']
|
104 |
|
105 |
-
# --- Drowsiness Calculations
|
106 |
-
|
107 |
-
ear_right = calculate_ear([landmarks[i] for i in self.R_EYE],(h,w))
|
108 |
-
ear = (ear_left + ear_right) / 2.0
|
109 |
-
|
110 |
if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
|
111 |
else: self.counters['eye_closure']=0
|
112 |
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
|
@@ -116,24 +103,15 @@ class GeometricProcessor(BaseProcessor):
|
|
116 |
else: self.counters['yawning']=0
|
117 |
if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
|
118 |
|
119 |
-
# --- Head Pose Estimation
|
120 |
-
face_3d_model = np.array([
|
121 |
-
|
122 |
-
|
123 |
-
[-225.0, 170.0, -135.0], # Left eye left corner
|
124 |
-
[225.0, 170.0, -135.0], # Right eye right corner
|
125 |
-
[-150.0, -150.0, -125.0], # Left Mouth corner
|
126 |
-
[150.0, -150.0, -125.0] # Right mouth corner
|
127 |
-
], dtype=np.float32)
|
128 |
-
|
129 |
-
face_2d_points = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in self.HEAD_POSE_LANDMARKS], dtype=np.float32)
|
130 |
-
cam_matrix = np.array([[w, 0, w/2], [0, w, h/2], [0, 0, 1]], dtype=np.float32)
|
131 |
-
|
132 |
_, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
|
133 |
rmat, _ = cv2.Rodrigues(rvec)
|
134 |
angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
|
135 |
pitch, yaw = angles[0], angles[1]
|
136 |
-
|
137 |
if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
|
138 |
else: self.counters['head_nod']=0
|
139 |
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
|
@@ -142,48 +120,40 @@ class GeometricProcessor(BaseProcessor):
|
|
142 |
else: self.counters['looking_away']=0
|
143 |
if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
|
144 |
|
145 |
-
# Determine final drowsiness level based on score
|
146 |
levels = self.settings['drowsiness_levels']
|
147 |
-
if score >= levels['very_drowsy_threshold']:
|
148 |
-
|
149 |
-
elif score >= levels['slightly_drowsy_threshold']:
|
150 |
-
drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
|
151 |
-
|
152 |
drowsiness_indicators['details']['Score'] = score
|
153 |
-
|
154 |
-
# --- Update state for next frame (skipped or processed) ---
|
155 |
self.last_indicators = drowsiness_indicators
|
156 |
self.last_landmarks = face_landmarks_data
|
157 |
|
158 |
-
# ---
|
159 |
-
|
160 |
-
|
161 |
-
|
162 |
-
|
163 |
-
|
164 |
-
|
165 |
-
|
166 |
|
167 |
def draw_visuals(self, frame, indicators, landmarks_data=None):
|
168 |
-
"""Helper function to draw all visualizations on the frame."""
|
169 |
h, w, _ = frame.shape
|
170 |
level = indicators['drowsiness_level']
|
171 |
score_val = indicators.get("details", {}).get("Score", 0)
|
172 |
-
color = (0, 255, 0)
|
173 |
|
174 |
if indicators['lighting'] == "Low":
|
175 |
-
color = (0, 165, 255)
|
176 |
cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
|
177 |
-
elif level == "Slightly Drowsy": color = (0, 255, 255)
|
178 |
-
elif level == "Very Drowsy": color = (0, 0, 255)
|
179 |
|
180 |
-
# Draw landmarks if they were detected
|
181 |
if landmarks_data:
|
182 |
landmarks = landmarks_data.landmark
|
183 |
eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
|
184 |
for idx in eye_mouth_landmarks_indices:
|
185 |
lm = landmarks[idx]
|
186 |
-
# Scale landmark coordinates to the full-sized frame
|
187 |
x, y = int(lm.x * w), int(lm.y * h)
|
188 |
cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
|
189 |
|
@@ -192,111 +162,3 @@ class GeometricProcessor(BaseProcessor):
|
|
192 |
cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
|
193 |
|
194 |
return frame
|
195 |
-
|
196 |
-
def analyse_frame(self, frame):
|
197 |
-
# self.frame_counter += 1
|
198 |
-
|
199 |
-
# --- FIX: More efficient frame skipping ---
|
200 |
-
# # Adaptive skipping: process more frequently if drowsiness is detected.
|
201 |
-
# last_level = self.last_indicators.get("drowsiness_level", "Awake")
|
202 |
-
# skip_n = 1 if last_level != "Awake" else self.default_skip
|
203 |
-
|
204 |
-
# if self.frame_counter % skip_n != 0:
|
205 |
-
# # If we have a cached frame, return it to avoid re-drawing.
|
206 |
-
# if self.last_drawn_frame is not None:
|
207 |
-
# return self.last_drawn_frame, self.last_indicators
|
208 |
-
# # Fallback if the first frame was skipped (unlikely but safe)
|
209 |
-
# else:
|
210 |
-
# return frame.copy(), self.last_indicators
|
211 |
-
|
212 |
-
# --- CORE FRAME PROCESSING ---
|
213 |
-
original_frame = frame.copy()
|
214 |
-
h_orig, w_orig, _ = original_frame.shape
|
215 |
-
|
216 |
-
# Optimization: Downscale frame for faster processing
|
217 |
-
small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
|
218 |
-
h, w, _ = small_frame.shape
|
219 |
-
|
220 |
-
# All processing is done on the `small_frame` for speed.
|
221 |
-
gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
|
222 |
-
brightness = np.mean(gray)
|
223 |
-
|
224 |
-
drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
|
225 |
-
face_landmarks_data = None
|
226 |
-
|
227 |
-
if brightness < self.settings['low_light_thresh']:
|
228 |
-
drowsiness_indicators["lighting"] = "Low"
|
229 |
-
else:
|
230 |
-
# Convert the SMALL frame to RGB for MediaPipe
|
231 |
-
img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
|
232 |
-
img_rgb.flags.writeable = False # Performance enhancement
|
233 |
-
results = self.face_mesh.process(img_rgb)
|
234 |
-
img_rgb.flags.writeable = True
|
235 |
-
|
236 |
-
if results.multi_face_landmarks:
|
237 |
-
face_landmarks_data = results.multi_face_landmarks[0]
|
238 |
-
landmarks = face_landmarks_data.landmark
|
239 |
-
score = 0
|
240 |
-
weights = self.settings['indicator_weights']
|
241 |
-
|
242 |
-
# --- Drowsiness Calculations (on small frame dimensions 'h', 'w') ---
|
243 |
-
ear_left = calculate_ear([landmarks[i] for i in self.L_EYE],(h,w))
|
244 |
-
ear_right = calculate_ear([landmarks[i] for i in self.R_EYE],(h,w))
|
245 |
-
ear = (ear_left + ear_right) / 2.0
|
246 |
-
|
247 |
-
if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
|
248 |
-
else: self.counters['eye_closure']=0
|
249 |
-
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
|
250 |
-
|
251 |
-
mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
|
252 |
-
if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
|
253 |
-
else: self.counters['yawning']=0
|
254 |
-
if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
|
255 |
-
|
256 |
-
# --- Head Pose Estimation (on small frame dimensions 'h', 'w') ---
|
257 |
-
face_3d_model = np.array([
|
258 |
-
[0.0, 0.0, 0.0], # Nose tip
|
259 |
-
[0.0, -330.0, -65.0], # Chin
|
260 |
-
[-225.0, 170.0, -135.0], # Left eye left corner
|
261 |
-
[225.0, 170.0, -135.0], # Right eye right corner
|
262 |
-
[-150.0, -150.0, -125.0], # Left Mouth corner
|
263 |
-
[150.0, -150.0, -125.0] # Right mouth corner
|
264 |
-
], dtype=np.float32)
|
265 |
-
|
266 |
-
face_2d_points = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in self.HEAD_POSE_LANDMARKS], dtype=np.float32)
|
267 |
-
cam_matrix = np.array([[w, 0, w/2], [0, w, h/2], [0, 0, 1]], dtype=np.float32)
|
268 |
-
|
269 |
-
_, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
|
270 |
-
rmat, _ = cv2.Rodrigues(rvec)
|
271 |
-
angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
|
272 |
-
pitch, yaw = angles[0], angles[1]
|
273 |
-
|
274 |
-
if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
|
275 |
-
else: self.counters['head_nod']=0
|
276 |
-
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
|
277 |
-
|
278 |
-
if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
|
279 |
-
else: self.counters['looking_away']=0
|
280 |
-
if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
|
281 |
-
|
282 |
-
# Determine final drowsiness level based on score
|
283 |
-
levels = self.settings['drowsiness_levels']
|
284 |
-
if score >= levels['very_drowsy_threshold']:
|
285 |
-
drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
|
286 |
-
elif score >= levels['slightly_drowsy_threshold']:
|
287 |
-
drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
|
288 |
-
|
289 |
-
drowsiness_indicators['details']['Score'] = score
|
290 |
-
|
291 |
-
# --- Update state for next frame (skipped or processed) ---
|
292 |
-
self.last_indicators = drowsiness_indicators
|
293 |
-
self.last_landmarks = face_landmarks_data
|
294 |
-
|
295 |
-
# --- Draw visuals on the ORIGINAL frame for high-quality output ---
|
296 |
-
# processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data)
|
297 |
-
|
298 |
-
# --- FIX: Cache the newly drawn frame ---
|
299 |
-
# self.last_drawn_frame = processed_frame
|
300 |
-
|
301 |
-
# --- FIX: Return only the two values expected by the Gradio app ---
|
302 |
-
return drowsiness_indicators
|
|
|
6 |
|
7 |
# --- Helper Functions (Unchanged) ---
|
8 |
def calculate_ear(eye_landmarks, frame_shape):
|
|
|
|
|
9 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
|
10 |
v1 = np.linalg.norm(coords[1] - coords[5])
|
11 |
v2 = np.linalg.norm(coords[2] - coords[4])
|
|
|
13 |
return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
|
14 |
|
15 |
def calculate_mar(mouth_landmarks, frame_shape):
|
|
|
16 |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
|
17 |
v1 = np.linalg.norm(coords[1] - coords[7])
|
18 |
v2 = np.linalg.norm(coords[2] - coords[6])
|
|
|
21 |
return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
|
22 |
|
23 |
class GeometricProcessor(BaseProcessor):
|
24 |
+
# Landmark indices
|
25 |
L_EYE = [362, 385, 387, 263, 373, 380]
|
26 |
R_EYE = [33, 160, 158, 133, 153, 144]
|
27 |
MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
|
28 |
+
HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57]
|
|
|
|
|
29 |
|
30 |
def __init__(self, config):
|
31 |
self.settings = config['geometric_settings']
|
32 |
self.face_mesh = mp.solutions.face_mesh.FaceMesh(
|
33 |
max_num_faces=1,
|
34 |
+
refine_landmarks=False,
|
35 |
min_detection_confidence=0.5,
|
36 |
min_tracking_confidence=0.5)
|
37 |
|
38 |
self.downscale_factor = self.settings.get('downscale_factor', 0.35)
|
39 |
self.default_skip = max(1, self.settings.get("skip_frames", 2))
|
40 |
+
|
41 |
+
# Caching and state variables
|
42 |
self.frame_counter = 0
|
|
|
43 |
self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
|
44 |
self.last_landmarks = None
|
45 |
+
self.last_drawn_frame = None
|
|
|
|
|
46 |
self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
|
|
|
|
|
47 |
self.zeros_4x1 = np.zeros((4, 1), np.float32)
|
48 |
|
49 |
+
# --- FIX: Combined process_frame and analyse_frame into one method ---
|
50 |
+
def process_frame(self, frame, draw_visuals=True):
|
51 |
self.frame_counter += 1
|
52 |
|
53 |
+
# Adaptive frame skipping logic
|
|
|
54 |
last_level = self.last_indicators.get("drowsiness_level", "Awake")
|
55 |
skip_n = 1 if last_level != "Awake" else self.default_skip
|
56 |
|
57 |
if self.frame_counter % skip_n != 0:
|
58 |
+
# If we are drawing visuals and have a cached frame, return it.
|
59 |
+
if draw_visuals and self.last_drawn_frame is not None:
|
60 |
return self.last_drawn_frame, self.last_indicators
|
61 |
+
# If we are not drawing, we can just return the last indicators.
|
62 |
+
elif not draw_visuals:
|
63 |
+
return None, self.last_indicators
|
64 |
+
# Fallback for the first frame
|
65 |
else:
|
66 |
return frame.copy(), self.last_indicators
|
67 |
|
68 |
+
# --- Core Frame Analysis (This part runs for both modes) ---
|
69 |
original_frame = frame.copy()
|
70 |
h_orig, w_orig, _ = original_frame.shape
|
71 |
|
|
|
72 |
small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
|
73 |
h, w, _ = small_frame.shape
|
74 |
|
|
|
75 |
gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
|
76 |
brightness = np.mean(gray)
|
77 |
+
|
78 |
drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
|
79 |
face_landmarks_data = None
|
80 |
|
81 |
if brightness < self.settings['low_light_thresh']:
|
82 |
drowsiness_indicators["lighting"] = "Low"
|
83 |
else:
|
|
|
84 |
img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
|
85 |
+
img_rgb.flags.writeable = False
|
86 |
results = self.face_mesh.process(img_rgb)
|
87 |
img_rgb.flags.writeable = True
|
88 |
|
|
|
92 |
score = 0
|
93 |
weights = self.settings['indicator_weights']
|
94 |
|
95 |
+
# --- Drowsiness Calculations ---
|
96 |
+
ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
|
|
|
|
|
|
|
97 |
if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
|
98 |
else: self.counters['eye_closure']=0
|
99 |
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
|
|
|
103 |
else: self.counters['yawning']=0
|
104 |
if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
|
105 |
|
106 |
+
# --- Head Pose Estimation ---
|
107 |
+
face_3d_model = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float32)
|
108 |
+
face_2d_points = np.array([(landmarks[i].x*w,landmarks[i].y*h) for i in self.HEAD_POSE_LANDMARKS],dtype=np.float32)
|
109 |
+
cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float32)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
110 |
_, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
|
111 |
rmat, _ = cv2.Rodrigues(rvec)
|
112 |
angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
|
113 |
pitch, yaw = angles[0], angles[1]
|
114 |
+
|
115 |
if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
|
116 |
else: self.counters['head_nod']=0
|
117 |
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
|
|
|
120 |
else: self.counters['looking_away']=0
|
121 |
if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
|
122 |
|
|
|
123 |
levels = self.settings['drowsiness_levels']
|
124 |
+
if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
|
125 |
+
elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
|
|
|
|
|
|
|
126 |
drowsiness_indicators['details']['Score'] = score
|
127 |
+
|
|
|
128 |
self.last_indicators = drowsiness_indicators
|
129 |
self.last_landmarks = face_landmarks_data
|
130 |
|
131 |
+
# --- FIX: Conditional Drawing ---
|
132 |
+
if draw_visuals:
|
133 |
+
processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data)
|
134 |
+
self.last_drawn_frame = processed_frame
|
135 |
+
return processed_frame, drowsiness_indicators
|
136 |
+
else:
|
137 |
+
# For analysis-only, return None for the frame to satisfy the unpack
|
138 |
+
return None, drowsiness_indicators
|
139 |
|
140 |
def draw_visuals(self, frame, indicators, landmarks_data=None):
|
|
|
141 |
h, w, _ = frame.shape
|
142 |
level = indicators['drowsiness_level']
|
143 |
score_val = indicators.get("details", {}).get("Score", 0)
|
144 |
+
color = (0, 255, 0)
|
145 |
|
146 |
if indicators['lighting'] == "Low":
|
147 |
+
color = (0, 165, 255)
|
148 |
cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
|
149 |
+
elif level == "Slightly Drowsy": color = (0, 255, 255)
|
150 |
+
elif level == "Very Drowsy": color = (0, 0, 255)
|
151 |
|
|
|
152 |
if landmarks_data:
|
153 |
landmarks = landmarks_data.landmark
|
154 |
eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
|
155 |
for idx in eye_mouth_landmarks_indices:
|
156 |
lm = landmarks[idx]
|
|
|
157 |
x, y = int(lm.x * w), int(lm.y * h)
|
158 |
cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
|
159 |
|
|
|
162 |
cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
|
163 |
|
164 |
return frame
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|