Spaces:
Sleeping
Sleeping
File size: 8,817 Bytes
19f420a 3cfc6a1 19f420a f6146b6 19f420a f6146b6 19f420a f6146b6 19f420a 3cfc6a1 19f420a f6146b6 3cfc6a1 f6146b6 3cfc6a1 f6146b6 f37553c 19f420a 3cfc6a1 19f420a f6146b6 3cfc6a1 87a985c 3cfc6a1 f6146b6 209de52 f6146b6 3cfc6a1 f6146b6 f37553c f6146b6 f37553c 929f736 f6146b6 f37553c 19f420a f37553c 19f420a f6146b6 f37553c 19f420a f37553c 19f420a f6146b6 3cfc6a1 22fab0d 3cfc6a1 f37553c 3cfc6a1 f37553c 3cfc6a1 929f736 f37553c 3cfc6a1 f37553c 26440c6 f37553c 3cfc6a1 f6146b6 929f736 f37553c f6146b6 f37553c f6146b6 f37553c f6146b6 19f420a f37553c 19f420a f6146b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 |
# drive_paddy/detection/strategies/geometric.py
import cv2
import mediapipe as mp
import numpy as np
import math
from src.detection.base_processor import BaseProcessor
# --- Helper Functions ---
def calculate_ear(eye_landmarks, frame_shape):
    """Return the eye aspect ratio (EAR) for one eye.

    Each landmark's ``x`` is scaled by ``frame_shape[1]`` and its ``y`` by
    ``frame_shape[0]`` before distances are measured.

    Args:
        eye_landmarks: Six objects exposing ``.x`` and ``.y``. Points 0 and 3
            span the eye's width; (1, 5) and (2, 4) are the two lid pairs.
        frame_shape: Sequence whose first two entries are (height, width).

    Returns:
        ``(d(1,5) + d(2,4)) / (2 * d(0,3))``, or ``0.0`` when the width
        ``d(0,3)`` is not positive.
    """
    pts = np.array([
        (point.x * frame_shape[1], point.y * frame_shape[0])
        for point in eye_landmarks
    ])
    lid_gap_a = np.linalg.norm(pts[1] - pts[5])
    lid_gap_b = np.linalg.norm(pts[2] - pts[4])
    eye_width = np.linalg.norm(pts[0] - pts[3])
    # Guard against a collapsed eye (zero width) to avoid dividing by zero.
    if not eye_width > 0:
        return 0.0
    return (lid_gap_a + lid_gap_b) / (2.0 * eye_width)
def calculate_mar(mouth_landmarks, frame_shape):
    """Return the mouth aspect ratio (MAR).

    Each landmark's ``x`` is scaled by ``frame_shape[1]`` and its ``y`` by
    ``frame_shape[0]`` before distances are measured.

    Args:
        mouth_landmarks: Eight objects exposing ``.x`` and ``.y``. Points 0
            and 4 give the reference width; (1, 7), (2, 6) and (3, 5) are the
            three opening pairs.
        frame_shape: Sequence whose first two entries are (height, width).

    Returns:
        ``(d(1,7) + d(2,6) + d(3,5)) / (2 * d(0,4))``, or ``0.0`` when the
        width ``d(0,4)`` is not positive.
    """
    pts = np.array([
        (point.x * frame_shape[1], point.y * frame_shape[0])
        for point in mouth_landmarks
    ])
    opening = (
        np.linalg.norm(pts[1] - pts[7])
        + np.linalg.norm(pts[2] - pts[6])
        + np.linalg.norm(pts[3] - pts[5])
    )
    mouth_width = np.linalg.norm(pts[0] - pts[4])
    # Guard against a zero reference width to avoid dividing by zero.
    if not mouth_width > 0:
        return 0.0
    return opening / (2.0 * mouth_width)
