import cv2 import mediapipe as mp import numpy as np import math from src.detection.base_processor import BaseProcessor # --- Helper Functions (Unchanged) --- def calculate_ear(eye_landmarks, frame_shape): """Calculates the Eye Aspect Ratio for a single eye.""" # Note: frame_shape is (height, width) coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks]) v1 = np.linalg.norm(coords[1] - coords[5]) v2 = np.linalg.norm(coords[2] - coords[4]) h1 = np.linalg.norm(coords[0] - coords[3]) return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0 def calculate_mar(mouth_landmarks, frame_shape): """Calculates the Mouth Aspect Ratio.""" coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks]) v1 = np.linalg.norm(coords[1] - coords[7]) v2 = np.linalg.norm(coords[2] - coords[6]) v3 = np.linalg.norm(coords[3] - coords[5]) h1 = np.linalg.norm(coords[0] - coords[4]) return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0 class GeometricProcessor(BaseProcessor): # Landmark indices for eyes and mouth L_EYE = [362, 385, 387, 263, 373, 380] R_EYE = [33, 160, 158, 133, 153, 144] MOUTH = [61, 291, 39, 181, 0, 17, 84, 178] # Landmark indices for Head Pose Estimation HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57] # Nose tip, Chin, Left eye left corner, Right eye right corner, Left mouth corner, Right mouth corner def __init__(self, config): self.settings = config['geometric_settings'] self.face_mesh = mp.solutions.face_mesh.FaceMesh( max_num_faces=1, refine_landmarks=False, # Set to True for more detailed landmarks around eyes/lips, at a slight performance cost min_detection_confidence=0.5, min_tracking_confidence=0.5) self.downscale_factor = self.settings.get('downscale_factor', 0.35) self.default_skip = max(1, self.settings.get("skip_frames", 2)) # --- FIX: Caching states for efficiency --- self.frame_counter = 0 # Initialize with safe defaults self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}} self.last_landmarks = None self.last_drawn_frame = None # Cache the fully drawn frame # Drowsiness event counters self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 } # Pre-allocated buffer for solvePnP self.zeros_4x1 = np.zeros((4, 1), np.float32) def process_frame(self, frame): self.frame_counter += 1 # --- FIX: More efficient frame skipping --- # Adaptive skipping: process more frequently if drowsiness is detected. last_level = self.last_indicators.get("drowsiness_level", "Awake") skip_n = 1 if last_level != "Awake" else self.default_skip if self.frame_counter % skip_n != 0: # If we have a cached frame, return it to avoid re-drawing. if self.last_drawn_frame is not None: return self.last_drawn_frame, self.last_indicators # Fallback if the first frame was skipped (unlikely but safe) else: return frame.copy(), self.last_indicators # --- CORE FRAME PROCESSING --- original_frame = frame.copy() h_orig, w_orig, _ = original_frame.shape # Optimization: Downscale frame for faster processing small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA) h, w, _ = small_frame.shape # All processing is done on the `small_frame` for speed. gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY) brightness = np.mean(gray) drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}} face_landmarks_data = None if brightness < self.settings['low_light_thresh']: drowsiness_indicators["lighting"] = "Low" else: # Convert the SMALL frame to RGB for MediaPipe img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB) img_rgb.flags.writeable = False # Performance enhancement results = self.face_mesh.process(img_rgb) img_rgb.flags.writeable = True if results.multi_face_landmarks: face_landmarks_data = results.multi_face_landmarks[0] landmarks = face_landmarks_data.landmark score = 0 weights = self.settings['indicator_weights'] # --- Drowsiness Calculations (on small frame dimensions 'h', 'w') --- ear_left = calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) ear_right = calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)) ear = (ear_left + ear_right) / 2.0 if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1 else: self.counters['eye_closure']=0 if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure'] mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w)) if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1 else: self.counters['yawning']=0 if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning'] # --- Head Pose Estimation (on small frame dimensions 'h', 'w') --- face_3d_model = np.array([ [0.0, 0.0, 0.0], # Nose tip [0.0, -330.0, -65.0], # Chin [-225.0, 170.0, -135.0], # Left eye left corner [225.0, 170.0, -135.0], # Right eye right corner [-150.0, -150.0, -125.0], # Left Mouth corner [150.0, -150.0, -125.0] # Right mouth corner ], dtype=np.float32) face_2d_points = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in self.HEAD_POSE_LANDMARKS], dtype=np.float32) cam_matrix = np.array([[w, 0, w/2], [0, w, h/2], [0, 0, 1]], dtype=np.float32) _, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP) rmat, _ = cv2.Rodrigues(rvec) angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat) pitch, yaw = angles[0], angles[1] if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1 else: self.counters['head_nod']=0 if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod'] if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1 else: self.counters['looking_away']=0 if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away'] # Determine final drowsiness level based on score levels = self.settings['drowsiness_levels'] if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy" elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy" drowsiness_indicators['details']['Score'] = score # --- Update state for next frame (skipped or processed) --- self.last_indicators = drowsiness_indicators self.last_landmarks = face_landmarks_data # --- Draw visuals on the ORIGINAL frame for high-quality output --- processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data) # --- FIX: Cache the newly drawn frame --- self.last_drawn_frame = processed_frame # --- FIX: Return only the two values expected by the Gradio app --- return processed_frame, drowsiness_indicators def draw_visuals(self, frame, indicators, landmarks_data=None): """Helper function to draw all visualizations on the frame.""" h, w, _ = frame.shape level = indicators['drowsiness_level'] score_val = indicators.get("details", {}).get("Score", 0) color = (0, 255, 0) # Green for Awake if indicators['lighting'] == "Low": color = (0, 165, 255) # Orange cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA) elif level == "Slightly Drowsy": color = (0, 255, 255) # Yellow elif level == "Very Drowsy": color = (0, 0, 255) # Red # Draw landmarks if they were detected if landmarks_data: landmarks = landmarks_data.landmark eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH for idx in eye_mouth_landmarks_indices: lm = landmarks[idx] # Scale landmark coordinates to the full-sized frame x, y = int(lm.x * w), int(lm.y * h) cv2.circle(frame, (x, y), 2, (0, 255, 0), -1) cv2.rectangle(frame, (0, 0), (w - 1, h - 1), color, 10) status_text = f"Status: {level} (Score: {score_val:.2f})" cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA) return frame