Testys committed on
Commit
8054642
·
verified ·
1 Parent(s): 8ae9de4

Update src/detection/strategies/geometric.py

Browse files
Files changed (1) hide show
  1. src/detection/strategies/geometric.py +86 -67
src/detection/strategies/geometric.py CHANGED
@@ -1,4 +1,3 @@
1
- # drive_paddy/detection/strategies/geometric.py
2
  import cv2
3
  import mediapipe as mp
4
  import numpy as np
@@ -7,6 +6,8 @@ from src.detection.base_processor import BaseProcessor
7
 
8
  # --- Helper Functions (Unchanged) ---
9
  def calculate_ear(eye_landmarks, frame_shape):
 
 
10
  coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
11
  v1 = np.linalg.norm(coords[1] - coords[5])
12
  v2 = np.linalg.norm(coords[2] - coords[4])
@@ -14,6 +15,7 @@ def calculate_ear(eye_landmarks, frame_shape):
14
  return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
15
 
16
  def calculate_mar(mouth_landmarks, frame_shape):
 
17
  coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
18
  v1 = np.linalg.norm(coords[1] - coords[7])
19
  v2 = np.linalg.norm(coords[2] - coords[6])
@@ -22,132 +24,148 @@ def calculate_mar(mouth_landmarks, frame_shape):
22
  return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
23
 
24
  class GeometricProcessor(BaseProcessor):
 
25
  L_EYE = [362, 385, 387, 263, 373, 380]
26
  R_EYE = [33, 160, 158, 133, 153, 144]
27
  MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
 
 
 
28
 
29
  def __init__(self, config):
30
  self.settings = config['geometric_settings']
31
  self.face_mesh = mp.solutions.face_mesh.FaceMesh(
32
  max_num_faces=1,
33
- refine_landmarks=False,
34
  min_detection_confidence=0.5,
35
  min_tracking_confidence=0.5)
36
 
37
- self.downscale_factor = self.settings.get('downscale_factor', 0.35)
38
  self.default_skip = max(1, self.settings.get("skip_frames", 2))
39
 
40
-
41
- # State variables for frame skipping
42
  self.frame_counter = 0
 
43
  self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
44
  self.last_landmarks = None
 
45
 
46
- # Landmark and counter definitions
47
  self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
48
 
49
- # Pre-allocated buffers
50
  self.zeros_4x1 = np.zeros((4, 1), np.float32)
51
- self.landmark_ids = self.L_EYE + self.R_EYE + self.MOUTH
52
 
53
-
54
  def process_frame(self, frame):
55
  self.frame_counter += 1
56
 
57
- # adaptive skipping: fewer analyses when driver is awake
 
58
  last_level = self.last_indicators.get("drowsiness_level", "Awake")
59
  skip_n = 1 if last_level != "Awake" else self.default_skip
60
 
61
  if self.frame_counter % skip_n != 0:
62
- drawn = self.draw_visuals(frame.copy(), self.last_indicators, self.last_landmarks)
63
- return drawn, self.last_indicators, self.last_landmarks
64
-
65
- # --- FRAME PROCESSING ---
 
 
 
 
66
  original_frame = frame.copy()
67
  h_orig, w_orig, _ = original_frame.shape
68
-
69
- # --- OPTIMIZATION 1: DOWNSCALING ---
70
- # Create a small frame for fast processing
71
  small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
72
  h, w, _ = small_frame.shape
73
-
74
- # --- FIX 1: PROCESS THE SMALL FRAME, NOT THE ORIGINAL ---
75
- # All processing should be done on the `small_frame` to gain the speed advantage.
76
  gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
77
  brightness = np.mean(gray)
78
- is_low_light = brightness < self.settings['low_light_thresh']
79
-
80
  drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
81
- face_landmarks = None
82
 
83
- if not is_low_light:
 
 
84
  # Convert the SMALL frame to RGB for MediaPipe
85
  img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
 
86
  results = self.face_mesh.process(img_rgb)
87
- face_landmarks = results.multi_face_landmarks
88
 
89
- if face_landmarks:
90
- landmarks = face_landmarks[0].landmark
 
91
  score = 0
92
  weights = self.settings['indicator_weights']
93
 
94
- # --- Drowsiness Calculations (these correctly use the small frame's 'h' and 'w') ---
95
- ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0
 
 
 
96
  if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
97
  else: self.counters['eye_closure']=0
98
  if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
99
-
100
  mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
101
  if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
102
  else: self.counters['yawning']=0
103
  if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
104
 
105
- # --- Head Pose Estimation (also uses small frame dimensions 'h' and 'w') ---
106
- face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float32)
107
- face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float32)
108
- cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float32)
109
- _, rvec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
 
 
 
 
 
 
 
 
 
110
  rmat, _ = cv2.Rodrigues(rvec)
111
  angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
112
  pitch, yaw = angles[0], angles[1]
113
 