class GeometricProcessor(BaseProcessor):
    """Drowsiness detector based on FaceMesh landmark geometry.

    Each analysed frame is downscaled, checked for brightness, run through
    MediaPipe FaceMesh, and scored from four indicators: eye closure (EAR),
    yawning (MAR), head nod (pitch) and looking away (yaw). An indicator
    contributes its configured weight once its condition has held for the
    configured number of consecutive *analysed* frames. Frames skipped by the
    adaptive frame skipping do not advance or reset the counters.
    """

    # FaceMesh landmark indices, in the order calculate_ear / calculate_mar
    # index them.
    L_EYE = [362, 385, 387, 263, 373, 380]
    R_EYE = [33, 160, 158, 133, 153, 144]
    # NOTE(review): with this ordering calculate_mar's width term is the
    # distance between landmarks 61 and 0, not 61 and 291 (presumably the two
    # mouth corners). Confirm 'yawn_mar_thresh' was tuned for this before
    # reordering.
    MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]

    # Landmark indices used for head-pose estimation, paired one-to-one and in
    # order with the rows of _FACE_MODEL_3D (presumably nose tip, chin, outer
    # eye corners, mouth corners; verify against the FaceMesh topology).
    _POSE_LANDMARK_IDS = (1, 152, 263, 33, 287, 57)
    # Generic 3D face model for solvePnP. It is constant, so it is built once
    # here instead of on every analysed frame.
    _FACE_MODEL_3D = np.array(
        [
            [0.0, 0.0, 0.0],
            [0.0, -330.0, -65.0],
            [-225.0, 170.0, -135.0],
            [225.0, 170.0, -135.0],
            [-150.0, -150.0, -125.0],
            [150.0, -150.0, -125.0],
        ],
        dtype=np.float32,
    )

    def __init__(self, config):
        """Create the FaceMesh instance and initialise the per-stream state.

        Args:
            config: Mapping with a ``'geometric_settings'`` entry. Optional
                keys read here are ``'downscale_factor'`` (default 0.35) and
                ``'skip_frames'`` (default 2, clamped to at least 1). The
                thresholds, weights and level cut-offs are read later in
                ``process_frame``.

        Raises:
            KeyError: If ``config`` has no ``'geometric_settings'`` entry.
        """
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)
        self.downscale_factor = self.settings.get('downscale_factor', 0.35)
        self.default_skip = max(1, self.settings.get("skip_frames", 2))

        # State reused for frames that are skipped rather than analysed.
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None

        # Consecutive analysed-frame run length per indicator.
        self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}

        # Pre-allocated buffers.
        self.zeros_4x1 = np.zeros((4, 1), np.float32)  # zero lens distortion
        self.landmark_ids = self.L_EYE + self.R_EYE + self.MOUTH

    def process_frame(self, frame):
        """Analyse one BGR frame, or reuse the last result on a skipped frame.

        Args:
            frame: BGR image array. It is copied and never modified in place.

        Returns:
            Tuple ``(annotated_frame, indicators, face_landmarks)``.
            ``indicators`` is a dict with ``'drowsiness_level'``,
            ``'lighting'`` and ``'details'``. ``face_landmarks`` is FaceMesh's
            ``multi_face_landmarks`` result, or ``None`` in low light.
        """
        self.frame_counter += 1

        # Adaptive skipping: analyse every frame unless the last result was
        # "Awake", in which case only every `default_skip`-th frame is analysed.
        last_level = self.last_indicators.get("drowsiness_level", "Awake")
        skip_n = 1 if last_level != "Awake" else self.default_skip
        if self.frame_counter % skip_n != 0:
            drawn = self.draw_visuals(frame.copy(), self.last_indicators, self.last_landmarks)
            return drawn, self.last_indicators, self.last_landmarks

        original_frame = frame.copy()

        # All analysis runs on a downscaled copy for speed. The full-size
        # frame is only used for drawing.
        small_frame = cv2.resize(
            original_frame, (0, 0),
            fx=self.downscale_factor, fy=self.downscale_factor,
            interpolation=cv2.INTER_AREA)
        h, w = small_frame.shape[:2]

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)
        is_low_light = brightness < self.settings['low_light_thresh']

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks = None

        if is_low_light:
            # Too dark to trust landmark detection; skip FaceMesh entirely.
            drowsiness_indicators["lighting"] = "Low"
        else:
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            results = self.face_mesh.process(img_rgb)
            face_landmarks = results.multi_face_landmarks

            if face_landmarks:
                score = self._score_landmarks(face_landmarks[0].landmark, h, w)
                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"
                drowsiness_indicators['details']['Score'] = score

        # Remember this result so skipped frames can redraw it.
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks

        # Draw on the full-size copy. Landmark coordinates are rescaled by the
        # target frame's own dimensions inside draw_visuals.
        processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)
        return processed_frame, drowsiness_indicators, face_landmarks

    def _update_indicator(self, name, active, consec_frames, weights):
        """Advance or reset the run counter for ``name``; return its score share.

        The weight is looked up only when the indicator fires, so a missing
        weight key raises only in that case.
        """
        if active:
            self.counters[name] += 1
        else:
            self.counters[name] = 0
        if self.counters[name] >= consec_frames:
            return weights[name]
        return 0

    def _estimate_head_pose(self, landmarks, h, w):
        """Return ``(pitch, yaw)`` from solvePnP, or ``None`` if the solve fails.

        The angles are the first two Euler angles reported by
        ``cv2.RQDecomp3x3`` for the recovered rotation matrix.
        """
        face_2d = np.array(
            [(landmarks[i].x * w, landmarks[i].y * h) for i in self._POSE_LANDMARK_IDS],
            dtype=np.float32)
        # Approximate pinhole camera: focal length = frame width, principal
        # point at the frame centre.
        cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float32)
        ok, rvec, _ = cv2.solvePnP(
            self._FACE_MODEL_3D, face_2d, cam_matrix, self.zeros_4x1,
            flags=cv2.SOLVEPNP_EPNP)
        if not ok:
            return None
        rmat, _ = cv2.Rodrigues(rvec)
        angles, _, _, _, _, _ = cv2.RQDecomp3x3(rmat)
        return angles[0], angles[1]

    def _score_landmarks(self, landmarks, h, w):
        """Update all indicator counters for one face and return the total score.

        ``h`` and ``w`` are the dimensions of the frame the landmarks were
        detected on (the downscaled frame).
        """
        settings = self.settings
        weights = settings['indicator_weights']
        score = 0

        # Eye closure: mean EAR of both eyes below the threshold.
        left_ear = calculate_ear([landmarks[i] for i in self.L_EYE], (h, w))
        right_ear = calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))
        ear = (left_ear + right_ear) / 2.0
        score += self._update_indicator(
            'eye_closure', ear < settings['eye_ar_thresh'],
            settings['eye_ar_consec_frames'], weights)

        # Yawning: MAR above the threshold.
        mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
        score += self._update_indicator(
            'yawning', mar > settings['yawn_mar_thresh'],
            settings['yawn_consec_frames'], weights)

        # Head pose: nodding (pitch) and looking away (|yaw|).
        pose = self._estimate_head_pose(landmarks, h, w)
        if pose is None:
            # No usable pose this frame: the consecutive run is broken and
            # neither head indicator contributes.
            self.counters['head_nod'] = 0
            self.counters['looking_away'] = 0
        else:
            pitch, yaw = pose
            score += self._update_indicator(
                'head_nod', pitch > settings['head_nod_thresh'],
                settings['head_pose_consec_frames'], weights)
            score += self._update_indicator(
                'looking_away', abs(yaw) > settings['head_look_away_thresh'],
                settings['head_pose_consec_frames'], weights)

        return score

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        """Draw the status border, status text and tracked landmarks on ``frame``.

        Args:
            frame: Image to draw on; it is modified in place and returned.
            indicators: Dict with ``'drowsiness_level'`` and ``'lighting'``.
                ``indicators['details']['Score']`` is optional and defaults to 0.
            landmarks_data: FaceMesh ``multi_face_landmarks`` result, or a
                falsy value to skip landmark drawing.

        Returns:
            The same ``frame`` object, annotated.
        """
        # Use the dimensions of the frame being drawn on. It may be larger
        # than the frame the landmarks were detected on.
        h, w = frame.shape[:2]
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)

        color = (0, 255, 0)  # Green for Awake
        if indicators['lighting'] == "Low":
            color = (0, 165, 255)  # Orange
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy":
            color = (0, 255, 255)  # Yellow
        elif level == "Very Drowsy":
            color = (0, 0, 255)  # Red

        # Draw the eye and mouth landmarks if a face was detected.
        if landmarks_data:
            landmarks = landmarks_data[0].landmark
            for idx in self.landmark_ids:
                lm = landmarks[idx]
                # Scale landmark coordinates by this frame's width and height.
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        cv2.rectangle(frame, (0, 0), (w, h), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)
        return frame
|