Testys commited on
Commit
3cfc6a1
·
verified ·
1 Parent(s): b797773

Update src/detection/strategies/geometric.py

Browse files
Files changed (1) hide show
  1. src/detection/strategies/geometric.py +45 -28
src/detection/strategies/geometric.py CHANGED
@@ -3,7 +3,7 @@ import cv2
3
  import mediapipe as mp
4
  import numpy as np
5
  import math
6
- from ..base_processor import BaseProcessor
7
 
8
  # --- Helper Functions (Unchanged) ---
9
  def calculate_ear(eye_landmarks, frame_shape):
@@ -22,16 +22,21 @@ def calculate_mar(mouth_landmarks, frame_shape):
22
  return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
23
 
24
  class GeometricProcessor(BaseProcessor):
 
 
 
 
25
  def __init__(self, config):
26
  self.settings = config['geometric_settings']
27
  self.face_mesh = mp.solutions.face_mesh.FaceMesh(
28
  max_num_faces=1,
29
- refine_landmarks=True,
30
  min_detection_confidence=0.5,
31
  min_tracking_confidence=0.5)
32
 
33
- self.downscale_factor = self.settings.get('downscale_factor', 0.5)
34
- self.skip_frames = self.settings.get('skip_frames', 2)
 
35
 
36
  # State variables for frame skipping
37
  self.frame_counter = 0
@@ -40,27 +45,30 @@ class GeometricProcessor(BaseProcessor):
40
 
41
  # Landmark and counter definitions
42
  self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
43
- self.L_EYE = [362, 385, 387, 263, 373, 380]
44
- self.R_EYE = [33, 160, 158, 133, 153, 144]
45
- self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
46
 
 
 
 
 
 
47
  def process_frame(self, frame):
48
  self.frame_counter += 1
49
 
50
- # --- FRAME SKIPPING LOGIC ---
51
- # If we are skipping this frame, simply draw the last known status on the current frame and return.
52
- if self.frame_counter % self.skip_frames != 0:
53
- # We use the original, full-sized 'frame' for drawing to keep the video feed smooth.
54
- frame_with_last_results = self.draw_visuals(frame, self.last_indicators, self.last_landmarks)
55
- return frame_with_last_results, self.last_indicators, self.last_landmarks
56
-
 
57
  # --- FRAME PROCESSING ---
58
  original_frame = frame.copy()
59
  h_orig, w_orig, _ = original_frame.shape
60
 
61
  # --- OPTIMIZATION 1: DOWNSCALING ---
62
  # Create a small frame for fast processing
63
- small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor)
64
  h, w, _ = small_frame.shape
65
 
66
  # --- FIX 1: PROCESS THE SMALL FRAME, NOT THE ORIGINAL ---
@@ -95,26 +103,35 @@ class GeometricProcessor(BaseProcessor):
95
  if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
96
 
97
  # --- Head Pose Estimation (also uses small frame dimensions 'h' and 'w') ---
98
- face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float64)
99
- face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float64)
100
- cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float64)
101
- _, rot_vec, _ = cv2.solvePnP(face_3d, face_2d, cam_matrix, np.zeros((4,1),dtype=np.float64))
102
- rmat, _ = cv2.Rodrigues(rot_vec); angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
 
103
  pitch, yaw = angles[0], angles[1]
104
 
105
- if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1
106
- else: self.counters['head_nod']=0
107
- if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']
108
-
109
- if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1
110
- else: self.counters['looking_away']=0
111
- if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']
 
 
 
 
 
 
112
 
113
  levels = self.settings['drowsiness_levels']
114
- if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
 
115
  elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
116
 
117
  drowsiness_indicators['details']['Score'] = score
 
118
  else:
119
  drowsiness_indicators["lighting"] = "Low"
120
 
 
3
  import mediapipe as mp
4
  import numpy as np
5
  import math
6
+ from src.detection.base_processor import BaseProcessor
7
 
