import cv2
import mediapipe as mp
import numpy as np

from src.detection.base_processor import BaseProcessor


# --- Helper Functions (Unchanged) ---
def calculate_ear(eye_landmarks, frame_shape):
    """Eye Aspect Ratio: sum of the two vertical eyelid distances over twice the horizontal eye width."""
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in eye_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[5])
    v2 = np.linalg.norm(coords[2] - coords[4])
    h1 = np.linalg.norm(coords[0] - coords[3])
    return (v1 + v2) / (2.0 * h1) if h1 > 0 else 0.0


def calculate_mar(mouth_landmarks, frame_shape):
    """Mouth Aspect Ratio: sum of three vertical lip distances over twice the horizontal mouth width."""
    coords = np.array([(lm.x * frame_shape[1], lm.y * frame_shape[0]) for lm in mouth_landmarks])
    v1 = np.linalg.norm(coords[1] - coords[7])
    v2 = np.linalg.norm(coords[2] - coords[6])
    v3 = np.linalg.norm(coords[3] - coords[5])
    h1 = np.linalg.norm(coords[0] - coords[4])
    return (v1 + v2 + v3) / (2.0 * h1) if h1 > 0 else 0.0
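
# Both ratios compare vertical openings against the horizontal span of the
# feature, so EAR falls toward zero as the eyelids close and MAR grows as the
# mouth opens. As a rough, illustrative guide only (the operative cut-offs come
# from 'eye_ar_thresh' and 'yawn_mar_thresh' in the config): open eyes
# typically sit around EAR 0.25-0.35, while sustained values below roughly 0.2
# usually indicate closed or closing eyes.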


class GeometricProcessor(BaseProcessor):
    # Landmark indices (MediaPipe Face Mesh topology)
    L_EYE = [362, 385, 387, 263, 373, 380]
    R_EYE = [33, 160, 158, 133, 153, 144]
    MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]
    HEAD_POSE_LANDMARKS = [1, 152, 263, 33, 287, 57]

    def __init__(self, config):
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)
        self.downscale_factor = self.settings.get('downscale_factor', 0.35)
        self.default_skip = max(1, self.settings.get('skip_frames', 2))

        # Caching and state variables
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None
        self.last_drawn_frame = None
        self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}
        # Zero distortion coefficients, reused as solvePnP's distCoeffs on every frame
        self.zeros_4x1 = np.zeros((4, 1), np.float32)

    # --- FIX: Combined process_frame and analyse_frame into one method ---
    def process_frame(self, frame, draw_visuals=True):
        self.frame_counter += 1

        # Adaptive frame skipping: analyse every frame while the last state was
        # not "Awake", otherwise only every `default_skip`-th frame.
        last_level = self.last_indicators.get("drowsiness_level", "Awake")
        skip_n = 1 if last_level != "Awake" else self.default_skip
        if self.frame_counter % skip_n != 0:
            # If we are drawing visuals and have a cached frame, return it.
            if draw_visuals and self.last_drawn_frame is not None:
                return self.last_drawn_frame, self.last_indicators
            # If we are not drawing, we can just return the last indicators.
            elif not draw_visuals:
                return None, self.last_indicators
            # Fallback for the first frame
            else:
                return frame.copy(), self.last_indicators

        # --- Core Frame Analysis (this part runs for both modes) ---
        original_frame = frame.copy()
        h_orig, w_orig, _ = original_frame.shape
        small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor, interpolation=cv2.INTER_AREA)
        h, w, _ = small_frame.shape

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks_data = None

        if brightness < self.settings['low_light_thresh']:
            drowsiness_indicators["lighting"] = "Low"
        else:
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            img_rgb.flags.writeable = False
            results = self.face_mesh.process(img_rgb)
            img_rgb.flags.writeable = True

            if results.multi_face_landmarks:
                face_landmarks_data = results.multi_face_landmarks[0]
                landmarks = face_landmarks_data.landmark
                score = 0
                weights = self.settings['indicator_weights']

                # --- Drowsiness Calculations ---
                ear = (calculate_ear([landmarks[i] for i in self.L_EYE], (h, w)) + calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))) / 2.0
                if ear < self.settings['eye_ar_thresh']: self.counters['eye_closure'] += 1
                else: self.counters['eye_closure'] = 0
                if self.counters['eye_closure'] >= self.settings['eye_ar_consec_frames']: score += weights['eye_closure']

                mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
                if mar > self.settings['yawn_mar_thresh']: self.counters['yawning'] += 1
                else: self.counters['yawning'] = 0
                if self.counters['yawning'] >= self.settings['yawn_consec_frames']: score += weights['yawning']

                # --- Head Pose Estimation ---
                # Generic 3D face model (nose tip, chin, eye corners, mouth corners)
                # matched against the 2D landmarks; focal length is approximated by w.
                face_3d_model = np.array([[0.0, 0.0, 0.0], [0.0, -330.0, -65.0], [-225.0, 170.0, -135.0], [225.0, 170.0, -135.0], [-150.0, -150.0, -125.0], [150.0, -150.0, -125.0]], dtype=np.float32)
                face_2d_points = np.array([(landmarks[i].x * w, landmarks[i].y * h) for i in self.HEAD_POSE_LANDMARKS], dtype=np.float32)
                cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float32)
                _, rvec, _ = cv2.solvePnP(face_3d_model, face_2d_points, cam_matrix, self.zeros_4x1, flags=cv2.SOLVEPNP_EPNP)
                rmat, _ = cv2.Rodrigues(rvec)
                angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)  # Euler angles in degrees
                pitch, yaw = angles[0], angles[1]

                if pitch > self.settings['head_nod_thresh']: self.counters['head_nod'] += 1
                else: self.counters['head_nod'] = 0
                if self.counters['head_nod'] >= self.settings['head_pose_consec_frames']: score += weights['head_nod']

                if abs(yaw) > self.settings['head_look_away_thresh']: self.counters['looking_away'] += 1
                else: self.counters['looking_away'] = 0
                if self.counters['looking_away'] >= self.settings['head_pose_consec_frames']: score += weights['looking_away']

                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']: drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
                drowsiness_indicators['details']['Score'] = score

        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks_data

        # --- FIX: Conditional Drawing ---
        if draw_visuals:
            processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks_data)
            self.last_drawn_frame = processed_frame
            return processed_frame, drowsiness_indicators
        else:
            # For analysis-only, return None for the frame to satisfy the unpack
            return None, drowsiness_indicators

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        h, w, _ = frame.shape
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)
        color = (0, 255, 0)

        if indicators['lighting'] == "Low":
            color = (0, 165, 255)
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy": color = (0, 255, 255)
        elif level == "Very Drowsy": color = (0, 0, 255)

        if landmarks_data:
            landmarks = landmarks_data.landmark
            eye_mouth_landmarks_indices = self.L_EYE + self.R_EYE + self.MOUTH
            for idx in eye_mouth_landmarks_indices:
                lm = landmarks[idx]
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w - 1, h - 1), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        return frame
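

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). The config below uses placeholder values,
# not the project's real settings: every key mirrors what GeometricProcessor
# reads from config['geometric_settings'], but the numbers are assumptions to
# be tuned. Run from the project root so that `src.detection` resolves.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_config = {
        'geometric_settings': {
            'downscale_factor': 0.35,        # resize factor applied before FaceMesh
            'skip_frames': 2,                # analyse every 2nd frame while "Awake"
            'low_light_thresh': 60,          # mean grayscale value below which analysis is skipped
            'eye_ar_thresh': 0.22,           # EAR below this counts toward eye closure
            'eye_ar_consec_frames': 15,
            'yawn_mar_thresh': 0.6,          # MAR above this counts toward yawning
            'yawn_consec_frames': 20,
            'head_nod_thresh': 15.0,         # pitch in degrees
            'head_look_away_thresh': 25.0,   # |yaw| in degrees
            'head_pose_consec_frames': 15,
            'indicator_weights': {'eye_closure': 0.45, 'yawning': 0.25, 'head_nod': 0.2, 'looking_away': 0.1},
            'drowsiness_levels': {'slightly_drowsy_threshold': 0.3, 'very_drowsy_threshold': 0.6},
        }
    }

    processor = GeometricProcessor(example_config)
    cap = cv2.VideoCapture(0)  # default webcam
    try:
        while cap.isOpened():
            ok, frame = cap.read()
            if not ok:
                break
            annotated, indicators = processor.process_frame(frame, draw_visuals=True)
            # `indicators` also exposes the raw state, e.g. indicators['drowsiness_level']
            cv2.imshow("GeometricProcessor demo", annotated if annotated is not None else frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()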