# facial_detection.py
import cv2
import numpy as np
from scipy.spatial import distance as dist
from collections import deque
import time
from datetime import datetime

class OpenCVFaceDetector:
    """Face detection and landmark estimation using OpenCV"""

    def __init__(self):
        # Load OpenCV's pre-trained Haar cascade models
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
        self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        self.mouth_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_smile.xml')

        # Try to load MediaPipe for better landmark detection (fall back to cascades if unavailable)
        self.use_mediapipe = False
        try:
            import mediapipe as mp
            self.mp_face_mesh = mp.solutions.face_mesh
            self.mp_drawing = mp.solutions.drawing_utils
            self.face_mesh = self.mp_face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                refine_landmarks=True,
                min_detection_confidence=0.5,
                min_tracking_confidence=0.5
            )
            self.use_mediapipe = True
            print("✅ Using MediaPipe for enhanced landmark detection")
        except ImportError:
            print("⚠️ MediaPipe not available, using OpenCV cascade classifiers")

        # Landmark indices for MediaPipe's face mesh (rough 68-point equivalent)
        self.LEFT_EYE_INDICES = [33, 7, 163, 144, 145, 153, 154, 155, 133, 173, 157, 158, 159, 160, 161, 246]
        self.RIGHT_EYE_INDICES = [362, 382, 381, 380, 374, 373, 390, 249, 263, 466, 388, 387, 386, 385, 384, 398]
        self.MOUTH_INDICES = [78, 95, 88, 178, 87, 14, 317, 402, 318, 324, 308, 415, 310, 311, 312, 13, 82, 81, 80, 62]
    def detect_faces_opencv(self, frame):
        """Detect faces using OpenCV Haar cascades"""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.face_cascade.detectMultiScale(gray, scaleFactor=1.3, minNeighbors=5)
        return faces, gray
    def estimate_landmarks_opencv(self, gray, face_rect):
        """Estimate key facial landmarks using OpenCV cascades (expects a grayscale frame)"""
        x, y, w, h = face_rect
        roi_gray = gray[y:y+h, x:x+w]

        # Detect eyes and mouth/smile inside the face region
        eyes = self.eye_cascade.detectMultiScale(roi_gray, 1.1, 3)
        mouths = self.mouth_cascade.detectMultiScale(roi_gray, 1.1, 3)

        landmarks = {}

        # Process eyes
        if len(eyes) >= 2:
            # Sort eye detections by x-coordinate (left to right in the image)
            eyes = sorted(eyes, key=lambda e: e[0])
            landmarks['left_eye'] = (x + eyes[0][0] + eyes[0][2] // 2, y + eyes[0][1] + eyes[0][3] // 2)
            landmarks['right_eye'] = (x + eyes[1][0] + eyes[1][2] // 2, y + eyes[1][1] + eyes[1][3] // 2)

            # Estimate eye corners from the eye rectangles: [left, right, top, bottom]
            landmarks['left_eye_corners'] = [
                (x + eyes[0][0], y + eyes[0][1] + eyes[0][3] // 2),               # left corner
                (x + eyes[0][0] + eyes[0][2], y + eyes[0][1] + eyes[0][3] // 2),  # right corner
                (x + eyes[0][0] + eyes[0][2] // 2, y + eyes[0][1]),               # top
                (x + eyes[0][0] + eyes[0][2] // 2, y + eyes[0][1] + eyes[0][3])   # bottom
            ]
            landmarks['right_eye_corners'] = [
                (x + eyes[1][0], y + eyes[1][1] + eyes[1][3] // 2),
                (x + eyes[1][0] + eyes[1][2], y + eyes[1][1] + eyes[1][3] // 2),
                (x + eyes[1][0] + eyes[1][2] // 2, y + eyes[1][1]),
                (x + eyes[1][0] + eyes[1][2] // 2, y + eyes[1][1] + eyes[1][3])
            ]

        # Process mouth
        if len(mouths) > 0:
            mouth = mouths[0]  # Take the first detected mouth
            landmarks['mouth_center'] = (x + mouth[0] + mouth[2] // 2, y + mouth[1] + mouth[3] // 2)
            landmarks['mouth_corners'] = [
                (x + mouth[0], y + mouth[1] + mouth[3] // 2),             # left corner
                (x + mouth[0] + mouth[2], y + mouth[1] + mouth[3] // 2),  # right corner
                (x + mouth[0] + mouth[2] // 2, y + mouth[1]),             # top
                (x + mouth[0] + mouth[2] // 2, y + mouth[1] + mouth[3])   # bottom
            ]

        # Estimate nose tip (center of face, slightly above mouth) and chin (bottom center)
        landmarks['nose_tip'] = (x + w // 2, y + int(h * 0.6))
        landmarks['chin'] = (x + w // 2, y + h)

        return landmarks
    def detect_landmarks_mediapipe(self, frame):
        """Detect landmarks using MediaPipe"""
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = self.face_mesh.process(rgb_frame)

        landmarks_dict = {}
        if results.multi_face_landmarks:
            face_landmarks = results.multi_face_landmarks[0]
            h, w, _ = frame.shape

            # Extract eye and mouth landmarks, converting normalized coordinates to pixels
            left_eye_points = []
            right_eye_points = []
            mouth_points = []
            for i in self.LEFT_EYE_INDICES[:6]:  # First 6 points approximate the eye shape
                point = face_landmarks.landmark[i]
                left_eye_points.append((int(point.x * w), int(point.y * h)))
            for i in self.RIGHT_EYE_INDICES[:6]:
                point = face_landmarks.landmark[i]
                right_eye_points.append((int(point.x * w), int(point.y * h)))
            for i in self.MOUTH_INDICES[:8]:  # Key mouth points
                point = face_landmarks.landmark[i]
                mouth_points.append((int(point.x * w), int(point.y * h)))

            landmarks_dict['left_eye_corners'] = left_eye_points
            landmarks_dict['right_eye_corners'] = right_eye_points
            landmarks_dict['mouth_corners'] = mouth_points

            # Key points
            nose_tip = face_landmarks.landmark[1]  # Nose tip
            chin = face_landmarks.landmark[175]    # Chin
            landmarks_dict['nose_tip'] = (int(nose_tip.x * w), int(nose_tip.y * h))
            landmarks_dict['chin'] = (int(chin.x * w), int(chin.y * h))

            # Compute the face bounding box from the landmark extremes
            x_coords = [int(lm.x * w) for lm in face_landmarks.landmark]
            y_coords = [int(lm.y * h) for lm in face_landmarks.landmark]
            face_rect = (min(x_coords), min(y_coords),
                         max(x_coords) - min(x_coords),
                         max(y_coords) - min(y_coords))
            return face_rect, landmarks_dict

        return None, {}
    def detect_landmarks(self, frame):
        """Main method to detect face and landmarks"""
        if self.use_mediapipe:
            face_rect, landmarks = self.detect_landmarks_mediapipe(frame)
            if face_rect is not None:
                return [face_rect], [landmarks]

        # Fall back to OpenCV cascades
        faces, gray = self.detect_faces_opencv(frame)
        landmarks_list = []
        face_rects = []
        for face in faces:
            landmarks = self.estimate_landmarks_opencv(gray, face)
            if landmarks:
                landmarks_list.append(landmarks)
                face_rects.append(face)
        return face_rects, landmarks_list
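
# --- Hedged usage sketch (an addition, not part of the original Space): how
# OpenCVFaceDetector is meant to be driven on a single BGR frame. The image path
# is a placeholder assumption; detect_landmarks returns parallel lists of face
# rectangles and landmark dicts regardless of which backend ended up loading.
def _demo_detect_single_frame(image_path="driver.jpg"):
    detector = OpenCVFaceDetector()
    frame = cv2.imread(image_path)  # assumed sample image, not shipped with this file
    if frame is None:
        print(f"Could not read {image_path}")
        return
    face_rects, landmarks_list = detector.detect_landmarks(frame)
    for rect, lm in zip(face_rects, landmarks_list):
        print("face rect:", tuple(rect), "landmark keys:", sorted(lm.keys()))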

class MetricsCalculator:
    """Calculate drowsiness metrics from facial landmarks"""

    @staticmethod
    def calculate_ear_from_points(eye_points):
        """Calculate Eye Aspect Ratio (EAR) from eye corner points"""
        if len(eye_points) < 4:
            return 0.3  # Default value for an open eye

        # 4-point eye estimation (cascade path): [left, right, top, bottom]
        if len(eye_points) == 4:
            left, right, top, bottom = eye_points
            vertical_dist = dist.euclidean(top, bottom)
            horizontal_dist = dist.euclidean(left, right)
            if horizontal_dist == 0:
                return 0.3
            return vertical_dist / horizontal_dist

        # 6-point eye estimation (MediaPipe path)
        elif len(eye_points) >= 6:
            v1 = dist.euclidean(eye_points[1], eye_points[5])
            v2 = dist.euclidean(eye_points[2], eye_points[4])
            h = dist.euclidean(eye_points[0], eye_points[3])
            if h == 0:
                return 0.3
            return (v1 + v2) / (2.0 * h)

        return 0.3
    @staticmethod
    def calculate_mar_from_points(mouth_points):
        """Calculate Mouth Aspect Ratio (MAR) from mouth points"""
        if len(mouth_points) < 4:
            return 0.3  # Default value

        # 4-point mouth estimation (cascade path): [left, right, top, bottom]
        if len(mouth_points) == 4:
            left, right, top, bottom = mouth_points
            vertical_dist = dist.euclidean(top, bottom)
            horizontal_dist = dist.euclidean(left, right)
            if horizontal_dist == 0:
                return 0.3
            return vertical_dist / horizontal_dist

        # 8-point mouth estimation (MediaPipe path): average several vertical distances
        elif len(mouth_points) >= 8:
            v1 = dist.euclidean(mouth_points[1], mouth_points[7])
            v2 = dist.euclidean(mouth_points[2], mouth_points[6])
            v3 = dist.euclidean(mouth_points[3], mouth_points[5])
            h = dist.euclidean(mouth_points[0], mouth_points[4])
            if h == 0:
                return 0.3
            return (v1 + v2 + v3) / (3.0 * h)

        return 0.3
    @staticmethod
    def estimate_head_pose_simple(nose_tip, chin, frame_center):
        """Simple head pose estimation using the nose tip and chin"""
        if nose_tip is None or chin is None:
            return np.array([0, 0, 0])

        # Head tilt = deviation of the nose-chin line from vertical
        nose_chin_vector = np.array([chin[0] - nose_tip[0], chin[1] - nose_tip[1]])
        vertical_vector = np.array([0, 1])

        dot_product = np.dot(nose_chin_vector, vertical_vector)
        norms = np.linalg.norm(nose_chin_vector) * np.linalg.norm(vertical_vector)
        if norms == 0:
            return np.array([0, 0, 0])
        cos_angle = dot_product / norms
        angle = np.arccos(np.clip(cos_angle, -1, 1)) * 180 / np.pi

        # Sign of the x-component gives the direction of the tilt
        if nose_chin_vector[0] < 0:
            angle = -angle

        # Rough pitch estimate from the nose position relative to the frame center
        pitch = (nose_tip[1] - frame_center[1]) / frame_center[1] * 30  # Scaled to degrees
        return np.array([pitch, 0, angle])  # [pitch, yaw, roll]
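
# --- Worked example (an illustrative addition, not from the original file). The
# 6-point branch implements the classic EAR formula of Soukupová & Čech:
#     EAR = (||p2 - p6|| + ||p3 - p5||) / (2 * ||p1 - p4||)
# The sample coordinates below are made up to show that an open eye scores
# well above a nearly closed one.
def _demo_ear():
    open_eye = [(0, 5), (3, 3), (7, 3), (10, 5), (7, 7), (3, 7)]    # tall eye outline
    closed_eye = [(0, 5), (3, 5), (7, 5), (10, 5), (7, 6), (3, 6)]  # flattened outline
    print("open EAR:  ", MetricsCalculator.calculate_ear_from_points(open_eye))    # 0.4
    print("closed EAR:", MetricsCalculator.calculate_ear_from_points(closed_eye))  # 0.1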

class DrowsinessAnalyzer:
    """Analyze drowsiness based on facial metrics"""

    def __init__(self):
        # Thresholds
        self.EAR_THRESHOLD = 0.20  # Adjusted for OpenCV detection
        self.EAR_CONSECUTIVE_FRAMES = 15
        self.YAWN_THRESHOLD = 0.8  # Adjusted for mouth detection
        self.YAWN_CONSECUTIVE_FRAMES = 10
        self.NOD_THRESHOLD = 20

        # Counters
        self.ear_counter = 0
        self.yawn_counter = 0
        self.nod_counter = 0

        # History tracking
        self.ear_history = deque(maxlen=30)
        self.yawn_history = deque(maxlen=30)
        self.head_pose_history = deque(maxlen=30)

    def analyze_drowsiness(self, ear, mar, head_angles):
        """Analyze current metrics and return drowsiness indicators"""
        drowsiness_indicators = []

        # Update history
        self.ear_history.append(ear)
        self.yawn_history.append(mar)
        self.head_pose_history.append(head_angles[0])

        # Check EAR (closed-eyes detection)
        if ear < self.EAR_THRESHOLD:
            self.ear_counter += 1
            if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES:
                drowsiness_indicators.append("EYES_CLOSED")
        else:
            self.ear_counter = 0

        # Check yawning
        if mar > self.YAWN_THRESHOLD:
            self.yawn_counter += 1
            if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES:
                drowsiness_indicators.append("YAWNING")
        else:
            self.yawn_counter = 0

        # Check head nodding (sustained pitch beyond the threshold)
        if abs(head_angles[0]) > self.NOD_THRESHOLD:
            self.nod_counter += 1
            if self.nod_counter >= 8:
                drowsiness_indicators.append("HEAD_NOD")
        else:
            self.nod_counter = 0

        return drowsiness_indicators
    def get_severity_level(self, indicators):
        """Determine severity based on the active indicators"""
        if len(indicators) >= 2:
            return "critical"
        elif "EYES_CLOSED" in indicators:
            return "high"
        elif indicators:
            return "medium"
        else:
            return "normal"

class AlertManager:
    """Manage alert generation and timing"""

    def __init__(self, cooldown_seconds=8):
        self.last_alert_time = 0
        self.cooldown_seconds = cooldown_seconds

    def should_trigger_alert(self, indicators):
        """Check if an alert should be triggered (rate-limited by the cooldown)"""
        current_time = time.time()
        if indicators and (current_time - self.last_alert_time) > self.cooldown_seconds:
            self.last_alert_time = current_time
            return True
        return False
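
# --- Quick illustration (an assumed snippet, not in the original file) of the
# cooldown: with an 8-second window, a second alert fired immediately after the
# first is suppressed until the cooldown elapses.
def _demo_alert_cooldown():
    manager = AlertManager(cooldown_seconds=8)
    print(manager.should_trigger_alert(["EYES_CLOSED"]))  # True: first alert fires
    print(manager.should_trigger_alert(["EYES_CLOSED"]))  # False: still inside the cooldown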

class VisualizationRenderer:
    """Handle visual rendering of detection results"""

    @staticmethod
    def draw_landmarks_and_contours(frame, landmarks, face_rect):
        """Draw facial landmarks and detection areas"""
        x, y, w, h = face_rect
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)

        # Draw eye areas
        if 'left_eye_corners' in landmarks:
            points = np.array(landmarks['left_eye_corners'], np.int32)
            cv2.polylines(frame, [points], True, (0, 255, 0), 2)
        if 'right_eye_corners' in landmarks:
            points = np.array(landmarks['right_eye_corners'], np.int32)
            cv2.polylines(frame, [points], True, (0, 255, 0), 2)

        # Draw mouth area
        if 'mouth_corners' in landmarks:
            points = np.array(landmarks['mouth_corners'], np.int32)
            cv2.polylines(frame, [points], True, (0, 255, 255), 2)

        # Draw key points
        for point_name in ('nose_tip', 'chin'):
            if point_name in landmarks:
                cv2.circle(frame, landmarks[point_name], 3, (255, 0, 0), -1)
    @staticmethod
    def draw_metrics_overlay(frame, ear, mar, head_angle, indicators):
        """Draw metrics and alerts on the frame"""
        # Metrics text (Hershey fonts only render ASCII, so avoid degree symbols and emoji)
        cv2.putText(frame, f"EAR: {ear:.3f}", (10, frame.shape[0] - 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(frame, f"MAR: {mar:.3f}", (10, frame.shape[0] - 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(frame, f"Head: {head_angle:.1f} deg", (10, frame.shape[0] - 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        # Alert overlay
        if indicators:
            cv2.putText(frame, "DROWSINESS ALERT!", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)

class StatusLogger:
    """Handle logging and status tracking"""

    def __init__(self, max_logs=100):
        self.status_log = deque(maxlen=max_logs)

    def log(self, message):
        """Add a timestamped log entry"""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.status_log.append(f"[{timestamp}] {message}")

    def get_recent_logs(self, count=10):
        """Get the most recent log entries"""
        return list(self.status_log)[-count:]
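
# --- Minimal end-to-end sketch (an assumed harness, not part of the original
# file) wiring the classes above into a webcam loop: detect landmarks, compute
# EAR/MAR/head pose, analyze drowsiness, and render the overlay. Camera index 0
# and the window name are assumptions; press 'q' to quit.
if __name__ == "__main__":
    detector = OpenCVFaceDetector()
    analyzer = DrowsinessAnalyzer()
    alerts = AlertManager(cooldown_seconds=8)
    logger = StatusLogger()

    cap = cv2.VideoCapture(0)  # assumed default webcam
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        frame_center = (frame.shape[1] // 2, frame.shape[0] // 2)

        face_rects, landmarks_list = detector.detect_landmarks(frame)
        for face_rect, lm in zip(face_rects, landmarks_list):
            # Average the two eyes; missing landmarks fall back to the defaults
            left = MetricsCalculator.calculate_ear_from_points(lm.get('left_eye_corners', []))
            right = MetricsCalculator.calculate_ear_from_points(lm.get('right_eye_corners', []))
            ear = (left + right) / 2.0
            mar = MetricsCalculator.calculate_mar_from_points(lm.get('mouth_corners', []))
            head_angles = MetricsCalculator.estimate_head_pose_simple(
                lm.get('nose_tip'), lm.get('chin'), frame_center)

            indicators = analyzer.analyze_drowsiness(ear, mar, head_angles)
            if alerts.should_trigger_alert(indicators):
                logger.log(f"ALERT ({analyzer.get_severity_level(indicators)}): {indicators}")

            VisualizationRenderer.draw_landmarks_and_contours(frame, lm, face_rect)
            VisualizationRenderer.draw_metrics_overlay(frame, ear, mar, head_angles[0], indicators)

        cv2.imshow("Drowsiness Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()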