Testys's picture
Update src/detection/strategies/geometric.py
22fab0d verified
raw
history blame
8.82 kB
# src/detection/strategies/geometric.py
import cv2
import mediapipe as mp
import numpy as np
import math
from src.detection.base_processor import BaseProcessor
# --- Helper functions: eye and mouth aspect ratios computed from face landmarks ---
def calculate_ear(eye_landmarks, frame_shape):
    """Return the eye aspect ratio (EAR) for six eye landmarks.

    Each landmark's ``x`` is multiplied by ``frame_shape[1]`` and its ``y``
    by ``frame_shape[0]`` before distances are measured.

    Args:
        eye_landmarks: Sequence of six objects exposing ``x`` and ``y``.
        frame_shape: Indexable whose item 0 is the frame height and item 1
            the frame width.

    Returns:
        The sum of the 1-5 and 2-4 point distances divided by twice the
        0-3 distance, or ``0.0`` when the 0-3 distance is not positive.
    """
    pts = np.array(
        [(point.x * frame_shape[1], point.y * frame_shape[0]) for point in eye_landmarks]
    )
    first_gap = np.linalg.norm(pts[1] - pts[5])
    second_gap = np.linalg.norm(pts[2] - pts[4])
    span = np.linalg.norm(pts[0] - pts[3])

    # A collapsed eye (zero span) would divide by zero; report 0.0 instead.
    if not span > 0:
        return 0.0
    return (first_gap + second_gap) / (2.0 * span)
def calculate_mar(mouth_landmarks, frame_shape):
    """Return the mouth aspect ratio (MAR) for eight mouth landmarks.

    Each landmark's ``x`` is multiplied by ``frame_shape[1]`` and its ``y``
    by ``frame_shape[0]`` before distances are measured.

    Args:
        mouth_landmarks: Sequence of eight objects exposing ``x`` and ``y``.
        frame_shape: Indexable whose item 0 is the frame height and item 1
            the frame width.

    Returns:
        The sum of the 1-7, 2-6 and 3-5 point distances divided by twice
        the 0-4 distance, or ``0.0`` when the 0-4 distance is not positive.
    """
    pts = np.array(
        [(point.x * frame_shape[1], point.y * frame_shape[0]) for point in mouth_landmarks]
    )
    gaps = [np.linalg.norm(pts[a] - pts[b]) for a, b in ((1, 7), (2, 6), (3, 5))]
    span = np.linalg.norm(pts[0] - pts[4])

    if span > 0:
        return (gaps[0] + gaps[1] + gaps[2]) / (2.0 * span)
    # Degenerate mouth span: avoid dividing by zero.
    return 0.0
