# facial_detection.py import cv2 import numpy as np from scipy.spatial import distance as dist from collections import deque import time from datetime import datetime class OpenCVFaceDetector: """Face detection and landmark estimation using OpenCV""" def __init__(self): # Load OpenCV's pre-trained face detection models self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml') self.mouth_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_smile.xml') # Try to load MediaPipe for better landmark detection (fallback if not available) self.use_mediapipe = False try: import mediapipe as mp self.mp_face_mesh = mp.solutions.face_mesh self.mp_drawing = mp.solutions.drawing_utils self.face_mesh = self.mp_face_mesh.FaceMesh( static_image_mode=False, max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5 ) self.use_mediapipe = True print("✅ Using MediaPipe for enhanced landmark detection") except ImportError: print("⚠️ MediaPipe not available, using OpenCV cascade classifiers") # Define landmark indices for MediaPipe (68-point equivalent) self.LEFT_EYE_INDICES = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246] self.RIGHT_EYE_INDICES = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398] self.MOUTH_INDICES = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 415, 310, 311, 312, 13, 82, 81, 80, 62] def detect_faces_opencv(self, frame): """Detect faces using OpenCV Haar cascades""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = self.face_cascade.detectMultiScale(gray, 1.3, 5) return faces, gray def estimate_landmarks_opencv(self, frame, face_rect): """Estimate key facial landmarks using OpenCV cascades""" x, y, w, h = face_rect roi_gray = frame[y:y+h, x:x+w] roi_color = frame[y:y+h, x:x+w] # Detect eyes eyes = self.eye_cascade.detectMultiScale(roi_gray, 1.1, 3) # Detect mouth/smile mouths = self.mouth_cascade.detectMultiScale(roi_gray, 1.1, 3) landmarks = {} # Process eyes if len(eyes) >= 2: # Sort eyes by x-coordinate (left to right) eyes = sorted(eyes, key=lambda e: e[0]) landmarks['left_eye'] = (x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1] + eyes[0][3]//2) landmarks['right_eye'] = (x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1] + eyes[1][3]//2) # Estimate eye corners based on eye rectangles landmarks['left_eye_corners'] = [ (x + eyes[0][0], y + eyes[0][1] + eyes[0][3]//2), # left corner (x + eyes[0][0] + eyes[0][2], y + eyes[0][1] + eyes[0][3]//2), # right corner (x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1]), # top (x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1] + eyes[0][3]) # bottom ] landmarks['right_eye_corners'] = [ (x + eyes[1][0], y + eyes[1][1] + eyes[1][3]//2), (x + eyes[1][0] + eyes[1][2], y + eyes[1][1] + eyes[1][3]//2), (x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1]), (x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1] + eyes[1][3]) ] # Process mouth if len(mouths) > 0: mouth = mouths[0] # Take the first detected mouth landmarks['mouth_center'] = (x + mouth[0] + mouth[2]//2, y + mouth[1] + mouth[3]//2) landmarks['mouth_corners'] = [ (x + mouth[0], y + mouth[1] + mouth[3]//2), # left corner (x + mouth[0] + mouth[2], y + mouth[1] + mouth[3]//2), # right corner (x + mouth[0] + mouth[2]//2, y + mouth[1]), # top (x + mouth[0] + mouth[2]//2, y + mouth[1] + mouth[3]) # bottom ] # Estimate nose tip (center of face, slightly above mouth) landmarks['nose_tip'] = (x + w//2, y + int(h*0.6)) # Estimate chin (bottom center of face) landmarks['chin'] = (x + w//2, y + h) return landmarks def detect_landmarks_mediapipe(self, frame): """Detect landmarks using MediaPipe""" rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = self.face_mesh.process(rgb_frame) landmarks_dict = {} if results.multi_face_landmarks: face_landmarks = results.multi_face_landmarks[0] h, w, _ = frame.shape # Extract eye landmarks left_eye_points = [] right_eye_points = [] mouth_points = [] for i in self.LEFT_EYE_INDICES[:6]: # Take first 6 points for eye shape point = face_landmarks.landmark[i] left_eye_points.append((int(point.x * w), int(point.y * h))) for i in self.RIGHT_EYE_INDICES[:6]: point = face_landmarks.landmark[i] right_eye_points.append((int(point.x * w), int(point.y * h))) for i in self.MOUTH_INDICES[:8]: # Take key mouth points point = face_landmarks.landmark[i] mouth_points.append((int(point.x * w), int(point.y * h))) landmarks_dict['left_eye_corners'] = left_eye_points landmarks_dict['right_eye_corners'] = right_eye_points landmarks_dict['mouth_corners'] = mouth_points # Key points nose_tip = face_landmarks.landmark[1] # Nose tip chin = face_landmarks.landmark[175] # Chin landmarks_dict['nose_tip'] = (int(nose_tip.x * w), int(nose_tip.y * h)) landmarks_dict['chin'] = (int(chin.x * w), int(chin.y * h)) # Calculate face bounding box x_coords = [int(lm.x * w) for lm in face_landmarks.landmark] y_coords = [int(lm.y * h) for lm in face_landmarks.landmark] face_rect = (min(x_coords), min(y_coords), max(x_coords) - min(x_coords), max(y_coords) - min(y_coords)) return face_rect, landmarks_dict return None, {} def detect_landmarks(self, frame): """Main method to detect face and landmarks""" if self.use_mediapipe: face_rect, landmarks = self.detect_landmarks_mediapipe(frame) if face_rect is not None: return [face_rect], [landmarks] # Fallback to OpenCV faces, gray = self.detect_faces_opencv(frame) landmarks_list = [] face_rects = [] for face in faces: landmarks = self.estimate_landmarks_opencv(gray, face) if landmarks: landmarks_list.append(landmarks) face_rects.append(face) return face_rects, landmarks_list class MetricsCalculator: """Calculate drowsiness metrics from facial landmarks""" @staticmethod def calculate_ear_from_points(eye_points): """Calculate Eye Aspect Ratio from eye corner points""" if len(eye_points) < 4: return 0.3 # Default value # For 4-point eye estimation: [left, right, top, bottom] if len(eye_points) == 4: left, right, top, bottom = eye_points # Vertical distances vertical_dist = dist.euclidean(top, bottom) # Horizontal distance horizontal_dist = dist.euclidean(left, right) if horizontal_dist == 0: return 0.3 ear = vertical_dist / horizontal_dist return ear # For 6-point eye estimation (MediaPipe style) elif len(eye_points) >= 6: # Calculate vertical distances v1 = dist.euclidean(eye_points[1], eye_points[5]) v2 = dist.euclidean(eye_points[2], eye_points[4]) # Horizontal distance h = dist.euclidean(eye_points[0], eye_points[3]) if h == 0: return 0.3 ear = (v1 + v2) / (2.0 * h) return ear return 0.3 @staticmethod def calculate_mar_from_points(mouth_points): """Calculate Mouth Aspect Ratio from mouth points""" if len(mouth_points) < 4: return 0.3 # Default value if len(mouth_points) == 4: # [left, right, top, bottom] left, right, top, bottom = mouth_points vertical_dist = dist.euclidean(top, bottom) horizontal_dist = dist.euclidean(left, right) if horizontal_dist == 0: return 0.3 mar = vertical_dist / horizontal_dist return mar elif len(mouth_points) >= 8: # More sophisticated mouth analysis # Calculate multiple vertical distances v1 = dist.euclidean(mouth_points[1], mouth_points[7]) v2 = dist.euclidean(mouth_points[2], mouth_points[6]) v3 = dist.euclidean(mouth_points[3], mouth_points[5]) # Horizontal distance h = dist.euclidean(mouth_points[0], mouth_points[4]) if h == 0: return 0.3 mar = (v1 + v2 + v3) / (3.0 * h) return mar return 0.3 @staticmethod def estimate_head_pose_simple(nose_tip, chin, frame_center): """Simple head pose estimation using nose and chin""" if nose_tip is None or chin is None: return np.array([0, 0, 0]) # Calculate head tilt based on nose-chin line deviation from vertical nose_chin_vector = np.array([chin[0] - nose_tip[0], chin[1] - nose_tip[1]]) vertical_vector = np.array([0, 1]) # Calculate angle from vertical dot_product = np.dot(nose_chin_vector, vertical_vector) norms = np.linalg.norm(nose_chin_vector) * np.linalg.norm(vertical_vector) if norms == 0: return np.array([0, 0, 0]) cos_angle = dot_product / norms angle = np.arccos(np.clip(cos_angle, -1, 1)) * 180 / np.pi # Determine direction of tilt if nose_chin_vector[0] < 0: angle = -angle # Simple pitch estimation based on nose position relative to frame center pitch = (nose_tip[1] - frame_center[1]) / frame_center[1] * 30 # Scale to degrees return np.array([pitch, 0, angle]) # [pitch, yaw, roll] class DrowsinessAnalyzer: """Analyze drowsiness based on facial metrics""" def __init__(self): # Thresholds self.EAR_THRESHOLD = 0.20 # Adjusted for OpenCV detection self.EAR_CONSECUTIVE_FRAMES = 15 self.YAWN_THRESHOLD = 0.8 # Adjusted for mouth detection self.YAWN_CONSECUTIVE_FRAMES = 10 self.NOD_THRESHOLD = 20 # Counters self.ear_counter = 0 self.yawn_counter = 0 self.nod_counter = 0 # History tracking self.ear_history = deque(maxlen=30) self.yawn_history = deque(maxlen=30) self.head_pose_history = deque(maxlen=30) def analyze_drowsiness(self, ear, mar, head_angles): """Analyze current metrics and return drowsiness indicators""" drowsiness_indicators = [] # Update history self.ear_history.append(ear) self.yawn_history.append(mar) self.head_pose_history.append(head_angles[0]) # Check EAR (eyes closed detection) if ear < self.EAR_THRESHOLD: self.ear_counter += 1 if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES: drowsiness_indicators.append("EYES_CLOSED") else: self.ear_counter = 0 # Check yawning if mar > self.YAWN_THRESHOLD: self.yawn_counter += 1 if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES: drowsiness_indicators.append("YAWNING") else: self.yawn_counter = 0 # Check head nodding if abs(head_angles[0]) > self.NOD_THRESHOLD: self.nod_counter += 1 if self.nod_counter >= 8: drowsiness_indicators.append("HEAD_NOD") else: self.nod_counter = 0 return drowsiness_indicators def get_severity_level(self, indicators): """Determine severity based on indicators""" if len(indicators) >= 2: return "critical" elif "EYES_CLOSED" in indicators: return "high" elif indicators: return "medium" else: return "normal" class AlertManager: """Manage alert generation and timing""" def __init__(self, cooldown_seconds=8): self.last_alert_time = 0 self.cooldown_seconds = cooldown_seconds def should_trigger_alert(self, indicators): """Check if alert should be triggered""" current_time = time.time() if indicators and (current_time - self.last_alert_time) > self.cooldown_seconds: self.last_alert_time = current_time return True return False class VisualizationRenderer: """Handle visual rendering of detection results""" @staticmethod def draw_landmarks_and_contours(frame, landmarks, face_rect): """Draw facial landmarks and detection areas""" x, y, w, h = face_rect cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2) # Draw eye areas if 'left_eye_corners' in landmarks: points = np.array(landmarks['left_eye_corners'], np.int32) cv2.polylines(frame, [points], True, (0, 255, 0), 2) if 'right_eye_corners' in landmarks: points = np.array(landmarks['right_eye_corners'], np.int32) cv2.polylines(frame, [points], True, (0, 255, 0), 2) # Draw mouth area if 'mouth_corners' in landmarks: points = np.array(landmarks['mouth_corners'], np.int32) cv2.polylines(frame, [points], True, (0, 255, 255), 2) # Draw key points key_points = ['nose_tip', 'chin'] for point_name in key_points: if point_name in landmarks: cv2.circle(frame, landmarks[point_name], 3, (255, 0, 0), -1) @staticmethod def draw_metrics_overlay(frame, ear, mar, head_angle, indicators): """Draw metrics and alerts on frame""" # Metrics text cv2.putText(frame, f"EAR: {ear:.3f}", (10, frame.shape[0] - 80), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) cv2.putText(frame, f"MAR: {mar:.3f}", (10, frame.shape[0] - 60), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) cv2.putText(frame, f"Head: {head_angle:.1f}°", (10, frame.shape[0] - 40), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) # Alert overlay if indicators: cv2.putText(frame, "⚠️ DROWSINESS ALERT! ⚠️", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3) class StatusLogger: """Handle logging and status tracking""" def __init__(self, max_logs=100): self.status_log = deque(maxlen=max_logs) def log(self, message): """Add timestamped log entry""" timestamp = datetime.now().strftime("%H:%M:%S") self.status_log.append(f"[{timestamp}] {message}") def get_recent_logs(self, count=10): """Get recent log entries""" return list(self.status_log)[-count:]