114
- if pitch > self.settings['head_nod_thresh']:
115
- self.counters['head_nod']+=1
116
- else:
117
- self.counters['head_nod']=0
118
- if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']:
119
- score += weights['head_nod']
120
-
121
- if abs(yaw) > self.settings['head_look_away_thresh']:
122
- self.counters['looking_away']+=1
123
- else:
124
- self.counters['looking_away']=0
125
- if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']:
126
- score += weights['looking_away']
127
-
128
  levels = self.settings['drowsiness_levels']
129
- if score >= levels['very_drowsy_threshold']:
130
  drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
131
- elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
132
-
 
133
  drowsiness_indicators['details']['Score'] = score
134
 
135
- else:
136
- drowsiness_indicators["lighting"] = "Low"
137
-
138
- # --- Update state for skipped frames ---
139
  self.last_indicators = drowsiness_indicators
140
- self.last_landmarks = face_landmarks
 
 
 
141
 
142
- # --- Draw visuals on the ORIGINAL frame for a high-quality output ---
143
- processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)
144
 
145
- return processed_frame, drowsiness_indicators, face_landmarks
 
146
 
147
  def draw_visuals(self, frame, indicators, landmarks_data=None):
148
  """Helper function to draw all visualizations on the frame."""
149
- # --- FIX 2: USE THE DIMENSIONS OF THE FRAME BEING DRAWN ON ---
150
- # We get the height and width from the input 'frame' itself, which is the original, full-sized one.
151
  h, w, _ = frame.shape
152
  level = indicators['drowsiness_level']
153
  score_val = indicators.get("details", {}).get("Score", 0)
@@ -161,16 +179,17 @@ class GeometricProcessor(BaseProcessor):
161
 
162
  # Draw landmarks if they were detected
163
  if landmarks_data:
164
- landmarks = landmarks_data[0].landmark
165
- eye_mouth_landmarks = self.L_EYE + self.R_EYE + self.MOUTH
166
- for idx in eye_mouth_landmarks:
167
  lm = landmarks[idx]
168
- # Scale landmark coordinates to the full-sized frame using its 'w' and 'h'.
169
  x, y = int(lm.x * w), int(lm.y * h)
170
  cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
171
 
172
- cv2.rectangle(frame, (0, 0), (w, h), color, 10)
173
  status_text = f"Status: {level} (Score: {score_val:.2f})"
174
  cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
175
 
176
  return frame
 
 
 
1
  import cv2
2
  import mediapipe as mp
3
  import numpy as np
 
6
 
7
  # --- Helper Functions (Unchanged) ---
8
  def calculate_ear(eye_landmarks, frame_shape):
9
+ """Calculates the Eye Aspect Ratio for a single eye."""
10
+ # Note: frame_shape is (height, width)
11
  coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
12
  v1 = np.linalg.norm(coords[1] - coords[5])
13
  v2 = np.linalg.norm(coords[2] - coords[4])
 
15
  return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0
16
 
17
  def calculate_mar(mouth_landmarks, frame_shape):
18
+ """Calculates the Mouth Aspect Ratio."""
19
  coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
20
  v1 = np.linalg.norm(coords[1] - coords[7])
21
  v2 = np.linalg.norm(coords[2] - coords[6])
 
24
  return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
25
 
26
  class GeometricProcessor(BaseProcessor):
27
+ # Landmark indices for eyes and mouth
28
  L_EYE = [362, 385, 387, 263, 373, 380]
29
  R_EYE = [33, 160, 158, 133, 153, 144]
30
  MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
31
+
32
+ # Landmark indices for Head Pose Estimation
33
+ HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57] # Nose tip, Chin, Left eye left corner, Right eye right corner, Left mouth corner, Right mouth corner
34
 
35
  def __init__(self, config):
36
  self.settings = config['geometric_settings']
37
  self.face_mesh = mp.solutions.face_mesh.FaceMesh(
38
  max_num_faces=1,
39
+ refine_landmarks=False, # Set to True for more detailed landmarks around eyes/lips, at a slight performance cost
40
  min_detection_confidence=0.5,
41
  min_tracking_confidence=0.5)
42
 
43
+ self.downscale_factor = self.settings.get('downscale_factor', 0.35)
44
  self.default_skip = max(1, self.settings.get("skip_frames", 2))
45
 
46
+ # --- FIX: Caching states for efficiency ---
 
47
  self.frame_counter = 0
48
+ # Initialize with safe defaults
49
  self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
50
  self.last_landmarks = None
51
+ self.last_drawn_frame = None # Cache the fully drawn frame
52
 
53
+ # Drowsiness event counters
54
  self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
55
 
56
+ # Pre-allocated buffer for solvePnP
57
  self.zeros_4x1 = np.zeros((4, 1), np.float32)
 
58
 
 
59
  def process_frame(self, frame):
60
  self.frame_counter += 1
61
 
62
+ # --- FIX: More efficient frame skipping ---
63
+ # Adaptive skipping: process more frequently if drowsiness is detected.
64
  last_level = self.last_indicators.get("drowsiness_level", "Awake")
65
  skip_n = 1 if last_level != "Awake" else self.default_skip
66
 
67
  if self.frame_counter % skip_n != 0:
