# Hugging Face Spaces page banner captured with this file: "Runtime error" (shown twice).
import cv2 | |
import numpy as np | |
import dlib | |
import gradio as gr | |
import threading | |
import time | |
import queue | |
import pygame | |
import io | |
import google.generativeai as genai | |
from scipy.spatial import distance as dist | |
from collections import deque | |
import tempfile | |
import os | |
from datetime import datetime | |
class DrowsinessDetector:
    """Detect driver drowsiness in video frames from eye, mouth and head-pose cues.

    Three signals are derived from dlib's 68-point facial landmarks:

    * Eye Aspect Ratio (EAR): compared against ``EAR_THRESHOLD`` (below = closed).
    * Mouth Aspect Ratio (MAR): compared against ``YAWN_THRESHOLD`` (above = yawn).
    * Head pitch from ``cv2.solvePnP``: compared against ``NOD_THRESHOLD`` degrees.

    A signal only counts as a drowsiness indicator after it has persisted for a
    configured number of consecutive frames. Alerts are rate limited by
    ``alert_cooldown`` seconds.
    """

    def __init__(self, gemini_api_key=None):
        """Load the dlib models and set up thresholds, counters and alerting.

        Args:
            gemini_api_key: Optional Gemini API key. When falsy, no Gemini model
                is created and alerts fall back to the built-in beep/text alert.

        Raises:
            RuntimeError: Propagated from dlib if
                ``shape_predictor_68_face_landmarks.dat`` cannot be loaded.
        """
        # Created first so that every later step in this constructor can log.
        self.status_log = deque(maxlen=100)

        # Initialize face detector and landmark predictor
        self.detector = dlib.get_frontal_face_detector()
        self.predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')

        # EAR (Eye Aspect Ratio) parameters
        self.EAR_THRESHOLD = 0.25
        self.EAR_CONSECUTIVE_FRAMES = 20
        self.ear_counter = 0
        self.ear_history = deque(maxlen=30)

        # Yawn detection parameters
        self.YAWN_THRESHOLD = 0.6
        self.YAWN_CONSECUTIVE_FRAMES = 15
        self.yawn_counter = 0
        self.yawn_history = deque(maxlen=30)

        # Head pose estimation parameters
        self.NOD_THRESHOLD = 15
        self.NOD_CONSECUTIVE_FRAMES = 10
        self.nod_counter = 0
        self.head_pose_history = deque(maxlen=30)

        # Alert system
        self.drowsy_alert = False
        self.last_alert_time = 0
        self.alert_cooldown = 10  # seconds

        # Initialize pygame for audio. A mono 16-bit mixer is requested so the
        # generated beep matches it; hosts without an audio device (for example
        # headless servers) raise pygame.error, which must not prevent the
        # detector from being created.
        try:
            pygame.mixer.init(frequency=22050, size=-16, channels=1)
        except pygame.error as e:
            self.log_status(f"Audio output unavailable: {str(e)}")

        # Initialize Gemini AI
        if gemini_api_key:
            genai.configure(api_key=gemini_api_key)
            self.model = genai.GenerativeModel('gemini-pro')
        else:
            self.model = None

        # Eye and mouth landmark indices
        self.LEFT_EYE = list(range(36, 42))
        self.RIGHT_EYE = list(range(42, 48))
        self.MOUTH = list(range(48, 68))

        # 3D model points for head pose estimation
        self.model_points = np.array([
            (0.0, 0.0, 0.0),           # Nose tip
            (0.0, -330.0, -65.0),      # Chin
            (-225.0, 170.0, -135.0),   # Left eye left corner
            (225.0, 170.0, -135.0),    # Right eye right corner
            (-150.0, -150.0, -125.0),  # Left Mouth corner
            (150.0, -150.0, -125.0),   # Right mouth corner
        ], dtype="double")

    def calculate_ear(self, eye_landmarks):
        """Calculate the Eye Aspect Ratio of one eye.

        Args:
            eye_landmarks: Six (x, y) points; index 0 and 3 are the horizontal
                corners, (1, 5) and (2, 4) are the vertical pairs.

        Returns:
            ``(|p1-p5| + |p2-p4|) / (2 * |p0-p3|)``, or ``0.0`` when the eye has
            zero width (degenerate landmarks) so that no nan/inf is produced.
        """
        # Vertical eye landmarks
        vertical_outer = dist.euclidean(eye_landmarks[1], eye_landmarks[5])
        vertical_inner = dist.euclidean(eye_landmarks[2], eye_landmarks[4])
        # Horizontal eye landmark
        width = dist.euclidean(eye_landmarks[0], eye_landmarks[3])
        if width == 0:
            return 0.0
        return (vertical_outer + vertical_inner) / (2.0 * width)

    def calculate_mar(self, mouth_landmarks):
        """Calculate the Mouth Aspect Ratio used for yawn detection.

        Args:
            mouth_landmarks: The 20 mouth points (landmarks 48-67), indexed
                from 0, each an (x, y) pair.

        Returns:
            ``(|m2-m10| + |m4-m8|) / (2 * |m0-m6|)``, or ``0.0`` when the mouth
            has zero width (degenerate landmarks).
        """
        # Vertical mouth landmarks
        vertical_left = dist.euclidean(mouth_landmarks[2], mouth_landmarks[10])  # 50, 58
        vertical_right = dist.euclidean(mouth_landmarks[4], mouth_landmarks[8])  # 52, 56
        # Horizontal mouth landmark
        width = dist.euclidean(mouth_landmarks[0], mouth_landmarks[6])  # 48, 54
        if width == 0:
            return 0.0
        return (vertical_left + vertical_right) / (2.0 * width)

    @staticmethod
    def _rotation_matrix_to_euler_degrees(rotation_matrix):
        """Convert a 3x3 rotation matrix to (x, y, z) Euler angles in degrees."""
        rotation_matrix = np.asarray(rotation_matrix, dtype=float)
        sy = np.sqrt(rotation_matrix[0, 0] ** 2 + rotation_matrix[1, 0] ** 2)
        if sy >= 1e-6:
            x = np.arctan2(rotation_matrix[2, 1], rotation_matrix[2, 2])
            y = np.arctan2(-rotation_matrix[2, 0], sy)
            z = np.arctan2(rotation_matrix[1, 0], rotation_matrix[0, 0])
        else:
            # Near-singular case: z cannot be separated from x, so it is fixed to 0.
            x = np.arctan2(-rotation_matrix[1, 2], rotation_matrix[1, 1])
            y = np.arctan2(-rotation_matrix[2, 0], sy)
            z = 0
        return np.array([x, y, z]) * 180.0 / np.pi

    def get_head_pose(self, landmarks, img_size):
        """Estimate head pose from facial landmarks.

        Args:
            landmarks: Indexable collection of the 68 (x, y) landmark points.
            img_size: Image shape as ``(height, width, ...)``; the width is used
                as an approximate focal length and the image centre as the
                principal point.

        Returns:
            ``numpy`` array ``[x, y, z]`` of Euler angles in degrees (callers in
            this class treat index 0 as pitch). All zeros if ``solvePnP``
            reports failure.
        """
        image_points = np.array([
            (landmarks[30][0], landmarks[30][1]),  # Nose tip
            (landmarks[8][0], landmarks[8][1]),    # Chin
            (landmarks[36][0], landmarks[36][1]),  # Left eye left corner
            (landmarks[45][0], landmarks[45][1]),  # Right eye right corner
            (landmarks[48][0], landmarks[48][1]),  # Left Mouth corner
            (landmarks[54][0], landmarks[54][1]),  # Right mouth corner
        ], dtype="double")

        # Camera internals
        focal_length = img_size[1]
        center = (img_size[1] / 2, img_size[0] / 2)
        camera_matrix = np.array([
            [focal_length, 0, center[0]],
            [0, focal_length, center[1]],
            [0, 0, 1],
        ], dtype="double")
        dist_coeffs = np.zeros((4, 1))  # Assuming no lens distortion

        # Solve PnP
        success, rotation_vector, _translation_vector = cv2.solvePnP(
            self.model_points, image_points, camera_matrix, dist_coeffs,
            flags=cv2.SOLVEPNP_ITERATIVE)
        if not success:
            # Without a solution the rotation vector is meaningless; report a
            # neutral pose rather than angles derived from garbage.
            return np.zeros(3)

        # Convert rotation vector to rotation matrix, then to Euler angles.
        # NOTE(review): the 3D model is Y-up while image coordinates are Y-down,
        # so the raw pitch may sit near +/-180 degrees instead of 0 for a frontal
        # face - verify on real footage before trusting NOD_THRESHOLD.
        rotation_matrix, _jacobian = cv2.Rodrigues(rotation_vector)
        return self._rotation_matrix_to_euler_degrees(rotation_matrix)

    def generate_voice_alert(self, alert_type, severity="medium"):
        """Produce an alert message, using Gemini when a model is configured.

        Note that the Gemini request is made synchronously, so the caller is
        blocked until it returns or fails.

        Args:
            alert_type: ``"drowsy"``, ``"yawn"`` or ``"nod"``; any other value
                uses the ``"drowsy"`` prompt.
            severity: Free-text severity inserted into the prompt.

        Returns:
            The alert text. Falls back to :meth:`play_default_alert` when no
            model is configured, the request fails, or the reply is empty.
        """
        if not self.model:
            return self.play_default_alert()
        try:
            prompts = {
                "drowsy": f"Generate a brief, urgent but caring voice alert (max 15 words) to wake up a drowsy driver. Severity: {severity}. Make it sound natural and concerned.",
                "yawn": f"Generate a brief, gentle voice alert (max 12 words) for a driver who is yawning frequently. Severity: {severity}. Sound caring but alert.",
                "nod": f"Generate a brief, firm voice alert (max 12 words) for a driver whose head is nodding. Severity: {severity}. Sound urgent but supportive.",
            }
            response = self.model.generate_content(prompts.get(alert_type, prompts["drowsy"]))
            alert_text = response.text.strip().replace('"', '').replace("'", "")
            if not alert_text:
                # An empty reply would produce a blank alert; use the default.
                return self.play_default_alert()
            # Use text-to-speech (you would need to install pyttsx3 or use cloud TTS)
            # For this example, we'll use a placeholder
            self.log_status(f"π ALERT: {alert_text}")
            return alert_text
        except Exception as e:
            self.log_status(f"Error generating alert: {str(e)}")
            return self.play_default_alert()

    def play_default_alert(self):
        """Play a short 800 Hz beep (when audio is available) and log a text alert.

        Returns:
            The wake-up text, or ``"Alert system activated"`` if playing or
            logging raised an exception.
        """
        try:
            duration = 0.5  # seconds
            freq = 800  # Hz
            # get_init() returns None when the mixer is not initialised,
            # otherwise (frequency, format, channels) of the running mixer.
            mixer_config = pygame.mixer.get_init()
            if mixer_config is None:
                self.log_status("Audio output unavailable; showing text alert only")
            else:
                sample_rate, _sample_format, channels = mixer_config
                # Generate the beep at the mixer's real rate so the pitch is right.
                t = np.arange(int(duration * sample_rate)) / sample_rate
                samples = (np.sin(2 * np.pi * freq * t) * 32767).astype(np.int16)
                if channels > 1:
                    # Multi-channel mixers need a (frames, channels) array.
                    samples = np.repeat(samples[:, np.newaxis], channels, axis=1)
                pygame.sndarray.make_sound(samples).play()
            alert_text = "β οΈ WAKE UP! Please stay alert while driving!"
            self.log_status(f"π {alert_text}")
            return alert_text
        except Exception as e:
            self.log_status(f"Error playing alert: {str(e)}")
            return "Alert system activated"

    def log_status(self, message):
        """Append ``message`` to ``status_log`` prefixed with an HH:MM:SS timestamp."""
        timestamp = datetime.now().strftime("%H:%M:%S")
        self.status_log.append(f"[{timestamp}] {message}")

    def detect_drowsiness(self, frame):
        """Analyse one frame, annotate it in place and raise alerts when needed.

        Args:
            frame: Image array passed to ``cv2.cvtColor`` with
                ``COLOR_BGR2GRAY``; it is drawn on in place.

        Returns:
            Tuple ``(frame, status_text, alert_message)`` where ``status_text``
            is a list of status lines and ``alert_message`` is ``""`` unless an
            alert was generated for this frame.
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = self.detector(gray)

        status_text = []
        alert_message = ""

        if len(faces) == 0:
            status_text.append("π€ No face detected")
            self.log_status("No face detected in frame")
            return frame, status_text, alert_message

        # The streak counters belong to the detector, not to a face, so mixing
        # several faces would corrupt the "consecutive frames" logic. Track the
        # largest face only.
        face = max(faces, key=lambda rect: rect.width() * rect.height())

        shape = self.predictor(gray, face)
        landmarks = np.array([[p.x, p.y] for p in shape.parts()], dtype=np.int32)

        # Draw face rectangle
        left, top = face.left(), face.top()
        cv2.rectangle(frame, (left, top),
                      (left + face.width(), top + face.height()), (0, 255, 0), 2)

        # Eye Aspect Ratio calculation
        left_eye = landmarks[self.LEFT_EYE]
        right_eye = landmarks[self.RIGHT_EYE]
        ear = (self.calculate_ear(left_eye) + self.calculate_ear(right_eye)) / 2.0
        self.ear_history.append(ear)

        # Draw eye contours
        cv2.drawContours(frame, [cv2.convexHull(left_eye)], -1, (0, 255, 0), 1)
        cv2.drawContours(frame, [cv2.convexHull(right_eye)], -1, (0, 255, 0), 1)

        # Yawn detection
        mouth = landmarks[self.MOUTH]
        mar = self.calculate_mar(mouth)
        self.yawn_history.append(mar)

        # Draw mouth contour
        cv2.drawContours(frame, [cv2.convexHull(mouth)], -1, (0, 255, 255), 1)

        # Head pose estimation
        head_angles = self.get_head_pose(landmarks, frame.shape)
        pitch = head_angles[0]
        self.head_pose_history.append(pitch)

        # Drowsiness detection logic
        drowsy_indicators = []

        # Check EAR
        if ear < self.EAR_THRESHOLD:
            self.ear_counter += 1
            if self.ear_counter >= self.EAR_CONSECUTIVE_FRAMES:
                drowsy_indicators.append("EYES_CLOSED")
                status_text.append(f"ποΈ Eyes closed! EAR: {ear:.3f}")
        else:
            self.ear_counter = 0
            status_text.append(f"ποΈ Eyes open - EAR: {ear:.3f}")

        # Check for yawning
        if mar > self.YAWN_THRESHOLD:
            self.yawn_counter += 1
            if self.yawn_counter >= self.YAWN_CONSECUTIVE_FRAMES:
                drowsy_indicators.append("YAWNING")
                status_text.append(f"π₯± Yawning detected! MAR: {mar:.3f}")
        else:
            self.yawn_counter = 0
            status_text.append(f"π Normal mouth - MAR: {mar:.3f}")

        # Check head nodding
        if abs(pitch) > self.NOD_THRESHOLD:
            self.nod_counter += 1
            if self.nod_counter >= self.NOD_CONSECUTIVE_FRAMES:
                drowsy_indicators.append("HEAD_NOD")
                status_text.append(f"π Head nodding! Angle: {pitch:.1f}Β°")
        else:
            self.nod_counter = 0
            status_text.append(f"π Head pose - Pitch: {pitch:.1f}Β°")

        if drowsy_indicators:
            # Visual alert on frame, shown for as long as an indicator is active.
            # Hershey fonts only render ASCII, so the banner text is plain ASCII.
            cv2.putText(frame, "DROWSINESS ALERT!", (50, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 3)
        else:
            # Clear the flag once the driver looks alert again.
            self.drowsy_alert = False

        # Generate alerts (rate limited by the cooldown)
        current_time = time.time()
        if drowsy_indicators and (current_time - self.last_alert_time) > self.alert_cooldown:
            self.drowsy_alert = True
            self.last_alert_time = current_time

            # Determine alert type and severity
            if "EYES_CLOSED" in drowsy_indicators:
                severity = "high" if len(drowsy_indicators) > 1 else "medium"
                alert_message = self.generate_voice_alert("drowsy", severity)
            elif "YAWNING" in drowsy_indicators:
                alert_message = self.generate_voice_alert("yawn", "medium")
            elif "HEAD_NOD" in drowsy_indicators:
                alert_message = self.generate_voice_alert("nod", "medium")

        # Draw landmark points (plain ints: OpenCV expects Python integers here)
        for point_x, point_y in landmarks:
            cv2.circle(frame, (int(point_x), int(point_y)), 1, (255, 255, 255), -1)

        # Add metrics overlay
        frame_height = frame.shape[0]
        cv2.putText(frame, f"EAR: {ear:.3f}", (10, frame_height - 80),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(frame, f"MAR: {mar:.3f}", (10, frame_height - 60),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
        cv2.putText(frame, f"Head: {pitch:.1f} deg", (10, frame_height - 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)

        return frame, status_text, alert_message
def create_gradio_interface():
    """Create the Gradio interface for the drowsiness detection system.

    The UI has an optional Gemini API key box, an initialise button, a
    streaming webcam image that is replaced by the annotated frame, and
    text boxes for the current status, the latest alert and recent logs.

    Returns:
        The ``gr.Blocks`` application; call ``launch()`` on it to serve it.
    """
    # Detector shared by both handlers below. It lives in this function's
    # scope, so the handlers must rebind it with ``nonlocal`` (``global`` would
    # look for a module-level name that does not exist).
    detector = None

    def resolve_api_key(gemini_key):
        """Return the typed key, else the GEMINI_API_KEY env var, else None."""
        typed_key = (gemini_key or "").strip()
        return typed_key or os.getenv('GEMINI_API_KEY') or None

    def initialize_system(gemini_key):
        """Initialize the drowsiness detection system"""
        nonlocal detector
        try:
            detector = DrowsinessDetector(resolve_api_key(gemini_key))
            return "β System initialized successfully!", "System ready for detection."
        except Exception as e:
            return f"β Error initializing system: {str(e)}", "System initialization failed."

    def process_video_frame(frame, gemini_key):
        """Process a single video frame"""
        nonlocal detector
        if frame is None:
            # Nothing captured yet: leave every output as it is.
            return gr.update(), gr.update(), gr.update(), gr.update()
        try:
            # Created lazily, and inside the try so that a missing model file
            # is reported in the UI instead of crashing the stream handler.
            if detector is None:
                detector = DrowsinessDetector(resolve_api_key(gemini_key))

            # Gradio supplies RGB arrays while the detector works in OpenCV's
            # BGR order; convert on the way in and back on the way out.
            bgr_frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            processed_frame, status_list, alert_msg = detector.detect_drowsiness(bgr_frame)
            rgb_frame = cv2.cvtColor(processed_frame, cv2.COLOR_BGR2RGB)

            # Format status text
            status_text = "\n".join(status_list) if status_list else "Processing..."
            # Get recent logs
            log_text = "\n".join(list(detector.status_log)[-10:]) if detector.status_log else "No logs yet."
            # Most frames carry no alert; keep the previous one on screen
            # rather than blanking the "Latest Alert" box every 100 ms.
            return rgb_frame, status_text, alert_msg or gr.update(), log_text
        except Exception as e:
            error_msg = f"Error processing frame: {str(e)}"
            return frame, error_msg, gr.update(), error_msg

    # Create the Gradio interface
    with gr.Blocks(title="Driver Drowsiness Detection System", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
        # π Real-time Driver Drowsiness Detection System
        This system uses computer vision and AI to detect driver drowsiness through:
        - **Eye Aspect Ratio (EAR)** - Detects closed/droopy eyes
        - **Mouth Aspect Ratio (MAR)** - Detects yawning
        - **Head Pose Estimation** - Detects head nodding
        - **AI Voice Alerts** - Uses Gemini AI for personalized wake-up messages
        ### π Setup Instructions:
        1. Download `shape_predictor_68_face_landmarks.dat` from [dlib models](http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2)
        2. Place it in the same directory as this script
        3. (Optional) Enter your Gemini API key for AI-powered voice alerts
        4. Start your webcam and begin monitoring!
        """)
        with gr.Row():
            with gr.Column(scale=2):
                # Gemini API key input. Event handlers can only take components
                # as inputs, so this is a real textbox; the environment variable
                # is used server-side as the fallback and never sent to the page.
                gemini_key_input = gr.Textbox(
                    label="Gemini API Key (optional)",
                    type="password",
                    placeholder="Leave blank to use the GEMINI_API_KEY environment variable"
                )
                # Initialize button
                init_btn = gr.Button("π Initialize System", variant="primary")
                init_status = gr.Textbox(label="Initialization Status", interactive=False)
            with gr.Column(scale=1):
                # System info
                gr.Markdown("""
                ### π Detection Thresholds:
                - **EAR Threshold**: 0.25
                - **Yawn Threshold**: 0.6
                - **Head Nod**: 15Β° deviation
                - **Alert Cooldown**: 10 seconds
                """)
        with gr.Row():
            with gr.Column(scale=2):
                # Video input/output. A streaming webcam Image delivers each
                # frame as a numpy array, which is what the handler expects
                # (a Video component would hand over a file path instead).
                video_input = gr.Image(
                    sources=["webcam"],
                    label="πΉ Camera Feed",
                    streaming=True,
                    type="numpy"
                )
            with gr.Column(scale=1):
                # Status displays
                current_status = gr.Textbox(
                    label="π Current Status",
                    lines=6,
                    interactive=False
                )
                alert_display = gr.Textbox(
                    label="π Latest Alert",
                    interactive=False,
                    placeholder="No alerts yet..."
                )
                system_logs = gr.Textbox(
                    label="π System Logs",
                    lines=8,
                    interactive=False,
                    placeholder="System logs will appear here..."
                )
        # Event handlers
        init_btn.click(
            fn=initialize_system,
            inputs=[gemini_key_input],
            outputs=[init_status, alert_display]
        )
        video_input.stream(
            fn=process_video_frame,
            inputs=[video_input, gemini_key_input],
            outputs=[video_input, current_status, alert_display, system_logs],
            stream_every=0.1,  # Process every 100ms
            show_progress="hidden"
        )
        # Instructions
        gr.Markdown("""
        ### π§ Troubleshooting:
        - **No face detected**: Ensure good lighting and face is visible to camera
        - **Poor detection**: Adjust camera angle and distance (arm's length recommended)
        - **No alerts**: Check if Gemini API key is valid (optional feature)
        - **High CPU usage**: Reduce video resolution or increase stream interval
        ### β οΈ Safety Notice:
        This system is for demonstration purposes. Always prioritize real-world driving safety measures.
        Pull over safely if you feel drowsy while driving.
        """)
    return demo
# Script entry point: report whether the landmark model is present, then
# build the UI and serve it.
if __name__ == "__main__":
    # The dlib landmark model must sit next to this script; its absence is
    # only reported here, the interface is launched either way.
    landmark_model_present = os.path.exists('shape_predictor_68_face_landmarks.dat')
    if landmark_model_present:
        print("β All required files found!")
    else:
        for notice in (
            "β Missing required file: shape_predictor_68_face_landmarks.dat",
            "π₯ Please download from: http://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2",
            "π Extract and place in the same directory as this script",
        ):
            print(notice)

    # Build the Blocks app and serve it on all interfaces, port 7860.
    launch_options = {
        "share": True,
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
    }
    demo = create_gradio_interface()
    demo.launch(**launch_options)