8
  # --- Helper Functions (Unchanged) ---
9
  def calculate_ear(eye_landmarks, frame_shape):
 
22
  return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
23
 
24
  class GeometricProcessor(BaseProcessor):
25
+ L_EYE = [362, 385, 387, 263, 373, 380]
26
+ R_EYE = [33, 160, 158, 133, 153, 144]
27
+ MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
28
+
29
  def __init__(self, config):
30
  self.settings = config['geometric_settings']
31
  self.face_mesh = mp.solutions.face_mesh.FaceMesh(
32
  max_num_faces=1,
33
+ refine_landmarks=False,
34
  min_detection_confidence=0.5,
35
  min_tracking_confidence=0.5)
36
 
37
+ self.downscale_factor = self.settings.get('downscale_factor', 0.35)
38
+ self.default_skip = max(1, self.settings.get("skip_frames", 2))
39
+
40
 
41
  # State variables for frame skipping
42
  self.frame_counter = 0
 
45
 
46
  # Landmark and counter definitions
47
  self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 }
 
 
 
48
 
49
+ # Pre-allocated buffers
50
+ self.zeros_4x1 = np.zeros((4, 1), np.float32)
51
+ self.landmark_ids = self.L_EYE + self.R_EYE + self.MOUTH
52
+
53
+
54
  def process_frame(self, frame):
55
  self.frame_counter += 1
56
 
57
+ # adaptive skipping: fewer analyses when driver is awake
58
+ last_level = self.last_indicators.get("drowsiness_level", "Awake")
59
+ skip_n = 1 if last_level != "Awake" else self.default_skip
60
+
61
+ if self.frame_ctr % skip_n != 0:
62
+ drawn = self.draw_visuals(frame.copy(), self.last_indicators, self.last_landmarks)
63
+ return drawn, self.last_indicators, self.last_landmarks
64
+
65
  # --- FRAME PROCESSING ---
66
  original_frame = frame.copy()
67
  h_orig, w_orig, _ = original_frame.shape
68
 
69
  # --- OPTIMIZATION 1: DOWNSCALING ---
70
  # Create a small frame for fast processing
71
+ small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
72
  h, w, _ = small_frame.shape
73
 
74
  # --- FIX 1: PROCESS THE SMALL FRAME, NOT THE ORIGINAL ---
 
103
  if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']
104
 
105
  # --- Head Pose Estimation (also uses small frame dimensions 'h' and 'w') ---
106
+ face_3d = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float32)
107
+ face_2d = np.array([(landmarks[1].x*w,landmarks[1].y*h),(landmarks[152].x*w,landmarks[152].y*h),(landmarks[263].x*w,landmarks[263].y*h),(landmarks[33].x*w,landmarks[33].y*h),(landmarks[287].x*w,landmarks[287].y*h),(landmarks[57].x*w,landmarks[57].y*h)],dtype=np.float32)
108
+ cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float32)
109
+ _, rvec, _ = cv2.solvePnP(face_3d, face_2d, cam_mtx, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
110
+ rmat, _ = cv2.Rodrigues(rvec)
111
+ angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
112
  pitch, yaw = angles[0], angles[1]
113
 
114
+ if pitch > self.settings['head_nod_thresh']:
115
+ self.counters['head_nod']+=1
116
+ else:
117
+ self.counters['head_nod']=0
118
+ if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']:
119
+ score += weights['head_nod']
120
+
121
+ if abs(yaw) > self.settings['head_look_away_thresh']:
122
+ self.counters['looking_away']+=1
123
+ else:
124
+ self.counters['looking_away']=0
125
+ if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']:
126
+ score += weights['looking_away']
127
 
128
  levels = self.settings['drowsiness_levels']
129
+ if score >= levels['very_drowsy_threshold']:
130
+ drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
131
  elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
132
 
133
  drowsiness_indicators['details']['Score'] = score
134
+
135
  else:
136
  drowsiness_indicators["lighting"] = "Low"
137