import google.generativeai as genai import numpy as np from scipy.io import wavfile import tempfile import os from facial_detection import OpenCVFaceDetector, MetricsCalculator, DrowsinessAnalyzer, AlertManager, VisualizationRenderer, StatusLogger class AIAlertGenerator: """Generate AI-powered voice alerts using Gemini""" def __init__(self, api_key=None): self.model = None if api_key: try: genai.configure(api_key=api_key) self.model = genai.GenerativeModel('gemini-1.5-flash') print("✅ Gemini AI initialized for voice alerts") except Exception as e: print(f"⚠️ Failed to initialize Gemini: {e}") def generate_alert_text(self, alert_type, severity="medium"): """Generate contextual alert text using Gemini""" if not self.model: return self._get_default_alert_text(alert_type, severity) try: prompts = { "EYES_CLOSED": f"Generate a brief, urgent wake-up message (max 12 words) for a drowsy driver whose eyes are closing. Severity: {severity}. Sound caring but firm.", "YAWNING": f"Generate a brief, gentle alert (max 10 words) for a driver who is yawning frequently. Severity: {severity}. Sound encouraging.", "HEAD_NOD": f"Generate a brief, firm alert (max 10 words) for a driver whose head is nodding. Severity: {severity}. Sound urgent but supportive.", "COMBINED": f"Generate a brief, critical alert (max 15 words) for a driver showing multiple drowsiness signs. Severity: {severity}. Sound very urgent but caring." } prompt_key = "COMBINED" if isinstance(alert_type, list) and len(alert_type) > 1 else alert_type[0] if isinstance(alert_type, list) else alert_type prompt = prompts.get(prompt_key, prompts["EYES_CLOSED"]) response = self.model.generate_content(prompt) alert_text = response.text.strip().replace('"', '').replace("'", "") return alert_text[:100] except Exception as e: print(f"Error generating AI alert: {e}") return self._get_default_alert_text(alert_type, severity) def _get_default_alert_text(self, alert_type, severity): """Fallback alert messages""" default_alerts = { "EYES_CLOSED": { "critical": "WAKE UP NOW! Pull over immediately!", "high": "Eyes closing! Stay alert and pull over soon!", "medium": "Please keep your eyes open while driving!" }, "YAWNING": { "critical": "Excessive yawning detected! Take a break!", "high": "You seem tired. Consider resting soon.", "medium": "Frequent yawning noticed. Stay alert!" }, "HEAD_NOD": { "critical": "Head nodding detected! Stop driving now!", "high": "Your head is nodding. Pull over safely!", "medium": "Head movement detected. Stay focused!" } } alert_key = alert_type[0] if isinstance(alert_type, list) else alert_type return default_alerts.get(alert_key, {}).get(severity, "Stay alert while driving!") def create_audio_alert(self, text, sample_rate=22050): """Create audio alert (generates beep pattern)""" try: duration = 2.0 freq = 800 frames = int(duration * sample_rate) # Create attention-grabbing beep pattern t = np.linspace(0, duration, frames) beep1 = np.sin(2 * np.pi * freq * t) * np.exp(-t * 3) beep2 = np.sin(2 * np.pi * (freq * 1.5) * t) * np.exp(-t * 3) # Combine beeps with pause silence = np.zeros(int(0.1 * sample_rate)) audio = np.concatenate([beep1, silence, beep2, silence, beep1]) # Normalize and convert to int16 audio = (audio * 32767).astype(np.int16) # Save to temporary file temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.wav') wavfile.write(temp_file.name, sample_rate, audio) return temp_file.name, text except Exception as e: print(f"Error creating audio alert: {e}") return None, text class DrowsinessDetectionSystem: """Main system coordinator""" def __init__(self): self.face_detector = OpenCVFaceDetector() self.metrics_calculator = MetricsCalculator() self.drowsiness_analyzer = DrowsinessAnalyzer() self.alert_manager = AlertManager() self.visualization_renderer = VisualizationRenderer() self.logger = StatusLogger() print("✅ Drowsiness Detection System initialized with OpenCV") def process_frame(self, frame): """Process a single frame and return results""" try: # Detect face and landmarks face_rects, landmarks_list = self.face_detector.detect_landmarks(frame) if not face_rects or not landmarks_list: self.logger.log("No face detected") return frame, ["👤 No face detected"], False, {} # Process first detected face face_rect = face_rects[0] landmarks = landmarks_list[0] # Calculate metrics ear_left = ear_right = 0.25 # Default values if 'left_eye_corners' in landmarks: ear_left = self.metrics_calculator.calculate_ear_from_points(landmarks['left_eye_corners']) if 'right_eye_corners' in landmarks: ear_right = self.metrics_calculator.calculate_ear_from_points(landmarks['right_eye_corners']) ear = (ear_left + ear_right) / 2.0 mar = 0.3 # Default value if 'mouth_corners' in landmarks: mar = self.metrics_calculator.calculate_mar_from_points(landmarks['mouth_corners']) # Head pose estimation frame_center = (frame.shape[1] // 2, frame.shape[0] // 2) head_angles = self.metrics_calculator.estimate_head_pose_simple( landmarks.get('nose_tip'), landmarks.get('chin'), frame_center ) # Analyze drowsiness indicators = self.drowsiness_analyzer.analyze_drowsiness(ear, mar, head_angles) severity = self.drowsiness_analyzer.get_severity_level(indicators) # Check for alerts should_alert = self.alert_manager.should_trigger_alert(indicators) # Render visualization self.visualization_renderer.draw_landmarks_and_contours(frame, landmarks, face_rect) self.visualization_renderer.draw_metrics_overlay(frame, ear, mar, head_angles[0], indicators) # Generate status text status_text = self._generate_status_text(ear, mar, head_angles[0], indicators) # Log events if indicators: self.logger.log(f"Drowsiness detected: {', '.join(indicators)} (Severity: {severity})") # Prepare metrics metrics = { 'ear': ear, 'mar': mar, 'head_angle': head_angles[0], 'indicators': indicators, 'severity': severity } return frame, status_text, should_alert, metrics except Exception as e: error_msg = f"Error processing frame: {str(e)}" self.logger.log(error_msg) return frame, [error_msg], False, {} def _generate_status_text(self, ear, mar, head_angle, indicators): """Generate human-readable status text""" status = [] # EAR status if ear < self.drowsiness_analyzer.EAR_THRESHOLD: status.append(f"👁️ Eyes closing! EAR: {ear:.3f}") else: status.append(f"👁️ Eyes open - EAR: {ear:.3f}") # MAR status if mar > self.drowsiness_analyzer.YAWN_THRESHOLD: status.append(f"🥱 Yawning detected! MAR: {mar:.3f}") else: status.append(f"👄 Normal mouth - MAR: {mar:.3f}") # Head pose status if abs(head_angle) > self.drowsiness_analyzer.NOD_THRESHOLD: status.append(f"📉 Head nodding! Angle: {head_angle:.1f}°") else: status.append(f"📏 Head pose normal - Pitch: {head_angle:.1f}°") # Overall status if indicators: status.append(f"⚠️ ALERT: {', '.join(indicators)}") else: status.append("✅ Driver appears alert") return status def get_logs(self): """Get recent system logs""" return "\n".join(self.logger.get_recent_logs())