Update src/detection/strategies/geometric.py
src/detection/strategies/geometric.py
CHANGED
Before (lines removed by this change are marked with "-"; removed lines whose content is not shown in this view appear as "…"):

@@ -5,43 +5,76 @@ import numpy as np
   5     import math
   6     from ..base_processor import BaseProcessor
   7
   8  -  # --- Helper Functions (
   9     def calculate_ear(eye_landmarks, frame_shape):
  10         coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
  11  -      v1 = np.linalg.norm(coords[1] - coords[5])
  12  -      …
  13
  14     def calculate_mar(mouth_landmarks, frame_shape):
  15         coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
  16  -      v1 = np.linalg.norm(coords[1] - coords[7])
  17  -      …
  18         return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
  19
  20     class GeometricProcessor(BaseProcessor):
  21         def __init__(self, config):
  22             self.settings = config['geometric_settings']
  23  -          self.face_mesh = mp.solutions.face_mesh.FaceMesh( …
  24             self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
  25  -          self.L_EYE = [362, 385, 387, 263, 373, 380]
  26             self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
  27
  28         def process_frame(self, frame):
  29  -          …
  30  -          …
  31  -          …
  32
  33  -          …
  34  -          …
  35             brightness = np.mean(gray)
  36             is_low_light = brightness < self.settings['low_light_thresh']
  37
  38  -          drowsiness_indicators = {
  39  -              "drowsiness_level": "Awake", "lighting": "Good", "details": {}
  40  -          }
  41             face_landmarks = None
  42
  43             if not is_low_light:
  44  -              …
  45                 results = self.face_mesh.process(img_rgb)
  46                 face_landmarks = results.multi_face_landmarks
  47

@@ -50,14 +83,7 @@ class GeometricProcessor(BaseProcessor):
  50                     score = 0
  51                     weights = self.settings['indicator_weights']
  52
  53  -                  # ---
  54  -                  eye_mouth_landmarks = self.L_EYE + self.R_EYE + self.MOUTH
  55  -                  for idx in eye_mouth_landmarks:
  56  -                      lm = landmarks[idx]
  57  -                      x, y = int(lm.x * w), int(lm.y * h)
  58  -                      cv2.circle(frame, (x, y), 1, (0, 255, 0), -1)
  59  -
  60  -                  # --- Drowsiness Calculations ---
  61                     ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
  62                     if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
  63                     else: self.counters['eye_closure']=0

@@ -68,6 +94,7 @@ class GeometricProcessor(BaseProcessor):
  68                     else: self.counters['yawning']=0
  69                     if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
  70
  71                     face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float64)
  72                     face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float64)
  73                     cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float64)

@@ -88,24 +115,45 @@ class GeometricProcessor(BaseProcessor):
  88                 elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
  89
  90                 drowsiness_indicators['details']['Score'] = score
  91  -          else:
  92                 drowsiness_indicators["lighting"] = "Low"
  93
  94  -          # ---
  95  -          …
  96  -          …
  97  -          color = (0, 255, 0) # Green for Awake
  98
  99  -          …
 100  -          …
 101             cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
 102  -          elif level == "Slightly Drowsy":
 103  -              …
 104  -              …
 105  -              …
 106
 107             cv2.rectangle(frame, (0, 0), (w, h), color, 10)
 108             status_text = f"Status: {level} (Score: {score_val:.2f})"
 109             cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
 110
 111  -          return frame
After (lines added by this change are marked with "+"):

   5     import math
   6     from ..base_processor import BaseProcessor
   7
   8  +  # --- Helper Functions (Unchanged) ---
   9     def calculate_ear(eye_landmarks, frame_shape):
  10         coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
  11  +      v1 = np.linalg.norm(coords[1] - coords[5])
  12  +      v2 = np.linalg.norm(coords[2] - coords[4])
  13  +      h1 = np.linalg.norm(coords[0] - coords[3])
  14  +      return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
  15
  16     def calculate_mar(mouth_landmarks, frame_shape):
  17         coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
  18  +      v1 = np.linalg.norm(coords[1] - coords[7])
  19  +      v2 = np.linalg.norm(coords[2] - coords[6])
  20  +      v3 = np.linalg.norm(coords[3] - coords[5])
  21  +      h1 = np.linalg.norm(coords[0] - coords[4])
  22         return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
  23
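The two helpers above expect MediaPipe-style landmark objects with normalized .x and .y attributes, and a frame_shape given as (height, width), which is why the calls later in the diff pass (h, w). Below is a minimal sketch, not part of the diff, that exercises calculate_ear with stand-in landmarks; the Landmark namedtuple and the import path are assumptions made for illustration:

# Editor's sketch: exercising calculate_ear with stand-in landmarks.
# Assumes the repository layout shown in the header; the Landmark stand-in is hypothetical.
from collections import namedtuple

from src.detection.strategies.geometric import calculate_ear

Landmark = namedtuple("Landmark", ["x", "y"])

# Six normalized eye points in the order the EAR code indexes them:
# corners at positions 0 and 3, upper lid at 1 and 2, lower lid at 4 and 5.
open_eye = [
    Landmark(0.40, 0.50), Landmark(0.42, 0.48), Landmark(0.44, 0.48),
    Landmark(0.46, 0.50), Landmark(0.44, 0.52), Landmark(0.42, 0.52),
]
closed_eye = [Landmark(lm.x, 0.50) for lm in open_eye]  # lids collapsed onto one row

frame_shape = (480, 640)  # (height, width), matching the (h, w) tuples used in process_frame
print(calculate_ear(open_eye, frame_shape))    # well above zero for an open eye (0.5 here)
print(calculate_ear(closed_eye, frame_shape))  # 0.0 once the vertical distances vanish

calculate_mar works the same way with eight mouth points and three vertical distances.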
  24     class GeometricProcessor(BaseProcessor):
  25         def __init__(self, config):
  26             self.settings = config['geometric_settings']
  27  +          self.face_mesh = mp.solutions.face_mesh.FaceMesh(
  28  +              max_num_faces=1,
  29  +              refine_landmarks=True,
  30  +              min_detection_confidence=0.5,
  31  +              min_tracking_confidence=0.5)
  32  +
  33  +          self.downscale_factor = self.settings.get('downscale_factor', 0.5)
  34  +          self.skip_frames = self.settings.get('skip_frames', 2)
  35  +
  36  +          # State variables for frame skipping
  37  +          self.frame_counter = 0
  38  +          self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
  39  +          self.last_landmarks = None
  40  +
  41  +          # Landmark and counter definitions
  42             self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
  43  +          self.L_EYE = [362, 385, 387, 263, 373, 380]
  44  +          self.R_EYE = [33, 160, 158, 133, 153, 144]
  45             self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
  46
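Both the constructor and process_frame read their tuning values from config['geometric_settings']. The sketch below shows one plausible shape for that dictionary. Only the key names that appear verbatim in this diff (downscale_factor, skip_frames, low_light_thresh, eye_ar_thresh, yawn_consec_frames, an indicator_weights entry for 'yawning', and a slightly_drowsy_threshold level) are taken from the code; every numeric value, the remaining key names, and the 'drowsiness_levels' container are illustrative assumptions, and the hunks elided from this view almost certainly read further keys (for example a MAR threshold) that are not listed here:

# Editor's sketch of a plausible configuration; values are illustrative, not taken from the repo.
config = {
    "geometric_settings": {
        "downscale_factor": 0.5,    # read with settings.get(...) in __init__, default 0.5
        "skip_frames": 2,           # fully analyse one frame in every `skip_frames`
        "low_light_thresh": 60,     # mean grayscale brightness below this flags "Low" lighting
        "eye_ar_thresh": 0.21,      # EAR below this increments the eye_closure counter
        "yawn_consec_frames": 15,   # consecutive high-MAR frames before a yawn is scored
        "indicator_weights": {
            "yawning": 0.3,         # 'yawning' appears in the diff; any other weights are assumed
        },
        "drowsiness_levels": {      # container name assumed; the diff only shows levels[...] lookups
            "slightly_drowsy_threshold": 0.3,
            "very_drowsy_threshold": 0.6,
        },
    },
}

processor = GeometricProcessor(config)  # hypothetical wiring; matches __init__(self, config)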
  47         def process_frame(self, frame):
  48  +          self.frame_counter += 1
  49  +
  50  +          # --- FRAME SKIPPING LOGIC ---
  51  +          # If we are skipping this frame, simply draw the last known status on the current frame and return.
  52  +          if self.frame_counter % self.skip_frames != 0:
  53  +              # We use the original, full-sized 'frame' for drawing to keep the video feed smooth.
  54  +              frame_with_last_results = self.draw_visuals(frame, self.last_indicators, self.last_landmarks)
  55  +              return frame_with_last_results, self.last_indicators, self.last_landmarks
  56  +
  57  +          # --- FRAME PROCESSING ---
  58  +          original_frame = frame.copy()
  59  +          h_orig, w_orig, _ = original_frame.shape
  60
  61  +          # --- OPTIMIZATION 1: DOWNSCALING ---
  62  +          # Create a small frame for fast processing
  63  +          small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor)
  64  +          h, w, _ = small_frame.shape
  65  +
  66  +          # --- FIX 1: PROCESS THE SMALL FRAME, NOT THE ORIGINAL ---
  67  +          # All processing should be done on the `small_frame` to gain the speed advantage.
  68  +          gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
  69             brightness = np.mean(gray)
  70             is_low_light = brightness < self.settings['low_light_thresh']
  71
  72  +          drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
  73             face_landmarks = None
  74
  75             if not is_low_light:
  76  +              # Convert the SMALL frame to RGB for MediaPipe
  77  +              img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
  78                 results = self.face_mesh.process(img_rgb)
  79                 face_landmarks = results.multi_face_landmarks
  80

  83                     score = 0
  84                     weights = self.settings['indicator_weights']
  85
  86  +                  # --- Drowsiness Calculations (these correctly use the small frame's 'h' and 'w') ---
  87                     ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
  88                     if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
  89                     else: self.counters['eye_closure']=0

  94                     else: self.counters['yawning']=0
  95                     if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
  96
  97  +                  # --- Head Pose Estimation (also uses small frame dimensions 'h' and 'w') ---
  98                     face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float64)
  99                     face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float64)
 100                     cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float64)

 115                 elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
 116
 117                 drowsiness_indicators['details']['Score'] = score
 118  +          else:
 119                 drowsiness_indicators["lighting"] = "Low"
 120
 121  +          # --- Update state for skipped frames ---
 122  +          self.last_indicators = drowsiness_indicators
 123  +          self.last_landmarks = face_landmarks
 124
 125  +          # --- Draw visuals on the ORIGINAL frame for a high-quality output ---
 126  +          processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)
 127  +
 128  +          return processed_frame, drowsiness_indicators, face_landmarks
 129  +
 130  +      def draw_visuals(self, frame, indicators, landmarks_data=None):
 131  +          """Helper function to draw all visualizations on the frame."""
 132  +          # --- FIX 2: USE THE DIMENSIONS OF THE FRAME BEING DRAWN ON ---
 133  +          # We get the height and width from the input 'frame' itself, which is the original, full-sized one.
 134  +          h, w, _ = frame.shape
 135  +          level = indicators['drowsiness_level']
 136  +          score_val = indicators.get("details", {}).get("Score", 0)
 137  +          color = (0, 255, 0) # Green for Awake
 138  +
 139  +          if indicators['lighting'] == "Low":
 140  +              color = (0, 165, 255) # Orange
 141                 cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
 142  +          elif level == "Slightly Drowsy": color = (0, 255, 255) # Yellow
 143  +          elif level == "Very Drowsy": color = (0, 0, 255) # Red
 144  +
 145  +          # Draw landmarks if they were detected
 146  +          if landmarks_data:
 147  +              landmarks = landmarks_data[0].landmark
 148  +              eye_mouth_landmarks = self.L_EYE + self.R_EYE + self.MOUTH
 149  +              for idx in eye_mouth_landmarks:
 150  +                  lm = landmarks[idx]
 151  +                  # Scale landmark coordinates to the full-sized frame using its 'w' and 'h'.
 152  +                  x, y = int(lm.x * w), int(lm.y * h)
 153  +                  cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
 154
 155             cv2.rectangle(frame, (0, 0), (w, h), color, 10)
 156             status_text = f"Status: {level} (Score: {score_val:.2f})"
 157             cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
 158
 159  +          return frame
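After this change process_frame returns a (frame, indicators, landmarks) tuple rather than just the annotated frame, and with the default skip_frames of 2 every other call reuses the previous result instead of re-running MediaPipe. Below is a minimal caller sketch under those assumptions; the import path follows the file path in the header, the config dictionary is the one sketched above, and the capture loop itself is illustrative:

# Editor's sketch of a caller loop; not part of the diff.
import cv2

from src.detection.strategies.geometric import GeometricProcessor

processor = GeometricProcessor(config)  # config as sketched above
cap = cv2.VideoCapture(0)               # default webcam

while True:
    ok, frame = cap.read()
    if not ok:
        break
    # process_frame now returns the annotated frame plus the indicator dict and raw landmarks.
    annotated, indicators, landmarks = processor.process_frame(frame)
    print(indicators["drowsiness_level"], indicators["details"].get("Score"))
    cv2.imshow("Geometric drowsiness", annotated)
    if cv2.waitKey(1) & 0xFF == ord("q"):
        break

cap.release()
cv2.destroyAllWindows()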