# facial_detection.py
import cv2
import numpy as np
from scipy.spatial import distance as dist
from collections import deque
import time
from datetime import datetime
class OpenCVFaceDetector:
"""Face detection and landmark estimation using OpenCV"""
def __init__(self):
# Load OpenCV's pre-trained face detection models
self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
self.eye_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_eye.xml')
        # The smile cascade doubles as a mouth-region detector here
        self.mouth_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_smile.xml')
# Try to load MediaPipe for better landmark detection (fallback if not available)
self.use_mediapipe = False
try:
import mediapipe as mp
self.mp_face_mesh = mp.solutions.face_mesh
self.mp_drawing = mp.solutions.drawing_utils
self.face_mesh = self.mp_face_mesh.FaceMesh(
static_image_mode=False,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
min_tracking_confidence=0.5
)
self.use_mediapipe = True
print("βœ… Using MediaPipe for enhanced landmark detection")
except ImportError:
print("⚠️ MediaPipe not available, using OpenCV cascade classifiers")
        # MediaPipe landmark indices, ordered to match the dlib-style
        # 6-point EAR layout (outer corner, two upper-lid points, inner
        # corner, two lower-lid points) and the 8-point inner-lip MAR
        # layout (corner, three upper points, corner, three lower points),
        # so the aspect-ratio formulas below receive points in the order
        # they expect
        self.LEFT_EYE_INDICES = [33, 160, 158, 133, 153, 144]
        self.RIGHT_EYE_INDICES = [362, 385, 387, 263, 373, 380]
        self.MOUTH_INDICES = [78, 81, 13, 311, 308, 402, 14, 178]
def detect_faces_opencv(self, frame):
"""Detect faces using OpenCV Haar cascades"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
faces = self.face_cascade.detectMultiScale(gray, 1.3, 5)
return faces, gray
    def estimate_landmarks_opencv(self, gray, face_rect):
        """Estimate key facial landmarks using OpenCV cascades.

        Expects the grayscale frame returned by detect_faces_opencv.
        """
        x, y, w, h = face_rect
        roi_gray = gray[y:y+h, x:x+w]
# Detect eyes
eyes = self.eye_cascade.detectMultiScale(roi_gray, 1.1, 3)
# Detect mouth/smile
mouths = self.mouth_cascade.detectMultiScale(roi_gray, 1.1, 3)
landmarks = {}
# Process eyes
        if len(eyes) >= 2:
            # Sort by x-coordinate; "left"/"right" here are in image space,
            # not the subject's anatomical sides
            eyes = sorted(eyes, key=lambda e: e[0])
landmarks['left_eye'] = (x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1] + eyes[0][3]//2)
landmarks['right_eye'] = (x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1] + eyes[1][3]//2)
# Estimate eye corners based on eye rectangles
landmarks['left_eye_corners'] = [
(x + eyes[0][0], y + eyes[0][1] + eyes[0][3]//2), # left corner
(x + eyes[0][0] + eyes[0][2], y + eyes[0][1] + eyes[0][3]//2), # right corner
(x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1]), # top
(x + eyes[0][0] + eyes[0][2]//2, y + eyes[0][1] + eyes[0][3]) # bottom
]
landmarks['right_eye_corners'] = [
(x + eyes[1][0], y + eyes[1][1] + eyes[1][3]//2),
(x + eyes[1][0] + eyes[1][2], y + eyes[1][1] + eyes[1][3]//2),
(x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1]),
(x + eyes[1][0] + eyes[1][2]//2, y + eyes[1][1] + eyes[1][3])
]
# Process mouth
if len(mouths) > 0:
mouth = mouths[0] # Take the first detected mouth
landmarks['mouth_center'] = (x + mouth[0] + mouth[2]//2, y + mouth[1] + mouth[3]//2)
landmarks['mouth_corners'] = [
(x + mouth[0], y + mouth[1] + mouth[3]//2), # left corner
(x + mouth[0] + mouth[2], y + mouth[1] + mouth[3]//2), # right corner
(x + mouth[0] + mouth[2]//2, y + mouth[1]), # top
(x + mouth[0] + mouth[2]//2, y + mouth[1] + mouth[3]) # bottom
]
# Estimate nose tip (center of face, slightly above mouth)
landmarks['nose_tip'] = (x + w//2, y + int(h*0.6))
# Estimate chin (bottom center of face)
landmarks['chin'] = (x + w//2, y + h)
return landmarks
def detect_landmarks_mediapipe(self, frame):
"""Detect landmarks using MediaPipe"""
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
results = self.face_mesh.process(rgb_frame)
landmarks_dict = {}
if results.multi_face_landmarks:
face_landmarks = results.multi_face_landmarks[0]
h, w, _ = frame.shape
# Extract eye landmarks
left_eye_points = []
right_eye_points = []
mouth_points = []
            # Eye points, already in dlib-style EAR order (see __init__)
            for i in self.LEFT_EYE_INDICES:
                point = face_landmarks.landmark[i]
                left_eye_points.append((int(point.x * w), int(point.y * h)))
            for i in self.RIGHT_EYE_INDICES:
                point = face_landmarks.landmark[i]
                right_eye_points.append((int(point.x * w), int(point.y * h)))
            # Mouth points, already in inner-lip MAR order (see __init__)
            for i in self.MOUTH_INDICES:
                point = face_landmarks.landmark[i]
                mouth_points.append((int(point.x * w), int(point.y * h)))
landmarks_dict['left_eye_corners'] = left_eye_points
landmarks_dict['right_eye_corners'] = right_eye_points
landmarks_dict['mouth_corners'] = mouth_points
# Key points
nose_tip = face_landmarks.landmark[1] # Nose tip
chin = face_landmarks.landmark[175] # Chin
landmarks_dict['nose_tip'] = (int(nose_tip.x * w), int(nose_tip.y * h))
landmarks_dict['chin'] = (int(chin.x * w), int(chin.y * h))
# Calculate face bounding box
x_coords = [int(lm.x * w) for lm in face_landmarks.landmark]
y_coords = [int(lm.y * h) for lm in face_landmarks.landmark]
face_rect = (min(x_coords), min(y_coords),
max(x_coords) - min(x_coords),
max(y_coords) - min(y_coords))
return face_rect, landmarks_dict
return None, {}
def detect_landmarks(self, frame):
"""Main method to detect face and landmarks"""
if self.use_mediapipe:
face_rect, landmarks = self.detect_landmarks_mediapipe(frame)
if face_rect is not None:
return [face_rect], [landmarks]
# Fallback to OpenCV
faces, gray = self.detect_faces_opencv(frame)
landmarks_list = []
face_rects = []
for face in faces:
landmarks = self.estimate_landmarks_opencv(gray, face)
if landmarks:
landmarks_list.append(landmarks)
face_rects.append(face)
return face_rects, landmarks_list
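    # Usage sketch (illustrative, not part of the demo): run the detector
    # on a single BGR frame; "face.jpg" is only a placeholder path.
    #
    #   detector = OpenCVFaceDetector()
    #   frame = cv2.imread("face.jpg")
    #   face_rects, landmarks_list = detector.detect_landmarks(frame)
    #   for rect, lms in zip(face_rects, landmarks_list):
    #       print(rect, sorted(lms.keys()))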
class MetricsCalculator:
"""Calculate drowsiness metrics from facial landmarks"""
@staticmethod
def calculate_ear_from_points(eye_points):
"""Calculate Eye Aspect Ratio from eye corner points"""
if len(eye_points) < 4:
return 0.3 # Default value
# For 4-point eye estimation: [left, right, top, bottom]
if len(eye_points) == 4:
left, right, top, bottom = eye_points
# Vertical distances
vertical_dist = dist.euclidean(top, bottom)
# Horizontal distance
horizontal_dist = dist.euclidean(left, right)
if horizontal_dist == 0:
return 0.3
ear = vertical_dist / horizontal_dist
return ear
        # For 6-point eye estimation (dlib-style ordering), using the EAR
        # formula from SoukupovΓ‘ & Čech (2016):
        #   EAR = (|p2 - p6| + |p3 - p5|) / (2 * |p1 - p4|)
        elif len(eye_points) >= 6:
            # Vertical distances between upper and lower eyelid points
            v1 = dist.euclidean(eye_points[1], eye_points[5])
            v2 = dist.euclidean(eye_points[2], eye_points[4])
            # Horizontal distance between the eye corners
            h = dist.euclidean(eye_points[0], eye_points[3])
            if h == 0:
                return 0.3
            ear = (v1 + v2) / (2.0 * h)
            return ear
return 0.3
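    # Worked example (sketch), six dlib-ordered points for an open eye:
    #   p1=(0, 0), p2=(3, -1.5), p3=(7, -1.5), p4=(10, 0), p5=(7, 1.5), p6=(3, 1.5)
    #   EAR = (|p2-p6| + |p3-p5|) / (2 * |p1-p4|) = (3 + 3) / (2 * 10) = 0.3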
@staticmethod
def calculate_mar_from_points(mouth_points):
"""Calculate Mouth Aspect Ratio from mouth points"""
if len(mouth_points) < 4:
return 0.3 # Default value
if len(mouth_points) == 4:
# [left, right, top, bottom]
left, right, top, bottom = mouth_points
vertical_dist = dist.euclidean(top, bottom)
horizontal_dist = dist.euclidean(left, right)
if horizontal_dist == 0:
return 0.3
mar = vertical_dist / horizontal_dist
return mar
        elif len(mouth_points) >= 8:
            # 8-point inner-lip estimation (dlib-style ordering): average
            # three vertical lip distances over the corner-to-corner width
            v1 = dist.euclidean(mouth_points[1], mouth_points[7])
            v2 = dist.euclidean(mouth_points[2], mouth_points[6])
            v3 = dist.euclidean(mouth_points[3], mouth_points[5])
            # Horizontal distance between the mouth corners
            h = dist.euclidean(mouth_points[0], mouth_points[4])
if h == 0:
return 0.3
mar = (v1 + v2 + v3) / (3.0 * h)
return mar
return 0.3
@staticmethod
def estimate_head_pose_simple(nose_tip, chin, frame_center):
"""Simple head pose estimation using nose and chin"""
if nose_tip is None or chin is None:
return np.array([0, 0, 0])
# Calculate head tilt based on nose-chin line deviation from vertical
nose_chin_vector = np.array([chin[0] - nose_tip[0], chin[1] - nose_tip[1]])
vertical_vector = np.array([0, 1])
# Calculate angle from vertical
dot_product = np.dot(nose_chin_vector, vertical_vector)
norms = np.linalg.norm(nose_chin_vector) * np.linalg.norm(vertical_vector)
if norms == 0:
return np.array([0, 0, 0])
cos_angle = dot_product / norms
angle = np.arccos(np.clip(cos_angle, -1, 1)) * 180 / np.pi
# Determine direction of tilt
if nose_chin_vector[0] < 0:
angle = -angle
        # Crude pitch estimate: nose offset from the frame center, mapped
        # linearly to roughly +/-30 degrees
        pitch = (nose_tip[1] - frame_center[1]) / frame_center[1] * 30
        return np.array([pitch, 0, angle])  # [pitch, yaw, roll]; yaw is left at 0
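    # Worked example (sketch): nose_tip=(100, 100), chin=(110, 140) gives a
    # nose-chin vector of (10, 40); the roll angle from vertical is
    # arccos(40 / sqrt(10**2 + 40**2)) ~= 14 degrees, positive because the
    # chin sits to the image-right of the nose.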
class DrowsinessAnalyzer:
"""Analyze drowsiness based on facial metrics"""
def __init__(self):
# Thresholds
self.EAR_THRESHOLD = 0.20 # Adjusted for OpenCV detection
self.EAR_CONSECUTIVE_FRAMES = 15
self.YAWN_THRESHOLD = 0.8 # Adjusted for mouth detection
self.YAWN_CONSECUTIVE_FRAMES = 10
        self.NOD_THRESHOLD = 20  # degrees of pitch deviation
        self.NOD_CONSECUTIVE_FRAMES = 8
# Counters
self.ear_counter = 0
self.yawn_counter = 0
self.nod_counter = 0
# History tracking
self.ear_history = deque(maxlen=30)
self.yawn_history = deque(maxlen=30)
self.head_pose_history = deque(maxlen=30)
def analyze_drowsiness(self, ear, mar, head_angles):
"""Analyze current metrics and return drowsiness indicators"""
drowsiness_indicators = []
# Update history
self.ear_history.append(ear)
self.yawn_history.append(mar)
self.head_pose_history.append(head_angles[0])
# Check EAR (eyes closed detection)
if ear < self.EAR_THRESHOLD:
self.ear_counter += 1
if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES:
drowsiness_indicators.append("EYES_CLOSED")
else:
self.ear_counter = 0
# Check yawning
if mar > self.YAWN_THRESHOLD:
self.yawn_counter += 1
if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES:
drowsiness_indicators.append("YAWNING")
else:
self.yawn_counter = 0
# Check head nodding
if abs(head_angles[0]) > self.NOD_THRESHOLD:
self.nod_counter += 1
            if self.nod_counter >= self.NOD_CONSECUTIVE_FRAMES:
drowsiness_indicators.append("HEAD_NOD")
else:
self.nod_counter = 0
return drowsiness_indicators
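    # Example: at roughly 30 FPS, an EAR below EAR_THRESHOLD (0.20) held for
    # EAR_CONSECUTIVE_FRAMES (15) consecutive frames, about half a second of
    # closed eyes, yields "EYES_CLOSED" in the returned indicators.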
def get_severity_level(self, indicators):
"""Determine severity based on indicators"""
if len(indicators) >= 2:
return "critical"
elif "EYES_CLOSED" in indicators:
return "high"
elif indicators:
return "medium"
else:
return "normal"
class AlertManager:
"""Manage alert generation and timing"""
def __init__(self, cooldown_seconds=8):
self.last_alert_time = 0
self.cooldown_seconds = cooldown_seconds
def should_trigger_alert(self, indicators):
"""Check if alert should be triggered"""
current_time = time.time()
if indicators and (current_time - self.last_alert_time) > self.cooldown_seconds:
self.last_alert_time = current_time
return True
return False
class VisualizationRenderer:
"""Handle visual rendering of detection results"""
@staticmethod
def draw_landmarks_and_contours(frame, landmarks, face_rect):
"""Draw facial landmarks and detection areas"""
x, y, w, h = face_rect
cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
# Draw eye areas
if 'left_eye_corners' in landmarks:
points = np.array(landmarks['left_eye_corners'], np.int32)
cv2.polylines(frame, [points], True, (0, 255, 0), 2)
if 'right_eye_corners' in landmarks:
points = np.array(landmarks['right_eye_corners'], np.int32)
cv2.polylines(frame, [points], True, (0, 255, 0), 2)
# Draw mouth area
if 'mouth_corners' in landmarks:
points = np.array(landmarks['mouth_corners'], np.int32)
cv2.polylines(frame, [points], True, (0, 255, 255), 2)
# Draw key points
key_points = ['nose_tip', 'chin']
for point_name in key_points:
if point_name in landmarks:
cv2.circle(frame, landmarks[point_name], 3, (255, 0, 0), -1)
@staticmethod
def draw_metrics_overlay(frame, ear, mar, head_angle, indicators):
"""Draw metrics and alerts on frame"""
# Metrics text
cv2.putText(frame, f"EAR: {ear:.3f}", (10, frame.shape[0] - 80),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(frame, f"MAR: {mar:.3f}", (10, frame.shape[0] - 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
cv2.putText(frame, f"Head: {head_angle:.1f}Β°", (10, frame.shape[0] - 40),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
# Alert overlay
        if indicators:
            # Keep the overlay text ASCII; cv2.putText cannot render emoji
            cv2.putText(frame, "DROWSINESS ALERT!", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
class StatusLogger:
"""Handle logging and status tracking"""
def __init__(self, max_logs=100):
self.status_log = deque(maxlen=max_logs)
def log(self, message):
"""Add timestamped log entry"""
timestamp = datetime.now().strftime("%H:%M:%S")
self.status_log.append(f"[{timestamp}] {message}")
def get_recent_logs(self, count=10):
"""Get recent log entries"""
return list(self.status_log)[-count:]
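# Minimal end-to-end sketch, not part of the original module: an assumed
# wiring of the classes above into a per-frame webcam loop. It assumes a
# default webcam at index 0 and shows metrics for every detected face.
# Press "q" to quit.
if __name__ == "__main__":
    detector = OpenCVFaceDetector()
    analyzer = DrowsinessAnalyzer()
    alerts = AlertManager()
    logger = StatusLogger()
    cap = cv2.VideoCapture(0)  # assumption: default webcam
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
        face_rects, landmarks_list = detector.detect_landmarks(frame)
        for face_rect, landmarks in zip(face_rects, landmarks_list):
            # Average EAR over whichever eyes were detected
            ears = [MetricsCalculator.calculate_ear_from_points(landmarks[k])
                    for k in ('left_eye_corners', 'right_eye_corners')
                    if k in landmarks]
            ear = sum(ears) / len(ears) if ears else 0.3
            mar = MetricsCalculator.calculate_mar_from_points(
                landmarks.get('mouth_corners', []))
            frame_center = (frame.shape[1] // 2, frame.shape[0] // 2)
            head_angles = MetricsCalculator.estimate_head_pose_simple(
                landmarks.get('nose_tip'), landmarks.get('chin'), frame_center)
            indicators = analyzer.analyze_drowsiness(ear, mar, head_angles)
            if alerts.should_trigger_alert(indicators):
                logger.log(f"{analyzer.get_severity_level(indicators)}: {indicators}")
            VisualizationRenderer.draw_landmarks_and_contours(frame, landmarks, face_rect)
            VisualizationRenderer.draw_metrics_overlay(
                frame, ear, mar, head_angles[0], indicators)
        cv2.imshow("Drowsiness Demo", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()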