Spaces:
Sleeping
Sleeping
File size: 8,551 Bytes
19f420a 929f736 19f420a f6146b6 19f420a f6146b6 19f420a f6146b6 19f420a f6146b6 f37553c f6146b6 19f420a f6146b6 209de52 f6146b6 f37553c f6146b6 f37553c 929f736 f6146b6 f37553c 19f420a f37553c 19f420a f6146b6 f37553c 19f420a f37553c 19f420a f6146b6 929f736 f37553c 929f736 f37553c 26440c6 f37553c f6146b6 929f736 f37553c f6146b6 f37553c f6146b6 f37553c f6146b6 19f420a f37553c 19f420a f6146b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
# drive_paddy/detection/strategies/geometric.py
import cv2
import mediapipe as mp
import numpy as np
import math
from ..base_processor import BaseProcessor
# --- Helper Functions (Unchanged) ---
def calculate_ear(eye_landmarks, frame_shape):
    """Return the eye aspect ratio (EAR) for six eye landmarks.

    Args:
        eye_landmarks: Sequence of six objects exposing normalized ``x`` and
            ``y`` attributes. Points 0 and 3 form the horizontal pair;
            (1, 5) and (2, 4) form the two vertical pairs.
        frame_shape: ``(height, width)`` used to scale the normalized
            coordinates into pixels.

    Returns:
        ``(v1 + v2) / (2 * h1)`` in pixel space, or ``0.0`` when the
        horizontal distance is not positive.
    """
    pixel_points = np.array(
        [(point.x * frame_shape[1], point.y * frame_shape[0]) for point in eye_landmarks]
    )

    first_vertical = np.linalg.norm(pixel_points[1] - pixel_points[5])
    second_vertical = np.linalg.norm(pixel_points[2] - pixel_points[4])
    horizontal = np.linalg.norm(pixel_points[0] - pixel_points[3])

    # A degenerate (zero-width) eye would divide by zero; report it as closed.
    if horizontal > 0:
        return (first_vertical + second_vertical) / (2.0 * horizontal)
    return 0.0
def calculate_mar(mouth_landmarks, frame_shape):
    """Return the mouth aspect ratio (MAR) for eight mouth landmarks.

    Args:
        mouth_landmarks: Sequence of eight objects exposing normalized ``x``
            and ``y`` attributes. Points 0 and 4 form the horizontal pair;
            (1, 7), (2, 6) and (3, 5) form the three vertical pairs.
        frame_shape: ``(height, width)`` used to scale the normalized
            coordinates into pixels.

    Returns:
        ``(v1 + v2 + v3) / (2 * h1)`` in pixel space, or ``0.0`` when the
        horizontal distance is not positive.
    """
    pixel_points = np.array(
        [(point.x * frame_shape[1], point.y * frame_shape[0]) for point in mouth_landmarks]
    )

    first_vertical = np.linalg.norm(pixel_points[1] - pixel_points[7])
    second_vertical = np.linalg.norm(pixel_points[2] - pixel_points[6])
    third_vertical = np.linalg.norm(pixel_points[3] - pixel_points[5])
    horizontal = np.linalg.norm(pixel_points[0] - pixel_points[4])

    # Avoid dividing by zero when the horizontal pair collapses to one point.
    if horizontal > 0:
        return (first_vertical + second_vertical + third_vertical) / (2.0 * horizontal)
    return 0.0
