Spaces:
Sleeping
Sleeping
import cv2 | |
import mediapipe as mp | |
import numpy as np | |
import math | |
from src.detection.base_processor import BaseProcessor | |
# --- Helper Functions (Unchanged) --- | |
def calculate_ear(eye_landmarks, frame_shape): | |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks]) | |
v1 = np.linalg.norm(coords[1] - coords[5]) | |
v2 = np.linalg.norm(coords[2] - coords[4]) | |
h1 = np.linalg.norm(coords[0] - coords[3]) | |
return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0 | |
def calculate_mar(mouth_landmarks, frame_shape): | |
coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks]) | |
v1 = np.linalg.norm(coords[1] - coords[7]) | |
v2 = np.linalg.norm(coords[2] - coords[6]) | |
v3 = np.linalg.norm(coords[3] - coords[5]) | |
h1 = np.linalg.norm(coords[0] - coords[4]) | |
return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0 | |
class GeometricProcessor(BaseProcessor): | |
# Landmark indices | |
L_EYE = [362, 385, 387, 263, 373, 380] | |
R_EYE = [33, 160, 158, 133, 153, 144] | |
MOUTH = [61, 291, 39, 181, 0, 17, 84, 178] | |
HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57] | |
def __init__(self, config): | |
self.settings = config['geometric_settings'] | |
self.face_mesh = mp.solutions.face_mesh.FaceMesh( | |
max_num_faces=1, | |
refine_landmarks=False, | |
min_detection_confidence=0.5, | |
min_tracking_confidence=0.5) | |
self.downscale_factor = self.settings.get('downscale_factor', 0.35) | |
self.default_skip = max(1, self.settings.get("skip_frames", 2)) | |
# Caching and state variables | |
self.frame_counter = 0 | |
self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}} | |
self.last_landmarks = None | |
self.last_drawn_frame = None | |
self.counters = { "eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0 } | |
self.zeros_4x1 = np.zeros((4, 1), np.float32) | |
# --- FIX: Combined process_frame and analyse_frame into one method --- | |
def process_frame(self, frame, draw_visuals=True): | |
self.frame_counter += 1 | |
# Adaptive frame skipping logic | |
last_level = self.last_indicators.get("drowsiness_level", "Awake") | |
skip_n = 1 if last_level != "Awake" else self.default_skip | |
if self.frame_counter % skip_n != 0: | |
# If we are drawing visuals and have a cached frame, return it. | |
if draw_visuals and self.last_drawn_frame is not None: | |
return self.last_drawn_frame, self.last_indicators | |
# If we are not drawing, we can just return the last indicators. | |
elif not draw_visuals: | |
return None, self.last_indicators | |
# Fallback for the first frame | |
else: | |
return frame.copy(), self.last_indicators | |
# --- Core Frame Analysis (This part runs for both modes) --- | |
original_frame = frame.copy() | |
h_orig, w_orig, _ = original_frame.shape | |
small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA) | |
h, w, _ = small_frame.shape | |
gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY) | |
brightness = np.mean(gray) | |
drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}} | |
face_landmarks_data = None | |
if brightness < self.settings['low_light_thresh']: | |
drowsiness_indicators["lighting"] = "Low" | |
else: | |
img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB) | |
img_rgb.flags.writeable = False | |
results = self.face_mesh.process(img_rgb) | |
img_rgb.flags.writeable = True | |
if results.multi_face_landmarks: | |
face_landmarks_data = results.multi_face_landmarks[0] | |
landmarks = face_landmarks_data.landmark | |
score = 0 | |
weights = self.settings['indicator_weights'] | |
# --- Drowsiness Calculations --- | |
ear = (calculate_ear([landmarks[i] for i in self.L_EYE],(h,w)) + calculate_ear([landmarks[i] for i in self.R_EYE],(h,w)))/2.0 | |
if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure']+=1 | |
else: self.counters['eye_closure']=0 | |
if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure'] | |
mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w)) | |
if mar > self.settings['yawn_mar_thresh']: self.counters['yawning']+=1 | |
else: self.counters['yawning']=0 | |
if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning'] | |
# --- Head Pose Estimation --- | |
face_3d_model = np.array([[0.0,0.0,0.0],[0.0,-330.0,-65.0],[-225.0,170.0,-135.0],[225.0,170.0,-135.0],[-150.0,-150.0,-125.0],[150.0,-150.0,-125.0]],dtype=np.float32) | |
face_2d_points = np.array([(landmarks[i].x*w,landmarks[i].y*h) for i in self.HEAD_POSE_LANDMARKS],dtype=np.float32) | |
cam_matrix = np.array([[w,0,w/2],[0,w,h/2],[0,0,1]],dtype=np.float32) | |
_, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP) | |
rmat, _ = cv2.Rodrigues(rvec) | |
angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat) | |
pitch, yaw = angles[0], angles[1] | |
if pitch > self.settings['head_nod_thresh']: self.counters['head_nod']+=1 | |
else: self.counters['head_nod']=0 | |
if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod'] | |
if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away']+=1 | |
else: self.counters['looking_away']=0 | |
if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away'] | |
levels = self.settings['drowsiness_levels'] | |
if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy" | |
elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy" | |
drowsiness_indicators['details']['Score'] = score | |
self.last_indicators = drowsiness_indicators | |
self.last_landmarks = face_landmarks_data | |
# --- FIX: Conditional Drawing --- | |
if draw_visuals: | |
processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data) | |
self.last_drawn_frame = processed_frame | |
return processed_frame, drowsiness_indicators | |
else: | |
# For analysis-only, return None for the frame to satisfy the unpack | |
return None, drowsiness_indicators | |
def draw_visuals(self, frame, indicators, landmarks_data=None): | |
h, w, _ = frame.shape | |
level = indicators['drowsiness_level'] | |
score_val = indicators.get("details", {}).get("Score", 0) | |
color = (0, 255, 0) | |
if indicators['lighting'] == "Low": | |
color = (0, 165, 255) | |
cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA) | |
elif level == "Slightly Drowsy": color = (0, 255, 255) | |
elif level == "Very Drowsy": color = (0, 0, 255) | |
if landmarks_data: | |
landmarks = landmarks_data.landmark | |
eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH | |
for idx in eye_mouth_landmarks_indices: | |
lm = landmarks[idx] | |
x, y = int(lm.x * w), int(lm.y * h) | |
cv2.circle(frame, (x, y), 2, (0, 255, 0), -1) | |
cv2.rectangle(frame, (0, 0), (w - 1, h - 1), color, 10) | |
status_text = f"Status: {level} (Score: {score_val:.2f})" | |
cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA) | |
return frame | |