68
+ # If we have a cached frame, return it to avoid re-drawing.
69
+ if self.last_drawn_frame is not None:
70
+ return self.last_drawn_frame, self.last_indicators
71
+ # Fallback if the first frame was skipped (unlikely but safe)
72
+ else:
73
+ return frame.copy(), self.last_indicators
74
+
75
+ # --- CORE FRAME PROCESSING ---
76
  original_frame = frame.copy()
77
  h_orig, w_orig, _ = original_frame.shape
78
+
79
+ # Optimization: Downscale frame for faster processing
 
80
  small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
81
  h, w, _ = small_frame.shape
82
+
83
+ # All processing is done on the `small_frame` for speed.
 
84
  gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
85
  brightness = np.mean(gray)
86
+
 
87
  drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
88
+ face_landmarks_data = None
89
 
90
+ if brightness < self.settings['low_light_thresh']:
91
+ drowsiness_indicators["lighting"] = "Low"
92
+ else:
93
  # Convert the SMALL frame to RGB for MediaPipe
94
  img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
95
+ img_rgb.flags.writeable = False # Performance enhancement
96
  results = self.face_mesh.process(img_rgb)
97
+ img_rgb.flags.writeable = True
98
 
99
+ if results.multi_face_landmarks:
100
+ face_landmarks_data = results.multi_face_landmarks[0]
101
+ landmarks = face_landmarks_data.landmark
102
  score = 0
103
  weights = self.settings['indicator_weights']
104
 
105
+ # --- Drowsiness Calculations (on small frame dimensions 'h', 'w') ---
106
+ ear_left = calculate_ear([landmarks[i] for i in self.L_EYE],(h,w))
107
+ ear_right = calculate_ear([landmarks[i] for i in self.R_EYE],(h,w))
108
+ ear = (ear_left + ear_right) / 2.0
109
+
110
  if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1
111
  else: self.counters['eye_closure']=0
112
  if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']
113
+
114
  mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
115
  if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1
116
  else: self.counters['yawning']=0
117
  if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
118
 
119
+ # --- Head Pose Estimation (on small frame dimensions 'h', 'w') ---
120
+ face_3d_model = np.array([
121
+ [0.0, 0.0, 0.0], # Nose tip
122
+ [0.0, -330.0, -65.0], # Chin
123
+ [-225.0, 170.0, -135.0], # Left eye left corner
124
+ [225.0, 170.0, -135.0], # Right eye right corner
125
+ [-150.0, -150.0, -125.0], # Left Mouth corner
126
+ [150.0, -150.0, -125.0] # Right mouth corner
127
+ ], dtype=np.float32)
128
+
129
+ face_2d_points = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in self.HEAD_POSE_LANDMARKS], dtype=np.float32)
130
+ cam_matrix = np.array([[w, 0, w/2], [0, w, h/2], [0, 0, 1]], dtype=np.float32)
131
+
132
+ _, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
133
  rmat, _ = cv2.Rodrigues(rvec)
134
  angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
135
  pitch, yaw = angles[0], angles[1]
136
 
137
+ if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
138
+ else: self.counters['head_nod']=0
139
+ if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
140
+
141
+ if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
142
+ else: self.counters['looking_away']=0
143
+ if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
144
+
145
+ # Determine final drowsiness level based on score
 
 
 
 
 
146
  levels = self.settings['drowsiness_levels']
147
+ if score >= levels['very_drowsy_threshold']:
148
  drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
149
+ elif score >= levels['slightly_drowsy_threshold']:
150
+ drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
151
+
152
  drowsiness_indicators['details']['Score'] = score
153
 
154
+ # --- Update state for next frame (skipped or processed) ---
 
 
 
155
  self.last_indicators = drowsiness_indicators
156
+ self.last_landmarks = face_landmarks_data
157
+
158
+ # --- Draw visuals on the ORIGINAL frame for high-quality output ---
159
+ processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data)
160
 
161
+ # --- FIX: Cache the newly drawn frame ---
162
+ self.last_drawn_frame = processed_frame
163
 
164
+ # --- FIX: Return only the two values expected by the Gradio app ---
165
+ return processed_frame, drowsiness_indicators
166
 
167
  def draw_visuals(self, frame, indicators, landmarks_data=None):
168
  """Helper function to draw all visualizations on the frame."""
 
 
169
  h, w, _ = frame.shape
170
  level = indicators['drowsiness_level']
171
  score_val = indicators.get("details", {}).get("Score", 0)
 
179
 
180
  # Draw landmarks if they were detected
181
  if landmarks_data:
182
+ landmarks = landmarks_data.landmark
183
+ eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
184
+ for idx in eye_mouth_landmarks_indices:
185
  lm = landmarks[idx]
186
+ # Scale landmark coordinates to the full-sized frame
187
  x, y = int(lm.x * w), int(lm.y * h)
188
  cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)
189
 
190
+ cv2.rectangle(frame, (0, 0), (w - 1, h - 1), color, 10)
191
  status_text = f"Status: {level} (Score: {score_val:.2f})"
192
  cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
193
 
194
  return frame
195
+