class GeometricProcessor(BaseProcessor):
    """Drowsiness detector driven by facial-landmark geometry.

    Each processed frame is downscaled, checked for low light, and run
    through MediaPipe Face Mesh. Eye aspect ratio, mouth aspect ratio and
    head pose feed per-indicator consecutive-frame counters; indicators that
    stay triggered long enough add their configured weight to a score, which
    is mapped to a drowsiness level. Frames in between are skipped and simply
    re-annotated with the last known result.
    """

    # Reference points of a generic 3D face model, paired index-for-index with
    # _POSE_LANDMARK_IDS. Presumably nose tip, chin, outer eye corners and
    # mouth corners -- TODO confirm against the MediaPipe landmark map.
    _FACE_MODEL_3D = np.array(
        [
            [0.0, 0.0, 0.0],
            [0.0, -330.0, -65.0],
            [-225.0, 170.0, -135.0],
            [225.0, 170.0, -135.0],
            [-150.0, -150.0, -125.0],
            [150.0, -150.0, -125.0],
        ],
        dtype=np.float64,
    )
    # Face-mesh landmark ids projected to 2D for the head-pose solve.
    _POSE_LANDMARK_IDS = (1, 152, 263, 33, 287, 57)

    def __init__(self, config):
        """Create the face-mesh model and read tuning values from ``config``.

        Args:
            config: Mapping with a ``'geometric_settings'`` entry. Optional
                keys ``'downscale_factor'`` (default 0.5) and
                ``'skip_frames'`` (default 2) are read here; the threshold,
                weight and level keys are read during frame processing.
        """
        self.settings = config['geometric_settings']
        self.face_mesh = mp.solutions.face_mesh.FaceMesh(
            max_num_faces=1,
            refine_landmarks=True,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5)

        self.downscale_factor = self.settings.get('downscale_factor', 0.5)
        self.skip_frames = self.settings.get('skip_frames', 2)

        # State reused for frames that are skipped rather than analysed.
        self.frame_counter = 0
        self.last_indicators = {"drowsiness_level": "Initializing...", "lighting": "Good", "details": {}}
        self.last_landmarks = None

        # Consecutive-frame counters, one per indicator.
        self.counters = {"eye_closure": 0, "yawning": 0, "head_nod": 0, "looking_away": 0}
        # Face-mesh landmark ids, ordered as calculate_ear / calculate_mar expect.
        self.L_EYE = [362, 385, 387, 263, 373, 380]
        self.R_EYE = [33, 160, 158, 133, 153, 144]
        self.MOUTH = [61, 291, 39, 181, 0, 17, 84, 178]

    def process_frame(self, frame):
        """Analyse one BGR frame, or re-annotate it if this frame is skipped.

        Only every ``skip_frames``-th call runs detection. A ``skip_frames``
        value of 1 or less means every frame is analysed (this also avoids a
        modulo-by-zero when the setting is 0).

        Args:
            frame: 3-channel BGR image array.

        Returns:
            Tuple ``(annotated_frame, indicators, face_landmarks)``. On
            skipped frames the input ``frame`` itself is drawn on and the
            cached indicators/landmarks are returned; on analysed frames a
            copy is drawn on. ``face_landmarks`` is ``None`` in low light and
            is MediaPipe's ``multi_face_landmarks`` otherwise.
        """
        self.frame_counter += 1

        # Skipped frame: redraw the last known status so the feed stays smooth.
        if self.skip_frames > 1 and self.frame_counter % self.skip_frames != 0:
            frame_with_last_results = self.draw_visuals(frame, self.last_indicators, self.last_landmarks)
            return frame_with_last_results, self.last_indicators, self.last_landmarks

        original_frame = frame.copy()

        # All analysis runs on a downscaled copy for speed; landmarks are
        # normalized, so they can still be drawn on the full-size frame.
        small_frame = cv2.resize(original_frame, (0, 0), fx=self.downscale_factor, fy=self.downscale_factor)
        h, w, _ = small_frame.shape

        gray = cv2.cvtColor(small_frame, cv2.COLOR_BGR2GRAY)
        brightness = np.mean(gray)
        is_low_light = brightness < self.settings['low_light_thresh']

        drowsiness_indicators = {"drowsiness_level": "Awake", "lighting": "Good", "details": {}}
        face_landmarks = None

        if is_low_light:
            drowsiness_indicators["lighting"] = "Low"
        else:
            # MediaPipe expects RGB input.
            img_rgb = cv2.cvtColor(small_frame, cv2.COLOR_BGR2RGB)
            results = self.face_mesh.process(img_rgb)
            face_landmarks = results.multi_face_landmarks

            if face_landmarks:
                score = self._score_face(face_landmarks[0].landmark, h, w)

                levels = self.settings['drowsiness_levels']
                if score >= levels['very_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Very Drowsy"
                elif score >= levels['slightly_drowsy_threshold']:
                    drowsiness_indicators['drowsiness_level'] = "Slightly Drowsy"

                drowsiness_indicators['details']['Score'] = score

        # Cache the result for the frames that will be skipped next.
        self.last_indicators = drowsiness_indicators
        self.last_landmarks = face_landmarks

        processed_frame = self.draw_visuals(original_frame, drowsiness_indicators, face_landmarks)
        return processed_frame, drowsiness_indicators, face_landmarks

    def _update_counter(self, name, triggered, consec_frames):
        """Advance (or reset) counter ``name``; return True once it reaches ``consec_frames``."""
        if triggered:
            self.counters[name] += 1
        else:
            self.counters[name] = 0
        return self.counters[name] >= consec_frames

    def _estimate_head_pose(self, landmarks, h, w):
        """Return ``(pitch, yaw)`` from a PnP solve against the generic face model.

        The camera matrix is an approximation built from the frame size
        (focal length = width, principal point = frame centre) with zero
        distortion. Angles are the first two Euler angles from
        ``cv2.RQDecomp3x3`` (degrees according to the OpenCV documentation).
        """
        face_2d = np.array(
            [(landmarks[i].x * w, landmarks[i].y * h) for i in self._POSE_LANDMARK_IDS],
            dtype=np.float64,
        )
        cam_matrix = np.array([[w, 0, w / 2], [0, w, h / 2], [0, 0, 1]], dtype=np.float64)
        dist_coeffs = np.zeros((4, 1), dtype=np.float64)

        _, rot_vec, _ = cv2.solvePnP(self._FACE_MODEL_3D, face_2d, cam_matrix, dist_coeffs)
        rmat, _ = cv2.Rodrigues(rot_vec)
        angles = cv2.RQDecomp3x3(rmat)[0]
        return angles[0], angles[1]

    def _score_face(self, landmarks, h, w):
        """Update all indicator counters for one face and return the weighted score.

        ``h`` and ``w`` are the dimensions of the (downscaled) frame the
        landmarks were detected on.
        """
        score = 0
        weights = self.settings['indicator_weights']

        # Eye closure: mean EAR of both eyes below the threshold.
        ear = (calculate_ear([landmarks[i] for i in self.L_EYE], (h, w))
               + calculate_ear([landmarks[i] for i in self.R_EYE], (h, w))) / 2.0
        if self._update_counter('eye_closure', ear < self.settings['eye_ar_thresh'],
                                self.settings['eye_ar_consec_frames']):
            score += weights['eye_closure']

        # Yawning: MAR above the threshold.
        mar = calculate_mar([landmarks[i] for i in self.MOUTH], (h, w))
        if self._update_counter('yawning', mar > self.settings['yawn_mar_thresh'],
                                self.settings['yawn_consec_frames']):
            score += weights['yawning']

        # Head pose: pitch for nodding, absolute yaw for looking away.
        pitch, yaw = self._estimate_head_pose(landmarks, h, w)
        if self._update_counter('head_nod', pitch > self.settings['head_nod_thresh'],
                                self.settings['head_pose_consec_frames']):
            score += weights['head_nod']
        if self._update_counter('looking_away', abs(yaw) > self.settings['head_look_away_thresh'],
                                self.settings['head_pose_consec_frames']):
            score += weights['looking_away']

        return score

    def draw_visuals(self, frame, indicators, landmarks_data=None):
        """Draw the status overlay on ``frame`` in place and return it.

        Args:
            frame: 3-channel image to annotate; it is modified in place.
            indicators: Dict with ``'drowsiness_level'`` and ``'lighting'``
                keys and an optional ``'details'`` dict holding ``'Score'``.
            landmarks_data: Optional MediaPipe ``multi_face_landmarks``; when
                truthy, the eye and mouth landmarks of the first face are
                drawn.

        Returns:
            The same ``frame`` object, annotated.
        """
        # Use the dimensions of the frame being drawn on, so normalized
        # landmarks scale correctly regardless of the analysis resolution.
        h, w, _ = frame.shape
        level = indicators['drowsiness_level']
        score_val = indicators.get("details", {}).get("Score", 0)

        color = (0, 255, 0)  # Green for Awake
        if indicators['lighting'] == "Low":
            color = (0, 165, 255)  # Orange
            cv2.putText(frame, "LOW LIGHT", (w // 2 - 120, h // 2), cv2.FONT_HERSHEY_SIMPLEX, 2, color, 3, cv2.LINE_AA)
        elif level == "Slightly Drowsy":
            color = (0, 255, 255)  # Yellow
        elif level == "Very Drowsy":
            color = (0, 0, 255)  # Red

        # Draw landmarks if they were detected.
        if landmarks_data:
            landmarks = landmarks_data[0].landmark
            for idx in self.L_EYE + self.R_EYE + self.MOUTH:
                lm = landmarks[idx]
                x, y = int(lm.x * w), int(lm.y * h)
                cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

        # Coloured border plus a status line in the top-left corner.
        cv2.rectangle(frame, (0, 0), (w, h), color, 10)
        status_text = f"Status: {level} (Score: {score_val:.2f})"
        cv2.putText(frame, status_text, (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, cv2.LINE_AA)

        return frame
|