Spaces:
Runtime error
Runtime error
import google.generativeai as genai | |
import numpy as np | |
from scipy.io import wavfile | |
import tempfile | |
import os | |
from facial_detection import OpenCVFaceDetector, MetricsCalculator, DrowsinessAnalyzer, AlertManager, VisualizationRenderer, StatusLogger | |
class AIAlertGenerator: | |
"""Generate AI-powered voice alerts using Gemini""" | |
def __init__(self, api_key=None): | |
self.model = None | |
if api_key: | |
try: | |
genai.configure(api_key=api_key) | |
self.model = genai.GenerativeModel('gemini-1.5-flash') | |
print("β Gemini AI initialized for voice alerts") | |
except Exception as e: | |
print(f"β οΈ Failed to initialize Gemini: {e}") | |
def generate_alert_text(self, alert_type, severity="medium"): | |
"""Generate contextual alert text using Gemini""" | |
if not self.model: | |
return self._get_default_alert_text(alert_type, severity) | |
try: | |
prompts = { | |
"EYES_CLOSED": f"Generate a brief, urgent wake-up message (max 12 words) for a drowsy driver whose eyes are closing. Severity: {severity}. Sound caring but firm.", | |
"YAWNING": f"Generate a brief, gentle alert (max 10 words) for a driver who is yawning frequently. Severity: {severity}. Sound encouraging.", | |
"HEAD_NOD": f"Generate a brief, firm alert (max 10 words) for a driver whose head is nodding. Severity: {severity}. Sound urgent but supportive.", | |
"COMBINED": f"Generate a brief, critical alert (max 15 words) for a driver showing multiple drowsiness signs. Severity: {severity}. Sound very urgent but caring." | |
} | |
prompt_key = "COMBINED" if isinstance(alert_type, list) and len(alert_type) > 1 else alert_type[0] if isinstance(alert_type, list) else alert_type | |
prompt = prompts.get(prompt_key, prompts["EYES_CLOSED"]) | |
response = self.model.generate_content(prompt) | |
alert_text = response.text.strip().replace('"', '').replace("'", "") | |
return alert_text[:100] | |
except Exception as e: | |
print(f"Error generating AI alert: {e}") | |
return self._get_default_alert_text(alert_type, severity) | |
def _get_default_alert_text(self, alert_type, severity): | |
"""Fallback alert messages""" | |
default_alerts = { | |
"EYES_CLOSED": { | |
"critical": "WAKE UP NOW! Pull over immediately!", | |
"high": "Eyes closing! Stay alert and pull over soon!", | |
"medium": "Please keep your eyes open while driving!" | |
}, | |
"YAWNING": { | |
"critical": "Excessive yawning detected! Take a break!", | |
"high": "You seem tired. Consider resting soon.", | |
"medium": "Frequent yawning noticed. Stay alert!" | |
}, | |
"HEAD_NOD": { | |
"critical": "Head nodding detected! Stop driving now!", | |
"high": "Your head is nodding. Pull over safely!", | |
"medium": "Head movement detected. Stay focused!" | |
} | |
} | |
alert_key = alert_type[0] if isinstance(alert_type, list) else alert_type | |
return default_alerts.get(alert_key, {}).get(severity, "Stay alert while driving!") | |
def create_audio_alert(self, text, sample_rate=22050): | |
"""Create audio alert (generates beep pattern)""" | |
try: | |
duration = 2.0 | |
freq = 800 | |
frames = int(duration * sample_rate) | |
# Create attention-grabbing beep pattern | |
t = np.linspace(0, duration, frames) | |
beep1 = np.sin(2 * np.pi * freq * t) * np.exp(-t * 3) | |
beep2 = np.sin(2 * np.pi * (freq * 1.5) * t) * np.exp(-t * 3) | |
# Combine beeps with pause | |
silence = np.zeros(int(0.1 * sample_rate)) | |
audio = np.concatenate([beep1, silence, beep2, silence, beep1]) | |
# Normalize and convert to int16 | |
audio = (audio * 32767).astype(np.int16) | |
# Save to temporary file | |
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav') | |
wavfile.write(temp_file.name, sample_rate, audio) | |
return temp_file.name, text | |
except Exception as e: | |
print(f"Error creating audio alert: {e}") | |
return None, text | |
class DrowsinessDetectionSystem: | |
"""Main system coordinator""" | |
def __init__(self): | |
self.face_detector = OpenCVFaceDetector() | |
self.metrics_calculator = MetricsCalculator() | |
self.drowsiness_analyzer = DrowsinessAnalyzer() | |
self.alert_manager = AlertManager() | |
self.visualization_renderer = VisualizationRenderer() | |
self.logger = StatusLogger() | |
print("β Drowsiness Detection System initialized with OpenCV") | |
def process_frame(self, frame): | |
"""Process a single frame and return results""" | |
try: | |
# Detect face and landmarks | |
face_rects, landmarks_list = self.face_detector.detect_landmarks(frame) | |
if not face_rects or not landmarks_list: | |
self.logger.log("No face detected") | |
return frame, ["π€ No face detected"], False, {} | |
# Process first detected face | |
face_rect = face_rects[0] | |
landmarks = landmarks_list[0] | |
# Calculate metrics | |
ear_left = ear_right = 0.25 # Default values | |
if 'left_eye_corners' in landmarks: | |
ear_left = self.metrics_calculator.calculate_ear_from_points(landmarks['left_eye_corners']) | |
if 'right_eye_corners' in landmarks: | |
ear_right = self.metrics_calculator.calculate_ear_from_points(landmarks['right_eye_corners']) | |
ear = (ear_left + ear_right) / 2.0 | |
mar = 0.3 # Default value | |
if 'mouth_corners' in landmarks: | |
mar = self.metrics_calculator.calculate_mar_from_points(landmarks['mouth_corners']) | |
# Head pose estimation | |
frame_center = (frame.shape[1] // 2, frame.shape[0] // 2) | |
head_angles = self.metrics_calculator.estimate_head_pose_simple( | |
landmarks.get('nose_tip'), | |
landmarks.get('chin'), | |
frame_center | |
) | |
# Analyze drowsiness | |
indicators = self.drowsiness_analyzer.analyze_drowsiness(ear, mar, head_angles) | |
severity = self.drowsiness_analyzer.get_severity_level(indicators) | |
# Check for alerts | |
should_alert = self.alert_manager.should_trigger_alert(indicators) | |
# Render visualization | |
self.visualization_renderer.draw_landmarks_and_contours(frame, landmarks, face_rect) | |
self.visualization_renderer.draw_metrics_overlay(frame, ear, mar, head_angles[0], indicators) | |
# Generate status text | |
status_text = self._generate_status_text(ear, mar, head_angles[0], indicators) | |
# Log events | |
if indicators: | |
self.logger.log(f"Drowsiness detected: {', '.join(indicators)} (Severity: {severity})") | |
# Prepare metrics | |
metrics = { | |
'ear': ear, | |
'mar': mar, | |
'head_angle': head_angles[0], | |
'indicators': indicators, | |
'severity': severity | |
} | |
return frame, status_text, should_alert, metrics | |
except Exception as e: | |
error_msg = f"Error processing frame: {str(e)}" | |
self.logger.log(error_msg) | |
return frame, [error_msg], False, {} | |
def _generate_status_text(self, ear, mar, head_angle, indicators): | |
"""Generate human-readable status text""" | |
status = [] | |
# EAR status | |
if ear < self.drowsiness_analyzer.EAR_THRESHOLD: | |
status.append(f"ποΈ Eyes closing! EAR: {ear:.3f}") | |
else: | |
status.append(f"ποΈ Eyes open - EAR: {ear:.3f}") | |
# MAR status | |
if mar > self.drowsiness_analyzer.YAWN_THRESHOLD: | |
status.append(f"π₯± Yawning detected! MAR: {mar:.3f}") | |
else: | |
status.append(f"π Normal mouth - MAR: {mar:.3f}") | |
# Head pose status | |
if abs(head_angle) > self.drowsiness_analyzer.NOD_THRESHOLD: | |
status.append(f"π Head nodding! Angle: {head_angle:.1f}Β°") | |
else: | |
status.append(f"π Head pose normal - Pitch: {head_angle:.1f}Β°") | |
# Overall status | |
if indicators: | |
status.append(f"β οΈ ALERT: {', '.join(indicators)}") | |
else: | |
status.append("β Driver appears alert") | |
return status | |
def get_logs(self): | |
"""Get recent system logs""" | |
return "\n".join(self.logger.get_recent_logs()) |