class GeometricProcessor(BaseProcessor):
    """Drowsiness detector based on geometric face-landmark measurements.

    Each analysed frame is downscaled, checked for low light, passed through
    MediaPipe FaceMesh, and scored from four indicators: eye closure (EAR),
    yawning (MAR), head nod (pitch) and looking away (yaw). An indicator
    contributes its configured weight once its condition has held for the
    configured number of consecutive analysed frames.

    Settings read from ``config['geometric_settings']``:
        Required: ``low_light_thresh``, ``indicator_weights`` (keys
        ``eye_closure``, ``yawning``, ``head_nod``, ``looking_away``),
        ``eye_ar_thresh``, ``eye_ar_consec_frames``, ``yawn_mar_thresh``,
        ``yawn_consec_frames``, ``head_nod_thresh``,
        ``head_look_away_thresh``, ``head_pose_consec_frames`` and
        ``drowsiness_levels`` (keys ``very_drowsy_threshold``,
        ``slightly_drowsy_threshold``).
        Optional: ``downscale_factor`` (default 0.35) and ``skip_frames``
        (default 2, clamped to at least 1).
    """

    # FaceMesh landmark indices, in the order calculate_ear expects.
    L_EYE = [362, 385, 387, 263, 373, 380]
    R_EYE = [33, 160, 158, 133, 153, 144]
    # NOTE(review): calculate_mar pairs list positions (1,7), (2,6), (3,5) and
    # uses positions 0-4 as the mouth span; confirm this index ordering is the
    # intended one before retuning 'yawn_mar_thresh'.
    MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]

    # Landmark indices used for head-pose estimation, paired in order with the
    # rows of _FACE_MODEL_3D.
    _POSE_LANDMARK_IDS = (1, 152, 263, 33, 287, 57)
    # Fixed 3D reference points for solvePnP. Built once instead of per frame;
    # solvePnP only reads it.
    _FACE_MODEL_3D = np.array(
        [
            [0.0, 0.0, 0.0],
            [0.0, -330.0, -65.0],
            [-225.0, 170.0, -135.0],
            [225.0, 170.0, -135.0],
            [-150.0, -150.0, -125.0],
            [150.0, -150.0, -125.0],
        ],
        dtype=np.float32,
    )

    def __init__(self, config):
        """Create the FaceMesh instance and initialise per-stream state.

        Args:
            config: Mapping containing a ``'geometric_settings'`` entry (see
                the class docstring for the keys that are read).

        Raises:
            KeyError: If ``config`` has no ``'geometric_settings'`` entry.
        """
        # NOTE(review): BaseProcessor.__init__ is not invoked here; confirm the
        # base class needs no initialisation of its own.
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5,
        )
        self.downscale_factor = self.settings.get('downscale_factor', 0.35)
        self.default_skip = max(1, self.settings.get("skip_frames", 2))

        # State reused on skipped frames.
        self.frame_counter = 0
        self.last_indicators = {
            "drowsiness_level": "Initializing...",
            "lighting": "Good",
            "details": {},
        }
        self.last_landmarks = None

        # Consecutive-analysed-frame streak per indicator.
        self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}

        # Pre-allocated buffers.
        self.zeros_4x1 = np.zeros((4, 1), np.float32)  # distortion coefficients for solvePnP
        self.landmark_ids = self.L_EYE + self.R_EYE + self.MOUTH

    def process_frame(self, frame):
        """Analyse (or skip) one BGR frame and return the annotated result.

        While the last reported level is ``"Awake"`` only every
        ``default_skip``-th frame is analysed; any other level forces analysis
        of every frame. Skipped frames are annotated with the previous result.

        Args:
            frame: BGR image array; it is copied, never modified in place.

        Returns:
            Tuple ``(annotated_frame, indicators, face_landmarks)`` where
            ``indicators`` is a dict with ``drowsiness_level``, ``lighting``
            and ``details`` keys, and ``face_landmarks`` is FaceMesh's
            ``multi_face_landmarks`` value (``None`` in low light or when no
            face was found).
        """
        self.frame_counter += 1

        # Adaptive skipping: fewer analyses while the driver is awake.
        last_level = self.last_indicators.get("drowsiness_level", "Awake")
        skip_n = 1 if last_level != "Awake" else self.default_skip
        if self.frame_counter % skip_n != 0:
            drawn = self.draw_visuals(frame.copy(), self.last_indicators, self.last_landmarks)
            return drawn, self.last_indicators, self.last_landmarks

        original_frame = frame.copy()

        # All analysis runs on a downscaled copy for speed; drawing happens on
        # the full-size copy.
        small_frame = cv2.resize(
            original_frame,
            (0, 0),
            fx=self.downscale_factor,
            fy=self.downscale_factor,
            interpolation=cv2.INTER_AREA,
        )
        h, w = small_frame.shape[:2]

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        is_low_light = np.mean(gray) < self.settings['low_light_thresh']

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks = None

        if is_low_light:
            drowsiness_indicators["lighting"] = "Low"
        else:
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            face_landmarks = self.face_mesh.process(img_rgb).multi_face_landmarks
            if face_landmarks:
                score = self._score_face(face_landmarks[0].landmark, h, w)
                level = self._classify(score)
                if level is not None:
                    drowsiness_indicators['drowsiness_level'] = level
                drowsiness_indicators['details']['Score'] = score

        # Remember the result so skipped frames can be annotated with it.
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks

        processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)
        return processed_frame, drowsiness_indicators, face_landmarks

    def _update_counter(self, name, triggered):
        """Extend the named streak when ``triggered`` else reset it; return the new value."""
        self.counters[name] = self.counters[name] + 1 if triggered else 0
        return self.counters[name]

    def _score_face(self, landmarks, h, w):
        """Update all streak counters from one face and return the weighted score."""
        settings = self.settings
        weights = settings['indicator_weights']
        shape = (h, w)
        score = 0

        # Eye closure: mean EAR of both eyes below the threshold.
        left_ear = calculate_ear([landmarks[i] for i in self.L_EYE], shape)
        right_ear = calculate_ear([landmarks[i] for i in self.R_EYE], shape)
        ear = (left_ear + right_ear) / 2.0
        if self._update_counter('eye_closure', ear < settings['eye_ar_thresh']) >= settings['eye_ar_consec_frames']:
            score += weights['eye_closure']

        # Yawning: MAR above the threshold.
        mar = calculate_mar([landmarks[i] for i in self.MOUTH], shape)
        if self._update_counter('yawning', mar > settings['yawn_mar_thresh']) >= settings['yawn_consec_frames']:
            score += weights['yawning']

        # Head pose: skipped entirely (counters untouched, no weight added)
        # when the pose solver reports failure.
        angles = self._estimate_head_angles(landmarks, h, w)
        if angles is not None:
            pitch, yaw = angles
            pose_frames = settings['head_pose_consec_frames']
            if self._update_counter('head_nod', pitch > settings['head_nod_thresh']) >= pose_frames:
                score += weights['head_nod']
            if self._update_counter('looking_away', abs(yaw) > settings['head_look_away_thresh']) >= pose_frames:
                score += weights['looking_away']

        return score

    def _estimate_head_angles(self, landmarks, h, w):
        """Return ``(pitch, yaw)`` from solvePnP, or ``None`` if the solve fails."""
        face_2d = np.array(
            [(landmarks[i].x * w, landmarks[i].y * h) for i in self._POSE_LANDMARK_IDS],
            dtype=np.float32,
        )
        # Approximate intrinsics: focal length = frame width, principal point
        # at the frame centre.
        cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float32)
        success, rvec, _ = cv2.solvePnP(
            self._FACE_MODEL_3D, face_2d, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP
        )
        if not success:
            # Without a valid rotation vector the angles would be meaningless.
            return None
        rmat, _ = cv2.Rodrigues(rvec)
        angles = cv2.RQDecomp3x3(rmat)[0]
        return angles[0], angles[1]

    def _classify(self, score):
        """Map a score to a drowsy level name, or ``None`` when below both thresholds."""
        levels = self.settings['drowsiness_levels']
        if score >= levels['very_drowsy_threshold']:
            return "Very Drowsy"
        if score >= levels['slightly_drowsy_threshold']:
            return "Slightly Drowsy"
        return None

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        """Draw the status border, status text and tracked landmarks on ``frame``.

        Args:
            frame: Image to annotate; it is modified in place.
            indicators: Dict with ``drowsiness_level`` and ``lighting`` keys;
                ``details['Score']`` is shown when present (0 otherwise).
            landmarks_data: Optional FaceMesh ``multi_face_landmarks`` value;
                the first face's eye and mouth landmarks are drawn.

        Returns:
            The same ``frame`` object, annotated.
        """
        # Use the dimensions of the frame being drawn on, so landmarks found
        # on the downscaled frame are placed correctly on the full-size one.
        h, w = frame.shape[:2]
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)

        color = (0, 255, 0)  # Green for Awake
        if indicators['lighting'] == "Low":
            color = (0, 165, 255)  # Orange
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy":
            color = (0, 255, 255)  # Yellow
        elif level == "Very Drowsy":
            color = (0, 0, 255)  # Red

        # Draw landmarks if they were detected.
        if landmarks_data:
            landmarks = landmarks_data[0].landmark
            for idx in self.L_EYE + self.R_EYE + self.MOUTH:
                lm = landmarks[idx]
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w, h), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        return frame